Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
None,
vec![],
vec![],
)
.await?;

Expand Down
16 changes: 13 additions & 3 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
use crate::handlers::{
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
Expand Down Expand Up @@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -206,6 +207,8 @@ pub async fn setup_otel_stream(
expected_log_source: LogSource,
known_fields: &[&str],
telemetry_type: TelemetryType,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -239,7 +242,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
dataset_tags,
dataset_labels,
)
.await?;
let mut time_partition = None;
Expand Down Expand Up @@ -362,6 +366,8 @@ pub async fn handle_otel_logs_ingestion(
LogSource::OtelLogs,
&OTEL_LOG_KNOWN_FIELD_LIST,
TelemetryType::Logs,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand All @@ -386,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion(
LogSource::OtelMetrics,
&OTEL_METRICS_KNOWN_FIELD_LIST,
TelemetryType::Metrics,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -417,6 +425,8 @@ pub async fn handle_otel_traces_ingestion(
LogSource::OtelTraces,
&OTEL_TRACES_KNOWN_FIELD_LIST,
TelemetryType::Traces,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down
33 changes: 18 additions & 15 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
*
*/

use actix_web::http::header::HeaderMap;

use crate::{
event::format::LogSource,
handlers::{
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
TelemetryType, UPDATE_STREAM_KEY,
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
use actix_web::http::header::HeaderMap;
use tracing::warn;

#[derive(Debug, Default)]
pub struct PutStreamHeaders {
Expand All @@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
pub dataset_tag: Option<DatasetTag>,
pub dataset_tags: Vec<DatasetTag>,
pub dataset_labels: Vec<String>,
}

impl From<&HeaderMap> for PutStreamHeaders {
Expand Down Expand Up @@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
dataset_tag: headers
.get(DATASET_TAG_KEY)
dataset_tags: headers
.get(DATASET_TAGS_KEY)
.or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
.and_then(|v| match DatasetTag::try_from(v) {
Ok(tag) => Some(tag),
Err(err) => {
warn!("Invalid dataset tag '{v}': {err}");
None
}
}),
.map(parse_dataset_tags)
.unwrap_or_default(),
dataset_labels: headers
.get(DATASET_LABELS_KEY)
.and_then(|v| v.to_str().ok())
.map(parse_dataset_labels)
.unwrap_or_default(),
}
}
}
3 changes: 2 additions & 1 deletion src/handlers/http/prism_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ pub async fn post_datasets(
req: HttpRequest,
) -> Result<impl Responder, PrismLogstreamError> {
let session_key = extract_session_key_from_req(&req)?;
let tenant_id = get_tenant_id_from_request(&req);
let dataset = dataset_req
.map(|Json(r)| r)
.unwrap_or_default()
.get_datasets(session_key)
.get_datasets(session_key, tenant_id)
.await?;

Ok(web::Json(dataset))
Expand Down
46 changes: 44 additions & 2 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*
*/

use std::collections::HashSet;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use tracing::warn;

pub mod airplane;
pub mod http;
Expand All @@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
pub const TENANT_ID: &str = "x-p-tenant";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
Expand Down Expand Up @@ -85,12 +89,14 @@ impl Display for TelemetryType {
}

/// Tag for categorizing datasets/streams by observability domain
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum DatasetTag {
AgentObservability,
K8sObservability,
DatabaseObservability,
APM,
ServiceMap,
}

impl TryFrom<&str> for DatasetTag {
Expand All @@ -101,8 +107,10 @@ impl TryFrom<&str> for DatasetTag {
"agent-observability" => Ok(DatasetTag::AgentObservability),
"k8s-observability" => Ok(DatasetTag::K8sObservability),
"database-observability" => Ok(DatasetTag::DatabaseObservability),
"apm" => Ok(DatasetTag::APM),
"service-map" => Ok(DatasetTag::ServiceMap),
_ => Err(
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability, apm, service-map",
),
}
}
Expand All @@ -114,6 +122,40 @@ impl Display for DatasetTag {
DatasetTag::AgentObservability => "agent-observability",
DatasetTag::K8sObservability => "k8s-observability",
DatasetTag::DatabaseObservability => "database-observability",
DatasetTag::APM => "apm",
DatasetTag::ServiceMap => "service-map",
})
}
}

/// Parses a comma-separated header value into a deduplicated list of [`DatasetTag`]s.
///
/// Segments are trimmed; empty segments are skipped. Unrecognized tag strings
/// are logged with a warning and dropped rather than failing the request.
/// Duplicates are removed while preserving the order of first occurrence, so
/// the result is deterministic for a given input — collecting through a
/// `HashSet` and back into a `Vec` would yield an unspecified, run-to-run
/// varying order.
pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
    let mut seen = HashSet::new();
    let mut tags = Vec::new();
    for part in header_value.split(',') {
        let trimmed = part.trim();
        if trimmed.is_empty() {
            continue;
        }
        match DatasetTag::try_from(trimmed) {
            // `insert` returns false for an already-seen tag, deduping
            // without losing the caller-provided ordering.
            Ok(tag) => {
                if seen.insert(tag) {
                    tags.push(tag);
                }
            }
            Err(err) => warn!("Invalid dataset tag '{trimmed}': {err}"),
        }
    }
    tags
}

/// Parses a comma-separated header value into a deduplicated list of labels.
///
/// Segments are trimmed; empty (or whitespace-only) segments are dropped.
/// Duplicates are removed while preserving the order of first occurrence, so
/// the result is deterministic for a given input — collecting through a
/// `HashSet` and back into a `Vec` would yield an unspecified ordering.
/// Each kept label is allocated exactly once.
pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
    // Track seen labels as borrowed slices of the input to avoid a second
    // allocation per label just for the dedupe check.
    let mut seen: HashSet<&str> = HashSet::new();
    let mut labels = Vec::new();
    for part in header_value.split(',') {
        let label = part.trim();
        if !label.is_empty() && seen.insert(label) {
            labels.push(label.to_string());
        }
    }
    labels
}
9 changes: 6 additions & 3 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ pub struct LogStreamMetadata {
pub stream_type: StreamType,
pub log_source: Vec<LogSourceEntry>,
pub telemetry_type: TelemetryType,
pub dataset_tag: Option<DatasetTag>,
pub dataset_tags: Vec<DatasetTag>,
pub dataset_labels: Vec<String>,
}

impl LogStreamMetadata {
Expand All @@ -109,7 +110,8 @@ impl LogStreamMetadata {
schema_version: SchemaVersion,
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
dataset_tag: Option<DatasetTag>,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Self {
LogStreamMetadata {
created_at: if created_at.is_empty() {
Expand All @@ -134,7 +136,8 @@ impl LogStreamMetadata {
schema_version,
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
..Default::default()
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
..
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();

Expand Down Expand Up @@ -500,7 +501,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
};

Ok(metadata)
Expand Down
30 changes: 20 additions & 10 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ impl Parseable {
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
let telemetry_type = stream_metadata.telemetry_type;
let dataset_tag = stream_metadata.dataset_tag;
let dataset_tags = stream_metadata.dataset_tags;
let dataset_labels = stream_metadata.dataset_labels;
let mut metadata = LogStreamMetadata::new(
created_at,
time_partition,
Expand All @@ -428,7 +429,8 @@ impl Parseable {
schema_version,
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
);

// Set hot tier fields from the stored metadata
Expand Down Expand Up @@ -474,7 +476,8 @@ impl Parseable {
vec![log_source_entry.clone()],
TelemetryType::Logs,
&tenant_id,
None,
vec![],
vec![],
)
.await;

Expand Down Expand Up @@ -533,7 +536,8 @@ impl Parseable {
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
tenant_id: &Option<String>,
dataset_tag: Option<DatasetTag>,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Result<bool, PostError> {
if self.streams.contains(stream_name, tenant_id) {
return Ok(true);
Expand Down Expand Up @@ -566,7 +570,8 @@ impl Parseable {
log_source,
telemetry_type,
tenant_id,
dataset_tag,
dataset_tags,
dataset_labels,
)
.await?;

Expand Down Expand Up @@ -643,7 +648,8 @@ impl Parseable {
stream_type,
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
} = headers.into();

let stream_in_memory_dont_update =
Expand Down Expand Up @@ -717,7 +723,8 @@ impl Parseable {
vec![log_source_entry],
telemetry_type,
tenant_id,
dataset_tag,
dataset_tags,
dataset_labels,
)
.await?;

Expand Down Expand Up @@ -779,7 +786,8 @@ impl Parseable {
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
tenant_id: &Option<String>,
dataset_tag: Option<DatasetTag>,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal {
Expand All @@ -804,7 +812,8 @@ impl Parseable {
},
log_source: log_source.clone(),
telemetry_type,
dataset_tag,
dataset_tags: dataset_tags.clone(),
dataset_labels: dataset_labels.clone(),
..Default::default()
};

Expand Down Expand Up @@ -834,7 +843,8 @@ impl Parseable {
SchemaVersion::V1, // New stream
log_source,
telemetry_type,
dataset_tag,
dataset_tags,
dataset_labels,
);
let ingestor_id = INGESTOR_META
.get()
Expand Down
Loading
Loading