Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export P2P_BOOTSTRAP_NODES=
export P2P_FILTER_ANNOUNCED_ADDRESSES=

## compute
# Example with cross-resource constraints (constraints are optional and backwards-compatible):
# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01}]}]},"resources":[{"id":"cpu","total":8,"max":8,"min":1,"constraints":[{"id":"ram","min":1,"max":3},{"id":"disk","min":10,"max":100}]},{"id":"ram","total":32,"max":32,"min":1},{"id":"disk","total":500,"max":500,"min":10},{"id":"gpu","total":4,"max":4,"min":0,"constraints":[{"id":"ram","min":8,"max":32},{"id":"cpu","min":2,"max":4}]}]}]'
export DOCKER_COMPUTE_ENVIRONMENTS=


10,113 changes: 5,840 additions & 4,273 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/@types/C2D/C2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export interface C2DClusterInfo {

export type ComputeResourceType = 'cpu' | 'ram' | 'disk' | any

export interface ResourceConstraint {
id: ComputeResourceType // the resource being constrained
min?: number // min units of this resource per unit of parent resource
max?: number // max units of this resource per unit of parent resource
}

export interface ComputeResourcesPricingInfo {
id: ComputeResourceType
price: number // price per unit per minute
Expand Down Expand Up @@ -63,6 +69,7 @@ export interface ComputeResource {
*/
platform?: string
init?: dockerHwInit
constraints?: ResourceConstraint[] // optional cross-resource constraints
}
export interface ComputeResourceRequest {
id: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export class RawPrivateKeyProvider implements IKeyProvider {
key && key.length > 0
? Buffer.from(key).toString('hex')
: new eciesjs.PrivateKey(privateKey.raw).publicKey.toHex()
encryptedData = eciesjs.encrypt(recipientPublicKeyHex, data)
encryptedData = Buffer.from(eciesjs.encrypt(recipientPublicKeyHex, data))
}
return encryptedData
}
Expand All @@ -142,7 +142,7 @@ export class RawPrivateKeyProvider implements IKeyProvider {
decryptedData = Buffer.concat([decipher.update(data), decipher.final()])
} else if (algorithm === EncryptMethod.ECIES) {
const sk = new eciesjs.PrivateKey(privateKey.raw)
decryptedData = eciesjs.decrypt(sk.secret, data)
decryptedData = Buffer.from(eciesjs.decrypt(sk.secret, data))
}
return decryptedData
}
Expand Down
4 changes: 2 additions & 2 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -959,11 +959,11 @@ export class OceanP2P extends EventEmitter {
const cid = await cidFromRawString(input)
const peersFound = []
try {
// @ts-ignore ignore the type mismatch
const f = this._libp2p.contentRouting.findProviders(cid, {
queryFuncTimeout: timeout || 20000 // 20 seconds
// on timeout the query ends with an abort signal => CodeError: Query aborted
})
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any)
for await (const value of f) {
peersFound.push(value)
}
Expand Down
56 changes: 56 additions & 0 deletions src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,65 @@ export abstract class C2DEngine {
properResources.push({ id: device, amount: desired })
}

this.checkResourceConstraints(properResources, env, isFree)
return properResources
}

protected checkResourceConstraints(
resources: ComputeResourceRequest[],
env: ComputeEnvironment,
isFree: boolean
): void {
const envResources = isFree ? (env.free?.resources ?? []) : (env.resources ?? [])
for (const envResource of envResources) {
if (!envResource.constraints || envResource.constraints.length === 0) continue
const parentAmount = this.getResourceRequest(resources, envResource.id)
if (!parentAmount || parentAmount <= 0) continue

for (const constraint of envResource.constraints) {
let constrainedAmount = this.getResourceRequest(resources, constraint.id) ?? 0

if (constraint.min !== undefined) {
const requiredMin = parentAmount * constraint.min
if (constrainedAmount < requiredMin) {
const constrainedMaxMin = this.getMaxMinResource(constraint.id, env, isFree)
if (requiredMin > constrainedMaxMin.max) {
throw new Error(
`Cannot satisfy constraint: ${parentAmount} ${envResource.id} requires at least ${requiredMin} ${constraint.id}, but max is ${constrainedMaxMin.max}`
)
}
this.setResourceAmount(resources, constraint.id, requiredMin)
constrainedAmount = requiredMin
}
}

if (constraint.max !== undefined) {
const requiredMax = parentAmount * constraint.max
// re-read in case it was bumped above
constrainedAmount = this.getResourceRequest(resources, constraint.id) ?? 0
if (constrainedAmount > requiredMax) {
throw new Error(
`Too much ${constraint.id} for ${parentAmount} ${envResource.id}. Max allowed: ${requiredMax}, requested: ${constrainedAmount}`
)
}
}
}
}
}

protected setResourceAmount(
resources: ComputeResourceRequest[],
id: ComputeResourceType,
amount: number
): void {
for (const resource of resources) {
if (resource.id === id) {
resource.amount = amount
return
}
}
}

public async getUsedResources(env: ComputeEnvironment): Promise<any> {
const usedResources: { [x: string]: any } = {}
const usedFreeResources: { [x: string]: any } = {}
Expand Down
10 changes: 5 additions & 5 deletions src/test/integration/compute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ describe('Compute', () => {
oceanNode.blockchainRegistry
)
oceanNode.addIndexer(indexer)
oceanNode.addC2DEngines()
await oceanNode.addC2DEngines()

provider = new JsonRpcProvider('http://127.0.0.1:8545')
publisherAccount = (await provider.getSigner(0)) as Signer
Expand Down Expand Up @@ -2215,7 +2215,7 @@ describe('Compute', () => {

after(async () => {
await tearDownEnvironment(previousConfiguration)
indexer.stopAllChainIndexers()
await indexer.stopAllChainIndexers()
})
})

Expand Down Expand Up @@ -2368,7 +2368,7 @@ describe('Compute Access Restrictions', () => {
oceanNode.blockchainRegistry
)
oceanNode.addIndexer(indexer)
oceanNode.addC2DEngines()
await oceanNode.addC2DEngines()

publishedComputeDataset = await publishAsset(computeAsset, publisherAccount)
publishedAlgoDataset = await publishAsset(algoAsset, publisherAccount)
Expand Down Expand Up @@ -2555,7 +2555,7 @@ describe('Compute Access Restrictions', () => {
oceanNode.blockchainRegistry
)
oceanNode.addIndexer(indexer)
oceanNode.addC2DEngines()
await oceanNode.addC2DEngines()

publishedComputeDataset = await publishAsset(computeAsset, publisherAccount)
publishedAlgoDataset = await publishAsset(algoAsset, publisherAccount)
Expand Down Expand Up @@ -2685,7 +2685,7 @@ describe('Compute Access Restrictions', () => {
oceanNode.blockchainRegistry
)
oceanNode.addIndexer(indexer)
oceanNode.addC2DEngines()
await oceanNode.addC2DEngines()

const provider = new JsonRpcProvider('http://127.0.0.1:8545')
const publisherAccount = (await provider.getSigner(0)) as Signer
Expand Down
Loading
Loading