Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,61 @@ updates node configuration and reloads it gracefully (admin only)

---

## Get Escrow Events

### `HTTP` GET /api/services/escrow/events?

### `HTTP` POST /directCommand

### `P2P` command: getEscrowEvents

#### Description

Returns indexed Escrow contract events. The indexer matches Escrow logs by topic hash, verifies they came from the chain's `Escrow` contract (`Deposit`/`Withdraw`/`Lock` are generic signatures), and stores one row per event in the append-only `escrow` collection keyed by `${txHash}-${logIndex}`. All filters are optional.

#### Parameters

| name | type | required | description |
| --------- | ------ | --------- | --------------------------------------------------------- |
| command | string | POST only | command name (`getEscrowEvents`) |
| chainId | number | | chain id |
| eventType | string | | one of `Auth, Lock, Claimed, Canceled, Deposit, Withdraw` |
| payer | string | | payer address (case-insensitive) |
| payee | string | | payee address (case-insensitive) |
| token | string | | token address (case-insensitive) |
| jobId | string | | compute job id |
| txId | string | | transaction hash |
| offset | number | | rows to skip (default 0) |
| size | number | | page size (default 100, max 250) |

#### Request (POST /directCommand)

```json
{ "command": "getEscrowEvents", "chainId": 8996, "eventType": "Deposit", "offset": 0, "size": 50 }
```

#### Response

Every row has `id, eventType, chainId, contract, block, txHash` plus event-specific fields (`payer, payee, token, jobId, amount, expiry, proof, maxLockedAmount, maxLockSeconds, maxLockCounts`).

```json
[
{
"id": "0x39f3...6575-3",
"eventType": "Deposit",
"chainId": 8996,
"contract": "0x282d...a1a1",
"block": 55,
"txHash": "0x39f3...6575",
"payer": "0xbe54...ab5e",
"token": "0x282d...a1a1",
"amount": "100000000000000000000"
}
]
```

---

# Compute

For starters, you can find a list of algorithms in the [Ocean Algorithms repository](https://github.com/oceanprotocol/algo_dockers) and the docker images in the [Algo Dockerhub](https://hub.docker.com/r/oceanprotocol/algo_dockers/tags).
Expand Down
1 change: 1 addition & 0 deletions docs/Arhitecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ An off-chain, multi-chain metadata & chain events cache. It continually monitors
- validates DDO, according to multiple SHACL schemas
- provides proof for valid DDOs
- monitors datatokens contracts & stores orders
- monitors the Escrow contract events (Auth, Lock, Claimed, Canceled, Deposit, Withdraw) and stores them for querying
- allows querys for all the above
- supports graceful shutdown and chain-specific reindexing

Expand Down
19 changes: 19 additions & 0 deletions src/@types/Escrow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,22 @@ export interface EscrowLock {
expiry: BigInt
token: string
}

export interface EscrowEvent {
id: string
eventType: string
chainId: number
contract: string
block: number
txHash: string
payer?: string
payee?: string
token?: string
jobId?: string
amount?: string
expiry?: string
proof?: string
maxLockedAmount?: string
maxLockSeconds?: string
maxLockCounts?: string
}
12 changes: 12 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ export interface QueryCommand extends Command {
maxResultsPerPage?: number
pageNumber?: number
}

export interface GetEscrowEventsCommand extends Command {
chainId?: number
eventType?: string
payer?: string
payee?: string
token?: string
jobId?: string
txId?: string
offset?: number
size?: number
}
export interface ReindexCommand extends Command {
txId: string
chainId: number
Expand Down
9 changes: 8 additions & 1 deletion src/components/Indexer/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
NewAccessListEventProcessor,
AddressAddedEventProcessor,
AddressRemovedEventProcessor,
EscrowEventProcessor,
ProcessorConstructor
} from './processors/index.js'
import { findEventByKey } from './utils.js'
Expand All @@ -42,7 +43,13 @@ const EVENT_PROCESSOR_MAP: Record<string, ProcessorConstructor> = {
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor,
[EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor,
[EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor,
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor,
[EVENTS.ESCROW_AUTH]: EscrowEventProcessor,
[EVENTS.ESCROW_LOCK]: EscrowEventProcessor,
[EVENTS.ESCROW_CLAIMED]: EscrowEventProcessor,
[EVENTS.ESCROW_CANCELED]: EscrowEventProcessor,
[EVENTS.ESCROW_DEPOSIT]: EscrowEventProcessor,
[EVENTS.ESCROW_WITHDRAW]: EscrowEventProcessor
}

const processorInstances = new Map<string, BaseEventProcessor>()
Expand Down
112 changes: 112 additions & 0 deletions src/components/Indexer/processors/EscrowEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import { getContractAddress } from '../utils.js'
import { EVENTS } from '../../../utils/constants.js'
import { EscrowEvent } from '../../../@types/Escrow.js'
import { OceanNodeConfig } from '../../../@types/OceanNode.js'
import EscrowJson from '@oceanprotocol/contracts/artifacts/contracts/escrow/Escrow.sol/Escrow.json' with { type: 'json' }

const escrowInterface = new Interface(EscrowJson.abi)

const addr = (v: any): string => v?.toString().toLowerCase()
const num = (v: any): string => v?.toString()

export class EscrowEventProcessor extends BaseEventProcessor {
private readonly escrowAddress: string

constructor(chainId: number, config: OceanNodeConfig) {
super(chainId, config)
this.escrowAddress = getContractAddress(chainId, 'Escrow')
}

async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider,
eventName?: string
): Promise<any> {
try {
if (
!this.escrowAddress ||
event.address.toLowerCase() !== this.escrowAddress.toLowerCase()
) {
return null
}

const decoded = escrowInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const { args } = decoded
const record: EscrowEvent = {
id: `${event.transactionHash}-${event.index}`,
eventType: eventName,
chainId,
contract: event.address.toLowerCase(),
block: event.blockNumber,
txHash: event.transactionHash
}

switch (eventName) {
case EVENTS.ESCROW_AUTH:
record.payer = addr(args.payer)
record.payee = addr(args.payee)
record.maxLockedAmount = num(args.maxLockedAmount)
record.maxLockSeconds = num(args.maxLockSeconds)
record.maxLockCounts = num(args.maxLockCounts)
break
case EVENTS.ESCROW_LOCK:
record.payer = addr(args.payer)
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.amount = num(args.amount)
record.expiry = num(args.expiry)
record.token = addr(args.token)
break
case EVENTS.ESCROW_CLAIMED:
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.token = addr(args.token)
record.payer = addr(args.payer)
record.amount = num(args.amount)
record.proof = args.proof?.toString()
break
case EVENTS.ESCROW_CANCELED:
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.token = addr(args.token)
record.payer = addr(args.payer)
record.amount = num(args.amount)
break
case EVENTS.ESCROW_DEPOSIT:
case EVENTS.ESCROW_WITHDRAW:
record.payer = addr(args.payer)
record.token = addr(args.token)
record.amount = num(args.amount)
break
default:
return null
}

const { escrow } = await this.getDatabase()
if (!escrow) return null
const result = await escrow.create(record)
INDEXER_LOGGER.logMessage(
`[Escrow] ${eventName} indexed for tx ${event.transactionHash} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing Escrow ${eventName} event: ${err.message}`,
true
)
return null
}
}
}
1 change: 1 addition & 0 deletions src/components/Indexer/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export * from './OrderStartedEventProcessor.js'
export * from './NewAccessListEventProcessor.js'
export * from './AddressAddedEventProcessor.js'
export * from './AddressRemovedEventProcessor.js'
export * from './EscrowEventProcessor.js'
export * from './BaseProcessor.js'

export type ProcessorConstructor = new (
Expand Down
5 changes: 5 additions & 0 deletions src/components/core/handler/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import {
PersistentStorageUploadFileHandler
} from './persistentStorage.js'
import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js'
import { EscrowEventsHandler } from './escrowHandler.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -208,6 +209,10 @@ export class CoreHandlersRegistry {
PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST,
new SearchAccessListHandler(node)
)
this.registerCoreHandler(
PROTOCOL_COMMANDS.GET_ESCROW_EVENTS,
new EscrowEventsHandler(node)
)
}

public static getInstance(
Expand Down
57 changes: 57 additions & 0 deletions src/components/core/handler/escrowHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { CommandHandler } from './handler.js'
import { GetEscrowEventsCommand } from '../../../@types/commands.js'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { Readable } from 'stream'
import {
ValidateParams,
validateCommandParameters
} from '../../httpRoutes/validateCommands.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'

export class EscrowEventsHandler extends CommandHandler {
validate(command: GetEscrowEventsCommand): ValidateParams {
return validateCommandParameters(command, [])
}

async handle(task: GetEscrowEventsCommand): Promise<P2PCommandResponse> {
const validationResponse = await this.verifyParamsAndRateLimits(task)
if (this.shouldDenyTaskHandling(validationResponse)) {
return validationResponse
}
try {
const database = await this.getOceanNode().getDatabase()
if (!database || !database.escrow) {
CORE_LOGGER.error('Escrow database is not available')
return {
stream: null,
status: { httpStatus: 503, error: 'Escrow database is not available' }
}
}

const filters: Record<string, any> = {
chainId: task.chainId,
eventType: task.eventType,
payer: typeof task.payer === 'string' ? task.payer.toLowerCase() : undefined,
payee: typeof task.payee === 'string' ? task.payee.toLowerCase() : undefined,
token: typeof task.token === 'string' ? task.token.toLowerCase() : undefined,
jobId: task.jobId,
txHash: task.txId
}

let result = await database.escrow.search(filters, task.offset, task.size)
if (!result) {
result = []
}
return {
stream: Readable.from(JSON.stringify(result)),
status: { httpStatus: 200 }
}
} catch (error) {
CORE_LOGGER.error(`Error in EscrowEventsHandler: ${error.message}`)
return {
stream: null,
status: { httpStatus: 500, error: 'Unknown error: ' + error.message }
}
}
}
}
23 changes: 23 additions & 0 deletions src/components/database/BaseDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Schema } from '.'
import { OceanNodeDBConfig } from '../../@types'
import { AccessListUser } from '../../@types/AccessList.js'
import { EscrowEvent } from '../../@types/Escrow.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { ElasticsearchSchema } from './ElasticSchemas.js'
Expand Down Expand Up @@ -152,6 +153,28 @@ export abstract class AbstractOrderDatabase {
abstract delete(orderId: string): Promise<any>
}

export abstract class AbstractEscrowDatabase {
protected config: OceanNodeDBConfig
protected schema: Schema

constructor(config: OceanNodeDBConfig, schema: Schema) {
this.config = config
this.schema = schema
}

abstract create(event: EscrowEvent): Promise<any>

abstract retrieve(id: string): Promise<Record<string, any> | null>

abstract search(
filters: Record<string, any>,
offset?: number,
size?: number
): Promise<Record<string, any>[] | null>

abstract delete(id: string): Promise<any>
}

export abstract class AbstractDdoDatabase {
protected config: OceanNodeDBConfig
protected schemas: Schema[]
Expand Down
Loading
Loading