diff --git a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex new file mode 100644 index 000000000..0a537fa72 --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex @@ -0,0 +1,188 @@ +defmodule RealtimeFilterParser do + @moduledoc """ + Parses Supabase realtime filter strings such as: + + "date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)" + + Splitting rules: + + • The filter string is split on commas. + • A comma can be escaped using '\\,' and will then be treated as part of the value. + Example: + area=eq.Oslo\\, Norway + becomes: + {"area", "eq", "Oslo, Norway"} + + • Commas inside parentheses are NOT treated as separators. + Example: + id=in.(1,2,3) + + Supported operators: + + eq, neq, lt, lte, gt, gte, in, isnull, notnull + + Special cases: + + is.null → {"column", "isnull", nil} + not.is.null → {"column", "notnull", nil} + + Returns: + + {:ok, [{column, operator, value}, ...]} + {:error, reason} + """ + + @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"] + + @spec parse_filter(String.t() | nil) :: + {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} + def parse_filter(nil), do: {:ok, []} + def parse_filter(""), do: {:ok, []} + + def parse_filter(filter) when is_binary(filter) do + with parts when is_list(parts) <- split_filter(filter), + {:ok, filters} <- parse_parts(parts) do + {:ok, filters} + else + {:error, _} = err -> err + other -> {:error, "unexpected parse error: #{inspect(other)}"} + end + end + + # ------------------------------------------------------------ + # Splitting logic: + # - split on commas unless escaped: '\,' + # - do not split on commas inside parentheses (for in.(...)) + # - unescape '\,' -> ',' in each resulting part + # ------------------------------------------------------------ + + @spec split_filter(String.t()) :: [String.t()] + defp split_filter(filter) do + filter + |> String.graphemes() + |> do_split([], "", 0, false) + |> Enum.map(&String.trim/1) + |> Enum.reject(&(&1 == "")) + |> Enum.map(&String.replace(&1, "\\,", ",")) + end + + # acc: list of completed parts (reversed) + # buf: current part buffer + # paren_depth: nesting depth of parentheses + # escaped: whether previous char was '\' + defp do_split([], acc, buf, _paren_depth, _escaped), + do: Enum.reverse([buf | acc]) + + # split only when comma is outside parentheses and not escaped + defp do_split(["," | rest], acc, buf, 0, false) do + do_split(rest, [buf | acc], "", 0, false) + end + + # backslash means next char is "escaped" for splitting purposes (we keep the backslash) + defp do_split(["\\" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> "\\", paren_depth, true) + end + + defp do_split(["(" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> "(", paren_depth + 1, false) + end + + defp do_split([")" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> ")", max(paren_depth - 1, 0), false) + end + + defp do_split([c | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> c, paren_depth, false) + end + + # ------------------------------------------------------------ + # Parsing logic + # ------------------------------------------------------------ + + @spec parse_parts([String.t()]) :: + {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} + defp parse_parts(parts) do + parts + |> Enum.reduce_while({:ok, []}, fn part, {:ok, acc} -> + case String.split(part, "=", parts: 2) do + [col, rest] -> + col = String.trim(col) + + case parse_op_and_value(rest) do + {:ok, filter_type, raw_value} -> + if filter_type in @filter_types do + with {:ok, formatted} <- format_filter_value(filter_type, raw_value) do + {:cont, {:ok, [{col, filter_type, formatted} | acc]}} + else + {:error, reason} -> + {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} + end + else + {:halt, + {:error, + "unsupported filter type '#{filter_type}' for part: '#{part}'. supported: #{inspect(@filter_types)}"}} + end + + {:error, reason} -> + {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} + end + + _ -> + {:halt, {:error, "missing '=' in filter part: '#{part}'"}} + end + end) + |> case do + {:ok, list} -> {:ok, Enum.reverse(list)} + other -> other + end + end + + # "is.null" => {"isnull", nil} + # "not.is.null" => {"notnull", nil} + # "." => {op, value} (value left untouched; quotes untouched) + @spec parse_op_and_value(String.t()) :: {:ok, String.t(), any()} | {:error, String.t()} + defp parse_op_and_value(rest) when is_binary(rest) do + rest = String.trim(rest) + + case String.split(rest, ".", parts: 3) do + ["is", "null"] -> + {:ok, "isnull", nil} + + ["not", "is", "null"] -> + {:ok, "notnull", nil} + + [filter_type, raw_value] -> + {:ok, filter_type, raw_value} + + _ -> + {:error, "invalid filter format, expected 'is.null', 'not.is.null', or '.'"} + end + end + + # ------------------------------------------------------------ + # Value formatting + # ------------------------------------------------------------ + + @spec format_filter_value(String.t(), any()) :: {:ok, any()} | {:error, String.t()} + defp format_filter_value(filter, value) do + case filter do + "in" -> + case Regex.run(~r/^\((.*)\)$/, value) do + nil -> + {:error, "`in` filter value must be wrapped by parentheses"} + + [_, new_value] -> + {:ok, "{#{new_value}}"} + end + + "isnull" -> + {:ok, nil} + + "notnull" -> + {:ok, nil} + + _ -> + {:ok, value} + end + end +end diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index db6cd027b..b8f1f5365 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -11,8 +11,6 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do @type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]} @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}] - @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] - @spec create(conn(), String.t(), subscription_list, pid(), pid()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}} @@ -199,12 +197,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do An unsupported filter will respond with an error tuple: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}) - {:error, ~s(Error parsing `filter` params: ["like", "hey"])} + {:error, ~s(Error parsing `filter` params: unsupported filter type 'like' for part: 'subject=like.hey'. supported: ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"])} Catch `undefined` filters: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"}) - {:error, ~s(Error parsing `filter` params: ["undefined"])} + {:error, ~s(Error parsing `filter` params: missing '=' in filter part: 'undefined')} Catch `missing params`: @@ -220,17 +218,9 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do case params do %{"schema" => schema, "table" => table, "filter" => filter} when is_binary(schema) and is_binary(table) and is_binary(filter) -> - with [col, rest] <- String.split(filter, "=", parts: 2), - [filter_type, value] when filter_type in @filter_types <- - String.split(rest, ".", parts: 2), - {:ok, formatted_value} <- format_filter_value(filter_type, value) do - {:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}} - else - {:error, msg} -> - {:error, "Error parsing `filter` params: #{msg}"} - - e -> - {:error, "Error parsing `filter` params: #{inspect(e)}"} + case RealtimeFilterParser.parse_filter(filter) do + {:ok, filters} -> {:ok, {action_filter, schema, table, filters}} + {:error, msg} -> {:error, "Error parsing `filter` params: #{msg}"} end %{"schema" => schema, "table" => table} @@ -267,20 +257,4 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do end defp action_filter(_), do: "*" - - defp format_filter_value(filter, value) do - case filter do - "in" -> - case Regex.run(~r/^\((.*)\)$/, value) do - nil -> - {:error, "`in` filter value must be wrapped by parentheses"} - - [_, new_value] -> - {:ok, "{#{new_value}}"} - end - - _ -> - {:ok, value} - end - end end diff --git a/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs new file mode 100644 index 000000000..e4c5df74b --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs @@ -0,0 +1,54 @@ +defmodule Realtime.Tenants.Migrations.SupportIsNullFilter do + @moduledoc false + + use Ecto.Migration + + def change do + execute("alter type realtime.equality_op add value 'isnull';") + execute("alter type realtime.equality_op add value 'notnull';") + + execute(" +CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) + RETURNS boolean + LANGUAGE plpgsql + IMMUTABLE +AS $function$ + /* + Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness + */ + declare + op_symbol text = ( + case + when op = 'eq' then '=' + when op = 'neq' then '!=' + when op = 'lt' then '<' + when op = 'lte' then '<=' + when op = 'gt' then '>' + when op = 'gte' then '>=' + when op = 'in' then '= any' + when op = 'isnull' then 'isnull' + when op = 'notnull' then 'notnull' + else 'UNKNOWN OP' + end + ); + res boolean; + begin + if op in ('isnull', 'notnull') then + execute format('select %L::'|| type_::text || ' ' || op_symbol, val_1) into res; + else + execute format( + 'select %L::'|| type_::text || ' ' || op_symbol + || ' ( %L::' + || ( + case + when op = 'in' then type_::text || '[]' + else type_::text end + ) + || ')', val_1, val_2) into res; + end if; + return res; + end; + $function$ + ") + end +end diff --git a/test/realtime/filter_parser_test.exs b/test/realtime/filter_parser_test.exs new file mode 100644 index 000000000..3fd60c33a --- /dev/null +++ b/test/realtime/filter_parser_test.exs @@ -0,0 +1,32 @@ +defmodule RealtimeFilterParserTest do + use ExUnit.Case, async: true + + alias RealtimeFilterParser + + test "parses complex filter string" do + input = + ~s/date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)/ + + assert {:ok, + [ + {"date", "eq", "2026-02-03"}, + {"published_at", "notnull", nil}, + {"area", "eq", "Oslo, Norway"}, + {"id", "in", "{1,2,3}"} + ]} = RealtimeFilterParser.parse_filter(input) + end + + test "parses in(...) into { ... }" do + assert {:ok, [{"id", "in", "{1,2}"}]} = RealtimeFilterParser.parse_filter("id=in.(1,2)") + end + + test "returns error for in without parentheses" do + # malformed: value not wrapped in parentheses -> should error + assert {:error, _} = RealtimeFilterParser.parse_filter("id=in.1,2,3") + end + + test "empty or nil filter returns ok with empty list" do + assert {:ok, []} = RealtimeFilterParser.parse_filter("") + assert {:ok, []} = RealtimeFilterParser.parse_filter(nil) + end +end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 3663f2acb..9fb45802d 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -168,7 +168,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert_push "system", %{ - message: "Error parsing `filter` params: [\"wrong\"]", + message: "Error parsing `filter` params: missing '=' in filter part: 'wrong'", status: "error", extension: "postgres_changes", channel: "test"