Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
656 changes: 656 additions & 0 deletions Examples/dashboard-template.yaml

Large diffs are not rendered by default.

394 changes: 392 additions & 2 deletions core/common/src/buffer_type.rs

Large diffs are not rendered by default.

148 changes: 146 additions & 2 deletions core/common/src/otel_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
//! extracted from the eBPF struct, allowing downstream collectors to group
//! telemetry by process.

use crate::buffer_type::{NetworkMetrics, TimeStampMetrics};
use crate::buffer_type::{
CpuFrequency, CpuIdle, MemAlloc, NetworkMetrics, SchedStatRuntime, SchedStatWait,
TimeStampMetrics,
};
use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter};
pub struct Metrics {
Expand All @@ -35,6 +38,27 @@ pub struct Metrics {
/// Histogram of `ts_us` values seen in both `net_metrics` and
/// `time_stamp_events`.
pub ts_us: Histogram<u64>,

/// Cpu bytes alloc total events
pub cpu_bytes_alloc_events_total: Counter<u64>,

/// Cpu bytes allocation
pub cpu_bytes_alloc: Gauge<i64>,

/// Total number of memory allocation (mmap) events processed.
pub mem_alloc_events_total: Counter<u64>,

/// Observed bytes requested via mmap syscalls.
pub enter_mem_alloc: Gauge<i64>,

/// Observed scheduler wait time in nanoseconds (sched_stat_wait).
pub sched_stat_wait: Gauge<i64>,

/// Observed scheduler runtime in nanoseconds (sched_stat_runtime).
pub sched_stat_runtime: Gauge<i64>,

/// Current CPU idle C-state per cpu_id, updated only on state change.
pub cpu_idle_state: Gauge<i64>,
}

impl Metrics {
Expand Down Expand Up @@ -66,7 +90,7 @@ impl Metrics {

// delta microseconds
let delta_us = meter
.u64_histogram("cortexbrain_delta_us")
.u64_histogram("delta_us")
.with_description("Distribution of delta_us values from timestamp events")
.build();

Expand All @@ -76,13 +100,62 @@ impl Metrics {
.with_description("Distribution of timestamp values from eBPF events")
.build();

// cpu bytes alloc total events
let cpu_bytes_alloc_events_total = meter
.u64_counter("bytes_alloc_events_total")
.with_description("Total bytes_alloc events occuring in the CPU")
.build();

// cpu bytes allocation
let cpu_bytes_alloc = meter
.i64_gauge("cpu_bytes_alloc")
.with_description("Cpu bytes allocation per event")
.build();

// memory allocation (mmap) events total
let mem_alloc_events_total = meter
.u64_counter("mem_alloc_events_total")
.with_description("Total number of memory allocation (mmap) events processed")
.build();

// bytes requested via mmap syscalls
let enter_mem_alloc = meter
.i64_gauge("enter_mem_alloc")
.with_description("Bytes requested via mmap syscalls")
.build();

// scheduler wait time in nanoseconds
let sched_stat_wait = meter
.i64_gauge("sched_stat_wait")
.with_description("Scheduler wait time in nanoseconds from sched_stat_wait")
.build();

// scheduler runtime in nanoseconds
let sched_stat_runtime = meter
.i64_gauge("sched_stat_runtime")
.with_description("Scheduler runtime in nanoseconds from sched_stat_runtime")
.build();

// current CPU idle C-state per cpu_id
let cpu_idle_state = meter
.i64_gauge("cpu_idle_state")
.with_description("Current CPU idle C-state per cpu_id, updated only on state change")
.build();

Self {
events_total,
packets_total,
sk_drops,
sk_err,
delta_us,
ts_us,
cpu_bytes_alloc,
cpu_bytes_alloc_events_total,
mem_alloc_events_total,
enter_mem_alloc,
sched_stat_wait,
sched_stat_runtime,
cpu_idle_state,
}
}

Expand Down Expand Up @@ -130,4 +203,75 @@ impl Metrics {
self.delta_us.record(m.delta_us, attrs);
self.ts_us.record(m.ts_us, attrs);
}

pub fn record_cpu_bytes_alloc(&self, m: &CpuFrequency) {
let bytes_allocated = m.bytes_alloc;
let tgid = m.pid; // percpu tracepoints expose TGID in common_pid
let comm = String::from_utf8_lossy(&m.command);
let command = comm.trim_end_matches('\0').to_string();
let attrs = &[
KeyValue::new("tgid", tgid as i64),
KeyValue::new("command", command),
];
self.cpu_bytes_alloc_events_total.add(1, attrs);
self.cpu_bytes_alloc.record(bytes_allocated as i64, attrs);
}

/// Record a single [`MemAlloc`] event (mmap syscall).
///
/// Increments the dedicated `mem_alloc_events_total` counter and records
/// the requested length in the `enter_mem_alloc` gauge. The shared
/// `events_total` counter is intentionally **not** incremented for these
/// events.
pub fn record_enter_mem_alloc(&self, m: &MemAlloc) {
let comm = String::from_utf8_lossy(&m.command);
let command = comm.trim_end_matches('\0').to_string();
let attrs = &[
KeyValue::new("tgid", m.tgid as i64),
KeyValue::new("command", command),
];

self.mem_alloc_events_total.add(1, attrs);
self.enter_mem_alloc.record(m.length as i64, attrs);
}

/// Record a single [`SchedStatWait`] event.
///
/// Records `delay` in the `sched_stat_wait` gauge. No shared or dedicated
/// counter is incremented, as requested.
pub fn record_sched_stat_wait(&self, m: &SchedStatWait) {
let comm = String::from_utf8_lossy(&m.command);
let command = comm.trim_end_matches('\0').to_string();
let attrs = &[
KeyValue::new("tgid", m.tgid as i64),
KeyValue::new("command", command),
];

self.sched_stat_wait.record(m.delay as i64, attrs);
}

/// Record a single [`SchedStatRuntime`] event.
///
/// Records `runtime` in the `sched_stat_runtime` gauge. No shared or
/// dedicated counter is incremented, as requested.
pub fn record_sched_stat_runtime(&self, m: &SchedStatRuntime) {
let comm = String::from_utf8_lossy(&m.command);
let command = comm.trim_end_matches('\0').to_string();
let attrs = &[
KeyValue::new("tgid", m.tgid as i64),
KeyValue::new("command", command),
];

self.sched_stat_runtime.record(m.runtime as i64, attrs);
}

/// Record a single [`CpuIdle`] event.
///
/// Updates `cpu_idle_state` gauge to the latest C-state for the given
/// `cpu_id`. Events are only emitted by eBPF when the state changes.
pub fn record_cpu_idle(&self, m: &CpuIdle) {
let attrs = &[KeyValue::new("cpu_id", m.cpu_id as i64)];

self.cpu_idle_state.record(m.state as i64, attrs);
}
}
52 changes: 51 additions & 1 deletion core/common/src/program_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use aya::{Ebpf, programs::KProbe};
use aya::{
Ebpf,
programs::{KProbe, TracePoint},
};
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use tracing::{error, info};
Expand Down Expand Up @@ -48,3 +51,50 @@ pub fn load_program(

Ok(())
}

#[cfg(feature = "program-handlers")]
pub fn load_tracepoint_program(
bpf: Arc<Mutex<Ebpf>>,
program_name: &str,
tracepoint_type: &str,
tracepoint_symbol: &str,
) -> Result<(), anyhow::Error> {
let mut bpf_new = bpf
.lock()
.map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?;

// Load and attach the eBPF program
let program: &mut TracePoint = bpf_new
.program_mut(program_name)
.ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))?
.try_into()
.map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?;

// STEP 1: load program

program
.load()
.map_err(|e| anyhow::anyhow!("Cannot load program: {}. Error: {}", &program_name, e))?;

// STEP 2: Attach the loaded program to kernel symbol
match program.attach(tracepoint_type, tracepoint_symbol) {
Ok(_) => info!(
"{} program attached successfully to tracepoint {}",
&program_name, &tracepoint_symbol
),
Err(e) => {
error!(
"Error attaching {} program to tracepoint {}. Reason: {:?}",
&program_name, &tracepoint_symbol, e
);
return Err(anyhow::anyhow!(
"Failed to attach program {} to tracepoint {}. Reason {:?}",
&program_name,
&tracepoint_symbol,
e
));
}
};

Ok(())
}
118 changes: 118 additions & 0 deletions core/src/components/metrics/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,34 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a
.remove("net_metrics")
.expect("Cannot create net_perf_buffer");

let (_cpu_frequency_events_array, cpu_frequency_perf_buffer) = maps
.remove("cpu_frequency")
.expect("Cannot create cpu_frequency_perf_buffer");

let (_cpu_idle_array, cpu_idle_perf_buffer) = maps
.remove("cpu_idle")
.expect("Cannot create cpu_idle perf buffer");

let (_mem_alloc_array, mem_alloc_perf_buffer) = maps
.remove("mem_alloc")
.expect("Cannot create mem_alloc perf buffer");

let (_sched_stat_wait_array, sched_stat_wait_perf_buffer) = maps
.remove("sched_stat_wait")
.expect("Cannot create sched_stat_wait perf buffer");

let (_sched_stat_runtime_array, sched_stat_runtime_perf_buffer) = maps
.remove("sched_stat_runtime")
.expect("Cannot create sched_stat_runtime perf buffer");

// Allocate byte-buffers sized for each structure type
let net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer();
let time_stamp_events_buffers = BufferSize::TimeMetricsEvents.set_buffer();
let cpu_frequency_events_buffers = BufferSize::CpuFrequency.set_buffer();
let cpu_idle_buffers = BufferSize::CpuIdle.set_buffer();
let mem_alloc_buffers = BufferSize::MemAlloc.set_buffer();
let sched_stat_wait_buffers = BufferSize::SchedStatWait.set_buffer();
let sched_stat_runtime_buffers = BufferSize::SchedStatRuntime.set_buffer();

let metrics = Arc::new(Metrics::new(&meter));

Expand Down Expand Up @@ -100,6 +125,69 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a
})
};

let cpu_frequency_metrics = {
let metrics = Arc::clone(&metrics);
let mut array_buffers = cpu_frequency_perf_buffer;
let mut buffers = cpu_frequency_events_buffers;
tokio::spawn(async move {
read_perf_buffer(
array_buffers,
buffers,
BufferType::CpuFrequency,
Some(metrics),
)
.await;
})
};

let cpu_idle_metrics = {
let metrics = Arc::clone(&metrics);
let mut array_buffers = cpu_idle_perf_buffer;
let mut buffers = cpu_idle_buffers;
tokio::spawn(async move {
read_perf_buffer(array_buffers, buffers, BufferType::CpuIdle, Some(metrics)).await;
})
};

let mem_alloc_metrics = {
let metrics = Arc::clone(&metrics);
let mut array_buffers = mem_alloc_perf_buffer;
let mut buffers = mem_alloc_buffers;
tokio::spawn(async move {
read_perf_buffer(array_buffers, buffers, BufferType::MemAlloc, Some(metrics)).await;
})
};

let sched_stat_wait_metrics = {
let metrics = Arc::clone(&metrics);
let mut array_buffers = sched_stat_wait_perf_buffer;
let mut buffers = sched_stat_wait_buffers;
tokio::spawn(async move {
read_perf_buffer(
array_buffers,
buffers,
BufferType::SchedStatWait,
Some(metrics),
)
.await;
})
};

let sched_stat_runtime_metrics = {
let metrics = Arc::clone(&metrics);
let mut array_buffers = sched_stat_runtime_perf_buffer;
let mut buffers = sched_stat_runtime_buffers;
tokio::spawn(async move {
read_perf_buffer(
array_buffers,
buffers,
BufferType::SchedStatRuntime,
Some(metrics),
)
.await;
})
};

info!("Event listeners started, entering main loop...");

tokio::select! {
Expand All @@ -115,6 +203,36 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a
}
}

result = cpu_frequency_metrics => {
if let Err(e) = result {
error!("Cpu frequency events task failed: {:?}", e);
}
}

result = cpu_idle_metrics => {
if let Err(e) = result {
error!("CpuIdle events task failed: {:?}", e);
}
}

result = mem_alloc_metrics => {
if let Err(e) = result {
error!("MemAlloc events task failed: {:?}", e);
}
}

result = sched_stat_wait_metrics => {
if let Err(e) = result {
error!("SchedStatWait events task failed: {:?}", e);
}
}

result = sched_stat_runtime_metrics => {
if let Err(e) = result {
error!("SchedStatRuntime events task failed: {:?}", e);
}
}

_ = signal::ctrl_c() => {
info!("Ctrl-C received, shutting down...");
}
Expand Down
Loading