From 5d7e3dd7431b059d71b9a8b4483106198b8ec03b Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Thu, 23 Apr 2026 12:04:54 +0800 Subject: [PATCH] feat: add RecurringCollector data source and Offer entity Adds a second data source for RecurringCollector alongside the existing SubgraphService data source, with a handleOfferStored mapping that writes both an immutable OfferStored log entity and an immutable first-wins Offer entity keyed by agreementId. handleOfferStored early-returns if an Offer for the same id already exists, guarding dipper crash-recovery resubmissions and chain reorg re-emissions from halting the subgraph on an immutable rewrite. The Offer entity lets dipper verify via subgraph query that a stored RCA offer exists on-chain before accepting a DIPs proposal with an empty signature. For a given agreementId the RCA identifying fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by the id derivation, so any duplicate OfferStored event for the same id carries the same offerHash by construction -- the entity is modelled as write-only and first-wins to reflect that. Alongside the new data source, restores the immutable IndexingAgreementAccepted / Canceled / Updated event-log entities that PR #5 removed when it introduced the aggregated stateful IndexingAgreement entity. Dipper's chain_listener replays those transitions since-block into its local DB; dropping the logs broke event-sourcing consumers. Both shapes now coexist so dashboards can query current state while streaming consumers keep their replay stream. Config files split the contract address into subgraphServiceAddress and recurringCollectorAddress. The local-network deploy pipeline pulls the RecurringCollector address from horizon.json alongside the existing SubgraphService substitution. Includes matchstick coverage for the RecurringCollector handlers (handleAgreementAccepted, handleAgreementCanceled with both canceledBy variants and early-return, handleAgreementUpdated with early-return and field updates, handleRCACollected with early-return and accumulation) and handleOfferStored (first-wins behaviour and duplicate-event guard). Co-Authored-By: Claude Opus 4.7 (1M context) --- abis/RecurringCollector.json | 11 + config/hardhat.json | 2 +- schema.graphql | 73 +++++ src/recurringCollector.ts | 36 ++- src/subgraphService.ts | 49 +++- subgraph.template.yaml | 9 + tests/recurringCollector.test.ts | 460 +++++++++++++++++++++++++++++++ tests/subgraphService.test.ts | 53 ++++ 8 files changed, 690 insertions(+), 3 deletions(-) create mode 100644 tests/recurringCollector.test.ts diff --git a/abis/RecurringCollector.json b/abis/RecurringCollector.json index 702cba3..7ba7df2 100644 --- a/abis/RecurringCollector.json +++ b/abis/RecurringCollector.json @@ -59,5 +59,16 @@ ], "name": "RCACollected", "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { "indexed": true, "name": "agreementId", "type": "bytes16" }, + { "indexed": true, "name": "payer", "type": "address" }, + { "indexed": true, "name": "offerType", "type": "uint8" }, + { "indexed": false, "name": "offerHash", "type": "bytes32" } + ], + "name": "OfferStored", + "type": "event" } ] diff --git a/config/hardhat.json b/config/hardhat.json index ece2ea3..afa3f34 100644 --- a/config/hardhat.json +++ b/config/hardhat.json @@ -1,6 +1,6 @@ { "network": "hardhat", - "subgraphServiceAddress": "0x0000000000000000000000000000000000000000", + "subgraphServiceAddress": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9", "recurringCollectorAddress": "0x0000000000000000000000000000000000000000", "startBlock": 0 } diff --git a/schema.graphql b/schema.graphql index 2e88a21..30213e0 100644 --- a/schema.graphql +++ b/schema.graphql @@ -71,3 +71,76 @@ type IndexerDeploymentLatest @entity(immutable: false) { blockNumber: BigInt! blockTimestamp: BigInt! } + +# Immutable transition logs. The aggregated IndexingAgreement entity above +# serves current-state queries; these append-only logs serve event-sourcing +# consumers (e.g. dipper's chain_listener polls them since-block to replay +# state transitions into its local DB). Preserving both views lets snapshot +# and streaming consumers share one subgraph. +type IndexingAgreementAccepted @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + allocationId: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +type IndexingAgreementCanceled @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + "Address that initiated the cancel (from SubgraphService canceledOnBehalfOf param)" + canceledBy: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +type IndexingAgreementUpdated @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + allocationId: Bytes! + version: Int! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +# Immutable log of every OfferStored event emitted by RecurringCollector. +# OFFER_TYPE_NEW = 0, OFFER_TYPE_UPDATE = 1. +type OfferStored @entity(immutable: true) { + id: ID! + agreementId: Bytes! + payer: Bytes! + offerType: Int! + offerHash: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +# First stored offer per agreementId, keyed by bytes16 agreement ID. +# Dipper queries this entity as an idempotency gate -- avoids re-submitting +# an offer after a crashed-mid-flight restart where the on-chain tx landed +# but dipper lost track of it. +# +# Declared immutable because, for a given agreementId, the RCA identifying +# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by +# the id derivation, so any duplicate OfferStored event for the same id would +# carry the same offerHash. The handler enforces this by returning early on +# the second event instead of attempting to overwrite. +type Offer @entity(immutable: true) { + id: Bytes! + payer: Bytes! + offerType: Int! + offerHash: Bytes! + createdAtBlock: BigInt! + createdAtTimestamp: BigInt! + createdAtTx: Bytes! +} diff --git a/src/recurringCollector.ts b/src/recurringCollector.ts index 32348d2..f291448 100644 --- a/src/recurringCollector.ts +++ b/src/recurringCollector.ts @@ -1,9 +1,10 @@ -import { IndexingAgreement } from '../generated/schema' +import { IndexingAgreement, Offer, OfferStored } from '../generated/schema' import { AgreementAccepted, AgreementCanceled, AgreementUpdated, RCACollected, + OfferStored as OfferStoredEvent, } from '../generated/RecurringCollector/RecurringCollector' import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers' @@ -61,3 +62,36 @@ export function handleRCACollected(event: RCACollected): void { agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens) agreement.save() } + +export function handleOfferStored(event: OfferStoredEvent): void { + // Immutable event log + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new OfferStored(logId) + log.agreementId = event.params.agreementId + log.payer = event.params.payer + log.offerType = event.params.offerType + log.offerHash = event.params.offerHash + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() + + // First-offer entity keyed by agreementId (bytes16). Immutable: if an + // entity already exists, a duplicate OfferStored event for the same + // agreement id (e.g. dipper crashed and re-submitted, or a chain reorg + // re-emitted) carries the same offerHash by construction and we return + // early. Writing to an immutable entity a second time is a graph-node + // error that would halt the subgraph, so the guard is load-bearing. + let existing = Offer.load(event.params.agreementId) + if (existing != null) { + return + } + let offer = new Offer(event.params.agreementId) + offer.payer = event.params.payer + offer.offerType = event.params.offerType + offer.offerHash = event.params.offerHash + offer.createdAtBlock = event.block.number + offer.createdAtTimestamp = event.block.timestamp + offer.createdAtTx = event.transaction.hash + offer.save() +} diff --git a/src/subgraphService.ts b/src/subgraphService.ts index b1a00b9..ba3fc83 100644 --- a/src/subgraphService.ts +++ b/src/subgraphService.ts @@ -1,10 +1,17 @@ import { ethereum } from '@graphprotocol/graph-ts' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' -import { IndexerDeploymentLatest, IndexingFeeCollection } from '../generated/schema' +import { + IndexerDeploymentLatest, + IndexingFeeCollection, + IndexingAgreementAccepted, + IndexingAgreementCanceled, + IndexingAgreementUpdated, +} from '../generated/schema' import { createOrLoadIndexingAgreement } from './helpers' export function handleIndexingAgreementAccepted(event: AcceptedEvent): void { @@ -20,6 +27,34 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void { } agreement.save() + + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementAccepted(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.allocationId = event.params.allocationId + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() +} + +export function handleIndexingAgreementCanceled(event: CanceledEvent): void { + // State and canceledAt are set by RecurringCollector.handleAgreementCanceled, + // which emits the canonical cancel event with the canceledBy enum. This + // handler just writes the immutable transition log for event-sourcing + // consumers (dipper's chain_listener). + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementCanceled(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.canceledBy = event.params.canceledOnBehalfOf + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() } export function handleIndexingAgreementUpdated(event: UpdatedEvent): void { @@ -34,6 +69,18 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void { } agreement.save() + + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementUpdated(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.allocationId = event.params.allocationId + log.version = event.params.version + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() } export function handleIndexingFeesCollectedV1(event: FeesCollectedEvent): void { diff --git a/subgraph.template.yaml b/subgraph.template.yaml index 1d7f146..73cad1e 100644 --- a/subgraph.template.yaml +++ b/subgraph.template.yaml @@ -19,12 +19,17 @@ dataSources: - IndexingAgreement - IndexingFeeCollection - IndexerDeploymentLatest + - IndexingAgreementAccepted + - IndexingAgreementCanceled + - IndexingAgreementUpdated abis: - name: SubgraphService file: ./abis/SubgraphService.json eventHandlers: - event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes) handler: handleIndexingAgreementAccepted + - event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address) + handler: handleIndexingAgreementCanceled - event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes) handler: handleIndexingAgreementUpdated - event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes) @@ -43,6 +48,8 @@ dataSources: language: wasm/assemblyscript entities: - IndexingAgreement + - OfferStored + - Offer abis: - name: RecurringCollector file: ./abis/RecurringCollector.json @@ -59,4 +66,6 @@ dataSources: - event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256) handler: handleRCACollected topic1: ["{{subgraphServiceAddress}}"] + - event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32) + handler: handleOfferStored file: ./src/recurringCollector.ts diff --git a/tests/recurringCollector.test.ts b/tests/recurringCollector.test.ts new file mode 100644 index 0000000..8594380 --- /dev/null +++ b/tests/recurringCollector.test.ts @@ -0,0 +1,460 @@ +import { assert, describe, test, clearStore, afterEach, newMockEvent } from 'matchstick-as' +import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' +import { + handleAgreementAccepted, + handleAgreementCanceled, + handleAgreementUpdated, + handleRCACollected, + handleOfferStored, +} from '../src/recurringCollector' +import { + AgreementAccepted as AcceptedEvent, + AgreementCanceled as CanceledEvent, + AgreementUpdated as UpdatedEvent, + RCACollected as RCACollectedEvent, + OfferStored as OfferStoredEvent, +} from '../generated/RecurringCollector/RecurringCollector' + +const DATA_SERVICE = Address.fromString('0x000000000000000000000000000000000000000a') +const PAYER = Address.fromString('0x0000000000000000000000000000000000000002') +const SERVICE_PROVIDER = Address.fromString('0x0000000000000000000000000000000000000001') +const AGREEMENT_ID = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + +function createAcceptedEvent( + agreementId: Bytes, + acceptedAt: BigInt, + endsAt: BigInt, + maxInitialTokens: BigInt, + maxOngoingTokensPerSecond: BigInt, + minSecondsPerCollection: i32, + maxSecondsPerCollection: i32, +): AcceptedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('acceptedAt', ethereum.Value.fromUnsignedBigInt(acceptedAt)), + ) + event.parameters.push( + new ethereum.EventParam('endsAt', ethereum.Value.fromUnsignedBigInt(endsAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxInitialTokens', + ethereum.Value.fromUnsignedBigInt(maxInitialTokens), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxOngoingTokensPerSecond', + ethereum.Value.fromUnsignedBigInt(maxOngoingTokensPerSecond), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'minSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(minSecondsPerCollection)), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(maxSecondsPerCollection)), + ), + ) + + return event +} + +function createCanceledEvent( + agreementId: Bytes, + canceledAt: BigInt, + canceledBy: i32, +): CanceledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('canceledAt', ethereum.Value.fromUnsignedBigInt(canceledAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'canceledBy', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(canceledBy)), + ), + ) + + return event +} + +function createUpdatedEvent( + agreementId: Bytes, + updatedAt: BigInt, + endsAt: BigInt, + maxInitialTokens: BigInt, + maxOngoingTokensPerSecond: BigInt, + minSecondsPerCollection: i32, + maxSecondsPerCollection: i32, +): UpdatedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('updatedAt', ethereum.Value.fromUnsignedBigInt(updatedAt)), + ) + event.parameters.push( + new ethereum.EventParam('endsAt', ethereum.Value.fromUnsignedBigInt(endsAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxInitialTokens', + ethereum.Value.fromUnsignedBigInt(maxInitialTokens), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxOngoingTokensPerSecond', + ethereum.Value.fromUnsignedBigInt(maxOngoingTokensPerSecond), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'minSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(minSecondsPerCollection)), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(maxSecondsPerCollection)), + ), + ) + + return event +} + +function createRCACollectedEvent( + agreementId: Bytes, + collectionId: Bytes, + tokens: BigInt, + blockTimestamp: BigInt, +): RCACollectedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('collectionId', ethereum.Value.fromFixedBytes(collectionId)), + ) + event.parameters.push( + new ethereum.EventParam('tokens', ethereum.Value.fromUnsignedBigInt(tokens)), + ) + event.parameters.push( + new ethereum.EventParam('dataServiceCut', ethereum.Value.fromUnsignedBigInt(BigInt.zero())), + ) + + event.block.timestamp = blockTimestamp + + return event +} + +function createOfferStoredEvent( + agreementId: Bytes, + offerType: i32, + offerHash: Bytes, +): OfferStoredEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam( + 'offerType', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(offerType)), + ), + ) + event.parameters.push( + new ethereum.EventParam('offerHash', ethereum.Value.fromFixedBytes(offerHash)), + ) + + return event +} + +describe('handleAgreementAccepted', () => { + afterEach(() => { + clearStore() + }) + + test('creates IndexingAgreement with Accepted state and all RC-specific fields', () => { + let event = createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ) + handleAgreementAccepted(event) + + let id = AGREEMENT_ID.toHexString() + assert.entityCount('IndexingAgreement', 1) + assert.fieldEquals('IndexingAgreement', id, 'payer', PAYER.toHexString()) + assert.fieldEquals('IndexingAgreement', id, 'indexer', SERVICE_PROVIDER.toHexString()) + assert.fieldEquals('IndexingAgreement', id, 'state', 'Accepted') + assert.fieldEquals('IndexingAgreement', id, 'acceptedAt', '1000') + assert.fieldEquals('IndexingAgreement', id, 'lastCollectionAt', '1000') + assert.fieldEquals('IndexingAgreement', id, 'endsAt', '2000') + assert.fieldEquals('IndexingAgreement', id, 'maxInitialTokens', '500') + assert.fieldEquals('IndexingAgreement', id, 'maxOngoingTokensPerSecond', '10') + assert.fieldEquals('IndexingAgreement', id, 'minSecondsPerCollection', '60') + assert.fieldEquals('IndexingAgreement', id, 'maxSecondsPerCollection', '3600') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '0') + assert.fieldEquals('IndexingAgreement', id, 'tokensCollected', '0') + }) +}) + +describe('handleAgreementCanceled', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let event = createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 0) + handleAgreementCanceled(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('canceledBy=0 sets state to CanceledByServiceProvider', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementCanceled(createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 0)) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'state', 'CanceledByServiceProvider') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '1500') + }) + + test('canceledBy=1 sets state to CanceledByPayer', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementCanceled(createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 1)) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'state', 'CanceledByPayer') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '1500') + }) +}) + +describe('handleAgreementUpdated', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let event = createUpdatedEvent( + AGREEMENT_ID, + BigInt.fromI32(1500), + BigInt.fromI32(3000), + BigInt.fromI32(800), + BigInt.fromI32(20), + 120, + 7200, + ) + handleAgreementUpdated(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('updates lastUpdatedAt, endsAt, and terms on existing agreement', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementUpdated( + createUpdatedEvent( + AGREEMENT_ID, + BigInt.fromI32(1500), + BigInt.fromI32(3000), + BigInt.fromI32(800), + BigInt.fromI32(20), + 120, + 7200, + ), + ) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'lastUpdatedAt', '1500') + assert.fieldEquals('IndexingAgreement', id, 'endsAt', '3000') + assert.fieldEquals('IndexingAgreement', id, 'maxInitialTokens', '800') + assert.fieldEquals('IndexingAgreement', id, 'maxOngoingTokensPerSecond', '20') + assert.fieldEquals('IndexingAgreement', id, 'minSecondsPerCollection', '120') + assert.fieldEquals('IndexingAgreement', id, 'maxSecondsPerCollection', '7200') + }) +}) + +describe('handleRCACollected', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let collectionId = Bytes.fromHexString('0x' + 'dd'.repeat(32)) + let event = createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(100), + BigInt.fromI32(1100), + ) + handleRCACollected(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('accumulates tokensCollected and updates lastCollectionAt', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + let collectionId = Bytes.fromHexString('0x' + 'dd'.repeat(32)) + + handleRCACollected( + createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(100), + BigInt.fromI32(1100), + ), + ) + handleRCACollected( + createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(250), + BigInt.fromI32(1200), + ), + ) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'tokensCollected', '350') + assert.fieldEquals('IndexingAgreement', id, 'lastCollectionAt', '1200') + }) +}) + +describe('handleOfferStored', () => { + afterEach(() => { + clearStore() + }) + + test('first event creates OfferStored log and Offer entity', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + let event = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event) + + assert.entityCount('OfferStored', 1) + assert.entityCount('Offer', 1) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('Offer', id, 'payer', PAYER.toHexString()) + assert.fieldEquals('Offer', id, 'offerType', '0') + assert.fieldEquals('Offer', id, 'offerHash', offerHash.toHexString()) + }) + + test('duplicate event appends log but Offer count stays at 1 (idempotency guard)', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + + // First event + let event1 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event1) + + // Second event for same agreementId — must not halt on immutable re-write. + // Must differ in txHash or logIndex so the OfferStored log id differs. + let event2 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + event2.transaction.hash = Bytes.fromHexString( + '0x1111111111111111111111111111111111111111111111111111111111111111', + ) as Bytes + handleOfferStored(event2) + + assert.entityCount('OfferStored', 2) + assert.entityCount('Offer', 1) + }) +}) diff --git a/tests/subgraphService.test.ts b/tests/subgraphService.test.ts index d43b67f..8285d84 100644 --- a/tests/subgraphService.test.ts +++ b/tests/subgraphService.test.ts @@ -2,11 +2,13 @@ import { assert, describe, test, clearStore, afterEach } from 'matchstick-as' import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' import { handleIndexingAgreementAccepted, + handleIndexingAgreementCanceled, handleIndexingAgreementUpdated, handleIndexingFeesCollectedV1, } from '../src/subgraphService' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' @@ -46,6 +48,30 @@ function createAcceptedEvent( return event } +function createCanceledEvent( + indexer: Address, + payer: Address, + agreementId: Bytes, + canceledOnBehalfOf: Address, +): CanceledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push(new ethereum.EventParam('indexer', ethereum.Value.fromAddress(indexer))) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(payer))) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam( + 'canceledOnBehalfOf', + ethereum.Value.fromAddress(canceledOnBehalfOf), + ), + ) + + return event +} + function createUpdatedEvent( indexer: Address, payer: Address, @@ -177,6 +203,29 @@ describe('handleIndexingAgreementAccepted', () => { ) // State remains NotAccepted until RC handler fires assert.fieldEquals('IndexingAgreement', agreementId.toHexString(), 'state', 'NotAccepted') + + // Immutable transition log emitted for event-sourcing consumers + assert.entityCount('IndexingAgreementAccepted', 1) + }) +}) + +describe('handleIndexingAgreementCanceled', () => { + afterEach(() => { + clearStore() + }) + + test('emits immutable IndexingAgreementCanceled log with canceledBy address', () => { + let indexer = Address.fromString('0x0000000000000000000000000000000000000001') + let payer = Address.fromString('0x0000000000000000000000000000000000000002') + let agreementId = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + let canceledOnBehalfOf = Address.fromString('0x0000000000000000000000000000000000000004') + + let event = createCanceledEvent(indexer, payer, agreementId, canceledOnBehalfOf) + handleIndexingAgreementCanceled(event) + + assert.entityCount('IndexingAgreementCanceled', 1) + // Aggregated entity state is owned by the RC handler — SS handler must not create one + assert.entityCount('IndexingAgreement', 0) }) }) @@ -232,6 +281,10 @@ describe('handleIndexingAgreementUpdated', () => { 'tokensPerEntityPerSecond', '100', ) + + // One log per event: the accept and the update each emit their own immutable log + assert.entityCount('IndexingAgreementAccepted', 1) + assert.entityCount('IndexingAgreementUpdated', 1) }) })