Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions packages/bridge/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
},
"./node": {
"types": "./dist/node.d.ts",
"import": "./dist/node.js"
},
"./beeper": {
"types": "./dist/beeper.d.ts",
"import": "./dist/beeper.js"
Expand Down
76 changes: 76 additions & 0 deletions packages/bridge/src/appservice-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,84 @@ describe("AppserviceWebsocket", () => {
}));
});

it("forwards appservice transactions before acknowledging them", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
servers.push(wsServer, httpServer);
await new Promise<void>((resolve) => httpServer.listen(0, "127.0.0.1", resolve));
const homeserver = `http://127.0.0.1:${(httpServer.address() as AddressInfo).port}/_hungryserv/alice`;
let releaseTransaction!: () => void;
const transactionGate = new Promise<void>((resolve) => {
releaseTransaction = resolve;
});
const handleTransaction = vi.fn(() => transactionGate);
let acknowledged = false;
const connected = new Promise<void>((resolve, reject) => {
wsServer.on("connection", (socket) => {
socket.once("message", (raw) => {
try {
acknowledged = true;
const response = JSON.parse(raw.toString()) as { command: string; data: { txn_id: string }; id: number };
expect(response).toEqual({
command: "response",
data: { txn_id: "txn-td" },
id: 8,
});
resolve();
} catch (error) {
reject(error);
}
});
socket.send(JSON.stringify({
command: "transaction",
id: 8,
to_device: [{
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
}],
txn_id: "txn-td",
}));
});
});
const websocket = createWebsocket(homeserver, {
handleTransaction,
log: (() => {}) as BridgeLogger,
});
websockets.push(websocket);

websocket.start();
const ackBeforeRelease = await Promise.race([
connected.then(() => true),
delay(20).then(() => false),
]);
expect(ackBeforeRelease).toBe(false);
expect(acknowledged).toBe(false);
releaseTransaction();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
await connected;

expect(handleTransaction).toHaveBeenCalledWith(expect.objectContaining({
to_device: [expect.objectContaining({
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
})],
txn_id: "txn-td",
}));
});

it("handles http_proxy appservice transaction requests", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
servers.push(wsServer, httpServer);
await new Promise<void>((resolve) => httpServer.listen(0, "127.0.0.1", resolve));
const homeserver = `http://127.0.0.1:${(httpServer.address() as AddressInfo).port}/_hungryserv/alice`;
const dispatch = vi.fn(async () => {});
const handleTransaction = vi.fn(async () => {});
const connected = new Promise<void>((resolve, reject) => {
wsServer.on("connection", (socket) => {
socket.once("message", (raw) => {
Expand Down Expand Up @@ -113,6 +184,7 @@ describe("AppserviceWebsocket", () => {
});
const websocket = createWebsocket(homeserver, {
dispatch,
handleTransaction,
log: (() => {}) as BridgeLogger,
});
websockets.push(websocket);
Expand All @@ -125,6 +197,10 @@ describe("AppserviceWebsocket", () => {
kind: "message",
text: "proxied",
}));
expect(handleTransaction).toHaveBeenCalledWith(expect.objectContaining({
events: [expect.objectContaining({ event_id: "$proxied" })],
txn_id: "txn-2",
}));
});

it("reconnects with capped exponential backoff and resets after a stable connection", async () => {
Expand Down
18 changes: 17 additions & 1 deletion packages/bridge/src/appservice-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface AppserviceWebsocketOptions {
appservice: MatrixAppserviceInitOptions;
dispatch(event: MatrixClientEvent): Promise<unknown>;
handleHTTPProxy?(request: HTTPProxyRequest): Promise<HTTPProxyResponse | null>;
handleTransaction?(transaction: Record<string, unknown>): Promise<unknown>;
log: BridgeLogger;
onClose?(event: AppserviceWebsocketCloseEvent): void | Promise<void>;
onOpen?(): void | Promise<void>;
Expand Down Expand Up @@ -41,6 +42,7 @@ export class AppserviceWebsocket {
readonly #appservice: MatrixAppserviceInitOptions;
readonly #dispatch: (event: MatrixClientEvent) => Promise<unknown>;
readonly #handleProxy: ((request: HTTPProxyRequest) => Promise<HTTPProxyResponse | null>) | undefined;
readonly #handleTransaction: ((transaction: Record<string, unknown>) => Promise<unknown>) | undefined;
readonly #log: BridgeLogger;
readonly #onClose: ((event: AppserviceWebsocketCloseEvent) => void | Promise<void>) | undefined;
readonly #onOpen: (() => void | Promise<void>) | undefined;
Expand All @@ -61,6 +63,7 @@ export class AppserviceWebsocket {
this.#appservice = options.appservice;
this.#dispatch = options.dispatch;
this.#handleProxy = options.handleHTTPProxy;
this.#handleTransaction = options.handleTransaction;
this.#log = options.log;
this.#onClose = options.onClose;
this.#onOpen = options.onOpen;
Expand Down Expand Up @@ -203,6 +206,7 @@ export class AppserviceWebsocket {
command: message.command ?? "transaction",
eventCount: message.events?.length,
id: message.id,
toDeviceCount: eventCount(message.to_device),
txnId: message.txn_id,
});
try {
Expand All @@ -215,6 +219,7 @@ export class AppserviceWebsocket {
}
if (message.command === "response" || message.command === "error") return;
if (!message.command || message.command === "transaction") {
await this.#handleTransaction?.(message as Record<string, unknown>);
for (const raw of message.events ?? []) {
const event = rawMatrixEvent(raw);
this.#log("debug", "appservice_websocket_transaction_event", {
Expand Down Expand Up @@ -254,12 +259,17 @@ export class AppserviceWebsocket {
const method = request.method ?? "GET";
const transactionMatch = /^\/?_matrix\/app\/v1\/transactions\/([^/]+)$/.exec(path);
if (method === "PUT" && transactionMatch) {
const transaction = objectValue(request.body) ?? {};
const transaction: Record<string, unknown> = {
...(objectValue(request.body) ?? {}),
txn_id: transactionMatch[1],
};
const events = Array.isArray(transaction.events) ? transaction.events : [];
this.#log("debug", "appservice_websocket_http_transaction", {
eventCount: events.length,
toDeviceCount: eventCount(transaction.to_device),
txnId: transactionMatch[1],
});
await this.#handleTransaction?.(transaction);
for (const raw of events) {
const event = rawMatrixEvent(raw as RawMatrixEvent);
if (event) await this.#dispatch(event);
Expand Down Expand Up @@ -317,6 +327,7 @@ interface WebsocketMessage {
events?: RawMatrixEvent[];
id?: number;
status?: string;
to_device?: unknown;
txn_id?: string;
}

Expand All @@ -336,6 +347,7 @@ export interface HTTPProxyResponse {
}

interface RawMatrixEvent {
[key: string]: unknown;
content?: Record<string, unknown>;
event_id?: string;
origin_server_ts?: number;
Expand Down Expand Up @@ -384,6 +396,10 @@ function joinPath(base: string, suffix: string): string {
return `${base.replace(/\/+$/, "")}/${suffix.replace(/^\/+/, "")}`;
}

function eventCount(events: unknown): number | undefined {
return Array.isArray(events) && events.length > 0 ? events.length : undefined;
}

function rawMatrixEvent(raw: RawMatrixEvent): MatrixClientEvent | null {
const type = raw.type ?? "";
const content = raw.content ?? {};
Expand Down
2 changes: 2 additions & 0 deletions packages/bridge/src/beeper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("Beeper bridge manager helpers", () => {
user_ids: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }],
},
rate_limited: false,
receive_ephemeral: true,
sender_localpart: "dummybot",
url: "https://bridge.example",
});
Expand Down Expand Up @@ -93,6 +94,7 @@ describe("Beeper bridge manager helpers", () => {
hsToken: "hs",
id: "sh-dummy",
namespaces: { users: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }] },
receive_ephemeral: true,
senderLocalpart: "dummybot",
url: "",
});
Expand Down
5 changes: 2 additions & 3 deletions packages/bridge/src/beeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ function normalizeRegistration(raw: unknown): MatrixAppserviceRegistration {
const namespaces = input.namespaces as Record<string, unknown> | undefined;
return stripUndefined({
asToken: stringField(input, "asToken", "as_token"),
ephemeralEvents: booleanField(input, "ephemeralEvents", "ephemeral_events"),
hsToken: stringField(input, "hsToken", "hs_token"),
id: stringField(input, "id"),
msc3202: booleanField(input, "msc3202"),
Expand All @@ -253,8 +252,8 @@ function stringField(input: Record<string, unknown>, camel: string, snake?: stri
return value;
}

function booleanField(input: Record<string, unknown>, camel: string, snake?: string): boolean | undefined {
const value = input[camel] ?? (snake ? input[snake] : undefined);
function booleanField(input: Record<string, unknown>, ...keys: string[]): boolean | undefined {
const value = keys.map((key) => input[key]).find((candidate) => candidate != null);
return typeof value === "boolean" ? value : undefined;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
Loading
Loading