From 1d9b4effa92fa927880331e748a3955908402d2f Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Tue, 24 Mar 2026 15:32:39 -0700 Subject: [PATCH 1/8] Convert heavy queries from 5xx to 4xx Signed-off-by: Essam Eldaly --- pkg/api/handlers.go | 6 +- pkg/api/queryapi/query_api.go | 133 +++++++++++++-- pkg/frontend/transport/handler.go | 13 ++ pkg/querier/querier.go | 28 ++++ pkg/querier/stats/stats.go | 191 ++++++++++++++++++++++ pkg/querier/stats/timeout_decision.go | 47 ++++++ pkg/querier/worker/frontend_processor.go | 3 + pkg/querier/worker/scheduler_processor.go | 3 + pkg/scheduler/scheduler.go | 10 ++ 9 files changed, 419 insertions(+), 15 deletions(-) create mode 100644 pkg/querier/stats/timeout_decision.go diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 7f219896b7d..5852a65cbb1 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -286,7 +286,11 @@ func NewQuerierHandler( legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1")) api.Register(legacyPromRouter) - queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) + queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{ + TotalTimeout: querierCfg.TimeoutClassificationDeadline, + EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold, + Enabled: querierCfg.TimeoutClassificationEnabled, + }) requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger)) var apiHandler http.Handler diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index 83eed69ec8b..ede00d471b0 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -21,18 +21,21 @@ import ( "github.com/cortexproject/cortex/pkg/distributed_execution" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/api" + "github.com/cortexproject/cortex/pkg/util/requestmeta" ) type QueryAPI struct { - queryable storage.SampleAndChunkQueryable - queryEngine engine.QueryEngine - now func() time.Time - statsRenderer v1.StatsRenderer - logger log.Logger - codecs []v1.Codec - CORSOrigin *regexp.Regexp + queryable storage.SampleAndChunkQueryable + queryEngine engine.QueryEngine + now func() time.Time + statsRenderer v1.StatsRenderer + logger log.Logger + codecs []v1.Codec + CORSOrigin *regexp.Regexp + timeoutClassification stats.PhaseTrackerConfig } func NewQueryAPI( @@ -42,15 +45,17 @@ func NewQueryAPI( logger log.Logger, codecs []v1.Codec, CORSOrigin *regexp.Regexp, + timeoutClassification stats.PhaseTrackerConfig, ) *QueryAPI { return &QueryAPI{ - queryEngine: qe, - queryable: q, - statsRenderer: statsRenderer, - logger: logger, - codecs: codecs, - CORSOrigin: CORSOrigin, - now: time.Now, + queryEngine: qe, + queryable: q, + statsRenderer: statsRenderer, + logger: logger, + codecs: codecs, + CORSOrigin: CORSOrigin, + now: time.Now, + timeoutClassification: timeoutClassification, } } @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. + if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. + if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -281,6 +328,64 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w } } +// applyTimeoutClassification creates a proactive context timeout that fires before +// the PromQL engine's own timeout, adjusted for queue wait time. Returns the +// (possibly wrapped) context, an optional cancel func, and an optional early-exit +// result when the entire timeout budget was already consumed in the queue. +func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) { + if !cfg.Enabled { + return ctx, nil, nil + } + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + effectiveTimeout := cfg.TotalTimeout - queueWaitTime + if effectiveTimeout <= 0 { + return ctx, nil, &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, + "query timed out: query spent too long in scheduler queue")}, nil, nil} + } + ctx, cancel := context.WithTimeout(ctx, effectiveTimeout) + return ctx, cancel, nil +} + +// classifyTimeout inspects phase timings after a context cancellation/timeout +// and returns an apiFuncResult if the timeout should be converted to a 4XX user error. +// Returns nil if no conversion applies and the caller should use the default error path. +func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult { + decision := stats.DecideTimeoutResponse(queryStats, cfg) + + fetchTime := queryStats.LoadQueryStorageWallTime() + totalTime := time.Since(queryStats.LoadQueryStart()) + evalTime := totalTime - fetchTime + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + level.Warn(q.logger).Log( + "msg", "query timed out with classification", + "request_id", requestmeta.RequestIdFromContext(ctx), + "query_start", queryStats.LoadQueryStart(), + "queue_wait_time", queueWaitTime, + "fetch_time", fetchTime, + "eval_time", evalTime, + "total_time", totalTime, + "decision", decision, + "conversion_enabled", cfg.Enabled, + ) + + if cfg.Enabled && decision == stats.UserError4XX { + return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, + "query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer} + } + + return nil +} + func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) { for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) { for _, codec := range q.codecs { diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 8fd27a11454..47ec69876b6 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -536,6 +536,19 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query logMessage = append(logMessage, "query_storage_wall_time_seconds", sws) } + if maxFetch := stats.LoadMaxFetchTime(); maxFetch > 0 { + logMessage = append(logMessage, "max_fetch_time", maxFetch) + } + if maxEval := stats.LoadMaxEvalTime(); maxEval > 0 { + logMessage = append(logMessage, "max_eval_time", maxEval) + } + if maxQueue := stats.LoadMaxQueueWaitTime(); maxQueue > 0 { + logMessage = append(logMessage, "max_queue_wait_time", maxQueue) + } + if maxTotal := stats.LoadMaxTotalTime(); maxTotal > 0 { + logMessage = append(logMessage, "max_total_time", maxTotal) + } + if splitInterval > 0 { logMessage = append(logMessage, "split_interval", splitInterval.String()) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8ebc66a16dc..bc865982e57 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -103,6 +103,11 @@ type Config struct { DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` HonorProjectionHints bool `yaml:"honor_projection_hints"` + + // Timeout classification flags for converting 5XX to 4XX on expensive queries. + TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"` + TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"` + TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"` } var ( @@ -114,6 +119,11 @@ var ( errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0") errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet") + + errTimeoutClassificationDeadlineNotPositive = errors.New("timeout_classification_deadline must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdNotPositive = errors.New("timeout_classification_eval_threshold must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdExceedsDeadline = errors.New("timeout_classification_eval_threshold must be less than or equal to timeout_classification_deadline") + errTimeoutClassificationDeadlineExceedsTimeout = errors.New("timeout_classification_deadline must be less than the querier timeout") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -158,6 +168,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.HonorProjectionHints, "querier.honor-projection-hints", false, "[Experimental] If true, querier will honor projection hints and only materialize requested labels. Today, projection is only effective when Parquet Queryable is enabled. Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") + f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.") + f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", 59*time.Second, "The total time before the querier proactively cancels a query for timeout classification.") + f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", 40*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).") } // Validate the config @@ -197,6 +210,21 @@ func (cfg *Config) Validate() error { } } + if cfg.TimeoutClassificationEnabled { + if cfg.TimeoutClassificationDeadline <= 0 { + return errTimeoutClassificationDeadlineNotPositive + } + if cfg.TimeoutClassificationEvalThreshold <= 0 { + return errTimeoutClassificationEvalThresholdNotPositive + } + if cfg.TimeoutClassificationEvalThreshold > cfg.TimeoutClassificationDeadline { + return errTimeoutClassificationEvalThresholdExceedsDeadline + } + if cfg.TimeoutClassificationDeadline >= cfg.Timeout { + return errTimeoutClassificationDeadlineExceedsTimeout + } + } + if err := cfg.ThanosEngine.Validate(); err != nil { return err } diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index a834cd311e1..f7ad32725b3 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -23,6 +23,19 @@ type QueryStats struct { DataSelectMinTime int64 SplitInterval time.Duration m sync.Mutex + + // Phase tracking fields for timeout classification. + // Stored as UnixNano int64 for atomic operations. + queryStart int64 // nanosecond timestamp when query began + queueJoinTime int64 // nanosecond timestamp when request entered scheduler queue + queueLeaveTime int64 // nanosecond timestamp when request left scheduler queue + + // Max timing breakdown across sub-queries (nanoseconds). + // These use max() semantics during Merge rather than sum. + maxFetchTime int64 // max storage fetch time across sub-queries + maxEvalTime int64 // max PromQL evaluation time across sub-queries + maxQueueWaitTime int64 // max scheduler queue wait time across sub-queries + maxTotalTime int64 // max total sub-query time across sub-queries } // ContextWithEmptyStats returns a context with empty stats. @@ -306,6 +319,180 @@ func (s *QueryStats) LoadSplitInterval() time.Duration { return s.SplitInterval } +// SetQueryStart records when the query began execution. +func (s *QueryStats) SetQueryStart(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queryStart, t.UnixNano()) +} + +// LoadQueryStart returns the query start time. +func (s *QueryStats) LoadQueryStart() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queryStart) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueJoinTime records when the request entered the scheduler queue. +func (s *QueryStats) SetQueueJoinTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueJoinTime, t.UnixNano()) +} + +// LoadQueueJoinTime returns the queue join time. +func (s *QueryStats) LoadQueueJoinTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueJoinTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueLeaveTime records when the request left the scheduler queue. +func (s *QueryStats) SetQueueLeaveTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueLeaveTime, t.UnixNano()) +} + +// LoadQueueLeaveTime returns the queue leave time. +func (s *QueryStats) LoadQueueLeaveTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueLeaveTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// updateMaxDuration atomically updates a max duration field if the new value is larger. +func updateMaxDuration(addr *int64, val time.Duration) { + new := int64(val) + for { + old := atomic.LoadInt64(addr) + if new <= old { + return + } + if atomic.CompareAndSwapInt64(addr, old, new) { + return + } + } +} + +// UpdateMaxFetchTime updates the max fetch time if the provided value is larger. +func (s *QueryStats) UpdateMaxFetchTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxFetchTime, t) +} + +// LoadMaxFetchTime returns the max fetch time across sub-queries. +func (s *QueryStats) LoadMaxFetchTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxFetchTime)) +} + +// UpdateMaxEvalTime updates the max eval time if the provided value is larger. +func (s *QueryStats) UpdateMaxEvalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxEvalTime, t) +} + +// LoadMaxEvalTime returns the max eval time across sub-queries. +func (s *QueryStats) LoadMaxEvalTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxEvalTime)) +} + +// UpdateMaxQueueWaitTime updates the max queue wait time if the provided value is larger. +func (s *QueryStats) UpdateMaxQueueWaitTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxQueueWaitTime, t) +} + +// LoadMaxQueueWaitTime returns the max queue wait time across sub-queries. +func (s *QueryStats) LoadMaxQueueWaitTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxQueueWaitTime)) +} + +// UpdateMaxTotalTime updates the max total time if the provided value is larger. +func (s *QueryStats) UpdateMaxTotalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxTotalTime, t) +} + +// LoadMaxTotalTime returns the max total sub-query time across sub-queries. +func (s *QueryStats) LoadMaxTotalTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxTotalTime)) +} + +// ComputeAndStoreTimingBreakdown computes the timing breakdown from phase tracking +// fields and stores them as max values. This should be called after a sub-query +// completes, before stats are sent back to the frontend. +func (s *QueryStats) ComputeAndStoreTimingBreakdown() { + if s == nil { + return + } + + queryStart := s.LoadQueryStart() + if queryStart.IsZero() { + return + } + + fetchTime := s.LoadQueryStorageWallTime() + totalTime := time.Since(queryStart) + evalTime := totalTime - fetchTime + + var queueWaitTime time.Duration + queueJoin := s.LoadQueueJoinTime() + queueLeave := s.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + + s.UpdateMaxFetchTime(fetchTime) + s.UpdateMaxEvalTime(evalTime) + s.UpdateMaxQueueWaitTime(queueWaitTime) + s.UpdateMaxTotalTime(totalTime) +} + func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) { if s == nil { return @@ -396,6 +583,10 @@ func (s *QueryStats) Merge(other *QueryStats) { s.AddScannedSamples(other.LoadScannedSamples()) s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples())) s.AddExtraFields(other.LoadExtraFields()...) + s.UpdateMaxFetchTime(other.LoadMaxFetchTime()) + s.UpdateMaxEvalTime(other.LoadMaxEvalTime()) + s.UpdateMaxQueueWaitTime(other.LoadMaxQueueWaitTime()) + s.UpdateMaxTotalTime(other.LoadMaxTotalTime()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/timeout_decision.go b/pkg/querier/stats/timeout_decision.go new file mode 100644 index 00000000000..e1848fca78d --- /dev/null +++ b/pkg/querier/stats/timeout_decision.go @@ -0,0 +1,47 @@ +package stats + +import "time" + +// TimeoutDecision represents the classification of a query timeout. +type TimeoutDecision int + +const ( + // Default5XX means return 503 (current behavior). + Default5XX TimeoutDecision = iota + // UserError4XX means the query is too expensive, return 422. + UserError4XX +) + +// PhaseTrackerConfig holds configurable thresholds for timeout classification. +type PhaseTrackerConfig struct { + // TotalTimeout is the total time before the querier cancels the query context. + TotalTimeout time.Duration + + // EvalTimeThreshold is the eval time above which a timeout is classified as user error (4XX). + EvalTimeThreshold time.Duration + + // Enabled controls whether the 5XX-to-4XX conversion is active. + Enabled bool +} + +// DecideTimeoutResponse inspects QueryStats phase timings and returns a TimeoutDecision. +// It returns UserError4XX if eval time exceeds the threshold, Default5XX otherwise. +// It is a pure function that does not modify stats or cfg. +func DecideTimeoutResponse(stats *QueryStats, cfg PhaseTrackerConfig) TimeoutDecision { + if stats == nil { + return Default5XX + } + + queryStart := stats.LoadQueryStart() + if queryStart.IsZero() { + return Default5XX + } + + evalTime := time.Since(queryStart) - stats.LoadQueryStorageWallTime() + + if evalTime > cfg.EvalTimeThreshold { + return UserError4XX + } + + return Default5XX +} diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 88f7f311393..923c9576fc4 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -156,6 +156,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + // Ensure responses that are too big are not retried. if len(response.Body) >= fp.maxMessageSize { errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 3bba5980442..eb9d9b138b1 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -193,6 +193,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + if err = ctx.Err(); err != nil { return } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 882ed166174..3ee91e80f33 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -424,6 +424,11 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr req.enqueueTime = now req.ctxCancel = cancel + // Record queue join time for timeout classification phase tracking. + if qStats := stats.FromContext(ctx); qStats != nil { + qStats.SetQueueJoinTime(now) + } + // aggregate the max queriers limit in the case of a multi tenant query tenantIDs, err := users.TenantIDsFromOrgID(userID) if err != nil { @@ -527,6 +532,11 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL r := req.(*schedulerRequest) + // Record queue leave time for timeout classification phase tracking. + if qStats := stats.FromContext(r.ctx); qStats != nil { + qStats.SetQueueLeaveTime(time.Now()) + } + s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds()) r.queueSpan.Finish() From e5fe31a4f13122a4bec1e10447a0a0ee6a11dad3 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 26 Mar 2026 15:24:55 -0700 Subject: [PATCH 2/8] Fix 504 default return, disable retry on querier timeout Signed-off-by: Essam Eldaly --- pkg/api/queryapi/query_api.go | 5 +++++ pkg/api/queryapi/util.go | 7 ++++--- pkg/frontend/transport/retry.go | 11 ++++++++++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index ede00d471b0..ca58f5fc6df 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -383,6 +383,11 @@ func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryS "query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer} } + if cfg.Enabled { + return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout, + ErrUpstreamRequestTimeout)}, warnings, closer} + } + return nil } diff --git a/pkg/api/queryapi/util.go b/pkg/api/queryapi/util.go index e9e43e8cb27..ea35f1b6ee4 100644 --- a/pkg/api/queryapi/util.go +++ b/pkg/api/queryapi/util.go @@ -16,9 +16,10 @@ import ( ) var ( - ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") - ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") - ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") + ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") + ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + ErrUpstreamRequestTimeout = "upstream request timeout" ) func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) { diff --git a/pkg/frontend/transport/retry.go b/pkg/frontend/transport/retry.go index 4e2997a84ac..e28abc72efb 100644 --- a/pkg/frontend/transport/retry.go +++ b/pkg/frontend/transport/retry.go @@ -11,6 +11,7 @@ import ( "github.com/thanos-io/thanos/pkg/pool" "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -79,7 +80,15 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error) func isBodyRetryable(body string) bool { // If pool exhausted, retry at query frontend might make things worse. // Rely on retries at querier level only. - return !strings.Contains(body, pool.ErrPoolExhausted.Error()) + if strings.Contains(body, pool.ErrPoolExhausted.Error()) { + return false + } + + // If request timed out upstream, there isnt enough time to retry. + if strings.Contains(body, queryapi.ErrUpstreamRequestTimeout) { + return false + } + return true } func yoloString(b []byte) string { From 792e6c3ffbb3123fb08248d730aa2c665d07eb57 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 15:46:07 -0700 Subject: [PATCH 3/8] Add tests Signed-off-by: Essam Eldaly --- pkg/api/queryapi/query_api_test.go | 8 +- pkg/querier/querier_test.go | 65 +++++++++++++++ pkg/querier/stats/stats_test.go | 93 ++++++++++++++++++++++ pkg/querier/stats/timeout_decision_test.go | 49 ++++++++++++ 4 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 pkg/querier/stats/timeout_decision_test.go diff --git a/pkg/api/queryapi/query_api_test.go b/pkg/api/queryapi/query_api_test.go index 2a0ce0cbc99..7dd13ba7549 100644 --- a/pkg/api/queryapi/query_api_test.go +++ b/pkg/api/queryapi/query_api_test.go @@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) @@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) { }, } - queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*")) + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) @@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) { }, } - queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) @@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler)) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 4a13dae9aaf..5b2c305bd0c 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1679,6 +1679,71 @@ func TestConfig_Validate(t *testing.T) { }, expected: nil, }, + "should pass with valid timeout classification config": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 58 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + }, + "should fail with zero timeout classification deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 0 + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + }, + expected: errTimeoutClassificationDeadlineNotPositive, + }, + "should fail with negative timeout classification eval threshold": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 58 * time.Second + cfg.TimeoutClassificationEvalThreshold = -1 * time.Second + }, + expected: errTimeoutClassificationEvalThresholdNotPositive, + }, + "should fail when eval threshold exceeds deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 30 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + }, + expected: errTimeoutClassificationEvalThresholdExceedsDeadline, + }, + "should pass when eval threshold equals deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 45 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + }, + "should fail when deadline equals engine timeout": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 2 * time.Minute + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + expected: errTimeoutClassificationDeadlineExceedsTimeout, + }, + "should fail when deadline exceeds engine timeout": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 3 * time.Minute + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + expected: errTimeoutClassificationDeadlineExceedsTimeout, + }, + "should pass with timeout classification disabled even with invalid values": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = false + cfg.TimeoutClassificationDeadline = 0 + cfg.TimeoutClassificationEvalThreshold = -1 * time.Second + }, + }, } for testName, testData := range tests { diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index 7908d06773d..f19c9fc389d 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -282,3 +282,96 @@ func checkExtraFields(t *testing.T, expected, actual []any) { assert.Equal(t, expectedMap, actualMap) } + +func TestStats_QueryStart(t *testing.T) { + t.Run("set and load query start", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueryStart(now) + + loaded := stats.LoadQueryStart() + assert.Equal(t, now.UnixNano(), loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueryStart(time.Now()) + + assert.True(t, stats.LoadQueryStart().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueryStart().IsZero()) + }) +} + +func TestStats_QueueJoinTime(t *testing.T) { + t.Run("set and load queue join time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueueJoinTime(now) + + loaded := stats.LoadQueueJoinTime() + assert.Equal(t, now.UnixNano(), loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueueJoinTime(time.Now()) + + assert.True(t, stats.LoadQueueJoinTime().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueueJoinTime().IsZero()) + }) +} + +func TestStats_QueueLeaveTime(t *testing.T) { + t.Run("set and load queue leave time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueueLeaveTime(now) + + loaded := stats.LoadQueueLeaveTime() + assert.Equal(t, now.UnixNano(), loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueueLeaveTime(time.Now()) + + assert.True(t, stats.LoadQueueLeaveTime().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueueLeaveTime().IsZero()) + }) +} + +func TestStats_Merge_DoesNotCopyPhaseTrackingFields(t *testing.T) { + t.Run("merge does not copy phase tracking fields", func(t *testing.T) { + source := &QueryStats{} + source.SetQueryStart(time.Now()) + source.SetQueueJoinTime(time.Now()) + source.SetQueueLeaveTime(time.Now()) + source.AddWallTime(time.Second) + + target := &QueryStats{} + target.Merge(source) + + // Phase tracking fields should NOT be copied + assert.True(t, target.LoadQueryStart().IsZero()) + assert.True(t, target.LoadQueueJoinTime().IsZero()) + assert.True(t, target.LoadQueueLeaveTime().IsZero()) + + // Regular fields should still be merged + assert.Equal(t, time.Second, target.LoadWallTime()) + }) +} diff --git a/pkg/querier/stats/timeout_decision_test.go b/pkg/querier/stats/timeout_decision_test.go new file mode 100644 index 00000000000..d6f9511ceac --- /dev/null +++ b/pkg/querier/stats/timeout_decision_test.go @@ -0,0 +1,49 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDecideTimeoutResponse_NilStats(t *testing.T) { + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, DecideTimeoutResponse(nil, cfg)) +} + +func TestDecideTimeoutResponse_ZeroQueryStart(t *testing.T) { + stats := &QueryStats{} + // queryStart is zero by default (never set). + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, DecideTimeoutResponse(stats, cfg)) +} + +func TestDecideTimeoutResponse_EvalBelowThreshold(t *testing.T) { + stats := &QueryStats{} + // Set queryStart to now so evalTime ≈ 0, well below threshold. + stats.SetQueryStart(time.Now()) + // No storage wall time, so evalTime = time.Since(now) - 0 ≈ 0. + + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, DecideTimeoutResponse(stats, cfg)) +} + +func TestDecideTimeoutResponse_EvalAboveThreshold(t *testing.T) { + stats := &QueryStats{} + // Set queryStart 60s in the past so time.Since(queryStart) ≈ 60s. + stats.SetQueryStart(time.Now().Add(-60 * time.Second)) + // Set storageWallTime to 5s so evalTime ≈ 60s - 5s = 55s > 45s threshold. + stats.AddQueryStorageWallTime(5 * time.Second) + + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, UserError4XX, DecideTimeoutResponse(stats, cfg)) +} From 627b92e01da760f36193ff75a370c8e48ba64464 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 15:59:00 -0700 Subject: [PATCH 4/8] make check doc Signed-off-by: Essam Eldaly --- docs/blocks-storage/querier.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index b1aa703feed..6904bf96a19 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -322,6 +322,20 @@ querier: # mixed block types (parquet and non-parquet) and not querying ingesters. # CLI flag: -querier.honor-projection-hints [honor_projection_hints: | default = false] + + # If true, classify query timeouts as 4XX (user error) or 5XX (system error) + # based on phase timing. + # CLI flag: -querier.timeout-classification-enabled + [timeout_classification_enabled: | default = false] + + # The total time before the querier proactively cancels a query for timeout + # classification. + # CLI flag: -querier.timeout-classification-deadline + [timeout_classification_deadline: | default = 59s] + + # Eval time threshold above which a timeout is classified as user error (4XX). + # CLI flag: -querier.timeout-classification-eval-threshold + [timeout_classification_eval_threshold: | default = 40s] ``` ### `blocks_storage_config` From 101df716be639b46e932f9bf765cca652a3b3309 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 15:59:12 -0700 Subject: [PATCH 5/8] make check doc Signed-off-by: Essam Eldaly --- docs/configuration/config-file-reference.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7c1cd7265df..8ac6f3731a0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4959,6 +4959,20 @@ thanos_engine: # types (parquet and non-parquet) and not querying ingesters. # CLI flag: -querier.honor-projection-hints [honor_projection_hints: | default = false] + +# If true, classify query timeouts as 4XX (user error) or 5XX (system error) +# based on phase timing. +# CLI flag: -querier.timeout-classification-enabled +[timeout_classification_enabled: | default = false] + +# The total time before the querier proactively cancels a query for timeout +# classification. +# CLI flag: -querier.timeout-classification-deadline +[timeout_classification_deadline: | default = 59s] + +# Eval time threshold above which a timeout is classified as user error (4XX). +# CLI flag: -querier.timeout-classification-eval-threshold +[timeout_classification_eval_threshold: | default = 40s] ``` ### `query_frontend_config` From 67bc01fd9ba1f5c3828bd5ab6a6f1518972f3501 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 16:01:15 -0700 Subject: [PATCH 6/8] fix QueryStats new max stats Signed-off-by: Essam Eldaly --- pkg/querier/stats/stats.go | 30 +-- pkg/querier/stats/stats.pb.go | 398 ++++++++++++++++++++++++++-------- pkg/querier/stats/stats.proto | 6 + 3 files changed, 317 insertions(+), 117 deletions(-) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index f7ad32725b3..bb1662f492f 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -29,13 +29,6 @@ type QueryStats struct { queryStart int64 // nanosecond timestamp when query began queueJoinTime int64 // nanosecond timestamp when request entered scheduler queue queueLeaveTime int64 // nanosecond timestamp when request left scheduler queue - - // Max timing breakdown across sub-queries (nanoseconds). - // These use max() semantics during Merge rather than sum. - maxFetchTime int64 // max storage fetch time across sub-queries - maxEvalTime int64 // max PromQL evaluation time across sub-queries - maxQueueWaitTime int64 // max scheduler queue wait time across sub-queries - maxTotalTime int64 // max total sub-query time across sub-queries } // ContextWithEmptyStats returns a context with empty stats. @@ -404,63 +397,56 @@ func (s *QueryStats) UpdateMaxFetchTime(t time.Duration) { if s == nil { return } - updateMaxDuration(&s.maxFetchTime, t) + updateMaxDuration((*int64)(&s.MaxFetchTime), t) } -// LoadMaxFetchTime returns the max fetch time across sub-queries. func (s *QueryStats) LoadMaxFetchTime() time.Duration { if s == nil { return 0 } - return time.Duration(atomic.LoadInt64(&s.maxFetchTime)) + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxFetchTime))) } -// UpdateMaxEvalTime updates the max eval time if the provided value is larger. func (s *QueryStats) UpdateMaxEvalTime(t time.Duration) { if s == nil { return } - updateMaxDuration(&s.maxEvalTime, t) + updateMaxDuration((*int64)(&s.MaxEvalTime), t) } -// LoadMaxEvalTime returns the max eval time across sub-queries. func (s *QueryStats) LoadMaxEvalTime() time.Duration { if s == nil { return 0 } - return time.Duration(atomic.LoadInt64(&s.maxEvalTime)) + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxEvalTime))) } -// UpdateMaxQueueWaitTime updates the max queue wait time if the provided value is larger. func (s *QueryStats) UpdateMaxQueueWaitTime(t time.Duration) { if s == nil { return } - updateMaxDuration(&s.maxQueueWaitTime, t) + updateMaxDuration((*int64)(&s.MaxQueueWaitTime), t) } -// LoadMaxQueueWaitTime returns the max queue wait time across sub-queries. func (s *QueryStats) LoadMaxQueueWaitTime() time.Duration { if s == nil { return 0 } - return time.Duration(atomic.LoadInt64(&s.maxQueueWaitTime)) + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxQueueWaitTime))) } -// UpdateMaxTotalTime updates the max total time if the provided value is larger. func (s *QueryStats) UpdateMaxTotalTime(t time.Duration) { if s == nil { return } - updateMaxDuration(&s.maxTotalTime, t) + updateMaxDuration((*int64)(&s.MaxTotalTime), t) } -// LoadMaxTotalTime returns the max total sub-query time across sub-queries. func (s *QueryStats) LoadMaxTotalTime() time.Duration { if s == nil { return 0 } - return time.Duration(atomic.LoadInt64(&s.maxTotalTime)) + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxTotalTime))) } // ComputeAndStoreTimingBreakdown computes the timing breakdown from phase tracking diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 61bbc9da9c1..f0e1880ae85 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -65,6 +65,12 @@ type Stats struct { // The highest count of samples considered while evaluating a query. // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go PeakSamples uint64 `protobuf:"varint,14,opt,name=peak_samples,json=peakSamples,proto3" json:"peak_samples,omitempty"` + // Max timing breakdown across sub-queries for timeout classification. + // These use max() semantics during Merge rather than sum. + MaxFetchTime time.Duration `protobuf:"bytes,15,opt,name=max_fetch_time,json=maxFetchTime,proto3,stdduration" json:"max_fetch_time"` + MaxEvalTime time.Duration `protobuf:"bytes,16,opt,name=max_eval_time,json=maxEvalTime,proto3,stdduration" json:"max_eval_time"` + MaxQueueWaitTime time.Duration `protobuf:"bytes,17,opt,name=max_queue_wait_time,json=maxQueueWaitTime,proto3,stdduration" json:"max_queue_wait_time"` + MaxTotalTime time.Duration `protobuf:"bytes,18,opt,name=max_total_time,json=maxTotalTime,proto3,stdduration" json:"max_total_time"` } func (m *Stats) Reset() { *m = Stats{} } @@ -197,6 +203,34 @@ func (m *Stats) GetPeakSamples() uint64 { return 0 } +func (m *Stats) GetMaxFetchTime() time.Duration { + if m != nil { + return m.MaxFetchTime + } + return 0 +} + +func (m *Stats) GetMaxEvalTime() time.Duration { + if m != nil { + return m.MaxEvalTime + } + return 0 +} + +func (m *Stats) GetMaxQueueWaitTime() time.Duration { + if m != nil { + return m.MaxQueueWaitTime + } + return 0 +} + +func (m *Stats) GetMaxTotalTime() time.Duration { + if m != nil { + return m.MaxTotalTime + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") proto.RegisterMapType((map[string]string)(nil), "stats.Stats.ExtraFieldsEntry") @@ -205,43 +239,48 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 574 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcf, 0x6e, 0xd3, 0x30, - 0x18, 0x8f, 0xb7, 0x75, 0x2c, 0x4e, 0x37, 0x46, 0x28, 0x22, 0x9b, 0x84, 0xd7, 0x31, 0x24, 0x7a, - 0x40, 0x19, 0x2a, 0x17, 0x04, 0x12, 0x9a, 0xba, 0x0d, 0x38, 0x20, 0x04, 0xed, 0x24, 0xa4, 0x5d, - 0x2c, 0xb7, 0x75, 0xd3, 0xa8, 0x69, 0x52, 0x12, 0x87, 0x91, 0x1b, 0x8f, 0xc0, 0x91, 0x47, 0xe0, - 0x51, 0x7a, 0xa3, 0xc7, 0x9d, 0x06, 0x4d, 0x2f, 0x1c, 0xf7, 0x08, 0xc8, 0x9f, 0x9d, 0x16, 0x26, - 0x81, 0xb8, 0xc5, 0xdf, 0xef, 0x8f, 0xfc, 0xfb, 0x7d, 0x31, 0xb6, 0x12, 0xc1, 0x44, 0xe2, 0x8e, - 0xe2, 0x48, 0x44, 0x76, 0x09, 0x0e, 0xdb, 0x15, 0x2f, 0xf2, 0x22, 0x98, 0xec, 0xcb, 0x2f, 0x05, - 0x6e, 0x13, 0x2f, 0x8a, 0xbc, 0x80, 0xef, 0xc3, 0xa9, 0x9d, 0xf6, 0xf6, 0xbb, 0x69, 0xcc, 0x84, - 0x1f, 0x85, 0x1a, 0xdf, 0xba, 0x8a, 0xb3, 0x30, 0x53, 0xd0, 0xdd, 0x6f, 0xab, 0xb8, 0xd4, 0x92, - 0xd6, 0xf6, 0x01, 0x36, 0xcf, 0x58, 0x10, 0x50, 0xe1, 0x0f, 0xb9, 0x83, 0xaa, 0xa8, 0x66, 0xd5, - 0xb7, 0x5c, 0x25, 0x74, 0x0b, 0xa1, 0x7b, 0xa4, 0x8d, 0x1b, 0x6b, 0xe3, 0x8b, 0x1d, 0xe3, 0xcb, - 0xf7, 0x1d, 0xd4, 0x5c, 0x93, 0xaa, 0x13, 0x7f, 0xc8, 0xed, 0x87, 0xb8, 0xd2, 0xe3, 0xa2, 0xd3, - 0xe7, 0x5d, 0x9a, 0xf0, 0xd8, 0xe7, 0x09, 0xed, 0x44, 0x69, 0x28, 0x9c, 0xa5, 0x2a, 0xaa, 0xad, - 0x34, 0x6d, 0x8d, 0xb5, 0x00, 0x3a, 0x94, 0x88, 0xed, 0xe2, 0x9b, 0x85, 0xa2, 0xd3, 0x4f, 0xc3, - 0x01, 0x6d, 0x67, 0x82, 0x27, 0xce, 0x32, 0x08, 0x6e, 0x68, 0xe8, 0x50, 0x22, 0x0d, 0x09, 0xd8, - 0x0f, 0x70, 0xe1, 0x42, 0xbb, 0x4c, 0x30, 0x4d, 0x5f, 0x01, 0xfa, 0xa6, 0x46, 0x8e, 0x98, 0x60, - 0x8a, 0x7d, 0x80, 0xcb, 0xfc, 0xa3, 0x88, 0x19, 0xed, 0xf9, 0x3c, 0xe8, 0x26, 0x4e, 0xa9, 0xba, - 0x5c, 0xb3, 0xea, 0x77, 0x5c, 0xd5, 0x2b, 0xa4, 0x76, 0x8f, 0x25, 0xe1, 0x39, 0xe0, 0xc7, 0xa1, - 0x88, 0xb3, 0xa6, 0xc5, 0x17, 0x93, 0xdf, 0x13, 0xc1, 0xfd, 0x8a, 0x44, 0xab, 0x7f, 0x24, 0x82, - 0x0b, 0xea, 0x44, 0x75, 0x7c, 0x6b, 0xde, 0x01, 0x1b, 0x8e, 0x82, 0x79, 0x09, 0xd7, 0x40, 0x52, - 0xc4, 0x6d, 0x29, 0x4c, 0x69, 0x76, 0xb1, 0x19, 0xf8, 0x43, 0x5f, 0xd0, 0xbe, 0x2f, 0x9c, 0xb5, - 0x2a, 0xaa, 0x99, 0x8d, 0x95, 0xf1, 0x85, 0xac, 0x16, 0xc6, 0x2f, 0x7d, 0x61, 0xef, 0xe1, 0xf5, - 0x64, 0x14, 0xf8, 0x82, 0xbe, 0x4f, 0xa1, 0x3e, 0xc7, 0x04, 0xbb, 0x32, 0x0c, 0xdf, 0xaa, 0x99, - 0x7d, 0x8a, 0x6f, 0x4b, 0x38, 0xa3, 0x89, 0x88, 0x62, 0xe6, 0x71, 0xba, 0xd8, 0x27, 0xfe, 0xff, - 0x7d, 0x56, 0xc0, 0xa3, 0xa5, 0x2c, 0xde, 0x15, 0xbb, 0x7d, 0x8d, 0xef, 0x49, 0x57, 0x4e, 0x3d, - 0x26, 0xf8, 0x19, 0xcb, 0xa8, 0x88, 0x52, 0x48, 0x39, 0x8a, 0x12, 0xe1, 0x87, 0x5e, 0x11, 0xd3, - 0x82, 0x7b, 0x55, 0x81, 0xfb, 0x42, 0x51, 0x4f, 0x14, 0xf3, 0x8d, 0x26, 0xaa, 0xcc, 0xaf, 0xf0, - 0xde, 0x3f, 0xfd, 0xf4, 0x6a, 0xcb, 0x60, 0xb7, 0xf3, 0x77, 0x3b, 0xb5, 0xe9, 0xfb, 0xf8, 0x7a, - 0xd2, 0x61, 0x61, 0xb8, 0x68, 0xdd, 0x59, 0x07, 0xe5, 0x86, 0x1e, 0xeb, 0xbe, 0xed, 0x5d, 0x5c, - 0x1e, 0x71, 0x36, 0x98, 0xb3, 0x36, 0x80, 0x65, 0xc9, 0x99, 0xa6, 0x6c, 0x3f, 0xc3, 0x9b, 0x57, - 0x7f, 0x0a, 0x7b, 0x13, 0x2f, 0x0f, 0x78, 0x06, 0xaf, 0xc2, 0x6c, 0xca, 0x4f, 0xbb, 0x82, 0x4b, - 0x1f, 0x58, 0x90, 0x72, 0xf8, 0xb9, 0xcd, 0xa6, 0x3a, 0x3c, 0x59, 0x7a, 0x8c, 0x1a, 0x4f, 0x27, - 0x53, 0x62, 0x9c, 0x4f, 0x89, 0x71, 0x39, 0x25, 0xe8, 0x53, 0x4e, 0xd0, 0xd7, 0x9c, 0xa0, 0x71, - 0x4e, 0xd0, 0x24, 0x27, 0xe8, 0x47, 0x4e, 0xd0, 0xcf, 0x9c, 0x18, 0x97, 0x39, 0x41, 0x9f, 0x67, - 0xc4, 0x98, 0xcc, 0x88, 0x71, 0x3e, 0x23, 0xc6, 0xa9, 0x7a, 0xdf, 0xed, 0x55, 0xd8, 0xcc, 0xa3, - 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x67, 0xda, 0xe5, 0xfc, 0x03, 0x00, 0x00, + // 656 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x4f, 0x13, 0x4f, + 0x18, 0xdf, 0x01, 0xca, 0x9f, 0x9d, 0x2d, 0x50, 0x96, 0xfe, 0xe3, 0x42, 0xe2, 0x50, 0xc4, 0xc4, + 0x1e, 0xcc, 0x62, 0xf0, 0x62, 0x34, 0x31, 0xa4, 0xbc, 0x69, 0x62, 0x8c, 0xb4, 0x24, 0x24, 0x5c, + 0x26, 0x43, 0x3b, 0x2c, 0x13, 0xf6, 0xa5, 0x74, 0x67, 0xa1, 0xbd, 0x19, 0x3f, 0x81, 0x47, 0x3f, + 0x82, 0x1f, 0x85, 0x23, 0x47, 0x4e, 0x28, 0xcb, 0xc5, 0x23, 0x1f, 0xc1, 0xcc, 0x33, 0xb3, 0xa0, + 0x24, 0x9a, 0x7a, 0xdb, 0x79, 0x7e, 0x2f, 0x99, 0xdf, 0xef, 0xc9, 0x0e, 0x76, 0x52, 0xc9, 0x64, + 0xea, 0x77, 0x7b, 0x89, 0x4c, 0xdc, 0x12, 0x1c, 0xe6, 0xab, 0x41, 0x12, 0x24, 0x30, 0x59, 0x56, + 0x5f, 0x1a, 0x9c, 0x27, 0x41, 0x92, 0x04, 0x21, 0x5f, 0x86, 0xd3, 0x7e, 0x76, 0xb0, 0xdc, 0xc9, + 0x7a, 0x4c, 0x8a, 0x24, 0x36, 0xf8, 0xdc, 0x7d, 0x9c, 0xc5, 0x03, 0x0d, 0x3d, 0xfa, 0x64, 0xe3, + 0x52, 0x4b, 0x59, 0xbb, 0xab, 0xd8, 0x3e, 0x65, 0x61, 0x48, 0xa5, 0x88, 0xb8, 0x87, 0x6a, 0xa8, + 0xee, 0xac, 0xcc, 0xf9, 0x5a, 0xe8, 0x17, 0x42, 0x7f, 0xdd, 0x18, 0x37, 0x26, 0xce, 0x2e, 0x17, + 0xac, 0x2f, 0xdf, 0x16, 0x50, 0x73, 0x42, 0xa9, 0x76, 0x44, 0xc4, 0xdd, 0x67, 0xb8, 0x7a, 0xc0, + 0x65, 0xfb, 0x90, 0x77, 0x68, 0xca, 0x7b, 0x82, 0xa7, 0xb4, 0x9d, 0x64, 0xb1, 0xf4, 0x46, 0x6a, + 0xa8, 0x3e, 0xd6, 0x74, 0x0d, 0xd6, 0x02, 0x68, 0x4d, 0x21, 0xae, 0x8f, 0x67, 0x0b, 0x45, 0xfb, + 0x30, 0x8b, 0x8f, 0xe8, 0xfe, 0x40, 0xf2, 0xd4, 0x1b, 0x05, 0xc1, 0x8c, 0x81, 0xd6, 0x14, 0xd2, + 0x50, 0x80, 0xfb, 0x14, 0x17, 0x2e, 0xb4, 0xc3, 0x24, 0x33, 0xf4, 0x31, 0xa0, 0x57, 0x0c, 0xb2, + 0xce, 0x24, 0xd3, 0xec, 0x55, 0x5c, 0xe6, 0x7d, 0xd9, 0x63, 0xf4, 0x40, 0xf0, 0xb0, 0x93, 0x7a, + 0xa5, 0xda, 0x68, 0xdd, 0x59, 0x79, 0xe8, 0xeb, 0x5e, 0x21, 0xb5, 0xbf, 0xa1, 0x08, 0x9b, 0x80, + 0x6f, 0xc4, 0xb2, 0x37, 0x68, 0x3a, 0xfc, 0x6e, 0xf2, 0x6b, 0x22, 0xb8, 0x5f, 0x91, 0x68, 0xfc, + 0xb7, 0x44, 0x70, 0x41, 0x93, 0x68, 0x05, 0xff, 0x7f, 0xdb, 0x01, 0x8b, 0xba, 0xe1, 0x6d, 0x09, + 0xff, 0x81, 0xa4, 0x88, 0xdb, 0xd2, 0x98, 0xd6, 0x2c, 0x62, 0x3b, 0x14, 0x91, 0x90, 0xf4, 0x50, + 0x48, 0x6f, 0xa2, 0x86, 0xea, 0x76, 0x63, 0xec, 0xec, 0x52, 0x55, 0x0b, 0xe3, 0x37, 0x42, 0xba, + 0x4b, 0x78, 0x32, 0xed, 0x86, 0x42, 0xd2, 0xe3, 0x0c, 0xea, 0xf3, 0x6c, 0xb0, 0x2b, 0xc3, 0x70, + 0x5b, 0xcf, 0xdc, 0x3d, 0xfc, 0x40, 0xc1, 0x03, 0x9a, 0xca, 0xa4, 0xc7, 0x02, 0x4e, 0xef, 0xf6, + 0x89, 0x87, 0xdf, 0x67, 0x15, 0x3c, 0x5a, 0xda, 0x62, 0xb7, 0xd8, 0xed, 0x7b, 0xfc, 0x58, 0xb9, + 0x72, 0x1a, 0x30, 0xc9, 0x4f, 0xd9, 0x80, 0xca, 0x24, 0x83, 0x94, 0xdd, 0x24, 0x95, 0x22, 0x0e, + 0x8a, 0x98, 0x0e, 0xdc, 0xab, 0x06, 0xdc, 0x2d, 0x4d, 0xdd, 0xd1, 0xcc, 0x0f, 0x86, 0xa8, 0x33, + 0xbf, 0xc3, 0x4b, 0x7f, 0xf5, 0x33, 0xab, 0x2d, 0x83, 0xdd, 0xc2, 0x9f, 0xed, 0xf4, 0xa6, 0x9f, + 0xe0, 0xe9, 0xb4, 0xcd, 0xe2, 0xf8, 0xae, 0x75, 0x6f, 0x12, 0x94, 0x53, 0x66, 0x6c, 0xfa, 0x76, + 0x17, 0x71, 0xb9, 0xcb, 0xd9, 0xd1, 0x2d, 0x6b, 0x0a, 0x58, 0x8e, 0x9a, 0x15, 0x94, 0xb7, 0x78, + 0x2a, 0x62, 0x7d, 0x0a, 0x8b, 0xd2, 0xe5, 0x4d, 0x0f, 0x5f, 0x5e, 0x39, 0x62, 0xfd, 0x4d, 0xa5, + 0x84, 0xd2, 0xb6, 0xf0, 0xa4, 0xb2, 0xe2, 0x27, 0xcc, 0xac, 0xa1, 0x32, 0xbc, 0x93, 0x13, 0xb1, + 0xfe, 0xc6, 0x09, 0xd3, 0xed, 0x37, 0xf1, 0xac, 0x32, 0x3a, 0xce, 0x78, 0xa6, 0xb6, 0x2a, 0xa4, + 0xb6, 0x9b, 0x19, 0xde, 0xae, 0x12, 0xb1, 0xfe, 0xb6, 0x92, 0xef, 0x32, 0x21, 0xc1, 0xd3, 0xe4, + 0x94, 0x89, 0x2c, 0x6e, 0xe7, 0xfe, 0x5b, 0xce, 0x1d, 0xa5, 0x54, 0x56, 0xf3, 0xaf, 0x71, 0xe5, + 0xfe, 0x7f, 0xe4, 0x56, 0xf0, 0xe8, 0x11, 0x1f, 0xc0, 0x43, 0x62, 0x37, 0xd5, 0xa7, 0x5b, 0xc5, + 0xa5, 0x13, 0x16, 0x66, 0x1c, 0xde, 0x03, 0xbb, 0xa9, 0x0f, 0x2f, 0x47, 0x5e, 0xa0, 0xc6, 0xab, + 0xf3, 0x2b, 0x62, 0x5d, 0x5c, 0x11, 0xeb, 0xe6, 0x8a, 0xa0, 0x8f, 0x39, 0x41, 0x5f, 0x73, 0x82, + 0xce, 0x72, 0x82, 0xce, 0x73, 0x82, 0xbe, 0xe7, 0x04, 0xfd, 0xc8, 0x89, 0x75, 0x93, 0x13, 0xf4, + 0xf9, 0x9a, 0x58, 0xe7, 0xd7, 0xc4, 0xba, 0xb8, 0x26, 0xd6, 0x9e, 0x7e, 0x12, 0xf7, 0xc7, 0xe1, + 0x9e, 0xcf, 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0x5b, 0x42, 0x1f, 0xae, 0x2f, 0x05, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -310,13 +349,25 @@ func (this *Stats) Equal(that interface{}) bool { if this.PeakSamples != that1.PeakSamples { return false } + if this.MaxFetchTime != that1.MaxFetchTime { + return false + } + if this.MaxEvalTime != that1.MaxEvalTime { + return false + } + if this.MaxQueueWaitTime != that1.MaxQueueWaitTime { + return false + } + if this.MaxTotalTime != that1.MaxTotalTime { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 18) + s := make([]string, 0, 22) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -344,6 +395,10 @@ func (this *Stats) GoString() string { s = append(s, "StoreGatewayTouchedPostingBytes: "+fmt.Sprintf("%#v", this.StoreGatewayTouchedPostingBytes)+",\n") s = append(s, "ScannedSamples: "+fmt.Sprintf("%#v", this.ScannedSamples)+",\n") s = append(s, "PeakSamples: "+fmt.Sprintf("%#v", this.PeakSamples)+",\n") + s = append(s, "MaxFetchTime: "+fmt.Sprintf("%#v", this.MaxFetchTime)+",\n") + s = append(s, "MaxEvalTime: "+fmt.Sprintf("%#v", this.MaxEvalTime)+",\n") + s = append(s, "MaxQueueWaitTime: "+fmt.Sprintf("%#v", this.MaxQueueWaitTime)+",\n") + s = append(s, "MaxTotalTime: "+fmt.Sprintf("%#v", this.MaxTotalTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -375,6 +430,44 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxTotalTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxTotalTime):]) + if err1 != nil { + return 0, err1 + } + i -= n1 + i = encodeVarintStats(dAtA, i, uint64(n1)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x92 + n2, err2 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxQueueWaitTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxQueueWaitTime):]) + if err2 != nil { + return 0, err2 + } + i -= n2 + i = encodeVarintStats(dAtA, i, uint64(n2)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x8a + n3, err3 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxEvalTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxEvalTime):]) + if err3 != nil { + return 0, err3 + } + i -= n3 + i = encodeVarintStats(dAtA, i, uint64(n3)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x82 + n4, err4 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxFetchTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxFetchTime):]) + if err4 != nil { + return 0, err4 + } + i -= n4 + i = encodeVarintStats(dAtA, i, uint64(n4)) + i-- + dAtA[i] = 0x7a if m.PeakSamples != 0 { i = encodeVarintStats(dAtA, i, uint64(m.PeakSamples)) i-- @@ -395,12 +488,12 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x58 } - n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.QueryStorageWallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.QueryStorageWallTime):]) - if err1 != nil { - return 0, err1 + n5, err5 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.QueryStorageWallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.QueryStorageWallTime):]) + if err5 != nil { + return 0, err5 } - i -= n1 - i = encodeVarintStats(dAtA, i, uint64(n1)) + i -= n5 + i = encodeVarintStats(dAtA, i, uint64(n5)) i-- dAtA[i] = 0x52 if m.SplitQueries != 0 { @@ -459,12 +552,12 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x10 } - n2, err2 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) - if err2 != nil { - return 0, err2 + n6, err6 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) + if err6 != nil { + return 0, err6 } - i -= n2 - i = encodeVarintStats(dAtA, i, uint64(n2)) + i -= n6 + i = encodeVarintStats(dAtA, i, uint64(n6)) i-- dAtA[i] = 0xa return len(dAtA) - i, nil @@ -533,6 +626,14 @@ func (m *Stats) Size() (n int) { if m.PeakSamples != 0 { n += 1 + sovStats(uint64(m.PeakSamples)) } + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxFetchTime) + n += 1 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxEvalTime) + n += 2 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxQueueWaitTime) + n += 2 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxTotalTime) + n += 2 + l + sovStats(uint64(l)) return n } @@ -571,6 +672,10 @@ func (this *Stats) String() string { `StoreGatewayTouchedPostingBytes:` + fmt.Sprintf("%v", this.StoreGatewayTouchedPostingBytes) + `,`, `ScannedSamples:` + fmt.Sprintf("%v", this.ScannedSamples) + `,`, `PeakSamples:` + fmt.Sprintf("%v", this.PeakSamples) + `,`, + `MaxFetchTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxFetchTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxEvalTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxEvalTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxQueueWaitTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxQueueWaitTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxTotalTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxTotalTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, `}`, }, "") return s @@ -818,7 +923,7 @@ func (m *Stats) Unmarshal(dAtA []byte) error { if err != nil { return err } - if skippy < 0 { + if (skippy < 0) || (iNdEx+skippy) < 0 { return ErrInvalidLengthStats } if (iNdEx + skippy) > postIndex { @@ -1027,16 +1132,145 @@ func (m *Stats) Unmarshal(dAtA []byte) error { break } } + case 15: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxFetchTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxFetchTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 16: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxEvalTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxEvalTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 17: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxQueueWaitTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxQueueWaitTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 18: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTotalTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxTotalTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) if err != nil { return err } - if skippy < 0 { - return ErrInvalidLengthStats - } - if (iNdEx + skippy) < 0 { + if (skippy < 0) || (iNdEx+skippy) < 0 { return ErrInvalidLengthStats } if (iNdEx + skippy) > l { @@ -1054,6 +1288,7 @@ func (m *Stats) Unmarshal(dAtA []byte) error { func skipStats(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 + depth := 0 for iNdEx < l { var wire uint64 for shift := uint(0); ; shift += 7 { @@ -1085,10 +1320,8 @@ func skipStats(dAtA []byte) (n int, err error) { break } } - return iNdEx, nil case 1: iNdEx += 8 - return iNdEx, nil case 2: var length int for shift := uint(0); ; shift += 7 { @@ -1109,55 +1342,30 @@ func skipStats(dAtA []byte) (n int, err error) { return 0, ErrInvalidLengthStats } iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthStats - } - return iNdEx, nil case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowStats - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipStats(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - if iNdEx < 0 { - return 0, ErrInvalidLengthStats - } - } - return iNdEx, nil + depth++ case 4: - return iNdEx, nil + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupStats + } + depth-- case 5: iNdEx += 4 - return iNdEx, nil default: return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } + if iNdEx < 0 { + return 0, ErrInvalidLengthStats + } + if depth == 0 { + return iNdEx, nil + } } - panic("unreachable") + return 0, io.ErrUnexpectedEOF } var ( - ErrInvalidLengthStats = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowStats = fmt.Errorf("proto: integer overflow") + ErrInvalidLengthStats = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowStats = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupStats = fmt.Errorf("proto: unexpected end of group") ) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 8e53e02ccc3..6d1fcf7457b 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -45,4 +45,10 @@ message Stats { // The highest count of samples considered while evaluating a query. // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go uint64 peak_samples = 14; + // Max timing breakdown across sub-queries for timeout classification. + // These use max() semantics during Merge rather than sum. + google.protobuf.Duration max_fetch_time = 15 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_eval_time = 16 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_queue_wait_time = 17 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_total_time = 18 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; } From d547ee0a9bd66379999def45009f168362c82353 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 16:10:15 -0700 Subject: [PATCH 7/8] docs config schema Signed-off-by: Essam Eldaly --- schemas/cortex-config-schema.json | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 20cf970c35b..9344ccca7d9 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -6156,6 +6156,26 @@ "type": "string", "x-cli-flag": "querier.timeout", "x-format": "duration" + }, + "timeout_classification_deadline": { + "default": "59s", + "description": "The total time before the querier proactively cancels a query for timeout classification.", + "type": "string", + "x-cli-flag": "querier.timeout-classification-deadline", + "x-format": "duration" + }, + "timeout_classification_enabled": { + "default": false, + "description": "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.", + "type": "boolean", + "x-cli-flag": "querier.timeout-classification-enabled" + }, + "timeout_classification_eval_threshold": { + "default": "40s", + "description": "Eval time threshold above which a timeout is classified as user error (4XX).", + "type": "string", + "x-cli-flag": "querier.timeout-classification-eval-threshold", + "x-format": "duration" } }, "type": "object" From fb1f31269724c17d2f50be1821a46a878bab446b Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 30 Mar 2026 16:13:10 -0700 Subject: [PATCH 8/8] fix non-constant format string call Signed-off-by: Essam Eldaly --- pkg/api/queryapi/query_api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index ca58f5fc6df..4b06cf4f5f4 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -385,7 +385,7 @@ func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryS if cfg.Enabled { return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout, - ErrUpstreamRequestTimeout)}, warnings, closer} + "%s", ErrUpstreamRequestTimeout)}, warnings, closer} } return nil