Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
defmodule RealtimeFilterParser do
@moduledoc """
Parses Supabase realtime filter strings such as:

"date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)"

Splitting rules:

• The filter string is split on commas.
• A comma can be escaped using '\\,' and will then be treated as part of the value.
Example:
area=eq.Oslo\\, Norway
becomes:
{"area", "eq", "Oslo, Norway"}

• Commas inside parentheses are NOT treated as separators.
Example:
id=in.(1,2,3)

Supported operators:

eq, neq, lt, lte, gt, gte, in, isnull, notnull

Special cases:

is.null → {"column", "isnull", nil}
not.is.null → {"column", "notnull", nil}

Returns:

{:ok, [{column, operator, value}, ...]}
{:error, reason}
"""

@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"]

@spec parse_filter(String.t() | nil) ::
{:ok, list({String.t(), String.t(), any()})} | {:error, String.t()}
def parse_filter(nil), do: {:ok, []}
def parse_filter(""), do: {:ok, []}

def parse_filter(filter) when is_binary(filter) do
with parts when is_list(parts) <- split_filter(filter),
{:ok, filters} <- parse_parts(parts) do
{:ok, filters}
else
{:error, _} = err -> err
other -> {:error, "unexpected parse error: #{inspect(other)}"}
end
end

# ------------------------------------------------------------
# Splitting logic:
# - split on commas unless escaped: '\,'
# - do not split on commas inside parentheses (for in.(...))
# - unescape '\,' -> ',' in each resulting part
# ------------------------------------------------------------

@spec split_filter(String.t()) :: [String.t()]
defp split_filter(filter) do
filter
|> String.graphemes()
|> do_split([], "", 0, false)
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
|> Enum.map(&String.replace(&1, "\\,", ","))
end

# acc: list of completed parts (reversed)
# buf: current part buffer
# paren_depth: nesting depth of parentheses
# escaped: whether previous char was '\'
defp do_split([], acc, buf, _paren_depth, _escaped),
do: Enum.reverse([buf | acc])

# split only when comma is outside parentheses and not escaped
defp do_split(["," | rest], acc, buf, 0, false) do
do_split(rest, [buf | acc], "", 0, false)
end

# backslash means next char is "escaped" for splitting purposes (we keep the backslash)
defp do_split(["\\" | rest], acc, buf, paren_depth, _escaped) do
do_split(rest, acc, buf <> "\\", paren_depth, true)
end

defp do_split(["(" | rest], acc, buf, paren_depth, _escaped) do
do_split(rest, acc, buf <> "(", paren_depth + 1, false)
end

defp do_split([")" | rest], acc, buf, paren_depth, _escaped) do
do_split(rest, acc, buf <> ")", max(paren_depth - 1, 0), false)
end

defp do_split([c | rest], acc, buf, paren_depth, _escaped) do
do_split(rest, acc, buf <> c, paren_depth, false)
end

# ------------------------------------------------------------
# Parsing logic
# ------------------------------------------------------------

@spec parse_parts([String.t()]) ::
{:ok, list({String.t(), String.t(), any()})} | {:error, String.t()}
defp parse_parts(parts) do
parts
|> Enum.reduce_while({:ok, []}, fn part, {:ok, acc} ->
case String.split(part, "=", parts: 2) do
[col, rest] ->
col = String.trim(col)

case parse_op_and_value(rest) do
{:ok, filter_type, raw_value} ->
if filter_type in @filter_types do
with {:ok, formatted} <- format_filter_value(filter_type, raw_value) do
{:cont, {:ok, [{col, filter_type, formatted} | acc]}}
else
{:error, reason} ->
{:halt, {:error, "failed to parse filter '#{part}': #{reason}"}}
end
else
{:halt,
{:error,
"unsupported filter type '#{filter_type}' for part: '#{part}'. supported: #{inspect(@filter_types)}"}}
end

{:error, reason} ->
{:halt, {:error, "failed to parse filter '#{part}': #{reason}"}}
end

_ ->
{:halt, {:error, "missing '=' in filter part: '#{part}'"}}
end
end)
|> case do
{:ok, list} -> {:ok, Enum.reverse(list)}
other -> other
end
end

# "is.null" => {"isnull", nil}
# "not.is.null" => {"notnull", nil}
# "<op>.<value>" => {op, value} (value left untouched; quotes untouched)
@spec parse_op_and_value(String.t()) :: {:ok, String.t(), any()} | {:error, String.t()}
defp parse_op_and_value(rest) when is_binary(rest) do
rest = String.trim(rest)

case String.split(rest, ".", parts: 3) do
["is", "null"] ->
{:ok, "isnull", nil}

["not", "is", "null"] ->
{:ok, "notnull", nil}

[filter_type, raw_value] ->
{:ok, filter_type, raw_value}

_ ->
{:error, "invalid filter format, expected 'is.null', 'not.is.null', or '<op>.<value>'"}
end
end

# ------------------------------------------------------------
# Value formatting
# ------------------------------------------------------------

@spec format_filter_value(String.t(), any()) :: {:ok, any()} | {:error, String.t()}
defp format_filter_value(filter, value) do
case filter do
"in" ->
case Regex.run(~r/^\((.*)\)$/, value) do
nil ->
{:error, "`in` filter value must be wrapped by parentheses"}

[_, new_value] ->
{:ok, "{#{new_value}}"}
end

"isnull" ->
{:ok, nil}

"notnull" ->
{:ok, nil}

_ ->
{:ok, value}
end
end
end
36 changes: 5 additions & 31 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
@type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]}
@type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]

@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]

@spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
{:ok, Postgrex.Result.t()}
| {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
Expand Down Expand Up @@ -199,12 +197,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
An unsupported filter will respond with an error tuple:

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
{:error, ~s(Error parsing `filter` params: ["like", "hey"])}
{:error, ~s(Error parsing `filter` params: unsupported filter type 'like' for part: 'subject=like.hey'. supported: ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"])}

Catch `undefined` filters:

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
{:error, ~s(Error parsing `filter` params: ["undefined"])}
{:error, ~s(Error parsing `filter` params: missing '=' in filter part: 'undefined')}

Catch `missing params`:

Expand All @@ -220,17 +218,9 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
case params do
%{"schema" => schema, "table" => table, "filter" => filter}
when is_binary(schema) and is_binary(table) and is_binary(filter) ->
with [col, rest] <- String.split(filter, "=", parts: 2),
[filter_type, value] when filter_type in @filter_types <-
String.split(rest, ".", parts: 2),
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
{:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}}
else
{:error, msg} ->
{:error, "Error parsing `filter` params: #{msg}"}

e ->
{:error, "Error parsing `filter` params: #{inspect(e)}"}
case RealtimeFilterParser.parse_filter(filter) do
{:ok, filters} -> {:ok, {action_filter, schema, table, filters}}
{:error, msg} -> {:error, "Error parsing `filter` params: #{msg}"}
end

%{"schema" => schema, "table" => table}
Expand Down Expand Up @@ -267,20 +257,4 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
end

defp action_filter(_), do: "*"

defp format_filter_value(filter, value) do
case filter do
"in" ->
case Regex.run(~r/^\((.*)\)$/, value) do
nil ->
{:error, "`in` filter value must be wrapped by parentheses"}

[_, new_value] ->
{:ok, "{#{new_value}}"}
end

_ ->
{:ok, value}
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Realtime.Tenants.Migrations.SupportIsNullFilter do
@moduledoc false

use Ecto.Migration

def change do
execute("alter type realtime.equality_op add value 'isnull';")
execute("alter type realtime.equality_op add value 'notnull';")

execute("
CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text)
RETURNS boolean
LANGUAGE plpgsql
IMMUTABLE
AS $function$
/*
Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness
*/
declare
op_symbol text = (
case
when op = 'eq' then '='
when op = 'neq' then '!='
when op = 'lt' then '<'
when op = 'lte' then '<='
when op = 'gt' then '>'
when op = 'gte' then '>='
when op = 'in' then '= any'
when op = 'isnull' then 'isnull'
when op = 'notnull' then 'notnull'
else 'UNKNOWN OP'
end
);
res boolean;
begin
if op in ('isnull', 'notnull') then
execute format('select %L::'|| type_::text || ' ' || op_symbol, val_1) into res;
else
execute format(
'select %L::'|| type_::text || ' ' || op_symbol
|| ' ( %L::'
|| (
case
when op = 'in' then type_::text || '[]'
else type_::text end
)
|| ')', val_1, val_2) into res;
end if;
return res;
end;
$function$
")
end
end
32 changes: 32 additions & 0 deletions test/realtime/filter_parser_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule RealtimeFilterParserTest do
use ExUnit.Case, async: true

alias RealtimeFilterParser

test "parses complex filter string" do
input =
~s/date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)/

assert {:ok,
[
{"date", "eq", "2026-02-03"},
{"published_at", "notnull", nil},
{"area", "eq", "Oslo, Norway"},
{"id", "in", "{1,2,3}"}
]} = RealtimeFilterParser.parse_filter(input)
end

test "parses in(...) into { ... }" do
assert {:ok, [{"id", "in", "{1,2}"}]} = RealtimeFilterParser.parse_filter("id=in.(1,2)")
end

test "returns error for in without parentheses" do
# malformed: value not wrapped in parentheses -> should error
assert {:error, _} = RealtimeFilterParser.parse_filter("id=in.1,2,3")
end

test "empty or nil filter returns ok with empty list" do
assert {:ok, []} = RealtimeFilterParser.parse_filter("")
assert {:ok, []} = RealtimeFilterParser.parse_filter(nil)
end
end
2 changes: 1 addition & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do

assert_push "system",
%{
message: "Error parsing `filter` params: [\"wrong\"]",
message: "Error parsing `filter` params: missing '=' in filter part: 'wrong'",
status: "error",
extension: "postgres_changes",
channel: "test"
Expand Down