From 3d6223365ea2c3904d6efed5bffa098df4e2e547 Mon Sep 17 00:00:00 2001 From: Daniil Bee Date: Mon, 11 May 2026 21:01:46 +0300 Subject: [PATCH 1/2] fix(collector): use Telemetry API event time as metric timestamp --- .../receiver/telemetryapireceiver/receiver.go | 37 +++++++++--- .../telemetryapireceiver/receiver_test.go | 60 +++++++++++++++++++ .../receiver/telemetryapireceiver/types.go | 9 ++- 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 5ce5025e73..dde3f120fd 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -87,6 +87,7 @@ type telemetryAPIReceiver struct { exportInterval time.Duration stopCh chan struct{} wg sync.WaitGroup + lastEventTime pcommon.Timestamp } func (r *telemetryAPIReceiver) bindListener() (net.Listener, string, error) { @@ -191,7 +192,10 @@ func (r *telemetryAPIReceiver) flushMetricsLocked(ctx context.Context) error { scopeMetric.Scope().SetName(scopeName) scopeMetric.SetSchemaUrl(semconv.SchemaURL) - ts := pcommon.NewTimestampFromTime(time.Now()) + ts := r.lastEventTime + if ts == 0 { + ts = pcommon.NewTimestampFromTime(time.Now()) + } r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, ts) r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, ts) r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, ts) @@ -237,6 +241,12 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ return } + for i := range slice { + if t, err := time.Parse(time.RFC3339, slice[i].Time); err == nil { + slice[i].parsedTime = t + } + } + r.mu.Lock() defer r.mu.Unlock() @@ -359,6 +369,16 @@ func (r *telemetryAPIReceiver) recordMetrics(slice []event) { continue } + t := el.parsedTime + if t.IsZero() { + t, _ = time.Parse(time.RFC3339, el.Time) + } + if !t.IsZero() { + if ets := pcommon.NewTimestampFromTime(t); ets > r.lastEventTime { + r.lastEventTime = ets + } + } + switch el.Type { case string(telemetryapi.PlatformInitStart): r.faaSMetricBuilders.coldstartsMetric.Add(1) @@ -428,13 +448,16 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } else { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err + t := el.parsedTime + if t.IsZero() { + var err error + if t, err = time.Parse(time.RFC3339, el.Time); err != nil { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } } + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) if record, ok := el.Record.(map[string]interface{}); ok { requestId := r.getRecordRequestId(record) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 7e549a628d..acc656027b 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -163,6 +163,66 @@ func TestRecordMetrics(t *testing.T) { require.True(t, foundDuration) } +func TestMetricTimestampMatchesEventTime(t *testing.T) { + r, err := newTelemetryAPIReceiver( + &Config{ExportInterval: 60000}, + receivertest.NewNopSettings(Type), + ) + require.NoError(t, err) + c := &mockConsumer{} + r.registerMetricsConsumer(c) + + const earliest = "2006-01-02T15:04:04.000Z" + const latest = "2006-01-02T15:04:05.000Z" + + slice := []event{ + { + Time: earliest, + Type: "platform.initStart", + Record: map[string]any{ + "functionName": "test-func", + }, + }, + { + Time: latest, + Type: "platform.runtimeDone", + Record: map[string]any{ + "status": "success", + "metrics": map[string]any{ + "durationMs": 100.0, + }, + }, + }, + } + r.recordMetrics(slice) + + err = r.flushMetrics(context.Background()) + require.NoError(t, err) + require.Len(t, c.metricBatches, 1) + + expected, err := time.Parse(time.RFC3339, latest) + require.NoError(t, err) + expectedTS := pcommon.NewTimestampFromTime(expected) + + sm := c.metricBatches[0].ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Greater(t, sm.Metrics().Len(), 0) + for i := 0; i < sm.Metrics().Len(); i++ { + m := sm.Metrics().At(i) + switch m.Type() { + case pmetric.MetricTypeSum: + dps := m.Sum().DataPoints() + for j := 0; j < dps.Len(); j++ { + require.Equal(t, expectedTS, dps.At(j).Timestamp(), "sum metric %q dp %d", m.Name(), j) + } + case pmetric.MetricTypeHistogram: + dps := m.Histogram().DataPoints() + for j := 0; j < dps.Len(); j++ { + require.Equal(t, expectedTS, dps.At(j).Timestamp(), "histogram metric %q dp %d", m.Name(), j) + } + } + } +} + func TestFlushMetricsIntervalImmediate(t *testing.T) { // Test immediate flush when interval = 0 r, err := newTelemetryAPIReceiver( diff --git a/collector/receiver/telemetryapireceiver/types.go b/collector/receiver/telemetryapireceiver/types.go index fdcb4e3f28..af0095bf33 100644 --- a/collector/receiver/telemetryapireceiver/types.go +++ b/collector/receiver/telemetryapireceiver/types.go @@ -14,8 +14,11 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import "time" + type event struct { - Time string `json:"time"` - Type string `json:"type"` - Record any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` + parsedTime time.Time } From 2a7457048f21d677988c7fbc44081cf83b45651f Mon Sep 17 00:00:00 2001 From: Daniil Bee Date: Wed, 13 May 2026 14:30:17 +0300 Subject: [PATCH 2/2] revert refactor: keep parse inline per reviewer feedback --- .../receiver/telemetryapireceiver/receiver.go | 27 +++++-------------- .../receiver/telemetryapireceiver/types.go | 9 +++---- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index dde3f120fd..25640d71c1 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -241,12 +241,6 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ return } - for i := range slice { - if t, err := time.Parse(time.RFC3339, slice[i].Time); err == nil { - slice[i].parsedTime = t - } - } - r.mu.Lock() defer r.mu.Unlock() @@ -369,11 +363,7 @@ func (r *telemetryAPIReceiver) recordMetrics(slice []event) { continue } - t := el.parsedTime - if t.IsZero() { - t, _ = time.Parse(time.RFC3339, el.Time) - } - if !t.IsZero() { + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { if ets := pcommon.NewTimestampFromTime(t); ets > r.lastEventTime { r.lastEventTime = ets } @@ -448,16 +438,13 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) - t := el.parsedTime - if t.IsZero() { - var err error - if t, err = time.Parse(time.RFC3339, el.Time); err != nil { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err - } + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err } - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) if record, ok := el.Record.(map[string]interface{}); ok { requestId := r.getRecordRequestId(record) diff --git a/collector/receiver/telemetryapireceiver/types.go b/collector/receiver/telemetryapireceiver/types.go index af0095bf33..fdcb4e3f28 100644 --- a/collector/receiver/telemetryapireceiver/types.go +++ b/collector/receiver/telemetryapireceiver/types.go @@ -14,11 +14,8 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" -import "time" - type event struct { - Time string `json:"time"` - Type string `json:"type"` - Record any `json:"record"` - parsedTime time.Time + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` }