36 commits
4e3815f
feat: add URL scanning support to PlanResolver trait
jonmmease Mar 12, 2026
8d5cf64
fix: address code review findings and CI failures
jonmmease Mar 12, 2026
a14dec6
fix: thread parse spec through ParsedUrl to DataFusionResolver::scan_url
jonmmease Mar 12, 2026
0eb7694
fix: fix Windows CI failures in URL scheme check and path tests
jonmmease Mar 12, 2026
f705ba2
refactor: rename protocol to scheme across URL and external dataset s…
jonmmease Mar 13, 2026
9858e58
refactor: move PlanResolver trait from vegafusion-core to vegafusion-…
jonmmease Mar 13, 2026
0ff4df6
feat: add scheme parameter to resolve_table API
jonmmease Mar 13, 2026
82f964c
refactor: make scheme required on ExternalTableProvider and ExternalD…
jonmmease Mar 13, 2026
1f0e121
style: fix ruff formatting in test_plan_resolver.py
jonmmease Mar 13, 2026
09dbd37
docs: clarify wasm32 path_to_file_url fallback comment
jonmmease Mar 13, 2026
e50d7c5
fix: anchor scheme detection in has_url_scheme to start of string
jonmmease Mar 13, 2026
c788d49
feat: add supports_arrow_tables capability and plan-aware materializa…
jonmmease Mar 14, 2026
fd06387
docs: add PlanResolver documentation, examples, and docstring improve…
jonmmease Mar 14, 2026
37fdf11
style: fix ruff formatting in example scripts
jonmmease Mar 14, 2026
4df3bfb
style: add future annotations import to example scripts
jonmmease Mar 16, 2026
bff8ac5
docs: remove journey comment from DataBaseUrlSetting
jonmmease Mar 16, 2026
9985a33
refactor: simplify has_url_scheme with regex, fix line length
jonmmease Mar 16, 2026
45c65ce
style: remove added comments from tasks.proto
jonmmease Mar 16, 2026
6d887cb
docs: add three-state comment to DataBaseUrlSettingProto
jonmmease Mar 16, 2026
97d0c4f
docs: clarify QueryRequest is for WASM transport, not gRPC
jonmmease Mar 16, 2026
ade3163
refactor: remove ResolverCapabilities from planning, unify URL resolu…
jonmmease Mar 16, 2026
4e600cd
fix: reject unsupported formats (e.g. topojson) at planning time
jonmmease Mar 16, 2026
d249749
style: remove section header comments
jonmmease Mar 16, 2026
1396662
refactor: move URL utilities from runtime/plan_resolver.rs to data/ur…
jonmmease Mar 16, 2026
5720a7a
refactor: move data_base_url from task graph to runtime
jonmmease Mar 17, 2026
fa3a62c
refactor: introduce TaskContext and move data_base_url to VegaFusionR…
jonmmease Mar 17, 2026
6ff1cdc
feat: add filter predicates to resolve_table and unparse_expr_to_sql
jonmmease Mar 18, 2026
2e540be
refactor: address PR #587 review comments
jonmmease Mar 18, 2026
0e303ed
test: add filter pushdown test with TODO for _vf_order restructuring
jonmmease Mar 18, 2026
b081089
style: remove redundant examples
jonmmease Mar 18, 2026
2b7cd8f
style: clean up Python test assertions
jonmmease Mar 18, 2026
fd0140a
docs: fix plan_resolver.md links and update signatures
jonmmease Mar 18, 2026
746edd5
docs: improve plan_resolver.md examples and fix review findings
jonmmease Mar 18, 2026
88bef32
style: fix example review findings
jonmmease Mar 18, 2026
fab9c57
Align VegaFusion URL config with server policy
jonmmease Mar 21, 2026
cae6867
fix: strip Windows \\?\ prefix in canonicalize and fix formatting
jonmmease Mar 23, 2026
3 changes: 3 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions docs/source/features/features.md
@@ -13,6 +13,7 @@ transform_spec
transform_extract
chart_state
inline_datasets
plan_resolver
grpc
embed
jupyter_widget
13 changes: 12 additions & 1 deletion docs/source/features/grpc.md
@@ -2,7 +2,9 @@
The VegaFusion Runtime can run as a [gRPC](https://grpc.io/) service, which makes it possible for multiple clients to connect to the same runtime, and share a cache (See [How it Works](../about/how_it_works) for more details). This also makes it possible for the Runtime to reside on a different host than the client.

:::{warning}
VegaFusion's gRPC server does not currently support authentication, and chart specifications may reference the local file system of the machine running the server. It is not currently recommended to use VegaFusion server with untrusted Vega specifications unless other measures are taken to isolate the service.
VegaFusion's gRPC server does not currently support authentication. If you use it with untrusted Vega specifications, lock down the server process with `--no-allowed-urls`, `--allowed-base-url`, `--base-url`, or `--no-base-url`, and apply any additional isolation your deployment requires.

URL policy is enforced against the initial resolved URL only. VegaFusion does not re-check redirect destinations after a fetch begins.
:::
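
The redirect caveat can be illustrated with a minimal sketch. It assumes the policy behaves like a plain prefix check against a list of allowed base URLs; `is_url_allowed` is a hypothetical helper written for this illustration, not VegaFusion's actual policy code, and the URLs are examples only.

```python
def is_url_allowed(url: str, allowed_base_urls: list[str]) -> bool:
    """Return True if `url` starts with one of the allowed base URLs.

    Hypothetical helper: a plain prefix check, used only to illustrate
    the idea of an allow-list. It is not VegaFusion's implementation.
    """
    # An empty allow-list denies every URL in this sketch.
    return any(url.startswith(base) for base in allowed_base_urls)


allowed = ["https://cdn.jsdelivr.net/"]

# The initial resolved URL is checked once, before the fetch starts.
initial_url = "https://cdn.jsdelivr.net/npm/vega-datasets@v2.9.0/data/movies.json"
initial_ok = is_url_allowed(initial_url, allowed)

# If the server at that URL answered with a redirect, the destination
# would be fetched without a second check, even though it would not
# pass the same policy.
redirect_target = "https://other.example.com/data/movies.json"
redirect_ok = is_url_allowed(redirect_target, allowed)
```

Because only `initial_url` is checked, an allowed host that issues redirects can still lead the server to a destination outside the allow-list, which is why additional network isolation may be needed.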

## VegaFusion Server
@@ -18,6 +20,15 @@ The server may then be launched using a particular port as follows:
vegafusion-server --port 50051
```

The server process owns URL resolution and access policy for all gRPC clients. For example:

```
vegafusion-server \
  --port 50051 \
  --base-url https://cdn.jsdelivr.net/npm/vega-datasets@v2.9.0/ \
  --allowed-base-url https://cdn.jsdelivr.net/
```
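
As a rough illustration of what a base URL does, the sketch below resolves relative paths with Python's standard `urllib.parse.urljoin`. This assumes standard URL-joining rules; VegaFusion's own resolution logic may differ in details, and the relative path `data/movies.json` is only an example.

```python
from urllib.parse import urljoin

base_url = "https://cdn.jsdelivr.net/npm/vega-datasets@v2.9.0/"

# A relative URL in a spec is resolved against the base URL.
resolved = urljoin(base_url, "data/movies.json")

# An absolute URL is left unchanged by the join.
absolute = urljoin(base_url, "https://example.com/other.csv")

# Under standard joining rules the trailing slash matters: without it,
# the last path segment of the base is replaced rather than extended.
no_slash = urljoin(base_url.rstrip("/"), "data/movies.json")
```

Ending the base URL with `/`, as in the command above, keeps the full base path when relative URLs are joined under these rules.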

## Python
The `vf.runtime.grpc_connect` method is used to connect the Python client to a VegaFusion Server instance.

2 changes: 2 additions & 0 deletions docs/source/features/inline_datasets.md
@@ -37,3 +37,5 @@ See [inline_datasets.py](https://github.com/vega/vegafusion/tree/main/examples/p
In Rust, `inline_datasets` should be a `HashMap<String, VegaFusionDataset>` from dataset names (e.g. `movies` in the example above) to `VegaFusionDataset` instances. `VegaFusionDataset` is an enum that may be either a `VegaFusionTable` (which is a thin wrapper around Arrow RecordBatches), or a DataFusion [`LogicalPlan`](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html) (which represents an arbitrary DataFusion query).

See [inline_datasets.rs](https://github.com/vega/vegafusion/tree/main/examples/rust-examples/examples/inline_datasets.rs) for a complete example using a `VegaFusionTable`, and see [inline_datasets_plan.rs](https://github.com/vega/vegafusion/tree/main/examples/rust-examples/examples/inline_datasets_plan.rs) for a complete example using a DataFusion ``LogicalPlan``.

For more advanced data source integration (custom URL schemes, SQL transpilation, remote execution), see [Plan Resolver](./plan_resolver.md).
152 changes: 152 additions & 0 deletions docs/source/features/plan_resolver.md
@@ -0,0 +1,152 @@
# Plan Resolver

PlanResolver lets you connect custom data sources to VegaFusion. Use it when data lives in an external system (Spark, Snowflake, DuckDB, a custom API) and you want to push computation there instead of pulling it all into memory. For data you already have in Python as DataFrames or Arrow tables, [inline datasets](./inline_datasets.md) are simpler.

:::{note}
`resolve_table`, `resolve_plan_proto` (bytes variant), and `unparse_to_sql` with bytes require no additional dependencies beyond `vegafusion`.

`external_table_scan_node`, `inline_table_scan_node`, and `resolve_plan` (deserialized `LogicalPlanNode` variant) require the protobuf package:

```
pip install vegafusion[plan-resolver]
```
:::

## Python

Override one of these methods on `PlanResolver` (simplest first):

- `resolve_table`: return an Arrow table for a single external data source. VegaFusion handles the rest — it applies Vega transforms (filter, aggregate, etc.) via DataFusion after your resolver provides the data.
- `resolve_plan` / `resolve_plan_proto`: evaluate an entire logical plan, or the parts your backend supports. Use this to transpile the plan to SQL and execute it remotely, or to push supported operations to your query engine while letting DataFusion handle the rest.

### scan_url + resolve_table

For custom URL schemes in Vega specs (e.g. `"url": "mydb://warehouse/sales"`), override `scan_url()` and `resolve_table()`:

```python
import vegafusion as vf
from vegafusion import PlanResolver
from vegafusion.plan_resolver import external_table_scan_node

class MyResolver(PlanResolver):
    def scan_url(self, parsed_url):
        if parsed_url["scheme"] != "mydb":
            return None  # pass to next resolver

        # Look up the table schema from your data source.
        # This is called at planning time, so avoid loading data here.
        schema = get_table_schema(parsed_url["path"])

        return external_table_scan_node(
            table_name=parsed_url["url"],
            schema=schema,
            scheme="mydb",
            metadata={"path": parsed_url["path"]},
        )

    def resolve_table(self, name, scheme, schema, metadata=None,
                      projected_columns=None, filters=None):
        # Called at execution time: load the actual data.
        # projected_columns lists only the columns DataFusion needs,
        # so you can avoid reading unnecessary columns.
        return load_table(metadata["path"], columns=projected_columns)
```

`scan_url()` is called at planning time — it inspects the URL and returns an `ExternalTableProvider` plan node with the table's schema. `resolve_table()` is called at execution time to provide the actual data.
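
To make the URL components concrete, here is a small sketch using Python's standard `urllib.parse.urlsplit` on the example URL. It only shows how such a URL breaks down in general; the exact keys and values of the `parsed_url` argument are defined by VegaFusion and may differ (for instance in whether the path includes the host or a leading slash).

```python
from urllib.parse import urlsplit

# Split the custom-scheme URL from the example above into components.
parts = urlsplit("mydb://warehouse/sales")

scheme = parts.scheme  # the part before "://"
netloc = parts.netloc  # the authority component after "://"
path = parts.path      # everything after the authority

# A resolver typically dispatches on the scheme and ignores URLs it
# does not own, mirroring the `return None` branch in scan_url().
handles_url = scheme == "mydb"
```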

Use `base_url` on the runtime to set a base path for relative URLs in Vega specs:

```python
resolver = MyResolver()
rt = vf.VegaFusionRuntime(
    plan_resolver=resolver,
    base_url="mydb://warehouse/",
)

# Vega spec with "url": "sales" resolves to "mydb://warehouse/sales"
```

See [plan_resolver_url_scanning.py](https://github.com/vega/vegafusion/tree/main/examples/python-examples/plan_resolver_url_scanning.py) for a complete example.

### resolve_table only

If data comes from `ExternalDataset` inline datasets (not URLs), you only need `resolve_table`:

```python
import vegafusion as vf
from vegafusion import ExternalDataset, PlanResolver

class MyResolver(PlanResolver):
    def resolve_table(self, name, scheme, schema, metadata=None,
                      projected_columns=None, filters=None):
        # Look up data by name from your data source
        df = my_database.query(name, columns=projected_columns)
        return df.to_arrow()

# `table` is an existing pyarrow Table and `spec` is a Vega spec dict,
# both defined elsewhere.
ext = ExternalDataset(scheme="mydb", schema=table.schema, data=table)
rt = vf.VegaFusionRuntime(plan_resolver=MyResolver())
datasets, _ = rt.pre_transform_datasets(
    spec, datasets=["result"],
    inline_datasets={"source": ext}, dataset_format="pyarrow",
)
```

No protobuf dependency is needed for this pattern.

### resolve_plan + unparse_to_sql

Override `resolve_plan_proto` to receive the full logical plan and transpile it to SQL for remote execution:

```python
from vegafusion import PlanResolver
from vegafusion.plan_resolver import unparse_to_sql

class SqlResolver(PlanResolver):
    def __init__(self, connection):
        self._conn = connection

    def resolve_plan_proto(self, plan_bytes, datasets):
        # Convert the DataFusion logical plan to a SQL string
        sql = unparse_to_sql(plan_bytes, dialect="default")

        # Execute the SQL against your database
        cursor = self._conn.cursor()
        cursor.execute(sql)
        return cursor.fetch_arrow_all()
```

`resolve_plan_proto` receives protobuf bytes that can be passed directly to `unparse_to_sql()` without deserialization. To inspect or modify the plan tree, use `resolve_plan()` instead (it receives a deserialized `LogicalPlanNode`).

Supported SQL dialects: `"default"`, `"postgres"`, `"mysql"`, `"sqlite"`, `"duckdb"`, `"bigquery"`.

See [plan_resolver_sql.py](https://github.com/vega/vegafusion/tree/main/examples/python-examples/plan_resolver_sql.py) for a complete example.
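
The "execute the SQL against your database" step can be sketched with the standard-library `sqlite3` module. This is an illustration under assumptions: `run_sql` is a hypothetical helper, the table contents mirror the example data, and the SQL string is a hand-written stand-in for whatever `unparse_to_sql()` would actually return.

```python
import sqlite3


def run_sql(conn: sqlite3.Connection, sql: str) -> dict[str, list]:
    """Execute `sql` and return the result as a column-name -> values mapping."""
    cursor = conn.execute(sql)
    names = [d[0] for d in cursor.description]
    rows = cursor.fetchall()
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


# In-memory database standing in for the remote system.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (x INTEGER, y TEXT)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)", [(1, "a"), (5, "b"), (10, "c")]
)

# Hand-written stand-in for the SQL produced from the logical plan.
sql = "SELECT x, y FROM source WHERE x > 3 ORDER BY x"
columns = run_sql(conn, sql)
```

A resolver would still need to return an Arrow table; with `pyarrow` installed, a column mapping like this one can be converted with `pyarrow.table(columns)`.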

### Configuration

`PlanResolver` cannot be used with `grpc_connect()` (resolvers run in-process). Class-level attributes control resolver behavior:

- `thread_safe` (default `True`) — set to `False` for backends with thread-affine connections (e.g. DuckDB)
- `skip_when_no_external_tables` (default `True`) — set to `False` to receive all plans, not just those with external tables (e.g. for logging)
- `supports_arrow_tables` (default `False`) — set to `True` to let the runtime eagerly materialize plans into Arrow tables

### API Reference

```{eval-rst}
.. autoclass:: vegafusion.PlanResolver
   :members:

.. autoclass:: vegafusion.ExternalDataset
   :members:

.. autofunction:: vegafusion.plan_resolver.external_table_scan_node

.. autofunction:: vegafusion.plan_resolver.unparse_to_sql

.. autofunction:: vegafusion.plan_resolver.unparse_expr_to_sql

.. autofunction:: vegafusion.plan_resolver.inline_table_scan_node
```

## Rust

The `PlanResolver` trait in `vegafusion-runtime` provides the same two-phase architecture (scan_url at planning time, resolve_table/resolve_plan at execution time). See the [vegafusion-runtime docs on docs.rs](https://docs.rs/vegafusion-runtime/) for the full API.
5 changes: 5 additions & 0 deletions examples/editor-demo/README.md
@@ -8,6 +8,11 @@ Launch gRPC-Web server with:
./vegafusion-server --port 50051 --web
```

Add `--base-url`, `--no-base-url`, `--allowed-base-url`, or `--no-allowed-urls`
to control how the server resolves and accesses external data URLs.
Policy checks apply to the initial resolved URL only; redirect destinations are
not re-checked after a fetch begins.

Build and launch editor with
```
npm install
87 changes: 87 additions & 0 deletions examples/python-examples/plan_resolver_sql.py
@@ -0,0 +1,87 @@
# Demonstrates SQL transpilation using resolve_plan_proto() + unparse_to_sql().
# The resolver receives a serialized logical plan, converts it to SQL, and prints it.
# In a real application you would execute the SQL against a database.

import json
from typing import Any

import pyarrow as pa

import vegafusion as vf
from vegafusion import ExternalDataset, PlanResolver
from vegafusion.plan_resolver import unparse_to_sql


def main() -> None:
    source_table = pa.table({"x": [1, 5, 10], "y": ["a", "b", "c"]})
    ext = ExternalDataset(scheme="table", schema=source_table.schema, data=source_table)

    resolver = SqlTranspileResolver()
    rt = vf.VegaFusionRuntime(plan_resolver=resolver)

    spec = get_spec()
    datasets, warnings = rt.pre_transform_datasets(
        spec,
        datasets=["filtered"],
        inline_datasets={"source": ext},
        dataset_format="pyarrow",
    )

    assert warnings == []
    result = datasets[0]
    assert result.column("x").to_pylist() == [5, 10]
    assert result.column("y").to_pylist() == ["b", "c"]
    assert resolver.captured_sql is not None
    assert "SELECT" in resolver.captured_sql

    print("Captured SQL (postgres dialect):")
    print(resolver.captured_sql)
    print()
    print("Result table:")
    print(result)


class SqlTranspileResolver(PlanResolver):
    """Converts the logical plan to Postgres-dialect SQL."""

    def __init__(self) -> None:
        self.captured_sql: str | None = None

    def resolve_plan_proto(
        self, plan_bytes: bytes, datasets: dict[str, Any]
    ) -> pa.Table:
        sql = unparse_to_sql(plan_bytes, dialect="postgres")
        self.captured_sql = sql

        # In a real resolver, you would execute `sql` against your database
        # and return the result as an Arrow table. Here we return hardcoded
        # data matching the expected query result for demonstration.
        return pa.table({"x": [5, 10], "y": ["b", "c"]})


def get_spec() -> dict[str, Any]:
    return json.loads("""
{
  "$schema": "https://vega.github.io/schema/vega/v5.json",
  "data": [
    {
      "name": "source",
      "url": "table://source"
    },
    {
      "name": "filtered",
      "source": "source",
      "transform": [
        {
          "type": "filter",
          "expr": "datum.x > 3"
        }
      ]
    }
  ]
}
""")


if __name__ == "__main__":
main()