diff --git a/components/forwarder/envelope.go b/components/forwarder/envelope.go index b8de18763..dee54850b 100644 --- a/components/forwarder/envelope.go +++ b/components/forwarder/envelope.go @@ -1,8 +1,6 @@ package forwarder import ( - "encoding/json" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/pkg/errors" @@ -40,13 +38,17 @@ func (e *messageEnvelope) validate() error { return nil } -func wrapMessageInEnvelope(destinationTopic string, msg *message.Message) (*message.Message, error) { +func wrapMessageInEnvelope( + destinationTopic string, + msg *message.Message, + marshaler Marshaler, +) (*message.Message, error) { envelope, err := newMessageEnvelope(destinationTopic, msg) if err != nil { return nil, errors.Wrap(err, "cannot envelope a message") } - envelopedMessage, err := json.Marshal(envelope) + envelopedMessage, err := marshaler.Marshal(envelope) if err != nil { return nil, errors.Wrap(err, "cannot marshal a message") } @@ -57,9 +59,12 @@ func wrapMessageInEnvelope(destinationTopic string, msg *message.Message) (*mess return wrappedMsg, nil } -func unwrapMessageFromEnvelope(msg *message.Message) (destinationTopic string, unwrappedMsg *message.Message, err error) { +func unwrapMessageFromEnvelope( + msg *message.Message, + marshaler Marshaler, +) (destinationTopic string, unwrappedMsg *message.Message, err error) { envelopedMsg := messageEnvelope{} - if err := json.Unmarshal(msg.Payload, &envelopedMsg); err != nil { + if err := marshaler.Unmarshal(msg.Payload, &envelopedMsg); err != nil { return "", nil, errors.Wrap(err, "cannot unmarshal message wrapped in an envelope") } diff --git a/components/forwarder/envelope_test.go b/components/forwarder/envelope_test.go index 1d3a706e0..bf1293726 100644 --- a/components/forwarder/envelope_test.go +++ b/components/forwarder/envelope_test.go @@ -14,6 +14,7 @@ type contextKey string func TestEnvelope(t *testing.T) { expectedUUID := watermill.NewUUID() + marshaler := DefaultMarshaler{} expectedPayload := message.Payload("msg content") expectedMetadata := message.Metadata{"key": "value"} expectedDestinationTopic := "dest_topic" @@ -24,14 +25,14 @@ func TestEnvelope(t *testing.T) { msg.Metadata = expectedMetadata msg.SetContext(ctx) - wrappedMsg, err := wrapMessageInEnvelope(expectedDestinationTopic, msg) + wrappedMsg, err := wrapMessageInEnvelope(expectedDestinationTopic, msg, marshaler) require.NoError(t, err) require.NotNil(t, wrappedMsg) v, ok := wrappedMsg.Context().Value(contextKey("key")).(string) require.True(t, ok) require.Equal(t, "value", v) - destinationTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(wrappedMsg) + destinationTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(wrappedMsg, marshaler) require.NoError(t, err) require.NotNil(t, unwrappedMsg) assert.Equal(t, expectedUUID, unwrappedMsg.UUID) diff --git a/components/forwarder/forwarder.go b/components/forwarder/forwarder.go index 09fb26b56..f4bdeaac2 100644 --- a/components/forwarder/forwarder.go +++ b/components/forwarder/forwarder.go @@ -30,6 +30,11 @@ type Config struct { // // If router is provided, it's not necessary to call `Forwarder.Run()` if the router is started with `router.Run()`. Router *message.Router + + // Marshaler is used to deserialize envelopes received on ForwarderTopic. + // It must match the Marshaler used by the decorated Publisher. + // Defaults to DefaultMarshaler, which uses encoding/json. + Marshaler Marshaler } func (c *Config) setDefaults() { @@ -39,6 +44,9 @@ func (c *Config) setDefaults() { if c.ForwarderTopic == "" { c.ForwarderTopic = defaultForwarderTopic } + if c.Marshaler == nil { + c.Marshaler = DefaultMarshaler{} + } } func (c *Config) Validate() error { @@ -64,7 +72,12 @@ type Forwarder struct { // // Note: Keep in mind that by default the forwarder will nack all messages which weren't sent using a decorated publisher. // You can change this behavior by passing a middleware which will ack them instead. -func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publisher, logger watermill.LoggerAdapter, config Config) (*Forwarder, error) { +func NewForwarder( + subscriberIn message.Subscriber, + publisherOut message.Publisher, + logger watermill.LoggerAdapter, + config Config, +) (*Forwarder, error) { config.setDefaults() routerConfig := message.RouterConfig{CloseTimeout: config.CloseTimeout} @@ -117,7 +130,7 @@ func (f *Forwarder) Running() chan struct{} { } func (f *Forwarder) forwardMessage(msg *message.Message) error { - destTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(msg) + destTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(msg, f.config.Marshaler) if err != nil { f.logger.Error("Could not unwrap a message from an envelope", err, watermill.LogFields{ "uuid": msg.UUID, diff --git a/components/forwarder/marshaler.go b/components/forwarder/marshaler.go new file mode 100644 index 000000000..2b062faa1 --- /dev/null +++ b/components/forwarder/marshaler.go @@ -0,0 +1,27 @@ +package forwarder + +import "encoding/json" + +// Marshaler is used by the Forwarder Publisher to serialize the envelope sent +// to the forwarder topic, and by the Forwarder consumer to deserialize it. +// +// The interface matches the signatures of encoding/json's Marshal and Unmarshal, +// so any drop-in JSON library (e.g. github.com/bytedance/sonic, github.com/goccy/go-json) +// can be adapted with a small wrapper. +// +// The default is DefaultMarshaler, which uses encoding/json. +type Marshaler interface { + Marshal(v any) ([]byte, error) + Unmarshal(data []byte, v any) error +} + +// DefaultMarshaler uses the standard library's encoding/json package. +type DefaultMarshaler struct{} + +func (DefaultMarshaler) Marshal(v any) ([]byte, error) { + return json.Marshal(v) +} + +func (DefaultMarshaler) Unmarshal(data []byte, v any) error { + return json.Unmarshal(data, v) +} diff --git a/components/forwarder/publisher.go b/components/forwarder/publisher.go index 9d1836791..676a5bfea 100644 --- a/components/forwarder/publisher.go +++ b/components/forwarder/publisher.go @@ -5,16 +5,25 @@ import ( "github.com/pkg/errors" ) +// PublisherConfig configures the decorating Publisher returned by NewPublisher. type PublisherConfig struct { // ForwarderTopic is a topic which the forwarder is listening to. Publisher will send enveloped messages to this topic. // Defaults to `forwarder_topic`. ForwarderTopic string + + // Marshaler is used to serialize envelopes sent to ForwarderTopic. + // It must match the Marshaler used by the Forwarder consuming the topic. + // Defaults to DefaultMarshaler, which uses encoding/json. + Marshaler Marshaler } func (c *PublisherConfig) setDefaults() { if c.ForwarderTopic == "" { c.ForwarderTopic = defaultForwarderTopic } + if c.Marshaler == nil { + c.Marshaler = DefaultMarshaler{} + } } func (c *PublisherConfig) Validate() error { @@ -44,7 +53,7 @@ func NewPublisher(publisher message.Publisher, config PublisherConfig) *Publishe func (p *Publisher) Publish(topic string, messages ...*message.Message) error { envelopedMessages := make([]*message.Message, 0, len(messages)) for _, msg := range messages { - envelopedMsg, err := wrapMessageInEnvelope(topic, msg) + envelopedMsg, err := wrapMessageInEnvelope(topic, msg, p.config.Marshaler) if err != nil { return errors.Wrapf(err, "cannot wrap message, target topic: '%s', uuid: '%s'", topic, msg.UUID) }