From b222e37919bc2eb9a766fc5b1c67b805f5a3321d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 17:24:26 +0100 Subject: [PATCH 1/2] feat: trim heavy fields from raw_header before storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Strip dah (73%), validator_set (16%), and commit.signatures (12%) from Celestia ExtendedHeader JSON before persisting to SQLite. No consumer reads these fields — ev-node only extracts the timestamp. Reduces per-header storage from ~87KB to ~2KB (~98% reduction). Applied to all three ingestion paths: celestia-node JSON-RPC, celestia-app gRPC, and direct blockstore DB backfill. Co-Authored-By: Claude Opus 4.6 --- go.mod | 2 +- pkg/backfill/db/source.go | 28 ++++++++++++++++- pkg/fetch/celestia_app.go | 5 +++- pkg/fetch/celestia_node.go | 41 ++++++++++++++++++++++++- pkg/fetch/celestia_node_test.go | 53 +++++++++++++++++++++++++++++++++ 5 files changed, 125 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index a8a14d7..02683c1 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/cockroachdb/pebble v1.1.5 github.com/filecoin-project/go-jsonrpc v0.10.1 github.com/google/orderedcode v0.0.1 + github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 @@ -50,7 +51,6 @@ require ( github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-log/v2 v2.0.8 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/pkg/backfill/db/source.go b/pkg/backfill/db/source.go index 36f8f6d..4d0576e 100644 --- a/pkg/backfill/db/source.go +++ b/pkg/backfill/db/source.go @@ -2,6 +2,8 @@ package db import ( "context" + "encoding/hex" + "encoding/json" "fmt" "os" "path/filepath" @@ -189,7 +191,7 @@ func (s *Source) FetchHeight(_ context.Context, height uint64, namespaces []type Hash: meta.BlockHash, DataHash: block.DataHash, Time: block.Time, - RawHeader: rawBlock, + RawHeader: buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash), } if len(namespaces) == 0 { @@ -710,6 +712,30 @@ func splitIntoParts(raw []byte, partSize int) [][]byte { return out } +// buildMinimalRawHeader synthesizes a small JSON object from the decoded block +// fields. The backfill source reads raw protobuf, so we build the JSON that +// consumers (ev-node) expect rather than storing the full protobuf blob. +func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) []byte { + obj := map[string]any{ + "header": map[string]any{ + "height": fmt.Sprintf("%d", height), + "time": t.Format(time.RFC3339Nano), + "data_hash": hex.EncodeToString(dataHash), + }, + "commit": map[string]any{ + "height": fmt.Sprintf("%d", height), + "block_id": map[string]any{ + "hash": hex.EncodeToString(blockHash), + }, + }, + } + raw, err := json.Marshal(obj) + if err != nil { + return nil + } + return raw +} + func writeTestBlock(path, backend, layout string, height uint64, hash, dataHash []byte, t time.Time, txs [][]byte, partSize int) error { w, err := openWritable(path, backend) if err != nil { diff --git a/pkg/fetch/celestia_app.go b/pkg/fetch/celestia_app.go index b1b7de5..76145e4 100644 --- a/pkg/fetch/celestia_app.go +++ b/pkg/fetch/celestia_app.go @@ -213,7 +213,10 @@ func mapBlockResponse(blockID *cometpb.BlockID, block *cometpb.Block) (*types.He hdr := block.Header t := hdr.Time.AsTime() - raw, err := json.Marshal(block) + // Store only the header sub-object, not the full block (which includes + // last_commit signatures, evidence, and transaction data — all stored + // separately or unused). + raw, err := json.Marshal(hdr) if err != nil { return nil, fmt.Errorf("marshal raw header: %w", err) } diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index f7035bd..ddbfa78 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -400,10 +400,49 @@ func mapHeader(raw json.RawMessage) (*types.Header, error) { Hash: []byte(h.Commit.BlockID.Hash), DataHash: []byte(h.Header.DataHash), Time: t, - RawHeader: []byte(raw), + RawHeader: TrimRawHeader([]byte(raw)), }, nil } +// heavyHeaderKeys are top-level ExtendedHeader fields that are large and unused +// by any consumer. Removing them reduces per-header storage from ~87KB to ~2KB. +// +// - dah: Data Availability Header with row/column roots (~64KB, 73%) +// - validator_set: full validator public keys and voting power (~14KB, 16%) +// - commit.signatures: individual validator signatures (~10KB, 12%) +var heavyHeaderKeys = []string{"dah", "validator_set"} + +// TrimRawHeader removes large, unused fields from a Celestia ExtendedHeader JSON +// to reduce storage footprint. The remaining ~1KB "header" object plus "commit" +// metadata are sufficient for all known consumers (ev-node only reads the timestamp). +func TrimRawHeader(raw []byte) []byte { + var obj map[string]json.RawMessage + if err := json.Unmarshal(raw, &obj); err != nil { + return raw + } + + for _, key := range heavyHeaderKeys { + delete(obj, key) + } + + // Keep commit.block_id and commit.height but strip commit.signatures. + if commitRaw, ok := obj["commit"]; ok { + var commit map[string]json.RawMessage + if err := json.Unmarshal(commitRaw, &commit); err == nil { + delete(commit, "signatures") + if trimmed, err := json.Marshal(commit); err == nil { + obj["commit"] = trimmed + } + } + } + + trimmed, err := json.Marshal(obj) + if err != nil { + return raw + } + return trimmed +} + func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) { // Celestia returns null/empty for no blobs. if len(raw) == 0 || string(raw) == "null" { diff --git a/pkg/fetch/celestia_node_test.go b/pkg/fetch/celestia_node_test.go index 376fa74..f25d015 100644 --- a/pkg/fetch/celestia_node_test.go +++ b/pkg/fetch/celestia_node_test.go @@ -231,6 +231,59 @@ func TestJsonInt64(t *testing.T) { } } +func TestTrimRawHeader(t *testing.T) { + raw := json.RawMessage(`{ + "header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"}, + "dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]}, + "validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]}, + "commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]} + }`) + + trimmed := TrimRawHeader(raw) + var obj map[string]json.RawMessage + if err := json.Unmarshal(trimmed, &obj); err != nil { + t.Fatalf("unmarshal trimmed: %v", err) + } + + if _, ok := obj["dah"]; ok { + t.Error("dah should be removed") + } + if _, ok := obj["validator_set"]; ok { + t.Error("validator_set should be removed") + } + if _, ok := obj["header"]; !ok { + t.Error("header should be preserved") + } + if _, ok := obj["commit"]; !ok { + t.Fatal("commit should be preserved") + } + + // commit should have block_id but not signatures + var commit map[string]json.RawMessage + if err := json.Unmarshal(obj["commit"], &commit); err != nil { + t.Fatalf("unmarshal commit: %v", err) + } + if _, ok := commit["block_id"]; !ok { + t.Error("commit.block_id should be preserved") + } + if _, ok := commit["signatures"]; ok { + t.Error("commit.signatures should be removed") + } + + // Verify trimmed is much smaller + if len(trimmed) >= len(raw) { + t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(raw)) + } +} + +func TestTrimRawHeaderInvalidJSON(t *testing.T) { + raw := []byte(`not json`) + trimmed := TrimRawHeader(raw) + if string(trimmed) != string(raw) { + t.Error("invalid JSON should be returned as-is") + } +} + func TestHexBytes(t *testing.T) { tests := []struct { input string From a4e252fccde3b54c5a006d5d939f0df86b39aab1 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 21:26:06 +0100 Subject: [PATCH 2/2] fix: address PR review comments - Move gorilla/websocket to indirect dep (not imported by committed code) - Propagate marshal error from buildMinimalRawHeader instead of returning nil - Standardize celestia_app RawHeader to canonical envelope shape matching celestia_node and backfill sources - Consolidate TrimRawHeader tests into table-driven structure Co-Authored-By: Claude Opus 4.6 --- go.mod | 2 +- pkg/backfill/db/source.go | 13 ++-- pkg/fetch/celestia_app.go | 18 +++-- pkg/fetch/celestia_node_test.go | 112 +++++++++++++++++++------------- 4 files changed, 91 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index 02683c1..a8a14d7 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/cockroachdb/pebble v1.1.5 github.com/filecoin-project/go-jsonrpc v0.10.1 github.com/google/orderedcode v0.0.1 - github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 @@ -51,6 +50,7 @@ require ( github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-log/v2 v2.0.8 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/pkg/backfill/db/source.go b/pkg/backfill/db/source.go index 4d0576e..c448e22 100644 --- a/pkg/backfill/db/source.go +++ b/pkg/backfill/db/source.go @@ -186,12 +186,17 @@ func (s *Source) FetchHeight(_ context.Context, height uint64, namespaces []type return nil, nil, fmt.Errorf("height mismatch in block: got %d want %d", block.Height, height) } + rawHeader, err := buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash) + if err != nil { + return nil, nil, fmt.Errorf("build minimal raw header at height %d: %w", height, err) + } + hdr := &types.Header{ Height: height, Hash: meta.BlockHash, DataHash: block.DataHash, Time: block.Time, - RawHeader: buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash), + RawHeader: rawHeader, } if len(namespaces) == 0 { @@ -715,7 +720,7 @@ func splitIntoParts(raw []byte, partSize int) [][]byte { // buildMinimalRawHeader synthesizes a small JSON object from the decoded block // fields. The backfill source reads raw protobuf, so we build the JSON that // consumers (ev-node) expect rather than storing the full protobuf blob. -func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) []byte { +func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) ([]byte, error) { obj := map[string]any{ "header": map[string]any{ "height": fmt.Sprintf("%d", height), @@ -731,9 +736,9 @@ func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byt } raw, err := json.Marshal(obj) if err != nil { - return nil + return nil, fmt.Errorf("marshal minimal raw header: %w", err) } - return raw + return raw, nil } func writeTestBlock(path, backend, layout string, height uint64, hash, dataHash []byte, t time.Time, txs [][]byte, partSize int) error { diff --git a/pkg/fetch/celestia_app.go b/pkg/fetch/celestia_app.go index 76145e4..b2f45d8 100644 --- a/pkg/fetch/celestia_app.go +++ b/pkg/fetch/celestia_app.go @@ -213,12 +213,20 @@ func mapBlockResponse(blockID *cometpb.BlockID, block *cometpb.Block) (*types.He hdr := block.Header t := hdr.Time.AsTime() - // Store only the header sub-object, not the full block (which includes - // last_commit signatures, evidence, and transaction data — all stored - // separately or unused). - raw, err := json.Marshal(hdr) + // Wrap in envelope matching the canonical shape used by celestia_node and + // backfill: {"header": ..., "commit": ...}. The gRPC response does not + // include a separate commit object, so we synthesize a minimal one from + // the block_id. + envelope := map[string]any{ + "header": hdr, + "commit": map[string]any{ + "height": fmt.Sprintf("%d", hdr.Height), + "block_id": blockID, + }, + } + raw, err := json.Marshal(envelope) if err != nil { - return nil, fmt.Errorf("marshal raw header: %w", err) + return nil, fmt.Errorf("marshal raw header envelope: %w", err) } return &types.Header{ diff --git a/pkg/fetch/celestia_node_test.go b/pkg/fetch/celestia_node_test.go index f25d015..a89bb32 100644 --- a/pkg/fetch/celestia_node_test.go +++ b/pkg/fetch/celestia_node_test.go @@ -232,55 +232,79 @@ func TestJsonInt64(t *testing.T) { } func TestTrimRawHeader(t *testing.T) { - raw := json.RawMessage(`{ - "header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"}, - "dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]}, - "validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]}, - "commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]} - }`) - - trimmed := TrimRawHeader(raw) - var obj map[string]json.RawMessage - if err := json.Unmarshal(trimmed, &obj); err != nil { - t.Fatalf("unmarshal trimmed: %v", err) + tests := []struct { + name string + input []byte + expectedPresent []string // top-level keys that must exist + expectedAbsent []string // top-level keys that must not exist + checkCommit bool // if true, verify commit internals + expectUnchanged bool // if true, output must equal input + expectSmaller bool // if true, output must be smaller than input + }{ + { + name: "valid header trims heavy fields", + input: []byte(`{ + "header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"}, + "dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]}, + "validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]}, + "commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]} + }`), + expectedPresent: []string{"header", "commit"}, + expectedAbsent: []string{"dah", "validator_set"}, + checkCommit: true, + expectSmaller: true, + }, + { + name: "invalid JSON returned as-is", + input: []byte(`not json`), + expectUnchanged: true, + }, } - if _, ok := obj["dah"]; ok { - t.Error("dah should be removed") - } - if _, ok := obj["validator_set"]; ok { - t.Error("validator_set should be removed") - } - if _, ok := obj["header"]; !ok { - t.Error("header should be preserved") - } - if _, ok := obj["commit"]; !ok { - t.Fatal("commit should be preserved") - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + trimmed := TrimRawHeader(tt.input) - // commit should have block_id but not signatures - var commit map[string]json.RawMessage - if err := json.Unmarshal(obj["commit"], &commit); err != nil { - t.Fatalf("unmarshal commit: %v", err) - } - if _, ok := commit["block_id"]; !ok { - t.Error("commit.block_id should be preserved") - } - if _, ok := commit["signatures"]; ok { - t.Error("commit.signatures should be removed") - } + if tt.expectUnchanged { + if string(trimmed) != string(tt.input) { + t.Error("expected output to equal input") + } + return + } - // Verify trimmed is much smaller - if len(trimmed) >= len(raw) { - t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(raw)) - } -} + var obj map[string]json.RawMessage + if err := json.Unmarshal(trimmed, &obj); err != nil { + t.Fatalf("unmarshal trimmed: %v", err) + } + + for _, key := range tt.expectedPresent { + if _, ok := obj[key]; !ok { + t.Errorf("%s should be preserved", key) + } + } + for _, key := range tt.expectedAbsent { + if _, ok := obj[key]; ok { + t.Errorf("%s should be removed", key) + } + } + + if tt.checkCommit { + var commit map[string]json.RawMessage + if err := json.Unmarshal(obj["commit"], &commit); err != nil { + t.Fatalf("unmarshal commit: %v", err) + } + if _, ok := commit["block_id"]; !ok { + t.Error("commit.block_id should be preserved") + } + if _, ok := commit["signatures"]; ok { + t.Error("commit.signatures should be removed") + } + } -func TestTrimRawHeaderInvalidJSON(t *testing.T) { - raw := []byte(`not json`) - trimmed := TrimRawHeader(raw) - if string(trimmed) != string(raw) { - t.Error("invalid JSON should be returned as-is") + if tt.expectSmaller && len(trimmed) >= len(tt.input) { + t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(tt.input)) + } + }) } }