diff --git a/abis/RecurringCollector.json b/abis/RecurringCollector.json index 702cba3..7ba7df2 100644 --- a/abis/RecurringCollector.json +++ b/abis/RecurringCollector.json @@ -59,5 +59,16 @@ ], "name": "RCACollected", "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { "indexed": true, "name": "agreementId", "type": "bytes16" }, + { "indexed": true, "name": "payer", "type": "address" }, + { "indexed": true, "name": "offerType", "type": "uint8" }, + { "indexed": false, "name": "offerHash", "type": "bytes32" } + ], + "name": "OfferStored", + "type": "event" } ] diff --git a/config/hardhat.json b/config/hardhat.json index ece2ea3..afa3f34 100644 --- a/config/hardhat.json +++ b/config/hardhat.json @@ -1,6 +1,6 @@ { "network": "hardhat", - "subgraphServiceAddress": "0x0000000000000000000000000000000000000000", + "subgraphServiceAddress": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9", "recurringCollectorAddress": "0x0000000000000000000000000000000000000000", "startBlock": 0 } diff --git a/schema.graphql b/schema.graphql index 2e88a21..30213e0 100644 --- a/schema.graphql +++ b/schema.graphql @@ -71,3 +71,76 @@ type IndexerDeploymentLatest @entity(immutable: false) { blockNumber: BigInt! blockTimestamp: BigInt! } + +# Immutable transition logs. The aggregated IndexingAgreement entity above +# serves current-state queries; these append-only logs serve event-sourcing +# consumers (e.g. dipper's chain_listener polls them since-block to replay +# state transitions into its local DB). Preserving both views lets snapshot +# and streaming consumers share one subgraph. +type IndexingAgreementAccepted @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + allocationId: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +type IndexingAgreementCanceled @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + "Address that initiated the cancel (from SubgraphService canceledOnBehalfOf param)" + canceledBy: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +type IndexingAgreementUpdated @entity(immutable: true) { + id: ID! + indexer: Bytes! + payer: Bytes! + agreementId: Bytes! + allocationId: Bytes! + version: Int! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +# Immutable log of every OfferStored event emitted by RecurringCollector. +# OFFER_TYPE_NEW = 0, OFFER_TYPE_UPDATE = 1. +type OfferStored @entity(immutable: true) { + id: ID! + agreementId: Bytes! + payer: Bytes! + offerType: Int! + offerHash: Bytes! + blockNumber: BigInt! + blockTimestamp: BigInt! + transactionHash: Bytes! +} + +# First stored offer per agreementId, keyed by bytes16 agreement ID. +# Dipper queries this entity as an idempotency gate -- avoids re-submitting +# an offer after a crashed-mid-flight restart where the on-chain tx landed +# but dipper lost track of it. +# +# Declared immutable because, for a given agreementId, the RCA identifying +# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by +# the id derivation, so any duplicate OfferStored event for the same id would +# carry the same offerHash. The handler enforces this by returning early on +# the second event instead of attempting to overwrite. +type Offer @entity(immutable: true) { + id: Bytes! + payer: Bytes! + offerType: Int! + offerHash: Bytes! + createdAtBlock: BigInt! + createdAtTimestamp: BigInt! + createdAtTx: Bytes! +} diff --git a/src/recurringCollector.ts b/src/recurringCollector.ts index 32348d2..f291448 100644 --- a/src/recurringCollector.ts +++ b/src/recurringCollector.ts @@ -1,9 +1,10 @@ -import { IndexingAgreement } from '../generated/schema' +import { IndexingAgreement, Offer, OfferStored } from '../generated/schema' import { AgreementAccepted, AgreementCanceled, AgreementUpdated, RCACollected, + OfferStored as OfferStoredEvent, } from '../generated/RecurringCollector/RecurringCollector' import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers' @@ -61,3 +62,36 @@ export function handleRCACollected(event: RCACollected): void { agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens) agreement.save() } + +export function handleOfferStored(event: OfferStoredEvent): void { + // Immutable event log + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new OfferStored(logId) + log.agreementId = event.params.agreementId + log.payer = event.params.payer + log.offerType = event.params.offerType + log.offerHash = event.params.offerHash + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() + + // First-offer entity keyed by agreementId (bytes16). Immutable: if an + // entity already exists, a duplicate OfferStored event for the same + // agreement id (e.g. dipper crashed and re-submitted, or a chain reorg + // re-emitted) carries the same offerHash by construction and we return + // early. Writing to an immutable entity a second time is a graph-node + // error that would halt the subgraph, so the guard is load-bearing. + let existing = Offer.load(event.params.agreementId) + if (existing != null) { + return + } + let offer = new Offer(event.params.agreementId) + offer.payer = event.params.payer + offer.offerType = event.params.offerType + offer.offerHash = event.params.offerHash + offer.createdAtBlock = event.block.number + offer.createdAtTimestamp = event.block.timestamp + offer.createdAtTx = event.transaction.hash + offer.save() +} diff --git a/src/subgraphService.ts b/src/subgraphService.ts index b1a00b9..ba3fc83 100644 --- a/src/subgraphService.ts +++ b/src/subgraphService.ts @@ -1,10 +1,17 @@ import { ethereum } from '@graphprotocol/graph-ts' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' -import { IndexerDeploymentLatest, IndexingFeeCollection } from '../generated/schema' +import { + IndexerDeploymentLatest, + IndexingFeeCollection, + IndexingAgreementAccepted, + IndexingAgreementCanceled, + IndexingAgreementUpdated, +} from '../generated/schema' import { createOrLoadIndexingAgreement } from './helpers' export function handleIndexingAgreementAccepted(event: AcceptedEvent): void { @@ -20,6 +27,34 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void { } agreement.save() + + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementAccepted(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.allocationId = event.params.allocationId + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() +} + +export function handleIndexingAgreementCanceled(event: CanceledEvent): void { + // State and canceledAt are set by RecurringCollector.handleAgreementCanceled, + // which emits the canonical cancel event with the canceledBy enum. This + // handler just writes the immutable transition log for event-sourcing + // consumers (dipper's chain_listener). + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementCanceled(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.canceledBy = event.params.canceledOnBehalfOf + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() } export function handleIndexingAgreementUpdated(event: UpdatedEvent): void { @@ -34,6 +69,18 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void { } agreement.save() + + let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString() + let log = new IndexingAgreementUpdated(logId) + log.indexer = event.params.indexer + log.payer = event.params.payer + log.agreementId = event.params.agreementId + log.allocationId = event.params.allocationId + log.version = event.params.version + log.blockNumber = event.block.number + log.blockTimestamp = event.block.timestamp + log.transactionHash = event.transaction.hash + log.save() } export function handleIndexingFeesCollectedV1(event: FeesCollectedEvent): void { diff --git a/subgraph.template.yaml b/subgraph.template.yaml index 1d7f146..73cad1e 100644 --- a/subgraph.template.yaml +++ b/subgraph.template.yaml @@ -19,12 +19,17 @@ dataSources: - IndexingAgreement - IndexingFeeCollection - IndexerDeploymentLatest + - IndexingAgreementAccepted + - IndexingAgreementCanceled + - IndexingAgreementUpdated abis: - name: SubgraphService file: ./abis/SubgraphService.json eventHandlers: - event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes) handler: handleIndexingAgreementAccepted + - event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address) + handler: handleIndexingAgreementCanceled - event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes) handler: handleIndexingAgreementUpdated - event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes) @@ -43,6 +48,8 @@ dataSources: language: wasm/assemblyscript entities: - IndexingAgreement + - OfferStored + - Offer abis: - name: RecurringCollector file: ./abis/RecurringCollector.json @@ -59,4 +66,6 @@ dataSources: - event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256) handler: handleRCACollected topic1: ["{{subgraphServiceAddress}}"] + - event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32) + handler: handleOfferStored file: ./src/recurringCollector.ts diff --git a/tests/recurringCollector.test.ts b/tests/recurringCollector.test.ts new file mode 100644 index 0000000..8594380 --- /dev/null +++ b/tests/recurringCollector.test.ts @@ -0,0 +1,460 @@ +import { assert, describe, test, clearStore, afterEach, newMockEvent } from 'matchstick-as' +import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' +import { + handleAgreementAccepted, + handleAgreementCanceled, + handleAgreementUpdated, + handleRCACollected, + handleOfferStored, +} from '../src/recurringCollector' +import { + AgreementAccepted as AcceptedEvent, + AgreementCanceled as CanceledEvent, + AgreementUpdated as UpdatedEvent, + RCACollected as RCACollectedEvent, + OfferStored as OfferStoredEvent, +} from '../generated/RecurringCollector/RecurringCollector' + +const DATA_SERVICE = Address.fromString('0x000000000000000000000000000000000000000a') +const PAYER = Address.fromString('0x0000000000000000000000000000000000000002') +const SERVICE_PROVIDER = Address.fromString('0x0000000000000000000000000000000000000001') +const AGREEMENT_ID = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + +function createAcceptedEvent( + agreementId: Bytes, + acceptedAt: BigInt, + endsAt: BigInt, + maxInitialTokens: BigInt, + maxOngoingTokensPerSecond: BigInt, + minSecondsPerCollection: i32, + maxSecondsPerCollection: i32, +): AcceptedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('acceptedAt', ethereum.Value.fromUnsignedBigInt(acceptedAt)), + ) + event.parameters.push( + new ethereum.EventParam('endsAt', ethereum.Value.fromUnsignedBigInt(endsAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxInitialTokens', + ethereum.Value.fromUnsignedBigInt(maxInitialTokens), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxOngoingTokensPerSecond', + ethereum.Value.fromUnsignedBigInt(maxOngoingTokensPerSecond), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'minSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(minSecondsPerCollection)), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(maxSecondsPerCollection)), + ), + ) + + return event +} + +function createCanceledEvent( + agreementId: Bytes, + canceledAt: BigInt, + canceledBy: i32, +): CanceledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('canceledAt', ethereum.Value.fromUnsignedBigInt(canceledAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'canceledBy', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(canceledBy)), + ), + ) + + return event +} + +function createUpdatedEvent( + agreementId: Bytes, + updatedAt: BigInt, + endsAt: BigInt, + maxInitialTokens: BigInt, + maxOngoingTokensPerSecond: BigInt, + minSecondsPerCollection: i32, + maxSecondsPerCollection: i32, +): UpdatedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('updatedAt', ethereum.Value.fromUnsignedBigInt(updatedAt)), + ) + event.parameters.push( + new ethereum.EventParam('endsAt', ethereum.Value.fromUnsignedBigInt(endsAt)), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxInitialTokens', + ethereum.Value.fromUnsignedBigInt(maxInitialTokens), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxOngoingTokensPerSecond', + ethereum.Value.fromUnsignedBigInt(maxOngoingTokensPerSecond), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'minSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(minSecondsPerCollection)), + ), + ) + event.parameters.push( + new ethereum.EventParam( + 'maxSecondsPerCollection', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(maxSecondsPerCollection)), + ), + ) + + return event +} + +function createRCACollectedEvent( + agreementId: Bytes, + collectionId: Bytes, + tokens: BigInt, + blockTimestamp: BigInt, +): RCACollectedEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('dataService', ethereum.Value.fromAddress(DATA_SERVICE)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam('serviceProvider', ethereum.Value.fromAddress(SERVICE_PROVIDER)), + ) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('collectionId', ethereum.Value.fromFixedBytes(collectionId)), + ) + event.parameters.push( + new ethereum.EventParam('tokens', ethereum.Value.fromUnsignedBigInt(tokens)), + ) + event.parameters.push( + new ethereum.EventParam('dataServiceCut', ethereum.Value.fromUnsignedBigInt(BigInt.zero())), + ) + + event.block.timestamp = blockTimestamp + + return event +} + +function createOfferStoredEvent( + agreementId: Bytes, + offerType: i32, + offerHash: Bytes, +): OfferStoredEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam( + 'offerType', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(offerType)), + ), + ) + event.parameters.push( + new ethereum.EventParam('offerHash', ethereum.Value.fromFixedBytes(offerHash)), + ) + + return event +} + +describe('handleAgreementAccepted', () => { + afterEach(() => { + clearStore() + }) + + test('creates IndexingAgreement with Accepted state and all RC-specific fields', () => { + let event = createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ) + handleAgreementAccepted(event) + + let id = AGREEMENT_ID.toHexString() + assert.entityCount('IndexingAgreement', 1) + assert.fieldEquals('IndexingAgreement', id, 'payer', PAYER.toHexString()) + assert.fieldEquals('IndexingAgreement', id, 'indexer', SERVICE_PROVIDER.toHexString()) + assert.fieldEquals('IndexingAgreement', id, 'state', 'Accepted') + assert.fieldEquals('IndexingAgreement', id, 'acceptedAt', '1000') + assert.fieldEquals('IndexingAgreement', id, 'lastCollectionAt', '1000') + assert.fieldEquals('IndexingAgreement', id, 'endsAt', '2000') + assert.fieldEquals('IndexingAgreement', id, 'maxInitialTokens', '500') + assert.fieldEquals('IndexingAgreement', id, 'maxOngoingTokensPerSecond', '10') + assert.fieldEquals('IndexingAgreement', id, 'minSecondsPerCollection', '60') + assert.fieldEquals('IndexingAgreement', id, 'maxSecondsPerCollection', '3600') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '0') + assert.fieldEquals('IndexingAgreement', id, 'tokensCollected', '0') + }) +}) + +describe('handleAgreementCanceled', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let event = createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 0) + handleAgreementCanceled(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('canceledBy=0 sets state to CanceledByServiceProvider', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementCanceled(createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 0)) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'state', 'CanceledByServiceProvider') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '1500') + }) + + test('canceledBy=1 sets state to CanceledByPayer', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementCanceled(createCanceledEvent(AGREEMENT_ID, BigInt.fromI32(1500), 1)) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'state', 'CanceledByPayer') + assert.fieldEquals('IndexingAgreement', id, 'canceledAt', '1500') + }) +}) + +describe('handleAgreementUpdated', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let event = createUpdatedEvent( + AGREEMENT_ID, + BigInt.fromI32(1500), + BigInt.fromI32(3000), + BigInt.fromI32(800), + BigInt.fromI32(20), + 120, + 7200, + ) + handleAgreementUpdated(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('updates lastUpdatedAt, endsAt, and terms on existing agreement', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + handleAgreementUpdated( + createUpdatedEvent( + AGREEMENT_ID, + BigInt.fromI32(1500), + BigInt.fromI32(3000), + BigInt.fromI32(800), + BigInt.fromI32(20), + 120, + 7200, + ), + ) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'lastUpdatedAt', '1500') + assert.fieldEquals('IndexingAgreement', id, 'endsAt', '3000') + assert.fieldEquals('IndexingAgreement', id, 'maxInitialTokens', '800') + assert.fieldEquals('IndexingAgreement', id, 'maxOngoingTokensPerSecond', '20') + assert.fieldEquals('IndexingAgreement', id, 'minSecondsPerCollection', '120') + assert.fieldEquals('IndexingAgreement', id, 'maxSecondsPerCollection', '7200') + }) +}) + +describe('handleRCACollected', () => { + afterEach(() => { + clearStore() + }) + + test('early returns when agreement does not exist', () => { + let collectionId = Bytes.fromHexString('0x' + 'dd'.repeat(32)) + let event = createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(100), + BigInt.fromI32(1100), + ) + handleRCACollected(event) + + assert.entityCount('IndexingAgreement', 0) + }) + + test('accumulates tokensCollected and updates lastCollectionAt', () => { + handleAgreementAccepted( + createAcceptedEvent( + AGREEMENT_ID, + BigInt.fromI32(1000), + BigInt.fromI32(2000), + BigInt.fromI32(500), + BigInt.fromI32(10), + 60, + 3600, + ), + ) + + let collectionId = Bytes.fromHexString('0x' + 'dd'.repeat(32)) + + handleRCACollected( + createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(100), + BigInt.fromI32(1100), + ), + ) + handleRCACollected( + createRCACollectedEvent( + AGREEMENT_ID, + collectionId, + BigInt.fromI32(250), + BigInt.fromI32(1200), + ), + ) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('IndexingAgreement', id, 'tokensCollected', '350') + assert.fieldEquals('IndexingAgreement', id, 'lastCollectionAt', '1200') + }) +}) + +describe('handleOfferStored', () => { + afterEach(() => { + clearStore() + }) + + test('first event creates OfferStored log and Offer entity', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + let event = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event) + + assert.entityCount('OfferStored', 1) + assert.entityCount('Offer', 1) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('Offer', id, 'payer', PAYER.toHexString()) + assert.fieldEquals('Offer', id, 'offerType', '0') + assert.fieldEquals('Offer', id, 'offerHash', offerHash.toHexString()) + }) + + test('duplicate event appends log but Offer count stays at 1 (idempotency guard)', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + + // First event + let event1 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event1) + + // Second event for same agreementId — must not halt on immutable re-write. + // Must differ in txHash or logIndex so the OfferStored log id differs. + let event2 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + event2.transaction.hash = Bytes.fromHexString( + '0x1111111111111111111111111111111111111111111111111111111111111111', + ) as Bytes + handleOfferStored(event2) + + assert.entityCount('OfferStored', 2) + assert.entityCount('Offer', 1) + }) +}) diff --git a/tests/subgraphService.test.ts b/tests/subgraphService.test.ts index d43b67f..8285d84 100644 --- a/tests/subgraphService.test.ts +++ b/tests/subgraphService.test.ts @@ -2,11 +2,13 @@ import { assert, describe, test, clearStore, afterEach } from 'matchstick-as' import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' import { handleIndexingAgreementAccepted, + handleIndexingAgreementCanceled, handleIndexingAgreementUpdated, handleIndexingFeesCollectedV1, } from '../src/subgraphService' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' @@ -46,6 +48,30 @@ function createAcceptedEvent( return event } +function createCanceledEvent( + indexer: Address, + payer: Address, + agreementId: Bytes, + canceledOnBehalfOf: Address, +): CanceledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push(new ethereum.EventParam('indexer', ethereum.Value.fromAddress(indexer))) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(payer))) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam( + 'canceledOnBehalfOf', + ethereum.Value.fromAddress(canceledOnBehalfOf), + ), + ) + + return event +} + function createUpdatedEvent( indexer: Address, payer: Address, @@ -177,6 +203,29 @@ describe('handleIndexingAgreementAccepted', () => { ) // State remains NotAccepted until RC handler fires assert.fieldEquals('IndexingAgreement', agreementId.toHexString(), 'state', 'NotAccepted') + + // Immutable transition log emitted for event-sourcing consumers + assert.entityCount('IndexingAgreementAccepted', 1) + }) +}) + +describe('handleIndexingAgreementCanceled', () => { + afterEach(() => { + clearStore() + }) + + test('emits immutable IndexingAgreementCanceled log with canceledBy address', () => { + let indexer = Address.fromString('0x0000000000000000000000000000000000000001') + let payer = Address.fromString('0x0000000000000000000000000000000000000002') + let agreementId = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + let canceledOnBehalfOf = Address.fromString('0x0000000000000000000000000000000000000004') + + let event = createCanceledEvent(indexer, payer, agreementId, canceledOnBehalfOf) + handleIndexingAgreementCanceled(event) + + assert.entityCount('IndexingAgreementCanceled', 1) + // Aggregated entity state is owned by the RC handler — SS handler must not create one + assert.entityCount('IndexingAgreement', 0) }) }) @@ -232,6 +281,10 @@ describe('handleIndexingAgreementUpdated', () => { 'tokensPerEntityPerSecond', '100', ) + + // One log per event: the accept and the update each emit their own immutable log + assert.entityCount('IndexingAgreementAccepted', 1) + assert.entityCount('IndexingAgreementUpdated', 1) }) })