Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/config/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ var (
DatabaseAllowSimplePasswords = Var("CL_DATABASE_ALLOW_SIMPLE_PASSWORDS")
IgnorePrereleaseVersionCheck = Var("CL_IGNORE_PRE_RELEASE_VERSION_CHECK")
SkipAppVersionCheck = Var("CL_SKIP_APP_VERSION_CHECK")
// MeterRecordsEnabled gates emission of metering.v1.MeterRecord events for
// durable CRE resources. Accepts strconv.ParseBool values; default false.
// Temporary deploy gate; promotion to TOML config is tracked by SHARED-2718.
MeterRecordsEnabled = Var("CL_METER_RECORDS_ENABLED")

DatabaseURL = Secret("CL_DATABASE_URL")
DatabaseBackupURL = Secret("CL_DATABASE_BACKUP_URL")
Expand Down
5 changes: 3 additions & 2 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.101
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260506144252-c100eabfda74
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610184803-96d1e031407b
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260615184430-ef0995b527f1
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260609161557-8ceae53b8ab1
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20260521215851-3fdbb363496f
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260609153034-c8423a41ef9a
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32
github.com/smartcontractkit/chainlink-protos/job-distributor v0.18.0
github.com/smartcontractkit/chainlink-testing-framework/framework v0.16.2
github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose v0.1.23
Expand Down Expand Up @@ -494,6 +494,7 @@ require (
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect
github.com/smartcontractkit/chainlink-protos/data-feeds v0.1.1-0.20260501174546-2e8846986b36 // indirect
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20260512230622-65f10f4cd305 // indirect
github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-20260615161920-ed05faf7f0a2 // indirect
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260512230622-65f10f4cd305 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.1-0.20260528221400-84746b70eeeb // indirect
github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260331131315-f08a616d8dcd // indirect
Expand Down
10 changes: 6 additions & 4 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,16 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
atomicSettings,
creServices.OCRConfigService,
cfg.Capabilities().Local(),
// Host-injected deployment/node metering identity for spawned capability
// LOOPs. Sourced once here from node config: product constant, env/zone
// from [Telemetry.ResourceAttributes], node_id = the same CSA pubkey hex
// the node uses for beholder auth and the engine's node_id.
standardcapabilities.NodeIdentity{
Product: "cre",
Environment: cfg.Telemetry().ResourceAttributes()["env"],
Zone: cfg.Telemetry().ResourceAttributes()["zone"],
NodeID: csaPubKeyHex,
},
)
delegates[job.StandardCapabilities] = stdcapDelegate
if creServices.SetDelegatesDeps != nil {
Expand Down
1 change: 1 addition & 0 deletions core/services/cre/confidential_relay_peerid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (s stubConfig) Workflows() config.Workflows { return nil }
func (s stubConfig) CRE() config.CRE { return nil }
func (s stubConfig) P2P() config.P2P { return s.p2p }
func (s stubConfig) Sharding() config.Sharding { return nil }
func (s stubConfig) Telemetry() config.Telemetry { return nil }

func peerIDFromByte(b byte) p2pkey.PeerID {
var id p2pkey.PeerID
Expand Down
83 changes: 83 additions & 0 deletions core/services/cre/cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (

"github.com/smartcontractkit/chainlink-common/keystore/corekeys/p2pkey"
"github.com/smartcontractkit/chainlink-common/keystore/corekeys/workflowkey"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/billing"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/diskmonitor"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
nodeauthjwt "github.com/smartcontractkit/chainlink-common/pkg/nodeauth/jwt"
"github.com/smartcontractkit/chainlink-common/pkg/resourcemanager"
commonsrv "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
Expand All @@ -43,6 +45,7 @@ import (
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr/capregconfig"
Expand Down Expand Up @@ -284,6 +287,10 @@ func (s *Services) newSubservices(
return srvs, nil
}

// Build the syncer's base metering identity once from node config + CSA key;
// the handler resolves the workflow DON id later from the don notifier.
meterIdentity := newSyncerMeterIdentity(cfg, keyStore, lggr)

wfSyncer, billingClient, wfSyncerSrvcs, err := newWorkflowRegistrySyncer(
cfg,
relayerChainInterops,
Expand All @@ -297,6 +304,7 @@ func (s *Services) newSubservices(
opts.LimitsFactory,
s.OrgResolver,
s.GatewayConnectorWrapper,
meterIdentity,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -325,6 +333,7 @@ type Config interface {
CRE() config.CRE
P2P() config.P2P
Sharding() config.Sharding
Telemetry() config.Telemetry
}

// RelayerChainInterops is the minimal interface needed for relayer chain interops
Expand Down Expand Up @@ -705,6 +714,67 @@ func newBillingClient(lggr logger.Logger, cfg Config, opts Opts) (metering.Billi
return billing.NewWorkflowClient(lggr, cfg.Billing().URL(), workflowOpts...)
}

// Metering identity sourcing constants. meterProduct is the deployment product
// dimension for all CRE metering records. The environment/zone dimensions are
// read from the node's [Telemetry.ResourceAttributes] map under these keys, the
// same map the host injects into trigger LOOPs (see core/services/
// standardcapabilities), so a node's engine and its trigger LOOPs agree on the
// coarse identity.
const (
meterProduct = "cre"
resourceAttrEnvKey = "env"
resourceAttrZoneKey = "zone"
)

// nodeCSAPublicKeyHex returns the node's CSA public key as hex — the canonical
// node_id used uniformly across a node's engine and its trigger LOOPs (matching
// keystore.BuildBeholderAuth, which derives the beholder auth pubkey from the
// same default CSA key). A node has at most one CSA key; an empty string is
// returned (with a warning) when none is available, so metering degrades to an
// empty node_id rather than failing node startup.
func nodeCSAPublicKeyHex(keyStore Keystore, lggr logger.Logger) string {
keys, err := keyStore.CSA().GetAll()
if err != nil || len(keys) == 0 {
lggr.Warnw("no CSA key available for metering node_id; node_id will be empty", "err", err)
return ""
}
return keys[0].PublicKeyString()
}

// newSyncerMeterIdentity builds the syncer's base metering identity from node
// config: product is the constant, environment/zone come from
// [Telemetry.ResourceAttributes], and node_id is the CSA public key. The
// workflow DON id (don_id) is resolved later by the handler from the don
// notifier (the engine runs on the workflow DON), so it is intentionally left
// empty here. Service/Resource/ResourceType are stamped by WithIdentity.
func newSyncerMeterIdentity(cfg Config, keyStore Keystore, lggr logger.Logger) resourcemanager.ResourceIdentity {
attrs := cfg.Telemetry().ResourceAttributes()
return resourcemanager.ResourceIdentity{
Product: meterProduct,
Environment: attrs[resourceAttrEnvKey],
Zone: attrs[resourceAttrZoneKey],
NodeID: nodeCSAPublicKeyHex(keyStore, lggr),
}
}

// meterRecordsEnabled reads the CL_METER_RECORDS_ENABLED env var gating emission of
// metering.v1.MeterRecord events. Unset, empty, or invalid values disable emission.
// Promotion of this gate to TOML config is tracked by SHARED-2718.
func meterRecordsEnabled(lggr logger.Logger) bool {
v := env.MeterRecordsEnabled.Get()
if v == "" {
return false
}
enabled, err := strconv.ParseBool(v)
if err != nil {
// Warn, not error, matching the capability producers: a bad gate value
// disables emission but must never disturb the node.
lggr.Warnw("Invalid CL_METER_RECORDS_ENABLED value; meter record emission disabled", "value", v, "err", err)
return false
}
return enabled
}

func newShardOrchestratorClient(cfg Config, lggr logger.Logger) (*shardorchestrator.Client, error) {
shardID := cfg.Sharding().ShardIndex()
if shardID == 0 {
Expand Down Expand Up @@ -920,6 +990,7 @@ func newWorkflowRegistrySyncerV2(
lf limits.Factory,
orgResolver orgresolver.OrgResolver,
gatewayConnectorWrapper *gatewayconnector.ServiceWrapper,
meterIdentity resourcemanager.ResourceIdentity,
) (syncerV2.WorkflowRegistrySyncer, []commonsrv.Service, error) {
capCfg := cfg.Capabilities()
wfReg := capCfg.WorkflowRegistry()
Expand Down Expand Up @@ -1004,6 +1075,16 @@ func newWorkflowRegistrySyncerV2(
syncerV2.WithLocalSecretOverrides(lggr, cfg.CRE().LocalSecretOverrides()),
syncerV2.WithShardExecutionGuard(shardOrchestratorClient, shardingEnabled, shardIndex),
syncerV2.WithShardRoutingSteady(shardRoutingSteady),
// The handler owns this ResourceManager's lifecycle (starts it, registers
// itself as the snapshotted Meterable, closes it). A positive
// SnapshotInterval enables the periodic absolute-state snapshot loop; the
// RM is otherwise a no-op when metering is disabled.
syncerV2.WithResourceManager(resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{
Enabled: meterRecordsEnabled(lggr),
Emitter: beholder.GetEmitter(),
SnapshotInterval: resourcemanager.DefaultSnapshotInterval,
})),
syncerV2.WithIdentity(meterIdentity),
}

mc := capCfg.WorkflowRegistry().ModuleCache()
Expand Down Expand Up @@ -1147,6 +1228,7 @@ func newWorkflowRegistrySyncer(
lf limits.Factory,
orgResolver orgresolver.OrgResolver,
gatewayConnectorWrapper *gatewayconnector.ServiceWrapper,
meterIdentity resourcemanager.ResourceIdentity,
) (syncerV2.WorkflowRegistrySyncer, metering.BillingClient, []commonsrv.Service, error) {
capCfg := cfg.Capabilities()

Expand Down Expand Up @@ -1195,6 +1277,7 @@ func newWorkflowRegistrySyncer(
lf,
orgResolver,
gatewayConnectorWrapper,
meterIdentity,
)
return syncer, billingClient, srvcs, err
default:
Expand Down
27 changes: 27 additions & 0 deletions core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ type RelayGetter interface {
GetIDToRelayerMap() map[types.RelayID]loop.Relayer
}

// NodeIdentity is the host-injected deployment/node metering identity that the
// Delegate stamps onto every spawned capability's StandardCapabilitiesDependencies,
// mirroring how the engine sources the same dimensions from node config (see
// core/services/cre). It is sourced once at node startup (where the CSA key and
// telemetry config are both available) so operators configure it in one place and
// a node's engine and its trigger LOOPs agree on product/environment/zone/node_id.
type NodeIdentity struct {
// Product is the deployment product, e.g. "cre".
Product string
// Environment is the deployment environment, from [Telemetry.ResourceAttributes]["env"].
Environment string
// Zone is the deployment zone, from [Telemetry.ResourceAttributes]["zone"].
Zone string
// NodeID is the node's CSA public key (hex), matching the engine's node_id.
NodeID string
}

type Delegate struct {
logger logger.Logger
ds sqlutil.DataSource
Expand All @@ -66,6 +83,7 @@ type Delegate struct {
creSettings core.SettingsBroadcaster
ocrConfigService capregconfig.OCRConfigService
localCfg coreconfig.LocalCapabilities
nodeIdentity NodeIdentity
initErr error

isNewlyCreatedJob bool
Expand Down Expand Up @@ -98,6 +116,7 @@ func NewDelegate(
creSettings core.SettingsBroadcaster,
ocrConfigService capregconfig.OCRConfigService,
localCfg coreconfig.LocalCapabilities,
nodeIdentity NodeIdentity,
opts ...func(*gateway.RoundRobinSelector),
) *Delegate {
initErr := registerOptionalMockStreamsTrigger(logger, localCfg, registry)
Expand Down Expand Up @@ -125,6 +144,7 @@ func NewDelegate(
creSettings: creSettings,
ocrConfigService: ocrConfigService,
localCfg: localCfg,
nodeIdentity: nodeIdentity,
initErr: initErr,
selectorOpts: opts,
}
Expand Down Expand Up @@ -380,6 +400,13 @@ func (d *Delegate) NewServices(
OrgResolver: d.orgResolver,
CRESettings: d.creSettings,
TriggerEventStore: triggercap.NewTriggerEventStore(d.ds),
// Host-injected deployment/node metering identity, delivered to trigger
// LOOPs through the standardized Initialise channel. CapabilityDonID is
// resolved separately by #619 (InitMyDON) and intentionally left to it.
Product: d.nodeIdentity.Product,
Environment: d.nodeIdentity.Environment,
Zone: d.nodeIdentity.Zone,
NodeID: d.nodeIdentity.NodeID,
}
standardCapability := NewStandardCapabilities(log, command, configJSON, d.cfg, dependencies)

Expand Down
Loading