diff --git a/CHANGELOG.md b/CHANGELOG.md
index 87276b4f384..8c02f898972 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,8 @@
#### Incubating
+* Add support for configuring `setMaxExportBatchSize` in `PeriodicMetricReader` ([#8296](https://github.com/open-telemetry/opentelemetry-java/pull/8296))
+
* **BREAKING** Update `EnvironmentGetter` and `EnvironmentSetter` key normalization to reflect spec
changes
([#8233](https://github.com/open-telemetry/opentelemetry-java/pull/8233))
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java
new file mode 100644
index 00000000000..ee2d5320445
--- /dev/null
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics.export;
+
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.data.SumData;
+import io.opentelemetry.sdk.metrics.data.SummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Batches metric data into multiple batches based on the maximum export batch size. This is used by
+ * the {@link PeriodicMetricReader} to batch metric data before exporting it.
+ *
+ *
This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+class MetricExportBatcher {
+
+ private MetricExportBatcher() {}
+
+ private static void validateMaxExportBatchSize(int maxExportBatchSize) {
+ if (maxExportBatchSize <= 0) {
+ throw new IllegalArgumentException("maxExportBatchSize must be positive");
+ }
+ }
+
+ /**
+ * Batches the given metric data into multiple batches based on the maximum export batch size.
+ *
+ * @param metrics The collection of metric data objects to batch based on the number of data
+ * points they contain.
+ * @return A collection of batches of metric data.
+ */
+ static Collection> batchMetrics(
+ Collection metrics, int maxExportBatchSize) {
+ validateMaxExportBatchSize(maxExportBatchSize);
+ if (metrics.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Collection> preparedBatchesForExport = new ArrayList<>();
+ List currentBatch = new ArrayList<>(maxExportBatchSize);
+ int currentPointsInBatch = 0;
+ for (MetricData metricData : metrics) {
+ int totalPointsInMetric = metricData.getData().getPoints().size();
+ if (currentPointsInBatch + totalPointsInMetric <= maxExportBatchSize) {
+ currentBatch.add(metricData);
+ currentPointsInBatch += totalPointsInMetric;
+ continue;
+ }
+ int currentIndex = 0;
+ List originalPointsList = new ArrayList<>(metricData.getData().getPoints());
+ while (currentIndex < totalPointsInMetric) {
+ if (currentPointsInBatch == maxExportBatchSize) {
+ preparedBatchesForExport.add(currentBatch);
+ currentBatch = new ArrayList<>(maxExportBatchSize);
+ currentPointsInBatch = 0;
+ }
+ int pointsToTake =
+ Math.min(maxExportBatchSize - currentPointsInBatch, totalPointsInMetric - currentIndex);
+ currentBatch.add(
+ copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake));
+ currentPointsInBatch += pointsToTake;
+ currentIndex += pointsToTake;
+ }
+ }
+ if (!currentBatch.isEmpty()) {
+ preparedBatchesForExport.add(currentBatch);
+ }
+ return Collections.unmodifiableCollection(preparedBatchesForExport);
+ }
+
+ private static MetricData copyMetricData(
+ MetricData original,
+ List originalPointsList,
+ int dataPointsOffset,
+ int dataPointsToTake) {
+ List points =
+ Collections.unmodifiableList(
+ new ArrayList<>(
+ originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake)));
+ return createMetricDataWithPoints(original, points);
+ }
+
+ /**
+ * Creates a new MetricData with the given points.
+ *
+ * @param original The original MetricData.
+ * @param points The points to use for the new MetricData.
+ * @return A new MetricData with the given points.
+ */
+ @SuppressWarnings("unchecked")
+ private static MetricData createMetricDataWithPoints(
+ MetricData original, Collection points) {
+ switch (original.getType()) {
+ case DOUBLE_GAUGE:
+ return ImmutableMetricData.createDoubleGauge(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableGaugeData.create((Collection) (Collection>) points));
+ case LONG_GAUGE:
+ return ImmutableMetricData.createLongGauge(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableGaugeData.create((Collection) (Collection>) points));
+ case DOUBLE_SUM:
+ SumData doubleSumData = original.getDoubleSumData();
+ return ImmutableMetricData.createDoubleSum(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSumData.create(
+ doubleSumData.isMonotonic(),
+ doubleSumData.getAggregationTemporality(),
+ (Collection) (Collection>) points));
+ case LONG_SUM:
+ SumData longSumData = original.getLongSumData();
+ return ImmutableMetricData.createLongSum(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSumData.create(
+ longSumData.isMonotonic(),
+ longSumData.getAggregationTemporality(),
+ (Collection) (Collection>) points));
+ case HISTOGRAM:
+ HistogramData histogramData = original.getHistogramData();
+ return ImmutableMetricData.createDoubleHistogram(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableHistogramData.create(
+ histogramData.getAggregationTemporality(),
+ (Collection) (Collection>) points));
+ case EXPONENTIAL_HISTOGRAM:
+ ExponentialHistogramData expHistogramData = original.getExponentialHistogramData();
+ return ImmutableMetricData.createExponentialHistogram(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableExponentialHistogramData.create(
+ expHistogramData.getAggregationTemporality(),
+ (Collection) (Collection>) points));
+ case SUMMARY:
+ return ImmutableMetricData.createDoubleSummary(
+ original.getResource(),
+ original.getInstrumentationScopeInfo(),
+ original.getName(),
+ original.getDescription(),
+ original.getUnit(),
+ ImmutableSummaryData.create((Collection) (Collection>) points));
+ }
+ throw new UnsupportedOperationException("Unsupported metric type: " + original.getType());
+ }
+}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
index c8e33fde1e0..25e0bc0b2f1 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java
@@ -17,6 +17,7 @@
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -51,6 +52,7 @@ public final class PeriodicMetricReader implements MetricReader {
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
@Nullable private volatile ScheduledFuture> scheduledFuture;
+ private final int maxExportBatchSize;
/**
* Returns a new {@link PeriodicMetricReader} which exports to the {@code exporter} once every
@@ -66,10 +68,14 @@ public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) {
}
PeriodicMetricReader(
- MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) {
+ MetricExporter exporter,
+ long intervalNanos,
+ ScheduledExecutorService scheduler,
+ int maxExportBatchSize) {
this.exporter = exporter;
this.intervalNanos = intervalNanos;
this.scheduler = scheduler;
+ this.maxExportBatchSize = maxExportBatchSize;
this.scheduled = new Scheduled();
}
@@ -163,6 +169,8 @@ public String toString() {
+ exporter
+ ", intervalNanos="
+ intervalNanos
+ + ", maxExportBatchSize="
+ + maxExportBatchSize
+ '}';
}
@@ -187,13 +195,56 @@ private final class Scheduled implements Runnable {
private Scheduled() {}
+ private CompletableResultCode exportMetrics(Collection metricData) {
+ if (maxExportBatchSize == 0) {
+ return exporter.export(metricData);
+ }
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(metricData, maxExportBatchSize);
+ CompletableResultCode sequentialResult = new CompletableResultCode();
+ AtomicBoolean anyFailed = new AtomicBoolean(false);
+ Iterator> batchIterator = batches.iterator();
+ Runnable exportNext =
+ new Runnable() {
+ @Override
+ public void run() {
+ while (batchIterator.hasNext()) {
+ Collection currentBatch = batchIterator.next();
+ CompletableResultCode currentResult = exporter.export(currentBatch);
+ if (currentResult.isDone()) {
+ if (!currentResult.isSuccess()) {
+ anyFailed.set(true);
+ }
+ } else {
+ currentResult.whenComplete(
+ () -> {
+ if (!currentResult.isSuccess()) {
+ anyFailed.set(true);
+ }
+ this.run();
+ });
+ return;
+ }
+ }
+ if (anyFailed.get()) {
+ sequentialResult.fail();
+ } else {
+ sequentialResult.succeed();
+ }
+ }
+ };
+ exportNext.run();
+ return sequentialResult;
+ }
+
void setMeterProvider(MeterProvider meterProvider) {
instrumentation = new MetricReaderInstrumentation(COMPONENT_ID, meterProvider);
}
@Override
public void run() {
- // Ignore the CompletableResultCode from doRun() in order to keep run() asynchronous
+ // Ignore the CompletableResultCode from doRun() in order to keep run()
+ // asynchronous
doRun();
}
@@ -217,11 +268,11 @@ CompletableResultCode doRun() {
exportAvailable.set(true);
flushResult.succeed();
} else {
- CompletableResultCode result = exporter.export(metricData);
+ CompletableResultCode result = exportMetrics(metricData);
result.whenComplete(
() -> {
if (!result.isSuccess()) {
- logger.log(Level.FINE, "Exporter failed");
+ logger.log(Level.WARNING, "Exporter failed");
}
exportAvailable.set(true);
flushResult.succeed();
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
index 04cdd27506d..138c68a006f 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java
@@ -30,6 +30,8 @@ public final class PeriodicMetricReaderBuilder {
@Nullable private ScheduledExecutorService executor;
+ private int maxExportBatchSize;
+
PeriodicMetricReaderBuilder(MetricExporter metricExporter) {
this.metricExporter = metricExporter;
}
@@ -59,6 +61,20 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor
return this;
}
+ /**
+ * Sets the maximum number of data points to include in a single export batch. If unset, no
+ * batching will be performed. The maximum number of data points is considered across MetricData
+ * objects scheduled for export.
+ *
+ * @param maxExportBatchSize The maximum number of data points to include in a single export
+ * batch.
+ */
+ PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) {
+ checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive");
+ this.maxExportBatchSize = maxExportBatchSize;
+ return this;
+ }
+
/** Build a {@link PeriodicMetricReader} with the configuration of this builder. */
public PeriodicMetricReader build() {
ScheduledExecutorService executor = this.executor;
@@ -66,6 +82,6 @@ public PeriodicMetricReader build() {
executor =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("PeriodicMetricReader"));
}
- return new PeriodicMetricReader(metricExporter, intervalNanos, executor);
+ return new PeriodicMetricReader(metricExporter, intervalNanos, executor, maxExportBatchSize);
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java
index 5bec97a8603..68206dab82e 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java
@@ -76,6 +76,21 @@ public static SdkMeterProviderBuilder addMeterConfiguratorCondition(
return sdkMeterProviderBuilder;
}
+ /** Reflectively set the max export batch size for the {@link SdkMeterProviderBuilder}. */
+ public static SdkMeterProviderBuilder setMaxExportBatchSize(
+ SdkMeterProviderBuilder sdkMeterProviderBuilder, int maxExportBatchSize) {
+ try {
+ Method method =
+ SdkMeterProviderBuilder.class.getDeclaredMethod("setMaxExportBatchSize", int.class);
+ method.setAccessible(true);
+ method.invoke(sdkMeterProviderBuilder, maxExportBatchSize);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new IllegalStateException(
+ "Error calling setMaxExportBatchSize on SdkMeterProviderBuilder", e);
+ }
+ return sdkMeterProviderBuilder;
+ }
+
/**
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
* key-values from baggage to all measurements.
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java
new file mode 100644
index 00000000000..0a99b03fc92
--- /dev/null
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java
@@ -0,0 +1,753 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics.export;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.SummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramBuckets;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
+import io.opentelemetry.sdk.resources.Resource;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+class MetricExportBatcherTest {
+
+ @Test
+ void batchMetrics_InvalidMaxExportBatchSize() {
+ assertThatThrownBy(() -> MetricExportBatcher.batchMetrics(Collections.emptyList(), 0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ assertThatThrownBy(() -> MetricExportBatcher.batchMetrics(Collections.emptyList(), -1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ }
+
+ @Test
+ void batchMetrics_EmptyMetrics() {
+ assertThat(MetricExportBatcher.batchMetrics(Collections.emptyList(), 10)).isEmpty();
+ }
+
+ @Test
+ void batchMetrics_MetricFitsIntact() {
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Collections.singletonList(p1)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 10);
+ assertThat(batches).hasSize(1);
+ assertThat(batches.iterator().next()).containsExactly(metric);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() {
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+ DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0);
+ DoublePointData p4 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 4.0);
+ DoublePointData p5 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 5.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 2);
+ List> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList.size()).isEqualTo(3);
+ Collection firstBatch = batchesList.get(0);
+ Collection secondBatch = batchesList.get(1);
+ Collection thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch.size()).isEqualTo(1);
+ assertThat(secondBatch.size()).isEqualTo(1);
+ assertThat(thirdBatch.size()).isEqualTo(1);
+
+ MetricData b1m1 = firstBatch.iterator().next();
+ assertThat(b1m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b1m1.getName()).isEqualTo("name");
+ assertThat(b1m1.getDescription()).isEqualTo("desc");
+ assertThat(b1m1.getUnit()).isEqualTo("1");
+ assertThat(b1m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2);
+
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b2m1.getName()).isEqualTo("name");
+ assertThat(b2m1.getDescription()).isEqualTo("desc");
+ assertThat(b2m1.getUnit()).isEqualTo("1");
+ assertThat(b2m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4);
+
+ // Last batch is partially filled.
+ MetricData b3m1 = thirdBatch.iterator().next();
+ assertThat(b3m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE);
+ assertThat(b3m1.getName()).isEqualTo("name");
+ assertThat(b3m1.getDescription()).isEqualTo("desc");
+ assertThat(b3m1.getUnit()).isEqualTo("1");
+ assertThat(b3m1.getDoubleGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() {
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 4);
+
+ assertThat(batches).hasSize(1);
+ Collection firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1); // There is only 1 MetricData
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() {
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true, AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 2);
+
+ Collection firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1); // There is only 1 MetricData
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2);
+ assertThat(m1.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m1.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() {
+ Attributes attrs1 = Attributes.builder().put("key1", "val1").build();
+ Attributes attrs2 = Attributes.builder().put("key2", "val2").build();
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L);
+
+ MetricData metric1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc_1",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2)));
+
+ MetricData metric2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc_2",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Arrays.asList(metric1, metric2), 1);
+
+ assertThat(batches).hasSize(4);
+ Collection firstBatch = batches.iterator().next();
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ Collection thirdBatch = batches.stream().skip(2).findFirst().get();
+ Collection fourthBatch = batches.stream().skip(3).findFirst().get();
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+ assertThat(fourthBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m1.getName()).isEqualTo("name_1");
+ assertThat(m1.getDescription()).isEqualTo("desc_1");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getLongSumData().getPoints()).containsExactly(p1);
+ assertThat(m1.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1);
+ assertThat(m1.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m1.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m2.getName()).isEqualTo("name_1");
+ assertThat(m2.getDescription()).isEqualTo("desc_1");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getLongSumData().getPoints()).containsExactly(p2);
+ assertThat(m2.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2);
+ assertThat(m2.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m2.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m3 = thirdBatch.iterator().next();
+ assertThat(m3.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m3.getName()).isEqualTo("name_2");
+ assertThat(m3.getDescription()).isEqualTo("desc_2");
+ assertThat(m3.getUnit()).isEqualTo("1");
+ assertThat(m3.getLongSumData().getPoints()).containsExactly(p1);
+ assertThat(m3.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1);
+ assertThat(m3.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m3.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+
+ MetricData m4 = fourthBatch.iterator().next();
+ assertThat(m4.getType()).isEqualTo(MetricDataType.LONG_SUM);
+ assertThat(m4.getName()).isEqualTo("name_2");
+ assertThat(m4.getDescription()).isEqualTo("desc_2");
+ assertThat(m4.getUnit()).isEqualTo("1");
+ assertThat(m4.getLongSumData().getPoints()).containsExactly(p2);
+ assertThat(m4.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2);
+ assertThat(m4.getLongSumData().isMonotonic()).isFalse();
+ assertThat(m4.getLongSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.DELTA);
+ }
+
+ @Test
+ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() {
+ ImmutableHistogramPointData p1 =
+ ImmutableHistogramPointData.create(
+ 1,
+ 2,
+ Attributes.empty(),
+ 1.0,
+ /* hasMin= */ false,
+ 0.0,
+ /* hasMax= */ false,
+ 0.0,
+ Collections.emptyList(),
+ Collections.singletonList(1L));
+ ImmutableHistogramPointData p2 =
+ ImmutableHistogramPointData.create(
+ 1,
+ 2,
+ Attributes.empty(),
+ 2.0,
+ /* hasMin= */ false,
+ 0.0,
+ /* hasMax= */ false,
+ 0.0,
+ Collections.emptyList(),
+ Collections.singletonList(2L));
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleHistogram(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableHistogramData.create(
+ AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 1);
+
+ assertThat(batches).hasSize(2);
+ Collection firstBatch = batches.iterator().next();
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getHistogramData().getPoints()).containsExactly(p1);
+ assertThat(m1.getHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getHistogramData().getPoints()).containsExactly(p2);
+ assertThat(m2.getHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_MultipleMetricsExactCapacityMatch() {
+ Attributes attrs1 = Attributes.builder().put("k", "v1").build();
+ Attributes attrs2 = Attributes.builder().put("k", "v2").build();
+ Attributes attrs3 = Attributes.builder().put("k", "v3").build();
+ Attributes attrs4 = Attributes.builder().put("k", "v4").build();
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, attrs3, 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, attrs4, 4L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p3, p4)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Arrays.asList(m1, m2), 4);
+ assertThat(batches).hasSize(1);
+ Collection firstBatch = batches.iterator().next();
+ assertThat(firstBatch).containsExactly(m1, m2);
+
+ MetricData res1 = firstBatch.iterator().next();
+ MetricData res2 = firstBatch.stream().skip(1).findFirst().get();
+
+ assertThat(res1.getName()).isEqualTo("name_1");
+ assertThat(res1.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+ assertThat(res2.getName()).isEqualTo("name_2");
+ assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() {
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+ LongPointData p6 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 6L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p4, p5, p6)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Arrays.asList(m1, m2), 4);
+
+ assertThat(batches).hasSize(2);
+
+ Collection firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(2);
+ MetricData b1m1 = firstBatch.iterator().next();
+ MetricData b1m2 = firstBatch.stream().skip(1).findFirst().get();
+ assertThat(b1m1.getName()).isEqualTo("name_1");
+ assertThat(b1m1.getDescription()).isEqualTo("desc");
+ assertThat(b1m1.getUnit()).isEqualTo("1");
+ assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3);
+
+ assertThat(b1m2.getName()).isEqualTo("name_2");
+ assertThat(b1m2.getDescription()).isEqualTo("desc");
+ assertThat(b1m2.getUnit()).isEqualTo("1");
+ assertThat(b1m2.getLongGaugeData().getPoints()).containsExactly(p4);
+
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(secondBatch).hasSize(1);
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getName()).isEqualTo("name_2");
+ assertThat(b2m1.getDescription()).isEqualTo("desc");
+ assertThat(b2m1.getUnit()).isEqualTo("1");
+ assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleMetrics_PerfectFillThenSplit() {
+ // m1 fills the batch completely (remaining capacity becomes 0).
+ // m2 has 3 points, which forces it to split from the start of a fully-exhausted
+ // previous pass.
+ // This test case fails if there is an empty batch
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+
+ MetricData m1 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_1",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2)));
+ MetricData m2 =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name_2",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p3, p4, p5)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Arrays.asList(m1, m2), 2);
+
+ assertThat(batches).hasSize(3);
+
+ // Batch 1 should contain exactly m1 (p1, p2)
+ Collection firstBatch = batches.iterator().next();
+ assertThat(firstBatch).hasSize(1);
+ MetricData b1m1 = firstBatch.iterator().next();
+ assertThat(b1m1.getName()).isEqualTo("name_1");
+ assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+
+ // Batch 2 should contain the first part of m2 (p3, p4)
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(secondBatch).hasSize(1);
+ MetricData b2m1 = secondBatch.iterator().next();
+ assertThat(b2m1.getName()).isEqualTo("name_2");
+ assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+
+ // Batch 3 should contain the rest of m2 (p5)
+ Collection thirdBatch = batches.stream().skip(2).findFirst().get();
+ assertThat(thirdBatch).hasSize(1);
+ MetricData b3m1 = thirdBatch.iterator().next();
+ assertThat(b3m1.getName()).isEqualTo("name_2");
+ assertThat(b3m1.getLongGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() {
+ ExponentialHistogramBuckets buckets =
+ ImmutableExponentialHistogramBuckets.create(
+ /* scale= */ 20, /* offset= */ 0, /* bucketCounts= */ Collections.singletonList(1L));
+ ExponentialHistogramPointData p1 =
+ ImmutableExponentialHistogramPointData.create(
+ /* scale= */ 20,
+ /* sum= */ 1.0,
+ /* zeroCount= */ 0,
+ /* hasMin= */ false,
+ /* min= */ 0.0,
+ /* hasMax= */ false,
+ /* max= */ 0.0,
+ /* positiveBuckets= */ buckets,
+ /* negativeBuckets= */ buckets,
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* exemplars= */ Collections.emptyList());
+ ExponentialHistogramPointData p2 =
+ ImmutableExponentialHistogramPointData.create(
+ /* scale= */ 20,
+ /* sum= */ 2.0,
+ /* zeroCount= */ 0,
+ /* hasMin= */ false,
+ /* min= */ 0.0,
+ /* hasMax= */ false,
+ /* max= */ 0.0,
+ /* positiveBuckets= */ buckets,
+ /* negativeBuckets= */ buckets,
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* exemplars= */ Collections.emptyList());
+
+ MetricData metric =
+ ImmutableMetricData.createExponentialHistogram(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableExponentialHistogramData.create(
+ AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 1);
+
+ assertThat(batches).hasSize(2);
+ Collection firstBatch = batches.iterator().next();
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1);
+ assertThat(m1.getExponentialHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2);
+ assertThat(m2.getExponentialHistogramData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() {
+ SummaryPointData p1 =
+ ImmutableSummaryPointData.create(
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* count= */ 1,
+ /* sum= */ 1.0,
+ /* percentileValues= */ Collections.singletonList(
+ ImmutableValueAtQuantile.create(0.5, 1.0)));
+ SummaryPointData p2 =
+ ImmutableSummaryPointData.create(
+ /* startEpochNanos= */ 1,
+ /* epochNanos= */ 2,
+ /* attributes= */ Attributes.empty(),
+ /* count= */ 1,
+ /* sum= */ 2.0,
+ /* percentileValues= */ Collections.singletonList(
+ ImmutableValueAtQuantile.create(0.5, 2.0)));
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSummary(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSummaryData.create(Arrays.asList(p1, p2)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 1);
+
+ assertThat(batches).hasSize(2);
+ Collection firstBatch = batches.iterator().next();
+ Collection secondBatch = batches.stream().skip(1).findFirst().get();
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getSummaryData().getPoints()).containsExactly(p1);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getSummaryData().getPoints()).containsExactly(p2);
+ }
+
+ @Test
+ void batchMetrics_SplitsLongGauge_MultipleBatches() {
+ LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L);
+ LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L);
+ LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L);
+ LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L);
+ LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L);
+
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 2);
+ List> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList).hasSize(3);
+ Collection firstBatch = batchesList.get(0);
+ Collection secondBatch = batchesList.get(1);
+ Collection thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+
+ MetricData firstBatchMetricData = firstBatch.iterator().next();
+ assertThat(firstBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(firstBatchMetricData.getName()).isEqualTo("name");
+ assertThat(firstBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(firstBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(firstBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p1, p2);
+
+ MetricData secondBatchMetricData = secondBatch.iterator().next();
+ assertThat(secondBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(secondBatchMetricData.getName()).isEqualTo("name");
+ assertThat(secondBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(secondBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(secondBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p3, p4);
+
+ MetricData thirdBatchMetricData = thirdBatch.iterator().next();
+ assertThat(thirdBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+ assertThat(thirdBatchMetricData.getName()).isEqualTo("name");
+ assertThat(thirdBatchMetricData.getDescription()).isEqualTo("desc");
+ assertThat(thirdBatchMetricData.getUnit()).isEqualTo("1");
+ assertThat(thirdBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p5);
+ }
+
+ @Test
+ void batchMetrics_SplitsDoubleSum_MultipleBatches() {
+ DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0);
+ DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0);
+ DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0);
+
+ MetricData metric =
+ ImmutableMetricData.createDoubleSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ Arrays.asList(p1, p2, p3)));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 1);
+ List> batchesList = new ArrayList<>(batches);
+
+ assertThat(batchesList).hasSize(3);
+ Collection firstBatch = batchesList.get(0);
+ Collection secondBatch = batchesList.get(1);
+ Collection thirdBatch = batchesList.get(2);
+
+ assertThat(firstBatch).hasSize(1);
+ assertThat(secondBatch).hasSize(1);
+ assertThat(thirdBatch).hasSize(1);
+
+ MetricData m1 = firstBatch.iterator().next();
+ assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m1.getName()).isEqualTo("name");
+ assertThat(m1.getDescription()).isEqualTo("desc");
+ assertThat(m1.getUnit()).isEqualTo("1");
+ assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1);
+ assertThat(m1.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m1.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m2 = secondBatch.iterator().next();
+ assertThat(m2.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m2.getName()).isEqualTo("name");
+ assertThat(m2.getDescription()).isEqualTo("desc");
+ assertThat(m2.getUnit()).isEqualTo("1");
+ assertThat(m2.getDoubleSumData().getPoints()).containsExactly(p2);
+ assertThat(m2.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m2.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+
+ MetricData m3 = thirdBatch.iterator().next();
+ assertThat(m3.getType()).isEqualTo(MetricDataType.DOUBLE_SUM);
+ assertThat(m3.getName()).isEqualTo("name");
+ assertThat(m3.getDescription()).isEqualTo("desc");
+ assertThat(m3.getUnit()).isEqualTo("1");
+ assertThat(m3.getDoubleSumData().getPoints()).containsExactly(p3);
+ assertThat(m3.getDoubleSumData().isMonotonic()).isTrue();
+ assertThat(m3.getDoubleSumData().getAggregationTemporality())
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ @Test
+ void batchMetrics_EmptyPointsInMetricData() {
+ MetricData metric =
+ ImmutableMetricData.createLongGauge(
+ Resource.empty(),
+ InstrumentationScopeInfo.empty(),
+ "name",
+ "desc",
+ "1",
+ ImmutableGaugeData.create(Collections.emptyList()));
+
+ Collection> batches =
+ MetricExportBatcher.batchMetrics(Collections.singletonList(metric), 2);
+ assertThat(batches).hasSize(1);
+ assertThat(batches.iterator().next()).containsExactly(metric);
+ }
+}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
index 1e74ffcaa9e..d097a4bb4a4 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
@@ -15,6 +15,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -30,6 +31,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -46,6 +48,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@@ -55,8 +58,13 @@
@MockitoSettings(strictness = Strictness.LENIENT)
class PeriodicMetricReaderTest {
private static final List LONG_POINT_LIST =
- Collections.singletonList(
- ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1234567));
+ Arrays.asList(
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 2L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 3L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 4L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 5L),
+ ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 6L));
private static final MetricData METRIC_DATA =
ImmutableMetricData.createLongSum(
@@ -71,6 +79,10 @@ class PeriodicMetricReaderTest {
@Mock private CollectionRegistration collectionRegistration;
@Mock private MetricExporter metricExporter;
+ @RegisterExtension
+ LogCapturer logCapturer =
+ LogCapturer.create().captureForLogger(PeriodicMetricReader.class.getName());
+
@BeforeEach
void setup() {
when(collectionRegistration.collectAllMetrics())
@@ -97,6 +109,19 @@ void startOnlyOnce() {
verify(scheduler, times(1)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());
}
+ @Test
+ void build_WithIllegalMaxExportSize() {
+ assertThatThrownBy(
+ () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+
+ assertThatThrownBy(
+ () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(-1).build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxExportBatchSize must be positive");
+ }
+
@Test
void periodicExport() throws Exception {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
@@ -118,6 +143,103 @@ void periodicExport() throws Exception {
}
}
+ @Test
+ void periodicExport_WithMaxExportBatchSize_PartiallyFilledBatch() throws Exception {
+ WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(waitingMetricExporter)
+ .setInterval(Duration.ofMillis(100))
+ .setMaxExportBatchSize(4)
+ .build();
+
+ reader.register(collectionRegistration);
+ MetricData expectedMetricDataBatch1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(0, 4)));
+ MetricData expectedMetricDataBatch2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(4, 6)));
+ try {
+ assertThat(waitingMetricExporter.waitForNumberOfExports(2))
+ .containsExactly(
+ Collections.singletonList(expectedMetricDataBatch1),
+ Collections.singletonList(expectedMetricDataBatch2));
+ } finally {
+ reader.shutdown();
+ }
+ }
+
+ @Test
+ void periodicExport_WithMaxExportBatchSize_CompletelyFilledBatch() throws Exception {
+ WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(waitingMetricExporter)
+ .setInterval(Duration.ofMillis(100))
+ .setMaxExportBatchSize(2)
+ .build();
+
+ reader.register(collectionRegistration);
+ MetricData expectedMetricDataBatch1 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(0, 2)));
+ MetricData expectedMetricDataBatch2 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(2, 4)));
+
+ MetricData expectedMetricDataBatch3 =
+ ImmutableMetricData.createLongSum(
+ Resource.empty(),
+ InstrumentationScopeInfo.create("PeriodicMetricReaderTest"),
+ "my metric",
+ "my metric description",
+ "us",
+ ImmutableSumData.create(
+ /* isMonotonic= */ true,
+ AggregationTemporality.CUMULATIVE,
+ LONG_POINT_LIST.subList(4, 6)));
+ try {
+ assertThat(waitingMetricExporter.waitForNumberOfExports(3))
+ .containsExactly(
+ Collections.singletonList(expectedMetricDataBatch1),
+ Collections.singletonList(expectedMetricDataBatch2),
+ Collections.singletonList(expectedMetricDataBatch3));
+ } finally {
+ reader.shutdown();
+ }
+ }
+
@Test
void periodicExport_NoMetricsSkipsExport() {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
@@ -297,7 +419,8 @@ public CompletableResultCode shutdown() {
shutdownThread.start();
// Give shutdown() time to reach the flushInProgress.join() wait.
- // Even if this executes before shutdown enters the wait, the assertions below still
+ // Even if this executes before shutdown enters the wait, the assertions below
+ // still
// validate correctness — they just won't exercise the concurrent case.
Thread.sleep(200);
@@ -312,7 +435,8 @@ public CompletableResultCode shutdown() {
assertThat(flushResult.isSuccess()).isTrue();
// Final shutdown export also ran (in-flight + final = 2)
assertThat(exportCount.get()).isEqualTo(2);
- // Exporter.shutdown() was not called while the in-flight export was still pending
+ // Exporter.shutdown() was not called while the in-flight export was still
+ // pending
assertThat(shutdownCalledWhileExportPending.get()).isFalse();
}
@@ -335,18 +459,190 @@ void invalidConfig() {
.hasMessage("executor");
}
+ @Test
+ void periodicExport_SequentialBatches() throws Exception {
+ MetricExporter mockExporter = mock(MetricExporter.class);
+ when(mockExporter.getAggregationTemporality(any()))
+ .thenReturn(AggregationTemporality.CUMULATIVE);
+ when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
+ when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+ CompletableResultCode batch1Result = new CompletableResultCode();
+ CompletableResultCode batch2Result = CompletableResultCode.ofSuccess();
+
+ // Configure mock to return pending for 1st call, success for 2nd
+ when(mockExporter.export(any())).thenReturn(batch1Result).thenReturn(batch2Result);
+
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(mockExporter)
+ .setInterval(
+ Duration.ofSeconds(Integer.MAX_VALUE)) // Long interval to prevent auto-trigger
+ .setMaxExportBatchSize(3)
+ .build();
+ // Setup metrics that will result in 2 batches (we have 6 points in
+ // LONG_POINT_LIST)
+ when(collectionRegistration.collectAllMetrics())
+ .thenReturn(Collections.singletonList(METRIC_DATA));
+ reader.register(collectionRegistration);
+
+ // Trigger manual flush
+ CompletableResultCode flushResult = reader.forceFlush();
+ // Verify that the first batch WAS exported
+ verify(mockExporter, times(1)).export(any());
+ // At this point, batch 1 is stuck waiting. Batch 2 should NOT be exported yet.
+ // We verify that export was only called once in total so far.
+ verify(mockExporter, times(1)).export(any());
+ // Now we complete the first batch
+ batch1Result.succeed();
+ // Verify that the second batch IS NOW exported
+ verify(mockExporter, times(2)).export(any());
+ // Ensure the flush operation completes successfully
+ assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
+ reader.shutdown();
+ }
+
+ @Test
+ @SuppressLogger(PeriodicMetricReader.class)
+ void periodicExport_SequentialBatches_PartialFailure() throws Exception {
+ MetricExporter mockExporter = mock(MetricExporter.class);
+ when(mockExporter.getAggregationTemporality(any()))
+ .thenReturn(AggregationTemporality.CUMULATIVE);
+ when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
+ when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+ CompletableResultCode batch1Result = new CompletableResultCode();
+ CompletableResultCode batch2Result = new CompletableResultCode();
+ CompletableResultCode batch3Result = new CompletableResultCode();
+
+ when(mockExporter.export(any()))
+ .thenReturn(batch1Result)
+ .thenReturn(batch2Result)
+ .thenReturn(batch3Result);
+
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(mockExporter)
+ .setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
+ .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
+ .build();
+
+ when(collectionRegistration.collectAllMetrics())
+ .thenReturn(Collections.singletonList(METRIC_DATA));
+ reader.register(collectionRegistration);
+
+ CompletableResultCode flushResult = reader.forceFlush();
+
+ verify(mockExporter, times(1)).export(any());
+
+ batch1Result.succeed();
+ verify(mockExporter, times(2)).export(any());
+
+ batch2Result.fail();
+ verify(mockExporter, times(3)).export(any());
+
+ batch3Result.succeed();
+
+ // Failed export results are logged, but forceFlush preserves the prior
+ // partial-success
+ // behavior.
+ assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
+
+ logCapturer.assertContains("Exporter failed");
+
+ reader.shutdown();
+ }
+
+ @Test
+ void periodicExport_SequentialBatches_PurelySynchronous() throws Exception {
+ MetricExporter mockExporter = mock(MetricExporter.class);
+ when(mockExporter.getAggregationTemporality(any()))
+ .thenReturn(AggregationTemporality.CUMULATIVE);
+ when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
+ when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+ when(mockExporter.export(any()))
+ .thenReturn(CompletableResultCode.ofSuccess())
+ .thenReturn(CompletableResultCode.ofSuccess())
+ .thenReturn(CompletableResultCode.ofSuccess());
+
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(mockExporter)
+ .setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
+ .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
+ .build();
+
+ when(collectionRegistration.collectAllMetrics())
+ .thenReturn(Collections.singletonList(METRIC_DATA));
+ reader.register(collectionRegistration);
+
+ CompletableResultCode flushResult = reader.forceFlush();
+
+ // Verify that all 3 batches WERE exported immediately
+ verify(mockExporter, times(3)).export(any());
+
+ assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
+
+ reader.shutdown();
+ }
+
+ @Test
+ void periodicExport_SequentialBatches_PurelyAsynchronous() throws Exception {
+ MetricExporter mockExporter = mock(MetricExporter.class);
+ when(mockExporter.getAggregationTemporality(any()))
+ .thenReturn(AggregationTemporality.CUMULATIVE);
+ when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
+ when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+ CompletableResultCode batch1Result = new CompletableResultCode();
+ CompletableResultCode batch2Result = new CompletableResultCode();
+ CompletableResultCode batch3Result = new CompletableResultCode();
+
+ when(mockExporter.export(any()))
+ .thenReturn(batch1Result)
+ .thenReturn(batch2Result)
+ .thenReturn(batch3Result);
+
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(mockExporter)
+ .setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
+ .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
+ .build();
+
+ when(collectionRegistration.collectAllMetrics())
+ .thenReturn(Collections.singletonList(METRIC_DATA));
+ reader.register(collectionRegistration);
+
+ CompletableResultCode flushResult = reader.forceFlush();
+
+ verify(mockExporter, times(1)).export(any());
+
+ batch1Result.succeed();
+ verify(mockExporter, times(2)).export(any());
+
+ batch2Result.succeed();
+ verify(mockExporter, times(3)).export(any());
+
+ batch3Result.succeed();
+
+ assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
+ logCapturer.assertDoesNotContain("Exporter failed");
+
+ reader.shutdown();
+ }
+
@Test
void stringRepresentation() {
when(metricExporter.toString()).thenReturn("MockMetricExporter{}");
assertThat(
PeriodicMetricReader.builder(metricExporter)
.setInterval(Duration.ofSeconds(1))
+ .setMaxExportBatchSize(200)
.build()
.toString())
.isEqualTo(
"PeriodicMetricReader{"
+ "exporter=MockMetricExporter{}, "
- + "intervalNanos=1000000000"
+ + "intervalNanos=1000000000, "
+ + "maxExportBatchSize=200"
+ "}");
}