Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/bridge/src/appservice-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,63 @@ describe("AppserviceWebsocket", () => {
}));
});

it("forwards appservice transactions before acknowledging them", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
servers.push(wsServer, httpServer);
await new Promise<void>((resolve) => httpServer.listen(0, "127.0.0.1", resolve));
const homeserver = `http://127.0.0.1:${(httpServer.address() as AddressInfo).port}/_hungryserv/alice`;
const handleTransaction = vi.fn(async () => {});
const connected = new Promise<void>((resolve, reject) => {
wsServer.on("connection", (socket) => {
socket.once("message", (raw) => {
try {
const response = JSON.parse(raw.toString()) as { command: string; data: { txn_id: string }; id: number };
expect(response).toEqual({
command: "response",
data: { txn_id: "txn-td" },
id: 8,
});
resolve();
} catch (error) {
reject(error);
}
});
socket.send(JSON.stringify({
command: "transaction",
id: 8,
to_device: [{
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
}],
txn_id: "txn-td",
}));
});
});
const websocket = createWebsocket(homeserver, {
handleTransaction,
log: (() => {}) as BridgeLogger,
});
websockets.push(websocket);

websocket.start();
await connected;

expect(handleTransaction).toHaveBeenCalledWith(expect.objectContaining({
to_device: [expect.objectContaining({
content: { device_id: "DESKTOP", event_id: "$event", room_id: "!room:example" },
sender: "@alice:example",
to_device_id: "PICKLE",
to_user_id: "@bot:example",
type: "com.beeper.stream.subscribe",
})],
txn_id: "txn-td",
}));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
});

it("handles http_proxy appservice transaction requests", async () => {
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer });
Expand Down
13 changes: 13 additions & 0 deletions packages/bridge/src/appservice-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface AppserviceWebsocketOptions {
appservice: MatrixAppserviceInitOptions;
dispatch(event: MatrixClientEvent): Promise<unknown>;
handleHTTPProxy?(request: HTTPProxyRequest): Promise<HTTPProxyResponse | null>;
handleTransaction?(transaction: Record<string, unknown>): Promise<unknown>;
log: BridgeLogger;
onClose?(event: AppserviceWebsocketCloseEvent): void | Promise<void>;
onOpen?(): void | Promise<void>;
Expand Down Expand Up @@ -41,6 +42,7 @@ export class AppserviceWebsocket {
readonly #appservice: MatrixAppserviceInitOptions;
readonly #dispatch: (event: MatrixClientEvent) => Promise<unknown>;
readonly #handleProxy: ((request: HTTPProxyRequest) => Promise<HTTPProxyResponse | null>) | undefined;
readonly #handleTransaction: ((transaction: Record<string, unknown>) => Promise<unknown>) | undefined;
readonly #log: BridgeLogger;
readonly #onClose: ((event: AppserviceWebsocketCloseEvent) => void | Promise<void>) | undefined;
readonly #onOpen: (() => void | Promise<void>) | undefined;
Expand All @@ -61,6 +63,7 @@ export class AppserviceWebsocket {
this.#appservice = options.appservice;
this.#dispatch = options.dispatch;
this.#handleProxy = options.handleHTTPProxy;
this.#handleTransaction = options.handleTransaction;
this.#log = options.log;
this.#onClose = options.onClose;
this.#onOpen = options.onOpen;
Expand Down Expand Up @@ -203,6 +206,7 @@ export class AppserviceWebsocket {
command: message.command ?? "transaction",
eventCount: message.events?.length,
id: message.id,
toDeviceCount: eventCount(message.to_device),
txnId: message.txn_id,
});
try {
Expand All @@ -215,6 +219,7 @@ export class AppserviceWebsocket {
}
if (message.command === "response" || message.command === "error") return;
if (!message.command || message.command === "transaction") {
await this.#handleTransaction?.(message as Record<string, unknown>);
for (const raw of message.events ?? []) {
const event = rawMatrixEvent(raw);
this.#log("debug", "appservice_websocket_transaction_event", {
Expand Down Expand Up @@ -258,8 +263,10 @@ export class AppserviceWebsocket {
const events = Array.isArray(transaction.events) ? transaction.events : [];
this.#log("debug", "appservice_websocket_http_transaction", {
eventCount: events.length,
toDeviceCount: eventCount(transaction.to_device),
txnId: transactionMatch[1],
});
await this.#handleTransaction?.(transaction);
for (const raw of events) {
const event = rawMatrixEvent(raw as RawMatrixEvent);
if (event) await this.#dispatch(event);
Expand Down Expand Up @@ -317,6 +324,7 @@ interface WebsocketMessage {
events?: RawMatrixEvent[];
id?: number;
status?: string;
to_device?: unknown;
txn_id?: string;
}

Expand All @@ -336,6 +344,7 @@ export interface HTTPProxyResponse {
}

interface RawMatrixEvent {
[key: string]: unknown;
content?: Record<string, unknown>;
event_id?: string;
origin_server_ts?: number;
Expand Down Expand Up @@ -384,6 +393,10 @@ function joinPath(base: string, suffix: string): string {
return `${base.replace(/\/+$/, "")}/${suffix.replace(/^\/+/, "")}`;
}

function eventCount(events: unknown): number | undefined {
return Array.isArray(events) && events.length > 0 ? events.length : undefined;
}

function rawMatrixEvent(raw: RawMatrixEvent): MatrixClientEvent | null {
const type = raw.type ?? "";
const content = raw.content ?? {};
Expand Down
2 changes: 2 additions & 0 deletions packages/bridge/src/beeper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("Beeper bridge manager helpers", () => {
user_ids: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }],
},
rate_limited: false,
receive_ephemeral: true,
sender_localpart: "dummybot",
url: "https://bridge.example",
});
Expand Down Expand Up @@ -93,6 +94,7 @@ describe("Beeper bridge manager helpers", () => {
hsToken: "hs",
id: "sh-dummy",
namespaces: { users: [{ exclusive: true, regex: "@dummy_.*:beeper.local" }] },
receive_ephemeral: true,
senderLocalpart: "dummybot",
url: "",
});
Expand Down
5 changes: 2 additions & 3 deletions packages/bridge/src/beeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ function normalizeRegistration(raw: unknown): MatrixAppserviceRegistration {
const namespaces = input.namespaces as Record<string, unknown> | undefined;
return stripUndefined({
asToken: stringField(input, "asToken", "as_token"),
ephemeralEvents: booleanField(input, "ephemeralEvents", "ephemeral_events"),
hsToken: stringField(input, "hsToken", "hs_token"),
id: stringField(input, "id"),
msc3202: booleanField(input, "msc3202"),
Expand All @@ -253,8 +252,8 @@ function stringField(input: Record<string, unknown>, camel: string, snake?: stri
return value;
}

function booleanField(input: Record<string, unknown>, camel: string, snake?: string): boolean | undefined {
const value = input[camel] ?? (snake ? input[snake] : undefined);
function booleanField(input: Record<string, unknown>, ...keys: string[]): boolean | undefined {
const value = keys.map((key) => input[key]).find((candidate) => candidate !== undefined);
return typeof value === "boolean" ? value : undefined;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
Loading
Loading