Skip to content

Fix replication sent$ emitting documents in master format#8400

Open
pubkey wants to merge 1 commit intomasterfrom
claude/fix-replication-bug-7pmDJ
Open

Fix replication sent$ emitting documents in master format#8400
pubkey wants to merge 1 commit intomasterfrom
claude/fix-replication-bug-7pmDJ

Conversation

@pubkey
Copy link
Copy Markdown
Owner

@pubkey pubkey commented Apr 19, 2026

This PR contains:

  • A BUGFIX
  • IMPROVED TESTS
  • A CHANGELOG ENTRY

Describe the problem you have without this PR

The sent$ observable in the replication state was emitting documents in the master format (using the user-defined deletedField like is_deleted) instead of the typed WithDeleted<RxDocType> format (with _deleted: boolean). This occurred because the deletedField swap operation was mutating the same row object that was later forwarded to sent$ subscribers, causing the format conversion to leak into the observable stream.

What changed

Source Code Fix (src/plugins/replication/index.ts):

  • Modified the deletedField swap logic to create a new row object instead of mutating the original
  • The swap now returns a new object with the converted newDocumentState and assumedMasterState, leaving the original row intact
  • This ensures that the row forwarded to processed.up (which feeds sent$) retains the _deleted format

Test Coverage (test/unit/replication.test.ts):

  • Added comprehensive test 'sent$ must emit documents in WithDeleted format with _deleted field when deletedField is custom'
  • Test verifies that:
    • The push handler receives documents in the master format (with custom deletedField)
    • The sent$ observable emits documents in the WithDeleted format (with _deleted: boolean)
    • Both alive and deleted documents are correctly formatted in each stream

Test Plan

The added unit test covers the fix by:

  1. Creating a replication with a custom deletedField (is_deleted)
  2. Inserting and deleting documents
  3. Verifying the push handler receives the master format
  4. Verifying sent$ emits the WithDeleted format with _deleted field
  5. Confirming both _deleted=false and _deleted=true cases work correctly

Existing tests continue to pass, and the new test specifically validates the corrected behavior.

https://claude.ai/code/session_0125qhPLxzvT8XV8ADq94zQZ

The sent$ observable is typed as Observable<WithDeleted<RxDocType>> and
must emit documents with `_deleted: boolean`. When using a custom
deletedField, the upstream's row mutation in masterWrite swapped the
field on the same row object that was later forwarded to processed.up,
causing sent$ to emit docs with the master-format field (e.g.
`is_deleted`) and an undefined `_deleted`.

Create new row objects for the swap instead of mutating the originals.
@github-actions
Copy link
Copy Markdown
Contributor

✅ Verify Test Reproduction: Tests FAILED without the fix (expected)

This confirms the changed tests correctly reproduce the bug that the source changes fix.

This workflow runs the changed tests without the source fix to verify they reproduce the bug.

Show output
...(truncated, showing last 200 of 2897 lines)
        �[32m✓ �[39mcannot observe non-existent field
    .deleted$
      �[32m✓ �[39mshould not emit when deleted state has not changed
      positive
        �[32m✓ �[39mdeleted$ is true, on delete
    .$
      �[32m✓ �[39mshould emit a RxDocument, not only the document data
    .get$()
      negative
        �[32m✓ �[39mprimary cannot be observed
        �[32m✓ �[39mfinal fields cannot be observed
    issues
      �[32m✓ �[39m#4546 - should return the same valueObj for the same objPath
      �[32m✓ �[39mget$() on nested object path should not emit when unrelated field changes

  cleanup.test.js
    basics
      �[32m✓ �[39mshould clean up the deleted documents
      �[32m✓ �[39mshould work by manually calling RxCollection.cleanup()
    cleanup and replication
      �[32m✓ �[39mshould pause the cleanup when a replication is not in sync
      �[32m✓ �[39mshould also run a cleanup on the replication state meta data
    issues
      �[32m✓ �[39mminimumDeletedTime not respected
      �[32m✓ �[39mshould correctly loop cleanup when storage cleanup returns false (batched cleanup)
      �[32m✓ �[39mcleanup() should return true as stated by the TypeScript return type
      �[32m✓ �[39mfields with umlauts and emojis could break the state after cleanup in some storages

  hooks.test.js
    get/set
      �[32m✓ �[39mshould set a hook
      �[32m✓ �[39mshould get a hook
      �[32m✓ �[39mshould get a parallel hook
    insert
      pre
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould save a modified document
          �[32m✓ �[39masync: should save a modified document
          �[32m✓ �[39mshould not insert if hook throws
          �[32m✓ �[39mshould have the collection bound to the this-scope
      post
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould call post insert hook after bulkInsert
    save
      pre
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould save a modified document
          �[32m✓ �[39masync: should save a modified document
          �[32m✓ �[39mshould not save if hook throws
      post
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould receive the RxDocument instance as second argument
    remove
      pre
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould not remove if hook throws
          �[32m✓ �[39mshould call pre remove hook before bulkRemove
          �[32m✓ �[39mshould keep the field value that was added by the hook
      post
        positive
          �[32m✓ �[39mseries
          �[32m✓ �[39mparallel
          �[32m✓ �[39mshould have the collection bound to the this-scope
          �[32m✓ �[39mshould call post remove hook after bulkRemove
    postCreate
      positive
        �[32m✓ �[39mshould define a getter
      negative
        �[32m✓ �[39mshould throw when adding an async-hook
    issues
      �[32m✓ �[39mISSUE #158 : Throwing error in async preInsert does not prevent insert

  rx-pipeline.test.js
    basics
      �[32m✓ �[39madd and remove a pipeline
      �[32m✓ �[39mwrite some document depending on another
      �[32m✓ �[39mwrite some document depending on another with schema validator
      �[32m✓ �[39mshould store the transformed data to the destination
    .awaitIdle()
      �[32m✓ �[39mshould have updated its internal timestamps
    error handling
      �[32m✓ �[39mshould not swallow the error if the handler throws
      �[32m✓ �[39mshould not swallow the error if the handler throws (async)
      �[32m✓ �[39mshould not break reads on destination after handler throws
    checkpoints
      �[32m✓ �[39mshould continue from the correct checkpoint
    multiInstance
      �[32m✓ �[39mshould only run the pipeline at the leader
      �[32m✓ �[39mshould halt reads on other tab while pipeline is running
    transactional behavior
      �[32m✓ �[39mshould not block reads/writes that come from inside the pipeline handler
      �[32m✓ �[39mshould not block reads that come from inside the pipeline handler when already cached outside
      �[32m✓ �[39mshould be able to do writes dependent on reads
      �[32m✓ �[39mshould not block reads when localDocument inserted
    multiple pipelines to same destination
      �[32m✓ �[39mshould not deadlock when two pipelines write to the same destination and both handlers read from it
    .remove()
      �[32m✓ �[39mshould properly clean up checkpoint when remove() is called while pipeline is processing

  orm.test.js
    statics
      create
        positive
          �[32m✓ �[39mcreate a collection with static-methods
        negative
          �[32m✓ �[39mcrash when name not allowed (startsWith(_))
          �[32m✓ �[39mcrash when name not allowed (name reserved)
      run
        �[32m✓ �[39mshould be able to run the method
        �[32m✓ �[39mshould have the right this-context
        �[32m✓ �[39mshould be able to use this.insert()
    instance-methods
      create
        positive
          �[32m✓ �[39mcreate a collection with instance-methods
          �[32m✓ �[39mthis-scope should be bound to document
        negative
          �[32m✓ �[39mcrash when name not allowed (startsWith(_))
          �[32m✓ �[39mcrash when name not allowed (name reserved)
          �[32m✓ �[39mcrash when name not allowed (name is top-level field in schema)
      run
        �[32m✓ �[39mshould be able to run the method
        �[32m✓ �[39mshould have the right this-context
        �[32m✓ �[39mshould not be confused with many collections
    ISSUES
      �[32m✓ �[39mattachment ORM method with built-in name should throw
      �[32m✓ �[39mORM method with populate-getter suffix should throw COL18
      �[32m✓ �[39m#791 Document methods are not bind() to the document

  replication-protocol.test.ts (implementation: dexie)
    helpers
      checkpoint
        �[32m✓ �[39m#3627 should not write a duplicate checkpoint
    down
      �[32m✓ �[39mit should write the initial data and also the ongoing insert
    up
      �[32m✓ �[39mit should write the initial data and also the ongoing insert
      �[32m✓ �[39mshould replicate the insert and the update and the delete
    different configurations
      �[32m✓ �[39mshould be able to replicate A->Master<-B
      �[32m✓ �[39mshould be able to replicate A->B->C->Master
      �[32m✓ �[39mon multi instance it should be able to mount on top of the same storage config with a different instance
      �[32m✓ �[39mshould respect the given local start checkpoint
    conflict handling
      �[32m✓ �[39mboth have inserted the exact same document -> no conflict handler must be called
      �[32m✓ �[39mboth have inserted the same document with different properties
      �[32m✓ �[39mboth have updated the document with different values
      �[32m✓ �[39mdoing many writes on the fork should not lead to many writes on the master
    attachment replication
      �[32m✓ �[39mpush-only: should replicate the attachments to master
      �[32m✓ �[39mpull-only: should replicate the attachments to fork
    stability
      �[32m✓ �[39mBUG: writes to both sides can make it not end up with the correct state
      �[32m✓ �[39mdo many writes while replication is running (0)
      �[32m✓ �[39mdo many writes while replication is running (1)
      �[32m✓ �[39mdo many writes while replication is running (2)
      �[32m✓ �[39mdo many writes while replication is running (3)
      �[32m✓ �[39mdo many writes while replication is running (4)
    issues
      �[32m✓ �[39mshould be able to replicate local documents
      �[32m✓ �[39mshould not stuck when replicating many document in the initial replication

  replication.test.ts
    init
      �[32m✓ �[39mcreate storage
    non-live replication
      �[32m✓ �[39mshould replicate both sides
      �[32m✓ �[39mshould allow asynchronous push and pull modifiers
      �[32m✓ �[39mshould skip the document when the push-modifier returns null
      �[32m✓ �[39msent$ must not emit null when the push-modifier returns null
      �[31m✗ �[39m�[31msent$ must emit documents in WithDeleted format with _deleted field when deletedField is custom�[39m
	AssertionError [ERR_ASSERTION]: sent$ doc at index 0 must have _deleted: boolean, got {"id":"alive","name":"ImqbWO9öLMc","age":57,"updatedAt":1776604684763,"is_deleted":false}
	    at eval (webpack://rxdb/./test_tmp/unit/replication.test.js?:313:45)
	    at Array.forEach (<anonymous>)
	    at Context.eval (webpack://rxdb/./test_tmp/unit/replication.test.js?:312:16)


Chrome Headless 147.0.0.0 (Linux 0.0.0): Executed 935 of 1281�[31m (1 FAILED)�[39m (26.079 secs / 25.664 secs)
�[31mTOTAL: 1 FAILED, 934 SUCCESS�[39m


�[31m1) sent$ must emit documents in WithDeleted format with _deleted field when deletedField is custom
�[39m�[31m     replication.test.ts non-live replication
�[39m�[90m     AssertionError [ERR_ASSERTION]: sent$ doc at index 0 must have _deleted: boolean, got {"id":"alive","name":"ImqbWO9öLMc","age":57,"updatedAt":1776604684763,"is_deleted":false}
    at eval (webpack://rxdb/./test_tmp/unit/replication.test.js?:313:45)
    at Array.forEach (<anonymous>)
    at Context.eval (webpack://rxdb/./test_tmp/unit/replication.test.js?:312:16)
�[39m



View full workflow run

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants