Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
Service: api,
})

// Binary SSZ commit endpoint. Sized to comfortably fit a 10MB SSZ block;
// raise alongside the JSON-RPC body limit if larger blocks are needed.
server.AddHandler(
conductorrpc.CommitUnsafePayloadPath,
conductorrpc.BinaryCommitHandler(oc.log, oc, 16*1024*1024),
)

if oc.cfg.RPCEnableProxy {
execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC)
if err != nil {
Expand Down Expand Up @@ -640,6 +647,12 @@ func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.Execu
return oc.cons.CommitUnsafePayload(payload)
}

// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the cluster FSM.
// Used by the binary HTTP endpoint to avoid the JSON-decode -> SSZ-marshal round trip.
func (oc *OpConductor) CommitUnsafePayloadSSZ(_ context.Context, ssz []byte) error {
return oc.cons.CommitUnsafePayloadSSZ(ssz)
}

// SequencerHealthy returns true if sequencer is healthy.
func (oc *OpConductor) SequencerHealthy(_ context.Context) bool {
return oc.healthy.Load()
Expand Down
4 changes: 4 additions & 0 deletions op-conductor/consensus/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type Consensus interface {

// CommitUnsafePayload commits latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the FSM,
// skipping the marshal step. The bytes are handed directly to raft.Apply and
// validated by the FSM on receive. Used by the binary HTTP endpoint.
CommitUnsafePayloadSSZ(ssz []byte) error
// LatestUnsafePayload returns the latest unsafe payload from FSM in a strongly consistent fashion.
LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error)

Expand Down
45 changes: 45 additions & 0 deletions op-conductor/consensus/mocks/Consensus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,30 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo
return nil
}

// CommitUnsafePayloadSSZ implements Consensus. The bytes are passed directly to
// raft.Apply; the FSM validates by attempting UnmarshalSSZ on receive. This
// avoids the SSZ marshal step (and, when callers send SSZ over the wire, the
// JSON-decode-then-SSZ-marshal round trip the typed entrypoint requires).
func (rc *RaftConsensus) CommitUnsafePayloadSSZ(ssz []byte) error {
if len(ssz) == 0 {
return errors.New("empty payload")
}

applyStart := time.Now()
f := rc.r.Apply(ssz, defaultTimeout)
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
}
applyDur := time.Since(applyStart)

if rc.metrics != nil {
// No marshal step on this path.
rc.metrics.RecordCommitDuration(0, applyDur.Seconds())
rc.metrics.RecordCommitPayloadSize(float64(len(ssz)))
}
return nil
}

type instrumentedLogStore struct {
raft.LogStore
metrics ConsensusMetrics
Expand Down
96 changes: 96 additions & 0 deletions op-conductor/rpc/binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package rpc

import (
"context"
"errors"
"fmt"
"io"
"net/http"

"github.com/ethereum/go-ethereum/log"
)

// CommitUnsafePayloadPath is the HTTP route for the SSZ binary commit endpoint.
// External clients (op-node, base's Rust CL replacement, etc.) POST a raw
// SSZ-encoded ExecutionPayloadEnvelope here. The body is handed verbatim to
// raft.Apply; the FSM validates by attempting UnmarshalSSZ on receive.
//
// Wire format:
// - method: POST
// - path: /commit-unsafe-payload
// - content-type: application/octet-stream
// - body: SSZ-encoded ExecutionPayloadEnvelope (no length prefix,
// body length implies SSZ scope; current FSM tries V4 then
// V3, matching the JSON-RPC path).
// - response: 200 on success, 4xx for client errors, 5xx for raft
// errors. Body is empty on 200, plain-text error message
// otherwise.
const CommitUnsafePayloadPath = "/commit-unsafe-payload"

// SSZContentType is the content type clients should send for the binary endpoint.
const SSZContentType = "application/octet-stream"

// commitSSZBackend is the subset of the conductor backend the binary endpoint needs.
type commitSSZBackend interface {
CommitUnsafePayloadSSZ(ctx context.Context, ssz []byte) error
}

// BinaryCommitHandler returns an http.Handler that accepts SSZ-encoded payloads
// and forwards them to the conductor's raft layer. maxBodyBytes caps the
// request body to prevent DoS; 0 means no cap (not recommended).
func BinaryCommitHandler(lgr log.Logger, backend commitSSZBackend, maxBodyBytes int64) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", "POST")
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if ct := r.Header.Get("Content-Type"); ct != "" && ct != SSZContentType {
http.Error(w, fmt.Sprintf("unsupported content-type %q, want %s", ct, SSZContentType), http.StatusUnsupportedMediaType)
return
}

body := r.Body
if maxBodyBytes > 0 {
// Reject upfront if Content-Length declares an over-limit body.
if r.ContentLength > maxBodyBytes {
http.Error(w, fmt.Sprintf("payload too large: %d > %d", r.ContentLength, maxBodyBytes), http.StatusRequestEntityTooLarge)
return
}
body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
}

// When Content-Length is set, pre-allocate the exact buffer and use
// ReadFull. Avoids io.ReadAll's grow-and-copy. ~10% faster end-to-end
// for multi-MB bodies; pure win when the client sends Content-Length
// (every standard HTTP client does).
var ssz []byte
var err error
if r.ContentLength > 0 {
ssz = make([]byte, r.ContentLength)
_, err = io.ReadFull(body, ssz)
} else {
ssz, err = io.ReadAll(body)
}
if err != nil {
var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) {
http.Error(w, fmt.Sprintf("payload too large: > %d bytes", maxErr.Limit), http.StatusRequestEntityTooLarge)
return
}
http.Error(w, fmt.Sprintf("read body: %v", err), http.StatusBadRequest)
return
}
if len(ssz) == 0 {
http.Error(w, "empty payload", http.StatusBadRequest)
return
}

if err := backend.CommitUnsafePayloadSSZ(r.Context(), ssz); err != nil {
lgr.Warn("failed to commit unsafe payload (binary)", "err", err, "size", len(ssz))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
})
}
60 changes: 60 additions & 0 deletions op-conductor/rpc/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package rpc

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"

"github.com/ethereum/go-ethereum/rpc"

Expand Down Expand Up @@ -137,3 +142,58 @@ func (c *APIClient) ClusterMembership(ctx context.Context) (*consensus.ClusterMe
err := c.c.CallContext(ctx, &clusterMembership, prefixRPC("clusterMembership"))
return &clusterMembership, err
}

// BinaryCommitClient is a thin HTTP client for the SSZ binary commit endpoint.
// It is intentionally separate from APIClient (which speaks JSON-RPC) so that
// callers can choose the binary path without sharing transport or codec.
type BinaryCommitClient struct {
httpClient *http.Client
endpoint string
}

// NewBinaryCommitClient constructs a client targeting baseURL (e.g.
// "http://conductor:8547"). httpClient may be nil; the default is used.
func NewBinaryCommitClient(baseURL string, httpClient *http.Client) *BinaryCommitClient {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &BinaryCommitClient{
httpClient: httpClient,
endpoint: strings.TrimRight(baseURL, "/") + CommitUnsafePayloadPath,
}
}

// CommitUnsafePayload SSZ-encodes payload and POSTs it to the conductor's
// binary endpoint. Returns nil on 200, otherwise an error including the
// server's response body.
func (c *BinaryCommitClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
var buf bytes.Buffer
if _, err := payload.MarshalSSZ(&buf); err != nil {
return fmt.Errorf("marshal ssz: %w", err)
}
return c.CommitUnsafePayloadSSZ(ctx, buf.Bytes())
}

// CommitUnsafePayloadSSZ sends already-SSZ-encoded bytes. Useful when the
// caller already has the SSZ form (e.g. constructed directly by the EL client).
func (c *BinaryCommitClient) CommitUnsafePayloadSSZ(ctx context.Context, ssz []byte) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint, bytes.NewReader(ssz))
if err != nil {
return err
}
req.Header.Set("Content-Type", SSZContentType)
req.ContentLength = int64(len(ssz))

resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
_, _ = io.Copy(io.Discard, resp.Body)
return nil
}
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("commit failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
7 changes: 4 additions & 3 deletions op-node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ type Config struct {
Cancel context.CancelCauseFunc

// Conductor is used to determine this node is the leader sequencer.
ConductorEnabled bool
ConductorRpc ConductorRPCFunc
ConductorRpcTimeout time.Duration
ConductorEnabled bool
ConductorRpc ConductorRPCFunc
ConductorRpcTimeout time.Duration
ConductorBinaryCommit bool

// AltDA config
AltDA altda.CLIConfig
Expand Down
10 changes: 10 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ var (
Value: time.Second * 1,
Category: SequencerCategory,
}
ConductorBinaryCommitFlag = &cli.BoolFlag{
Name: "conductor.binary-commit",
Usage: "Use the conductor's SSZ-binary commit-unsafe-payload endpoint instead of " +
"JSON-RPC. Avoids JSON-encoding the payload (~10x faster on the leader RPC " +
"handler). Requires conductor with binary endpoint support.",
EnvVars: prefixEnvVars("CONDUCTOR_BINARY_COMMIT"),
Value: false,
Category: SequencerCategory,
}
/* Interop flags, experimental. */
InteropRPCAddr = &cli.StringFlag{
Name: "interop.rpc.addr",
Expand Down Expand Up @@ -506,6 +515,7 @@ var optionalFlags = []cli.Flag{
ConductorEnabledFlag,
ConductorRpcFlag,
ConductorRpcTimeoutFlag,
ConductorBinaryCommitFlag,
SafeDBPath,
L1ChainConfig,
L2EngineKind,
Expand Down
25 changes: 21 additions & 4 deletions op-node/node/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"

Expand All @@ -25,7 +26,8 @@ type ConductorClient struct {
metrics *metrics.Metrics
log log.Logger

apiClient locks.RWValue[*conductorRpc.APIClient]
apiClient locks.RWValue[*conductorRpc.APIClient]
binaryClient locks.RWValue[*conductorRpc.BinaryCommitClient]

// overrideLeader is used to override the leader check for disaster recovery purposes.
// During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up),
Expand Down Expand Up @@ -63,6 +65,16 @@ func (c *ConductorClient) initialize(ctx context.Context) error {
return fmt.Errorf("failed to dial conductor RPC: %w", err)
}
c.apiClient.Value = conductorRpc.NewAPIClient(conductorRpcClient)

if c.cfg.ConductorBinaryCommit {
c.binaryClient.Lock()
defer c.binaryClient.Unlock()
// Reuse the conductor RPC endpoint URL — the binary commit handler is
// served on the same HTTP server at conductorRpc.CommitUnsafePayloadPath.
httpClient := &http.Client{Timeout: c.cfg.ConductorRpcTimeout}
c.binaryClient.Value = conductorRpc.NewBinaryCommitClient(endpoint, httpClient)
c.log.Info("Conductor binary commit endpoint enabled", "endpoint", endpoint)
}
return nil
}

Expand Down Expand Up @@ -94,6 +106,8 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
}

// CommitUnsafePayload commits an unsafe payload to the conductor log.
// Uses the SSZ-binary endpoint when --conductor.binary-commit is set;
// otherwise the existing JSON-RPC method.
func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
if c.overrideLeader.Load() {
return nil
Expand All @@ -105,10 +119,13 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.
ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
defer cancel()

err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error {
commit := func() error {
if bc := c.binaryClient.Get(); bc != nil {
return bc.CommitUnsafePayload(ctx, payload)
}
return c.apiClient.Get().CommitUnsafePayload(ctx, payload)
})
return err
}
return retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), commit)
}

// OverrideLeader implements conductor.SequencerConductor.
Expand Down
3 changes: 2 additions & 1 deletion op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func NewConfig(ctx cliiface.Context, log log.Logger) (*config.Config, error) {
ConductorRpc: func(context.Context) (string, error) {
return conductorRPCEndpoint, nil
},
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
ConductorBinaryCommit: ctx.Bool(flags.ConductorBinaryCommitFlag.Name),

AltDA: altda.ReadCLIConfig(ctx),

Expand Down