diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 5ce5025e73..25640d71c1 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -87,6 +87,7 @@ type telemetryAPIReceiver struct { exportInterval time.Duration stopCh chan struct{} wg sync.WaitGroup + lastEventTime pcommon.Timestamp } func (r *telemetryAPIReceiver) bindListener() (net.Listener, string, error) { @@ -191,7 +192,10 @@ func (r *telemetryAPIReceiver) flushMetricsLocked(ctx context.Context) error { scopeMetric.Scope().SetName(scopeName) scopeMetric.SetSchemaUrl(semconv.SchemaURL) - ts := pcommon.NewTimestampFromTime(time.Now()) + ts := r.lastEventTime + if ts == 0 { + ts = pcommon.NewTimestampFromTime(time.Now()) + } r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, ts) r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, ts) r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, ts) @@ -359,6 +363,12 @@ func (r *telemetryAPIReceiver) recordMetrics(slice []event) { continue } + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + if ets := pcommon.NewTimestampFromTime(t); ets > r.lastEventTime { + r.lastEventTime = ets + } + } + switch el.Type { case string(telemetryapi.PlatformInitStart): r.faaSMetricBuilders.coldstartsMetric.Add(1) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 7e549a628d..acc656027b 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -163,6 +163,66 @@ func TestRecordMetrics(t *testing.T) { require.True(t, foundDuration) } +func TestMetricTimestampMatchesEventTime(t *testing.T) { + r, err := newTelemetryAPIReceiver( + &Config{ExportInterval: 60000}, + receivertest.NewNopSettings(Type), + ) + require.NoError(t, err) + c := &mockConsumer{} + r.registerMetricsConsumer(c) + + const earliest = "2006-01-02T15:04:04.000Z" + const latest = "2006-01-02T15:04:05.000Z" + + slice := []event{ + { + Time: earliest, + Type: "platform.initStart", + Record: map[string]any{ + "functionName": "test-func", + }, + }, + { + Time: latest, + Type: "platform.runtimeDone", + Record: map[string]any{ + "status": "success", + "metrics": map[string]any{ + "durationMs": 100.0, + }, + }, + }, + } + r.recordMetrics(slice) + + err = r.flushMetrics(context.Background()) + require.NoError(t, err) + require.Len(t, c.metricBatches, 1) + + expected, err := time.Parse(time.RFC3339, latest) + require.NoError(t, err) + expectedTS := pcommon.NewTimestampFromTime(expected) + + sm := c.metricBatches[0].ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Greater(t, sm.Metrics().Len(), 0) + for i := 0; i < sm.Metrics().Len(); i++ { + m := sm.Metrics().At(i) + switch m.Type() { + case pmetric.MetricTypeSum: + dps := m.Sum().DataPoints() + for j := 0; j < dps.Len(); j++ { + require.Equal(t, expectedTS, dps.At(j).Timestamp(), "sum metric %q dp %d", m.Name(), j) + } + case pmetric.MetricTypeHistogram: + dps := m.Histogram().DataPoints() + for j := 0; j < dps.Len(); j++ { + require.Equal(t, expectedTS, dps.At(j).Timestamp(), "histogram metric %q dp %d", m.Name(), j) + } + } + } +} + func TestFlushMetricsIntervalImmediate(t *testing.T) { // Test immediate flush when interval = 0 r, err := newTelemetryAPIReceiver(