diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index cc82c8acee5..ac6ec7b951c 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -191,8 +191,8 @@ impl BlockingDataset { } // Set up namespace commit handler if namespace and table_id are provided - if let (Some(ns), Some(tid)) = (namespace, table_id) { - let external_store = LanceNamespaceExternalManifestStore::new(ns, tid); + if let (Some(namespace_client), Some(tid)) = (namespace, table_id) { + let external_store = LanceNamespaceExternalManifestStore::new(namespace_client, tid); let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { external_manifest_store: Arc::new(external_store), }); @@ -1436,20 +1436,21 @@ pub(crate) fn extract_namespace_info( return Ok(None); } - let namespace: Arc = if is_directory_namespace(env, namespace_obj)? { + let namespace_client: Arc = if is_directory_namespace(env, namespace_obj)? { let native_handle = get_native_namespace_handle(env, namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) }; - ns.inner.clone() + let dir_namespace_client = + unsafe { &*(native_handle as *const BlockingDirectoryNamespace) }; + dir_namespace_client.inner.clone() } else if is_rest_namespace(env, namespace_obj)? { let native_handle = get_native_namespace_handle(env, namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) }; - ns.inner.clone() + let rest_namespace_client = unsafe { &*(native_handle as *const BlockingRestNamespace) }; + rest_namespace_client.inner.clone() } else { create_java_lance_namespace(env, namespace_obj)? }; let table_id = env.get_strings(table_id_obj)?; - Ok(Some((namespace, table_id))) + Ok(Some((namespace_client, table_id))) } #[unsafe(no_mangle)] diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index fa5a67437c7..aedf9385197 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1610,8 +1610,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_listNamespace ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_namespaces(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_namespaces(req)) }), std::ptr::null_mut() ) @@ -1627,8 +1627,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_describeNames ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_namespace(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_namespace(req)) }), std::ptr::null_mut() ) @@ -1644,8 +1644,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createNamespa ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_namespace(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_namespace(req)) }), std::ptr::null_mut() ) @@ -1661,8 +1661,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_dropNamespace ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.drop_namespace(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.drop_namespace(req)) }), std::ptr::null_mut() ) @@ -1678,8 +1678,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_namespaceExis ) { ok_or_throw_without_return!( env, - call_namespace_void_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.namespace_exists(req)) + call_namespace_void_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.namespace_exists(req)) }) ) } @@ -1693,8 +1693,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_listTablesNat ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_tables(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_tables(req)) }), std::ptr::null_mut() ) @@ -1710,8 +1710,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_describeTable ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table(req)) }), std::ptr::null_mut() ) @@ -1727,8 +1727,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_registerTable ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.register_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.register_table(req)) }), std::ptr::null_mut() ) @@ -1744,8 +1744,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_tableExistsNa ) { ok_or_throw_without_return!( env, - call_namespace_void_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.table_exists(req)) + call_namespace_void_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.table_exists(req)) }) ) } @@ -1759,8 +1759,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_dropTableNati ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.drop_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.drop_table(req)) }), std::ptr::null_mut() ) @@ -1776,8 +1776,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_deregisterTab ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.deregister_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.deregister_table(req)) }), std::ptr::null_mut() ) @@ -1813,7 +1813,9 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createTableNa handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.create_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.create_table(req, data)) + } ), std::ptr::null_mut() ) @@ -1829,8 +1831,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_declareTableN ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.declare_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.declare_table(req)) }), std::ptr::null_mut() ) @@ -1852,7 +1854,9 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_insertIntoTab handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.insert_into_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.insert_into_table(req, data)) + } ), std::ptr::null_mut() ) @@ -1874,7 +1878,9 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_mergeInsertIn handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.merge_insert_into_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.merge_insert_into_table(req, data)) + } ), std::ptr::null_mut() ) @@ -1890,8 +1896,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_updateTableNa ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.update_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.update_table(req)) }), std::ptr::null_mut() ) @@ -1907,8 +1913,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_deleteFromTab ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.delete_from_table(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.delete_from_table(req)) }), std::ptr::null_mut() ) @@ -1939,8 +1945,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createTableIn ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_table_index(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_table_index(req)) }), std::ptr::null_mut() ) @@ -1956,8 +1962,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_listTableIndi ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_table_indices(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_table_indices(req)) }), std::ptr::null_mut() ) @@ -1973,8 +1979,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_describeTable ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table_index_stats(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table_index_stats(req)) }), std::ptr::null_mut() ) @@ -1990,8 +1996,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_describeTrans ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_transaction(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_transaction(req)) }), std::ptr::null_mut() ) @@ -2007,8 +2013,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_alterTransact ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.alter_transaction(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.alter_transaction(req)) }), std::ptr::null_mut() ) @@ -2024,8 +2030,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_listTableVers ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_table_versions(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_table_versions(req)) }), std::ptr::null_mut() ) @@ -2041,8 +2047,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createTableVe ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_table_version(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_table_version(req)) }), std::ptr::null_mut() ) @@ -2058,8 +2064,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_describeTable ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table_version(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table_version(req)) }), std::ptr::null_mut() ) @@ -2075,8 +2081,8 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_batchDeleteTa ) -> jstring { ok_or_throw_with_return!( env, - call_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.batch_delete_table_versions(req)) + call_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.batch_delete_table_versions(req)) }), std::ptr::null_mut() ) @@ -2183,8 +2189,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_listNamespacesNati ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_namespaces(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_namespaces(req)) }), std::ptr::null_mut() ) @@ -2200,8 +2206,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_describeNamespaceN ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_namespace(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_namespace(req)) }), std::ptr::null_mut() ) @@ -2217,8 +2223,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_createNamespaceNat ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_namespace(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_namespace(req)) }), std::ptr::null_mut() ) @@ -2234,8 +2240,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_dropNamespaceNativ ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.drop_namespace(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.drop_namespace(req)) }), std::ptr::null_mut() ) @@ -2251,8 +2257,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_namespaceExistsNat ) { ok_or_throw_without_return!( env, - call_rest_namespace_void_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.namespace_exists(req)) + call_rest_namespace_void_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.namespace_exists(req)) }) ) } @@ -2266,8 +2272,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_listTablesNative( ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_tables(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_tables(req)) }), std::ptr::null_mut() ) @@ -2283,8 +2289,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_describeTableNativ ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table(req)) }), std::ptr::null_mut() ) @@ -2300,8 +2306,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_registerTableNativ ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.register_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.register_table(req)) }), std::ptr::null_mut() ) @@ -2317,8 +2323,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_tableExistsNative( ) { ok_or_throw_without_return!( env, - call_rest_namespace_void_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.table_exists(req)) + call_rest_namespace_void_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.table_exists(req)) }) ) } @@ -2332,8 +2338,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_dropTableNative( ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.drop_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.drop_table(req)) }), std::ptr::null_mut() ) @@ -2349,8 +2355,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_deregisterTableNat ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.deregister_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.deregister_table(req)) }), std::ptr::null_mut() ) @@ -2386,7 +2392,9 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_createTableNative( handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.create_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.create_table(req, data)) + } ), std::ptr::null_mut() ) @@ -2402,8 +2410,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_declareTableNative ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.declare_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.declare_table(req)) }), std::ptr::null_mut() ) @@ -2419,8 +2427,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_renameTableNative( ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.rename_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.rename_table(req)) }), std::ptr::null_mut() ) @@ -2442,7 +2450,9 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_insertIntoTableNat handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.insert_into_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.insert_into_table(req, data)) + } ), std::ptr::null_mut() ) @@ -2464,7 +2474,9 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_mergeInsertIntoTab handle, request_json, request_data, - |ns, req, data| { RT.block_on(ns.inner.merge_insert_into_table(req, data)) } + |namespace_client, req, data| { + RT.block_on(namespace_client.inner.merge_insert_into_table(req, data)) + } ), std::ptr::null_mut() ) @@ -2480,8 +2492,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_updateTableNative( ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.update_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.update_table(req)) }), std::ptr::null_mut() ) @@ -2497,8 +2509,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_deleteFromTableNat ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.delete_from_table(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.delete_from_table(req)) }), std::ptr::null_mut() ) @@ -2529,8 +2541,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_createTableIndexNa ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_table_index(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_table_index(req)) }), std::ptr::null_mut() ) @@ -2546,8 +2558,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_listTableIndicesNa ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_table_indices(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_table_indices(req)) }), std::ptr::null_mut() ) @@ -2563,8 +2575,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_describeTableIndex ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table_index_stats(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table_index_stats(req)) }), std::ptr::null_mut() ) @@ -2580,8 +2592,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_describeTransactio ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_transaction(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_transaction(req)) }), std::ptr::null_mut() ) @@ -2597,8 +2609,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_alterTransactionNa ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.alter_transaction(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.alter_transaction(req)) }), std::ptr::null_mut() ) @@ -2614,8 +2626,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_listTableVersionsN ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.list_table_versions(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.list_table_versions(req)) }), std::ptr::null_mut() ) @@ -2631,8 +2643,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_createTableVersion ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.create_table_version(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.create_table_version(req)) }), std::ptr::null_mut() ) @@ -2648,8 +2660,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_describeTableVersi ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.describe_table_version(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.describe_table_version(req)) }), std::ptr::null_mut() ) @@ -2665,8 +2677,8 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_batchDeleteTableVe ) -> jstring { ok_or_throw_with_return!( env, - call_rest_namespace_method(&mut env, handle, request_json, |ns, req| { - RT.block_on(ns.inner.batch_delete_table_versions(req)) + call_rest_namespace_method(&mut env, handle, request_json, |namespace_client, req| { + RT.block_on(namespace_client.inner.batch_delete_table_versions(req)) }), std::ptr::null_mut() ) @@ -2915,27 +2927,27 @@ pub struct BlockingRestAdapter { pub extern "system" fn Java_org_lance_namespace_RestAdapter_createNative( mut env: JNIEnv, _obj: JObject, - namespace_impl: JString, + namespace_client_impl: JString, properties_map: JObject, host: JString, port: JObject, ) -> jlong { ok_or_throw_with_return!( env, - create_rest_adapter_internal(&mut env, namespace_impl, properties_map, host, port), + create_rest_adapter_internal(&mut env, namespace_client_impl, properties_map, host, port), 0 ) } fn create_rest_adapter_internal( env: &mut JNIEnv, - namespace_impl: JString, + namespace_client_impl: JString, properties_map: JObject, host: JString, port: JObject, ) -> Result { - // Get namespace implementation type - let impl_str: String = env.get_string(&namespace_impl)?.into(); + // Get namespace client implementation type + let impl_str: String = env.get_string(&namespace_client_impl)?.into(); // Convert Java HashMap to Rust HashMap let jmap = JMap::from_env(env, &properties_map)?; diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index cdc5975bbfd..f28510baf1a 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -1473,7 +1473,7 @@ fn inner_commit_to_uri<'local>( let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?; let (open_namespace, open_table_id) = match &namespace_info { - Some((ns, tid)) => (Some(ns.clone()), Some(tid.clone())), + Some((namespace_client, tid)) => (Some(namespace_client.clone()), Some(tid.clone())), None => (None, None), }; @@ -1523,8 +1523,8 @@ fn inner_commit_to_uri<'local>( } // Set namespace commit handler if provided - if let Some((ns, tid)) = namespace_info { - let external_store = LanceNamespaceExternalManifestStore::new(ns, tid); + if let Some((namespace_client, tid)) = namespace_info { + let external_store = LanceNamespaceExternalManifestStore::new(namespace_client, tid); let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { external_manifest_store: Arc::new(external_store), }); diff --git a/java/src/main/java/org/lance/CommitBuilder.java b/java/src/main/java/org/lance/CommitBuilder.java index c56f1ab5631..0ab9ea3356c 100644 --- a/java/src/main/java/org/lance/CommitBuilder.java +++ b/java/src/main/java/org/lance/CommitBuilder.java @@ -66,7 +66,7 @@ public class CommitBuilder { private Map writeParams; private StorageOptionsProvider storageOptionsProvider; - private LanceNamespace namespace; + private LanceNamespace namespaceClient; private List tableId; private boolean enableV2ManifestPaths = true; private boolean detached = false; @@ -117,12 +117,29 @@ public CommitBuilder writeParams(Map writeParams) { * * @param provider the storage options provider * @return this builder instance + * @deprecated This method is deprecated and will be removed in version 5.0.0. When using + * namespace and tableId, the storage options provider is created automatically from the + * namespace. Use writeParams for static or initial storage options. */ + @Deprecated public CommitBuilder storageOptionsProvider(StorageOptionsProvider provider) { this.storageOptionsProvider = provider; return this; } + /** + * Set the namespace for managed versioning. When set, commits are routed through the namespace's + * {@code createTableVersion} API instead of writing directly to the object store. This is + * supported for both dataset-based and URI-based commits. + * + * @param namespaceClient the LanceNamespace client instance + * @return this builder instance + */ + public CommitBuilder namespaceClient(LanceNamespace namespaceClient) { + this.namespaceClient = namespaceClient; + return this; + } + /** * Set the namespace for managed versioning. When set, commits are routed through the namespace's * {@code createTableVersion} API instead of writing directly to the object store. This is @@ -130,15 +147,20 @@ public CommitBuilder storageOptionsProvider(StorageOptionsProvider provider) { * * @param namespace the LanceNamespace instance * @return this builder instance + * @deprecated This method is deprecated and will be removed in version 5.0.0. Use {@link + * #namespaceClient(LanceNamespace)} instead. */ + @Deprecated public CommitBuilder namespace(LanceNamespace namespace) { - this.namespace = namespace; + this.namespaceClient = namespace; return this; } /** * Set the table ID for namespace-based commit handling. * + *

Must be provided together with `namespaceClient`. + * * @param tableId the table identifier (e.g., ["workspace", "table_name"]) * @return this builder instance */ @@ -253,7 +275,7 @@ public Dataset execute(Transaction transaction) { storageFormat, maxRetries, skipAutoCleanup, - namespace, + namespaceClient, tableId); result.setAllocator(dataset.allocator()); return result; @@ -266,7 +288,7 @@ public Dataset execute(Transaction transaction) { detached, enableV2ManifestPaths, storageOptionsProvider, - namespace, + namespaceClient, tableId, allocator, writeParams, diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index d9c58e9b54a..a239cb319d3 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -100,12 +100,12 @@ private Dataset() {} * .execute(); * } * - *

Example usage with namespace and empty table: + *

Example usage with namespaceClient and empty table: * *

{@code
    * Dataset dataset = Dataset.write()
    *     .schema(mySchema)
-   *     .namespace(myNamespace)
+   *     .namespaceClient(myNamespaceClient)
    *     .tableId(Arrays.asList("my_table"))
    *     .mode(WriteMode.CREATE)
    *     .execute();
@@ -239,13 +239,13 @@ private static native Dataset createWithFfiStreamAndProvider(
       Optional storageOptionsProvider,
       Optional> initialBases,
       Optional> targetBases,
-      LanceNamespace namespace,
+      LanceNamespace namespaceClient,
       List tableId);
 
   /**
    * Creates a dataset with optional namespace support for managed versioning.
    *
-   * 

When a namespace is provided, the commit handler will use the namespace's + *

When a namespaceClient is provided, the commit handler will use the namespace's * create_table_version method for version tracking. * * @param allocator buffer allocator @@ -253,7 +253,7 @@ private static native Dataset createWithFfiStreamAndProvider( * @param path dataset uri * @param params write parameters * @param storageOptionsProvider optional provider for dynamic storage options/credentials - * @param namespace optional namespace implementation for managed versioning (can be null) + * @param namespaceClient optional namespace implementation for managed versioning (can be null) * @param tableId optional table identifier within the namespace (can be null) * @return Dataset */ @@ -263,7 +263,7 @@ static Dataset create( String path, WriteParams params, StorageOptionsProvider storageOptionsProvider, - LanceNamespace namespace, + LanceNamespace namespaceClient, List tableId) { Preconditions.checkNotNull(allocator); Preconditions.checkNotNull(stream); @@ -284,7 +284,7 @@ static Dataset create( Optional.ofNullable(storageOptionsProvider), params.getInitialBases(), params.getTargetBases(), - namespace, + namespaceClient, tableId); dataset.allocator = allocator; return dataset; @@ -367,7 +367,8 @@ static Dataset open( * * @param path file path * @param options the open options - * @param namespace the LanceNamespace to use for managed versioning (null if not using namespace) + * @param namespaceClient the LanceNamespace to use for managed versioning (null if not using + * namespace) * @param tableId table identifier (null if not using namespace) * @return Dataset */ @@ -377,7 +378,7 @@ static Dataset open( String path, ReadOptions options, Session session, - LanceNamespace namespace, + LanceNamespace namespaceClient, List tableId) { Preconditions.checkNotNull(path); Preconditions.checkNotNull(allocator); @@ -400,7 +401,7 @@ static Dataset open( options.getSerializedManifest(), options.getStorageOptionsProvider(), sessionHandle, - namespace, + namespaceClient, tableId); dataset.allocator = allocator; dataset.selfManagedAllocator = selfManagedAllocator; @@ -423,7 +424,7 @@ private static native Dataset openNative( Optional serializedManifest, Optional storageOptionsProvider, long sessionHandle, - LanceNamespace namespace, + LanceNamespace namespaceClient, List tableId); /** @@ -440,11 +441,11 @@ private static native Dataset openNative( * .build(); * }

* - *

Example usage with namespace: + *

Example usage with namespaceClient: * *

{@code
    * Dataset dataset = Dataset.open()
-   *     .namespace(myNamespace)
+   *     .namespaceClient(myNamespaceClient)
    *     .tableId(Arrays.asList("my_table"))
    *     .build();
    * }
diff --git a/java/src/main/java/org/lance/OpenDatasetBuilder.java b/java/src/main/java/org/lance/OpenDatasetBuilder.java index 85bc19eac6e..38b15eed919 100644 --- a/java/src/main/java/org/lance/OpenDatasetBuilder.java +++ b/java/src/main/java/org/lance/OpenDatasetBuilder.java @@ -55,7 +55,7 @@ public class OpenDatasetBuilder { private BufferAllocator allocator; private boolean selfManagedAllocator = false; private String uri; - private LanceNamespace namespace; + private LanceNamespace namespaceClient; private List tableId; private ReadOptions options = new ReadOptions.Builder().build(); private Session session; @@ -89,6 +89,20 @@ public OpenDatasetBuilder uri(String uri) { return this; } + /** + * Sets the namespace client. + * + *

Must be used together with tableId(). Either uri() or namespaceClient()+tableId() must be + * specified, but not both. + * + * @param namespaceClient The namespace implementation to fetch table info from + * @return this builder instance + */ + public OpenDatasetBuilder namespaceClient(LanceNamespace namespaceClient) { + this.namespaceClient = namespaceClient; + return this; + } + /** * Sets the namespace. * @@ -97,9 +111,12 @@ public OpenDatasetBuilder uri(String uri) { * * @param namespace The namespace implementation to fetch table info from * @return this builder instance + * @deprecated This method is deprecated and will be removed in version 5.0.0. Use {@link + * #namespaceClient(LanceNamespace)} instead. */ + @Deprecated public OpenDatasetBuilder namespace(LanceNamespace namespace) { - this.namespace = namespace; + this.namespaceClient = namespace; return this; } @@ -156,25 +173,26 @@ public OpenDatasetBuilder session(Session session) { * @throws IllegalArgumentException if required parameters are missing or invalid */ public Dataset build() { - // Validate that exactly one of uri or namespace+tableId is provided + // Validate that exactly one of uri or namespaceClient+tableId is provided boolean hasUri = uri != null; - boolean hasNamespace = namespace != null && tableId != null; + boolean hasNamespaceClient = namespaceClient != null && tableId != null; - if (hasUri && hasNamespace) { + if (hasUri && hasNamespaceClient) { throw new IllegalArgumentException( - "Cannot specify both uri and namespace+tableId. Use one or the other."); + "Cannot specify both uri and namespaceClient+tableId. Use one or the other."); } - if (!hasUri && !hasNamespace) { - if (namespace != null) { + if (!hasUri && !hasNamespaceClient) { + if (namespaceClient != null) { throw new IllegalArgumentException( - "namespace is set but tableId is missing. Both namespace and tableId must be" - + " provided together."); + "namespaceClient is set but tableId is missing. Both namespaceClient and tableId must" + + " be provided together."); } else if (tableId != null) { throw new IllegalArgumentException( - "tableId is set but namespace is missing. Both namespace and tableId must be" - + " provided together."); + "tableId is set but namespaceClient is missing. Both namespaceClient and tableId must" + + " be provided together."); } else { - throw new IllegalArgumentException("Either uri or namespace+tableId must be provided."); + throw new IllegalArgumentException( + "Either uri or namespaceClient+tableId must be provided."); } } @@ -187,22 +205,22 @@ public Dataset build() { } // Handle namespace-based opening - if (hasNamespace) { - return buildFromNamespace(); + if (hasNamespaceClient) { + return buildFromNamespaceClient(); } // Handle URI-based opening return Dataset.open(allocator, selfManagedAllocator, uri, options, session); } - private Dataset buildFromNamespace() { + private Dataset buildFromNamespaceClient() { // Call describe_table to get location and storage options DescribeTableRequest request = new DescribeTableRequest(); request.setId(tableId); // Only set version if present options.getVersion().ifPresent(v -> request.setVersion(Long.valueOf(v))); - DescribeTableResponse response = namespace.describeTable(request); + DescribeTableResponse response = namespaceClient.describeTable(request); String location = response.getLocation(); if (location == null || location.isEmpty()) { @@ -219,9 +237,11 @@ private Dataset buildFromNamespace() { .setIndexCacheSizeBytes(options.getIndexCacheSizeBytes()) .setMetadataCacheSizeBytes(options.getMetadataCacheSizeBytes()); - if (namespaceStorageOptions != null && !namespaceStorageOptions.isEmpty()) { + // null = namespace doesn't vend credentials (don't create provider) + // empty {} or non-empty = namespace vends credentials (create provider) + if (namespaceStorageOptions != null) { LanceNamespaceStorageOptionsProvider storageOptionsProvider = - new LanceNamespaceStorageOptionsProvider(namespace, tableId); + new LanceNamespaceStorageOptionsProvider(namespaceClient, tableId); optionsBuilder.setStorageOptionsProvider(storageOptionsProvider); } @@ -235,7 +255,7 @@ private Dataset buildFromNamespace() { } optionsBuilder.setStorageOptions(storageOptions); - // If managed_versioning is true, pass namespace for commit handler setup + // If managed_versioning is true, pass namespaceClient for commit handler setup if (Boolean.TRUE.equals(managedVersioning)) { return Dataset.open( allocator, @@ -243,7 +263,7 @@ private Dataset buildFromNamespace() { location, optionsBuilder.build(), session, - namespace, + namespaceClient, tableId); } diff --git a/java/src/main/java/org/lance/ReadOptions.java b/java/src/main/java/org/lance/ReadOptions.java index b9a244c55a5..cd50f0767b0 100644 --- a/java/src/main/java/org/lance/ReadOptions.java +++ b/java/src/main/java/org/lance/ReadOptions.java @@ -220,7 +220,12 @@ public Builder setSerializedManifest(ByteBuffer serializedManifest) { * * @param storageOptionsProvider the storage options provider implementation * @return this builder + * @deprecated This method is deprecated and will be removed in version 5.0.0. When using + * namespace and tableId via {@link OpenDatasetBuilder}, the storage options provider is + * created automatically from the namespace. Use {@link #setStorageOptions(Map)} for static + * or initial storage options. */ + @Deprecated public Builder setStorageOptionsProvider(StorageOptionsProvider storageOptionsProvider) { this.storageOptionsProvider = Optional.of(storageOptionsProvider); return this; diff --git a/java/src/main/java/org/lance/WriteDatasetBuilder.java b/java/src/main/java/org/lance/WriteDatasetBuilder.java index a95561acb2b..b555bf03da8 100644 --- a/java/src/main/java/org/lance/WriteDatasetBuilder.java +++ b/java/src/main/java/org/lance/WriteDatasetBuilder.java @@ -67,7 +67,7 @@ public class WriteDatasetBuilder { private ArrowReader reader; private ArrowArrayStream stream; private String uri; - private LanceNamespace namespace; + private LanceNamespace namespaceClient; private List tableId; private WriteParams.WriteMode mode = WriteParams.WriteMode.CREATE; private Schema schema; @@ -142,6 +142,20 @@ public WriteDatasetBuilder uri(String uri) { return this; } + /** + * Sets the namespace client. + * + *

Must be used together with tableId(). Either uri() or namespaceClient()+tableId() must be + * specified, but not both. + * + * @param namespaceClient The namespace implementation to use for table operations + * @return this builder instance + */ + public WriteDatasetBuilder namespaceClient(LanceNamespace namespaceClient) { + this.namespaceClient = namespaceClient; + return this; + } + /** * Sets the namespace. * @@ -150,9 +164,12 @@ public WriteDatasetBuilder uri(String uri) { * * @param namespace The namespace implementation to use for table operations * @return this builder instance + * @deprecated This method is deprecated and will be removed in version 5.0.0. Use {@link + * #namespaceClient(LanceNamespace)} instead. */ + @Deprecated public WriteDatasetBuilder namespace(LanceNamespace namespace) { - this.namespace = namespace; + this.namespaceClient = namespace; return this; } @@ -313,23 +330,24 @@ public Dataset execute() { allocator = new RootAllocator(Long.MAX_VALUE); } - // Validate that exactly one of uri or namespace is provided + // Validate that exactly one of uri or namespaceClient is provided boolean hasUri = uri != null; - boolean hasNamespace = namespace != null && tableId != null; + boolean hasNamespaceClient = namespaceClient != null && tableId != null; - if (hasUri && hasNamespace) { + if (hasUri && hasNamespaceClient) { throw new IllegalArgumentException( - "Cannot specify both uri() and namespace()+tableId(). Use one or the other."); + "Cannot specify both uri() and namespaceClient()+tableId(). Use one or the other."); } - if (!hasUri && !hasNamespace) { - if (namespace != null) { + if (!hasUri && !hasNamespaceClient) { + if (namespaceClient != null) { throw new IllegalArgumentException( - "namespace() is set but tableId() is missing. Both must be provided together."); + "namespaceClient() is set but tableId() is missing. Both must be provided together."); } else if (tableId != null) { throw new IllegalArgumentException( - "tableId() is set but namespace() is missing. Both must be provided together."); + "tableId() is set but namespaceClient() is missing. Both must be provided together."); } else { - throw new IllegalArgumentException("Either uri() or namespace()+tableId() must be called."); + throw new IllegalArgumentException( + "Either uri() or namespaceClient()+tableId() must be called."); } } @@ -350,15 +368,15 @@ public Dataset execute() { } // Handle namespace-based writing - if (hasNamespace) { - return executeWithNamespace(); + if (hasNamespaceClient) { + return executeWithNamespaceClient(); } // Handle URI-based writing return executeWithUri(); } - private Dataset executeWithNamespace() { + private Dataset executeWithNamespaceClient() { String tableUri; Map namespaceStorageOptions = null; boolean managedVersioning = false; @@ -367,7 +385,7 @@ private Dataset executeWithNamespace() { if (mode == WriteParams.WriteMode.CREATE) { DeclareTableRequest declareRequest = new DeclareTableRequest(); declareRequest.setId(tableId); - DeclareTableResponse declareResponse = namespace.declareTable(declareRequest); + DeclareTableResponse declareResponse = namespaceClient.declareTable(declareRequest); tableUri = declareResponse.getLocation(); if (tableUri == null || tableUri.isEmpty()) { @@ -382,7 +400,7 @@ private Dataset executeWithNamespace() { DescribeTableRequest request = new DescribeTableRequest(); request.setId(tableId); - DescribeTableResponse response = namespace.describeTable(request); + DescribeTableResponse response = namespaceClient.describeTable(request); tableUri = response.getLocation(); if (tableUri == null || tableUri.isEmpty()) { @@ -415,15 +433,17 @@ private Dataset executeWithNamespace() { WriteParams params = paramsBuilder.build(); // Create storage options provider for credential refresh during long-running writes + // null = namespace doesn't vend credentials (don't create provider) + // empty {} or non-empty = namespace vends credentials (create provider) StorageOptionsProvider storageOptionsProvider = - ignoreNamespaceStorageOptions + (ignoreNamespaceStorageOptions || namespaceStorageOptions == null) ? null - : new LanceNamespaceStorageOptionsProvider(namespace, tableId); + : new LanceNamespaceStorageOptionsProvider(namespaceClient, tableId); - // Only use namespace for commit handling if managedVersioning is enabled + // Only use namespaceClient for commit handling if managedVersioning is enabled if (managedVersioning) { - return createDatasetWithStreamAndNamespace( - tableUri, params, storageOptionsProvider, namespace, tableId); + return createDatasetWithStreamAndNamespaceClient( + tableUri, params, storageOptionsProvider, namespaceClient, tableId); } else { return createDatasetWithStream(tableUri, params, storageOptionsProvider); } @@ -469,16 +489,16 @@ private Dataset createDatasetWithStream( throw new IllegalStateException("No data source provided"); } - private Dataset createDatasetWithStreamAndNamespace( + private Dataset createDatasetWithStreamAndNamespaceClient( String path, WriteParams params, StorageOptionsProvider storageOptionsProvider, - LanceNamespace namespace, + LanceNamespace namespaceClient, List tableId) { // If stream is directly provided, use it if (stream != null) { return Dataset.create( - allocator, stream, path, params, storageOptionsProvider, namespace, tableId); + allocator, stream, path, params, storageOptionsProvider, namespaceClient, tableId); } // If reader is provided, convert to stream @@ -486,7 +506,7 @@ private Dataset createDatasetWithStreamAndNamespace( try (ArrowArrayStream tempStream = ArrowArrayStream.allocateNew(allocator)) { Data.exportArrayStream(allocator, reader, tempStream); return Dataset.create( - allocator, tempStream, path, params, storageOptionsProvider, namespace, tableId); + allocator, tempStream, path, params, storageOptionsProvider, namespaceClient, tableId); } } diff --git a/java/src/main/java/org/lance/WriteFragmentBuilder.java b/java/src/main/java/org/lance/WriteFragmentBuilder.java index 42ccf2d8dd3..3025a08903e 100644 --- a/java/src/main/java/org/lance/WriteFragmentBuilder.java +++ b/java/src/main/java/org/lance/WriteFragmentBuilder.java @@ -14,6 +14,8 @@ package org.lance; import org.lance.io.StorageOptionsProvider; +import org.lance.namespace.LanceNamespace; +import org.lance.namespace.LanceNamespaceStorageOptionsProvider; import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.memory.BufferAllocator; @@ -48,6 +50,8 @@ public class WriteFragmentBuilder { private WriteParams writeParams; private WriteParams.Builder writeParamsBuilder; private StorageOptionsProvider storageOptionsProvider; + private LanceNamespace namespaceClient; + private List tableId; WriteFragmentBuilder() {} @@ -127,12 +131,44 @@ public WriteFragmentBuilder storageOptions(Map storageOptions) { * * @param provider the storage options provider * @return this builder + * @deprecated This method is deprecated and will be removed in version 5.0.0. When using + * namespace-backed datasets, the storage options provider is created automatically from the + * namespace. Use storageOptions for static or initial storage options. */ + @Deprecated public WriteFragmentBuilder storageOptionsProvider(StorageOptionsProvider provider) { this.storageOptionsProvider = provider; return this; } + /** + * Set the namespace client for automatic credential refresh. + * + *

When provided with `tableId`, a storage options provider will be created automatically to + * refresh credentials via the namespace. Must be provided together with `tableId`. The caller + * should provide initial/merged storage options via the `storageOptions` method. + * + * @param namespaceClient the LanceNamespace client instance + * @return this builder + */ + public WriteFragmentBuilder namespaceClient(LanceNamespace namespaceClient) { + this.namespaceClient = namespaceClient; + return this; + } + + /** + * Set the table ID for namespace-based credential refresh. + * + *

Must be provided together with `namespaceClient`. + * + * @param tableId the table identifier (e.g., ["workspace", "table_name"]) + * @return this builder + */ + public WriteFragmentBuilder tableId(List tableId) { + this.tableId = tableId; + return this; + } + /** * Set the maximum number of rows per file. * @@ -213,15 +249,20 @@ public WriteFragmentBuilder dataStorageVersion(String version) { public List execute() { validate(); - // Build the write params if builder was used + // Create storage options provider from namespaceClient + tableId if not already set + StorageOptionsProvider effectiveProvider = storageOptionsProvider; + if (namespaceClient != null && effectiveProvider == null) { + effectiveProvider = new LanceNamespaceStorageOptionsProvider(namespaceClient, tableId); + } + + // Build the write params WriteParams finalWriteParams = buildWriteParams(); if (vectorSchemaRoot != null) { return Fragment.create( - datasetUri, allocator, vectorSchemaRoot, finalWriteParams, storageOptionsProvider); + datasetUri, allocator, vectorSchemaRoot, finalWriteParams, effectiveProvider); } else { - return Fragment.create( - datasetUri, arrowArrayStream, finalWriteParams, storageOptionsProvider); + return Fragment.create(datasetUri, arrowArrayStream, finalWriteParams, effectiveProvider); } } @@ -255,5 +296,9 @@ private void validate() { Preconditions.checkState( writeParams == null || writeParamsBuilder == null, "Cannot use both writeParams() and individual parameter methods"); + Preconditions.checkState( + (namespaceClient == null && tableId == null) + || (namespaceClient != null && tableId != null), + "Both 'namespaceClient' and 'tableId' must be provided together"); } } diff --git a/java/src/main/java/org/lance/io/StorageOptionsProvider.java b/java/src/main/java/org/lance/io/StorageOptionsProvider.java index 3876ca1fb06..3955e583077 100644 --- a/java/src/main/java/org/lance/io/StorageOptionsProvider.java +++ b/java/src/main/java/org/lance/io/StorageOptionsProvider.java @@ -59,7 +59,12 @@ *

If fetchStorageOptions() throws an exception, operations requiring credentials will fail. * Implementations should handle recoverable errors internally (e.g., retry token refresh) and only * throw exceptions for unrecoverable errors. + * + * @deprecated This interface is deprecated and will be removed in version 5.0.0. When using + * namespaceClient and tableId parameters in dataset operations, credential refresh is handled + * automatically. Pass storageOptions for static credentials instead. */ +@Deprecated public interface StorageOptionsProvider { /** diff --git a/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java b/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java index fb65e235c36..9416a673c3f 100644 --- a/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java +++ b/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java @@ -53,23 +53,44 @@ * .build() * ); * } + * + * @deprecated This class is deprecated and will be removed in version 5.0.0. When using + * namespaceClient and tableId parameters in dataset operations, credential refresh is handled + * automatically. You no longer need to create this provider manually. */ +@Deprecated +@SuppressWarnings("deprecation") public class LanceNamespaceStorageOptionsProvider implements StorageOptionsProvider { - private final LanceNamespace namespace; + private final LanceNamespace namespaceClient; private final List tableId; /** * Create a storage options provider that fetches storage options from a LanceNamespace. * - * @param namespace The namespace instance to fetch storage options from + * @param namespaceClient The namespace instance to fetch storage options from * @param tableId The table identifier (e.g., ["workspace", "table_name"]) */ - public LanceNamespaceStorageOptionsProvider(LanceNamespace namespace, List tableId) { - this.namespace = namespace; + public LanceNamespaceStorageOptionsProvider( + LanceNamespace namespaceClient, List tableId) { + this.namespaceClient = namespaceClient; this.tableId = tableId; } + /** + * Create a storage options provider that fetches storage options from a LanceNamespace. + * + * @param namespace The namespace instance to fetch storage options from + * @param tableId The table identifier (e.g., ["workspace", "table_name"]) + * @deprecated This constructor is deprecated and will be removed in version 5.0.0. Use {@link + * #LanceNamespaceStorageOptionsProvider(LanceNamespace, List)} with namespaceClient instead. + */ + @Deprecated + public static LanceNamespaceStorageOptionsProvider withNamespace( + LanceNamespace namespace, List tableId) { + return new LanceNamespaceStorageOptionsProvider(namespace, tableId); + } + /** * Fetch credentials from the namespace. * @@ -87,8 +108,8 @@ public Map fetchStorageOptions() { DescribeTableRequest request = new DescribeTableRequest(); request.setId(tableId); - // Call namespace to describe the table and get credentials - DescribeTableResponse response = namespace.describeTable(request); + // Call namespaceClient to describe the table and get credentials + DescribeTableResponse response = namespaceClient.describeTable(request); // Extract storage options - should already be a flat Map Map storageOptions = response.getStorageOptions(); @@ -114,8 +135,8 @@ public Map fetchStorageOptions() { */ @Override public String providerId() { - // Call namespaceId() on the namespace (requires lance-namespace >= 0.0.20) - String namespaceId = namespace.namespaceId(); + // Call namespaceId() on the namespaceClient (requires lance-namespace >= 0.0.20) + String namespaceId = namespaceClient.namespaceId(); return String.format( "LanceNamespaceStorageOptionsProvider { namespace: %s, table_id: %s }", namespaceId, tableId); diff --git a/java/src/main/java/org/lance/namespace/RestAdapter.java b/java/src/main/java/org/lance/namespace/RestAdapter.java index 534a7eabb9e..13c6e695357 100644 --- a/java/src/main/java/org/lance/namespace/RestAdapter.java +++ b/java/src/main/java/org/lance/namespace/RestAdapter.java @@ -58,16 +58,17 @@ public class RestAdapter implements Closeable, AutoCloseable { /** * Creates a new REST adapter with the given backend namespace. * - * @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace) - * @param backendConfig Configuration properties for the backend namespace + * @param namespaceClientImpl The namespace client implementation type (e.g., "dir" for + * DirectoryNamespace) + * @param backendConfig Configuration properties for the backend namespace client * @param host Host to bind the server to, or null for default (127.0.0.1) * @param port Port to bind the server to. Use 0 to let the OS assign an available port, or null * for default (2333). */ public RestAdapter( - String namespaceImpl, Map backendConfig, String host, Integer port) { - if (namespaceImpl == null || namespaceImpl.isEmpty()) { - throw new IllegalArgumentException("namespace implementation cannot be null or empty"); + String namespaceClientImpl, Map backendConfig, String host, Integer port) { + if (namespaceClientImpl == null || namespaceClientImpl.isEmpty()) { + throw new IllegalArgumentException("namespace client implementation cannot be null or empty"); } if (backendConfig == null) { throw new IllegalArgumentException("backend config cannot be null"); @@ -76,7 +77,34 @@ public RestAdapter( throw new IllegalArgumentException("port must be between 0 and 65535"); } - this.nativeRestAdapterHandle = createNative(namespaceImpl, backendConfig, host, port); + this.nativeRestAdapterHandle = createNative(namespaceClientImpl, backendConfig, host, port); + } + + /** + * Creates a new REST adapter with default host and port. + * + * @param namespaceClientImpl The namespace client implementation type + * @param backendConfig Configuration properties for the backend namespace client + */ + public RestAdapter(String namespaceClientImpl, Map backendConfig) { + this(namespaceClientImpl, backendConfig, null, null); + } + + /** + * Creates a new REST adapter with the given backend namespace. + * + * @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace) + * @param backendConfig Configuration properties for the backend namespace + * @param host Host to bind the server to, or null for default (127.0.0.1) + * @param port Port to bind the server to. Use 0 to let the OS assign an available port, or null + * for default (2333). + * @deprecated Use {@link #RestAdapter(String, Map, String, Integer)} with namespaceClientImpl + * parameter instead. Will be removed in version 5.0.0. + */ + @Deprecated + public static RestAdapter createWithDeprecatedParams( + String namespaceImpl, Map backendConfig, String host, Integer port) { + return new RestAdapter(namespaceImpl, backendConfig, host, port); } /** @@ -84,9 +112,13 @@ public RestAdapter( * * @param namespaceImpl The namespace implementation type * @param backendConfig Configuration properties for the backend namespace + * @deprecated Use {@link #RestAdapter(String, Map)} with namespaceClientImpl parameter instead. + * Will be removed in version 5.0.0. */ - public RestAdapter(String namespaceImpl, Map backendConfig) { - this(namespaceImpl, backendConfig, null, null); + @Deprecated + public static RestAdapter createWithDeprecatedParams( + String namespaceImpl, Map backendConfig) { + return new RestAdapter(namespaceImpl, backendConfig, null, null); } /** @@ -144,7 +176,7 @@ public void close() { // Native methods private native long createNative( - String namespaceImpl, Map backendConfig, String host, Integer port); + String namespaceClientImpl, Map backendConfig, String host, Integer port); private native void start(long handle); diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index 95bacfc3091..8a8d5d322da 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -101,9 +101,11 @@ def dataset( index_cache_size_bytes: Optional[int] = None, read_params: Optional[Dict[str, any]] = None, session: Optional[Session] = None, - namespace: Optional[LanceNamespace] = None, + namespace_client: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, storage_options_provider: Optional[Any] = None, + # Deprecated parameters + namespace: Optional[LanceNamespace] = None, ) -> LanceDataset: """ Opens the Lance dataset from the address specified. @@ -163,54 +165,81 @@ def dataset( session : optional, lance.Session A session to use for this dataset. This contains the caches used by the across multiple datasets. - namespace : optional, LanceNamespace - A namespace instance from which to fetch table location and storage options. + namespace_client : optional, LanceNamespace + A namespace client from which to fetch table location and storage options. Use lance.namespace.connect() to create a namespace instance. Must be provided together with `table_id`. Cannot be used with `uri`. When provided, the table location will be fetched automatically from the namespace via describe_table(). table_id : optional, List[str] The table identifier when using a namespace (e.g., ["my_table"]). - Must be provided together with `namespace`. Cannot be used with `uri`. + Must be provided together with `namespace_client`. Cannot be used with `uri`. storage_options_provider : optional - A storage options provider for automatic credential refresh. Must implement - `fetch_storage_options()` method that returns a dict of storage options. - If provided along with `namespace`, this takes precedence over the - namespace-created provider. + .. deprecated:: 4.0.0 + This parameter is deprecated and will be removed in version 5.0.0. + When using `namespace_client` and `table_id`, the storage options provider + is created automatically from the namespace. Pass `storage_options` + instead for static or initial storage options. + namespace : optional, LanceNamespace + .. deprecated:: 4.0.0 + This parameter is deprecated and will be removed in version 5.0.0. + Use `namespace_client` instead. Notes ----- - When using `namespace` and `table_id`: + When using `namespace_client` and `table_id`: - The `uri` parameter is optional and will be fetched from the namespace - Storage options from describe_table() will be used automatically - A dynamic storage options provider will be created to refresh credentials - Initial storage options from describe_table() will be merged with any provided `storage_options` """ - # Validate that user provides either uri OR (namespace + table_id), not both + # Emit deprecation warning for storage_options_provider + if storage_options_provider is not None: + warnings.warn( + "The 'storage_options_provider' parameter is deprecated and will be " + "removed in version 5.0.0. When using 'namespace_client' and " + "'table_id', the storage options provider is created automatically. " + "Pass 'storage_options' instead for static or initial storage options.", + DeprecationWarning, + stacklevel=2, + ) + + # Handle deprecated 'namespace' parameter + if namespace is not None: + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in version " + "5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + + # Validate that user provides either uri OR (namespace_client + table_id), not both has_uri = uri is not None - has_namespace = namespace is not None or table_id is not None + has_namespace = namespace_client is not None or table_id is not None if has_uri and has_namespace: raise ValueError( - "Cannot specify both 'uri' and 'namespace/table_id'. " - "Please provide either 'uri' or both 'namespace' and 'table_id'." + "Cannot specify both 'uri' and 'namespace_client/table_id'. " + "Please provide either 'uri' or both 'namespace_client' and 'table_id'." ) elif not has_uri and not has_namespace: raise ValueError( - "Must specify either 'uri' or both 'namespace' and 'table_id'." + "Must specify either 'uri' or both 'namespace_client' and 'table_id'." ) # Handle namespace resolution in Python managed_versioning = False - if namespace is not None: + if namespace_client is not None: if table_id is None: raise ValueError( - "Both 'namespace' and 'table_id' must be provided together." + "Both 'namespace_client' and 'table_id' must be provided together." ) request = DescribeTableRequest(id=table_id, version=version) - response = namespace.describe_table(request) + response = namespace_client.describe_table(request) uri = response.location if uri is None: @@ -221,10 +250,12 @@ def dataset( namespace_storage_options = response.storage_options - if namespace_storage_options: + # None = namespace doesn't vend credentials (don't create provider) + # {} or {...} = namespace vends credentials (create provider) + if namespace_storage_options is not None: if storage_options_provider is None: storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace_client, table_id=table_id ) if storage_options is None: storage_options = namespace_storage_options @@ -233,7 +264,9 @@ def dataset( merged_options.update(namespace_storage_options) storage_options = merged_options elif table_id is not None: - raise ValueError("Both 'namespace' and 'table_id' must be provided together.") + raise ValueError( + "Both 'namespace_client' and 'table_id' must be provided together." + ) ds = LanceDataset( uri, @@ -248,7 +281,7 @@ def dataset( read_params=read_params, session=session, storage_options_provider=storage_options_provider, - namespace=namespace if managed_versioning else None, + namespace_client=namespace_client if managed_versioning else None, table_id=table_id if managed_versioning else None, ) if version is None and asof is not None: diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7496746285a..a0c52f27a32 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -431,14 +431,38 @@ def __init__( read_params: Optional[Dict[str, Any]] = None, session: Optional[Session] = None, storage_options_provider: Optional[Any] = None, - namespace: Optional[Any] = None, + namespace_client: Optional[Any] = None, table_id: Optional[List[str]] = None, + # Deprecated parameters + namespace: Optional[Any] = None, ): uri = os.fspath(uri) if isinstance(uri, Path) else uri self._uri = uri self._storage_options = storage_options self._storage_options_provider = storage_options_provider + # Handle deprecation warning for storage_options_provider + if storage_options_provider is not None: + warnings.warn( + "The 'storage_options_provider' parameter is deprecated and will be " + "removed in version 5.0.0. When using 'namespace_client' and " + "'table_id', the storage options provider is created automatically. " + "Pass 'storage_options' instead for static or initial storage options.", + DeprecationWarning, + stacklevel=2, + ) + + # Handle deprecated 'namespace' parameter + if namespace is not None: + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in " + "version 5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + # Handle deprecation warning for index_cache_size if index_cache_size is not None: warnings.warn( @@ -463,7 +487,7 @@ def __init__( read_params=read_params, session=session, storage_options_provider=storage_options_provider, - namespace=namespace, + namespace=namespace_client, table_id=table_id, ) self._default_scan_options = default_scan_options @@ -3605,8 +3629,10 @@ def commit( *, commit_message: Optional[str] = None, enable_stable_row_ids: Optional[bool] = None, - namespace: Optional["LanceNamespace"] = None, + namespace_client: Optional["LanceNamespace"] = None, table_id: Optional[List[str]] = None, + # Deprecated parameters + namespace: Optional["LanceNamespace"] = None, ) -> LanceDataset: """Create a new version of dataset @@ -3673,12 +3699,15 @@ def commit( row IDs assign each row a monotonically increasing id that persists across compaction and other maintenance operations. This option is ignored for existing datasets. - namespace : LanceNamespace, optional - A namespace instance. Must be provided together with table_id. + namespace_client : LanceNamespace, optional + A namespace client. Must be provided together with table_id. Use lance.namespace.connect() to create a namespace. table_id : List[str], optional The table identifier within the namespace (e.g., ["workspace", "table"]). - Must be provided together with namespace. + Must be provided together with namespace_client. + namespace : LanceNamespace, optional + .. deprecated:: 4.0.0 + Use `namespace_client` instead. Will be removed in version 5.0.0. Returns ------- @@ -3706,6 +3735,28 @@ def commit( 2 3 c 3 4 d """ + # Emit deprecation warning for storage_options_provider + if storage_options_provider is not None: + warnings.warn( + "The 'storage_options_provider' parameter is deprecated and will be " + "removed in version 5.0.0. When using 'namespace_client' and " + "'table_id', the storage options provider is created automatically. " + "Pass 'storage_options' instead for static or initial storage options.", + DeprecationWarning, + stacklevel=2, + ) + + # Handle deprecated 'namespace' parameter + if namespace is not None: + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in " + "version 5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + if isinstance(base_uri, Path): base_uri = str(base_uri) elif isinstance(base_uri, LanceDataset): @@ -3749,7 +3800,7 @@ def commit( detached=detached, max_retries=max_retries, enable_stable_row_ids=enable_stable_row_ids, - namespace=namespace, + namespace=namespace_client, table_id=table_id, ) elif isinstance(operation, LanceOperation.BaseOperation): @@ -3765,7 +3816,7 @@ def commit( max_retries=max_retries, commit_message=commit_message, enable_stable_row_ids=enable_stable_row_ids, - namespace=namespace, + namespace=namespace_client, table_id=table_id, ) else: @@ -3846,6 +3897,17 @@ def commit_batch( merged: Transaction The merged transaction that was applied to the dataset. """ + # Emit deprecation warning for storage_options_provider + if storage_options_provider is not None: + warnings.warn( + "The 'storage_options_provider' parameter is deprecated and will be " + "removed in version 5.0.0. When using 'namespace_client' and " + "'table_id', the storage options provider is created automatically. " + "Pass 'storage_options' instead for static or initial storage options.", + DeprecationWarning, + stacklevel=2, + ) + if isinstance(dest, Path): dest = str(dest) elif isinstance(dest, LanceDataset): @@ -5933,8 +5995,10 @@ def write_dataset( initial_bases: Optional[List[DatasetBasePath]] = None, target_bases: Optional[List[str]] = None, allow_external_blob_outside_bases: bool = False, - namespace: Optional[LanceNamespace] = None, + namespace_client: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, + # Deprecated parameters + namespace: Optional[LanceNamespace] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -6030,19 +6094,22 @@ def write_dataset( allow_external_blob_outside_bases: bool, default False If False, external blob URIs must map to the dataset root or a registered base path. If True, external blob URIs outside registered bases are allowed. - namespace : optional, LanceNamespace - A namespace instance from which to fetch table location and storage options. + namespace_client : optional, LanceNamespace + A namespace client from which to fetch table location and storage options. Must be provided together with `table_id`. Cannot be used with `uri`. When provided, the table location will be fetched automatically from the namespace via describe_table(). Storage options will be automatically refreshed before they expire. table_id : optional, List[str] The table identifier when using a namespace (e.g., ["my_table"]). - Must be provided together with `namespace`. Cannot be used with `uri`. + Must be provided together with `namespace_client`. Cannot be used with `uri`. + namespace : optional, LanceNamespace + .. deprecated:: 4.0.0 + Use `namespace_client` instead. Will be removed in version 5.0.0. Notes ----- - When using `namespace` and `table_id`: + When using `namespace_client` and `table_id`: - The `uri` parameter is optional and will be fetched from the namespace - Storage options from describe_table() will be used automatically - A `LanceNamespaceStorageOptionsProvider` will be created automatically for @@ -6050,25 +6117,36 @@ def write_dataset( - Initial storage options from describe_table() will be merged with any provided `storage_options` """ - # Validate that user provides either uri OR (namespace + table_id), not both + # Handle deprecated 'namespace' parameter + if namespace is not None: + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in version " + "5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + + # Validate that user provides either uri OR (namespace_client + table_id), not both has_uri = uri is not None - has_namespace = namespace is not None or table_id is not None + has_namespace = namespace_client is not None or table_id is not None if has_uri and has_namespace: raise ValueError( - "Cannot specify both 'uri' and 'namespace/table_id'. " - "Please provide either 'uri' or both 'namespace' and 'table_id'." + "Cannot specify both 'uri' and 'namespace_client/table_id'. " + "Please provide either 'uri' or both 'namespace_client' and 'table_id'." ) elif not has_uri and not has_namespace: raise ValueError( - "Must specify either 'uri' or both 'namespace' and 'table_id'." + "Must specify either 'uri' or both 'namespace_client' and 'table_id'." ) # Handle namespace-based dataset writing - if namespace is not None: + if namespace_client is not None: if table_id is None: raise ValueError( - "Both 'namespace' and 'table_id' must be provided together." + "Both 'namespace_client' and 'table_id' must be provided together." ) # Implement write_into_namespace logic in Python @@ -6086,10 +6164,10 @@ def write_dataset( # Determine which namespace method to call based on mode if mode == "create": declare_request = DeclareTableRequest(id=table_id, location=None) - response = namespace.declare_table(declare_request) + response = namespace_client.declare_table(declare_request) elif mode in ("append", "overwrite"): request = DescribeTableRequest(id=table_id, version=None) - response = namespace.describe_table(request) + response = namespace_client.describe_table(request) else: raise ValueError(f"Invalid mode: {mode}") @@ -6107,10 +6185,12 @@ def write_dataset( namespace_storage_options = response.storage_options # Set up storage options and provider - if namespace_storage_options: + # None = namespace doesn't vend credentials (don't create provider) + # {} or {...} = namespace vends credentials (create provider) + if namespace_storage_options is not None: # Create the storage options provider for automatic refresh storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace_client, table_id=table_id ) # Merge namespace storage options with any existing options @@ -6124,7 +6204,9 @@ def write_dataset( else: storage_options_provider = None elif table_id is not None: - raise ValueError("Both 'namespace' and 'table_id' must be provided together.") + raise ValueError( + "Both 'namespace_client' and 'table_id' must be provided together." + ) else: storage_options_provider = None managed_versioning = False @@ -6169,9 +6251,9 @@ def write_dataset( if storage_options_provider is not None: params["storage_options_provider"] = storage_options_provider - # Add namespace and table_id for managed versioning (external manifest store) - if managed_versioning and namespace is not None and table_id is not None: - params["namespace"] = namespace + # Add namespace_client and table_id for managed versioning (external manifest store) + if managed_versioning and namespace_client is not None and table_id is not None: + params["namespace"] = namespace_client params["table_id"] = table_id if commit_lock: diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index f8cff450f06..24b12064b14 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -11,6 +11,7 @@ from pathlib import Path from typing import ( TYPE_CHECKING, + Any, Callable, Dict, Iterator, @@ -34,6 +35,7 @@ RowIdMeta as RowIdMeta, ) from .lance import _Fragment, _write_fragments, _write_fragments_transaction +from .namespace import LanceNamespace, LanceNamespaceStorageOptionsProvider from .progress import FragmentWriteProgress, NoopFragmentWriteProgress from .types import _coerce_reader from .udf import BatchUDF, normalize_transform @@ -877,10 +879,12 @@ def write_fragments( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, - storage_options_provider=None, + storage_options_provider: Optional[Any] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, initial_bases: Optional[List["DatasetBasePath"]] = None, + namespace_client: Optional[LanceNamespace] = None, + table_id: Optional[List[str]] = None, ) -> Transaction: ... @overload @@ -898,10 +902,12 @@ def write_fragments( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, - storage_options_provider=None, + storage_options_provider: Optional[Any] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, initial_bases: Optional[List["DatasetBasePath"]] = None, + namespace_client: Optional[LanceNamespace] = None, + table_id: Optional[List[str]] = None, ) -> List[FragmentMetadata]: ... @@ -919,10 +925,12 @@ def write_fragments( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, - storage_options_provider=None, + storage_options_provider: Optional[Any] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, initial_bases: Optional[List["DatasetBasePath"]] = None, + namespace_client: Optional[LanceNamespace] = None, + table_id: Optional[List[str]] = None, ) -> List[FragmentMetadata] | Transaction: """ Write data into one or more fragments. @@ -972,6 +980,12 @@ def write_fragments( Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. storage_options_provider : Optional[StorageOptionsProvider] + .. deprecated:: 4.0.0 + This parameter is deprecated and will be removed in version 5.0.0. + When using `namespace` and `table_id`, the storage options provider + is created automatically from the namespace. Pass `storage_options` + instead for static or initial storage options. + A storage options provider that can fetch and refresh storage options dynamically. This is useful for credentials that expire and need to be refreshed automatically. @@ -1002,6 +1016,15 @@ def write_fragments( **Only valid in CREATE mode**. Will raise an error if used with APPEND/OVERWRITE modes. + namespace_client : optional, LanceNamespace + A namespace client for automatic credential refresh. When provided with + `table_id`, a storage options provider will be created automatically to + refresh credentials via the namespace. Must be provided together with + `table_id`. The caller should provide initial/merged storage options via + the `storage_options` parameter. + table_id : optional, List[str] + The table identifier when using a namespace (e.g., ["my_table"]). + Must be provided together with `namespace_client`. Returns ------- @@ -1019,6 +1042,32 @@ def write_fragments( """ from .dataset import LanceDataset + # Emit deprecation warning for storage_options_provider + if storage_options_provider is not None: + warnings.warn( + "The 'storage_options_provider' parameter is deprecated and will be " + "removed in version 5.0.0. When using 'namespace' and 'table_id', the " + "storage options provider is created automatically. Pass 'storage_options' " + "instead for static or initial storage options.", + DeprecationWarning, + stacklevel=2, + ) + + # Create storage options provider from namespace_client + table_id + if namespace_client is not None: + if table_id is None: + raise ValueError( + "Both 'namespace_client' and 'table_id' must be provided together." + ) + if storage_options_provider is None: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace_client=namespace_client, table_id=table_id + ) + elif table_id is not None: + raise ValueError( + "Both 'namespace_client' and 'table_id' must be provided together." + ) + reader = _coerce_reader(data, schema) if isinstance(dataset_uri, Path): diff --git a/python/python/lance/io.py b/python/python/lance/io.py index b12d6dc106f..b3f7a0a91d7 100644 --- a/python/python/lance/io.py +++ b/python/python/lance/io.py @@ -14,6 +14,12 @@ class StorageOptionsProvider(ABC): """Abstract base class for providing storage options to Lance datasets. + .. deprecated:: 4.0.0 + This class is deprecated and will be removed in version 5.0.0. + When using `namespace_client` and `table_id` parameters in dataset + operations, credential refresh is handled automatically. Pass + `storage_options` for static credentials instead. + Storage options providers enable automatic refresh for long-running operations on cloud storage (S3, Azure, GCS). This is currently only used for refreshing AWS temporary access credentials. Implement this interface to integrate with diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index 63ad2abd007..26299347a0d 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -669,11 +669,12 @@ class RestAdapter: Parameters ---------- - namespace_impl : str - Namespace implementation type ("dir", "rest", etc.) - namespace_properties : dict, optional - Configuration properties for the backend namespace. + namespace_client_impl : str + Namespace client implementation type ("dir", "rest", etc.) + namespace_client_properties : dict, optional + Configuration properties for the backend namespace client. For DirectoryNamespace ("dir"): + - root (required): Root directory path or URI - manifest_enabled (optional): Enable manifest tracking (default: "true") - dir_listing_enabled (optional): Enable directory listing fallback @@ -686,6 +687,12 @@ class RestAdapter: Port to listen on. Default 2333 per REST spec. Use 0 to let the OS assign an available ephemeral port. Use the `port` property after `start()` to get the actual port. + namespace_impl : str + .. deprecated:: 5.0.0 + Use `namespace_client_impl` instead. + namespace_properties : dict, optional + .. deprecated:: 5.0.0 + Use `namespace_client_properties` instead. Examples -------- @@ -701,11 +708,14 @@ class RestAdapter: def __init__( self, - namespace_impl: str, - namespace_properties: Dict[str, str] = None, + namespace_client_impl: str = None, + namespace_client_properties: Dict[str, str] = None, session=None, host: str = None, port: int = None, + *, + namespace_impl: str = None, + namespace_properties: Dict[str, str] = None, ): if PyRestAdapter is None: raise RuntimeError( @@ -714,15 +724,48 @@ def __init__( "Please rebuild with the 'rest-adapter' feature enabled." ) + # Handle deprecated 'namespace_impl' parameter + if namespace_impl is not None: + import warnings + + warnings.warn( + "The 'namespace_impl' parameter is deprecated and will be removed in " + "version 5.0.0. Use 'namespace_client_impl' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client_impl is None: + namespace_client_impl = namespace_impl + + # Handle deprecated 'namespace_properties' parameter + if namespace_properties is not None: + import warnings + + warnings.warn( + "The 'namespace_properties' parameter is deprecated and will be " + "removed in version 5.0.0. Use 'namespace_client_properties' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client_properties is None: + namespace_client_properties = namespace_properties + + if namespace_client_impl is None: + raise ValueError("namespace_client_impl is required") + # Convert to string properties - if namespace_properties is None: - namespace_properties = {} - str_properties = {str(k): str(v) for k, v in namespace_properties.items()} + if namespace_client_properties is None: + namespace_client_properties = {} + str_properties = { + str(k): str(v) for k, v in namespace_client_properties.items() + } # Create the underlying Rust adapter - self._inner = PyRestAdapter(namespace_impl, str_properties, session, host, port) + self._inner = PyRestAdapter( + namespace_client_impl, str_properties, session, host, port + ) self.host = host - self.namespace_impl = namespace_impl + self.namespace_client_impl = namespace_client_impl @property def port(self) -> int: @@ -757,6 +800,12 @@ def __repr__(self) -> str: class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): """Storage options provider that fetches storage options from a LanceNamespace. + .. deprecated:: 4.0.0 + This class is deprecated and will be removed in version 5.0.0. + When using `namespace_client` and `table_id` parameters in dataset + operations, credential refresh is handled automatically. You no longer + need to create this provider manually. + This provider automatically fetches fresh storage options by calling the namespace's describe_table() method, which returns both the table location and time-limited storage options. This is currently only used for refreshing @@ -767,11 +816,14 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): Parameters ---------- - namespace : LanceNamespace - The namespace instance to fetch storage options from. Use + namespace_client : LanceNamespace + The namespace client to fetch storage options from. Use lance.namespace.connect() to create a namespace instance. table_id : List[str] The table identifier (e.g., ["workspace", "table_name"]) + namespace : LanceNamespace, optional + .. deprecated:: 4.0.0 + Use `namespace_client` instead. Will be removed in version 5.0.0. Example ------- @@ -783,11 +835,13 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): import lance.namespace # Connect to a namespace - namespace = lance.namespace.connect("rest", {"uri": "http://localhost:4099"}) + namespace_client = lance.namespace.connect( + "rest", {"uri": "http://localhost:4099"} + ) # Create storage options provider provider = lance.LanceNamespaceStorageOptionsProvider( - namespace=namespace, + namespace_client=namespace_client, table_id=["workspace", "table_name"] ) @@ -798,17 +852,40 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): ) """ - def __init__(self, namespace: LanceNamespace, table_id: List[str]): - """Initialize with namespace and table ID. + def __init__( + self, + namespace_client: LanceNamespace, + table_id: List[str], + *, + # Deprecated parameter (keyword-only) + namespace: LanceNamespace = None, + ): + """Initialize with namespace client and table ID. Parameters ---------- - namespace : LanceNamespace - The namespace instance with a describe_table() method + namespace_client : LanceNamespace + The namespace client with a describe_table() method table_id : List[str] The table identifier + namespace : LanceNamespace, optional + .. deprecated:: 4.0.0 + Use `namespace_client` instead. Will be removed in version 5.0.0. """ - self._namespace = namespace + # Handle deprecated 'namespace' parameter + if namespace is not None: + import warnings + + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in " + "version 5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + + self._namespace = namespace_client self._table_id = table_id def fetch_storage_options(self) -> Dict[str, str]: diff --git a/python/python/lance/tf/data.py b/python/python/lance/tf/data.py index 7b6ff8e51a4..f35fcd8e19a 100644 --- a/python/python/lance/tf/data.py +++ b/python/python/lance/tf/data.py @@ -141,9 +141,11 @@ def from_lance( filter: Optional[str] = None, fragments: Union[Iterable[int], Iterable[LanceFragment], tf.data.Dataset] = None, output_signature: Optional[Dict[str, tf.TypeSpec]] = None, - namespace: Optional["LanceNamespace"] = None, + namespace_client: Optional["LanceNamespace"] = None, table_id: Optional[List[str]] = None, ignore_namespace_table_storage_options: bool = False, + # Deprecated parameters + namespace: Optional["LanceNamespace"] = None, ) -> tf.data.Dataset: """Create a ``tf.data.Dataset`` from a Lance dataset. @@ -151,7 +153,7 @@ def from_lance( ---------- dataset : Union[str, Path, LanceDataset], optional Lance dataset or dataset URI/path. Either ``dataset`` or both - ``namespace`` and ``table_id`` must be provided. + ``namespace_client`` and ``table_id`` must be provided. columns : Optional[List[str]], optional List of columns to include in the output dataset. If not set, all columns will be read. @@ -165,13 +167,18 @@ def from_lance( output_signature : Optional[tf.TypeSpec], optional Override output signature of the returned tensors. If not provided, the output signature is inferred from the projection Schema. - namespace : Optional[LanceNamespace], optional - Namespace to resolve the table location when ``table_id`` is provided. + namespace_client : Optional[LanceNamespace], optional + Namespace client to resolve the table location when ``table_id`` is + provided. table_id : Optional[List[str]], optional - Table identifier used together with ``namespace`` to locate the table. + Table identifier used together with ``namespace_client`` to locate + the table. ignore_namespace_table_storage_options : bool, default False - When using ``namespace``/``table_id``, ignore storage options returned - by the namespace. + When using ``namespace_client``/``table_id``, ignore storage options + returned by the namespace. + namespace : Optional[LanceNamespace], optional + .. deprecated:: 4.0.0 + Use ``namespace_client`` instead. Will be removed in version 5.0.0. Examples -------- @@ -211,16 +218,29 @@ def from_lance( print(batch["image"].shape) """ + # Handle deprecated 'namespace' parameter + if namespace is not None: + import warnings + + warnings.warn( + "The 'namespace' parameter is deprecated and will be removed in " + "version 5.0.0. Use 'namespace_client' instead.", + DeprecationWarning, + stacklevel=2, + ) + if namespace_client is None: + namespace_client = namespace + if isinstance(dataset, LanceDataset): - if namespace is not None or table_id is not None: + if namespace_client is not None or table_id is not None: raise ValueError( - "Cannot specify 'namespace' or 'table_id' when passing " + "Cannot specify 'namespace_client' or 'table_id' when passing " "a LanceDataset instance" ) else: dataset = lance.dataset( dataset, - namespace=namespace, + namespace_client=namespace_client, table_id=table_id, ignore_namespace_table_storage_options=ignore_namespace_table_storage_options, ) diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index 49523a58e81..41460c672cf 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -817,7 +817,7 @@ def test_external_manifest_store_invokes_namespace_apis(): # Create initial table table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}]) ds = lance.write_dataset( - table1, namespace=namespace, table_id=table_id, mode="create" + table1, namespace_client=namespace, table_id=table_id, mode="create" ) assert ds.count_rows() == 2 assert len(ds.versions()) == 1 @@ -830,7 +830,7 @@ def test_external_manifest_store_invokes_namespace_apis(): # Open dataset through namespace - should call list_table_versions for latest initial_list_count = namespace.list_table_versions_count - ds_from_namespace = lance.dataset(namespace=namespace, table_id=table_id) + ds_from_namespace = lance.dataset(namespace_client=namespace, table_id=table_id) assert ds_from_namespace.count_rows() == 2 assert ds_from_namespace.version == 1 assert namespace.list_table_versions_count == initial_list_count + 1, ( @@ -845,7 +845,7 @@ def test_external_manifest_store_invokes_namespace_apis(): # Append data - should call create_table_version again table2 = pa.Table.from_pylist([{"a": 100, "b": 200}, {"a": 1000, "b": 2000}]) ds = lance.write_dataset( - table2, namespace=namespace, table_id=table_id, mode="append" + table2, namespace_client=namespace, table_id=table_id, mode="append" ) assert ds.count_rows() == 4 assert len(ds.versions()) == 2 @@ -856,7 +856,7 @@ def test_external_manifest_store_invokes_namespace_apis(): # Open latest version - should call list_table_versions list_count_before_latest = namespace.list_table_versions_count - latest_ds = lance.dataset(namespace=namespace, table_id=table_id) + latest_ds = lance.dataset(namespace_client=namespace, table_id=table_id) assert latest_ds.count_rows() == 4 assert latest_ds.version == 2 assert namespace.list_table_versions_count == list_count_before_latest + 1, ( @@ -865,7 +865,7 @@ def test_external_manifest_store_invokes_namespace_apis(): # Open specific version (v1) - should call describe_table_version describe_count_before_v1 = namespace.describe_table_version_count - v1_ds = lance.dataset(namespace=namespace, table_id=table_id, version=1) + v1_ds = lance.dataset(namespace_client=namespace, table_id=table_id, version=1) assert v1_ds.count_rows() == 2 assert v1_ds.version == 1 assert namespace.describe_table_version_count == describe_count_before_v1 + 1, ( diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 9cb87011e9c..facc90598d3 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -172,7 +172,7 @@ def test_namespace_open_dataset(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -182,7 +182,7 @@ def test_namespace_open_dataset(s3_bucket: str): assert namespace.get_create_call_count() == 1 ds_from_namespace = lance.dataset( - namespace=namespace, + namespace_client=namespace, table_id=table_id, storage_options=storage_options, ) @@ -219,7 +219,7 @@ def test_namespace_with_refresh(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -228,7 +228,7 @@ def test_namespace_with_refresh(s3_bucket: str): assert namespace.get_create_call_count() == 1 ds_from_namespace = lance.dataset( - namespace=namespace, + namespace_client=namespace, table_id=table_id, storage_options=storage_options, ) @@ -271,7 +271,7 @@ def test_namespace_append_through_namespace(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -284,7 +284,7 @@ def test_namespace_append_through_namespace(s3_bucket: str): table2 = pa.Table.from_pylist([{"a": 10, "b": 20}]) ds = lance.write_dataset( table2, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="append", storage_options=storage_options, @@ -295,7 +295,7 @@ def test_namespace_append_through_namespace(s3_bucket: str): assert namespace.get_describe_call_count() == initial_describe_count + 1 ds_from_namespace = lance.dataset( - namespace=namespace, + namespace_client=namespace, table_id=table_id, storage_options=storage_options, ) @@ -324,7 +324,7 @@ def test_namespace_write_create_mode(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=["test_ns", table_name], mode="create", storage_options=storage_options, @@ -357,7 +357,7 @@ def test_namespace_write_append_mode(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -370,7 +370,7 @@ def test_namespace_write_append_mode(s3_bucket: str): ds = lance.write_dataset( table2, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="append", storage_options=storage_options, @@ -408,7 +408,7 @@ def test_namespace_write_overwrite_mode(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -421,7 +421,7 @@ def test_namespace_write_overwrite_mode(s3_bucket: str): ds = lance.write_dataset( table2, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="overwrite", storage_options=storage_options, @@ -470,7 +470,7 @@ def test_namespace_distributed_write(s3_bucket: str): assert namespace_storage_options is not None storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace, table_id=table_id ) merged_options = dict(storage_options) @@ -529,7 +529,7 @@ def test_namespace_distributed_write(s3_bucket: str): assert result == expected ds_from_namespace = lance.dataset( - namespace=namespace, + namespace_client=namespace, table_id=table_id, storage_options=storage_options, ) @@ -559,7 +559,7 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -575,7 +575,7 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): merged_options.update(describe_response.storage_options) provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace, table_id=table_id ) initial_describe_count = namespace.get_describe_call_count() @@ -668,7 +668,7 @@ def test_file_reader_with_storage_options_provider(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -683,7 +683,7 @@ def test_file_reader_with_storage_options_provider(s3_bucket: str): merged_options.update(describe_response.storage_options) provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace, table_id=table_id ) file_uri = f"s3://{s3_bucket}/{table_name}_file_reader_test.lance" @@ -775,7 +775,7 @@ def test_file_session_with_storage_options_provider(s3_bucket: str): ds = lance.write_dataset( table1, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, @@ -790,7 +790,7 @@ def test_file_session_with_storage_options_provider(s3_bucket: str): merged_options.update(describe_response.storage_options) provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id + namespace_client=namespace, table_id=table_id ) initial_describe_count = namespace.get_describe_call_count() @@ -914,7 +914,7 @@ def test_basic_create_and_drop_on_s3(s3_bucket: str): # Create table using lance.write_dataset ds = lance.write_dataset( table_data, - namespace=namespace, + namespace_client=namespace, table_id=table_id, mode="create", storage_options=storage_options, diff --git a/python/python/tests/test_tf.py b/python/python/tests/test_tf.py index 432be52b482..3652df0e938 100644 --- a/python/python/tests/test_tf.py +++ b/python/python/tests/test_tf.py @@ -116,12 +116,12 @@ def fake_dataset(uri=None, **kwargs): ns = object() ds = from_lance( None, - namespace=ns, + namespace_client=ns, table_id=["tbl"], ignore_namespace_table_storage_options=True, ) - assert calls["kwargs"]["namespace"] is ns + assert calls["kwargs"]["namespace_client"] is ns assert calls["kwargs"]["table_id"] == ["tbl"] assert calls["kwargs"]["ignore_namespace_table_storage_options"] is True diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 35306636c93..39bb7035a95 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -669,9 +669,9 @@ impl Dataset { } // Set up namespace commit handler if namespace and table_id are provided - if let (Some(ns), Some(tid)) = (namespace, table_id) { - let ns_arc = extract_namespace_arc(py, ns)?; - let external_store = LanceNamespaceExternalManifestStore::new(ns_arc, tid); + if let (Some(namespace_client), Some(tid)) = (namespace, table_id) { + let namespace_client = extract_namespace_arc(py, namespace_client)?; + let external_store = LanceNamespaceExternalManifestStore::new(namespace_client, tid); let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { external_manifest_store: Arc::new(external_store), }); @@ -2310,24 +2310,25 @@ impl Dataset { }; // Create commit_handler: prefer user-provided commit_lock, then namespace-based handler - let commit_handler: Option> = - if let Some(commit_lock) = commit_lock.as_ref() { - // User provided a commit_lock - Some( - commit_lock - .into_py_any(commit_lock.py()) - .map(|cl| Arc::new(PyCommitLock::new(cl)) as Arc)?, - ) - } else if let (Some(ns), Some(tid)) = (namespace, table_id) { - // Create ExternalManifestCommitHandler from namespace and table_id - let ns_arc = extract_namespace_arc(ns.py(), ns)?; - let external_store = LanceNamespaceExternalManifestStore::new(ns_arc, tid); - Some(Arc::new(ExternalManifestCommitHandler { - external_manifest_store: Arc::new(external_store), - }) as Arc) - } else { - None - }; + let commit_handler: Option> = if let Some(commit_lock) = + commit_lock.as_ref() + { + // User provided a commit_lock + Some( + commit_lock + .into_py_any(commit_lock.py()) + .map(|cl| Arc::new(PyCommitLock::new(cl)) as Arc)?, + ) + } else if let (Some(namespace_client), Some(tid)) = (namespace, table_id) { + // Create ExternalManifestCommitHandler from namespace and table_id + let namespace_client = extract_namespace_arc(namespace_client.py(), namespace_client)?; + let external_store = LanceNamespaceExternalManifestStore::new(namespace_client, tid); + Some(Arc::new(ExternalManifestCommitHandler { + external_manifest_store: Arc::new(external_store), + }) as Arc) + } else { + None + }; let mut builder = CommitBuilder::new(dest.as_dest()) .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(true)) @@ -3283,9 +3284,10 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult>(options, "namespace")?; let table_id_opt = get_dict_opt::>(options, "table_id")?; - if let (Some(ns), Some(table_id)) = (namespace_opt, table_id_opt) { - let ns_arc = extract_namespace_arc(options.py(), &ns)?; - let external_store = LanceNamespaceExternalManifestStore::new(ns_arc, table_id); + if let (Some(namespace_client), Some(table_id)) = (namespace_opt, table_id_opt) { + let namespace_client = extract_namespace_arc(options.py(), &namespace_client)?; + let external_store = + LanceNamespaceExternalManifestStore::new(namespace_client, table_id); let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { external_manifest_store: Arc::new(external_store), diff --git a/python/src/namespace.rs b/python/src/namespace.rs index b4876ac5559..af28a6059d7 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -864,37 +864,37 @@ impl LanceNamespaceTrait for PyLanceNamespace { /// are wrapped with PyLanceNamespace to call through Python. pub fn extract_namespace_arc( py: Python<'_>, - ns: &Bound<'_, PyAny>, + namespace_client: &Bound<'_, PyAny>, ) -> PyResult> { // Direct PyO3 class - if let Ok(dir_ns) = ns.downcast::() { - return Ok(dir_ns.borrow().inner.clone()); + if let Ok(dir_namespace_client) = namespace_client.downcast::() { + return Ok(dir_namespace_client.borrow().inner.clone()); } - if let Ok(rest_ns) = ns.downcast::() { - return Ok(rest_ns.borrow().inner.clone()); + if let Ok(rest_namespace_client) = namespace_client.downcast::() { + return Ok(rest_namespace_client.borrow().inner.clone()); } // Python wrapper class - check if it's the exact wrapper class - if let Ok(inner) = ns.getattr("_inner") { - let type_name = ns + if let Ok(inner) = namespace_client.getattr("_inner") { + let type_name = namespace_client .get_type() .name() .map(|n| n.to_string()) .unwrap_or_default(); if type_name == "DirectoryNamespace" { - if let Ok(dir_ns) = inner.downcast::() { - return Ok(dir_ns.borrow().inner.clone()); + if let Ok(dir_namespace_client) = inner.downcast::() { + return Ok(dir_namespace_client.borrow().inner.clone()); } } else if type_name == "RestNamespace" - && let Ok(rest_ns) = inner.downcast::() + && let Ok(rest_namespace_client) = inner.downcast::() { - return Ok(rest_ns.borrow().inner.clone()); + return Ok(rest_namespace_client.borrow().inner.clone()); } } // Custom Python implementation or subclass - wrap with PyLanceNamespace - PyLanceNamespace::create_arc(py, ns) + PyLanceNamespace::create_arc(py, namespace_client) } /// Python wrapper for REST adapter server @@ -911,21 +911,21 @@ impl PyRestAdapter { /// Default port is 2333 per REST spec. Use port 0 to let OS assign an ephemeral port. /// Use `port` property after `start()` to get the actual port. #[new] - #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = None, port = None))] + #[pyo3(signature = (namespace_client_impl, namespace_client_properties, session = None, host = None, port = None))] fn new( - namespace_impl: String, - namespace_properties: Option<&Bound<'_, PyDict>>, + namespace_client_impl: String, + namespace_client_properties: Option<&Bound<'_, PyDict>>, session: Option<&Bound<'_, Session>>, host: Option, port: Option, ) -> PyResult { let mut props = HashMap::new(); - if let Some(dict) = namespace_properties { + if let Some(dict) = namespace_client_properties { props = dict_to_hashmap(dict)?; } - let mut builder = ConnectBuilder::new(namespace_impl); + let mut builder = ConnectBuilder::new(namespace_client_impl); for (k, v) in props { builder = builder.property(k, v); } diff --git a/rust/lance-datafusion/src/substrait.rs b/rust/lance-datafusion/src/substrait.rs index 8375c49abb9..1c465fcae4a 100644 --- a/rust/lance-datafusion/src/substrait.rs +++ b/rust/lance-datafusion/src/substrait.rs @@ -1127,4 +1127,40 @@ mod tests { agg.aggregates[1].schema_name() ); } + + // ==================== LIKE and starts_with tests ==================== + + #[tokio::test] + async fn test_substrait_roundtrip_like() { + use datafusion::logical_expr::Like; + + let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]); + + let like_expr = Expr::Like(Like { + negated: false, + expr: Box::new(Expr::Column(Column::new_unqualified("name"))), + pattern: Box::new(Expr::Literal( + ScalarValue::Utf8(Some("test%".to_string())), + None, + )), + escape_char: None, + case_insensitive: false, + }); + + assert_substrait_roundtrip(schema, like_expr).await; + } + + #[tokio::test] + async fn test_substrait_roundtrip_starts_with() { + use datafusion::functions::string::starts_with; + + let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]); + + let starts_with_expr = starts_with().call(vec![ + Expr::Column(Column::new_unqualified("name")), + Expr::Literal(ScalarValue::Utf8(Some("prefix".to_string())), None), + ]); + + assert_substrait_roundtrip(schema, starts_with_expr).await; + } } diff --git a/rust/lance-io/src/object_store/storage_options.rs b/rust/lance-io/src/object_store/storage_options.rs index b16281ea4be..bf3079dca5d 100644 --- a/rust/lance-io/src/object_store/storage_options.rs +++ b/rust/lance-io/src/object_store/storage_options.rs @@ -94,7 +94,7 @@ pub trait StorageOptionsProvider: Send + Sync + fmt::Debug { /// StorageOptionsProvider implementation that fetches options from a LanceNamespace pub struct LanceNamespaceStorageOptionsProvider { - namespace: Arc, + namespace_client: Arc, table_id: Vec, } @@ -114,11 +114,11 @@ impl LanceNamespaceStorageOptionsProvider { /// Create a new LanceNamespaceStorageOptionsProvider /// /// # Arguments - /// * `namespace` - The namespace implementation to fetch storage options from + /// * `namespace_client` - The namespace implementation to fetch storage options from /// * `table_id` - The table identifier - pub fn new(namespace: Arc, table_id: Vec) -> Self { + pub fn new(namespace_client: Arc, table_id: Vec) -> Self { Self { - namespace, + namespace_client, table_id, } } @@ -132,20 +132,24 @@ impl StorageOptionsProvider for LanceNamespaceStorageOptionsProvider { ..Default::default() }; - let response = self.namespace.describe_table(request).await.map_err(|e| { - Error::io_source(Box::new(std::io::Error::other(format!( - "Failed to fetch storage options: {}", - e - )))) - })?; + let response = self + .namespace_client + .describe_table(request) + .await + .map_err(|e| { + Error::io_source(Box::new(std::io::Error::other(format!( + "Failed to fetch storage options: {}", + e + )))) + })?; Ok(response.storage_options) } fn provider_id(&self) -> String { format!( - "LanceNamespaceStorageOptionsProvider {{ namespace: {}, table_id: {:?} }}", - self.namespace.namespace_id(), + "LanceNamespaceStorageOptionsProvider {{ namespace_client: {}, table_id: {:?} }}", + self.namespace_client.namespace_id(), self.table_id ) } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 817954da710..827535dca2e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -752,12 +752,12 @@ impl Dataset { /// # Arguments /// /// * `batches` - The record batches to write - /// * `namespace` - The namespace to use for table management + /// * `namespace_client` - The namespace to use for table management /// * `table_id` - The table identifier /// * `params` - Write parameters pub async fn write_into_namespace( batches: impl RecordBatchReader + Send + 'static, - namespace: Arc, + namespace_client: Arc, table_id: Vec, mut params: Option, ) -> Result { @@ -769,7 +769,7 @@ impl Dataset { id: Some(table_id.clone()), ..Default::default() }; - let response = namespace + let response = namespace_client .declare_table(declare_request) .await .map_err(|e| Error::namespace_source(Box::new(e)))?; @@ -783,7 +783,7 @@ impl Dataset { // Set up commit handler when managed_versioning is enabled if response.managed_versioning == Some(true) { let external_store = LanceNamespaceExternalManifestStore::new( - namespace.clone(), + namespace_client.clone(), table_id.clone(), ); let commit_handler: Arc = @@ -793,10 +793,10 @@ impl Dataset { write_params.commit_handler = Some(commit_handler); } - // Set initial credentials and provider from namespace + // Set initial credentials and provider from namespace_client if let Some(namespace_storage_options) = response.storage_options { let provider: Arc = Arc::new( - LanceNamespaceStorageOptionsProvider::new(namespace, table_id), + LanceNamespaceStorageOptionsProvider::new(namespace_client, table_id), ); // Merge namespace storage options with any existing options @@ -826,7 +826,7 @@ impl Dataset { id: Some(table_id.clone()), ..Default::default() }; - let response = namespace + let response = namespace_client .describe_table(request) .await .map_err(|e| Error::namespace_source(Box::new(e)))?; @@ -840,7 +840,7 @@ impl Dataset { // Set up commit handler when managed_versioning is enabled if response.managed_versioning == Some(true) { let external_store = LanceNamespaceExternalManifestStore::new( - namespace.clone(), + namespace_client.clone(), table_id.clone(), ); let commit_handler: Arc = @@ -850,11 +850,11 @@ impl Dataset { write_params.commit_handler = Some(commit_handler); } - // Set initial credentials and provider from namespace + // Set initial credentials and provider from namespace_client if let Some(namespace_storage_options) = response.storage_options { let provider: Arc = Arc::new(LanceNamespaceStorageOptionsProvider::new( - namespace.clone(), + namespace_client.clone(), table_id.clone(), )); diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index b1981d05a28..2e30ed12b5f 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -119,7 +119,7 @@ impl DatasetBuilder { /// ``` #[allow(deprecated)] pub async fn from_namespace( - namespace: Arc, + namespace_client: Arc, table_id: Vec, ) -> Result { let request = DescribeTableRequest { @@ -127,7 +127,7 @@ impl DatasetBuilder { ..Default::default() }; - let response = namespace + let response = namespace_client .describe_table(request) .await .map_err(|e| Error::namespace_source(Box::new(e)))?; @@ -142,8 +142,10 @@ impl DatasetBuilder { // Check managed_versioning flag to determine if namespace-managed commits should be used if response.managed_versioning == Some(true) { - let external_store = - LanceNamespaceExternalManifestStore::new(namespace.clone(), table_id.clone()); + let external_store = LanceNamespaceExternalManifestStore::new( + namespace_client.clone(), + table_id.clone(), + ); let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { external_manifest_store: Arc::new(external_store), }); @@ -157,7 +159,7 @@ impl DatasetBuilder { if let Some(initial_opts) = namespace_storage_options { let provider: Arc = Arc::new( - LanceNamespaceStorageOptionsProvider::new(namespace, table_id), + LanceNamespaceStorageOptionsProvider::new(namespace_client, table_id), ); builder.options.storage_options_accessor = Some(Arc::new( StorageOptionsAccessor::with_initial_and_provider(initial_opts, provider), diff --git a/rust/lance/src/io/commit/namespace_manifest.rs b/rust/lance/src/io/commit/namespace_manifest.rs index 2593ad89dc6..5b3de301886 100644 --- a/rust/lance/src/io/commit/namespace_manifest.rs +++ b/rust/lance/src/io/commit/namespace_manifest.rs @@ -16,14 +16,14 @@ use object_store::path::Path; #[derive(Debug)] pub struct LanceNamespaceExternalManifestStore { - namespace: Arc, + namespace_client: Arc, table_id: Vec, } impl LanceNamespaceExternalManifestStore { - pub fn new(namespace: Arc, table_id: Vec) -> Self { + pub fn new(namespace_client: Arc, table_id: Vec) -> Self { Self { - namespace, + namespace_client, table_id, } } @@ -38,7 +38,10 @@ impl ExternalManifestStore for LanceNamespaceExternalManifestStore { ..Default::default() }; - let response = self.namespace.describe_table_version(request).await?; + let response = self + .namespace_client + .describe_table_version(request) + .await?; // Namespace returns full path (relative to object store root) Ok(response.version.manifest_path) @@ -52,7 +55,7 @@ impl ExternalManifestStore for LanceNamespaceExternalManifestStore { ..Default::default() }; - let response = self.namespace.list_table_versions(request).await?; + let response = self.namespace_client.list_table_versions(request).await?; if response.versions.is_empty() { return Ok(None); @@ -94,7 +97,7 @@ impl ExternalManifestStore for LanceNamespaceExternalManifestStore { ..Default::default() }; - let response = self.namespace.create_table_version(request).await?; + let response = self.namespace_client.create_table_version(request).await?; // Get version info from response let version_info = response.version.ok_or_else(|| {