Skip to content

Commit c94398f

Browse files
Make Telemetry API log record type generic (#1098)
* make log record type generic * propagate log record type generic * replace LambdaTelemetry with a type alias * reduce diff size * go back to previous default generic approach * actually go back to previous default generic approach * fix semver breakage * docs: note with_telemetry_record_type usage in with_telemetry_record * Fix formatting of documentation comment in extension.rs --------- Co-authored-by: Jess Izen <44884346+jlizen@users.noreply.github.com>
1 parent c565705 commit c94398f

5 files changed

Lines changed: 157 additions & 24 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "extension-telemetry-basic"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
lambda-extension = { path = "../../lambda-extension" }
8+
serde_json = "1"
9+
tokio = { version = "1", features = ["macros", "rt"] }
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# AWS Lambda Telemetry extension example
2+
3+
## Build & Deploy
4+
5+
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
6+
2. Build the extension with `cargo lambda build --release --extension`
7+
3. Deploy the extension as a layer with `cargo lambda deploy --extension`
8+
9+
The last command will give you an ARN for the extension layer that you can use in your functions.
10+
11+
## Build for ARM 64
12+
13+
Build the extension with `cargo lambda build --release --extension --arm64`
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use lambda_extension::{service_fn, tracing, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService};
2+
3+
async fn handler(events: Vec<LambdaTelemetry<serde_json::Value>>) -> Result<(), Error> {
4+
for event in events {
5+
match event.record {
6+
LambdaTelemetryRecord::Function(record) => tracing::info!("[logs] [function] {}", record),
7+
LambdaTelemetryRecord::Extension(record) => tracing::info!("[extension] [function] {}", record),
8+
_ => (),
9+
}
10+
}
11+
12+
Ok(())
13+
}
14+
15+
#[tokio::main]
16+
async fn main() -> Result<(), Error> {
17+
// required to enable CloudWatch error logging by the runtime
18+
tracing::init_default_subscriber();
19+
20+
let telemetry_processor = SharedService::new(service_fn(handler));
21+
22+
Extension::new()
23+
.with_telemetry_record_type::<serde_json::Value>()
24+
.with_telemetry_processor(telemetry_processor)
25+
.run()
26+
.await?;
27+
28+
Ok(())
29+
}

lambda-extension/src/extension.rs

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use hyper::{body::Incoming, server::conn::http1, service::service_fn};
44

55
use hyper_util::rt::tokio::TokioIo;
66
use lambda_runtime_api_client::Client;
7-
use serde::Deserialize;
7+
use serde::{de::DeserializeOwned, Deserialize};
88
use std::{
99
convert::Infallible,
1010
fmt,
1111
future::{ready, Future},
12+
marker::PhantomData,
1213
net::SocketAddr,
1314
path::PathBuf,
1415
pin::Pin,
@@ -29,7 +30,7 @@ const DEFAULT_LOG_PORT_NUMBER: u16 = 9002;
2930
const DEFAULT_TELEMETRY_PORT_NUMBER: u16 = 9003;
3031

3132
/// An Extension that runs event, log and telemetry processors
32-
pub struct Extension<'a, E, L, T> {
33+
pub struct Extension<'a, E, L, T, TL = String> {
3334
extension_name: Option<&'a str>,
3435
events: Option<&'a [&'a str]>,
3536
events_processor: E,
@@ -41,6 +42,7 @@ pub struct Extension<'a, E, L, T> {
4142
telemetry_processor: Option<T>,
4243
telemetry_buffering: Option<LogBuffering>,
4344
telemetry_port_number: u16,
45+
_telemetry_record_type: PhantomData<fn(TL)>,
4446
}
4547

4648
impl Extension<'_, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>> {
@@ -58,6 +60,7 @@ impl Extension<'_, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIden
5860
telemetry_buffering: None,
5961
telemetry_processor: None,
6062
telemetry_port_number: DEFAULT_TELEMETRY_PORT_NUMBER,
63+
_telemetry_record_type: PhantomData,
6164
}
6265
}
6366
}
@@ -70,7 +73,7 @@ impl Default
7073
}
7174
}
7275

73-
impl<'a, E, L, T> Extension<'a, E, L, T>
76+
impl<'a, E, L, T, TL> Extension<'a, E, L, T, TL>
7477
where
7578
E: Service<LambdaEvent>,
7679
E::Future: Future<Output = Result<(), E::Error>>,
@@ -85,12 +88,13 @@ where
8588
L::Future: Send,
8689

8790
// Fixme: 'static bound might be too restrictive
88-
T: MakeService<(), Vec<LambdaTelemetry>, Response = ()> + Send + Sync + 'static,
89-
T::Service: Service<Vec<LambdaTelemetry>, Response = ()> + Send + Sync,
90-
<T::Service as Service<Vec<LambdaTelemetry>>>::Future: Send + 'a,
91+
T: MakeService<(), Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync + 'static,
92+
T::Service: Service<Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync,
93+
<T::Service as Service<Vec<LambdaTelemetry<TL>>>>::Future: Send + 'a,
9194
T::Error: Into<Error> + fmt::Debug,
9295
T::MakeError: Into<Error> + fmt::Debug,
9396
T::Future: Send,
97+
TL: DeserializeOwned + Send + 'static,
9498
{
9599
/// Create a new [`Extension`] with a given extension name
96100
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
@@ -110,7 +114,7 @@ where
110114
}
111115

112116
/// Create a new [`Extension`] with a service that receives Lambda events.
113-
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T>
117+
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T, TL>
114118
where
115119
N: Service<LambdaEvent>,
116120
N::Future: Future<Output = Result<(), N::Error>>,
@@ -128,11 +132,12 @@ where
128132
telemetry_buffering: self.telemetry_buffering,
129133
telemetry_processor: self.telemetry_processor,
130134
telemetry_port_number: self.telemetry_port_number,
135+
_telemetry_record_type: self._telemetry_record_type,
131136
}
132137
}
133138

134139
/// Create a new [`Extension`] with a service that receives Lambda logs.
135-
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T>
140+
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T, TL>
136141
where
137142
N: Service<()>,
138143
N::Future: Future<Output = Result<NS, N::Error>>,
@@ -150,6 +155,7 @@ where
150155
telemetry_buffering: self.telemetry_buffering,
151156
telemetry_processor: self.telemetry_processor,
152157
telemetry_port_number: self.telemetry_port_number,
158+
_telemetry_record_type: self._telemetry_record_type,
153159
}
154160
}
155161

@@ -179,7 +185,11 @@ where
179185
}
180186

181187
/// Create a new [`Extension`] with a service that receives Lambda telemetry data.
182-
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N>
188+
///
189+
/// By default, telemetry log records are deserialized as `String`, but
190+
/// it's possible to configure Lambda functions to emit logs in JSON format.
191+
/// For more information, refer to [`Self::with_telemetry_record_type`].
192+
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N, TL>
183193
where
184194
N: Service<()>,
185195
N::Future: Future<Output = Result<NS, N::Error>>,
@@ -197,6 +207,7 @@ where
197207
telemetry_types: self.telemetry_types,
198208
telemetry_buffering: self.telemetry_buffering,
199209
telemetry_port_number: self.telemetry_port_number,
210+
_telemetry_record_type: self._telemetry_record_type,
200211
}
201212
}
202213

@@ -345,6 +356,48 @@ where
345356
}
346357
}
347358

359+
impl<'a, E, L> Extension<'a, E, L, MakeIdentity<Vec<LambdaTelemetry>>> {
360+
/// Set the deserialization type for telemetry log records.
361+
///
362+
/// By default, telemetry log records are deserialized as `String`, but
363+
/// it's possible to configure Lambda functions to emit logs in JSON format.
364+
/// Use this method to deserialize into a different type, such as
365+
/// `serde_json::Value`.
366+
///
367+
/// Must be called before [`Self::with_telemetry_processor`].
368+
///
369+
/// ```
370+
/// use lambda_extension::{Extension, LambdaTelemetry, SharedService, service_fn};
371+
///
372+
/// async fn handler(events: Vec<LambdaTelemetry<serde_json::Value>>) -> Result<(), lambda_extension::Error> {
373+
/// for event in &events {
374+
/// println!("{event:?}");
375+
/// }
376+
/// Ok(())
377+
/// }
378+
///
379+
/// let _ext = Extension::new()
380+
/// .with_telemetry_record_type::<serde_json::Value>()
381+
/// .with_telemetry_processor(SharedService::new(service_fn(handler)));
382+
/// ```
383+
pub fn with_telemetry_record_type<N>(self) -> Extension<'a, E, L, MakeIdentity<Vec<LambdaTelemetry<N>>>, N> {
384+
Extension {
385+
_telemetry_record_type: PhantomData,
386+
telemetry_processor: None,
387+
events_processor: self.events_processor,
388+
extension_name: self.extension_name,
389+
events: self.events,
390+
log_types: self.log_types,
391+
log_buffering: self.log_buffering,
392+
logs_processor: self.logs_processor,
393+
log_port_number: self.log_port_number,
394+
telemetry_types: self.telemetry_types,
395+
telemetry_buffering: self.telemetry_buffering,
396+
telemetry_port_number: self.telemetry_port_number,
397+
}
398+
}
399+
}
400+
348401
/// An extension registered by calling [`Extension::register`].
349402
pub struct RegisteredExtension<E> {
350403
/// The ID of the registered extension. This ID is unique per extension and remains constant

lambda-extension/src/telemetry.rs

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,31 @@ use http::{Request, Response};
33
use http_body_util::BodyExt;
44
use hyper::body::Incoming;
55
use lambda_runtime_api_client::body::Body;
6-
use serde::{Deserialize, Serialize};
6+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
77
use std::{boxed::Box, fmt, sync::Arc};
88
use tokio::sync::Mutex;
99
use tower::Service;
1010
use tracing::{error, trace};
1111

1212
/// Payload received from the Telemetry API
1313
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
14-
pub struct LambdaTelemetry {
14+
pub struct LambdaTelemetry<L = String> {
1515
/// Time when the telemetry was generated
1616
pub time: DateTime<Utc>,
1717
/// Telemetry record entry
1818
#[serde(flatten)]
19-
pub record: LambdaTelemetryRecord,
19+
pub record: LambdaTelemetryRecord<L>,
2020
}
2121

2222
/// Record in a LambdaTelemetry entry
2323
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
2424
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
25-
pub enum LambdaTelemetryRecord {
25+
pub enum LambdaTelemetryRecord<L = String> {
2626
/// Function log records
27-
Function(String),
27+
Function(L),
2828

2929
/// Extension log records
30-
Extension(String),
30+
Extension(L),
3131

3232
/// Platform init start record
3333
#[serde(rename = "platform.initStart", rename_all = "camelCase")]
@@ -269,14 +269,15 @@ pub struct RuntimeDoneMetrics {
269269
///
270270
/// This takes an `hyper::Request` and transforms it into `Vec<LambdaTelemetry>` for the
271271
/// underlying `Service` to process.
272-
pub(crate) async fn telemetry_wrapper<S>(
272+
pub(crate) async fn telemetry_wrapper<S, L>(
273273
service: Arc<Mutex<S>>,
274274
req: Request<Incoming>,
275275
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
276276
where
277-
S: Service<Vec<LambdaTelemetry>, Response = ()>,
277+
S: Service<Vec<LambdaTelemetry<L>>, Response = ()>,
278278
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
279279
S::Future: Send,
280+
L: DeserializeOwned,
280281
{
281282
trace!("Received telemetry request");
282283
// Parse the request body as a Vec<LambdaTelemetry>
@@ -291,7 +292,7 @@ where
291292
}
292293
};
293294

294-
let telemetry: Vec<LambdaTelemetry> = match serde_json::from_slice(&body.to_bytes()) {
295+
let telemetry: Vec<LambdaTelemetry<L>> = match serde_json::from_slice(&body.to_bytes()) {
295296
Ok(telemetry) => telemetry,
296297
Err(e) => {
297298
error!("Error parsing telemetry: {}", e);
@@ -319,17 +320,17 @@ mod deserialization_tests {
319320
use chrono::{TimeDelta, TimeZone};
320321

321322
macro_rules! deserialize_tests {
322-
($($name:ident: $value:expr,)*) => {
323+
($($name:ident$(<$log:ty>)?: $value:expr,)*) => {
323324
$(
324325
#[test]
325326
fn $name() {
326327
let (input, expected) = $value;
327-
let actual = serde_json::from_str::<LambdaTelemetry>(&input).expect("unable to deserialize");
328+
let actual = serde_json::from_str::<LambdaTelemetry$(<$log>)?>(&input).expect("unable to deserialize");
328329

329330
assert!(actual.record == expected);
330331
}
331332
)*
332-
}
333+
};
333334
}
334335

335336
deserialize_tests! {
@@ -339,12 +340,24 @@ mod deserialization_tests {
339340
LambdaTelemetryRecord::Function("hello world".to_string()),
340341
),
341342

343+
// function (json)
344+
function_generic<bool>: (
345+
r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": true}"#,
346+
LambdaTelemetryRecord::Function(true),
347+
),
348+
342349
// extension
343350
extension: (
344351
r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
345352
LambdaTelemetryRecord::Extension("hello world".to_string()),
346353
),
347354

355+
// extension (json)
356+
extension_generic<bool>: (
357+
r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": true}"#,
358+
LambdaTelemetryRecord::Extension(true),
359+
),
360+
348361
// platform.start
349362
platform_start: (
350363
r#"{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
@@ -477,11 +490,11 @@ mod serialization_tests {
477490

478491
use super::*;
479492
macro_rules! serialize_tests {
480-
($($name:ident: $value:expr,)*) => {
493+
($($name:ident$(<$log:ty>)?: $value:expr,)*) => {
481494
$(
482495
#[test]
483496
fn $name() {
484-
let (input, expected) = $value;
497+
let (input, expected): (LambdaTelemetry$(<$log>)?, &str) = $value;
485498
let actual = serde_json::to_string(&input).expect("unable to serialize");
486499
println!("Input: {:?}\n", input);
487500
println!("Expected:\n {:?}\n", expected);
@@ -490,7 +503,7 @@ mod serialization_tests {
490503
assert!(actual == expected);
491504
}
492505
)*
493-
}
506+
};
494507
}
495508

496509
serialize_tests! {
@@ -502,6 +515,14 @@ mod serialization_tests {
502515
},
503516
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#,
504517
),
518+
// function (json)
519+
function_generic<bool>: (
520+
LambdaTelemetry {
521+
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
522+
record: LambdaTelemetryRecord::Function(true),
523+
},
524+
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":true}"#,
525+
),
505526
// extension
506527
extension: (
507528
LambdaTelemetry {
@@ -510,6 +531,14 @@ mod serialization_tests {
510531
},
511532
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#,
512533
),
534+
// extension (json)
535+
extension_generic<bool>: (
536+
LambdaTelemetry {
537+
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
538+
record: LambdaTelemetryRecord::Extension(true),
539+
},
540+
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":true}"#,
541+
),
513542
//platform.Start
514543
platform_start: (
515544
LambdaTelemetry{

0 commit comments

Comments
 (0)