From 7e2d2900074384eee00b752898fe2563360ac710 Mon Sep 17 00:00:00 2001 From: Brandon McAnsh Date: Fri, 5 Jun 2026 08:20:38 -0400 Subject: [PATCH 1/4] chore(protos): update flipcash protobuf definitions New ChatId message, ChatUpdate event type for real-time chat streaming, and objc_class_prefix additions on contact protos. Signed-off-by: Brandon McAnsh --- .../src/main/proto/chat/v1/chat_service.proto | 99 ++++++++++++ .../protos/src/main/proto/chat/v1/model.proto | 71 +++++++++ .../src/main/proto/common/v1/common.proto | 7 + .../contact/v1/contact_list_service.proto | 1 + .../src/main/proto/contact/v1/model.proto | 1 + .../src/main/proto/event/v1/model.proto | 24 ++- .../messaging/v1/messaging_service.proto | 140 +++++++++++++++++ .../src/main/proto/messaging/v1/model.proto | 147 ++++++++++++++++++ 8 files changed, 489 insertions(+), 1 deletion(-) create mode 100644 definitions/flipcash/protos/src/main/proto/chat/v1/chat_service.proto create mode 100644 definitions/flipcash/protos/src/main/proto/chat/v1/model.proto create mode 100644 definitions/flipcash/protos/src/main/proto/messaging/v1/messaging_service.proto create mode 100644 definitions/flipcash/protos/src/main/proto/messaging/v1/model.proto diff --git a/definitions/flipcash/protos/src/main/proto/chat/v1/chat_service.proto b/definitions/flipcash/protos/src/main/proto/chat/v1/chat_service.proto new file mode 100644 index 000000000..7a38e131e --- /dev/null +++ b/definitions/flipcash/protos/src/main/proto/chat/v1/chat_service.proto @@ -0,0 +1,99 @@ +syntax = "proto3"; + +package flipcash.chat.v1; + +option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/chat/v1;chatpb"; +option java_package = "com.codeinc.flipcash.gen.chat.v1"; +option objc_class_prefix = "FPBChatV1"; + +import "chat/v1/model.proto"; +import "common/v1/common.proto"; +import "validate/validate.proto"; + +service Chat { + // GetChat returns the metadata for a specific chat + rpc GetChat(GetChatRequest) returns (GetChatResponse); + + // GetDmChatFeed gets the set of DM chats for an owner account using + // a paged API, ordered by last activity with the most recent first. + // + // Chats are ordered by a mutable key (last_activity), so pagination alone + // cannot guarantee a complete read: a chat can receive new activity and + // move into a region the client has already paged past. To get the full + // list, the client MUST combine this RPC with the event stream: + // + // 1. Open the event stream to receive ChatUpdate and begin buffering updates + // BEFORE the first GetDmChatFeed call. This ordering is the contract that + // closes the gap; subscribing after pagination starts can drop chats. + // 2. Page through GetDmChatFeed to exhaustion (until has_more is false), + // always echoing back the paging token returned by the prior response. + // All pages are served against a single snapshot pinned by that token, + // so the set is read consistently. + // 3. Merge the buffered and ongoing stream updates onto the paginated + // set. Any chat whose activity changed after the snapshot watermark + // is delivered via the stream rather than via pagination. + // + // Read together, pagination guarantees the set (every chat exactly once) + // and the stream guarantees freshness and ordering. The local last_activity + // sort is maintained by the client from the stream after the initial read. + rpc GetDmChatFeed(GetDmChatFeedRequest) returns (GetDmChatFeedResponse); +} + +message GetChatRequest { + common.v1.ChatId chat_id = 1; + + common.v1.Auth auth = 10; +} + +message GetChatResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + NOT_FOUND = 2; + } + + Metadata metadata = 2; +} + +message GetDmChatFeedRequest { + // QueryOptions controls page_size. Ordering is fixed to most-recent + // activity first and is not client-selectable. + // + // Leave query_options.paging_token unset on the first request: the server + // mints a token that pins a new snapshot and returns it in the response. On + // every subsequent request, set query_options.paging_token to the + // paging_token from the most recent response to advance within the same + // snapshot. The token is opaque and server-generated; do not construct it. + common.v1.QueryOptions query_options = 1; + + common.v1.Auth auth = 10 [(validate.rules).message.required = true]; +} + +message GetDmChatFeedResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + NOT_FOUND = 2; + } + + repeated Metadata chats = 2 [(validate.rules).repeated = { + min_items: 0 + max_items: 100 + }]; + + // PagingToken is the server-generated token for this paginated read. On the + // first response it pins a new snapshot; on later responses it carries the + // advanced cursor over (last_activity, chat_id). The client MUST send the + // most recent value back in query_options.paging_token on the next + // GetDmChatFeedRequest. Set when result is OK. + common.v1.PagingToken paging_token = 3; + + // HasMore indicates whether further pages remain in this snapshot. When + // false, the paginated set has been fully read; the complete chat list is + // this set reconciled with the event stream (see GetDmChatFeed). When true, the + // client should issue another GetDmChatFeedRequest with the returned + // paging_token. + bool has_more = 4; +} diff --git a/definitions/flipcash/protos/src/main/proto/chat/v1/model.proto b/definitions/flipcash/protos/src/main/proto/chat/v1/model.proto new file mode 100644 index 000000000..7256ca6d2 --- /dev/null +++ b/definitions/flipcash/protos/src/main/proto/chat/v1/model.proto @@ -0,0 +1,71 @@ +syntax = "proto3"; + +package flipcash.chat.v1; + +option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/chat/v1;chatpb"; +option java_package = "com.codeinc.flipcash.gen.chat.v1"; +option objc_class_prefix = "FPBChatV1"; + +import "common/v1/common.proto"; +import "profile/v1/model.proto"; +import "messaging/v1/model.proto"; +import "validate/validate.proto"; +import "google/protobuf/timestamp.proto"; + +message Metadata { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + // The type of chat + ChatType type = 2 [(validate.rules).enum = { + not_in: [0] // UNKNOWN + }]; + enum ChatType { + UNKNOWN = 0; + DM = 1; + } + + // Members of this chat + repeated Member members = 3; + + // The last message in this chat + messaging.v1.Message last_message = 4; + + // The timestamp of the last activity in this chat + google.protobuf.Timestamp last_activity = 5 [(validate.rules).timestamp.required = true]; +} + +message Member { + common.v1.UserId user_id = 1 [(validate.rules).message.required = true]; + + // The user profile for this member. It contains a subset of identifiers + // that can be publicly viewed within the chat. + profile.v1.UserProfile user_profile = 2 [(validate.rules).message.required = true]; + + // Chat message state for this member. + // + // If set, the list may contain DELIVERED and READ pointers. SENT pointers + // are only shared between the sender and server, to indicate persistence. + repeated messaging.v1.Pointer pointers = 3 [(validate.rules).repeated = { + min_items: 0 + max_items: 2 + }]; +} + +message MetadataUpdate { + oneof kind { + option (validate.required) = true; + + FullRefresh full_refresh = 1; + LastActivityChanged last_activity_changed = 2; + } + + // Refreshes the entire chat metadata + message FullRefresh { + Metadata metadata = 1 [(validate.rules).message.required = true]; + } + + // The last activity timestamp has changed to a newer value + message LastActivityChanged { + google.protobuf.Timestamp new_last_activity = 1 [(validate.rules).timestamp.required = true]; + } +} diff --git a/definitions/flipcash/protos/src/main/proto/common/v1/common.proto b/definitions/flipcash/protos/src/main/proto/common/v1/common.proto index d43c840dd..6d42623c7 100644 --- a/definitions/flipcash/protos/src/main/proto/common/v1/common.proto +++ b/definitions/flipcash/protos/src/main/proto/common/v1/common.proto @@ -58,6 +58,13 @@ message UserId { }]; } +message ChatId { + bytes value = 1 [(validate.rules).bytes = { + min_len: 32 + max_len: 32 + }]; +} + // AppInstallId is a unque ID tied to a client app installation. It does not // identify a device. Value should remain private and not be shared across // installs. diff --git a/definitions/flipcash/protos/src/main/proto/contact/v1/contact_list_service.proto b/definitions/flipcash/protos/src/main/proto/contact/v1/contact_list_service.proto index 9276a6c82..c78e4e273 100644 --- a/definitions/flipcash/protos/src/main/proto/contact/v1/contact_list_service.proto +++ b/definitions/flipcash/protos/src/main/proto/contact/v1/contact_list_service.proto @@ -9,6 +9,7 @@ import "validate/validate.proto"; option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/contact/v1;contactpb"; option java_package = "com.codeinc.flipcash.gen.contact.v1"; +option objc_class_prefix = "FPBContactV1"; // ContactList manages a user's contact list and surfaces which contacts are // Flipcash users. diff --git a/definitions/flipcash/protos/src/main/proto/contact/v1/model.proto b/definitions/flipcash/protos/src/main/proto/contact/v1/model.proto index c6f79bbd5..c6a62e8c0 100644 --- a/definitions/flipcash/protos/src/main/proto/contact/v1/model.proto +++ b/definitions/flipcash/protos/src/main/proto/contact/v1/model.proto @@ -7,6 +7,7 @@ import "validate/validate.proto"; option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/contact/v1;contactpb"; option java_package = "com.codeinc.flipcash.gen.contact.v1"; +option objc_class_prefix = "FPBContactV1"; message FlipcashContact { phone.v1.PhoneNumber phone = 1 [(validate.rules).message.required = true]; diff --git a/definitions/flipcash/protos/src/main/proto/event/v1/model.proto b/definitions/flipcash/protos/src/main/proto/event/v1/model.proto index 3b1bfaa10..6262a4995 100644 --- a/definitions/flipcash/protos/src/main/proto/event/v1/model.proto +++ b/definitions/flipcash/protos/src/main/proto/event/v1/model.proto @@ -6,7 +6,9 @@ option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/g option java_package = "com.codeinc.flipcash.gen.events.v1"; option objc_class_prefix = "FPBEventV1"; +import "chat/v1/model.proto"; import "common/v1/common.proto"; +import "messaging/v1/model.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "validate/validate.proto"; @@ -27,7 +29,8 @@ message Event { oneof type { option (validate.required) = true; - TestEvent test = 3; + TestEvent test = 3; + ChatUpdate chat_update = 4; } } @@ -71,3 +74,22 @@ message ClientPong { // of potential network latency google.protobuf.Timestamp timestamp = 1 [(validate.rules).timestamp.required = true]; } + +message ChatUpdate { + // The chat that this update is for + common.v1.ChatId chat = 1 [(validate.rules).message.required = true]; + + // If present, new real-time messages sent on the chat + messaging.v1.MessageBatch new_messages = 2; + + // If present, message pointer updates for members in the chat + messaging.v1.PointerBatch pointer_updates = 3; + + // If present, message typing notification state changes for members in the chat + messaging.v1.IsTypingNotificationBatch is_typing_notifications = 4; + + // If present, updates to the chat metadata + repeated chat.v1.MetadataUpdate metadata_updates = 5 [(validate.rules).repeated = { + max_items: 1024 // Arbitrary + }]; +} diff --git a/definitions/flipcash/protos/src/main/proto/messaging/v1/messaging_service.proto b/definitions/flipcash/protos/src/main/proto/messaging/v1/messaging_service.proto new file mode 100644 index 000000000..975451c6a --- /dev/null +++ b/definitions/flipcash/protos/src/main/proto/messaging/v1/messaging_service.proto @@ -0,0 +1,140 @@ +syntax = "proto3"; + +package flipcash.messaging.v1; + +option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/messaging/v1;messagingpb"; +option java_package = "com.codeinc.flipcash.gen.messaging.v1"; +option objc_class_prefix = "FPBMessagingV1"; + +import "common/v1/common.proto"; +import "messaging/v1/model.proto"; +import "validate/validate.proto"; + +service Messaging { + // GetMessage gets a single message in a chat + rpc GetMessage(GetMessageRequest) returns (GetMessageResponse); + + // GetMessages gets the set of messages for a chat using a paged and batched APIs + rpc GetMessages(GetMessagesRequest) returns (GetMessagesResponse); + + // SendMessage sends a message to a chat. + rpc SendMessage(SendMessageRequest) returns (SendMessageResponse); + + // AdvancePointer advances a pointer in message history for a chat member. + rpc AdvancePointer(AdvancePointerRequest) returns (AdvancePointerResponse); + + // NotifyIsTypingRequest notifies a chat that the sending member is typing. + // + // These requests are transient, and may be dropped at any point. + rpc NotifyIsTyping(NotifyIsTypingRequest) returns (NotifyIsTypingResponse); +} + +message GetMessageRequest { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + MessageId message_id = 2 [(validate.rules).message.required = true]; + + common.v1.Auth auth = 10; +} + +message GetMessageResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + NOT_FOUND = 2; + } + + Message message = 2; +} + +message GetMessagesRequest { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + oneof query { + option (validate.required) = true; + + common.v1.QueryOptions options = 2; + MessageIdBatch message_ids = 3; + } + + common.v1.Auth auth = 10; +} + +message GetMessagesResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + NOT_FOUND = 2; + } + + MessageBatch messages = 2; +} + +message SendMessageRequest { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + // Allowed content types that can be sent by client: + // - TextContent + repeated Content content = 2 [(validate.rules).repeated = { + min_items: 1 + max_items: 1 + }]; + + // Client-generated idempotency token for this send. Used to dedup retried + // sends and to correlate the optimistic local echo with the server-assigned + // message returned in the response. + ClientMessageId client_message_id = 3 [(validate.rules).message.required = true]; + + common.v1.Auth auth = 10 [(validate.rules).message.required = true]; +} + +message SendMessageResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + } + + // The chat message that was sent if the RPC was succesful, which includes + // server-side metadata like the generated message ID and official timestamp + Message message = 2; +} + +message AdvancePointerRequest { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + Pointer.Type pointer_type = 2 [(validate.rules).enum = { + in: [2, 3] // DELIVERED, READ + }]; + + MessageId new_value = 3 [(validate.rules).message.required = true]; + + common.v1.Auth auth = 10 [(validate.rules).message.required = true]; +} + +message AdvancePointerResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + MESSAGE_NOT_FOUND = 2; + } +} + +message NotifyIsTypingRequest { + common.v1.ChatId chat_id = 1 [(validate.rules).message.required = true]; + + IsTypingNotification.State state = 2; + + common.v1.Auth auth = 10 [(validate.rules).message.required = true]; +} + +message NotifyIsTypingResponse { + Result result = 1; + enum Result { + OK = 0; + DENIED = 1; + } +} \ No newline at end of file diff --git a/definitions/flipcash/protos/src/main/proto/messaging/v1/model.proto b/definitions/flipcash/protos/src/main/proto/messaging/v1/model.proto new file mode 100644 index 000000000..10c7cf34f --- /dev/null +++ b/definitions/flipcash/protos/src/main/proto/messaging/v1/model.proto @@ -0,0 +1,147 @@ +syntax = "proto3"; + +package flipcash.messaging.v1; + +option go_package = "github.com/code-payments/flipcash2-protobuf-api/generated/go/messaging/v1;messagingpb"; +option java_package = "com.codeinc.flipcash.gen.messaging.v1"; +option objc_class_prefix = "FPBMessagingV1"; + +import "common/v1/common.proto"; +import "google/protobuf/timestamp.proto"; +import "validate/validate.proto"; + +message MessageId { + // Per-chat, server-assigned, gapless sequence number. Together with the + // chat ID this is the message's canonical identity, sort key, and + // pagination cursor. Gapless ordering lets clients trivially detect missing + // messages: a complete history has no gaps between consecutive numbers. + uint64 value = 1 [(validate.rules).uint64.gte = 1]; +} + +// ClientMessageId is a client-generated identifier for a message send. +// +// It serves two purposes: +// - Idempotency: the server dedups on this value, so a retried SendMessage +// (e.g. after a network failure) returns the originally created message +// instead of assigning a new sequence number and creating a duplicate. +// - Correlation: clients use it to match an optimistic local echo to the +// server-assigned Message returned in the response. +// +// Unlike MessageId, this is owned by the client and is not the message's +// canonical identity; it is typically a randomly generated UUID. +message ClientMessageId { + bytes value = 1 [(validate.rules).bytes = { + min_len: 16 + max_len: 16 + }]; +} + +// A message in a chat +message Message { + // Per-chat sequence number identifying this message + MessageId message_id = 1 [(validate.rules).message.required = true]; + + // The chat member that sent the message. For system-level messages, + // this will be ommitted. + common.v1.UserId sender_id = 2; + + // Message content, which is currently guaranteed to have exactly one item. + repeated Content content = 3 [(validate.rules).repeated = { + min_items: 1 + max_items: 1 + }]; + + // Timestamp this message was generated at. + google.protobuf.Timestamp ts = 4 [(validate.rules).timestamp.required = true]; + + // The number of unread-eligible messages in this chat up to and including + // this message. This is a SEPARATE sequence from message_id: messages that + // don't count toward unread keep their message_id but do NOT advance this + // value — they carry the previous count forward, so every message reports + // the running total. A member's unread count is computed entirely on the + // client as the difference between the latest message's unread_seq and the + // unread_seq of the message at their READ pointer. + uint64 unread_seq = 5; +} + +// Content for a chat message +message Content { + oneof type { + option (validate.required) = true; + + TextContent text = 1; + } +} + +// Raw text content +message TextContent { + string text = 1 [(validate.rules).string = { + min_len: 1 + max_len: 4096 + }]; +} + +// Pointer in a chat indicating a user's message history state in a chat. +message Pointer { + // The type of pointer indicates which user's message history state can be + // inferred from the pointer value. It is also possible to infer cross-pointer + // state. For example, if a chat member has a READ pointer for a message with + // ID N, then the DELIVERED pointer must be at least N. + Type type = 1 [(validate.rules).enum = { + not_in: [0] + }]; + enum Type { + UNKNOWN = 0; + SENT = 1; // Always inferred by OK result in SendMessageResponse or message presence in a chat + DELIVERED = 2; + READ = 3; + } + + // The user ID associated with the pointer + common.v1.UserId user_id = 2 [(validate.rules).message.required = true]; + + // Everything at or before this message ID is considered to have the state + // inferred by the type of pointer. + MessageId value = 3 [(validate.rules).message.required = true]; +} + +message MessageIdBatch { + repeated MessageId message_ids = 1 [(validate.rules).repeated = { + min_items: 1 + max_items: 100 + }]; +} + +message MessageBatch { + repeated Message messages = 1 [(validate.rules).repeated = { + min_items: 1 + max_items: 100 + }]; +} + +message PointerBatch { + repeated Pointer pointers = 1 [(validate.rules).repeated = { + min_items: 1 + max_items: 100 + }]; +} + +message IsTypingNotification { + common.v1.UserId user_id = 1 [(validate.rules).message.required = true]; + + State state = 2; + enum State { + UNKNOWN_TYPING_STATE = 0; + STARTED_TYPING = 1; + STILL_TYPING = 2; + STOPPED_TYPING = 3; + TYPING_TIMED_OUT = 4; + } +} + +message IsTypingNotificationBatch { + repeated IsTypingNotification is_typing_notifications = 1 [(validate.rules).repeated = { + min_items: 1 + max_items: 100 // Arbitrary + }]; +} From 3faa906270b86dbd0510d876232ee77732652250 Mon Sep 17 00:00:00 2001 From: Brandon McAnsh Date: Fri, 5 Jun 2026 09:07:22 -0400 Subject: [PATCH 2/4] feat(events): add EventStreaming service layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bidirectional event stream Api, Service, Repository, and Controller plus shared chat domain models and proto ↔ domain extensions. Domain models: ChatId, ChatMessage, MessageContent, MessagePointer, TypingNotification, ChatType, ChatMember, ChatMetadata, MetadataUpdate, ChatUpdate. Uses openBidirectionalStream from OCP bidi infrastructure with ping/pong keepalive. EventStreamingController exposes a SharedFlow of ChatUpdate for downstream consumers. Signed-off-by: Brandon McAnsh --- .../controllers/EventStreamingController.kt | 53 +++++++ .../services/inject/FlipcashModule.kt | 8 + .../internal/network/api/EventStreamingApi.kt | 26 ++++ .../network/extensions/LocalToProtobuf.kt | 7 +- .../network/extensions/ProtobufToLocal.kt | 141 ++++++++++++++++++ .../network/services/EventStreamingService.kt | 137 +++++++++++++++++ .../InternalEventStreamingRepository.kt | 21 +++ .../com/flipcash/services/models/Errors.kt | 10 ++ .../flipcash/services/models/chat/ChatId.kt | 13 ++ .../services/models/chat/ChatMember.kt | 10 ++ .../services/models/chat/ChatMessage.kt | 12 ++ .../services/models/chat/ChatMetadata.kt | 11 ++ .../flipcash/services/models/chat/ChatType.kt | 6 + .../services/models/chat/ChatUpdate.kt | 9 ++ .../services/models/chat/MessageContent.kt | 5 + .../services/models/chat/MessagePointer.kt | 16 ++ .../services/models/chat/MetadataUpdate.kt | 8 + .../models/chat/TypingNotification.kt | 16 ++ .../repository/EventStreamingRepository.kt | 15 ++ .../EventStreamingControllerTest.kt | 89 +++++++++++ .../flipcash/services/models/ErrorsTest.kt | 24 +++ .../services/models/chat/ChatIdTest.kt | 43 ++++++ .../services/models/chat/DomainModelsTest.kt | 99 ++++++++++++ 23 files changed, 778 insertions(+), 1 deletion(-) create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalEventStreamingRepository.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatId.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMember.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMessage.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMetadata.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatType.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatUpdate.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessageContent.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessagePointer.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MetadataUpdate.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/TypingNotification.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/repository/EventStreamingRepository.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/ChatIdTest.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/DomainModelsTest.kt diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt new file mode 100644 index 000000000..6cc5d9eee --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt @@ -0,0 +1,53 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.internal.network.services.EventStreamReference +import com.flipcash.services.models.chat.ChatUpdate +import com.flipcash.services.repository.EventStreamingRepository +import com.flipcash.services.user.UserManager +import com.getcode.utils.trace +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class EventStreamingController @Inject constructor( + private val repository: EventStreamingRepository, + private val userManager: UserManager, +) { + private val _chatUpdates = MutableSharedFlow( + replay = 0, + extraBufferCapacity = 64, + onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST, + ) + val chatUpdates: SharedFlow = _chatUpdates.asSharedFlow() + + private var streamRef: EventStreamReference? = null + + fun open(scope: CoroutineScope) { + val owner = userManager.accountCluster?.authority?.keyPair ?: run { + trace("EventStreamingController: No account cluster, cannot open stream") + return + } + + close() + + streamRef = repository.openEventStream( + scope = scope, + owner = owner, + onEvent = { update -> + _chatUpdates.tryEmit(update) + }, + onError = { error -> + trace("EventStreamingController: Stream error: ${error.message}") + }, + ) + } + + fun close() { + streamRef?.destroy() + streamRef = null + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt index c076fdfa0..8a103436b 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt @@ -11,6 +11,7 @@ import com.flipcash.services.internal.domain.TextModerationResponseMapper import com.flipcash.services.internal.domain.UserProfileMapper import com.flipcash.services.internal.network.services.AccountService import com.flipcash.services.internal.network.services.ActivityFeedService +import com.flipcash.services.internal.network.services.EventStreamingService import com.flipcash.services.internal.network.services.EmailVerificationService import com.flipcash.services.internal.network.services.ContactListService import com.flipcash.services.internal.network.services.ModerationService @@ -23,6 +24,7 @@ import com.flipcash.services.internal.network.services.SettingsService import com.flipcash.services.internal.network.services.ThirdPartyService import com.flipcash.services.internal.repositories.InternalAccountRepository import com.flipcash.services.internal.repositories.InternalActivityFeedRepository +import com.flipcash.services.internal.repositories.InternalEventStreamingRepository import com.flipcash.services.internal.repositories.InternalContactListRepository import com.flipcash.services.internal.repositories.InternalContactVerificationRepository import com.flipcash.services.internal.repositories.InternalModerationRepository @@ -34,6 +36,7 @@ import com.flipcash.services.internal.repositories.InternalSettingsRepository import com.flipcash.services.internal.repositories.InternalThirdPartyRepository import com.flipcash.services.repository.AccountRepository import com.flipcash.services.repository.ActivityFeedRepository +import com.flipcash.services.repository.EventStreamingRepository import com.flipcash.services.repository.ContactListRepository import com.flipcash.services.repository.ContactVerificationRepository import com.flipcash.services.repository.ModerationRepository @@ -114,6 +117,11 @@ internal object FlipcashModule { } } + @Provides + internal fun providesEventStreamingRepository( + service: EventStreamingService, + ): EventStreamingRepository = InternalEventStreamingRepository(service) + @Provides internal fun providesContactListRepository( service: ContactListService, diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt new file mode 100644 index 000000000..0b98db63d --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt @@ -0,0 +1,26 @@ +package com.flipcash.services.internal.network.api + +import com.codeinc.flipcash.gen.events.v1.EventStreamingGrpcKt +import com.codeinc.flipcash.gen.events.v1.EventStreamingService +import com.flipcash.services.internal.annotations.FlipcashManagedChannel +import com.getcode.opencode.internal.network.core.GrpcApi +import io.grpc.ManagedChannel +import kotlinx.coroutines.flow.Flow +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +internal class EventStreamingApi @Inject constructor( + @FlipcashManagedChannel + managedChannel: ManagedChannel, +) : GrpcApi(managedChannel) { + + private val api = EventStreamingGrpcKt.EventStreamingCoroutineStub(managedChannel) + .withWaitForReady() + + fun streamEvents( + requests: Flow, + ): Flow { + return api.streamEvents(requests) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt index a98f79381..156814320 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt @@ -7,6 +7,7 @@ import com.codeinc.flipcash.gen.thirdparty.v1.Model as ThirdPartyModels import com.flipcash.services.models.PagingToken import com.flipcash.services.models.QueryOptions import com.flipcash.services.models.SocialAccountLinkRequest +import com.flipcash.services.models.chat.ChatId import com.getcode.ed25519.Ed25519.KeyPair import com.getcode.network.jwt.ApiProvider import com.getcode.opencode.model.core.ID @@ -14,7 +15,7 @@ import com.getcode.solana.keys.Checksum import com.getcode.solana.keys.PublicKey import com.getcode.utils.toByteString import com.google.protobuf.Timestamp -import kotlinx.datetime.Instant +import kotlin.time.Instant internal fun Checksum.asHash(): Common.Hash { return Common.Hash.newBuilder().setValue(byteArray.toByteString()).build() @@ -73,6 +74,10 @@ internal fun Pair.asApiKey(): ThirdPartyModels.ApiKey { .build() } +internal fun ChatId.asChatId(): Common.ChatId { + return Common.ChatId.newBuilder().setValue(bytes.toByteString()).build() +} + internal fun String.asCountryCode(): Common.CountryCode { return Common.CountryCode.newBuilder().setValue(this).build() } diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/ProtobufToLocal.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/ProtobufToLocal.kt index eb66cffc5..d4be371a1 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/ProtobufToLocal.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/ProtobufToLocal.kt @@ -12,13 +12,28 @@ import com.flipcash.services.internal.extensions.toPublicKey import com.flipcash.services.models.NavigationTrigger import com.flipcash.services.models.NotificationCategory import com.flipcash.services.models.NotificationPayload +import com.flipcash.services.models.PagingToken import com.flipcash.services.models.Substitution +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.ChatType +import com.flipcash.services.models.chat.ChatUpdate +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.MessagePointer +import com.flipcash.services.models.chat.MetadataUpdate +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingNotification +import com.flipcash.services.models.chat.TypingState import com.getcode.opencode.model.core.ID import com.getcode.solana.keys.Checksum import com.getcode.solana.keys.Mint import com.getcode.solana.keys.PublicKey import com.getcode.solana.keys.Signature +import kotlin.time.Instant import com.codeinc.flipcash.gen.activity.v1.Model as ActivityModels +import com.codeinc.flipcash.gen.chat.v1.Model as ChatModel +import com.codeinc.flipcash.gen.events.v1.Model as EventModel +import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel internal fun ActivityModels.NotificationId.toId(): ID = value.toByteArray().toList() internal fun Common.UserId.toId(): ID = value.toByteArray().toList() @@ -71,4 +86,130 @@ internal fun PushModels.Substitution.asSubstitution(): Substitution? { internal fun Common.Signature.toSignature(): Signature { return Signature(value.toByteArray().toList()) +} + +// -- ChatId -- + +internal fun Common.ChatId.toChatId(): ChatId = ChatId(value.toByteArray()) + +// -- PagingToken (proto → domain) -- + +internal fun Common.PagingToken.toPagingToken(): PagingToken = value.toByteArray().toList() + +// -- Messaging models -- + +internal fun MessagingModel.Content.toMessageContent(): MessageContent { + return when (typeCase) { + MessagingModel.Content.TypeCase.TEXT -> MessageContent.Text(text.text) + else -> MessageContent.Text("") + } +} + +internal fun MessagingModel.Message.toChatMessage(): ChatMessage { + return ChatMessage( + messageId = messageId.value, + senderId = if (hasSenderId()) senderId.toId() else null, + content = contentList.map { it.toMessageContent() }, + timestamp = Instant.fromEpochSeconds(ts.seconds, ts.nanos), + unreadSeq = unreadSeq, + ) +} + +internal fun MessagingModel.Pointer.toPointer(): MessagePointer { + return MessagePointer( + type = type.toPointerType(), + userId = userId.toId(), + value = value.value, + ) +} + +internal fun MessagingModel.Pointer.Type.toPointerType(): PointerType { + return when (this) { + MessagingModel.Pointer.Type.SENT -> PointerType.SENT + MessagingModel.Pointer.Type.DELIVERED -> PointerType.DELIVERED + MessagingModel.Pointer.Type.READ -> PointerType.READ + else -> PointerType.UNKNOWN + } +} + +internal fun MessagingModel.IsTypingNotification.toTypingNotification(): TypingNotification { + return TypingNotification( + userId = userId.toId(), + state = state.toTypingState(), + ) +} + +internal fun MessagingModel.IsTypingNotification.State.toTypingState(): TypingState { + return when (this) { + MessagingModel.IsTypingNotification.State.STARTED_TYPING -> TypingState.STARTED_TYPING + MessagingModel.IsTypingNotification.State.STILL_TYPING -> TypingState.STILL_TYPING + MessagingModel.IsTypingNotification.State.STOPPED_TYPING -> TypingState.STOPPED_TYPING + MessagingModel.IsTypingNotification.State.TYPING_TIMED_OUT -> TypingState.TYPING_TIMED_OUT + else -> TypingState.UNKNOWN + } +} + +// -- Chat metadata updates -- + +internal fun ChatModel.MetadataUpdate.toMetadataUpdate( + metadataMapper: (ChatModel.Metadata) -> com.flipcash.services.models.chat.ChatMetadata, +): MetadataUpdate { + return when (kindCase) { + ChatModel.MetadataUpdate.KindCase.FULL_REFRESH -> + MetadataUpdate.FullRefresh(metadataMapper(fullRefresh.metadata)) + ChatModel.MetadataUpdate.KindCase.LAST_ACTIVITY_CHANGED -> + MetadataUpdate.LastActivityChanged( + Instant.fromEpochSeconds( + lastActivityChanged.newLastActivity.seconds, + lastActivityChanged.newLastActivity.nanos, + ) + ) + else -> MetadataUpdate.LastActivityChanged(Instant.fromEpochSeconds(0)) + } +} + +// -- Chat type -- + +internal fun ChatModel.Metadata.ChatType.toChatType(): ChatType { + return when (this) { + ChatModel.Metadata.ChatType.DM -> ChatType.DM + else -> ChatType.UNKNOWN + } +} + +// -- Chat metadata (simple, no injected mapper) -- + +internal fun ChatModel.Metadata.toChatMetadata(): com.flipcash.services.models.chat.ChatMetadata { + return com.flipcash.services.models.chat.ChatMetadata( + chatId = chatId.toChatId(), + type = type.toChatType(), + members = membersList.map { member -> + com.flipcash.services.models.chat.ChatMember( + userId = member.userId.toId(), + userProfile = com.flipcash.services.models.UserProfile( + displayName = member.userProfile.displayName, + socialAccounts = emptyList(), + verifiedPhoneNumber = null, + verifiedEmailAddress = null, + ), + pointers = member.pointersList.map { it.toPointer() }, + ) + }, + lastMessage = if (hasLastMessage()) lastMessage.toChatMessage() else null, + lastActivity = Instant.fromEpochSeconds(lastActivity.seconds, lastActivity.nanos), + ) +} + +// -- EventModel.ChatUpdate -- + +internal fun EventModel.ChatUpdate.toChatUpdate( + metadataMapper: (ChatModel.Metadata) -> com.flipcash.services.models.chat.ChatMetadata = { it.toChatMetadata() }, +): ChatUpdate { + return ChatUpdate( + chatId = chat.toChatId(), + newMessages = if (hasNewMessages()) newMessages.messagesList.map { it.toChatMessage() } else emptyList(), + pointerUpdates = if (hasPointerUpdates()) pointerUpdates.pointersList.map { it.toPointer() } else emptyList(), + typingNotifications = if (hasIsTypingNotifications()) isTypingNotifications.isTypingNotificationsList.map { it.toTypingNotification() } else emptyList(), + metadataUpdates = metadataUpdatesList.map { it.toMetadataUpdate(metadataMapper) }, + ) } \ No newline at end of file diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt new file mode 100644 index 000000000..2de483795 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt @@ -0,0 +1,137 @@ +package com.flipcash.services.internal.network.services + +import com.codeinc.flipcash.gen.events.v1.EventStreamingService as RpcEventStreamingService +import com.codeinc.flipcash.gen.events.v1.Model as EventModel +import com.flipcash.services.internal.network.api.EventStreamingApi +import com.flipcash.services.internal.network.extensions.authenticate +import com.flipcash.services.internal.network.extensions.toChatUpdate +import com.flipcash.services.models.StreamEventsError +import com.flipcash.services.models.chat.ChatUpdate +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.opencode.internal.bidi.BidirectionalStreamReference +import com.getcode.opencode.internal.bidi.openBidirectionalStream +import com.getcode.utils.TraceType +import com.getcode.utils.trace +import com.google.protobuf.Timestamp +import kotlin.time.Clock +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import javax.inject.Inject + +typealias EventStreamReference = BidirectionalStreamReference + +internal class EventStreamingService @Inject constructor( + private val api: EventStreamingApi, +) { + fun openEventStream( + scope: CoroutineScope, + owner: KeyPair, + onEvent: (ChatUpdate) -> Unit, + onError: (Throwable) -> Unit = {}, + ): EventStreamReference { + trace("EventStream Opening stream.") + val streamReference = EventStreamReference(scope, "event-streaming") + + streamReference.retain() + streamReference.timeoutHandler = { + trace("EventStream timed out") + streamReference.coroutineScope.launch { + openStream(scope, owner, streamReference, onEvent, onError) + } + } + + streamReference.coroutineScope.launch { + openStream(scope, owner, streamReference, onEvent, onError) + } + + return streamReference + } + + private fun openStream( + scope: CoroutineScope, + owner: KeyPair, + streamRef: EventStreamReference, + onEvent: (ChatUpdate) -> Unit, + onError: (Throwable) -> Unit, + ) { + openBidirectionalStream( + streamRef = streamRef, + apiCall = api::streamEvents, + initialRequest = { + val params = RpcEventStreamingService.StreamEventsRequest.Params.newBuilder() + .setTs(currentTimestamp()) + .apply { setAuth(authenticate(owner)) } + .build() + + RpcEventStreamingService.StreamEventsRequest.newBuilder() + .setParams(params) + .build() + }, + reconnectOnUnavailable = true, + reconnectOnCancelled = true, + onError = { onError(it) }, + responseHandler = { response, sendRequest -> + when (response.typeCase) { + RpcEventStreamingService.StreamEventsResponse.TypeCase.PING -> { + val pong = RpcEventStreamingService.StreamEventsRequest.newBuilder() + .setPong( + EventModel.ClientPong.newBuilder() + .setTimestamp(currentTimestamp()) + ) + .build() + + streamRef.receivedPing(updatedTimeout = response.ping.pingDelay.seconds * 1_000L) + sendRequest(pong) + trace(message = "EventStream Pong. Server timestamp: ${response.ping.timestamp}") + } + + RpcEventStreamingService.StreamEventsResponse.TypeCase.EVENTS -> { + response.events.eventsList.forEach { event -> + when (event.typeCase) { + EventModel.Event.TypeCase.CHAT_UPDATE -> { + onEvent(event.chatUpdate.toChatUpdate()) + } + else -> { + trace( + message = "EventStream received unhandled event type: ${event.typeCase}", + type = TraceType.Log, + ) + } + } + } + } + + RpcEventStreamingService.StreamEventsResponse.TypeCase.ERROR -> { + val error = when (response.error.code) { + RpcEventStreamingService.StreamEventsResponse.StreamError.Code.DENIED -> + StreamEventsError.Denied() + RpcEventStreamingService.StreamEventsResponse.StreamError.Code.INVALID_TIMESTAMP -> + StreamEventsError.InvalidTimestamp() + else -> StreamEventsError.Unrecognized() + } + onError(error) + trace( + message = "EventStream error: ${response.error.code}", + type = TraceType.Error, + ) + } + + else -> { + trace( + message = "EventStream received empty message.", + type = TraceType.Error, + ) + } + } + } + ) + } + + private fun currentTimestamp(): Timestamp { + val now = Clock.System.now() + return Timestamp.newBuilder() + .setSeconds(now.epochSeconds) + .setNanos(now.nanosecondsOfSecond) + .build() + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalEventStreamingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalEventStreamingRepository.kt new file mode 100644 index 000000000..49bf9320b --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalEventStreamingRepository.kt @@ -0,0 +1,21 @@ +package com.flipcash.services.internal.repositories + +import com.flipcash.services.internal.network.services.EventStreamReference +import com.flipcash.services.internal.network.services.EventStreamingService +import com.flipcash.services.models.chat.ChatUpdate +import com.flipcash.services.repository.EventStreamingRepository +import com.getcode.ed25519.Ed25519.KeyPair +import kotlinx.coroutines.CoroutineScope + +internal class InternalEventStreamingRepository( + private val service: EventStreamingService, +) : EventStreamingRepository { + override fun openEventStream( + scope: CoroutineScope, + owner: KeyPair, + onEvent: (ChatUpdate) -> Unit, + onError: (Throwable) -> Unit, + ): EventStreamReference { + return service.openEventStream(scope, owner, onEvent, onError) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt index 0daa6d9da..2aa6c32d9 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt @@ -303,6 +303,16 @@ sealed class GetContactsError( data class Other(override val cause: Throwable? = null) : GetContactsError(message = cause?.message, cause = cause), NotifiableError } +sealed class StreamEventsError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : StreamEventsError("Denied") + class InvalidTimestamp : StreamEventsError("Invalid timestamp") + class Unrecognized : StreamEventsError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : StreamEventsError(message = cause?.message, cause = cause), NotifiableError +} + sealed class ResolveContactError( override val message: String? = null, override val cause: Throwable? = null diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatId.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatId.kt new file mode 100644 index 000000000..897a22b48 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatId.kt @@ -0,0 +1,13 @@ +package com.flipcash.services.models.chat + +data class ChatId(val bytes: ByteArray) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is ChatId) return false + return bytes.contentEquals(other.bytes) + } + + override fun hashCode(): Int = bytes.contentHashCode() + + override fun toString(): String = "ChatId(${bytes.size} bytes)" +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMember.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMember.kt new file mode 100644 index 000000000..267b06353 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMember.kt @@ -0,0 +1,10 @@ +package com.flipcash.services.models.chat + +import com.flipcash.services.models.UserProfile +import com.getcode.opencode.model.core.ID + +data class ChatMember( + val userId: ID, + val userProfile: UserProfile, + val pointers: List, +) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMessage.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMessage.kt new file mode 100644 index 000000000..3acea6418 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMessage.kt @@ -0,0 +1,12 @@ +package com.flipcash.services.models.chat + +import com.getcode.opencode.model.core.ID +import kotlin.time.Instant + +data class ChatMessage( + val messageId: Long, + val senderId: ID?, + val content: List, + val timestamp: Instant, + val unreadSeq: Long, +) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMetadata.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMetadata.kt new file mode 100644 index 000000000..fc93f3707 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatMetadata.kt @@ -0,0 +1,11 @@ +package com.flipcash.services.models.chat + +import kotlin.time.Instant + +data class ChatMetadata( + val chatId: ChatId, + val type: ChatType, + val members: List, + val lastMessage: ChatMessage?, + val lastActivity: Instant, +) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatType.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatType.kt new file mode 100644 index 000000000..ba707e956 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatType.kt @@ -0,0 +1,6 @@ +package com.flipcash.services.models.chat + +enum class ChatType { + UNKNOWN, + DM, +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatUpdate.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatUpdate.kt new file mode 100644 index 000000000..6bb655764 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatUpdate.kt @@ -0,0 +1,9 @@ +package com.flipcash.services.models.chat + +data class ChatUpdate( + val chatId: ChatId, + val newMessages: List, + val pointerUpdates: List, + val typingNotifications: List, + val metadataUpdates: List, +) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessageContent.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessageContent.kt new file mode 100644 index 000000000..1b481b452 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessageContent.kt @@ -0,0 +1,5 @@ +package com.flipcash.services.models.chat + +sealed interface MessageContent { + data class Text(val text: String) : MessageContent +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessagePointer.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessagePointer.kt new file mode 100644 index 000000000..b5e583a0b --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MessagePointer.kt @@ -0,0 +1,16 @@ +package com.flipcash.services.models.chat + +import com.getcode.opencode.model.core.ID + +data class MessagePointer( + val type: PointerType, + val userId: ID, + val value: Long, +) + +enum class PointerType { + UNKNOWN, + SENT, + DELIVERED, + READ, +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MetadataUpdate.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MetadataUpdate.kt new file mode 100644 index 000000000..5797b232f --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/MetadataUpdate.kt @@ -0,0 +1,8 @@ +package com.flipcash.services.models.chat + +import kotlin.time.Instant + +sealed interface MetadataUpdate { + data class FullRefresh(val metadata: ChatMetadata) : MetadataUpdate + data class LastActivityChanged(val newLastActivity: Instant) : MetadataUpdate +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/TypingNotification.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/TypingNotification.kt new file mode 100644 index 000000000..ad5a6af5e --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/TypingNotification.kt @@ -0,0 +1,16 @@ +package com.flipcash.services.models.chat + +import com.getcode.opencode.model.core.ID + +data class TypingNotification( + val userId: ID, + val state: TypingState, +) + +enum class TypingState { + UNKNOWN, + STARTED_TYPING, + STILL_TYPING, + STOPPED_TYPING, + TYPING_TIMED_OUT, +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/repository/EventStreamingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/EventStreamingRepository.kt new file mode 100644 index 000000000..3bff3beca --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/EventStreamingRepository.kt @@ -0,0 +1,15 @@ +package com.flipcash.services.repository + +import com.flipcash.services.internal.network.services.EventStreamReference +import com.flipcash.services.models.chat.ChatUpdate +import com.getcode.ed25519.Ed25519.KeyPair +import kotlinx.coroutines.CoroutineScope + +interface EventStreamingRepository { + fun openEventStream( + scope: CoroutineScope, + owner: KeyPair, + onEvent: (ChatUpdate) -> Unit, + onError: (Throwable) -> Unit = {}, + ): EventStreamReference +} diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt new file mode 100644 index 000000000..c844c836b --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt @@ -0,0 +1,89 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.internal.network.services.EventStreamReference +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatUpdate +import com.flipcash.services.repository.EventStreamingRepository +import com.flipcash.services.user.UserManager +import com.getcode.ed25519.Ed25519 +import com.getcode.opencode.model.accounts.AccountCluster +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertNotNull + +@OptIn(ExperimentalCoroutinesApi::class) +class EventStreamingControllerTest { + + private val repository = FakeEventStreamingRepository() + private val userManager = mockk(relaxed = true) + private val controller = EventStreamingController(repository, userManager) + + private fun stubOwner() { + val keyPair = mockk(relaxed = true) + val cluster = mockk(relaxed = true) { + every { authority } returns mockk { every { this@mockk.keyPair } returns keyPair } + } + every { userManager.accountCluster } returns cluster + } + + @Test + fun `open does nothing when no account cluster`() = runTest { + every { userManager.accountCluster } returns null + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + controller.open(scope) + + // No stream opened + assert(!repository.opened) + } + + @Test + fun `open creates stream when account cluster exists`() = runTest { + stubOwner() + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + controller.open(scope) + + assert(repository.opened) + } + + @Test + fun `close destroys the stream reference`() = runTest { + stubOwner() + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + controller.open(scope) + + controller.close() + + assertNotNull(repository.lastStreamRef) + verify { repository.lastStreamRef!!.destroy() } + } + + @Test + fun `chatUpdates SharedFlow is accessible`() { + assertNotNull(controller.chatUpdates) + } +} + +private class FakeEventStreamingRepository : EventStreamingRepository { + var opened = false + var lastStreamRef: EventStreamReference? = null + + override fun openEventStream( + scope: CoroutineScope, + owner: Ed25519.KeyPair, + onEvent: (ChatUpdate) -> Unit, + onError: (Throwable) -> Unit, + ): EventStreamReference { + opened = true + val ref = mockk(relaxed = true) + lastStreamRef = ref + return ref + } +} diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt index d9643f709..eb6eb0313 100644 --- a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt @@ -127,6 +127,30 @@ class ErrorsTest { assertIs(PlacePoolBetError.MaxBetsReceived()) } + // -- StreamEventsError -- + + @Test + fun `StreamEventsError subtypes are CodeServerError`() { + assertIs(StreamEventsError.Denied()) + assertIs(StreamEventsError.InvalidTimestamp()) + assertIs(StreamEventsError.Unrecognized()) + assertIs(StreamEventsError.Other()) + } + + @Test + fun `StreamEventsError has expected messages`() { + assertEquals("Denied", StreamEventsError.Denied().message) + assertEquals("Invalid timestamp", StreamEventsError.InvalidTimestamp().message) + } + + @Test + fun `StreamEventsError Other preserves cause`() { + val root = RuntimeException("stream broke") + val error = StreamEventsError.Other(root) + assertSame(root, error.cause) + assertEquals("stream broke", error.message) + } + // -- GetJwtError -- @Test diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/ChatIdTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/ChatIdTest.kt new file mode 100644 index 000000000..0b754b6fc --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/ChatIdTest.kt @@ -0,0 +1,43 @@ +package com.flipcash.services.models.chat + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals +import kotlin.test.assertTrue + +class ChatIdTest { + + @Test + fun `equals returns true for same byte content`() { + val a = ChatId(byteArrayOf(1, 2, 3)) + val b = ChatId(byteArrayOf(1, 2, 3)) + assertEquals(a, b) + } + + @Test + fun `equals returns false for different byte content`() { + val a = ChatId(byteArrayOf(1, 2, 3)) + val b = ChatId(byteArrayOf(4, 5, 6)) + assertNotEquals(a, b) + } + + @Test + fun `hashCode is consistent for equal instances`() { + val a = ChatId(byteArrayOf(10, 20, 30)) + val b = ChatId(byteArrayOf(10, 20, 30)) + assertEquals(a.hashCode(), b.hashCode()) + } + + @Test + fun `toString includes byte size`() { + val id = ChatId(ByteArray(32)) + assertTrue(id.toString().contains("32 bytes")) + } + + @Test + fun `equals returns false for non-ChatId`() { + val id = ChatId(byteArrayOf(1, 2, 3)) + @Suppress("AssertBetweenInconvertibleTypes") + assertNotEquals(id, "not a ChatId") + } +} diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/DomainModelsTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/DomainModelsTest.kt new file mode 100644 index 000000000..3e80d6471 --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/models/chat/DomainModelsTest.kt @@ -0,0 +1,99 @@ +package com.flipcash.services.models.chat + +import com.flipcash.services.models.UserProfile +import kotlin.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNull + +class DomainModelsTest { + + @Test + fun `ChatType has expected values`() { + assertEquals(2, ChatType.entries.size) + assertIs(ChatType.UNKNOWN) + assertIs(ChatType.DM) + } + + @Test + fun `PointerType has expected values`() { + assertEquals(4, PointerType.entries.size) + assertIs(PointerType.UNKNOWN) + assertIs(PointerType.SENT) + assertIs(PointerType.DELIVERED) + assertIs(PointerType.READ) + } + + @Test + fun `TypingState has expected values`() { + assertEquals(5, TypingState.entries.size) + } + + @Test + fun `MessageContent Text holds text`() { + val content = MessageContent.Text("hello") + assertEquals("hello", content.text) + } + + @Test + fun `ChatMessage can have null senderId`() { + val msg = ChatMessage( + messageId = 1, + senderId = null, + content = listOf(MessageContent.Text("system")), + timestamp = Instant.fromEpochSeconds(1000), + unreadSeq = 0, + ) + assertNull(msg.senderId) + } + + @Test + fun `ChatUpdate aggregates all update types`() { + val chatId = ChatId(ByteArray(32)) + val update = ChatUpdate( + chatId = chatId, + newMessages = listOf( + ChatMessage(1, null, listOf(MessageContent.Text("hi")), Instant.fromEpochSeconds(0), 1) + ), + pointerUpdates = listOf( + MessagePointer(PointerType.READ, listOf(1.toByte()), 5) + ), + typingNotifications = listOf( + TypingNotification(listOf(1.toByte()), TypingState.STARTED_TYPING) + ), + metadataUpdates = listOf( + MetadataUpdate.LastActivityChanged(Instant.fromEpochSeconds(100)) + ), + ) + + assertEquals(1, update.newMessages.size) + assertEquals(1, update.pointerUpdates.size) + assertEquals(1, update.typingNotifications.size) + assertEquals(1, update.metadataUpdates.size) + } + + @Test + fun `MetadataUpdate FullRefresh holds metadata`() { + val metadata = ChatMetadata( + chatId = ChatId(ByteArray(32)), + type = ChatType.DM, + members = emptyList(), + lastMessage = null, + lastActivity = Instant.fromEpochSeconds(500), + ) + val update = MetadataUpdate.FullRefresh(metadata) + assertEquals(metadata, update.metadata) + } + + @Test + fun `ChatMember holds profile and pointers`() { + val member = ChatMember( + userId = listOf(1.toByte()), + userProfile = UserProfile("Test", emptyList(), null, null), + pointers = listOf(MessagePointer(PointerType.READ, listOf(1.toByte()), 10)), + ) + assertEquals("Test", member.userProfile.displayName) + assertEquals(1, member.pointers.size) + } +} From 38f2f593762d34e6640e294b51e36b29b82b4198 Mon Sep 17 00:00:00 2001 From: Brandon McAnsh Date: Fri, 5 Jun 2026 09:15:39 -0400 Subject: [PATCH 3/4] feat(chat): add Chat service layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unary RPCs for GetChat and GetDmChatFeed — Api, Service, ChatMetadataMapper, Repository, and Controller. Introduces ChatFeedPage domain model, GetChatError and GetDmChatFeedError sealed classes, and Hilt wiring. Signed-off-by: Brandon McAnsh --- .../services/controllers/ChatController.kt | 30 +++ .../controllers/ChatMessagingController.kt | 63 +++++ .../services/inject/FlipcashModule.kt | 18 ++ .../internal/domain/ChatMetadataMapper.kt | 33 +++ .../services/internal/network/api/ChatApi.kt | 61 +++++ .../internal/network/api/ChatMessagingApi.kt | 153 +++++++++++ .../network/extensions/LocalToProtobuf.kt | 38 +++ .../network/services/ChatMessagingService.kt | 164 ++++++++++++ .../internal/network/services/ChatService.kt | 61 +++++ .../InternalChatMessagingRepository.kt | 67 +++++ .../repositories/InternalChatRepository.kt | 37 +++ .../com/flipcash/services/models/Errors.kt | 68 +++++ .../services/models/chat/ChatFeedPage.kt | 9 + .../services/models/chat/ClientMessageId.kt | 13 + .../repository/ChatMessagingRepository.kt | 50 ++++ .../services/repository/ChatRepository.kt | 19 ++ .../controllers/ChatControllerTest.kt | 187 ++++++++++++++ .../ChatMessagingControllerTest.kt | 238 ++++++++++++++++++ .../internal/domain/ChatMetadataMapperTest.kt | 114 +++++++++ .../flipcash/services/models/ErrorsTest.kt | 81 ++++++ 20 files changed, 1504 insertions(+) create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatController.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapper.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatApi.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatService.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatRepository.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatFeedPage.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ClientMessageId.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt create mode 100644 services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatRepository.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatControllerTest.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt create mode 100644 services/flipcash/src/test/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapperTest.kt diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatController.kt new file mode 100644 index 000000000..dfbc1126f --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatController.kt @@ -0,0 +1,30 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatFeedPage +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMetadata +import com.flipcash.services.repository.ChatRepository +import com.flipcash.services.user.UserManager +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class ChatController @Inject constructor( + private val repository: ChatRepository, + private val userManager: UserManager, +) { + suspend fun getChat(chatId: ChatId): Result { + val owner = userManager.accountCluster?.authority?.keyPair + ?: return Result.failure(Throwable("No account cluster in UserManager")) + + return repository.getChat(owner, chatId) + } + + suspend fun getDmChatFeed(queryOptions: QueryOptions = QueryOptions()): Result { + val owner = userManager.accountCluster?.authority?.keyPair + ?: return Result.failure(Throwable("No account cluster in UserManager")) + + return repository.getDmChatFeed(owner, queryOptions) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt new file mode 100644 index 000000000..b98d7bb9f --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt @@ -0,0 +1,63 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.user.UserManager +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class MessagingController @Inject constructor( + private val repository: MessagingRepository, + private val userManager: UserManager, +) { + private fun requireOwner() = userManager.accountCluster?.authority?.keyPair + ?: throw IllegalStateException("No account cluster in UserManager") + + suspend fun getMessage(chatId: ChatId, messageId: Long): Result { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.getMessage(owner, chatId, messageId) + } + + suspend fun getMessages(chatId: ChatId, queryOptions: QueryOptions = QueryOptions()): Result> { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.getMessages(owner, chatId, queryOptions) + } + + suspend fun getMessagesByIds(chatId: ChatId, messageIds: List): Result> { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.getMessagesByIds(owner, chatId, messageIds) + } + + suspend fun sendMessage( + chatId: ChatId, + content: List, + clientMessageId: ClientMessageId, + ): Result { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.sendMessage(owner, chatId, content, clientMessageId) + } + + suspend fun advancePointer( + chatId: ChatId, + pointerType: PointerType, + messageId: Long, + ): Result { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.advancePointer(owner, chatId, pointerType, messageId) + } + + suspend fun notifyIsTyping( + chatId: ChatId, + state: TypingState, + ): Result { + val owner = runCatching { requireOwner() }.getOrElse { return Result.failure(it) } + return repository.notifyIsTyping(owner, chatId, state) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt index 8a103436b..81817618b 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt @@ -9,9 +9,12 @@ import com.flipcash.services.internal.domain.UserFlagsMapper import com.flipcash.services.internal.domain.SocialAccountMapper import com.flipcash.services.internal.domain.TextModerationResponseMapper import com.flipcash.services.internal.domain.UserProfileMapper +import com.flipcash.services.internal.domain.ChatMetadataMapper import com.flipcash.services.internal.network.services.AccountService import com.flipcash.services.internal.network.services.ActivityFeedService +import com.flipcash.services.internal.network.services.ChatService import com.flipcash.services.internal.network.services.EventStreamingService +import com.flipcash.services.internal.network.services.MessagingService import com.flipcash.services.internal.network.services.EmailVerificationService import com.flipcash.services.internal.network.services.ContactListService import com.flipcash.services.internal.network.services.ModerationService @@ -24,7 +27,9 @@ import com.flipcash.services.internal.network.services.SettingsService import com.flipcash.services.internal.network.services.ThirdPartyService import com.flipcash.services.internal.repositories.InternalAccountRepository import com.flipcash.services.internal.repositories.InternalActivityFeedRepository +import com.flipcash.services.internal.repositories.InternalChatRepository import com.flipcash.services.internal.repositories.InternalEventStreamingRepository +import com.flipcash.services.internal.repositories.InternalMessagingRepository import com.flipcash.services.internal.repositories.InternalContactListRepository import com.flipcash.services.internal.repositories.InternalContactVerificationRepository import com.flipcash.services.internal.repositories.InternalModerationRepository @@ -36,7 +41,9 @@ import com.flipcash.services.internal.repositories.InternalSettingsRepository import com.flipcash.services.internal.repositories.InternalThirdPartyRepository import com.flipcash.services.repository.AccountRepository import com.flipcash.services.repository.ActivityFeedRepository +import com.flipcash.services.repository.ChatRepository import com.flipcash.services.repository.EventStreamingRepository +import com.flipcash.services.repository.MessagingRepository import com.flipcash.services.repository.ContactListRepository import com.flipcash.services.repository.ContactVerificationRepository import com.flipcash.services.repository.ModerationRepository @@ -117,11 +124,22 @@ internal object FlipcashModule { } } + @Provides + internal fun providesChatRepository( + service: ChatService, + mapper: ChatMetadataMapper, + ): ChatRepository = InternalChatRepository(service, mapper) + @Provides internal fun providesEventStreamingRepository( service: EventStreamingService, ): EventStreamingRepository = InternalEventStreamingRepository(service) + @Provides + internal fun providesFlipcashMessagingRepository( + service: MessagingService, + ): MessagingRepository = InternalMessagingRepository(service) + @Provides internal fun providesContactListRepository( service: ContactListService, diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapper.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapper.kt new file mode 100644 index 000000000..b4b6bd7da --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapper.kt @@ -0,0 +1,33 @@ +package com.flipcash.services.internal.domain + +import com.codeinc.flipcash.gen.chat.v1.Model as ChatModel +import com.flipcash.services.internal.domain.mapper.Mapper +import com.flipcash.services.internal.network.extensions.toChatId +import com.flipcash.services.internal.network.extensions.toChatMessage +import com.flipcash.services.internal.network.extensions.toChatType +import com.flipcash.services.internal.network.extensions.toId +import com.flipcash.services.internal.network.extensions.toPointer +import com.flipcash.services.models.chat.ChatMember +import com.flipcash.services.models.chat.ChatMetadata +import kotlin.time.Instant +import javax.inject.Inject + +class ChatMetadataMapper @Inject constructor( + private val userProfileMapper: UserProfileMapper, +) : Mapper { + override fun map(from: ChatModel.Metadata): ChatMetadata { + return ChatMetadata( + chatId = from.chatId.toChatId(), + type = from.type.toChatType(), + members = from.membersList.map { member -> + ChatMember( + userId = member.userId.toId(), + userProfile = userProfileMapper.map(member.userProfile), + pointers = member.pointersList.map { it.toPointer() }, + ) + }, + lastMessage = if (from.hasLastMessage()) from.lastMessage.toChatMessage() else null, + lastActivity = Instant.fromEpochSeconds(from.lastActivity.seconds, from.lastActivity.nanos), + ) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatApi.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatApi.kt new file mode 100644 index 000000000..29300c2ee --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatApi.kt @@ -0,0 +1,61 @@ +package com.flipcash.services.internal.network.api + +import com.codeinc.flipcash.gen.chat.v1.ChatGrpcKt +import com.codeinc.flipcash.gen.chat.v1.ChatService as RpcChatService +import com.codeinc.flipcash.gen.chat.v1.validate +import com.flipcash.services.internal.annotations.FlipcashManagedChannel +import com.flipcash.services.internal.network.extensions.asChatId +import com.flipcash.services.internal.network.extensions.asQueryOptions +import com.flipcash.services.internal.network.extensions.authenticate +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.opencode.internal.network.core.GrpcApi +import dev.bmcreations.protovalidate.orThrow +import io.grpc.ManagedChannel +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +internal class ChatApi @Inject constructor( + @FlipcashManagedChannel + managedChannel: ManagedChannel, +) : GrpcApi(managedChannel) { + + private val api = ChatGrpcKt.ChatCoroutineStub(managedChannel) + .withWaitForReady() + + suspend fun getChat( + owner: KeyPair, + chatId: ChatId, + ): RpcChatService.GetChatResponse { + val request = RpcChatService.GetChatRequest.newBuilder() + .setChatId(chatId.asChatId()) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.getChat(request) + } + } + + suspend fun getDmChatFeed( + owner: KeyPair, + queryOptions: QueryOptions, + ): RpcChatService.GetDmChatFeedResponse { + val request = RpcChatService.GetDmChatFeedRequest.newBuilder() + .setQueryOptions(queryOptions.asQueryOptions()) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.getDmChatFeed(request) + } + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt new file mode 100644 index 000000000..ab9f0fe71 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt @@ -0,0 +1,153 @@ +package com.flipcash.services.internal.network.api + +import com.codeinc.flipcash.gen.messaging.v1.MessagingGrpcKt +import com.codeinc.flipcash.gen.messaging.v1.MessagingService as RpcMessagingService +import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel +import com.codeinc.flipcash.gen.messaging.v1.validate +import com.flipcash.services.internal.annotations.FlipcashManagedChannel +import com.flipcash.services.internal.network.extensions.asChatId +import com.flipcash.services.internal.network.extensions.asClientMessageId +import com.flipcash.services.internal.network.extensions.asContent +import com.flipcash.services.internal.network.extensions.asPointerType +import com.flipcash.services.internal.network.extensions.asQueryOptions +import com.flipcash.services.internal.network.extensions.asTypingState +import com.flipcash.services.internal.network.extensions.authenticate +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.opencode.internal.network.core.GrpcApi +import dev.bmcreations.protovalidate.orThrow +import io.grpc.ManagedChannel +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +internal class MessagingApi @Inject constructor( + @FlipcashManagedChannel + managedChannel: ManagedChannel, +) : GrpcApi(managedChannel) { + + private val api = MessagingGrpcKt.MessagingCoroutineStub(managedChannel) + .withWaitForReady() + + suspend fun getMessage( + owner: KeyPair, + chatId: ChatId, + messageId: Long, + ): RpcMessagingService.GetMessageResponse { + val request = RpcMessagingService.GetMessageRequest.newBuilder() + .setChatId(chatId.asChatId()) + .setMessageId(MessagingModel.MessageId.newBuilder().setValue(messageId)) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.getMessage(request) + } + } + + suspend fun getMessages( + owner: KeyPair, + chatId: ChatId, + queryOptions: QueryOptions, + ): RpcMessagingService.GetMessagesResponse { + val request = RpcMessagingService.GetMessagesRequest.newBuilder() + .setChatId(chatId.asChatId()) + .setOptions(queryOptions.asQueryOptions()) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.getMessages(request) + } + } + + suspend fun getMessagesByIds( + owner: KeyPair, + chatId: ChatId, + messageIds: List, + ): RpcMessagingService.GetMessagesResponse { + val request = RpcMessagingService.GetMessagesRequest.newBuilder() + .setChatId(chatId.asChatId()) + .setMessageIds( + MessagingModel.MessageIdBatch.newBuilder() + .addAllMessageIds(messageIds.map { MessagingModel.MessageId.newBuilder().setValue(it).build() }) + ) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.getMessages(request) + } + } + + suspend fun sendMessage( + owner: KeyPair, + chatId: ChatId, + content: List, + clientMessageId: ClientMessageId, + ): RpcMessagingService.SendMessageResponse { + val request = RpcMessagingService.SendMessageRequest.newBuilder() + .setChatId(chatId.asChatId()) + .addAllContent(content.map { it.asContent() }) + .setClientMessageId(clientMessageId.asClientMessageId()) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.sendMessage(request) + } + } + + suspend fun advancePointer( + owner: KeyPair, + chatId: ChatId, + pointerType: PointerType, + messageId: Long, + ): RpcMessagingService.AdvancePointerResponse { + val request = RpcMessagingService.AdvancePointerRequest.newBuilder() + .setChatId(chatId.asChatId()) + .setPointerType(pointerType.asPointerType()) + .setNewValue(MessagingModel.MessageId.newBuilder().setValue(messageId)) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.advancePointer(request) + } + } + + suspend fun notifyIsTyping( + owner: KeyPair, + chatId: ChatId, + state: TypingState, + ): RpcMessagingService.NotifyIsTypingResponse { + val request = RpcMessagingService.NotifyIsTypingRequest.newBuilder() + .setChatId(chatId.asChatId()) + .setState(state.asTypingState()) + .apply { setAuth(authenticate(owner)) } + .build() + + request.validate().orThrow() + + return withContext(Dispatchers.IO) { + api.notifyIsTyping(request) + } + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt index 156814320..e6af30b9f 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/extensions/LocalToProtobuf.kt @@ -8,6 +8,10 @@ import com.flipcash.services.models.PagingToken import com.flipcash.services.models.QueryOptions import com.flipcash.services.models.SocialAccountLinkRequest import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState import com.getcode.ed25519.Ed25519.KeyPair import com.getcode.network.jwt.ApiProvider import com.getcode.opencode.model.core.ID @@ -16,6 +20,7 @@ import com.getcode.solana.keys.PublicKey import com.getcode.utils.toByteString import com.google.protobuf.Timestamp import kotlin.time.Instant +import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel internal fun Checksum.asHash(): Common.Hash { return Common.Hash.newBuilder().setValue(byteArray.toByteString()).build() @@ -92,4 +97,37 @@ internal fun SocialAccountLinkRequest.linkingToken(): ProfileService.LinkSocialA } return builder.build() +} + +// -- Messaging extensions -- + +internal fun ClientMessageId.asClientMessageId(): MessagingModel.ClientMessageId { + return MessagingModel.ClientMessageId.newBuilder().setValue(bytes.toByteString()).build() +} + +internal fun MessageContent.asContent(): MessagingModel.Content { + return when (this) { + is MessageContent.Text -> MessagingModel.Content.newBuilder() + .setText(MessagingModel.TextContent.newBuilder().setText(text)) + .build() + } +} + +internal fun PointerType.asPointerType(): MessagingModel.Pointer.Type { + return when (this) { + PointerType.SENT -> MessagingModel.Pointer.Type.SENT + PointerType.DELIVERED -> MessagingModel.Pointer.Type.DELIVERED + PointerType.READ -> MessagingModel.Pointer.Type.READ + PointerType.UNKNOWN -> MessagingModel.Pointer.Type.UNKNOWN + } +} + +internal fun TypingState.asTypingState(): MessagingModel.IsTypingNotification.State { + return when (this) { + TypingState.STARTED_TYPING -> MessagingModel.IsTypingNotification.State.STARTED_TYPING + TypingState.STILL_TYPING -> MessagingModel.IsTypingNotification.State.STILL_TYPING + TypingState.STOPPED_TYPING -> MessagingModel.IsTypingNotification.State.STOPPED_TYPING + TypingState.TYPING_TIMED_OUT -> MessagingModel.IsTypingNotification.State.TYPING_TIMED_OUT + TypingState.UNKNOWN -> MessagingModel.IsTypingNotification.State.UNKNOWN_TYPING_STATE + } } \ No newline at end of file diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt new file mode 100644 index 000000000..4fa3283e2 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt @@ -0,0 +1,164 @@ +package com.flipcash.services.internal.network.services + +import com.codeinc.flipcash.gen.messaging.v1.MessagingService as RpcMessagingService +import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel +import com.flipcash.services.internal.network.api.MessagingApi +import com.flipcash.services.models.AdvancePointerError +import com.flipcash.services.models.FlipcashSendMessageError +import com.flipcash.services.models.GetMessageError +import com.flipcash.services.models.GetMessagesError +import com.flipcash.services.models.NotifyIsTypingError +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.opencode.internal.network.extensions.foldWithSuppression +import com.getcode.opencode.utils.toValidationOrElse +import javax.inject.Inject + +internal class MessagingService @Inject constructor( + private val api: MessagingApi, +) { + suspend fun getMessage( + owner: KeyPair, + chatId: ChatId, + messageId: Long, + ): Result { + return runCatching { + api.getMessage(owner, chatId, messageId) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.GetMessageResponse.Result.OK -> Result.success(response.message) + RpcMessagingService.GetMessageResponse.Result.DENIED -> Result.failure(GetMessageError.Denied()) + RpcMessagingService.GetMessageResponse.Result.NOT_FOUND -> Result.failure(GetMessageError.NotFound()) + RpcMessagingService.GetMessageResponse.Result.UNRECOGNIZED -> Result.failure(GetMessageError.Unrecognized()) + else -> Result.failure(GetMessageError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { GetMessageError.Other(cause = it) }) + } + ) + } + + suspend fun getMessages( + owner: KeyPair, + chatId: ChatId, + queryOptions: QueryOptions, + ): Result> { + return runCatching { + api.getMessages(owner, chatId, queryOptions) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.GetMessagesResponse.Result.OK -> + Result.success(if (response.hasMessages()) response.messages.messagesList else emptyList()) + RpcMessagingService.GetMessagesResponse.Result.DENIED -> Result.failure(GetMessagesError.Denied()) + RpcMessagingService.GetMessagesResponse.Result.NOT_FOUND -> Result.failure(GetMessagesError.NotFound()) + RpcMessagingService.GetMessagesResponse.Result.UNRECOGNIZED -> Result.failure(GetMessagesError.Unrecognized()) + else -> Result.failure(GetMessagesError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { GetMessagesError.Other(cause = it) }) + } + ) + } + + suspend fun getMessagesByIds( + owner: KeyPair, + chatId: ChatId, + messageIds: List, + ): Result> { + return runCatching { + api.getMessagesByIds(owner, chatId, messageIds) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.GetMessagesResponse.Result.OK -> + Result.success(if (response.hasMessages()) response.messages.messagesList else emptyList()) + RpcMessagingService.GetMessagesResponse.Result.DENIED -> Result.failure(GetMessagesError.Denied()) + RpcMessagingService.GetMessagesResponse.Result.NOT_FOUND -> Result.failure(GetMessagesError.NotFound()) + RpcMessagingService.GetMessagesResponse.Result.UNRECOGNIZED -> Result.failure(GetMessagesError.Unrecognized()) + else -> Result.failure(GetMessagesError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { GetMessagesError.Other(cause = it) }) + } + ) + } + + suspend fun sendMessage( + owner: KeyPair, + chatId: ChatId, + content: List, + clientMessageId: ClientMessageId, + ): Result { + return runCatching { + api.sendMessage(owner, chatId, content, clientMessageId) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.SendMessageResponse.Result.OK -> Result.success(response.message) + RpcMessagingService.SendMessageResponse.Result.DENIED -> Result.failure(FlipcashSendMessageError.Denied()) + RpcMessagingService.SendMessageResponse.Result.UNRECOGNIZED -> Result.failure(FlipcashSendMessageError.Unrecognized()) + else -> Result.failure(FlipcashSendMessageError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { FlipcashSendMessageError.Other(cause = it) }) + } + ) + } + + suspend fun advancePointer( + owner: KeyPair, + chatId: ChatId, + pointerType: PointerType, + messageId: Long, + ): Result { + return runCatching { + api.advancePointer(owner, chatId, pointerType, messageId) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.AdvancePointerResponse.Result.OK -> Result.success(Unit) + RpcMessagingService.AdvancePointerResponse.Result.DENIED -> Result.failure(AdvancePointerError.Denied()) + RpcMessagingService.AdvancePointerResponse.Result.MESSAGE_NOT_FOUND -> Result.failure(AdvancePointerError.MessageNotFound()) + RpcMessagingService.AdvancePointerResponse.Result.UNRECOGNIZED -> Result.failure(AdvancePointerError.Unrecognized()) + else -> Result.failure(AdvancePointerError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { AdvancePointerError.Other(cause = it) }) + } + ) + } + + suspend fun notifyIsTyping( + owner: KeyPair, + chatId: ChatId, + state: TypingState, + ): Result { + return runCatching { + api.notifyIsTyping(owner, chatId, state) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcMessagingService.NotifyIsTypingResponse.Result.OK -> Result.success(Unit) + RpcMessagingService.NotifyIsTypingResponse.Result.DENIED -> Result.failure(NotifyIsTypingError.Denied()) + RpcMessagingService.NotifyIsTypingResponse.Result.UNRECOGNIZED -> Result.failure(NotifyIsTypingError.Unrecognized()) + else -> Result.failure(NotifyIsTypingError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { NotifyIsTypingError.Other(cause = it) }) + } + ) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatService.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatService.kt new file mode 100644 index 000000000..743d61b7b --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatService.kt @@ -0,0 +1,61 @@ +package com.flipcash.services.internal.network.services + +import com.codeinc.flipcash.gen.chat.v1.ChatService as RpcChatService +import com.codeinc.flipcash.gen.chat.v1.Model as ChatModel +import com.flipcash.services.internal.network.api.ChatApi +import com.flipcash.services.models.GetChatError +import com.flipcash.services.models.GetDmChatFeedError +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.opencode.internal.network.extensions.foldWithSuppression +import com.getcode.opencode.utils.toValidationOrElse +import javax.inject.Inject + +internal class ChatService @Inject constructor( + private val api: ChatApi, +) { + suspend fun getChat( + owner: KeyPair, + chatId: ChatId, + ): Result { + return runCatching { + api.getChat(owner, chatId) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcChatService.GetChatResponse.Result.OK -> Result.success(response.metadata) + RpcChatService.GetChatResponse.Result.DENIED -> Result.failure(GetChatError.Denied()) + RpcChatService.GetChatResponse.Result.NOT_FOUND -> Result.failure(GetChatError.NotFound()) + RpcChatService.GetChatResponse.Result.UNRECOGNIZED -> Result.failure(GetChatError.Unrecognized()) + else -> Result.failure(GetChatError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { GetChatError.Other(cause = it) }) + } + ) + } + + suspend fun getDmChatFeed( + owner: KeyPair, + queryOptions: QueryOptions, + ): Result { + return runCatching { + api.getDmChatFeed(owner, queryOptions) + }.foldWithSuppression( + onSuccess = { response -> + when (response.result) { + RpcChatService.GetDmChatFeedResponse.Result.OK -> Result.success(response) + RpcChatService.GetDmChatFeedResponse.Result.DENIED -> Result.failure(GetDmChatFeedError.Denied()) + RpcChatService.GetDmChatFeedResponse.Result.NOT_FOUND -> Result.failure(GetDmChatFeedError.NotFound()) + RpcChatService.GetDmChatFeedResponse.Result.UNRECOGNIZED -> Result.failure(GetDmChatFeedError.Unrecognized()) + else -> Result.failure(GetDmChatFeedError.Other()) + } + }, + onFailure = { cause -> + Result.failure(cause.toValidationOrElse { GetDmChatFeedError.Other(cause = it) }) + } + ) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt new file mode 100644 index 000000000..b421ee65e --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt @@ -0,0 +1,67 @@ +package com.flipcash.services.internal.repositories + +import com.flipcash.services.internal.network.extensions.toChatMessage +import com.flipcash.services.internal.network.services.MessagingService +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.flipcash.services.repository.MessagingRepository +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.utils.ErrorUtils + +internal class InternalMessagingRepository( + private val service: MessagingService, +) : MessagingRepository { + + override suspend fun getMessage( + owner: KeyPair, + chatId: ChatId, + messageId: Long, + ): Result = service.getMessage(owner, chatId, messageId) + .onFailure { ErrorUtils.handleError(it) } + .map { it.toChatMessage() } + + override suspend fun getMessages( + owner: KeyPair, + chatId: ChatId, + queryOptions: QueryOptions, + ): Result> = service.getMessages(owner, chatId, queryOptions) + .onFailure { ErrorUtils.handleError(it) } + .map { messages -> messages.map { it.toChatMessage() } } + + override suspend fun getMessagesByIds( + owner: KeyPair, + chatId: ChatId, + messageIds: List, + ): Result> = service.getMessagesByIds(owner, chatId, messageIds) + .onFailure { ErrorUtils.handleError(it) } + .map { messages -> messages.map { it.toChatMessage() } } + + override suspend fun sendMessage( + owner: KeyPair, + chatId: ChatId, + content: List, + clientMessageId: ClientMessageId, + ): Result = service.sendMessage(owner, chatId, content, clientMessageId) + .onFailure { ErrorUtils.handleError(it) } + .map { it.toChatMessage() } + + override suspend fun advancePointer( + owner: KeyPair, + chatId: ChatId, + pointerType: PointerType, + messageId: Long, + ): Result = service.advancePointer(owner, chatId, pointerType, messageId) + .onFailure { ErrorUtils.handleError(it) } + + override suspend fun notifyIsTyping( + owner: KeyPair, + chatId: ChatId, + state: TypingState, + ): Result = service.notifyIsTyping(owner, chatId, state) + .onFailure { ErrorUtils.handleError(it) } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatRepository.kt new file mode 100644 index 000000000..85a32d718 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatRepository.kt @@ -0,0 +1,37 @@ +package com.flipcash.services.internal.repositories + +import com.flipcash.services.internal.domain.ChatMetadataMapper +import com.flipcash.services.internal.network.extensions.toPagingToken +import com.flipcash.services.internal.network.services.ChatService +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatFeedPage +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMetadata +import com.flipcash.services.repository.ChatRepository +import com.getcode.ed25519.Ed25519.KeyPair +import com.getcode.utils.ErrorUtils + +internal class InternalChatRepository( + private val service: ChatService, + private val mapper: ChatMetadataMapper, +) : ChatRepository { + override suspend fun getChat( + owner: KeyPair, + chatId: ChatId, + ): Result = service.getChat(owner, chatId) + .onFailure { ErrorUtils.handleError(it) } + .map { mapper.map(it) } + + override suspend fun getDmChatFeed( + owner: KeyPair, + queryOptions: QueryOptions, + ): Result = service.getDmChatFeed(owner, queryOptions) + .onFailure { ErrorUtils.handleError(it) } + .map { response -> + ChatFeedPage( + chats = response.chatsList.map { mapper.map(it) }, + pagingToken = if (response.hasPagingToken()) response.pagingToken.toPagingToken() else null, + hasMore = response.hasMore, + ) + } +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt index 2aa6c32d9..855327367 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt @@ -303,6 +303,74 @@ sealed class GetContactsError( data class Other(override val cause: Throwable? = null) : GetContactsError(message = cause?.message, cause = cause), NotifiableError } +sealed class GetChatError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : GetChatError("Denied") + class NotFound : GetChatError("Not found") + class Unrecognized : GetChatError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : GetChatError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class GetDmChatFeedError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : GetDmChatFeedError("Denied") + class NotFound : GetDmChatFeedError("Not found") + class Unrecognized : GetDmChatFeedError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : GetDmChatFeedError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class GetMessageError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : GetMessageError("Denied") + class NotFound : GetMessageError("Not found") + class Unrecognized : GetMessageError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : GetMessageError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class GetMessagesError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : GetMessagesError("Denied") + class NotFound : GetMessagesError("Not found") + class Unrecognized : GetMessagesError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : GetMessagesError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class FlipcashSendMessageError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : FlipcashSendMessageError("Denied") + class Unrecognized : FlipcashSendMessageError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : FlipcashSendMessageError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class AdvancePointerError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : AdvancePointerError("Denied") + class MessageNotFound : AdvancePointerError("Message not found") + class Unrecognized : AdvancePointerError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : AdvancePointerError(message = cause?.message, cause = cause), NotifiableError +} + +sealed class NotifyIsTypingError( + override val message: String? = null, + override val cause: Throwable? = null +): CodeServerError(message, cause) { + class Denied : NotifyIsTypingError("Denied") + class Unrecognized : NotifyIsTypingError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : NotifyIsTypingError(message = cause?.message, cause = cause), NotifiableError +} + sealed class StreamEventsError( override val message: String? = null, override val cause: Throwable? = null diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatFeedPage.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatFeedPage.kt new file mode 100644 index 000000000..5bd62d3fa --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ChatFeedPage.kt @@ -0,0 +1,9 @@ +package com.flipcash.services.models.chat + +import com.flipcash.services.models.PagingToken + +data class ChatFeedPage( + val chats: List, + val pagingToken: PagingToken?, + val hasMore: Boolean, +) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ClientMessageId.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ClientMessageId.kt new file mode 100644 index 000000000..22b4e8244 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/chat/ClientMessageId.kt @@ -0,0 +1,13 @@ +package com.flipcash.services.models.chat + +data class ClientMessageId(val bytes: ByteArray) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is ClientMessageId) return false + return bytes.contentEquals(other.bytes) + } + + override fun hashCode(): Int = bytes.contentHashCode() + + override fun toString(): String = "ClientMessageId(${bytes.size} bytes)" +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt new file mode 100644 index 000000000..266052dbb --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt @@ -0,0 +1,50 @@ +package com.flipcash.services.repository + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.getcode.ed25519.Ed25519.KeyPair + +interface MessagingRepository { + suspend fun getMessage( + owner: KeyPair, + chatId: ChatId, + messageId: Long, + ): Result + + suspend fun getMessages( + owner: KeyPair, + chatId: ChatId, + queryOptions: QueryOptions, + ): Result> + + suspend fun getMessagesByIds( + owner: KeyPair, + chatId: ChatId, + messageIds: List, + ): Result> + + suspend fun sendMessage( + owner: KeyPair, + chatId: ChatId, + content: List, + clientMessageId: ClientMessageId, + ): Result + + suspend fun advancePointer( + owner: KeyPair, + chatId: ChatId, + pointerType: PointerType, + messageId: Long, + ): Result + + suspend fun notifyIsTyping( + owner: KeyPair, + chatId: ChatId, + state: TypingState, + ): Result +} diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatRepository.kt new file mode 100644 index 000000000..4b4600349 --- /dev/null +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatRepository.kt @@ -0,0 +1,19 @@ +package com.flipcash.services.repository + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatFeedPage +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMetadata +import com.getcode.ed25519.Ed25519.KeyPair + +interface ChatRepository { + suspend fun getChat( + owner: KeyPair, + chatId: ChatId, + ): Result + + suspend fun getDmChatFeed( + owner: KeyPair, + queryOptions: QueryOptions, + ): Result +} diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatControllerTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatControllerTest.kt new file mode 100644 index 000000000..9c189ce3e --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatControllerTest.kt @@ -0,0 +1,187 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatFeedPage +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMetadata +import com.flipcash.services.models.chat.ChatType +import com.flipcash.services.repository.ChatRepository +import com.flipcash.services.user.UserManager +import com.getcode.ed25519.Ed25519 +import com.getcode.opencode.model.accounts.AccountCluster +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertSame +import kotlin.test.assertTrue +import kotlin.time.Instant + +@OptIn(ExperimentalCoroutinesApi::class) +class ChatControllerTest { + + private val repository = FakeChatRepository() + private val userManager = mockk(relaxed = true) + private val controller = ChatController(repository, userManager) + + private fun stubOwner() { + val keyPair = mockk(relaxed = true) + val cluster = mockk(relaxed = true) { + every { authority } returns mockk { every { this@mockk.keyPair } returns keyPair } + } + every { userManager.accountCluster } returns cluster + } + + // region getChat + + @Test + fun `getChat fails when no account cluster`() = runTest { + every { userManager.accountCluster } returns null + + val result = controller.getChat(ChatId(ByteArray(32))) + + assertTrue(result.isFailure) + } + + @Test + fun `getChat forwards the chatId to the repository`() = runTest { + stubOwner() + val chatId = ChatId(ByteArray(32) { 0x42 }) + repository.getChatResult = Result.success(stubMetadata(chatId)) + + controller.getChat(chatId) + + assertEquals(chatId, repository.lastChatId) + } + + @Test + fun `getChat returns the metadata from the repository`() = runTest { + stubOwner() + val chatId = ChatId(ByteArray(32) { 0x42 }) + val expected = stubMetadata(chatId) + repository.getChatResult = Result.success(expected) + + val result = controller.getChat(chatId) + + assertSame(expected, result.getOrThrow()) + } + + @Test + fun `getChat surfaces repository failures without swallowing`() = runTest { + stubOwner() + val cause = RuntimeException("denied") + repository.getChatResult = Result.failure(cause) + + val result = controller.getChat(ChatId(ByteArray(32))) + + assertTrue(result.isFailure) + assertSame(cause, result.exceptionOrNull()) + } + + // endregion + + // region getDmChatFeed + + @Test + fun `getDmChatFeed fails when no account cluster`() = runTest { + every { userManager.accountCluster } returns null + + val result = controller.getDmChatFeed() + + assertTrue(result.isFailure) + } + + @Test + fun `getDmChatFeed uses default QueryOptions when none provided`() = runTest { + stubOwner() + repository.getDmChatFeedResult = Result.success(ChatFeedPage(emptyList(), null, false)) + + controller.getDmChatFeed() + + assertEquals(QueryOptions(), repository.lastQueryOptions) + } + + @Test + fun `getDmChatFeed forwards custom query options`() = runTest { + stubOwner() + val token = listOf(0xAB.toByte()) + val options = QueryOptions(limit = 25, token = token, descending = false) + repository.getDmChatFeedResult = Result.success(ChatFeedPage(emptyList(), null, false)) + + controller.getDmChatFeed(options) + + assertEquals(25, repository.lastQueryOptions?.limit) + assertEquals(token, repository.lastQueryOptions?.token) + assertEquals(false, repository.lastQueryOptions?.descending) + } + + @Test + fun `getDmChatFeed returns page with chats and paging state`() = runTest { + stubOwner() + val chat1 = stubMetadata(ChatId(ByteArray(32) { 1 })) + val chat2 = stubMetadata(ChatId(ByteArray(32) { 2 })) + val nextToken = listOf(0xFF.toByte()) + val page = ChatFeedPage( + chats = listOf(chat1, chat2), + pagingToken = nextToken, + hasMore = true, + ) + repository.getDmChatFeedResult = Result.success(page) + + val result = controller.getDmChatFeed() + + val returned = result.getOrThrow() + assertEquals(2, returned.chats.size) + assertEquals(nextToken, returned.pagingToken) + assertTrue(returned.hasMore) + } + + @Test + fun `getDmChatFeed surfaces repository failures without swallowing`() = runTest { + stubOwner() + val cause = RuntimeException("server error") + repository.getDmChatFeedResult = Result.failure(cause) + + val result = controller.getDmChatFeed() + + assertTrue(result.isFailure) + assertSame(cause, result.exceptionOrNull()) + } + + // endregion + + // region helpers + + private fun stubMetadata(chatId: ChatId = ChatId(ByteArray(32))) = ChatMetadata( + chatId = chatId, + type = ChatType.DM, + members = emptyList(), + lastMessage = null, + lastActivity = Instant.fromEpochSeconds(1000), + ) + + // endregion +} + +// region Fakes + +private class FakeChatRepository : ChatRepository { + var getChatResult: Result = Result.failure(RuntimeException("not configured")) + var getDmChatFeedResult: Result = Result.failure(RuntimeException("not configured")) + var lastChatId: ChatId? = null + var lastQueryOptions: QueryOptions? = null + + override suspend fun getChat(owner: Ed25519.KeyPair, chatId: ChatId): Result { + lastChatId = chatId + return getChatResult + } + + override suspend fun getDmChatFeed(owner: Ed25519.KeyPair, queryOptions: QueryOptions): Result { + lastQueryOptions = queryOptions + return getDmChatFeedResult + } +} + +// endregion diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt new file mode 100644 index 000000000..cc29641cf --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt @@ -0,0 +1,238 @@ +package com.flipcash.services.controllers + +import com.flipcash.services.models.QueryOptions +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.ClientMessageId +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.user.UserManager +import com.getcode.ed25519.Ed25519 +import com.getcode.opencode.model.accounts.AccountCluster +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertSame +import kotlin.test.assertTrue +import kotlin.time.Instant + +@OptIn(ExperimentalCoroutinesApi::class) +class MessagingControllerTest { + + private val repository = FakeMessagingRepository() + private val userManager = mockk(relaxed = true) + private val controller = MessagingController(repository, userManager) + + private val testChatId = ChatId(ByteArray(32) { 0x01 }) + + private fun stubOwner() { + val keyPair = mockk(relaxed = true) + val cluster = mockk(relaxed = true) { + every { authority } returns mockk { every { this@mockk.keyPair } returns keyPair } + } + every { userManager.accountCluster } returns cluster + } + + private fun stubMessage(id: Long = 1, text: String = "hello") = ChatMessage( + messageId = id, + senderId = listOf(1.toByte()), + content = listOf(MessageContent.Text(text)), + timestamp = Instant.fromEpochSeconds(1000), + unreadSeq = id, + ) + + // region getMessage + + @Test + fun `getMessage fails when no account cluster`() = runTest { + every { userManager.accountCluster } returns null + val result = controller.getMessage(testChatId, 1) + assertTrue(result.isFailure) + assertIs(result.exceptionOrNull()) + } + + @Test + fun `getMessage forwards chatId and messageId`() = runTest { + stubOwner() + val msg = stubMessage(42) + repository.getMessageResult = Result.success(msg) + + controller.getMessage(testChatId, 42) + + assertEquals(testChatId, repository.lastChatId) + assertEquals(42L, repository.lastMessageId) + } + + @Test + fun `getMessage returns the message from repository`() = runTest { + stubOwner() + val msg = stubMessage(1, "world") + repository.getMessageResult = Result.success(msg) + + val result = controller.getMessage(testChatId, 1) + + assertEquals("world", (result.getOrThrow().content.first() as MessageContent.Text).text) + } + + // endregion + + // region sendMessage + + @Test + fun `sendMessage fails when no account cluster`() = runTest { + every { userManager.accountCluster } returns null + val result = controller.sendMessage(testChatId, listOf(MessageContent.Text("hi")), ClientMessageId(ByteArray(16))) + assertTrue(result.isFailure) + } + + @Test + fun `sendMessage forwards content and clientMessageId`() = runTest { + stubOwner() + val clientId = ClientMessageId(ByteArray(16) { 0xAB.toByte() }) + val content = listOf(MessageContent.Text("test")) + repository.sendMessageResult = Result.success(stubMessage()) + + controller.sendMessage(testChatId, content, clientId) + + assertEquals(content, repository.lastContent) + assertEquals(clientId, repository.lastClientMessageId) + } + + @Test + fun `sendMessage returns server-assigned message`() = runTest { + stubOwner() + val serverMsg = stubMessage(99, "confirmed") + repository.sendMessageResult = Result.success(serverMsg) + + val result = controller.sendMessage(testChatId, listOf(MessageContent.Text("test")), ClientMessageId(ByteArray(16))) + + assertEquals(99L, result.getOrThrow().messageId) + } + + // endregion + + // region advancePointer + + @Test + fun `advancePointer forwards pointer type and message id`() = runTest { + stubOwner() + repository.advancePointerResult = Result.success(Unit) + + controller.advancePointer(testChatId, PointerType.READ, 50) + + assertEquals(PointerType.READ, repository.lastPointerType) + assertEquals(50L, repository.lastMessageId) + } + + @Test + fun `advancePointer surfaces repository failure`() = runTest { + stubOwner() + val cause = RuntimeException("not found") + repository.advancePointerResult = Result.failure(cause) + + val result = controller.advancePointer(testChatId, PointerType.DELIVERED, 10) + + assertTrue(result.isFailure) + assertSame(cause, result.exceptionOrNull()) + } + + // endregion + + // region notifyIsTyping + + @Test + fun `notifyIsTyping forwards typing state`() = runTest { + stubOwner() + repository.notifyIsTypingResult = Result.success(Unit) + + controller.notifyIsTyping(testChatId, TypingState.STARTED_TYPING) + + assertEquals(TypingState.STARTED_TYPING, repository.lastTypingState) + } + + // endregion + + // region getMessages + + @Test + fun `getMessages uses default QueryOptions`() = runTest { + stubOwner() + repository.getMessagesResult = Result.success(emptyList()) + + controller.getMessages(testChatId) + + assertEquals(QueryOptions(), repository.lastQueryOptions) + } + + @Test + fun `getMessagesByIds forwards message ids`() = runTest { + stubOwner() + val ids = listOf(1L, 2L, 3L) + repository.getMessagesByIdsResult = Result.success(listOf(stubMessage(1), stubMessage(2), stubMessage(3))) + + val result = controller.getMessagesByIds(testChatId, ids) + + assertEquals(ids, repository.lastMessageIds) + assertEquals(3, result.getOrThrow().size) + } + + // endregion +} + +// region Fakes + +private class FakeMessagingRepository : MessagingRepository { + var getMessageResult: Result = Result.failure(RuntimeException("not configured")) + var getMessagesResult: Result> = Result.failure(RuntimeException("not configured")) + var getMessagesByIdsResult: Result> = Result.failure(RuntimeException("not configured")) + var sendMessageResult: Result = Result.failure(RuntimeException("not configured")) + var advancePointerResult: Result = Result.failure(RuntimeException("not configured")) + var notifyIsTypingResult: Result = Result.failure(RuntimeException("not configured")) + + var lastChatId: ChatId? = null + var lastMessageId: Long? = null + var lastMessageIds: List? = null + var lastQueryOptions: QueryOptions? = null + var lastContent: List? = null + var lastClientMessageId: ClientMessageId? = null + var lastPointerType: PointerType? = null + var lastTypingState: TypingState? = null + + override suspend fun getMessage(owner: Ed25519.KeyPair, chatId: ChatId, messageId: Long): Result { + lastChatId = chatId; lastMessageId = messageId + return getMessageResult + } + + override suspend fun getMessages(owner: Ed25519.KeyPair, chatId: ChatId, queryOptions: QueryOptions): Result> { + lastChatId = chatId; lastQueryOptions = queryOptions + return getMessagesResult + } + + override suspend fun getMessagesByIds(owner: Ed25519.KeyPair, chatId: ChatId, messageIds: List): Result> { + lastChatId = chatId; lastMessageIds = messageIds + return getMessagesByIdsResult + } + + override suspend fun sendMessage(owner: Ed25519.KeyPair, chatId: ChatId, content: List, clientMessageId: ClientMessageId): Result { + lastChatId = chatId; lastContent = content; lastClientMessageId = clientMessageId + return sendMessageResult + } + + override suspend fun advancePointer(owner: Ed25519.KeyPair, chatId: ChatId, pointerType: PointerType, messageId: Long): Result { + lastChatId = chatId; lastPointerType = pointerType; lastMessageId = messageId + return advancePointerResult + } + + override suspend fun notifyIsTyping(owner: Ed25519.KeyPair, chatId: ChatId, state: TypingState): Result { + lastChatId = chatId; lastTypingState = state + return notifyIsTypingResult + } +} + +// endregion diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapperTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapperTest.kt new file mode 100644 index 000000000..9e07ccbc9 --- /dev/null +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/internal/domain/ChatMetadataMapperTest.kt @@ -0,0 +1,114 @@ +package com.flipcash.services.internal.domain + +import com.codeinc.flipcash.gen.chat.v1.Model as ChatModel +import com.codeinc.flipcash.gen.common.v1.Common +import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel +import com.codeinc.flipcash.gen.profile.v1.Model as ProfileModel +import com.flipcash.services.models.chat.ChatType +import com.flipcash.services.models.chat.PointerType +import com.google.protobuf.ByteString +import com.google.protobuf.Timestamp +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull + +class ChatMetadataMapperTest { + + private val mapper = ChatMetadataMapper( + userProfileMapper = UserProfileMapper(socialMapper = SocialAccountMapper()), + ) + + private fun chatId(byte: Byte = 1): Common.ChatId = + Common.ChatId.newBuilder() + .setValue(ByteString.copyFrom(ByteArray(32) { byte })) + .build() + + private fun userId(byte: Byte = 2): Common.UserId = + Common.UserId.newBuilder() + .setValue(ByteString.copyFrom(ByteArray(16) { byte })) + .build() + + private fun messageId(value: Long = 1): MessagingModel.MessageId = + MessagingModel.MessageId.newBuilder().setValue(value).build() + + private fun textContent(text: String = "hello"): MessagingModel.Content = + MessagingModel.Content.newBuilder() + .setText(MessagingModel.TextContent.newBuilder().setText(text)) + .build() + + private fun message( + id: Long = 1, + text: String = "hello", + ): MessagingModel.Message = MessagingModel.Message.newBuilder() + .setMessageId(messageId(id)) + .setSenderId(userId()) + .addContent(textContent(text)) + .setTs(Timestamp.newBuilder().setSeconds(1000)) + .setUnreadSeq(1) + .build() + + private fun member( + userIdByte: Byte = 2, + displayName: String = "User", + ): ChatModel.Member = ChatModel.Member.newBuilder() + .setUserId(userId(userIdByte)) + .setUserProfile(ProfileModel.UserProfile.newBuilder().setDisplayName(displayName)) + .addPointers( + MessagingModel.Pointer.newBuilder() + .setType(MessagingModel.Pointer.Type.READ) + .setUserId(userId(userIdByte)) + .setValue(messageId(5)) + ) + .build() + + private fun metadata( + block: ChatModel.Metadata.Builder.() -> Unit = {}, + ): ChatModel.Metadata = ChatModel.Metadata.newBuilder() + .setChatId(chatId()) + .setType(ChatModel.Metadata.ChatType.DM) + .setLastActivity(Timestamp.newBuilder().setSeconds(2000)) + .apply(block) + .build() + + @Test + fun `maps chat type DM`() { + val result = mapper.map(metadata()) + assertEquals(ChatType.DM, result.type) + } + + @Test + fun `maps chat id bytes`() { + val result = mapper.map(metadata()) + assertEquals(32, result.chatId.bytes.size) + } + + @Test + fun `maps last activity`() { + val result = mapper.map(metadata()) + assertEquals(2000L, result.lastActivity.epochSeconds) + } + + @Test + fun `maps members with pointers`() { + val result = mapper.map(metadata { addMembers(member()) }) + assertEquals(1, result.members.size) + assertEquals("User", result.members[0].userProfile.displayName) + assertEquals(1, result.members[0].pointers.size) + assertEquals(PointerType.READ, result.members[0].pointers[0].type) + assertEquals(5L, result.members[0].pointers[0].value) + } + + @Test + fun `maps last message when present`() { + val result = mapper.map(metadata { setLastMessage(message(text = "hey")) }) + assertNotNull(result.lastMessage) + assertEquals(1L, result.lastMessage!!.messageId) + } + + @Test + fun `last message null when absent`() { + val result = mapper.map(metadata()) + assertNull(result.lastMessage) + } +} diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt index eb6eb0313..9bb517507 100644 --- a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt @@ -127,6 +127,87 @@ class ErrorsTest { assertIs(PlacePoolBetError.MaxBetsReceived()) } + // -- GetMessageError -- + + @Test + fun `GetMessageError subtypes are CodeServerError`() { + assertIs(GetMessageError.Denied()) + assertIs(GetMessageError.NotFound()) + assertIs(GetMessageError.Unrecognized()) + assertIs(GetMessageError.Other()) + } + + // -- GetMessagesError -- + + @Test + fun `GetMessagesError subtypes are CodeServerError`() { + assertIs(GetMessagesError.Denied()) + assertIs(GetMessagesError.NotFound()) + assertIs(GetMessagesError.Unrecognized()) + assertIs(GetMessagesError.Other()) + } + + // -- FlipcashSendMessageError -- + + @Test + fun `FlipcashSendMessageError subtypes are CodeServerError`() { + assertIs(FlipcashSendMessageError.Denied()) + assertIs(FlipcashSendMessageError.Unrecognized()) + assertIs(FlipcashSendMessageError.Other()) + } + + // -- AdvancePointerError -- + + @Test + fun `AdvancePointerError has expected variants`() { + assertEquals("Denied", AdvancePointerError.Denied().message) + assertEquals("Message not found", AdvancePointerError.MessageNotFound().message) + assertIs(AdvancePointerError.Unrecognized()) + } + + // -- NotifyIsTypingError -- + + @Test + fun `NotifyIsTypingError subtypes are CodeServerError`() { + assertIs(NotifyIsTypingError.Denied()) + assertIs(NotifyIsTypingError.Unrecognized()) + assertIs(NotifyIsTypingError.Other()) + } + + // -- GetChatError -- + + @Test + fun `GetChatError subtypes are CodeServerError`() { + assertIs(GetChatError.Denied()) + assertIs(GetChatError.NotFound()) + assertIs(GetChatError.Unrecognized()) + assertIs(GetChatError.Other()) + } + + @Test + fun `GetChatError has expected messages`() { + assertEquals("Denied", GetChatError.Denied().message) + assertEquals("Not found", GetChatError.NotFound().message) + } + + // -- GetDmChatFeedError -- + + @Test + fun `GetDmChatFeedError subtypes are CodeServerError`() { + assertIs(GetDmChatFeedError.Denied()) + assertIs(GetDmChatFeedError.NotFound()) + assertIs(GetDmChatFeedError.Unrecognized()) + assertIs(GetDmChatFeedError.Other()) + } + + @Test + fun `GetDmChatFeedError Other preserves cause`() { + val root = RuntimeException("feed broke") + val error = GetDmChatFeedError.Other(root) + assertSame(root, error.cause) + assertEquals("feed broke", error.message) + } + // -- StreamEventsError -- @Test From 4c718660eb2633d4ff8e927280edf39f3359cd3a Mon Sep 17 00:00:00 2001 From: Brandon McAnsh Date: Fri, 5 Jun 2026 09:37:10 -0400 Subject: [PATCH 4/4] feat(messaging): add ChatMessaging service layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All 6 messaging RPCs — GetMessage, GetMessages, GetMessagesByIds, SendMessage, AdvancePointer, NotifyIsTyping — Api, Service, Repository, and Controller. Introduces ClientMessageId domain model, SendMessageError, GetMessageError, GetMessagesError, AdvancePointerError, and NotifyIsTypingError sealed classes, and Hilt wiring. Signed-off-by: Brandon McAnsh --- .../controllers/ChatMessagingController.kt | 6 +++--- .../flipcash/services/inject/FlipcashModule.kt | 12 ++++++------ .../internal/network/api/ChatMessagingApi.kt | 2 +- .../network/services/ChatMessagingService.kt | 16 ++++++++-------- .../InternalChatMessagingRepository.kt | 10 +++++----- .../com/flipcash/services/models/Errors.kt | 8 ++++---- .../repository/ChatMessagingRepository.kt | 2 +- .../controllers/ChatMessagingControllerTest.kt | 10 +++++----- .../com/flipcash/services/models/ErrorsTest.kt | 10 +++++----- 9 files changed, 38 insertions(+), 38 deletions(-) diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt index b98d7bb9f..87f78aefb 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/ChatMessagingController.kt @@ -7,14 +7,14 @@ import com.flipcash.services.models.chat.ClientMessageId import com.flipcash.services.models.chat.MessageContent import com.flipcash.services.models.chat.PointerType import com.flipcash.services.models.chat.TypingState -import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.repository.ChatMessagingRepository import com.flipcash.services.user.UserManager import javax.inject.Inject import javax.inject.Singleton @Singleton -class MessagingController @Inject constructor( - private val repository: MessagingRepository, +class ChatMessagingController @Inject constructor( + private val repository: ChatMessagingRepository, private val userManager: UserManager, ) { private fun requireOwner() = userManager.accountCluster?.authority?.keyPair diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt index 81817618b..26cf1332e 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt @@ -14,7 +14,7 @@ import com.flipcash.services.internal.network.services.AccountService import com.flipcash.services.internal.network.services.ActivityFeedService import com.flipcash.services.internal.network.services.ChatService import com.flipcash.services.internal.network.services.EventStreamingService -import com.flipcash.services.internal.network.services.MessagingService +import com.flipcash.services.internal.network.services.ChatMessagingService import com.flipcash.services.internal.network.services.EmailVerificationService import com.flipcash.services.internal.network.services.ContactListService import com.flipcash.services.internal.network.services.ModerationService @@ -29,7 +29,7 @@ import com.flipcash.services.internal.repositories.InternalAccountRepository import com.flipcash.services.internal.repositories.InternalActivityFeedRepository import com.flipcash.services.internal.repositories.InternalChatRepository import com.flipcash.services.internal.repositories.InternalEventStreamingRepository -import com.flipcash.services.internal.repositories.InternalMessagingRepository +import com.flipcash.services.internal.repositories.InternalChatMessagingRepository import com.flipcash.services.internal.repositories.InternalContactListRepository import com.flipcash.services.internal.repositories.InternalContactVerificationRepository import com.flipcash.services.internal.repositories.InternalModerationRepository @@ -43,7 +43,7 @@ import com.flipcash.services.repository.AccountRepository import com.flipcash.services.repository.ActivityFeedRepository import com.flipcash.services.repository.ChatRepository import com.flipcash.services.repository.EventStreamingRepository -import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.repository.ChatMessagingRepository import com.flipcash.services.repository.ContactListRepository import com.flipcash.services.repository.ContactVerificationRepository import com.flipcash.services.repository.ModerationRepository @@ -136,9 +136,9 @@ internal object FlipcashModule { ): EventStreamingRepository = InternalEventStreamingRepository(service) @Provides - internal fun providesFlipcashMessagingRepository( - service: MessagingService, - ): MessagingRepository = InternalMessagingRepository(service) + internal fun providesChatMessagingRepository( + service: ChatMessagingService, + ): ChatMessagingRepository = InternalChatMessagingRepository(service) @Provides internal fun providesContactListRepository( diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt index ab9f0fe71..3b7f37c0f 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/ChatMessagingApi.kt @@ -28,7 +28,7 @@ import javax.inject.Inject import javax.inject.Singleton @Singleton -internal class MessagingApi @Inject constructor( +internal class ChatMessagingApi @Inject constructor( @FlipcashManagedChannel managedChannel: ManagedChannel, ) : GrpcApi(managedChannel) { diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt index 4fa3283e2..de0c4b60a 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/ChatMessagingService.kt @@ -2,9 +2,9 @@ package com.flipcash.services.internal.network.services import com.codeinc.flipcash.gen.messaging.v1.MessagingService as RpcMessagingService import com.codeinc.flipcash.gen.messaging.v1.Model as MessagingModel -import com.flipcash.services.internal.network.api.MessagingApi +import com.flipcash.services.internal.network.api.ChatMessagingApi import com.flipcash.services.models.AdvancePointerError -import com.flipcash.services.models.FlipcashSendMessageError +import com.flipcash.services.models.SendMessageError import com.flipcash.services.models.GetMessageError import com.flipcash.services.models.GetMessagesError import com.flipcash.services.models.NotifyIsTypingError @@ -19,8 +19,8 @@ import com.getcode.opencode.internal.network.extensions.foldWithSuppression import com.getcode.opencode.utils.toValidationOrElse import javax.inject.Inject -internal class MessagingService @Inject constructor( - private val api: MessagingApi, +internal class ChatMessagingService @Inject constructor( + private val api: ChatMessagingApi, ) { suspend fun getMessage( owner: KeyPair, @@ -105,13 +105,13 @@ internal class MessagingService @Inject constructor( onSuccess = { response -> when (response.result) { RpcMessagingService.SendMessageResponse.Result.OK -> Result.success(response.message) - RpcMessagingService.SendMessageResponse.Result.DENIED -> Result.failure(FlipcashSendMessageError.Denied()) - RpcMessagingService.SendMessageResponse.Result.UNRECOGNIZED -> Result.failure(FlipcashSendMessageError.Unrecognized()) - else -> Result.failure(FlipcashSendMessageError.Other()) + RpcMessagingService.SendMessageResponse.Result.DENIED -> Result.failure(SendMessageError.Denied()) + RpcMessagingService.SendMessageResponse.Result.UNRECOGNIZED -> Result.failure(SendMessageError.Unrecognized()) + else -> Result.failure(SendMessageError.Other()) } }, onFailure = { cause -> - Result.failure(cause.toValidationOrElse { FlipcashSendMessageError.Other(cause = it) }) + Result.failure(cause.toValidationOrElse { SendMessageError.Other(cause = it) }) } ) } diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt index b421ee65e..05c8c9988 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/repositories/InternalChatMessagingRepository.kt @@ -1,7 +1,7 @@ package com.flipcash.services.internal.repositories import com.flipcash.services.internal.network.extensions.toChatMessage -import com.flipcash.services.internal.network.services.MessagingService +import com.flipcash.services.internal.network.services.ChatMessagingService import com.flipcash.services.models.QueryOptions import com.flipcash.services.models.chat.ChatId import com.flipcash.services.models.chat.ChatMessage @@ -9,13 +9,13 @@ import com.flipcash.services.models.chat.ClientMessageId import com.flipcash.services.models.chat.MessageContent import com.flipcash.services.models.chat.PointerType import com.flipcash.services.models.chat.TypingState -import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.repository.ChatMessagingRepository import com.getcode.ed25519.Ed25519.KeyPair import com.getcode.utils.ErrorUtils -internal class InternalMessagingRepository( - private val service: MessagingService, -) : MessagingRepository { +internal class InternalChatMessagingRepository( + private val service: ChatMessagingService, +) : ChatMessagingRepository { override suspend fun getMessage( owner: KeyPair, diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt index 855327367..04e96fb3c 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/models/Errors.kt @@ -343,13 +343,13 @@ sealed class GetMessagesError( data class Other(override val cause: Throwable? = null) : GetMessagesError(message = cause?.message, cause = cause), NotifiableError } -sealed class FlipcashSendMessageError( +sealed class SendMessageError( override val message: String? = null, override val cause: Throwable? = null ): CodeServerError(message, cause) { - class Denied : FlipcashSendMessageError("Denied") - class Unrecognized : FlipcashSendMessageError("Unrecognized"), NotifiableError - data class Other(override val cause: Throwable? = null) : FlipcashSendMessageError(message = cause?.message, cause = cause), NotifiableError + class Denied : SendMessageError("Denied") + class Unrecognized : SendMessageError("Unrecognized"), NotifiableError + data class Other(override val cause: Throwable? = null) : SendMessageError(message = cause?.message, cause = cause), NotifiableError } sealed class AdvancePointerError( diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt index 266052dbb..6b5b350f9 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/repository/ChatMessagingRepository.kt @@ -9,7 +9,7 @@ import com.flipcash.services.models.chat.PointerType import com.flipcash.services.models.chat.TypingState import com.getcode.ed25519.Ed25519.KeyPair -interface MessagingRepository { +interface ChatMessagingRepository { suspend fun getMessage( owner: KeyPair, chatId: ChatId, diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt index cc29641cf..d19fb609f 100644 --- a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/ChatMessagingControllerTest.kt @@ -7,7 +7,7 @@ import com.flipcash.services.models.chat.ClientMessageId import com.flipcash.services.models.chat.MessageContent import com.flipcash.services.models.chat.PointerType import com.flipcash.services.models.chat.TypingState -import com.flipcash.services.repository.MessagingRepository +import com.flipcash.services.repository.ChatMessagingRepository import com.flipcash.services.user.UserManager import com.getcode.ed25519.Ed25519 import com.getcode.opencode.model.accounts.AccountCluster @@ -23,11 +23,11 @@ import kotlin.test.assertTrue import kotlin.time.Instant @OptIn(ExperimentalCoroutinesApi::class) -class MessagingControllerTest { +class ChatMessagingControllerTest { - private val repository = FakeMessagingRepository() + private val repository = FakeChatMessagingRepository() private val userManager = mockk(relaxed = true) - private val controller = MessagingController(repository, userManager) + private val controller = ChatMessagingController(repository, userManager) private val testChatId = ChatId(ByteArray(32) { 0x01 }) @@ -187,7 +187,7 @@ class MessagingControllerTest { // region Fakes -private class FakeMessagingRepository : MessagingRepository { +private class FakeChatMessagingRepository : ChatMessagingRepository { var getMessageResult: Result = Result.failure(RuntimeException("not configured")) var getMessagesResult: Result> = Result.failure(RuntimeException("not configured")) var getMessagesByIdsResult: Result> = Result.failure(RuntimeException("not configured")) diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt index 9bb517507..d7dc58b99 100644 --- a/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/models/ErrorsTest.kt @@ -147,13 +147,13 @@ class ErrorsTest { assertIs(GetMessagesError.Other()) } - // -- FlipcashSendMessageError -- + // -- SendMessageError -- @Test - fun `FlipcashSendMessageError subtypes are CodeServerError`() { - assertIs(FlipcashSendMessageError.Denied()) - assertIs(FlipcashSendMessageError.Unrecognized()) - assertIs(FlipcashSendMessageError.Other()) + fun `SendMessageError subtypes are CodeServerError`() { + assertIs(SendMessageError.Denied()) + assertIs(SendMessageError.Unrecognized()) + assertIs(SendMessageError.Other()) } // -- AdvancePointerError --