Skip to content

Commit c31a621

Browse files
Fix flaky TestBlocksCleaner by awaiting HeartBeat goroutine completion
Signed-off-by: Artem Muterko <artem@sopho.tech>
1 parent 889ca8f commit c31a621

2 files changed

Lines changed: 47 additions & 2 deletions

File tree

pkg/compactor/blocks_cleaner.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,14 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
357357
return nil
358358
}
359359
errChan := make(chan error, 1)
360-
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
360+
doneChan := make(chan struct{})
361+
go func() {
362+
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
363+
close(doneChan)
364+
}()
361365
defer func() {
362366
errChan <- nil
367+
<-doneChan
363368
}()
364369
return errors.Wrapf(c.cleanUser(ctx, userLogger, userBucket, userID, firstRun), "failed to delete blocks for user: %s", userID)
365370
})
@@ -392,9 +397,14 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
392397
return nil
393398
}
394399
errChan := make(chan error, 1)
395-
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
400+
doneChan := make(chan struct{})
401+
go func() {
402+
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
403+
close(doneChan)
404+
}()
396405
defer func() {
397406
errChan <- nil
407+
<-doneChan
398408
}()
399409
return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userLogger, userBucket, userID), "failed to delete user marked for deletion: %s", userID)
400410
})

pkg/cortexpb/histograms.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,41 @@ func FloatHistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram
9595
}
9696
}
9797

98+
// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
99+
// provided proto message and returns it as a FloatHistogram. The caller has to
100+
// make sure that the proto message represents an integer histogram and not a
101+
// float histogram, or it panics.
102+
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L669-L688
103+
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
104+
if hp.IsFloatHistogram() {
105+
panic("HistogramProtoToFloatHistogram called with a float histogram")
106+
}
107+
return &histogram.FloatHistogram{
108+
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
109+
Schema: hp.Schema,
110+
ZeroThreshold: hp.ZeroThreshold,
111+
ZeroCount: float64(hp.GetZeroCountInt()),
112+
Count: float64(hp.GetCountInt()),
113+
Sum: hp.Sum,
114+
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
115+
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
116+
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
117+
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
118+
CustomValues: hp.GetCustomValues(),
119+
}
120+
}
121+
122+
// deltasToCounts converts a slice of deltas to a slice of cumulative float64 counts.
123+
func deltasToCounts(deltas []int64) []float64 {
124+
counts := make([]float64, len(deltas))
125+
var cur float64
126+
for i, d := range deltas {
127+
cur += float64(d)
128+
counts[i] = cur
129+
}
130+
return counts
131+
}
132+
98133
// HistogramToHistogramProto converts a (normal integer) Histogram to its protobuf message type.
99134
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L709-L723
100135
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) Histogram {

0 commit comments

Comments
 (0)