Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions abis/RecurringCollector.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,16 @@
],
"name": "RCACollected",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "agreementId", "type": "bytes16" },
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "offerType", "type": "uint8" },
{ "indexed": false, "name": "offerHash", "type": "bytes32" }
],
"name": "OfferStored",
"type": "event"
}
]
2 changes: 1 addition & 1 deletion config/hardhat.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"network": "hardhat",
"subgraphServiceAddress": "0x0000000000000000000000000000000000000000",
"subgraphServiceAddress": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9",
"recurringCollectorAddress": "0x0000000000000000000000000000000000000000",
"startBlock": 0
}
73 changes: 73 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,76 @@ type IndexerDeploymentLatest @entity(immutable: false) {
blockNumber: BigInt!
blockTimestamp: BigInt!
}

# Immutable transition logs. The aggregated IndexingAgreement entity above
# serves current-state queries; these append-only logs serve event-sourcing
# consumers (e.g. dipper's chain_listener polls them since-block to replay
# state transitions into its local DB). Preserving both views lets snapshot
# and streaming consumers share one subgraph.
type IndexingAgreementAccepted @entity(immutable: true) {
id: ID!
indexer: Bytes!
payer: Bytes!
agreementId: Bytes!
allocationId: Bytes!
blockNumber: BigInt!
blockTimestamp: BigInt!
transactionHash: Bytes!
}

type IndexingAgreementCanceled @entity(immutable: true) {
id: ID!
indexer: Bytes!
payer: Bytes!
agreementId: Bytes!
"Address that initiated the cancel (from SubgraphService canceledOnBehalfOf param)"
canceledBy: Bytes!
blockNumber: BigInt!
blockTimestamp: BigInt!
transactionHash: Bytes!
}

type IndexingAgreementUpdated @entity(immutable: true) {
id: ID!
indexer: Bytes!
payer: Bytes!
agreementId: Bytes!
allocationId: Bytes!
version: Int!
blockNumber: BigInt!
blockTimestamp: BigInt!
transactionHash: Bytes!
}

# Immutable log of every OfferStored event emitted by RecurringCollector.
# OFFER_TYPE_NEW = 0, OFFER_TYPE_UPDATE = 1.
type OfferStored @entity(immutable: true) {
id: ID!
agreementId: Bytes!
payer: Bytes!
offerType: Int!
offerHash: Bytes!
blockNumber: BigInt!
blockTimestamp: BigInt!
transactionHash: Bytes!
}

# First stored offer per agreementId, keyed by bytes16 agreement ID.
# Dipper queries this entity as an idempotency gate -- avoids re-submitting
# an offer after a crashed-mid-flight restart where the on-chain tx landed
# but dipper lost track of it.
#
# Declared immutable because, for a given agreementId, the RCA identifying
# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by
# the id derivation, so any duplicate OfferStored event for the same id would
# carry the same offerHash. The handler enforces this by returning early on
# the second event instead of attempting to overwrite.
type Offer @entity(immutable: true) {
id: Bytes!
payer: Bytes!
offerType: Int!
offerHash: Bytes!
createdAtBlock: BigInt!
createdAtTimestamp: BigInt!
createdAtTx: Bytes!
}
36 changes: 35 additions & 1 deletion src/recurringCollector.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { IndexingAgreement } from '../generated/schema'
import { IndexingAgreement, Offer, OfferStored } from '../generated/schema'
import {
AgreementAccepted,
AgreementCanceled,
AgreementUpdated,
RCACollected,
OfferStored as OfferStoredEvent,
} from '../generated/RecurringCollector/RecurringCollector'
import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers'

Expand Down Expand Up @@ -61,3 +62,36 @@ export function handleRCACollected(event: RCACollected): void {
agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens)
agreement.save()
}

export function handleOfferStored(event: OfferStoredEvent): void {
// Immutable event log
let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
let log = new OfferStored(logId)
log.agreementId = event.params.agreementId
log.payer = event.params.payer
log.offerType = event.params.offerType
log.offerHash = event.params.offerHash
log.blockNumber = event.block.number
log.blockTimestamp = event.block.timestamp
log.transactionHash = event.transaction.hash
log.save()

// First-offer entity keyed by agreementId (bytes16). Immutable: if an
// entity already exists, a duplicate OfferStored event for the same
// agreement id (e.g. dipper crashed and re-submitted, or a chain reorg
// re-emitted) carries the same offerHash by construction and we return
// early. Writing to an immutable entity a second time is a graph-node
// error that would halt the subgraph, so the guard is load-bearing.
let existing = Offer.load(event.params.agreementId)
if (existing != null) {
return
}
let offer = new Offer(event.params.agreementId)
offer.payer = event.params.payer
offer.offerType = event.params.offerType
offer.offerHash = event.params.offerHash
offer.createdAtBlock = event.block.number
offer.createdAtTimestamp = event.block.timestamp
offer.createdAtTx = event.transaction.hash
offer.save()
}
49 changes: 48 additions & 1 deletion src/subgraphService.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { ethereum } from '@graphprotocol/graph-ts'
import {
IndexingAgreementAccepted as AcceptedEvent,
IndexingAgreementCanceled as CanceledEvent,
IndexingAgreementUpdated as UpdatedEvent,
IndexingFeesCollectedV1 as FeesCollectedEvent,
} from '../generated/SubgraphService/SubgraphService'
import { IndexerDeploymentLatest, IndexingFeeCollection } from '../generated/schema'
import {
IndexerDeploymentLatest,
IndexingFeeCollection,
IndexingAgreementAccepted,
IndexingAgreementCanceled,
IndexingAgreementUpdated,
} from '../generated/schema'
import { createOrLoadIndexingAgreement } from './helpers'

export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
Expand All @@ -20,6 +27,34 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
}

agreement.save()

let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
let log = new IndexingAgreementAccepted(logId)
log.indexer = event.params.indexer
log.payer = event.params.payer
log.agreementId = event.params.agreementId
log.allocationId = event.params.allocationId
log.blockNumber = event.block.number
log.blockTimestamp = event.block.timestamp
log.transactionHash = event.transaction.hash
log.save()
}

export function handleIndexingAgreementCanceled(event: CanceledEvent): void {
// State and canceledAt are set by RecurringCollector.handleAgreementCanceled,
// which emits the canonical cancel event with the canceledBy enum. This
// handler just writes the immutable transition log for event-sourcing
// consumers (dipper's chain_listener).
let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
let log = new IndexingAgreementCanceled(logId)
log.indexer = event.params.indexer
log.payer = event.params.payer
log.agreementId = event.params.agreementId
log.canceledBy = event.params.canceledOnBehalfOf
log.blockNumber = event.block.number
log.blockTimestamp = event.block.timestamp
log.transactionHash = event.transaction.hash
log.save()
}

export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
Expand All @@ -34,6 +69,18 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
}

agreement.save()

let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
let log = new IndexingAgreementUpdated(logId)
log.indexer = event.params.indexer
log.payer = event.params.payer
log.agreementId = event.params.agreementId
log.allocationId = event.params.allocationId
log.version = event.params.version
log.blockNumber = event.block.number
log.blockTimestamp = event.block.timestamp
log.transactionHash = event.transaction.hash
log.save()
}

export function handleIndexingFeesCollectedV1(event: FeesCollectedEvent): void {
Expand Down
9 changes: 9 additions & 0 deletions subgraph.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ dataSources:
- IndexingAgreement
- IndexingFeeCollection
- IndexerDeploymentLatest
- IndexingAgreementAccepted
- IndexingAgreementCanceled
- IndexingAgreementUpdated
abis:
- name: SubgraphService
file: ./abis/SubgraphService.json
eventHandlers:
- event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes)
handler: handleIndexingAgreementAccepted
- event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address)
handler: handleIndexingAgreementCanceled
- event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes)
handler: handleIndexingAgreementUpdated
- event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes)
Expand All @@ -43,6 +48,8 @@ dataSources:
language: wasm/assemblyscript
entities:
- IndexingAgreement
- OfferStored
- Offer
abis:
- name: RecurringCollector
file: ./abis/RecurringCollector.json
Expand All @@ -59,4 +66,6 @@ dataSources:
- event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256)
handler: handleRCACollected
topic1: ["{{subgraphServiceAddress}}"]
- event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32)
handler: handleOfferStored
file: ./src/recurringCollector.ts
Loading
Loading