Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions packages/bridge/src/appservice-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,80 @@ describe("AppserviceWebsocket", () => {
}));
});

it("forwards appservice transactions before acknowledging them", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
servers.push(wsServer, httpServer);
await new Promise<void>((resolve) => httpServer.listen(0, "127.0.0.1", resolve));
const homeserver = `http://127.0.0.1:${(httpServer.address() as AddressInfo).port}/_hungryserv/alice`;
let releaseTransaction!: () => void;
const transactionGate = new Promise<void>((resolve) => {
releaseTransaction = resolve;
});
const handleTransaction = vi.fn(() => transactionGate);
let acknowledged = false;
const connected = new Promise<void>((resolve, reject) => {
wsServer.on("connection", (socket) => {
socket.once("message", (raw) => {
try {
acknowledged = true;
const response = JSON.parse(raw.toString()) as { command: string; data: { txn_id: string }; id: number };
expect(response).toEqual({
command: "response",
data: { txn_id: "txn-td" },
id: 8,
});
resolve();
} catch (error) {
reject(error);
}
});
socket.send(JSON.stringify({
command: "transaction",
id: 8,
to_device: [{
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
}],
txn_id: "txn-td",
}));
});
});
const websocket = createWebsocket(homeserver, {
handleTransaction,
log: (() => {}) as BridgeLogger,
});
websockets.push(websocket);

websocket.start();
await delay(20);
expect(acknowledged).toBe(false);
releaseTransaction();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
await connected;

expect(handleTransaction).toHaveBeenCalledWith(expect.objectContaining({
to_device: [expect.objectContaining({
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
})],
txn_id: "txn-td",
}));
});

it("handles http_proxy appservice transaction requests", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
servers.push(wsServer, httpServer);
await new Promise<void>((resolve) => httpServer.listen(0, "127.0.0.1", resolve));
const homeserver = `http://127.0.0.1:${(httpServer.address() as AddressInfo).port}/_hungryserv/alice`;
const dispatch = vi.fn(async () => {});
const handleTransaction = vi.fn(async () => {});
const connected = new Promise<void>((resolve, reject) => {
wsServer.on("connection", (socket) => {
socket.once("message", (raw) => {
Expand Down Expand Up @@ -113,6 +180,7 @@ describe("AppserviceWebsocket", () => {
});
const websocket = createWebsocket(homeserver, {
dispatch,
handleTransaction,
log: (() => {}) as BridgeLogger,
});
websockets.push(websocket);
Expand All @@ -125,6 +193,10 @@ describe("AppserviceWebsocket", () => {
kind: "message",
text: "proxied",
}));
expect(handleTransaction).toHaveBeenCalledWith(expect.objectContaining({
events: [expect.objectContaining({ event_id: "$proxied" })],
txn_id: "txn-2",
}));
});

it("reconnects with capped exponential backoff and resets after a stable connection", async () => {
Expand Down
18 changes: 17 additions & 1 deletion packages/bridge/src/appservice-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface AppserviceWebsocketOptions {
appservice: MatrixAppserviceInitOptions;
dispatch(event: MatrixClientEvent): Promise<unknown>;
handleHTTPProxy?(request: HTTPProxyRequest): Promise<HTTPProxyResponse | null>;
handleTransaction?(transaction: Record<string, unknown>): Promise<unknown>;
log: BridgeLogger;
onClose?(event: AppserviceWebsocketCloseEvent): void | Promise<void>;
onOpen?(): void | Promise<void>;
Expand Down Expand Up @@ -41,6 +42,7 @@ export class AppserviceWebsocket {
readonly #appservice: MatrixAppserviceInitOptions;
readonly #dispatch: (event: MatrixClientEvent) => Promise<unknown>;
readonly #handleProxy: ((request: HTTPProxyRequest) => Promise<HTTPProxyResponse | null>) | undefined;
readonly #handleTransaction: ((transaction: Record<string, unknown>) => Promise<unknown>) | undefined;
readonly #log: BridgeLogger;
readonly #onClose: ((event: AppserviceWebsocketCloseEvent) => void | Promise<void>) | undefined;
readonly #onOpen: (() => void | Promise<void>) | undefined;
Expand All @@ -61,6 +63,7 @@ export class AppserviceWebsocket {
this.#appservice = options.appservice;
this.#dispatch = options.dispatch;
this.#handleProxy = options.handleHTTPProxy;
this.#handleTransaction = options.handleTransaction;
this.#log = options.log;
this.#onClose = options.onClose;
this.#onOpen = options.onOpen;
Expand Down Expand Up @@ -203,6 +206,7 @@ export class AppserviceWebsocket {
command: message.command ?? "transaction",
eventCount: message.events?.length,
id: message.id,
toDeviceCount: eventCount(message.to_device),
txnId: message.txn_id,
});
try {
Expand All @@ -215,6 +219,7 @@ export class AppserviceWebsocket {
}
if (message.command === "response" || message.command === "error") return;
if (!message.command || message.command === "transaction") {
await this.#handleTransaction?.(message as Record<string, unknown>);
for (const raw of message.events ?? []) {
const event = rawMatrixEvent(raw);
this.#log("debug", "appservice_websocket_transaction_event", {
Expand Down Expand Up @@ -254,12 +259,17 @@ export class AppserviceWebsocket {
const method = request.method ?? "GET";
const transactionMatch = /^\/?_matrix\/app\/v1\/transactions\/([^/]+)$/.exec(path);
if (method === "PUT" && transactionMatch) {
const transaction = objectValue(request.body) ?? {};
const transaction: Record<string, unknown> = {
...(objectValue(request.body) ?? {}),
txn_id: transactionMatch[1],
};
const events = Array.isArray(transaction.events) ? transaction.events : [];
this.#log("debug", "appservice_websocket_http_transaction", {
eventCount: events.length,
toDeviceCount: eventCount(transaction.to_device),
txnId: transactionMatch[1],
});
await this.#handleTransaction?.(transaction);
for (const raw of events) {
const event = rawMatrixEvent(raw as RawMatrixEvent);
if (event) await this.#dispatch(event);
Expand Down Expand Up @@ -317,6 +327,7 @@ interface WebsocketMessage {
events?: RawMatrixEvent[];
id?: number;
status?: string;
to_device?: unknown;
txn_id?: string;
}

Expand All @@ -336,6 +347,7 @@ export interface HTTPProxyResponse {
}

interface RawMatrixEvent {
[key: string]: unknown;
content?: Record<string, unknown>;
event_id?: string;
origin_server_ts?: number;
Expand Down Expand Up @@ -384,6 +396,10 @@ function joinPath(base: string, suffix: string): string {
return `${base.replace(/\/+$/, "")}/${suffix.replace(/^\/+/, "")}`;
}

function eventCount(events: unknown): number | undefined {
return Array.isArray(events) && events.length > 0 ? events.length : undefined;
}

function rawMatrixEvent(raw: RawMatrixEvent): MatrixClientEvent | null {
const type = raw.type ?? "";
const content = raw.content ?? {};
Expand Down
2 changes: 2 additions & 0 deletions packages/bridge/src/beeper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("Beeper bridge manager helpers", () => {
user_ids: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }],
},
rate_limited: false,
receive_ephemeral: true,
sender_localpart: "dummybot",
url: "https://bridge.example",
});
Expand Down Expand Up @@ -93,6 +94,7 @@ describe("Beeper bridge manager helpers", () => {
hsToken: "hs",
id: "sh-dummy",
namespaces: { users: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }] },
receive_ephemeral: true,
senderLocalpart: "dummybot",
url: "",
});
Expand Down
5 changes: 2 additions & 3 deletions packages/bridge/src/beeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ function normalizeRegistration(raw: unknown): MatrixAppserviceRegistration {
const namespaces = input.namespaces as Record<string, unknown> | undefined;
return stripUndefined({
asToken: stringField(input, "asToken", "as_token"),
ephemeralEvents: booleanField(input, "ephemeralEvents", "ephemeral_events"),
hsToken: stringField(input, "hsToken", "hs_token"),
id: stringField(input, "id"),
msc3202: booleanField(input, "msc3202"),
Expand All @@ -253,8 +252,8 @@ function stringField(input: Record<string, unknown>, camel: string, snake?: stri
return value;
}

function booleanField(input: Record<string, unknown>, camel: string, snake?: string): boolean | undefined {
const value = input[camel] ?? (snake ? input[snake] : undefined);
function booleanField(input: Record<string, unknown>, ...keys: string[]): boolean | undefined {
const value = keys.map((key) => input[key]).find((candidate) => candidate != null);
return typeof value === "boolean" ? value : undefined;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
Loading
Loading