Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |--------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs |
| Distributions | [extension] |
| Status | |
|--------------------------|-----------------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs, metrics |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand All @@ -15,22 +15,26 @@ Supported events:

## Configuration

| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 0 (dynamically determined by OS) | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |
| Field | Default | Description |
|-----------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 0 (dynamically determined by OS) | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |
| `metrics_temporality` | cumulative | The [aggregation temporality](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality) to use for metrics. Supported values: `delta`, `cumulative`. |
| `export_interval_ms` | 60000 | The interval in milliseconds at which metrics are exported. If set to 0, metrics are exported immediately upon receipt. |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
export_interval_ms: 30000
telemetryapi/2:
port: 4327
types:
- platform
- function
metrics_temporality: delta
telemetryapi/3:
port: 4328
types: ["platform", "function"]
Expand Down
4 changes: 4 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ type Config struct {
Types []string `mapstructure:"types"`
LogReport bool `mapstructure:"log_report"`
MetricsTemporality string `mapstructure:"metrics_temporality"`
ExportInterval int `mapstructure:"export_interval_ms"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
if cfg.ExportInterval < 0 {
return fmt.Errorf("export_interval_ms must be non-negative: %d", cfg.ExportInterval)
}
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
Expand Down
7 changes: 4 additions & 3 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func TestLoadConfig(t *testing.T) {
// Helper function to create expected Config
createExpectedConfig := func(types []string) *Config {
return &Config{
extensionID: "extensionID",
Port: 12345,
Types: types,
extensionID: "extensionID",
Port: 12345,
Types: types,
ExportInterval: defaultExportInterval,
}
}

Expand Down
20 changes: 11 additions & 9 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 0
platform = "platform"
function = "function"
extension = "extension"
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 0
defaultExportInterval = 60000
platform = "platform"
function = "function"
extension = "extension"
)

var (
Expand All @@ -44,9 +45,10 @@ func NewFactory(extensionID string) receiver.Factory {
Type,
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
ExportInterval: defaultExportInterval,
}
},
receiver.WithTraces(createTracesReceiver, stability),
Expand Down
7 changes: 6 additions & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}
var expectedCfg component.Config = &Config{
extensionID: "test",
Port: defaultPort,
Types: []string{platform, function, extension},
ExportInterval: defaultExportInterval,
}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
4 changes: 2 additions & 2 deletions collector/receiver/telemetryapireceiver/metric_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (h *HistogramMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetr
}
}

if !export {
if !export || len(h.dataPoints) == 0 {
return
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (c *CounterMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetric
}
}

if !export {
if !export || len(c.dataPoints) == 0 {
return
}

Expand Down
72 changes: 72 additions & 0 deletions collector/receiver/telemetryapireceiver/metric_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,78 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) {
})
}

func TestHistogramMetricBuilder_AppendDataPoints_NoData(t *testing.T) {
startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour))

t.Run("cumulative temporality", func(t *testing.T) {
builder := NewHistogramMetricBuilder(
"test.histogram", "Test histogram", "ms",
nil, startTime, pmetric.AggregationTemporalityCumulative,
)

metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := rm.ScopeMetrics().AppendEmpty()

timestamp := pcommon.NewTimestampFromTime(time.Now())
builder.AppendDataPoints(scopeMetrics, timestamp)

assert.Equal(t, 0, scopeMetrics.Metrics().Len())
})

t.Run("delta temporality", func(t *testing.T) {
builder := NewHistogramMetricBuilder(
"test.histogram", "Test histogram", "ms",
nil, startTime, pmetric.AggregationTemporalityDelta,
)

metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := rm.ScopeMetrics().AppendEmpty()

timestamp := pcommon.NewTimestampFromTime(time.Now())
builder.AppendDataPoints(scopeMetrics, timestamp)

assert.Equal(t, 0, scopeMetrics.Metrics().Len())
})
}

func TestCounterMetricBuilder_AppendDataPoints_NoData(t *testing.T) {
startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour))

t.Run("cumulative temporality", func(t *testing.T) {
builder := NewCounterMetricBuilder(
"test.counter", "Test counter", "{count}",
true, startTime, pmetric.AggregationTemporalityCumulative,
)

metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := rm.ScopeMetrics().AppendEmpty()

timestamp := pcommon.NewTimestampFromTime(time.Now())
builder.AppendDataPoints(scopeMetrics, timestamp)

assert.Equal(t, 0, scopeMetrics.Metrics().Len())
})

t.Run("delta temporality", func(t *testing.T) {
builder := NewCounterMetricBuilder(
"test.counter", "Test counter", "{count}",
true, startTime, pmetric.AggregationTemporalityDelta,
)

metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := rm.ScopeMetrics().AppendEmpty()

timestamp := pcommon.NewTimestampFromTime(time.Now())
builder.AppendDataPoints(scopeMetrics, timestamp)

assert.Equal(t, 0, scopeMetrics.Metrics().Len())
})
}

func TestHistogramMetricBuilder_AggregationTemporality(t *testing.T) {
startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour))

Expand Down
Loading
Loading