From 5b82c5ed5531baa059fba9bb95e7659ff01e93e4 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 3 May 2026 23:09:31 +0900 Subject: [PATCH] feat: expose catalog metadata api for python --- .../python/pypaimon_rust/datafusion.pyi | 21 +++++ bindings/python/src/context.rs | 51 ++++++++-- bindings/python/src/lib.rs | 2 + bindings/python/src/schema.rs | 92 +++++++++++++++++++ bindings/python/src/table.rs | 49 ++++++++++ bindings/python/tests/test_datafusion.py | 22 +++++ 6 files changed, 228 insertions(+), 9 deletions(-) create mode 100644 bindings/python/src/schema.rs create mode 100644 bindings/python/src/table.rs diff --git a/bindings/python/python/pypaimon_rust/datafusion.pyi b/bindings/python/python/pypaimon_rust/datafusion.pyi index dfa63c7a..172c1fe8 100644 --- a/bindings/python/python/pypaimon_rust/datafusion.pyi +++ b/bindings/python/python/pypaimon_rust/datafusion.pyi @@ -23,9 +23,30 @@ ArrowTypeLike: TypeAlias = Union[pyarrow.DataType, pyarrow.Field, str] InputFieldsLike: TypeAlias = Union[ArrowTypeLike, Sequence[ArrowTypeLike]] VolatilityLike: TypeAlias = Union[str, Any] +class DataField: + def name(self) -> str: ... + def field_type(self) -> str: ... + def is_nullable(self) -> bool: ... + def description(self) -> Optional[str]: ... + +class TableSchema: + def fields(self) -> List[DataField]: ... + def partition_keys(self) -> List[str]: ... + def primary_keys(self) -> List[str]: ... + def options(self) -> Dict[str, str]: ... + def comment(self) -> Optional[str]: ... + +class Table: + def identifier(self) -> str: ... + def location(self) -> str: ... + def schema(self) -> TableSchema: ... + class PaimonCatalog: def __init__(self, catalog_options: Dict[str, str]) -> None: ... def __datafusion_catalog_provider__(self, session: Any) -> object: ... + def list_databases(self) -> List[str]: ... + def list_tables(self, database_name: str) -> List[str]: ... + def get_table(self, identifier: str) -> Table: ... class PythonScalarUDF: def __init__( diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index f65d6a1a..cc855ee8 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -24,25 +24,25 @@ use datafusion::catalog::CatalogProvider; use datafusion::logical_expr::{Signature, TypeSignature, Volatility}; use datafusion_ffi::catalog_provider::FFI_CatalogProvider; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; -use paimon::{CatalogFactory, Options}; +use paimon::catalog::Identifier; +use paimon::{Catalog, CatalogFactory, Options}; use paimon_datafusion::{PaimonCatalogProvider, SQLContext}; -use pyo3::exceptions::PyRuntimeWarning; +use pyo3::exceptions::{PyRuntimeWarning, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyCapsule; use crate::blob::PyBlobReaderRegistry; use crate::error::{df_to_py_err, to_py_err}; +use crate::table::PyTable; use crate::udf::{build_python_scalar_udf, udf, PyPythonScalarUDFObject}; use paimon_datafusion::runtime::runtime; -fn build_paimon_catalog_provider( - catalog_options: HashMap, -) -> PyResult> { +fn build_paimon_catalog(catalog_options: HashMap) -> PyResult> { let rt = runtime(); rt.block_on(async { let options = Options::from_map(catalog_options); let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?; - Ok::<_, PyErr>(Arc::new(PaimonCatalogProvider::new(catalog))) + Ok::<_, PyErr>(catalog) }) } @@ -65,6 +65,7 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult, provider: Arc, } @@ -73,9 +74,9 @@ impl PaimonCatalog { /// Create a Paimon catalog that can be registered into a DataFusion session. #[new] fn new(catalog_options: HashMap) -> PyResult { - Ok(Self { - provider: build_paimon_catalog_provider(catalog_options)?, - }) + let catalog = build_paimon_catalog(catalog_options)?; + let provider = Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))); + Ok(Self { catalog, provider }) } /// Export this catalog as a DataFusion catalog provider PyCapsule. @@ -90,6 +91,35 @@ impl PaimonCatalog { let provider = FFI_CatalogProvider::new_with_ffi_codec(provider, Some(runtime()), codec); PyCapsule::new(py, provider, Some(name)) } + + /// List all databases in this catalog. + fn list_databases(&self) -> PyResult> { + runtime() + .block_on(self.catalog.list_databases()) + .map_err(to_py_err) + } + + /// List all tables in the given database. + fn list_tables(&self, database_name: &str) -> PyResult> { + runtime() + .block_on(self.catalog.list_tables(database_name)) + .map_err(to_py_err) + } + + /// Get a table handle by `"db.table"` identifier. + fn get_table(&self, identifier: &str) -> PyResult { + let parts: Vec<&str> = identifier.splitn(2, '.').collect(); + if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() { + return Err(PyValueError::new_err(format!( + "expected identifier in 'db.table' format, got '{identifier}'" + ))); + } + let id = Identifier::new(parts[0], parts[1]); + let table = runtime() + .block_on(self.catalog.get_table(&id)) + .map_err(to_py_err)?; + Ok(PyTable::new(Arc::new(table))) + } } /// A SQL context that supports registering multiple Paimon catalogs and executing SQL. @@ -226,6 +256,9 @@ impl PySQLContext { pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { let this = PyModule::new(py, "datafusion")?; this.add_class::()?; + this.add_class::()?; + this.add_class::()?; + this.add_class::()?; this.add_class::()?; this.add_class::()?; this.add_function(wrap_pyfunction!(udf, &this)?)?; diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 5f8d17a4..d0d2002b 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -20,6 +20,8 @@ use pyo3::prelude::*; mod blob; mod context; mod error; +mod schema; +mod table; mod udf; #[pymodule] diff --git a/bindings/python/src/schema.rs b/bindings/python/src/schema.rs new file mode 100644 index 00000000..194cb3ad --- /dev/null +++ b/bindings/python/src/schema.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use paimon::spec::{DataField, TableSchema}; +use pyo3::prelude::*; + +#[pyclass(name = "TableSchema", module = "pypaimon_rust.datafusion")] +pub struct PyTableSchema { + inner: TableSchema, +} + +impl PyTableSchema { + pub fn new(inner: TableSchema) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyTableSchema { + fn fields(&self) -> Vec { + self.inner + .fields() + .iter() + .cloned() + .map(PyDataField::new) + .collect() + } + + fn partition_keys(&self) -> Vec { + self.inner.partition_keys().to_vec() + } + + fn primary_keys(&self) -> Vec { + self.inner.primary_keys().to_vec() + } + + fn options(&self) -> HashMap { + self.inner.options().clone() + } + + fn comment(&self) -> Option { + self.inner.comment().map(str::to_string) + } +} + +#[pyclass(name = "DataField", module = "pypaimon_rust.datafusion")] +pub struct PyDataField { + inner: DataField, +} + +impl PyDataField { + pub fn new(inner: DataField) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyDataField { + fn name(&self) -> String { + self.inner.name().to_string() + } + + fn field_type(&self) -> String { + // TODO(#284 follow-up): mirror Java DataType.asSQLString() once + // a Display impl is added to paimon::spec::DataType. + format!("{:?}", self.inner.data_type()) + } + + fn is_nullable(&self) -> bool { + self.inner.data_type().is_nullable() + } + + fn description(&self) -> Option { + self.inner.description().map(str::to_string) + } +} diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs new file mode 100644 index 00000000..0a0c35c9 --- /dev/null +++ b/bindings/python/src/table.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use pyo3::prelude::*; + +use crate::schema::PyTableSchema; + +#[pyclass(name = "Table", module = "pypaimon_rust.datafusion")] +pub struct PyTable { + pub(crate) inner: Arc, +} + +impl PyTable { + pub fn new(inner: Arc) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyTable { + fn identifier(&self) -> String { + let id = self.inner.identifier(); + format!("{}.{}", id.database(), id.object()) + } + + fn location(&self) -> String { + self.inner.location().to_string() + } + + fn schema(&self) -> PyTableSchema { + PyTableSchema::new(self.inner.schema().clone()) + } +} diff --git a/bindings/python/tests/test_datafusion.py b/bindings/python/tests/test_datafusion.py index d120a945..236b130f 100644 --- a/bindings/python/tests/test_datafusion.py +++ b/bindings/python/tests/test_datafusion.py @@ -294,6 +294,7 @@ def test_query_simple_table_via_catalog_provider(): ] + def test_sql_context_ddl_dml(): with tempfile.TemporaryDirectory() as warehouse: ctx = SQLContext() @@ -663,3 +664,24 @@ def test_table_functions_registered_with_catalog(): pytest.fail(f"expected {fn} to reject a single argument") except Exception as e: assert "requires 4 arguments" in str(e), str(e) + + +def test_list_databases_and_tables(): + catalog = PaimonCatalog({"warehouse": WAREHOUSE}) + + assert "default" in catalog.list_databases() + + tables = catalog.list_tables("default") + assert "simple_log_table" in tables + + table = catalog.get_table("default.simple_log_table") + assert table.identifier() == "default.simple_log_table" + assert table.location().endswith("/default.db/simple_log_table") or table.location() + + schema = table.schema() + field_names = [f.name() for f in schema.fields()] + assert "id" in field_names + assert "name" in field_names + # simple_log_table is non-partitioned, so partition keys are empty. + assert schema.partition_keys() == [] +