From ba7db110e40b92135dfa045c051fe100a3ef230e Mon Sep 17 00:00:00 2001 From: Makisuo Date: Fri, 29 May 2026 01:05:33 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(ingest):=20local=20Maple=20binary=20?= =?UTF-8?q?=E2=80=94=20OTLP=20ingest=20+=20embedded=20chDB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `maple start`, a standalone single-binary local mode: OTLP/HTTP ingest into an embedded in-process ClickHouse (chDB), reusing the production OTLP→NDJSON encoders and the generated ClickHouse schema so local rows are shaped identically to cloud. Single-tenant (OrgId="local"). - chdb module: one dedicated writer thread owns the chDB session; all bootstrap/insert/query is funneled through it (chDB is single-owner). - new `maple` bin gated behind the `local` cargo feature so the production maple-ingest build never links libchdb; clap CLI, Axum routes, rust-embed SPA fallback. - telemetry::encode_local_{traces,logs,metrics} wrap the private encoders for zero row-mapping divergence with the Tinybird path. - schema codegen: emit local-schema.sql + local-inserts.json from the Tinybird manifest, wired into the clickhouse:schema task. 
Co-Authored-By: Claude Opus 4.7 --- apps/ingest/Cargo.lock | 807 ++++++++++- apps/ingest/Cargo.toml | 16 + apps/ingest/schema/local-inserts.json | 344 +++++ apps/ingest/schema/local-schema.sql | 1220 +++++++++++++++++ apps/ingest/src/bin/local.rs | 290 ++++ apps/ingest/src/chdb.rs | 206 +++ apps/ingest/src/lib.rs | 2 + apps/ingest/src/telemetry.rs | 84 ++ apps/ingest/ui-dist/index.html | 25 + package.json | 4 +- .../domain/src/generated/clickhouse-schema.ts | 2 +- .../generate-clickhouse-insert-mappings.ts | 124 ++ scripts/generate-clickhouse-schema-sql.ts | 51 + 13 files changed, 3110 insertions(+), 65 deletions(-) create mode 100644 apps/ingest/schema/local-inserts.json create mode 100644 apps/ingest/schema/local-schema.sql create mode 100644 apps/ingest/src/bin/local.rs create mode 100644 apps/ingest/src/chdb.rs create mode 100644 apps/ingest/ui-dist/index.html create mode 100644 scripts/generate-clickhouse-insert-mappings.ts create mode 100644 scripts/generate-clickhouse-schema-sql.ts diff --git a/apps/ingest/Cargo.lock b/apps/ingest/Cargo.lock index 5d037f583..05f3e6401 100644 --- a/apps/ingest/Cargo.lock +++ b/apps/ingest/Cargo.lock @@ -32,12 +32,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.101" @@ -110,10 +154,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "itoa", "matchit", @@ -125,7 +169,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tower", "tower-layer", @@ -141,23 +185,55 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bindgen" +version = "0.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +dependencies = [ + "bitflags 2.10.0", + "cexpr", + "clang-sys", + "itertools 0.10.5", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -209,6 +285,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -221,6 +306,19 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chdb-rust" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db53787943679165395651367ba9122092078256ec5ee1fb53b81c692d77667" +dependencies = [ + "bindgen", + "flate2", + "reqwest 0.11.27", + "tar", + "thiserror 1.0.69", +] + [[package]] name = "chrono" version = "0.4.43" @@ -259,23 +357,49 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" -version = "4.6.1" +version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] name = "clap_builder" -version = "4.6.0" +version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -293,6 +417,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -324,6 +454,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -500,6 +640,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -537,6 +686,22 @@ 
dependencies = [ "pin-project-lite", ] +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + +[[package]] +name = "filetime" +version = "0.2.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -559,6 +724,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -651,9 +831,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -696,6 +878,31 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" 
+dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.13" @@ -707,7 +914,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.4.0", "indexmap", "slab", "tokio", @@ -738,6 +945,12 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.5.2" @@ -753,6 +966,17 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -763,6 +987,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -770,7 +1005,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -781,8 +1016,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -798,6 +1033,30 @@ version = "1.0.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.8.1" @@ -808,9 +1067,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -827,8 +1086,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-util", "rustls", "rustls-native-certs", @@ -844,31 +1103,44 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.32", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64", + "base64 0.22.1", "bytes", 
"futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.2", "tokio", "tower-service", "tracing", @@ -1037,6 +1309,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.10.5" @@ -1115,6 +1393,22 @@ version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.1" @@ -1148,15 +1442,18 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", - "base64", + "base64 0.22.1", "bytes", + "chdb-rust", "chrono", + "clap", "crc32fast", "criterion", "dashmap", "dotenvy", "flate2", "hmac", + "mime_guess", "moka", "num_cpus", "opentelemetry", @@ -1165,6 +1462,7 @@ dependencies = [ "opentelemetry_sdk", "prost", "reqwest 0.13.2", + "rust-embed", "serde", "serde_json", "sha2", @@ -1206,6 +1504,22 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1247,6 +1561,33 @@ dependencies = [ "uuid", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1281,18 +1622,61 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "openssl" +version = "0.10.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a45fa2aa886c42762255da344f0a0d313e254066c46aad76f300c3d3da62d967" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + 
+[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-sys" +version = "0.9.116" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28a22dc7140cda5f096e5e7724a6962ca81a7f8bfd2979f9b18c11af56318c4" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.31.0" @@ -1315,7 +1699,7 @@ checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", - "http", + "http 1.4.0", "opentelemetry", "reqwest 0.12.28", ] @@ -1326,7 +1710,7 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" dependencies = [ - "http", + "http 1.4.0", "opentelemetry", "opentelemetry-http", "opentelemetry-proto", @@ -1342,7 +1726,7 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ - "base64", + "base64 0.22.1", "const-hex", "opentelemetry", "opentelemetry_sdk", @@ -1437,6 +1821,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "plotters" version = "0.3.7" @@ 
-1489,6 +1879,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -1504,7 +1904,7 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" dependencies = [ - "bitflags", + "bitflags 2.10.0", "num-traits", "rand", "rand_chacha", @@ -1547,9 +1947,9 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", - "socket2", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -1568,7 +1968,7 @@ dependencies = [ "lru-slab", "rand", "ring", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", "rustls-pki-types", "slab", @@ -1587,7 +1987,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.2", "tracing", "windows-sys 0.60.2", ] @@ -1671,7 +2071,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -1703,19 +2103,59 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + 
"once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "reqwest" version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-rustls", "hyper-util", "js-sys", @@ -1729,7 +2169,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-rustls", "tower", @@ -1747,14 +2187,14 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-rustls", "hyper-util", "js-sys", @@ -1767,7 +2207,7 @@ dependencies = [ "rustls-platform-verifier", "serde", "serde_json", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-rustls", "tower", @@ -1793,12 +2233,65 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust-embed" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04113cb9355a377d83f06ef1f0a45b8ab8cd7d8b1288160717d66df5c7988d27" +dependencies = [ + "rust-embed-impl", + "rust-embed-utils", + "walkdir", +] + +[[package]] +name = "rust-embed-impl" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"da0902e4c7c8e997159ab384e6d0fc91c221375f6894346ae107f47dd0f3ccaa" +dependencies = [ + "proc-macro2", + "quote", + "rust-embed-utils", + "syn", + "walkdir", +] + +[[package]] +name = "rust-embed-utils" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1" +dependencies = [ + "sha2", + "walkdir", +] + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.36" @@ -1826,6 +2319,15 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -1842,7 +2344,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" dependencies = [ - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "jni", "log", @@ -1917,8 +2419,8 @@ version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" 
dependencies = [ - "bitflags", - "core-foundation", + "bitflags 2.10.0", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -2054,6 +2556,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.2" @@ -2070,6 +2582,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2087,6 +2605,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.2" @@ -2107,12 +2631,57 @@ dependencies = [ "syn", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + 
"libc", +] + [[package]] name = "tagptr" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tar" +version = "0.4.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6221d9a6003c78398e3b239969f352578258df48c8eb051caadae0015bc840" +dependencies = [ + "filetime", + "libc", + "xattr", +] + +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -2228,7 +2797,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.2", "tokio-macros", "windows-sys 0.61.2", ] @@ -2244,6 +2813,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -2286,19 +2865,19 @@ checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" dependencies = [ "async-trait", "axum", - "base64", + "base64 0.22.1", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", - "socket2", - "sync_wrapper", + "socket2 0.6.2", + "sync_wrapper 1.0.2", "tokio", "tokio-stream", "tower", @@ -2329,7 +2908,7 @@ dependencies = [ "indexmap", "pin-project-lite", "slab", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-util", "tower-layer", @@ -2343,11 +2922,11 @@ 
version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -2463,6 +3042,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.23" @@ -2493,6 +3078,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.20.0" @@ -2510,6 +3101,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2715,6 +3312,15 @@ dependencies = [ "windows-targets 0.42.2", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version 
= "0.52.0" @@ -2757,6 +3363,21 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -2796,6 +3417,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -2814,6 +3441,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -2832,6 +3465,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2862,6 +3501,12 @@ version = 
"0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -2880,6 +3525,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -2898,6 +3549,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -2916,6 +3573,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2928,6 +3591,16 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = 
"winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -2940,6 +3613,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/apps/ingest/Cargo.toml b/apps/ingest/Cargo.toml index 4b4e26b43..fee8e64e7 100644 --- a/apps/ingest/Cargo.toml +++ b/apps/ingest/Cargo.toml @@ -53,6 +53,22 @@ tower-http = { version = "0.6", features = ["cors"] } url = "2.5.7" uuid = { version = "1", features = ["v4"] } moka = { version = "0.12", features = ["future"] } +# Local (`maple start`) only. Optional so the production ingest build never +# pulls libchdb (~319 MB) or the CLI/asset-embed deps. +chdb-rust = { version = "1.3.1", optional = true } +clap = { version = "4", features = ["derive"], optional = true } +rust-embed = { version = "8", optional = true } +mime_guess = { version = "2", optional = true } + +[features] +# Gates the standalone `maple` binary (embedded chDB + bundled SPA). Build with +# `cargo build --release --features local --bin maple`. 
+local = ["dep:chdb-rust", "dep:clap", "dep:rust-embed", "dep:mime_guess"] + +[[bin]] +name = "maple" +path = "src/bin/local.rs" +required-features = ["local"] [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"] } diff --git a/apps/ingest/schema/local-inserts.json b/apps/ingest/schema/local-inserts.json new file mode 100644 index 000000000..5fee982d9 --- /dev/null +++ b/apps/ingest/schema/local-inserts.json @@ -0,0 +1,344 @@ +{ + "projectRevision": "db2debb5006e36794755071a9e6a62a71b358dfd97b89de0e1da515209d53880", + "orgPlaceholder": "__ORG__", + "datasources": { + "traces": { + "table": "traces", + "columns": [ + "OrgId", + "Timestamp", + "TraceId", + "SpanId", + "ParentSpanId", + "TraceState", + "SpanName", + "SpanKind", + "ServiceName", + "ResourceSchemaUrl", + "ResourceAttributes", + "ScopeSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "Duration", + "StatusCode", + "StatusMessage", + "SpanAttributes", + "EventsTimestamp", + "EventsName", + "EventsAttributes", + "LinksTraceId", + "LinksSpanId", + "LinksTraceState", + "LinksAttributes" + ], + "selects": [ + "__ORG__", + "start_time", + "trace_id", + "span_id", + "parent_span_id", + "trace_state", + "span_name", + "span_kind", + "service_name", + "resource_schema_url", + "resource_attributes", + "scope_schema_url", + "scope_name", + "scope_version", + "scope_attributes", + "duration", + "status_code", + "status_message", + "span_attributes", + "events_timestamp", + "events_name", + "events_attributes", + "links_trace_id", + "links_span_id", + "links_trace_state", + "links_attributes" + ], + "inputSchema": "start_time DateTime64(9), trace_id String, span_id String, parent_span_id String, trace_state String, span_name LowCardinality(String), span_kind LowCardinality(String), service_name LowCardinality(String), resource_schema_url String, resource_attributes Map(LowCardinality(String), String), scope_schema_url String, scope_name String, scope_version String, 
scope_attributes Map(LowCardinality(String), String), duration UInt64, status_code LowCardinality(String), status_message String, span_attributes Map(LowCardinality(String), String), events_timestamp Array(DateTime64(9)), events_name Array(LowCardinality(String)), events_attributes Array(Map(LowCardinality(String), String)), links_trace_id Array(String), links_span_id Array(String), links_trace_state Array(String), links_attributes Array(Map(LowCardinality(String), String))" + }, + "logs": { + "table": "logs", + "columns": [ + "OrgId", + "Timestamp", + "TimestampTime", + "TraceId", + "SpanId", + "TraceFlags", + "SeverityText", + "SeverityNumber", + "ServiceName", + "Body", + "ResourceSchemaUrl", + "ResourceAttributes", + "ScopeSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "LogAttributes" + ], + "selects": [ + "__ORG__", + "timestamp", + "timestamp", + "trace_id", + "span_id", + "flags", + "severity_text", + "severity_number", + "service_name", + "body", + "resource_schema_url", + "resource_attributes", + "scope_schema_url", + "scope_name", + "scope_version", + "scope_attributes", + "log_attributes" + ], + "inputSchema": "timestamp DateTime64(9), trace_id String, span_id String, flags UInt8, severity_text LowCardinality(String), severity_number UInt8, service_name LowCardinality(String), body String, resource_schema_url String, resource_attributes Map(LowCardinality(String), String), scope_schema_url String, scope_name String, scope_version String, scope_attributes Map(LowCardinality(String), String), log_attributes Map(LowCardinality(String), String)" + }, + "metrics_sum": { + "table": "metrics_sum", + "columns": [ + "OrgId", + "ResourceAttributes", + "ResourceSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "ScopeSchemaUrl", + "ServiceName", + "MetricName", + "MetricDescription", + "MetricUnit", + "Attributes", + "StartTimeUnix", + "TimeUnix", + "Value", + "Flags", + "ExemplarsTraceId", + "ExemplarsSpanId", + 
"ExemplarsTimestamp", + "ExemplarsValue", + "ExemplarsFilteredAttributes", + "AggregationTemporality", + "IsMonotonic" + ], + "selects": [ + "__ORG__", + "resource_attributes", + "resource_schema_url", + "scope_name", + "scope_version", + "scope_attributes", + "scope_schema_url", + "service_name", + "metric_name", + "metric_description", + "metric_unit", + "metric_attributes", + "start_timestamp", + "timestamp", + "value", + "flags", + "exemplars_trace_id", + "exemplars_span_id", + "exemplars_timestamp", + "exemplars_value", + "exemplars_filtered_attributes", + "aggregation_temporality", + "is_monotonic" + ], + "inputSchema": "resource_attributes Map(LowCardinality(String), String), resource_schema_url String, scope_name String, scope_version String, scope_attributes Map(LowCardinality(String), String), scope_schema_url String, service_name LowCardinality(String), metric_name LowCardinality(String), metric_description LowCardinality(String), metric_unit LowCardinality(String), metric_attributes Map(LowCardinality(String), String), start_timestamp DateTime64(9), timestamp DateTime64(9), value Float64, flags UInt32, exemplars_trace_id Array(String), exemplars_span_id Array(String), exemplars_timestamp Array(DateTime64(9)), exemplars_value Array(Float64), exemplars_filtered_attributes Array(Map(LowCardinality(String), String)), aggregation_temporality Int32, is_monotonic Bool" + }, + "metrics_gauge": { + "table": "metrics_gauge", + "columns": [ + "OrgId", + "ResourceAttributes", + "ResourceSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "ScopeSchemaUrl", + "ServiceName", + "MetricName", + "MetricDescription", + "MetricUnit", + "Attributes", + "StartTimeUnix", + "TimeUnix", + "Value", + "Flags", + "ExemplarsTraceId", + "ExemplarsSpanId", + "ExemplarsTimestamp", + "ExemplarsValue", + "ExemplarsFilteredAttributes" + ], + "selects": [ + "__ORG__", + "resource_attributes", + "resource_schema_url", + "scope_name", + "scope_version", + "scope_attributes", 
+ "scope_schema_url", + "service_name", + "metric_name", + "metric_description", + "metric_unit", + "metric_attributes", + "start_timestamp", + "timestamp", + "value", + "flags", + "exemplars_trace_id", + "exemplars_span_id", + "exemplars_timestamp", + "exemplars_value", + "exemplars_filtered_attributes" + ], + "inputSchema": "resource_attributes Map(LowCardinality(String), String), resource_schema_url String, scope_name String, scope_version String, scope_attributes Map(LowCardinality(String), String), scope_schema_url String, service_name LowCardinality(String), metric_name LowCardinality(String), metric_description LowCardinality(String), metric_unit LowCardinality(String), metric_attributes Map(LowCardinality(String), String), start_timestamp DateTime64(9), timestamp DateTime64(9), value Float64, flags UInt32, exemplars_trace_id Array(String), exemplars_span_id Array(String), exemplars_timestamp Array(DateTime64(9)), exemplars_value Array(Float64), exemplars_filtered_attributes Array(Map(LowCardinality(String), String))" + }, + "metrics_histogram": { + "table": "metrics_histogram", + "columns": [ + "OrgId", + "ResourceAttributes", + "ResourceSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "ScopeSchemaUrl", + "ServiceName", + "MetricName", + "MetricDescription", + "MetricUnit", + "Attributes", + "StartTimeUnix", + "TimeUnix", + "Count", + "Sum", + "BucketCounts", + "ExplicitBounds", + "ExemplarsTraceId", + "ExemplarsSpanId", + "ExemplarsTimestamp", + "ExemplarsValue", + "ExemplarsFilteredAttributes", + "Flags", + "Min", + "Max", + "AggregationTemporality" + ], + "selects": [ + "__ORG__", + "resource_attributes", + "resource_schema_url", + "scope_name", + "scope_version", + "scope_attributes", + "scope_schema_url", + "service_name", + "metric_name", + "metric_description", + "metric_unit", + "metric_attributes", + "start_timestamp", + "timestamp", + "count", + "sum", + "bucket_counts", + "explicit_bounds", + "exemplars_trace_id", + 
"exemplars_span_id", + "exemplars_timestamp", + "exemplars_value", + "exemplars_filtered_attributes", + "flags", + "min", + "max", + "aggregation_temporality" + ], + "inputSchema": "resource_attributes Map(LowCardinality(String), String), resource_schema_url String, scope_name String, scope_version String, scope_attributes Map(LowCardinality(String), String), scope_schema_url String, service_name LowCardinality(String), metric_name LowCardinality(String), metric_description LowCardinality(String), metric_unit LowCardinality(String), metric_attributes Map(LowCardinality(String), String), start_timestamp DateTime64(9), timestamp DateTime64(9), count UInt64, sum Float64, bucket_counts Array(UInt64), explicit_bounds Array(Float64), exemplars_trace_id Array(String), exemplars_span_id Array(String), exemplars_timestamp Array(DateTime64(9)), exemplars_value Array(Float64), exemplars_filtered_attributes Array(Map(LowCardinality(String), String)), flags UInt32, min Nullable(Float64), max Nullable(Float64), aggregation_temporality Int32" + }, + "metrics_exponential_histogram": { + "table": "metrics_exponential_histogram", + "columns": [ + "OrgId", + "ResourceAttributes", + "ResourceSchemaUrl", + "ScopeName", + "ScopeVersion", + "ScopeAttributes", + "ScopeSchemaUrl", + "ServiceName", + "MetricName", + "MetricDescription", + "MetricUnit", + "Attributes", + "StartTimeUnix", + "TimeUnix", + "Count", + "Sum", + "Scale", + "ZeroCount", + "PositiveOffset", + "PositiveBucketCounts", + "NegativeOffset", + "NegativeBucketCounts", + "ExemplarsTraceId", + "ExemplarsSpanId", + "ExemplarsTimestamp", + "ExemplarsValue", + "ExemplarsFilteredAttributes", + "Flags", + "Min", + "Max", + "AggregationTemporality" + ], + "selects": [ + "__ORG__", + "resource_attributes", + "resource_schema_url", + "scope_name", + "scope_version", + "scope_attributes", + "scope_schema_url", + "service_name", + "metric_name", + "metric_description", + "metric_unit", + "metric_attributes", + "start_timestamp", + 
"timestamp", + "count", + "sum", + "scale", + "zero_count", + "positive_offset", + "positive_bucket_counts", + "negative_offset", + "negative_bucket_counts", + "exemplars_trace_id", + "exemplars_span_id", + "exemplars_timestamp", + "exemplars_value", + "exemplars_filtered_attributes", + "flags", + "min", + "max", + "aggregation_temporality" + ], + "inputSchema": "resource_attributes Map(LowCardinality(String), String), resource_schema_url String, scope_name String, scope_version String, scope_attributes Map(LowCardinality(String), String), scope_schema_url String, service_name LowCardinality(String), metric_name LowCardinality(String), metric_description LowCardinality(String), metric_unit LowCardinality(String), metric_attributes Map(LowCardinality(String), String), start_timestamp DateTime64(9), timestamp DateTime64(9), count UInt64, sum Float64, scale Int32, zero_count UInt64, positive_offset Int32, positive_bucket_counts Array(UInt64), negative_offset Int32, negative_bucket_counts Array(UInt64), exemplars_trace_id Array(String), exemplars_span_id Array(String), exemplars_timestamp Array(DateTime64(9)), exemplars_value Array(Float64), exemplars_filtered_attributes Array(Map(LowCardinality(String), String)), flags UInt32, min Nullable(Float64), max Nullable(Float64), aggregation_temporality Int32" + } + } +} diff --git a/apps/ingest/schema/local-schema.sql b/apps/ingest/schema/local-schema.sql new file mode 100644 index 000000000..c69933f7c --- /dev/null +++ b/apps/ingest/schema/local-schema.sql @@ -0,0 +1,1220 @@ +-- This file is generated by scripts/generate-clickhouse-schema-sql.ts +-- Do not edit manually. Run `bun run clickhouse:schema` to regenerate. 
+-- projectRevision: db2debb5006e36794755071a9e6a62a71b358dfd97b89de0e1da515209d53880 + +CREATE TABLE IF NOT EXISTS alert_checks ( + OrgId LowCardinality(String), + RuleId String, + GroupKey String, + Timestamp DateTime64(3), + Status LowCardinality(String), + SignalType LowCardinality(String), + Comparator LowCardinality(String), + Threshold Float64, + ObservedValue Nullable(Float64), + SampleCount UInt32, + WindowMinutes UInt16, + WindowStart DateTime64(3), + WindowEnd DateTime64(3), + ConsecutiveBreaches UInt16, + ConsecutiveHealthy UInt16, + IncidentId Nullable(String), + IncidentTransition LowCardinality(String), + EvaluationDurationMs UInt32 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (OrgId, RuleId, GroupKey, Timestamp) +TTL toDate(Timestamp) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS attribute_keys_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + AttributeKey LowCardinality(String), + AttributeScope LowCardinality(String), + UsageCount SimpleAggregateFunction(sum, UInt64) +) +ENGINE = AggregatingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, AttributeScope, Hour, AttributeKey) +TTL Hour + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS attribute_values_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + AttributeKey LowCardinality(String), + AttributeValue String, + AttributeScope LowCardinality(String), + UsageCount SimpleAggregateFunction(sum, UInt64) +) +ENGINE = AggregatingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, AttributeScope, AttributeKey, Hour, AttributeValue) +TTL Hour + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS error_events ( + OrgId LowCardinality(String), + Timestamp DateTime, + TraceId String, + SpanId String, + ParentSpanId String DEFAULT '__unset__', + ServiceName LowCardinality(String), + DeploymentEnv LowCardinality(String), + ExceptionType LowCardinality(String), + ExceptionMessage String, + ExceptionStacktrace String, + TopFrame String, + FingerprintHash UInt64, + 
StatusMessage String, + Duration UInt64, + ErrorLabel String +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (OrgId, FingerprintHash, Timestamp) +TTL Timestamp + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS error_spans ( + OrgId LowCardinality(String), + Timestamp DateTime, + TraceId String, + SpanId String, + ParentSpanId String DEFAULT '__unset__', + ServiceName LowCardinality(String), + StatusMessage String, + Duration UInt64, + DeploymentEnv LowCardinality(String) +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (OrgId, ServiceName, Timestamp) +TTL Timestamp + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS logs ( + OrgId LowCardinality(String), + Timestamp DateTime64(9), + TimestampTime DateTime, + TraceId String, + SpanId String, + TraceFlags UInt8, + SeverityText LowCardinality(String), + SeverityNumber UInt8, + ServiceName LowCardinality(String), + Body String, + ResourceSchemaUrl String, + ResourceAttributes Map(LowCardinality(String), String), + ScopeSchemaUrl String, + ScopeName String, + ScopeVersion String, + ScopeAttributes Map(LowCardinality(String), String), + LogAttributes Map(LowCardinality(String), String) +) +ENGINE = MergeTree +PARTITION BY toDate(TimestampTime) +ORDER BY (OrgId, ServiceName, TimestampTime, Timestamp) +TTL toDate(TimestampTime) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS logs_aggregates_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + ServiceName LowCardinality(String), + SeverityText LowCardinality(String), + DeploymentEnv LowCardinality(String), + Count SimpleAggregateFunction(sum, UInt64), + SizeBytes SimpleAggregateFunction(sum, UInt64) +) +ENGINE = AggregatingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, Hour, ServiceName, SeverityText, DeploymentEnv) +TTL toDate(Hour) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS metric_catalog ( + OrgId LowCardinality(String), + Hour DateTime, + MetricType LowCardinality(String), + ServiceName LowCardinality(String), + 
MetricName LowCardinality(String), + MetricDescription SimpleAggregateFunction(anyLast, String), + MetricUnit SimpleAggregateFunction(anyLast, String), + IsMonotonic SimpleAggregateFunction(anyLast, UInt8), + DataPointCount SimpleAggregateFunction(sum, UInt64), + FirstSeen SimpleAggregateFunction(min, DateTime), + LastSeen SimpleAggregateFunction(max, DateTime) +) +ENGINE = AggregatingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, MetricType, ServiceName, MetricName, Hour) +TTL Hour + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS metrics_exponential_histogram ( + OrgId LowCardinality(String), + ResourceAttributes Map(LowCardinality(String), String), + ResourceSchemaUrl String, + ScopeName String, + ScopeVersion String, + ScopeAttributes Map(LowCardinality(String), String), + ScopeSchemaUrl String, + ServiceName LowCardinality(String), + MetricName LowCardinality(String), + MetricDescription LowCardinality(String), + MetricUnit LowCardinality(String), + Attributes Map(LowCardinality(String), String), + StartTimeUnix DateTime64(9), + TimeUnix DateTime64(9), + Count UInt64, + Sum Float64, + Scale Int32, + ZeroCount UInt64, + PositiveOffset Int32, + PositiveBucketCounts Array(UInt64), + NegativeOffset Int32, + NegativeBucketCounts Array(UInt64), + ExemplarsTraceId Array(String), + ExemplarsSpanId Array(String), + ExemplarsTimestamp Array(DateTime64(9)), + ExemplarsValue Array(Float64), + ExemplarsFilteredAttributes Array(Map(LowCardinality(String), String)), + Flags UInt32, + Min Nullable(Float64), + Max Nullable(Float64), + AggregationTemporality Int32 +) +ENGINE = MergeTree +PARTITION BY toDate(TimeUnix) +ORDER BY (OrgId, ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix)) +TTL toDate(TimeUnix) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS metrics_gauge ( + OrgId LowCardinality(String), + ResourceAttributes Map(LowCardinality(String), String), + ResourceSchemaUrl String, + ScopeName String, + ScopeVersion String, + ScopeAttributes 
Map(LowCardinality(String), String), + ScopeSchemaUrl String, + ServiceName LowCardinality(String), + MetricName LowCardinality(String), + MetricDescription LowCardinality(String), + MetricUnit LowCardinality(String), + Attributes Map(LowCardinality(String), String), + StartTimeUnix DateTime64(9), + TimeUnix DateTime64(9), + Value Float64, + Flags UInt32, + ExemplarsTraceId Array(String), + ExemplarsSpanId Array(String), + ExemplarsTimestamp Array(DateTime64(9)), + ExemplarsValue Array(Float64), + ExemplarsFilteredAttributes Array(Map(LowCardinality(String), String)) +) +ENGINE = MergeTree +PARTITION BY toDate(TimeUnix) +ORDER BY (OrgId, ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix)) +TTL toDate(TimeUnix) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS metrics_histogram ( + OrgId LowCardinality(String), + ResourceAttributes Map(LowCardinality(String), String), + ResourceSchemaUrl String, + ScopeName String, + ScopeVersion String, + ScopeAttributes Map(LowCardinality(String), String), + ScopeSchemaUrl String, + ServiceName LowCardinality(String), + MetricName LowCardinality(String), + MetricDescription LowCardinality(String), + MetricUnit LowCardinality(String), + Attributes Map(LowCardinality(String), String), + StartTimeUnix DateTime64(9), + TimeUnix DateTime64(9), + Count UInt64, + Sum Float64, + BucketCounts Array(UInt64), + ExplicitBounds Array(Float64), + ExemplarsTraceId Array(String), + ExemplarsSpanId Array(String), + ExemplarsTimestamp Array(DateTime64(9)), + ExemplarsValue Array(Float64), + ExemplarsFilteredAttributes Array(Map(LowCardinality(String), String)), + Flags UInt32, + Min Nullable(Float64), + Max Nullable(Float64), + AggregationTemporality Int32 +) +ENGINE = MergeTree +PARTITION BY toDate(TimeUnix) +ORDER BY (OrgId, ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix)) +TTL toDate(TimeUnix) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS metrics_sum ( + OrgId LowCardinality(String), + ResourceAttributes 
Map(LowCardinality(String), String), + ResourceSchemaUrl String, + ScopeName String, + ScopeVersion String, + ScopeAttributes Map(LowCardinality(String), String), + ScopeSchemaUrl String, + ServiceName LowCardinality(String), + MetricName LowCardinality(String), + MetricDescription LowCardinality(String), + MetricUnit LowCardinality(String), + Attributes Map(LowCardinality(String), String), + StartTimeUnix DateTime64(9), + TimeUnix DateTime64(9), + Value Float64, + Flags UInt32, + ExemplarsTraceId Array(String), + ExemplarsSpanId Array(String), + ExemplarsTimestamp Array(DateTime64(9)), + ExemplarsValue Array(Float64), + ExemplarsFilteredAttributes Array(Map(LowCardinality(String), String)), + AggregationTemporality Int32, + IsMonotonic Bool +) +ENGINE = MergeTree +PARTITION BY toDate(TimeUnix) +ORDER BY (OrgId, ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix)) +TTL toDate(TimeUnix) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS service_address_resolutions_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + SourceService LowCardinality(String), + ParentServerAddress String, + ResolvedTargetService LowCardinality(String), + DeploymentEnv LowCardinality(String) +) +ENGINE = ReplacingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, Hour, DeploymentEnv, SourceService, ParentServerAddress, ResolvedTargetService) +TTL toDate(Hour) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS service_external_edges_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + ServiceName LowCardinality(String), + TargetType LowCardinality(String), + TargetSystem LowCardinality(String), + TargetName String, + DeploymentEnv LowCardinality(String), + CallCount SimpleAggregateFunction(sum, UInt64), + ErrorCount SimpleAggregateFunction(sum, UInt64), + DurationSumMs SimpleAggregateFunction(sum, Float64), + MaxDurationMs SimpleAggregateFunction(max, Float64), + SampleRateSum SimpleAggregateFunction(sum, Float64) +) +ENGINE = AggregatingMergeTree +PARTITION 
BY toDate(Hour) +ORDER BY (OrgId, Hour, DeploymentEnv, ServiceName, TargetType, TargetSystem, TargetName) +TTL toDate(Hour) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS service_map_children ( + OrgId LowCardinality(String), + Timestamp DateTime, + TraceId String, + ParentSpanId String, + ServiceName LowCardinality(String), + SpanKind LowCardinality(String), + Duration UInt64, + StatusCode LowCardinality(String), + TraceState String, + DeploymentEnv LowCardinality(String) +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (OrgId, TraceId, ParentSpanId, Timestamp) +TTL Timestamp + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS service_map_db_edges_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + ServiceName LowCardinality(String), + DbSystem LowCardinality(String), + DeploymentEnv LowCardinality(String), + CallCount SimpleAggregateFunction(sum, UInt64), + ErrorCount SimpleAggregateFunction(sum, UInt64), + DurationSumMs SimpleAggregateFunction(sum, Float64), + MaxDurationMs SimpleAggregateFunction(max, Float64), + SampledSpanCount SimpleAggregateFunction(sum, UInt64), + UnsampledSpanCount SimpleAggregateFunction(sum, UInt64), + SampleRateSum SimpleAggregateFunction(sum, Float64) +) +ENGINE = AggregatingMergeTree +PARTITION BY toDate(Hour) +ORDER BY (OrgId, Hour, DeploymentEnv, ServiceName, DbSystem) +TTL toDate(Hour) + INTERVAL 90 DAY; + +CREATE TABLE IF NOT EXISTS service_map_edges_hourly ( + OrgId LowCardinality(String), + Hour DateTime, + SourceService LowCardinality(String), + TargetService String, + DeploymentEnv LowCardinality(String), + CallCount SimpleAggregateFunction(sum, UInt64), + ErrorCount SimpleAggregateFunction(sum, UInt64), + DurationSumMs SimpleAggregateFunction(sum, Float64), + MaxDurationMs SimpleAggregateFunction(max, Float64), + SampledSpanCount SimpleAggregateFunction(sum, UInt64), + UnsampledSpanCount SimpleAggregateFunction(sum, UInt64), + SampleRateSum SimpleAggregateFunction(sum, Float64) +) +ENGINE = 
AggregatingMergeTree
+PARTITION BY toDate(Hour)
+ORDER BY (OrgId, Hour, DeploymentEnv, SourceService, TargetService)
+TTL toDate(Hour) + INTERVAL 90 DAY;
+-- service_map_spans: slim copy of Client/Producer/Server/Consumer spans, filled by service_map_spans_mv; rows expire after 30 days.
+CREATE TABLE IF NOT EXISTS service_map_spans (
+  OrgId LowCardinality(String),
+  Timestamp DateTime,
+  TraceId String,
+  SpanId String,
+  ParentSpanId String,
+  ServiceName LowCardinality(String),
+  SpanKind LowCardinality(String),
+  Duration UInt64,
+  StatusCode LowCardinality(String),
+  TraceState String,
+  DeploymentEnv LowCardinality(String)
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, TraceId, SpanId, Timestamp)
+TTL Timestamp + INTERVAL 30 DAY;
+-- service_overview_spans: spans that are Server/Consumer kind or have no parent, with their sample rate; filled by service_overview_spans_mv; 30-day TTL.
+CREATE TABLE IF NOT EXISTS service_overview_spans (
+  OrgId LowCardinality(String),
+  Timestamp DateTime,
+  ServiceName LowCardinality(String),
+  Duration UInt64,
+  StatusCode LowCardinality(String),
+  TraceState String,
+  DeploymentEnv LowCardinality(String),
+  CommitSha LowCardinality(String),
+  SampleRate Float64 DEFAULT 1
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, ServiceName, Timestamp)
+TTL Timestamp + INTERVAL 30 DAY;
+-- service_platforms_hourly: per org/hour/service/environment, the max() of each platform resource attribute plus a span count; filled by service_platforms_hourly_mv; 90-day TTL.
+CREATE TABLE IF NOT EXISTS service_platforms_hourly (
+  OrgId LowCardinality(String),
+  Hour DateTime,
+  ServiceName LowCardinality(String),
+  DeploymentEnv LowCardinality(String),
+  K8sCluster SimpleAggregateFunction(max, String),
+  K8sPodName SimpleAggregateFunction(max, String),
+  K8sDeploymentName SimpleAggregateFunction(max, String),
+  CloudPlatform SimpleAggregateFunction(max, String),
+  CloudProvider SimpleAggregateFunction(max, String),
+  FaasName SimpleAggregateFunction(max, String),
+  MapleSdkType SimpleAggregateFunction(max, String),
+  ProcessRuntimeName SimpleAggregateFunction(max, String),
+  SpanCount SimpleAggregateFunction(sum, UInt64)
+)
+ENGINE = AggregatingMergeTree
+PARTITION BY toDate(Hour)
+ORDER BY (OrgId, Hour, ServiceName, DeploymentEnv)
+TTL toDate(Hour) + INTERVAL 90 DAY;
+-- service_usage: hourly per-service row and byte counters for every signal. The service_usage_*_mv views each fill their own signal's columns and write 0 elsewhere; SummingMergeTree sums rows that share (OrgId, ServiceName, Hour). 365-day TTL.
+CREATE TABLE IF NOT EXISTS service_usage (
+  OrgId LowCardinality(String),
+  ServiceName LowCardinality(String),
+  Hour DateTime,
+  LogCount UInt64,
+  LogSizeBytes UInt64,
+  TraceCount UInt64,
+  TraceSizeBytes UInt64,
+  SumMetricCount UInt64,
+  SumMetricSizeBytes UInt64,
+  GaugeMetricCount UInt64,
+  GaugeMetricSizeBytes UInt64,
+  HistogramMetricCount UInt64,
+  HistogramMetricSizeBytes UInt64,
+  ExpHistogramMetricCount UInt64,
+  ExpHistogramMetricSizeBytes UInt64
+)
+ENGINE = SummingMergeTree
+ORDER BY (OrgId, ServiceName, Hour)
+TTL Hour + INTERVAL 365 DAY;
+-- session_events: one row per session event, sorted by (OrgId, SessionId, Timestamp, Seq); most columns default to '' or 0 when omitted; 30-day TTL.
+CREATE TABLE IF NOT EXISTS session_events (
+  OrgId LowCardinality(String),
+  SessionId String,
+  Timestamp DateTime64(9),
+  Seq UInt32 DEFAULT 0,
+  Type LowCardinality(String),
+  Url String DEFAULT '',
+  TraceId String DEFAULT '',
+  Level LowCardinality(String) DEFAULT '',
+  Message String DEFAULT '',
+  TargetSelector String DEFAULT '',
+  TargetText String DEFAULT '',
+  NetMethod LowCardinality(String) DEFAULT '',
+  NetUrl String DEFAULT '',
+  NetStatus UInt16 DEFAULT 0,
+  NetDurationMs UInt32 DEFAULT 0,
+  ErrorStack String DEFAULT '',
+  Attributes Map(LowCardinality(String), String)
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, SessionId, Timestamp, Seq)
+TTL toDate(Timestamp) + INTERVAL 30 DAY;
+-- session_replay_events: replay data stored in chunks keyed by (OrgId, SessionId, ChunkSeq); 30-day TTL.
+CREATE TABLE IF NOT EXISTS session_replay_events (
+  OrgId LowCardinality(String),
+  SessionId String,
+  ChunkSeq UInt32,
+  Timestamp DateTime64(9),
+  DurationMs UInt32 DEFAULT 0,
+  EventCount UInt32 DEFAULT 0,
+  ByteSize UInt32 DEFAULT 0,
+  Events String,
+  IsCheckpoint UInt8 DEFAULT 0
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, SessionId, ChunkSeq)
+TTL toDate(Timestamp) + INTERVAL 30 DAY;
+-- session_replays: one summary row per (OrgId, SessionId), deduplicated on merge; 30-day TTL counted from StartTime.
+CREATE TABLE IF NOT EXISTS session_replays (
+  OrgId LowCardinality(String),
+  SessionId String,
+  StartTime DateTime64(9),
+  EndTime Nullable(DateTime64(9)),
+  DurationMs Nullable(UInt32),
+  Status LowCardinality(String),
+  UserId String,
+  UrlInitial String,
+  UserAgent String,
+  BrowserName LowCardinality(String),
+  OsName LowCardinality(String),
+  DeviceType LowCardinality(String),
+  Country LowCardinality(String) DEFAULT '',
+  ServiceName LowCardinality(String),
+  PageViews UInt32 DEFAULT 0,
+  ClickCount UInt32 DEFAULT 0,
+  ErrorCount UInt32 DEFAULT 0,
+  TraceIds Array(String) DEFAULT [],
+  ResourceAttributes Map(LowCardinality(String), String),
+  Version UInt32
+)
+ENGINE = ReplacingMergeTree -- NOTE(review): no version argument, so a merge keeps the last-inserted row rather than the highest Version; confirm that is intended
+PARTITION BY toDate(StartTime)
+ORDER BY (OrgId, SessionId)
+TTL toDate(StartTime) + INTERVAL 30 DAY;
+-- trace_detail_spans: per-span detail (attributes and events) sorted by (OrgId, TraceId, SpanId); filled by trace_detail_spans_mv; 30-day TTL.
+CREATE TABLE IF NOT EXISTS trace_detail_spans (
+  OrgId LowCardinality(String),
+  Timestamp DateTime64(9),
+  TraceId String,
+  SpanId String,
+  ParentSpanId String,
+  SpanName LowCardinality(String),
+  SpanKind LowCardinality(String),
+  ServiceName LowCardinality(String),
+  Duration UInt64 DEFAULT 0,
+  StatusCode LowCardinality(String),
+  StatusMessage String,
+  SpanAttributes Map(LowCardinality(String), String),
+  ResourceAttributes Map(LowCardinality(String), String),
+  EventsTimestamp Array(DateTime64(9)),
+  EventsName Array(LowCardinality(String)),
+  EventsAttributes Array(Map(LowCardinality(String), String))
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, TraceId, SpanId)
+TTL toDate(Timestamp) + INTERVAL 30 DAY;
+-- trace_list_mv: despite the _mv suffix this is a plain table; the view that fills it is named trace_list_mv_mv. Sorted by (OrgId, Timestamp, TraceId); 30-day TTL.
+CREATE TABLE IF NOT EXISTS trace_list_mv (
+  OrgId LowCardinality(String),
+  TraceId String,
+  Timestamp DateTime,
+  ServiceName LowCardinality(String),
+  SpanName String,
+  SpanKind LowCardinality(String),
+  Duration UInt64,
+  StatusCode LowCardinality(String),
+  HttpMethod LowCardinality(String),
+  HttpRoute String,
+  HttpStatusCode LowCardinality(String),
+  DeploymentEnv LowCardinality(String),
+  HasError UInt8,
+  TraceState String
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, Timestamp, TraceId)
+TTL Timestamp + INTERVAL 30 DAY;
+-- traces: the raw span table that the trace-derived views in this file select from. SampleRate and IsEntryPoint are DEFAULT expressions, evaluated when an insert omits them. 30-day TTL.
+CREATE TABLE IF NOT EXISTS traces (
+  OrgId LowCardinality(String),
+  Timestamp DateTime64(9),
+  TraceId String,
+  SpanId String,
+  ParentSpanId String,
+  TraceState String,
+  SpanName LowCardinality(String),
+  SpanKind LowCardinality(String),
+  ServiceName LowCardinality(String),
+  ResourceSchemaUrl String,
+  ResourceAttributes Map(LowCardinality(String), String),
+  ScopeSchemaUrl String,
+  ScopeName String,
+  ScopeVersion String,
+  ScopeAttributes Map(LowCardinality(String), String),
+  Duration UInt64 DEFAULT 0,
+  StatusCode LowCardinality(String),
+  StatusMessage String,
+  SpanAttributes Map(LowCardinality(String), String),
+  EventsTimestamp Array(DateTime64(9)),
+  EventsName Array(LowCardinality(String)),
+  EventsAttributes Array(Map(LowCardinality(String), String)),
+  LinksTraceId Array(String),
+  LinksSpanId Array(String),
+  LinksTraceState Array(String),
+  LinksAttributes Array(Map(LowCardinality(String), String)),
+  SampleRate Float64 DEFAULT multiIf(SpanAttributes['SampleRate'] != '' AND toFloat64OrZero(SpanAttributes['SampleRate']) >= 1.0, toFloat64OrZero(SpanAttributes['SampleRate']), match(TraceState, 'th:[0-9a-f]+'), 1.0 / greatest(1.0 - reinterpretAsUInt64(reverse(unhex(rightPad(extract(TraceState, 'th:([0-9a-f]+)'), 16, '0')))) / pow(2.0, 64), 0.0001), 1.0), -- the SampleRate span attribute when >= 1; else 1 / (1 - th / 2^64) from the TraceState `th:` hex value (denominator floored at 0.0001); else 1
+  IsEntryPoint UInt8 DEFAULT if(SpanKind IN ('Server', 'Consumer') OR ParentSpanId = '', 1, 0), -- 1 for Server/Consumer spans and for spans without a parent
+  INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1,
+  INDEX idx_span_attr_keys mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+  INDEX idx_span_attr_vals mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+  INDEX idx_resource_attr_keys mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+  INDEX idx_resource_attr_vals mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1
+)
+ENGINE = MergeTree
+PARTITION BY toDate(Timestamp)
+ORDER BY (OrgId, ServiceName, SpanName, toDateTime(Timestamp))
+TTL toDate(Timestamp) + INTERVAL 30 DAY;
+-- traces_aggregates_hourly: hourly rollup of spans per service/span/kind/status/entry-point/environment, holding sample-rate-weighted sums and a t-digest state for duration quantiles; 90-day TTL.
+CREATE TABLE IF NOT EXISTS traces_aggregates_hourly (
+  OrgId LowCardinality(String),
+  Hour DateTime,
+  ServiceName LowCardinality(String),
+  SpanName LowCardinality(String),
+  SpanKind LowCardinality(String),
+  StatusCode LowCardinality(String),
+  IsEntryPoint UInt8,
+  DeploymentEnv LowCardinality(String),
+  WeightedCount SimpleAggregateFunction(sum, Float64),
+  WeightedDurationSum SimpleAggregateFunction(sum, Float64),
+  WeightedErrorCount SimpleAggregateFunction(sum, Float64),
+  DurationQuantiles AggregateFunction(quantilesTDigestWeighted(0.5, 0.95, 0.99), UInt64, UInt32),
+  DurationMin SimpleAggregateFunction(min, UInt64),
+  DurationMax SimpleAggregateFunction(max, UInt64)
+)
+ENGINE = AggregatingMergeTree
+PARTITION BY toDate(Hour)
+ORDER BY (OrgId, Hour, ServiceName, SpanName, SpanKind, StatusCode, IsEntryPoint, DeploymentEnv)
+TTL toDate(Hour) + INTERVAL 90 DAY;
+-- error_events_mv: for every span with StatusCode = 'Error', derives the exception fields, a fingerprint hash and a display label, and writes the row to error_events.
+CREATE MATERIALIZED VIEW IF NOT EXISTS error_events_mv TO error_events AS
+WITH
+  arrayFirstIndex(n -> n = 'exception', EventsName) AS _ei, -- 1-based position of the first 'exception' event, 0 when there is none
+  if(_ei > 0, EventsAttributes[_ei]['exception.type'], '') AS _exType,
+  if(_ei > 0, EventsAttributes[_ei]['exception.message'], StatusMessage) AS _exMsg,
+  if(_ei > 0, EventsAttributes[_ei]['exception.stacktrace'], '') AS _exStack,
+  arraySlice(
+    arrayFilter(
+      line -> match(line, ':[0-9]+|line [0-9]+'),
+      splitByChar('\n', _exStack)
+    ),
+    1, 3
+  ) AS _rawFrames, -- first three stack lines that carry a line number
+  arrayMap(
+    line -> replaceRegexpAll(line, ':[0-9]+|line [0-9]+|0x[0-9a-fA-F]+', ''),
+    _rawFrames
+  ) AS _topFrames, -- the same lines with line numbers and hex addresses removed
+  if(length(_topFrames) > 0, _topFrames[1], '') AS _topFrame,
+  arrayStringConcat(_topFrames, '\n') AS _fpFrames,
+  -- JSON detection (only consulted when _fpFrames = '')
+  isValidJSON(StatusMessage) AS _isJson,
+  _isJson AND JSONType(StatusMessage) = 'Object' AS _isJsonObj,
+  -- General, KEY-NAME-AGNOSTIC canonical signature: iterate ALL top-level
+  -- keys, redact volatile tokens (long hex / numbers) in each raw value, then
+  -- sort by "key=value" so key order & whitespace don't matter. No assumption
+  -- about which keys exist — works for any producer's JSON shape. (Nested
+  -- objects are hashed as their raw substring; only top-level is canonicalized.)
+  arrayStringConcat(
+    arraySort(
+      arrayMap(
+        kv -> concat(kv.1, '=', replaceRegexpAll(kv.2, '[0-9a-fA-F]{8,}|[0-9]+', '#')),
+        JSONExtractKeysAndValuesRaw(StatusMessage)
+      )
+    ),
+    '|'
+  ) AS _jsonSig,
+  -- Fold into the existing fallback hash slot. Non-JSON path is unchanged.
+  multiIf(
+    _fpFrames != '', '',
+    _isJsonObj, _jsonSig,
+    replaceRegexpAll(substring(StatusMessage, 1, 200), '[0-9a-fA-F]{8,}|[0-9]+', '#')
+  ) AS _msgFallback,
+  -- Display-only, best-effort human label (decoupled from the fingerprint:
+  -- many labels may map to one hash). The broad key list here is a DISPLAY
+  -- heuristic only; the fingerprint above makes no key-name assumption.
+  multiIf(
+    JSONExtractString(StatusMessage, 'title') != '', JSONExtractString(StatusMessage, 'title'),
+    JSONExtractString(StatusMessage, 'message') != '', JSONExtractString(StatusMessage, 'message'),
+    JSONExtractString(StatusMessage, 'error') != '', JSONExtractString(StatusMessage, 'error'),
+    JSONExtractString(StatusMessage, '_tag') != '', JSONExtractString(StatusMessage, '_tag'),
+    JSONExtractString(StatusMessage, 'reason') != '', JSONExtractString(StatusMessage, 'reason'),
+    JSONExtractString(StatusMessage, 'name') != '', JSONExtractString(StatusMessage, 'name'),
+    JSONExtractString(StatusMessage, 'type') != '', extract(JSONExtractString(StatusMessage, 'type'), '([^/]+)$'),
+    'JSON error'
+  ) AS _jsonLabel,
+  multiIf(
+    StatusMessage = '', 'Unknown Error',
+    position(StatusMessage, '{ readonly') = 1 OR position(StatusMessage, '└─') > 0,
+      if(
+        extract(StatusMessage, 'readonly (\\w+)') != '',
+        concat('Schema parse error: ', extract(StatusMessage, 'readonly (\\w+)')),
+        'Schema parse error'
+      ),
+    _isJsonObj OR position(StatusMessage, '[') = 1, _jsonLabel,
+    left(StatusMessage, multiIf(
+      position(StatusMessage, ': ') > 3, toInt64(position(StatusMessage, ': ')) - 1,
+      position(StatusMessage, ' (') > 3, toInt64(position(StatusMessage, ' (')) - 1,
+      position(StatusMessage, '\n') > 3, toInt64(position(StatusMessage, '\n')) - 1,
+      least(toInt64(length(StatusMessage)), 150)
+    )) -- plain text: cut at the first ': ', ' (' or newline found past the third character, otherwise at 150 characters
+  ) AS _statusLabel,
+  if(_exType != '', _exType, _statusLabel) AS _errorLabel
+  SELECT
+    OrgId,
+    toDateTime(Timestamp) AS Timestamp,
+    TraceId,
+    SpanId,
+    ParentSpanId,
+    ServiceName,
+    ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+    _exType AS ExceptionType,
+    _exMsg AS ExceptionMessage,
+    _exStack AS ExceptionStacktrace,
+    _topFrame AS TopFrame,
+    cityHash64(OrgId, ServiceName, _exType, _fpFrames, _msgFallback) AS FingerprintHash, -- _msgFallback is '' whenever stack frames are available, so frames and message never both contribute
+    StatusMessage,
+    Duration,
+    _errorLabel AS ErrorLabel
+  FROM traces
+  WHERE StatusCode = 'Error';
+-- error_spans_mv: copies the core columns of every Error-status span into error_spans.
+CREATE MATERIALIZED VIEW IF NOT EXISTS error_spans_mv TO error_spans AS
+SELECT
+  OrgId,
+  toDateTime(Timestamp) AS Timestamp,
+  TraceId,
+  SpanId,
+  ParentSpanId,
+  ServiceName,
+  StatusMessage,
+  Duration,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv
+  FROM traces
+  WHERE StatusCode = 'Error';
+-- log_attribute_keys_mv: counts, per org and hour, how often each log attribute key occurs (scope 'log').
+CREATE MATERIALIZED VIEW IF NOT EXISTS log_attribute_keys_mv TO attribute_keys_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  arrayJoin(mapKeys(LogAttributes)) AS AttributeKey,
+  'log' AS AttributeScope,
+  count() AS UsageCount
+  FROM logs
+  WHERE LogAttributes != map()
+  GROUP BY OrgId, Hour, AttributeKey, AttributeScope;
+-- log_attribute_values_mv: counts, per org and hour, each log attribute (key, value) pair whose value is not empty.
+CREATE MATERIALIZED VIEW IF NOT EXISTS log_attribute_values_mv TO attribute_values_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  AttributeKey,
+  AttributeValue,
+  'log' AS AttributeScope,
+  count() AS UsageCount
+  FROM logs
+  ARRAY JOIN
+    mapKeys(LogAttributes) AS AttributeKey,
+    mapValues(LogAttributes) AS AttributeValue
+  WHERE AttributeValue != ''
+  GROUP BY OrgId, Hour, AttributeKey, AttributeValue, AttributeScope;
+-- logs_aggregates_hourly_mv: hourly log count and size per service/severity/environment; size is length(Body) + 200 per row (the 200 is presumably a fixed per-row overhead estimate; confirm).
+CREATE MATERIALIZED VIEW IF NOT EXISTS logs_aggregates_hourly_mv TO logs_aggregates_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(TimestampTime) AS Hour,
+  ServiceName,
+  SeverityText,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  count() AS Count,
+  sum(length(Body) + 200) AS SizeBytes
+  FROM logs
+  GROUP BY OrgId, Hour, ServiceName, SeverityText, DeploymentEnv;
+-- metric_attribute_keys_mv: counts attribute keys per org and hour (scope 'metric'); it reads metrics_sum only.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_attribute_keys_mv TO attribute_keys_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  arrayJoin(mapKeys(Attributes)) AS AttributeKey,
+  'metric' AS AttributeScope,
+  count() AS UsageCount
+  FROM metrics_sum
+  WHERE Attributes != map()
+  GROUP BY OrgId, Hour, AttributeKey, AttributeScope;
+-- metric_attribute_values_mv: counts non-empty attribute (key, value) pairs per org and hour; it reads metrics_sum only.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_attribute_values_mv TO attribute_values_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  AttributeKey,
+  AttributeValue,
+  'metric' AS AttributeScope,
+  count() AS UsageCount
+  FROM metrics_sum
+  ARRAY JOIN
+    mapKeys(Attributes) AS AttributeKey,
+    mapValues(Attributes) AS AttributeValue
+  WHERE AttributeValue != ''
+  GROUP BY OrgId, Hour, AttributeKey, AttributeValue, AttributeScope;
+-- metric_catalog_exp_histogram_mv: one catalog row per org/hour/service/metric for exponential histograms: last description and unit, data point count, first and last seen. IsMonotonic is always 0.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_catalog_exp_histogram_mv TO metric_catalog AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  'exponential_histogram' AS MetricType,
+  ServiceName,
+  MetricName,
+  anyLast(MetricDescription) AS MetricDescription,
+  anyLast(MetricUnit) AS MetricUnit,
+  toUInt8(0) AS IsMonotonic,
+  count() AS DataPointCount,
+  min(toDateTime(TimeUnix)) AS FirstSeen,
+  max(toDateTime(TimeUnix)) AS LastSeen
+  FROM metrics_exponential_histogram
+  GROUP BY OrgId, Hour, MetricType, ServiceName, MetricName;
+-- metric_catalog_gauge_mv: the same catalog row, built from metrics_gauge. IsMonotonic is always 0.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_catalog_gauge_mv TO metric_catalog AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  'gauge' AS MetricType,
+  ServiceName,
+  MetricName,
+  anyLast(MetricDescription) AS MetricDescription,
+  anyLast(MetricUnit) AS MetricUnit,
+  toUInt8(0) AS IsMonotonic,
+  count() AS DataPointCount,
+  min(toDateTime(TimeUnix)) AS FirstSeen,
+  max(toDateTime(TimeUnix)) AS LastSeen
+  FROM metrics_gauge
+  GROUP BY OrgId, Hour, MetricType, ServiceName, MetricName;
+-- metric_catalog_histogram_mv: the same catalog row, built from metrics_histogram. IsMonotonic is always 0.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_catalog_histogram_mv TO metric_catalog AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  'histogram' AS MetricType,
+  ServiceName,
+  MetricName,
+  anyLast(MetricDescription) AS MetricDescription,
+  anyLast(MetricUnit) AS MetricUnit,
+  toUInt8(0) AS IsMonotonic,
+  count() AS DataPointCount,
+  min(toDateTime(TimeUnix)) AS FirstSeen,
+  max(toDateTime(TimeUnix)) AS LastSeen
+  FROM metrics_histogram
+  GROUP BY OrgId, Hour, MetricType, ServiceName, MetricName;
+-- metric_catalog_sum_mv: the same catalog row, built from metrics_sum; the only variant that carries the source row's IsMonotonic flag.
+CREATE MATERIALIZED VIEW IF NOT EXISTS metric_catalog_sum_mv TO metric_catalog AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  'sum' AS MetricType,
+  ServiceName,
+  MetricName,
+  anyLast(MetricDescription) AS MetricDescription,
+  anyLast(MetricUnit) AS MetricUnit,
+  anyLast(toUInt8(IsMonotonic)) AS IsMonotonic,
+  count() AS DataPointCount,
+  min(toDateTime(TimeUnix)) AS FirstSeen,
+  max(toDateTime(TimeUnix)) AS LastSeen
+  FROM metrics_sum
+  GROUP BY OrgId, Hour, MetricType, ServiceName, MetricName;
+-- service_external_edges_hourly_mv: hourly outbound-call stats for Client/Producer spans that have no db.system.name, classified as messaging, rpc or http; groups whose target name is empty are dropped.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_external_edges_hourly_mv TO service_external_edges_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  ServiceName,
+  multiIf(
+    SpanAttributes['messaging.destination'] != '' OR SpanAttributes['messaging.system'] != '', 'messaging',
+    SpanAttributes['rpc.service'] != '' OR SpanAttributes['rpc.system'] != '', 'rpc',
+    'http'
+  ) AS TargetType,
+  multiIf(
+    SpanAttributes['messaging.destination'] != '' OR SpanAttributes['messaging.system'] != '', SpanAttributes['messaging.system'],
+    SpanAttributes['rpc.service'] != '' OR SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'],
+    ''
+  ) AS TargetSystem,
+  multiIf(
+    SpanAttributes['messaging.destination'] != '' OR SpanAttributes['messaging.system'] != '',
+      if(SpanAttributes['messaging.destination'] != '', SpanAttributes['messaging.destination'], SpanAttributes['messaging.system']),
+    SpanAttributes['rpc.service'] != '' OR SpanAttributes['rpc.system'] != '',
+      if(SpanAttributes['rpc.service'] != '', SpanAttributes['rpc.service'], SpanAttributes['rpc.system']),
+    if(SpanAttributes['server.address'] != '',
+      SpanAttributes['server.address'],
+      if(SpanAttributes['http.host'] != '',
+        SpanAttributes['http.host'],
+        SpanAttributes['url.authority']))
+  ) AS TargetName, -- http targets: server.address, then http.host, then url.authority
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  count() AS CallCount,
+  countIf(StatusCode = 'Error') AS ErrorCount,
+  sum(Duration / 1000000) AS DurationSumMs, -- assumes Duration is in nanoseconds; TODO confirm
+  max(Duration / 1000000) AS MaxDurationMs,
+  sum(SampleRate) AS SampleRateSum
+  FROM traces
+  WHERE SpanKind IN ('Client', 'Producer')
+    AND SpanAttributes['db.system.name'] = ''
+    AND ServiceName != ''
+    AND (
+      SpanAttributes['server.address'] != ''
+      OR SpanAttributes['http.host'] != ''
+      OR SpanAttributes['url.authority'] != ''
+      OR SpanAttributes['messaging.destination'] != ''
+      OR SpanAttributes['messaging.system'] != ''
+      OR SpanAttributes['rpc.service'] != ''
+      OR SpanAttributes['rpc.system'] != ''
+    )
+  GROUP BY OrgId, Hour, ServiceName, TargetType, TargetSystem, TargetName, DeploymentEnv
+  HAVING TargetName != '';
+-- service_map_children_mv: Server/Consumer spans that have a parent span id, copied to service_map_children.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_map_children_mv TO service_map_children AS
+SELECT
+  OrgId,
+  toDateTime(Timestamp) AS Timestamp,
+  TraceId,
+  ParentSpanId,
+  ServiceName,
+  SpanKind,
+  Duration,
+  StatusCode,
+  TraceState,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv
+  FROM traces
+  WHERE SpanKind IN ('Server', 'Consumer')
+    AND ParentSpanId != '';
+-- service_map_db_edges_hourly_mv: hourly stats per service and db.system.name for Client/Producer spans that carry a db.system.name attribute.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_map_db_edges_hourly_mv TO service_map_db_edges_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  ServiceName,
+  SpanAttributes['db.system.name'] AS DbSystem,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  count() AS CallCount,
+  countIf(StatusCode = 'Error') AS ErrorCount,
+  sum(Duration / 1000000) AS DurationSumMs, -- assumes Duration is in nanoseconds; TODO confirm
+  max(Duration / 1000000) AS MaxDurationMs,
+  countIf(TraceState LIKE '%th:%') AS SampledSpanCount,
+  countIf(TraceState = '' OR TraceState NOT LIKE '%th:%') AS UnsampledSpanCount,
+  sum(SampleRate) AS SampleRateSum
+  FROM traces
+  WHERE SpanKind IN ('Client', 'Producer')
+    AND SpanAttributes['db.system.name'] != ''
+    AND ServiceName != ''
+  GROUP BY OrgId, Hour, ServiceName, DbSystem, DeploymentEnv;
+-- service_map_spans_mv: copies every Client/Producer/Server/Consumer span into service_map_spans.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_map_spans_mv TO service_map_spans AS
+SELECT
+  OrgId,
+  toDateTime(Timestamp) AS Timestamp,
+  TraceId,
+  SpanId,
+  ParentSpanId,
+  ServiceName,
+  SpanKind,
+  Duration,
+  StatusCode,
+  TraceState,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv
+  FROM traces
+  WHERE SpanKind IN ('Client', 'Producer', 'Server', 'Consumer');
+-- service_overview_spans_mv: copies Server/Consumer spans and parentless spans into service_overview_spans, adding the commit sha resource attribute.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_overview_spans_mv TO service_overview_spans AS
+SELECT
+  OrgId,
+  toDateTime(Timestamp) AS Timestamp,
+  ServiceName,
+  Duration,
+  StatusCode,
+  TraceState,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  ResourceAttributes['deployment.commit_sha'] AS CommitSha,
+  SampleRate
+  FROM traces
+  WHERE SpanKind IN ('Server', 'Consumer') OR ParentSpanId = '';
+-- service_platforms_hourly_mv: per org/hour/service/environment, takes max() of each platform resource attribute and counts spans; spans without a service name are skipped.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_platforms_hourly_mv TO service_platforms_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  ServiceName,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  max(ResourceAttributes['k8s.cluster.name']) AS K8sCluster,
+  max(ResourceAttributes['k8s.pod.name']) AS K8sPodName,
+  max(ResourceAttributes['k8s.deployment.name']) AS K8sDeploymentName,
+  max(ResourceAttributes['cloud.platform']) AS CloudPlatform,
+  max(ResourceAttributes['cloud.provider']) AS CloudProvider,
+  max(ResourceAttributes['faas.name']) AS FaasName,
+  max(ResourceAttributes['maple.sdk.type']) AS MapleSdkType,
+  max(ResourceAttributes['process.runtime.name']) AS ProcessRuntimeName,
+  count() AS SpanCount
+  FROM traces
+  WHERE ServiceName != ''
+  GROUP BY OrgId, Hour, ServiceName, DeploymentEnv;
+-- service_usage_logs_mv: hourly log count and size (length(Body) + 200 per row) per service; every non-log column is written as 0.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_logs_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(TimestampTime) AS Hour,
+  count() AS LogCount,
+  sum(length(Body) + 200) AS LogSizeBytes,
+  0 AS TraceCount,
+  0 AS TraceSizeBytes,
+  0 AS SumMetricCount,
+  0 AS SumMetricSizeBytes,
+  0 AS GaugeMetricCount,
+  0 AS GaugeMetricSizeBytes,
+  0 AS HistogramMetricCount,
+  0 AS HistogramMetricSizeBytes,
+  0 AS ExpHistogramMetricCount,
+  0 AS ExpHistogramMetricSizeBytes
+  FROM logs
+  GROUP BY OrgId, ServiceName, Hour;
+-- service_usage_metrics_exp_histogram_mv: hourly exponential-histogram data point count per service; size is a flat 300 bytes per data point (presumably an estimate; confirm).
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_metrics_exp_histogram_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  0 AS LogCount,
+  0 AS LogSizeBytes,
+  0 AS TraceCount,
+  0 AS TraceSizeBytes,
+  0 AS SumMetricCount,
+  0 AS SumMetricSizeBytes,
+  0 AS GaugeMetricCount,
+  0 AS GaugeMetricSizeBytes,
+  0 AS HistogramMetricCount,
+  0 AS HistogramMetricSizeBytes,
+  count() AS ExpHistogramMetricCount,
+  count() * 300 AS ExpHistogramMetricSizeBytes
+  FROM metrics_exponential_histogram
+  GROUP BY OrgId, ServiceName, Hour;
+-- service_usage_metrics_gauge_mv: hourly gauge data point count per service; size is a flat 150 bytes per data point (presumably an estimate; confirm).
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_metrics_gauge_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  0 AS LogCount,
+  0 AS LogSizeBytes,
+  0 AS TraceCount,
+  0 AS TraceSizeBytes,
+  0 AS SumMetricCount,
+  0 AS SumMetricSizeBytes,
+  count() AS GaugeMetricCount,
+  count() * 150 AS GaugeMetricSizeBytes,
+  0 AS HistogramMetricCount,
+  0 AS HistogramMetricSizeBytes,
+  0 AS ExpHistogramMetricCount,
+  0 AS ExpHistogramMetricSizeBytes
+  FROM metrics_gauge
+  GROUP BY OrgId, ServiceName, Hour;
+-- service_usage_metrics_histogram_mv: hourly histogram data point count per service; size is a flat 250 bytes per data point (presumably an estimate; confirm).
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_metrics_histogram_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  0 AS LogCount,
+  0 AS LogSizeBytes,
+  0 AS TraceCount,
+  0 AS TraceSizeBytes,
+  0 AS SumMetricCount,
+  0 AS SumMetricSizeBytes,
+  0 AS GaugeMetricCount,
+  0 AS GaugeMetricSizeBytes,
+  count() AS HistogramMetricCount,
+  count() * 250 AS HistogramMetricSizeBytes,
+  0 AS ExpHistogramMetricCount,
+  0 AS ExpHistogramMetricSizeBytes
+  FROM metrics_histogram
+  GROUP BY OrgId, ServiceName, Hour;
+-- service_usage_metrics_sum_mv: hourly sum data point count per service; size is a flat 150 bytes per data point (presumably an estimate; confirm).
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_metrics_sum_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(toDateTime(TimeUnix)) AS Hour,
+  0 AS LogCount,
+  0 AS LogSizeBytes,
+  0 AS TraceCount,
+  0 AS TraceSizeBytes,
+  count() AS SumMetricCount,
+  count() * 150 AS SumMetricSizeBytes,
+  0 AS GaugeMetricCount,
+  0 AS GaugeMetricSizeBytes,
+  0 AS HistogramMetricCount,
+  0 AS HistogramMetricSizeBytes,
+  0 AS ExpHistogramMetricCount,
+  0 AS ExpHistogramMetricSizeBytes
+  FROM metrics_sum
+  GROUP BY OrgId, ServiceName, Hour;
+-- service_usage_traces_mv: hourly span count and size (length(SpanName) + 300 per span) per service; every non-trace column is written as 0.
+CREATE MATERIALIZED VIEW IF NOT EXISTS service_usage_traces_mv TO service_usage AS
+SELECT
+  OrgId,
+  ServiceName,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  0 AS LogCount,
+  0 AS LogSizeBytes,
+  count() AS TraceCount,
+  sum(length(SpanName) + 300) AS TraceSizeBytes,
+  0 AS SumMetricCount,
+  0 AS SumMetricSizeBytes,
+  0 AS GaugeMetricCount,
+  0 AS GaugeMetricSizeBytes,
+  0 AS HistogramMetricCount,
+  0 AS HistogramMetricSizeBytes,
+  0 AS ExpHistogramMetricCount,
+  0 AS ExpHistogramMetricSizeBytes
+  FROM traces
+  GROUP BY OrgId, ServiceName, Hour;
+-- trace_detail_spans_mv: copies every span, unfiltered, into trace_detail_spans with a reduced column set.
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_detail_spans_mv TO trace_detail_spans AS
+SELECT
+  OrgId,
+  Timestamp,
+  TraceId,
+  SpanId,
+  ParentSpanId,
+  SpanName,
+  SpanKind,
+  ServiceName,
+  Duration,
+  StatusCode,
+  StatusMessage,
+  SpanAttributes,
+  ResourceAttributes,
+  EventsTimestamp,
+  EventsName,
+  EventsAttributes
+  FROM traces;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_list_mv_mv TO trace_list_mv AS -- one row per root span (ParentSpanId = ''), with HTTP fields normalised across old and new attribute names
+SELECT
+  OrgId,
+  TraceId,
+  toDateTime(Timestamp) AS Timestamp,
+  ServiceName,
+  if(
+    (SpanName LIKE 'http.server %' OR SpanName IN ('GET','POST','PUT','PATCH','DELETE','HEAD','OPTIONS'))
+    AND (SpanAttributes['http.route'] != '' OR SpanAttributes['url.path'] != ''),
+    concat(
+      if(SpanName LIKE 'http.server %', replaceOne(SpanName, 'http.server ', ''), SpanName),
+      ' ',
+      if(SpanAttributes['http.route'] != '', SpanAttributes['http.route'], SpanAttributes['url.path'])
+    ),
+    SpanName
+  ) AS SpanName, -- 'http.server X' or a bare HTTP method becomes '<X or method> <route or path>' when a route or path is known; any other name is kept as is
+  SpanKind,
+  Duration,
+  StatusCode,
+  if(SpanAttributes['http.method'] != '', SpanAttributes['http.method'], SpanAttributes['http.request.method']) AS HttpMethod,
+  if(SpanAttributes['http.route'] != '', SpanAttributes['http.route'], if(SpanAttributes['url.path'] != '', SpanAttributes['url.path'], SpanAttributes['http.target'])) AS HttpRoute,
+  if(SpanAttributes['http.status_code'] != '', SpanAttributes['http.status_code'], SpanAttributes['http.response.status_code']) AS HttpStatusCode,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  toUInt8(
+    StatusCode = 'Error'
+    OR (SpanAttributes['http.status_code'] != '' AND toUInt16OrZero(SpanAttributes['http.status_code']) >= 500)
+    OR (SpanAttributes['http.response.status_code'] != '' AND toUInt16OrZero(SpanAttributes['http.response.status_code']) >= 500)
+  ) AS HasError, -- 1 when the span status is Error or either HTTP status attribute is 500 or above
+  TraceState
+  FROM traces
+  WHERE ParentSpanId = '';
+-- trace_resource_attribute_keys_mv: counts, per org and hour, how often each resource attribute key occurs on spans (scope 'resource').
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_resource_attribute_keys_mv TO attribute_keys_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  arrayJoin(mapKeys(ResourceAttributes)) AS AttributeKey,
+  'resource' AS AttributeScope,
+  count() AS UsageCount
+  FROM traces
+  WHERE ResourceAttributes != map()
+  GROUP BY OrgId, Hour, AttributeKey, AttributeScope;
+-- trace_resource_attribute_values_mv: counts, per org and hour, each resource attribute (key, value) pair whose value is not empty.
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_resource_attribute_values_mv TO attribute_values_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  AttributeKey,
+  AttributeValue,
+  'resource' AS AttributeScope,
+  count() AS UsageCount
+  FROM traces
+  ARRAY JOIN
+    mapKeys(ResourceAttributes) AS AttributeKey,
+    mapValues(ResourceAttributes) AS AttributeValue
+  WHERE AttributeValue != ''
+  GROUP BY OrgId, Hour, AttributeKey, AttributeValue, AttributeScope;
+-- trace_span_attribute_keys_mv: counts, per org and hour, how often each span attribute key occurs (scope 'span').
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_span_attribute_keys_mv TO attribute_keys_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  arrayJoin(mapKeys(SpanAttributes)) AS AttributeKey,
+  'span' AS AttributeScope,
+  count() AS UsageCount
+  FROM traces
+  WHERE SpanAttributes != map()
+  GROUP BY OrgId, Hour, AttributeKey, AttributeScope;
+-- trace_span_attribute_values_mv: counts, per org and hour, each span attribute (key, value) pair whose value is not empty.
+CREATE MATERIALIZED VIEW IF NOT EXISTS trace_span_attribute_values_mv TO attribute_values_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  AttributeKey,
+  AttributeValue,
+  'span' AS AttributeScope,
+  count() AS UsageCount
+  FROM traces
+  ARRAY JOIN
+    mapKeys(SpanAttributes) AS AttributeKey,
+    mapValues(SpanAttributes) AS AttributeValue
+  WHERE AttributeValue != ''
+  GROUP BY OrgId, Hour, AttributeKey, AttributeValue, AttributeScope;
+-- traces_aggregates_hourly_mv: hourly rollup of all spans; counts, duration sums and error counts are weighted by SampleRate.
+CREATE MATERIALIZED VIEW IF NOT EXISTS traces_aggregates_hourly_mv TO traces_aggregates_hourly AS
+SELECT
+  OrgId,
+  toStartOfHour(toDateTime(Timestamp)) AS Hour,
+  ServiceName,
+  SpanName,
+  SpanKind,
+  StatusCode,
+  IsEntryPoint,
+  ResourceAttributes['deployment.environment'] AS DeploymentEnv,
+  sum(SampleRate) AS WeightedCount,
+  sum(toFloat64(Duration) * SampleRate) AS WeightedDurationSum,
+  sumIf(SampleRate, StatusCode = 'Error') AS WeightedErrorCount,
+  quantilesTDigestWeightedState(0.5, 0.95, 0.99)(Duration, toUInt32(SampleRate)) AS DurationQuantiles, -- the weight is SampleRate converted to an integer, so any fractional part is not carried into the digest
+  min(Duration) AS DurationMin,
+  max(Duration) AS DurationMax
+  FROM traces
+  GROUP BY OrgId, Hour, ServiceName, SpanName, SpanKind, StatusCode, IsEntryPoint, DeploymentEnv;
diff --git a/apps/ingest/src/bin/local.rs
b/apps/ingest/src/bin/local.rs new file mode 100644 index 000000000..4d18c86a2 --- /dev/null +++ b/apps/ingest/src/bin/local.rs @@ -0,0 +1,290 @@ +//! `maple` — the standalone local binary. +//! +//! A single process that owns an embedded chDB (in-process ClickHouse) and +//! serves three things on one port: +//! - OTLP/HTTP ingest (`POST /v1/{traces,logs,metrics}`) +//! - a raw SQL query API for the bundled UI (`POST /local/query`) +//! - the bundled SPA (everything else, with client-side-routing fallback) +//! +//! It reuses the production ingest's OTLP→NDJSON encoders (`maple_ingest:: +//! telemetry::encode_local_*`) so local rows are shaped identically to cloud +//! rows, then writes them to chDB via the embedded schema + insert mappings. +//! Single-tenant: every row is pinned to `OrgId = "local"`. + +use std::io::Read; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; + +use axum::extract::{DefaultBodyLimit, State}; +use axum::http::header::{CONTENT_ENCODING, CONTENT_TYPE}; +use axum::http::{HeaderMap, Method, StatusCode, Uri}; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use clap::{Args, Parser, Subcommand}; +use flate2::read::GzDecoder; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use prost::Message; +use rust_embed::RustEmbed; +use serde::Deserialize; +use serde_json::json; +use tower_http::cors::{Any, CorsLayer}; + +use maple_ingest::chdb::Chdb; +use maple_ingest::telemetry::{self, LocalBatch}; + +/// Pinned single-tenant org id. Must match the org the UI passes to every +/// `CH.compile(...)` and the placeholder the insert mappings substitute. +const ORG_ID: &str = "local"; + +/// SPA assets baked into the binary at compile time. 
`apps/local-ui` builds +/// here in a later phase; until then this holds a placeholder page. +#[derive(RustEmbed)] +#[folder = "ui-dist/"] +struct UiAssets; + +struct LocalState { + chdb: Chdb, +} + +#[derive(Parser)] +#[command(name = "maple", version, about = "Local Maple: OTLP ingest + embedded ClickHouse + UI")] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Start the local ingest + query server. + Start(StartArgs), +} + +#[derive(Args)] +struct StartArgs { + /// Port for OTLP/HTTP ingest, the query API, and the bundled UI. + #[arg(long, default_value_t = 4318)] + port: u16, + /// Directory for the embedded ClickHouse data (default: ~/.maple/data). + #[arg(long)] + data_dir: Option, +} + +#[tokio::main] +async fn main() { + match Cli::parse().command { + Commands::Start(args) => start(args).await, + } +} + +async fn start(args: StartArgs) { + let data_dir = args.data_dir.unwrap_or_else(default_data_dir); + if let Err(error) = std::fs::create_dir_all(&data_dir) { + eprintln!("Failed to create data dir {}: {error}", data_dir.display()); + std::process::exit(1); + } + + eprintln!("Opening chDB at {} (bootstrapping schema)...", data_dir.display()); + let chdb = match Chdb::start(data_dir, ORG_ID) { + Ok(chdb) => chdb, + Err(error) => { + eprintln!("Failed to start chDB: {error}"); + std::process::exit(1); + } + }; + + let state = Arc::new(LocalState { chdb }); + + let cors = CorsLayer::new() + .allow_origin(Any) + .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) + .allow_headers([CONTENT_TYPE, CONTENT_ENCODING]); + + let app = Router::new() + .route("/health", get(health)) + .route("/v1/traces", post(handle_traces)) + .route("/v1/logs", post(handle_logs)) + .route("/v1/metrics", post(handle_metrics)) + .route("/local/query", post(handle_query)) + .fallback(serve_ui) + .layer(cors) + .layer(DefaultBodyLimit::max(64 * 1024 * 1024)) + .with_state(state); + + let addr = SocketAddr::from(([127, 
0, 0, 1], args.port)); + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(listener) => listener, + Err(error) => { + eprintln!("Failed to bind {addr}: {error}"); + std::process::exit(1); + } + }; + + println!("maple listening on http://{addr}"); + println!(" OTLP/HTTP: POST /v1/{{traces,logs,metrics}}"); + println!(" query API: POST /local/query {{ \"sql\": \"...\" }}"); + println!(" UI: http://{addr}/"); + + if let Err(error) = axum::serve(listener, app).await { + eprintln!("Server error: {error}"); + std::process::exit(1); + } +} + +fn default_data_dir() -> PathBuf { + match std::env::var_os("HOME") { + Some(home) => PathBuf::from(home).join(".maple").join("data"), + None => PathBuf::from(".maple").join("data"), + } +} + +async fn health() -> &'static str { + "OK" +} + +// --- OTLP ingest ----------------------------------------------------------- + +async fn handle_traces(State(state): State>, headers: HeaderMap, body: axum::body::Bytes) -> Response { + let raw = match decompress(&headers, &body) { + Ok(raw) => raw, + Err(response) => return response, + }; + let request: ExportTraceServiceRequest = match decode(&headers, &raw) { + Ok(request) => request, + Err(response) => return response, + }; + match telemetry::encode_local_traces(ORG_ID, &request) { + Ok(batches) => ingest_batches(&state.chdb, batches).await, + Err(error) => server_error(format!("encode traces: {error}")), + } +} + +async fn handle_logs(State(state): State>, headers: HeaderMap, body: axum::body::Bytes) -> Response { + let raw = match decompress(&headers, &body) { + Ok(raw) => raw, + Err(response) => return response, + }; + let request: ExportLogsServiceRequest = match decode(&headers, &raw) { + Ok(request) => request, + Err(response) => return response, + }; + match telemetry::encode_local_logs(ORG_ID, &request) { + Ok(batches) => ingest_batches(&state.chdb, batches).await, + Err(error) => server_error(format!("encode logs: {error}")), + } +} + +async fn 
handle_metrics(State(state): State>, headers: HeaderMap, body: axum::body::Bytes) -> Response { + let raw = match decompress(&headers, &body) { + Ok(raw) => raw, + Err(response) => return response, + }; + let request: ExportMetricsServiceRequest = match decode(&headers, &raw) { + Ok(request) => request, + Err(response) => return response, + }; + match telemetry::encode_local_metrics(ORG_ID, &request) { + Ok(batches) => ingest_batches(&state.chdb, batches).await, + Err(error) => server_error(format!("encode metrics: {error}")), + } +} + +async fn ingest_batches(chdb: &Chdb, batches: Vec) -> Response { + let mut accepted = 0usize; + for batch in batches { + accepted += batch.row_count; + if let Err(error) = chdb.insert(batch.datasource, batch.payload).await { + return server_error(format!("chDB insert: {error}")); + } + } + (StatusCode::OK, Json(json!({ "accepted": accepted }))).into_response() +} + +/// gzip is the only content-encoding OTLP/HTTP exporters use; anything else is +/// rejected so a mislabeled body can't be silently fed to the decoder. +fn decompress(headers: &HeaderMap, body: &[u8]) -> Result, Response> { + match headers.get(CONTENT_ENCODING).and_then(|value| value.to_str().ok()) { + None => Ok(body.to_vec()), + Some("gzip") => { + let mut decoder = GzDecoder::new(body); + let mut out = Vec::new(); + decoder + .read_to_end(&mut out) + .map_err(|_| bad_request("invalid gzip body"))?; + Ok(out) + } + Some(other) => Err(bad_request(format!("unsupported content-encoding: {other}"))), + } +} + +/// Decode an OTLP request as protobuf (default) or JSON (when the content-type +/// says so). The proto types derive serde via the `with-serde` feature. 
+fn decode(headers: &HeaderMap, raw: &[u8]) -> Result +where + T: Message + Default + for<'de> Deserialize<'de>, +{ + let is_json = headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|content_type| content_type.contains("json")) + .unwrap_or(false); + + if is_json { + serde_json::from_slice::(raw).map_err(|_| bad_request("invalid OTLP JSON payload")) + } else { + T::decode(raw).map_err(|_| bad_request("invalid OTLP protobuf payload")) + } +} + +// --- Query API ------------------------------------------------------------- + +#[derive(Deserialize)] +struct QueryBody { + sql: String, +} + +async fn handle_query(State(state): State>, Json(body): Json) -> Response { + match state.chdb.query(body.sql).await { + Ok(bytes) => { + // chDB returns JSONEachRow (one JSON object per line). Wrap the + // lines into a JSON array without re-parsing each row. + let text = String::from_utf8_lossy(&bytes); + let rows: Vec<&str> = text.lines().map(str::trim).filter(|line| !line.is_empty()).collect(); + let array = format!("[{}]", rows.join(",")); + ([(CONTENT_TYPE, "application/json")], array).into_response() + } + Err(error) => server_error(format!("query failed: {error}")), + } +} + +// --- Bundled SPA ----------------------------------------------------------- + +async fn serve_ui(uri: Uri) -> Response { + let path = uri.path().trim_start_matches('/'); + let path = if path.is_empty() { "index.html" } else { path }; + + if let Some(asset) = UiAssets::get(path) { + let mime = mime_guess::from_path(path).first_or_octet_stream(); + return ([(CONTENT_TYPE, mime.as_ref())], asset.data.into_owned()).into_response(); + } + + // Unknown path with no file extension → client-side route; serve the SPA + // shell so the router can take over. 
+ match UiAssets::get("index.html") { + Some(asset) => ([(CONTENT_TYPE, "text/html")], asset.data.into_owned()).into_response(), + None => (StatusCode::NOT_FOUND, "UI not built").into_response(), + } +} + +// --- Errors ---------------------------------------------------------------- + +fn bad_request(message: impl Into) -> Response { + (StatusCode::BAD_REQUEST, message.into()).into_response() +} + +fn server_error(message: impl Into) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, message.into()).into_response() +} diff --git a/apps/ingest/src/chdb.rs b/apps/ingest/src/chdb.rs new file mode 100644 index 000000000..a0ca82ccc --- /dev/null +++ b/apps/ingest/src/chdb.rs @@ -0,0 +1,206 @@ +//! Embedded chDB (in-process ClickHouse) for the standalone `maple` binary. +//! +//! chDB is embedded — exactly one OS thread may own the data directory — so all +//! access (bootstrap, inserts, queries) is funneled through a single dedicated +//! writer thread. Async callers hand work to it over a channel and await a +//! oneshot reply. +//! +//! The schema and the OTLP→column insert mappings are embedded at compile time +//! from the artifacts the TS codegen produces (`clickhouse:schema`), so the +//! local path uses the exact same ClickHouse schema as the cloud deployment. + +use std::collections::HashMap; +use std::path::PathBuf; + +use chdb_rust::arg::Arg; +use chdb_rust::format::OutputFormat; +use chdb_rust::session::{Session, SessionBuilder}; +use serde::Deserialize; +use tokio::sync::{mpsc, oneshot}; + +/// Full DDL (base tables + materialized views), `IF NOT EXISTS`, generated from +/// `latestSnapshotStatements`. Applied once at startup via `Arg::MultiQuery`. +const SCHEMA_SQL: &str = include_str!("../schema/local-schema.sql"); + +/// OTLP-datasource → column/select/input-schema mappings, generated from the +/// Tinybird manifest so the snake_case NDJSON the encoders emit maps to the +/// PascalCase table columns with zero divergence. 
+const INSERT_MAPPINGS_JSON: &str = include_str!("../schema/local-inserts.json"); + +#[derive(Deserialize)] +struct InsertMappingsFile { + #[serde(rename = "orgPlaceholder")] + org_placeholder: String, + datasources: HashMap, +} + +#[derive(Deserialize)] +struct DatasourceMapping { + table: String, + columns: Vec, + selects: Vec, + #[serde(rename = "inputSchema")] + input_schema: String, +} + +/// Per-datasource INSERT template, split around the NDJSON string literal: +/// `INSERT INTO () SELECT FROM format(JSONEachRow, '', '')`. +struct InsertTemplate { + prefix: String, + suffix: String, +} + +struct Templates { + by_datasource: HashMap, +} + +impl Templates { + fn build(org_literal: &str) -> Result { + let file: InsertMappingsFile = serde_json::from_str(INSERT_MAPPINGS_JSON) + .map_err(|error| format!("parse local-inserts.json: {error}"))?; + let org_escaped = escape_sql_literal(org_literal); + + let mut by_datasource = HashMap::with_capacity(file.datasources.len()); + for (name, mapping) in file.datasources { + // Pin OrgId to the local tenant; every other select references a + // column produced by the format() table function. + let selects: Vec = mapping + .selects + .iter() + .map(|select| { + if select == &file.org_placeholder { + format!("'{org_escaped}'") + } else { + select.clone() + } + }) + .collect(); + + let prefix = format!( + "INSERT INTO {table} ({columns}) SELECT {selects} FROM format(JSONEachRow, '{schema}', '", + table = mapping.table, + columns = mapping.columns.join(", "), + selects = selects.join(", "), + schema = mapping.input_schema, + ); + by_datasource.insert(name, InsertTemplate { prefix, suffix: "')".to_string() }); + } + + Ok(Self { by_datasource }) + } +} + +enum Command { + Insert { datasource: String, ndjson: Vec, reply: oneshot::Sender> }, + Query { sql: String, reply: oneshot::Sender, String>> }, +} + +/// Handle to the single chDB writer thread. Cheap to clone (shares the channel). 
+#[derive(Clone)] +pub struct Chdb { + tx: mpsc::UnboundedSender, +} + +impl Chdb { + /// Open (or reopen) the chDB data directory, bootstrap the schema, and spawn + /// the writer thread. Blocks until bootstrap finishes so a failure surfaces + /// before the server starts accepting traffic. `org_literal` is the pinned + /// single-tenant OrgId (e.g. `"local"`). + pub fn start(data_dir: PathBuf, org_literal: &str) -> Result { + let templates = Templates::build(org_literal)?; + let (tx, mut rx) = mpsc::unbounded_channel::(); + let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); + + std::thread::Builder::new() + .name("chdb-writer".to_string()) + .spawn(move || { + let session = match SessionBuilder::new().with_data_path(data_dir).build() { + Ok(session) => session, + Err(error) => { + let _ = ready_tx.send(Err(format!("open chDB session: {error}"))); + return; + } + }; + + // Idempotent (`IF NOT EXISTS`); MultiQuery so the ClickHouse + // parser splits statements correctly (a naive `;` split breaks + // on semicolons inside `--` comments). + if let Err(error) = session.execute(SCHEMA_SQL, Some(&[Arg::MultiQuery])) { + let _ = ready_tx.send(Err(format!("bootstrap chDB schema: {error}"))); + return; + } + let _ = ready_tx.send(Ok(())); + + while let Some(command) = rx.blocking_recv() { + match command { + Command::Insert { datasource, ndjson, reply } => { + let _ = reply.send(insert_rows(&session, &templates, &datasource, &ndjson)); + } + Command::Query { sql, reply } => { + let _ = reply.send(run_query(&session, &sql)); + } + } + } + }) + .map_err(|error| format!("spawn chDB writer thread: {error}"))?; + + ready_rx + .recv() + .map_err(|_| "chDB writer thread exited during init".to_string())??; + + Ok(Self { tx }) + } + + /// Insert one datasource's NDJSON batch. `datasource` must be a key present + /// in the embedded insert mappings (e.g. `"traces"`, `"logs"`). 
+ pub async fn insert(&self, datasource: String, ndjson: Vec) -> Result<(), String> { + let (reply, rx) = oneshot::channel(); + self.tx + .send(Command::Insert { datasource, ndjson, reply }) + .map_err(|_| "chDB writer thread is gone".to_string())?; + rx.await.map_err(|_| "chDB writer dropped insert reply".to_string())? + } + + /// Run a read query and return its rows as JSONEachRow bytes (one JSON + /// object per line; empty when there are no rows). + pub async fn query(&self, sql: String) -> Result, String> { + let (reply, rx) = oneshot::channel(); + self.tx + .send(Command::Query { sql, reply }) + .map_err(|_| "chDB writer thread is gone".to_string())?; + rx.await.map_err(|_| "chDB writer dropped query reply".to_string())? + } +} + +fn insert_rows( + session: &Session, + templates: &Templates, + datasource: &str, + ndjson: &[u8], +) -> Result<(), String> { + let template = templates + .by_datasource + .get(datasource) + .ok_or_else(|| format!("no insert mapping for datasource '{datasource}'"))?; + + let data = String::from_utf8_lossy(ndjson); + let escaped = data.replace('\\', "\\\\").replace('\'', "\\'"); + let sql = format!("{}{}{}", template.prefix, escaped, template.suffix); + + session + .execute(&sql, None) + .map_err(|error| format!("chDB insert into '{datasource}': {error}"))?; + Ok(()) +} + +fn run_query(session: &Session, sql: &str) -> Result, String> { + let result = session + .execute(sql, Some(&[Arg::OutputFormat(OutputFormat::JSONEachRow)])) + .map_err(|error| format!("chDB query: {error}"))?; + Ok(result.data_utf8_lossy().as_bytes().to_vec()) +} + +/// Escape a value for a single-quoted ClickHouse SQL string literal. 
+fn escape_sql_literal(value: &str) -> String { + value.replace('\\', "\\\\").replace('\'', "\\'") +} diff --git a/apps/ingest/src/lib.rs b/apps/ingest/src/lib.rs index 870ac9466..05bb58178 100644 --- a/apps/ingest/src/lib.rs +++ b/apps/ingest/src/lib.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "local")] +pub mod chdb; pub mod metrics; pub mod otel; pub mod telemetry; diff --git a/apps/ingest/src/telemetry.rs b/apps/ingest/src/telemetry.rs index c6f1fd0b5..03c1344a5 100644 --- a/apps/ingest/src/telemetry.rs +++ b/apps/ingest/src/telemetry.rs @@ -977,6 +977,90 @@ fn gzip(body: Vec) -> Result, String> { .map_err(|error| format!("finish gzip Tinybird body: {error}")) } +// --- Local (chDB) encode path ---------------------------------------------- +// +// The standalone `maple` binary writes to an embedded chDB instead of the +// Tinybird WAL/export pipeline. It reuses the exact same OTLP→NDJSON encoders +// below so the two write paths can never diverge in row shape. These helpers +// expose a datasource-tagged batch decoupled from `EncodedFrame`/the pipeline. + +/// One datasource's NDJSON rows, ready to INSERT into the matching chDB table. +pub struct LocalBatch { + pub datasource: String, + pub row_count: usize, + pub payload: Vec, +} + +/// Standard datasource names with inert WAL/export settings. The encoders only +/// read the `datasource_*` fields; every other field is unused on the encode +/// path, so the pipeline-only values here are placeholders. 
+fn local_encode_config() -> TinybirdConfig { + TinybirdConfig { + endpoint: String::new(), + token: String::new(), + queue_dir: PathBuf::new(), + queue_max_bytes: 1, + org_queue_max_bytes: 1, + queue_channel_capacity: 1, + wal_shards: 1, + batch_max_rows: 1, + batch_max_bytes: 1, + batch_max_wait: Duration::from_secs(1), + export_concurrency_per_shard: 1, + export_max_attempts: 1, + datasource_traces: "traces".to_string(), + datasource_logs: "logs".to_string(), + datasource_metrics_sum: "metrics_sum".to_string(), + datasource_metrics_gauge: "metrics_gauge".to_string(), + datasource_metrics_histogram: "metrics_histogram".to_string(), + datasource_metrics_exponential_histogram: "metrics_exponential_histogram".to_string(), + datasource_session_replays: "session_replays".to_string(), + datasource_session_replay_events: "session_replay_events".to_string(), + datasource_session_events: "session_events".to_string(), + } +} + +fn frames_to_batches(frames: Vec) -> Vec { + frames + .into_iter() + .filter(|frame| frame.row_count > 0) + .map(|frame| LocalBatch { + datasource: frame.datasource, + row_count: frame.row_count, + payload: frame.payload, + }) + .collect() +} + +/// Encode OTLP traces to per-datasource NDJSON batches for the local write +/// path. Sampling is disabled (keep everything) since local volume is small. +pub fn encode_local_traces( + org_id: &str, + request: &ExportTraceServiceRequest, +) -> Result, PipelineError> { + let (frames, _) = + encode_traces(&local_encode_config(), org_id, request, &SamplingPolicy::default(), &[])?; + Ok(frames_to_batches(frames)) +} + +/// Encode OTLP logs to NDJSON batches for the local write path. +pub fn encode_local_logs( + org_id: &str, + request: &ExportLogsServiceRequest, +) -> Result, PipelineError> { + let (frames, _) = encode_logs(&local_encode_config(), org_id, request)?; + Ok(frames_to_batches(frames)) +} + +/// Encode OTLP metrics to per-type NDJSON batches for the local write path. 
+pub fn encode_local_metrics( + org_id: &str, + request: &ExportMetricsServiceRequest, +) -> Result, PipelineError> { + let (frames, _) = encode_metrics(&local_encode_config(), org_id, request)?; + Ok(frames_to_batches(frames)) +} + fn encode_traces( cfg: &TinybirdConfig, org_id: &str, diff --git a/apps/ingest/ui-dist/index.html b/apps/ingest/ui-dist/index.html new file mode 100644 index 000000000..45576f872 --- /dev/null +++ b/apps/ingest/ui-dist/index.html @@ -0,0 +1,25 @@ + + + + + + Maple (local) + + +
+

Maple is running

+

+ The bundled UI has not been built into this binary yet. The local + ingest endpoint and query API are live: +

+
    +
  • POST /v1/traces, /v1/logs, /v1/metrics — OTLP/HTTP
  • +
  • POST /local/query{ "sql": "SELECT ..." }
  • +
+

+ Build the SPA (apps/local-ui) and copy its dist/ + into apps/ingest/ui-dist/, then rebuild to embed it. +

+
+ + diff --git a/package.json b/package.json index 04e967d1e..3401ff223 100644 --- a/package.json +++ b/package.json @@ -22,8 +22,8 @@ "test": "turbo test", "tinybird:manifest": "bun run ./scripts/generate-tinybird-project-manifest.ts", "tinybird:manifest:check": "bun run ./scripts/generate-tinybird-project-manifest.ts --check", - "clickhouse:schema": "bun run ./scripts/generate-clickhouse-schema.ts && bun run ./scripts/generate-clickhouse-schema-sql.ts && bun run ./scripts/lint-clickhouse-schema.ts", - "clickhouse:schema:check": "bun run ./scripts/generate-clickhouse-schema.ts --check && bun run ./scripts/generate-clickhouse-schema-sql.ts --check && bun run ./scripts/lint-clickhouse-schema.ts", + "clickhouse:schema": "bun run ./scripts/generate-clickhouse-schema.ts && bun run ./scripts/generate-clickhouse-schema-sql.ts && bun run ./scripts/generate-clickhouse-insert-mappings.ts && bun run ./scripts/lint-clickhouse-schema.ts", + "clickhouse:schema:check": "bun run ./scripts/generate-clickhouse-schema.ts --check && bun run ./scripts/generate-clickhouse-schema-sql.ts --check && bun run ./scripts/generate-clickhouse-insert-mappings.ts --check && bun run ./scripts/lint-clickhouse-schema.ts", "clickhouse:schema:lint": "bun run ./scripts/lint-clickhouse-schema.ts", "knip": "knip", "knip:fix": "knip --fix --allow-remove-files", diff --git a/packages/domain/src/generated/clickhouse-schema.ts b/packages/domain/src/generated/clickhouse-schema.ts index c3c7e1a3f..bf29e3199 100644 --- a/packages/domain/src/generated/clickhouse-schema.ts +++ b/packages/domain/src/generated/clickhouse-schema.ts @@ -1,7 +1,7 @@ // This file is generated by scripts/generate-clickhouse-schema.ts // Do not edit manually. 
-export const projectRevision = "ba338845154c51a4e679f88ad63c08095bd4e8e24be0cd69b3dee54d5a1838f3" as const +export const projectRevision = "db2debb5006e36794755071a9e6a62a71b358dfd97b89de0e1da515209d53880" as const export const latestSnapshotStatements: ReadonlyArray = [ "CREATE TABLE IF NOT EXISTS alert_checks (\n OrgId LowCardinality(String),\n RuleId String,\n GroupKey String,\n Timestamp DateTime64(3),\n Status LowCardinality(String),\n SignalType LowCardinality(String),\n Comparator LowCardinality(String),\n Threshold Float64,\n ObservedValue Nullable(Float64),\n SampleCount UInt32,\n WindowMinutes UInt16,\n WindowStart DateTime64(3),\n WindowEnd DateTime64(3),\n ConsecutiveBreaches UInt16,\n ConsecutiveHealthy UInt16,\n IncidentId Nullable(String),\n IncidentTransition LowCardinality(String),\n EvaluationDurationMs UInt32\n)\nENGINE = MergeTree\nPARTITION BY toDate(Timestamp)\nORDER BY (OrgId, RuleId, GroupKey, Timestamp)\nTTL toDate(Timestamp) + INTERVAL 90 DAY", diff --git a/scripts/generate-clickhouse-insert-mappings.ts b/scripts/generate-clickhouse-insert-mappings.ts new file mode 100644 index 000000000..ff7f5bcee --- /dev/null +++ b/scripts/generate-clickhouse-insert-mappings.ts @@ -0,0 +1,124 @@ +import { mkdirSync, readFileSync, writeFileSync } from "node:fs" +import { dirname } from "node:path" +import { fileURLToPath } from "node:url" +import { emitJsonPathSpec } from "../packages/domain/src/clickhouse/ddl-emitter" +import { buildTinybirdProjectManifest } from "../packages/domain/src/tinybird/project-manifest" + +// Datasources the local OTLP ingest path actually writes. session_* (replay +// ingest) and alert_checks (alerting engine) are written by subsystems the +// lightweight local binary does not run, so they are out of scope here. 
+const OTLP_DATASOURCES = [ + "traces", + "logs", + "metrics_sum", + "metrics_gauge", + "metrics_histogram", + "metrics_exponential_histogram", +] as const + +// Replaced by the Rust binary with the pinned, escaped org-id string literal. +const ORG_PLACEHOLDER = "__ORG__" + +const outputPath = fileURLToPath( + new URL("../apps/ingest/schema/local-inserts.json", import.meta.url), +) +const checkMode = process.argv.includes("--check") + +interface DatasourceMapping { + readonly table: string + readonly columns: ReadonlyArray + readonly selects: ReadonlyArray + readonly inputSchema: string +} + +const manifest = await buildTinybirdProjectManifest() +const byName = new Map(manifest.datasources.map((ds) => [ds.name, ds])) + +const datasources: Record = {} +for (const name of OTLP_DATASOURCES) { + const ds = byName.get(name) + if (!ds) { + throw new Error(`OTLP datasource "${name}" not found in Tinybird manifest`) + } + datasources[name] = buildMapping(name, emitJsonPathSpec(ds)) +} + +const rendered = `${JSON.stringify( + { projectRevision: manifest.projectRevision, orgPlaceholder: ORG_PLACEHOLDER, datasources }, + null, + 2, +)}\n` + +let existing = "" +try { + existing = readFileSync(outputPath, "utf8") +} catch { + existing = "" +} + +if (checkMode) { + if (existing !== rendered) { + console.error("local-inserts.json is out of date. 
Run `bun run clickhouse:schema`.") + process.exit(1) + } + console.log( + `local-inserts.json is up to date (${manifest.projectRevision}, ${OTLP_DATASOURCES.length} datasources).`, + ) +} else { + mkdirSync(dirname(outputPath), { recursive: true }) + writeFileSync(outputPath, rendered) + console.log( + `Wrote local-inserts.json (${manifest.projectRevision}, ${OTLP_DATASOURCES.length} datasources) to ${outputPath}.`, + ) +} + +function buildMapping( + table: string, + spec: ReadonlyArray<{ column: string; type: string; jsonPath: string | null }>, +): DatasourceMapping { + const columns: string[] = [] + const selects: string[] = [] + const inputFields: string[] = [] + // Two columns can map to the same JSON leaf (e.g. logs `Timestamp` and + // `TimestampTime` both read `$.timestamp`). The `format()` structure must + // declare each leaf once — but the SELECT may reference it for several + // target columns. Track the first type seen per leaf and skip duplicates in + // the input schema only. + const seenLeaves = new Set() + + for (const { column, type, jsonPath } of spec) { + if (column === "OrgId") { + // Single-tenant local mode pins OrgId; never extracted from JSON. + columns.push(column) + selects.push(ORG_PLACEHOLDER) + continue + } + if (jsonPath === null || jsonPath === `$.${column}`) { + // No JSON path, or a PascalCase-identity path (a computed DEFAULT/ + // MATERIALIZED column the gateway never emits, e.g. SampleRate, + // IsEntryPoint). Omit so the table's DEFAULT expression computes it. + continue + } + const leaf = jsonLeaf(table, column, jsonPath) + columns.push(column) + selects.push(leaf) + if (!seenLeaves.has(leaf)) { + seenLeaves.add(leaf) + inputFields.push(`${leaf} ${type}`) + } + } + + return { table, columns, selects, inputSchema: inputFields.join(", ") } +} + +function jsonLeaf(table: string, column: string, jsonPath: string): string { + // Accept `$.field` and `$.field[:]` (array). 
Anything else is a nested or + // expression path we don't auto-map for local ingest. + const match = /^\$\.([A-Za-z_][A-Za-z0-9_]*)(\[:\])?$/.exec(jsonPath) + if (!match) { + throw new Error( + `Unsupported jsonPath "${jsonPath}" for ${table}.${column}; generator only handles top-level $.field and $.field[:] paths.`, + ) + } + return match[1] as string +} diff --git a/scripts/generate-clickhouse-schema-sql.ts b/scripts/generate-clickhouse-schema-sql.ts new file mode 100644 index 000000000..5f741ce03 --- /dev/null +++ b/scripts/generate-clickhouse-schema-sql.ts @@ -0,0 +1,51 @@ +import { mkdirSync, readFileSync, writeFileSync } from "node:fs" +import { dirname } from "node:path" +import { fileURLToPath } from "node:url" +import { + latestSnapshotStatements, + projectRevision, +} from "../packages/domain/src/generated/clickhouse-schema" + +const outputPath = fileURLToPath( + new URL("../apps/ingest/schema/local-schema.sql", import.meta.url), +) +const checkMode = process.argv.includes("--check") + +const rendered = render(latestSnapshotStatements, projectRevision) +let existing = "" + +try { + existing = readFileSync(outputPath, "utf8") +} catch { + existing = "" +} + +if (checkMode) { + if (existing !== rendered) { + console.error( + "local-schema.sql is out of date. Run `bun run clickhouse:schema`.", + ) + process.exit(1) + } + console.log( + `local-schema.sql is up to date (${projectRevision}, ${latestSnapshotStatements.length} statements).`, + ) +} else { + mkdirSync(dirname(outputPath), { recursive: true }) + writeFileSync(outputPath, rendered) + console.log( + `Wrote local-schema.sql (${projectRevision}, ${latestSnapshotStatements.length} statements) to ${outputPath}.`, + ) +} + +function render(stmts: ReadonlyArray, revision: string): string { + const header = [ + "-- This file is generated by scripts/generate-clickhouse-schema-sql.ts", + "-- Do not edit manually. 
Run `bun run clickhouse:schema` to regenerate.", + `-- projectRevision: ${revision}`, + "", + "", + ].join("\n") + const body = stmts.map((s) => `${s};`).join("\n\n") + return `${header}${body}\n` +} From 1bfbc89ed44680d6b5785f954ae8be90e02070b8 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Fri, 29 May 2026 16:09:27 +0200 Subject: [PATCH 2/2] fix(web): pass children into AutocompleteValuesProvider props MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-existing regression on `main` (introduced by ac723de1e "feat: fix some react stuff", which rewrote this provider to use createElement). main's CI is red on the same error; this branch inherits it via rebase onto main. AutocompleteValuesProvider's `children` prop is required, so React 19's createElement overload requires it in the props object — passing it as the variadic 3rd arg leaves the required prop unsatisfied (TS2769). Move children into the props object. Functionally identical; unblocks @maple/web typecheck. Co-Authored-By: Claude Opus 4.8 --- .../config/widget-builder-provider.tsx | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/apps/web/src/components/dashboard-builder/config/widget-builder-provider.tsx b/apps/web/src/components/dashboard-builder/config/widget-builder-provider.tsx index 97f2e39a0..184cb016d 100644 --- a/apps/web/src/components/dashboard-builder/config/widget-builder-provider.tsx +++ b/apps/web/src/components/dashboard-builder/config/widget-builder-provider.tsx @@ -30,15 +30,12 @@ export function WidgetBuilderProvider({ createElement( WidgetBuilderPreview.Provider, { value: initialState as never }, - createElement( - AutocompleteValuesProvider, - { - startTime: resolvedTime?.startTime, - endTime: resolvedTime?.endTime, - lazy: true, - }, + createElement(AutocompleteValuesProvider, { + startTime: resolvedTime?.startTime, + endTime: resolvedTime?.endTime, + lazy: true, children, - ), + }), ), ), )