diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index b1aa703feed..6904bf96a19 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -322,6 +322,20 @@ querier: # mixed block types (parquet and non-parquet) and not querying ingesters. # CLI flag: -querier.honor-projection-hints [honor_projection_hints: | default = false] + + # If true, classify query timeouts as 4XX (user error) or 5XX (system error) + # based on phase timing. + # CLI flag: -querier.timeout-classification-enabled + [timeout_classification_enabled: | default = false] + + # The total time before the querier proactively cancels a query for timeout + # classification. + # CLI flag: -querier.timeout-classification-deadline + [timeout_classification_deadline: | default = 59s] + + # Eval time threshold above which a timeout is classified as user error (4XX). + # CLI flag: -querier.timeout-classification-eval-threshold + [timeout_classification_eval_threshold: | default = 40s] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7c1cd7265df..8ac6f3731a0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4959,6 +4959,20 @@ thanos_engine: # types (parquet and non-parquet) and not querying ingesters. # CLI flag: -querier.honor-projection-hints [honor_projection_hints: | default = false] + +# If true, classify query timeouts as 4XX (user error) or 5XX (system error) +# based on phase timing. +# CLI flag: -querier.timeout-classification-enabled +[timeout_classification_enabled: | default = false] + +# The total time before the querier proactively cancels a query for timeout +# classification. +# CLI flag: -querier.timeout-classification-deadline +[timeout_classification_deadline: | default = 59s] + +# Eval time threshold above which a timeout is classified as user error (4XX). 
+# CLI flag: -querier.timeout-classification-eval-threshold +[timeout_classification_eval_threshold: | default = 40s] ``` ### `query_frontend_config` diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 7f219896b7d..5852a65cbb1 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -286,7 +286,11 @@ func NewQuerierHandler( legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1")) api.Register(legacyPromRouter) - queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) + queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{ + TotalTimeout: querierCfg.TimeoutClassificationDeadline, + EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold, + Enabled: querierCfg.TimeoutClassificationEnabled, + }) requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger)) var apiHandler http.Handler diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index 83eed69ec8b..7125f59d667 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -21,18 +21,21 @@ import ( "github.com/cortexproject/cortex/pkg/distributed_execution" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/api" + "github.com/cortexproject/cortex/pkg/util/requestmeta" ) type QueryAPI struct { - queryable storage.SampleAndChunkQueryable - queryEngine engine.QueryEngine - now func() time.Time - statsRenderer v1.StatsRenderer - logger log.Logger - codecs []v1.Codec - CORSOrigin *regexp.Regexp + queryable storage.SampleAndChunkQueryable + queryEngine engine.QueryEngine + now func() time.Time + statsRenderer 
v1.StatsRenderer + logger log.Logger + codecs []v1.Codec + CORSOrigin *regexp.Regexp + timeoutClassification stats.PhaseTrackerConfig } func NewQueryAPI( @@ -42,15 +45,17 @@ func NewQueryAPI( logger log.Logger, codecs []v1.Codec, CORSOrigin *regexp.Regexp, + timeoutClassification stats.PhaseTrackerConfig, ) *QueryAPI { return &QueryAPI{ - queryEngine: qe, - queryable: q, - statsRenderer: statsRenderer, - logger: logger, - codecs: codecs, - CORSOrigin: CORSOrigin, - now: time.Now, + queryEngine: qe, + queryable: q, + statsRenderer: statsRenderer, + logger: logger, + codecs: codecs, + CORSOrigin: CORSOrigin, + now: time.Now, + timeoutClassification: timeoutClassification, } } @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. 
+ if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. + if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -281,6 +328,76 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w } } +// applyTimeoutClassification creates a proactive context timeout that fires before +// the PromQL engine's own timeout, adjusted for queue wait time. Returns the +// (possibly wrapped) context, an optional cancel func, and an optional early-exit +// result when the entire timeout budget was already consumed in the queue. 
+func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) { + if !cfg.Enabled { + return ctx, nil, nil + } + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + effectiveTimeout := cfg.TotalTimeout - queueWaitTime + if effectiveTimeout <= 0 { + return ctx, nil, &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, + "query timed out: query spent too long in scheduler queue")}, nil, nil} + } + ctx, cancel := context.WithTimeout(ctx, effectiveTimeout) + return ctx, cancel, nil +} + +// classifyTimeout inspects phase timings after a context cancellation/timeout +// and returns an apiFuncResult if the timeout should be converted to a 4XX user error. +// Returns nil if no conversion applies and the caller should use the default error path. +func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult { + // Record query end time so that ComputeAndStoreTimingBreakdown (called + // later in scheduler_processor.runRequest) reuses the same timestamp, + // producing identical numbers in querier and query-frontend logs. 
+ queryStats.SetQueryEnd(time.Now()) + + decision := stats.DecideTimeoutResponse(queryStats, cfg) + + fetchTime := queryStats.LoadQueryStorageWallTime() + queryEnd := queryStats.LoadQueryEnd() + totalTime := queryEnd.Sub(queryStats.LoadQueryStart()) + evalTime := totalTime - fetchTime + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + level.Warn(q.logger).Log( + "msg", "query timed out with classification", + "request_id", requestmeta.RequestIdFromContext(ctx), + "query_start", queryStats.LoadQueryStart(), + "query_end", queryEnd, + "queue_wait_time", queueWaitTime, + "fetch_time", fetchTime, + "eval_time", evalTime, + "total_time", totalTime, + "decision", decision, + "conversion_enabled", cfg.Enabled, + ) + + if cfg.Enabled && decision == stats.UserError4XX { + return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, + "query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer} + } + + if cfg.Enabled { + return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout, + "%s", ErrUpstreamRequestTimeout)}, warnings, closer} + } + + return nil +} + func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) { for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) { for _, codec := range q.codecs { diff --git a/pkg/api/queryapi/query_api_test.go b/pkg/api/queryapi/query_api_test.go index 2a0ce0cbc99..7dd13ba7549 100644 --- a/pkg/api/queryapi/query_api_test.go +++ b/pkg/api/queryapi/query_api_test.go @@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, 
regexp.MustCompile(".*")) + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) @@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) { }, } - queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*")) + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) @@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) { }, } - queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) @@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{}) router := mux.NewRouter() router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler)) diff --git 
a/pkg/api/queryapi/util.go b/pkg/api/queryapi/util.go index e9e43e8cb27..ea35f1b6ee4 100644 --- a/pkg/api/queryapi/util.go +++ b/pkg/api/queryapi/util.go @@ -16,9 +16,10 @@ import ( ) var ( - ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") - ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") - ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") + ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") + ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. 
Try decreasing the query resolution (?step=XX)") + ErrUpstreamRequestTimeout = "upstream request timeout" ) func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) { diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 6591e056c7f..a7db04b1439 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -70,7 +70,8 @@ import ( ) var ( - errInvalidHTTPPrefix = errors.New("HTTP prefix should be empty or start with /") + errInvalidHTTPPrefix = errors.New("HTTP prefix should be empty or start with /") + errTimeoutClassificationRequiresQueryStats = errors.New("timeout classification requires query stats to be enabled (frontend.query-stats-enabled)") ) // The design pattern for Cortex is a series of config objects, which are @@ -228,6 +229,9 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Querier.Validate(); err != nil { return errors.Wrap(err, "invalid querier config") } + if c.Querier.TimeoutClassificationEnabled && !c.Frontend.Handler.QueryStatsEnabled { + return errTimeoutClassificationRequiresQueryStats + } if err := c.IngesterClient.Validate(log); err != nil { return errors.Wrap(err, "invalid ingester_client config") } diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 62e21b50c15..faf4e68e87b 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -239,6 +239,40 @@ func TestConfigValidation(t *testing.T) { }, expectedError: fmt.Errorf("unsupported name validation scheme: unset"), }, + { + name: "should fail when timeout classification is enabled but query stats is disabled", + getTestConfig: func() *Config { + configuration := newDefaultConfig() + configuration.Querier.TimeoutClassificationEnabled = true + configuration.Querier.TimeoutClassificationDeadline = 59 * time.Second + configuration.Querier.TimeoutClassificationEvalThreshold = 40 * time.Second + configuration.Frontend.Handler.QueryStatsEnabled = false + return configuration + }, + expectedError: 
errTimeoutClassificationRequiresQueryStats, + }, + { + name: "should pass when timeout classification is enabled and query stats is enabled", + getTestConfig: func() *Config { + configuration := newDefaultConfig() + configuration.Querier.TimeoutClassificationEnabled = true + configuration.Querier.TimeoutClassificationDeadline = 59 * time.Second + configuration.Querier.TimeoutClassificationEvalThreshold = 40 * time.Second + configuration.Frontend.Handler.QueryStatsEnabled = true + return configuration + }, + expectedError: nil, + }, + { + name: "should pass when timeout classification is disabled and query stats is disabled", + getTestConfig: func() *Config { + configuration := newDefaultConfig() + configuration.Querier.TimeoutClassificationEnabled = false + configuration.Frontend.Handler.QueryStatsEnabled = false + return configuration + }, + expectedError: nil, + }, } { t.Run(tc.name, func(t *testing.T) { err := tc.getTestConfig().Validate(nil) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 8fd27a11454..47ec69876b6 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -536,6 +536,19 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query logMessage = append(logMessage, "query_storage_wall_time_seconds", sws) } + if maxFetch := stats.LoadMaxFetchTime(); maxFetch > 0 { + logMessage = append(logMessage, "max_fetch_time", maxFetch) + } + if maxEval := stats.LoadMaxEvalTime(); maxEval > 0 { + logMessage = append(logMessage, "max_eval_time", maxEval) + } + if maxQueue := stats.LoadMaxQueueWaitTime(); maxQueue > 0 { + logMessage = append(logMessage, "max_queue_wait_time", maxQueue) + } + if maxTotal := stats.LoadMaxTotalTime(); maxTotal > 0 { + logMessage = append(logMessage, "max_total_time", maxTotal) + } + if splitInterval > 0 { logMessage = append(logMessage, "split_interval", splitInterval.String()) } diff --git a/pkg/frontend/transport/retry.go 
b/pkg/frontend/transport/retry.go index 4e2997a84ac..e28abc72efb 100644 --- a/pkg/frontend/transport/retry.go +++ b/pkg/frontend/transport/retry.go @@ -11,6 +11,7 @@ import ( "github.com/thanos-io/thanos/pkg/pool" "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -79,7 +80,15 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error) func isBodyRetryable(body string) bool { // If pool exhausted, retry at query frontend might make things worse. // Rely on retries at querier level only. - return !strings.Contains(body, pool.ErrPoolExhausted.Error()) + if strings.Contains(body, pool.ErrPoolExhausted.Error()) { + return false + } + + // If request timed out upstream, there isn't enough time to retry. + if strings.Contains(body, queryapi.ErrUpstreamRequestTimeout) { + return false + } + return true } func yoloString(b []byte) string { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8ebc66a16dc..bc865982e57 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -103,6 +103,11 @@ type Config struct { DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` HonorProjectionHints bool `yaml:"honor_projection_hints"` + + // Timeout classification flags for converting 5XX to 4XX on expensive queries. 
+ TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"` + TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"` + TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"` } var ( @@ -114,6 +119,11 @@ var ( errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0") errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet") + + errTimeoutClassificationDeadlineNotPositive = errors.New("timeout_classification_deadline must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdNotPositive = errors.New("timeout_classification_eval_threshold must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdExceedsDeadline = errors.New("timeout_classification_eval_threshold must be less than or equal to timeout_classification_deadline") + errTimeoutClassificationDeadlineExceedsTimeout = errors.New("timeout_classification_deadline must be less than the querier timeout") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -158,6 +168,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.HonorProjectionHints, "querier.honor-projection-hints", false, "[Experimental] If true, querier will honor projection hints and only materialize requested labels. Today, projection is only effective when Parquet Queryable is enabled. 
Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") + f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.") + f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", 59*time.Second, "The total time before the querier proactively cancels a query for timeout classification.") + f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", 40*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).") } // Validate the config @@ -197,6 +210,21 @@ func (cfg *Config) Validate() error { } } + if cfg.TimeoutClassificationEnabled { + if cfg.TimeoutClassificationDeadline <= 0 { + return errTimeoutClassificationDeadlineNotPositive + } + if cfg.TimeoutClassificationEvalThreshold <= 0 { + return errTimeoutClassificationEvalThresholdNotPositive + } + if cfg.TimeoutClassificationEvalThreshold > cfg.TimeoutClassificationDeadline { + return errTimeoutClassificationEvalThresholdExceedsDeadline + } + if cfg.TimeoutClassificationDeadline >= cfg.Timeout { + return errTimeoutClassificationDeadlineExceedsTimeout + } + } + if err := cfg.ThanosEngine.Validate(); err != 
nil { return err } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 4a13dae9aaf..5b2c305bd0c 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1679,6 +1679,71 @@ func TestConfig_Validate(t *testing.T) { }, expected: nil, }, + "should pass with valid timeout classification config": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 58 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + }, + "should fail with zero timeout classification deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 0 + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + }, + expected: errTimeoutClassificationDeadlineNotPositive, + }, + "should fail with negative timeout classification eval threshold": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 58 * time.Second + cfg.TimeoutClassificationEvalThreshold = -1 * time.Second + }, + expected: errTimeoutClassificationEvalThresholdNotPositive, + }, + "should fail when eval threshold exceeds deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 30 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + }, + expected: errTimeoutClassificationEvalThresholdExceedsDeadline, + }, + "should pass when eval threshold equals deadline": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 45 * time.Second + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + }, + "should fail when deadline equals engine timeout": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 2 * time.Minute + 
cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + expected: errTimeoutClassificationDeadlineExceedsTimeout, + }, + "should fail when deadline exceeds engine timeout": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = true + cfg.TimeoutClassificationDeadline = 3 * time.Minute + cfg.TimeoutClassificationEvalThreshold = 45 * time.Second + cfg.Timeout = 2 * time.Minute + }, + expected: errTimeoutClassificationDeadlineExceedsTimeout, + }, + "should pass with timeout classification disabled even with invalid values": { + setup: func(cfg *Config) { + cfg.TimeoutClassificationEnabled = false + cfg.TimeoutClassificationDeadline = 0 + cfg.TimeoutClassificationEvalThreshold = -1 * time.Second + }, + }, } for testName, testData := range tests { diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index a834cd311e1..7035bd79f68 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -3,6 +3,7 @@ package stats import ( "context" "fmt" + "strconv" "sync" "sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it. "time" @@ -10,6 +11,12 @@ import ( "github.com/weaveworks/common/httpgrpc" ) +const ( + // QueueTimeHeader is the HTTP header used to propagate the queue enqueue + // timestamp (UnixNano) from the scheduler to the querier. + QueueTimeHeader = "X-Cortex-Queue-Enqueue-Time-Ns" +) + type contextKey int var ctxKey = contextKey(0) @@ -23,6 +30,13 @@ type QueryStats struct { DataSelectMinTime int64 SplitInterval time.Duration m sync.Mutex + + // Phase tracking fields for timeout classification. + // Stored as UnixNano int64 for atomic operations. 
+ queryStart int64 // nanosecond timestamp when query began + queryEnd int64 // nanosecond timestamp when query finished + queueJoinTime int64 // nanosecond timestamp when request entered scheduler queue + queueLeaveTime int64 // nanosecond timestamp when request left scheduler queue } // ContextWithEmptyStats returns a context with empty stats. @@ -306,6 +320,201 @@ func (s *QueryStats) LoadSplitInterval() time.Duration { return s.SplitInterval } +// SetQueryStart records when the query began execution. +func (s *QueryStats) SetQueryStart(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queryStart, t.UnixNano()) +} + +// LoadQueryStart returns the query start time. +func (s *QueryStats) LoadQueryStart() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queryStart) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueryEnd records when the query finished execution. +func (s *QueryStats) SetQueryEnd(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queryEnd, t.UnixNano()) +} + +// LoadQueryEnd returns the query end time. +func (s *QueryStats) LoadQueryEnd() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queryEnd) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueJoinTime records when the request entered the scheduler queue. +func (s *QueryStats) SetQueueJoinTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueJoinTime, t.UnixNano()) +} + +// LoadQueueJoinTime returns the queue join time. +func (s *QueryStats) LoadQueueJoinTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueJoinTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueLeaveTime records when the request left the scheduler queue. 
+func (s *QueryStats) SetQueueLeaveTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueLeaveTime, t.UnixNano()) +} + +// LoadQueueLeaveTime returns the queue leave time. +func (s *QueryStats) LoadQueueLeaveTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueLeaveTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// updateMaxDuration atomically updates a max duration field if the new value is larger. +func updateMaxDuration(addr *int64, val time.Duration) { + new := int64(val) + for { + old := atomic.LoadInt64(addr) + if new <= old { + return + } + if atomic.CompareAndSwapInt64(addr, old, new) { + return + } + } +} + +// UpdateMaxFetchTime updates the max fetch time if the provided value is larger. +func (s *QueryStats) UpdateMaxFetchTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration((*int64)(&s.MaxFetchTime), t) +} + +func (s *QueryStats) LoadMaxFetchTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxFetchTime))) +} + +func (s *QueryStats) UpdateMaxEvalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration((*int64)(&s.MaxEvalTime), t) +} + +func (s *QueryStats) LoadMaxEvalTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxEvalTime))) +} + +func (s *QueryStats) UpdateMaxQueueWaitTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration((*int64)(&s.MaxQueueWaitTime), t) +} + +func (s *QueryStats) LoadMaxQueueWaitTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64((*int64)(&s.MaxQueueWaitTime))) +} + +func (s *QueryStats) UpdateMaxTotalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration((*int64)(&s.MaxTotalTime), t) +} + +func (s *QueryStats) LoadMaxTotalTime() time.Duration { + if s == nil { + return 0 + } + return 
time.Duration(atomic.LoadInt64((*int64)(&s.MaxTotalTime))) +} + +// ComputeAndStoreTimingBreakdown computes the timing breakdown from phase tracking +// fields and stores them as max values. This should be called after a sub-query +// completes, before stats are sent back to the frontend. +func (s *QueryStats) ComputeAndStoreTimingBreakdown() { + if s == nil { + return + } + + queryStart := s.LoadQueryStart() + if queryStart.IsZero() { + return + } + + queryEnd := s.LoadQueryEnd() + if queryEnd.IsZero() { + queryEnd = time.Now() + s.SetQueryEnd(queryEnd) + } + + fetchTime := s.LoadQueryStorageWallTime() + totalTime := queryEnd.Sub(queryStart) + evalTime := totalTime - fetchTime + + var queueWaitTime time.Duration + queueJoin := s.LoadQueueJoinTime() + queueLeave := s.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + + s.UpdateMaxFetchTime(fetchTime) + s.UpdateMaxEvalTime(evalTime) + s.UpdateMaxQueueWaitTime(queueWaitTime) + s.UpdateMaxTotalTime(totalTime) +} + func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) { if s == nil { return @@ -396,9 +605,50 @@ func (s *QueryStats) Merge(other *QueryStats) { s.AddScannedSamples(other.LoadScannedSamples()) s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples())) s.AddExtraFields(other.LoadExtraFields()...) + s.UpdateMaxFetchTime(other.LoadMaxFetchTime()) + s.UpdateMaxEvalTime(other.LoadMaxEvalTime()) + s.UpdateMaxQueueWaitTime(other.LoadMaxQueueWaitTime()) + s.UpdateMaxTotalTime(other.LoadMaxTotalTime()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { // Do no track statistics for requests failed because of a server error. return r.Code < 500 } + +// InjectQueueTimeHeader adds the enqueue timestamp as an HTTP header on the +// request so it can be propagated from the scheduler to the querier. 
+func InjectQueueTimeHeader(req *httpgrpc.HTTPRequest, enqueueTime time.Time) { + if req == nil || enqueueTime.IsZero() { + return + } + req.Headers = append(req.Headers, &httpgrpc.Header{ + Key: QueueTimeHeader, + Values: []string{strconv.FormatInt(enqueueTime.UnixNano(), 10)}, + }) +} + +// ExtractQueueTimeHeader reads the enqueue timestamp from the HTTP header, +// computes the queue wait duration using the current wall-clock as the dequeue +// time, and sets both join and leave times on the given QueryStats. The header +// is removed from the request after extraction. +func ExtractQueueTimeHeader(req *httpgrpc.HTTPRequest, s *QueryStats) { + if req == nil || s == nil { + return + } + remaining := make([]*httpgrpc.Header, 0, len(req.Headers)) + for _, h := range req.Headers { + if h.Key == QueueTimeHeader { + if len(h.Values) > 0 { + if ns, err := strconv.ParseInt(h.Values[0], 10, 64); err == nil { + enqueueTime := time.Unix(0, ns) + s.SetQueueJoinTime(enqueueTime) + s.SetQueueLeaveTime(time.Now()) + } + } + } else { + remaining = append(remaining, h) + } + } + req.Headers = remaining +} diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 61bbc9da9c1..0eedf7ac11e 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -65,6 +65,12 @@ type Stats struct { // The highest count of samples considered while evaluating a query. // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go PeakSamples uint64 `protobuf:"varint,14,opt,name=peak_samples,json=peakSamples,proto3" json:"peak_samples,omitempty"` + // Max timing breakdown across sub-queries for timeout classification. + // These use max() semantics during Merge rather than sum. 
+ MaxFetchTime time.Duration `protobuf:"bytes,15,opt,name=max_fetch_time,json=maxFetchTime,proto3,stdduration" json:"max_fetch_time"` + MaxEvalTime time.Duration `protobuf:"bytes,16,opt,name=max_eval_time,json=maxEvalTime,proto3,stdduration" json:"max_eval_time"` + MaxQueueWaitTime time.Duration `protobuf:"bytes,17,opt,name=max_queue_wait_time,json=maxQueueWaitTime,proto3,stdduration" json:"max_queue_wait_time"` + MaxTotalTime time.Duration `protobuf:"bytes,18,opt,name=max_total_time,json=maxTotalTime,proto3,stdduration" json:"max_total_time"` } func (m *Stats) Reset() { *m = Stats{} } @@ -197,6 +203,34 @@ func (m *Stats) GetPeakSamples() uint64 { return 0 } +func (m *Stats) GetMaxFetchTime() time.Duration { + if m != nil { + return m.MaxFetchTime + } + return 0 +} + +func (m *Stats) GetMaxEvalTime() time.Duration { + if m != nil { + return m.MaxEvalTime + } + return 0 +} + +func (m *Stats) GetMaxQueueWaitTime() time.Duration { + if m != nil { + return m.MaxQueueWaitTime + } + return 0 +} + +func (m *Stats) GetMaxTotalTime() time.Duration { + if m != nil { + return m.MaxTotalTime + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") proto.RegisterMapType((map[string]string)(nil), "stats.Stats.ExtraFieldsEntry") @@ -205,43 +239,48 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 574 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcf, 0x6e, 0xd3, 0x30, - 0x18, 0x8f, 0xb7, 0x75, 0x2c, 0x4e, 0x37, 0x46, 0x28, 0x22, 0x9b, 0x84, 0xd7, 0x31, 0x24, 0x7a, - 0x40, 0x19, 0x2a, 0x17, 0x04, 0x12, 0x9a, 0xba, 0x0d, 0x38, 0x20, 0x04, 0xed, 0x24, 0xa4, 0x5d, - 0x2c, 0xb7, 0x75, 0xd3, 0xa8, 0x69, 0x52, 0x12, 0x87, 0x91, 0x1b, 0x8f, 0xc0, 0x91, 0x47, 0xe0, - 0x51, 0x7a, 0xa3, 0xc7, 0x9d, 0x06, 0x4d, 0x2f, 0x1c, 0xf7, 0x08, 0xc8, 0x9f, 0x9d, 0x16, 0x26, - 0x81, 0xb8, 0xc5, 0xdf, 0xef, 
0x8f, 0xfc, 0xfb, 0x7d, 0x31, 0xb6, 0x12, 0xc1, 0x44, 0xe2, 0x8e, - 0xe2, 0x48, 0x44, 0x76, 0x09, 0x0e, 0xdb, 0x15, 0x2f, 0xf2, 0x22, 0x98, 0xec, 0xcb, 0x2f, 0x05, - 0x6e, 0x13, 0x2f, 0x8a, 0xbc, 0x80, 0xef, 0xc3, 0xa9, 0x9d, 0xf6, 0xf6, 0xbb, 0x69, 0xcc, 0x84, - 0x1f, 0x85, 0x1a, 0xdf, 0xba, 0x8a, 0xb3, 0x30, 0x53, 0xd0, 0xdd, 0x6f, 0xab, 0xb8, 0xd4, 0x92, - 0xd6, 0xf6, 0x01, 0x36, 0xcf, 0x58, 0x10, 0x50, 0xe1, 0x0f, 0xb9, 0x83, 0xaa, 0xa8, 0x66, 0xd5, - 0xb7, 0x5c, 0x25, 0x74, 0x0b, 0xa1, 0x7b, 0xa4, 0x8d, 0x1b, 0x6b, 0xe3, 0x8b, 0x1d, 0xe3, 0xcb, - 0xf7, 0x1d, 0xd4, 0x5c, 0x93, 0xaa, 0x13, 0x7f, 0xc8, 0xed, 0x87, 0xb8, 0xd2, 0xe3, 0xa2, 0xd3, - 0xe7, 0x5d, 0x9a, 0xf0, 0xd8, 0xe7, 0x09, 0xed, 0x44, 0x69, 0x28, 0x9c, 0xa5, 0x2a, 0xaa, 0xad, - 0x34, 0x6d, 0x8d, 0xb5, 0x00, 0x3a, 0x94, 0x88, 0xed, 0xe2, 0x9b, 0x85, 0xa2, 0xd3, 0x4f, 0xc3, - 0x01, 0x6d, 0x67, 0x82, 0x27, 0xce, 0x32, 0x08, 0x6e, 0x68, 0xe8, 0x50, 0x22, 0x0d, 0x09, 0xd8, - 0x0f, 0x70, 0xe1, 0x42, 0xbb, 0x4c, 0x30, 0x4d, 0x5f, 0x01, 0xfa, 0xa6, 0x46, 0x8e, 0x98, 0x60, - 0x8a, 0x7d, 0x80, 0xcb, 0xfc, 0xa3, 0x88, 0x19, 0xed, 0xf9, 0x3c, 0xe8, 0x26, 0x4e, 0xa9, 0xba, - 0x5c, 0xb3, 0xea, 0x77, 0x5c, 0xd5, 0x2b, 0xa4, 0x76, 0x8f, 0x25, 0xe1, 0x39, 0xe0, 0xc7, 0xa1, - 0x88, 0xb3, 0xa6, 0xc5, 0x17, 0x93, 0xdf, 0x13, 0xc1, 0xfd, 0x8a, 0x44, 0xab, 0x7f, 0x24, 0x82, - 0x0b, 0xea, 0x44, 0x75, 0x7c, 0x6b, 0xde, 0x01, 0x1b, 0x8e, 0x82, 0x79, 0x09, 0xd7, 0x40, 0x52, - 0xc4, 0x6d, 0x29, 0x4c, 0x69, 0x76, 0xb1, 0x19, 0xf8, 0x43, 0x5f, 0xd0, 0xbe, 0x2f, 0x9c, 0xb5, - 0x2a, 0xaa, 0x99, 0x8d, 0x95, 0xf1, 0x85, 0xac, 0x16, 0xc6, 0x2f, 0x7d, 0x61, 0xef, 0xe1, 0xf5, - 0x64, 0x14, 0xf8, 0x82, 0xbe, 0x4f, 0xa1, 0x3e, 0xc7, 0x04, 0xbb, 0x32, 0x0c, 0xdf, 0xaa, 0x99, - 0x7d, 0x8a, 0x6f, 0x4b, 0x38, 0xa3, 0x89, 0x88, 0x62, 0xe6, 0x71, 0xba, 0xd8, 0x27, 0xfe, 0xff, - 0x7d, 0x56, 0xc0, 0xa3, 0xa5, 0x2c, 0xde, 0x15, 0xbb, 0x7d, 0x8d, 0xef, 0x49, 0x57, 0x4e, 0x3d, - 0x26, 0xf8, 0x19, 0xcb, 0xa8, 0x88, 0x52, 0x48, 0x39, 0x8a, 0x12, 
0xe1, 0x87, 0x5e, 0x11, 0xd3, - 0x82, 0x7b, 0x55, 0x81, 0xfb, 0x42, 0x51, 0x4f, 0x14, 0xf3, 0x8d, 0x26, 0xaa, 0xcc, 0xaf, 0xf0, - 0xde, 0x3f, 0xfd, 0xf4, 0x6a, 0xcb, 0x60, 0xb7, 0xf3, 0x77, 0x3b, 0xb5, 0xe9, 0xfb, 0xf8, 0x7a, - 0xd2, 0x61, 0x61, 0xb8, 0x68, 0xdd, 0x59, 0x07, 0xe5, 0x86, 0x1e, 0xeb, 0xbe, 0xed, 0x5d, 0x5c, - 0x1e, 0x71, 0x36, 0x98, 0xb3, 0x36, 0x80, 0x65, 0xc9, 0x99, 0xa6, 0x6c, 0x3f, 0xc3, 0x9b, 0x57, - 0x7f, 0x0a, 0x7b, 0x13, 0x2f, 0x0f, 0x78, 0x06, 0xaf, 0xc2, 0x6c, 0xca, 0x4f, 0xbb, 0x82, 0x4b, - 0x1f, 0x58, 0x90, 0x72, 0xf8, 0xb9, 0xcd, 0xa6, 0x3a, 0x3c, 0x59, 0x7a, 0x8c, 0x1a, 0x4f, 0x27, - 0x53, 0x62, 0x9c, 0x4f, 0x89, 0x71, 0x39, 0x25, 0xe8, 0x53, 0x4e, 0xd0, 0xd7, 0x9c, 0xa0, 0x71, - 0x4e, 0xd0, 0x24, 0x27, 0xe8, 0x47, 0x4e, 0xd0, 0xcf, 0x9c, 0x18, 0x97, 0x39, 0x41, 0x9f, 0x67, - 0xc4, 0x98, 0xcc, 0x88, 0x71, 0x3e, 0x23, 0xc6, 0xa9, 0x7a, 0xdf, 0xed, 0x55, 0xd8, 0xcc, 0xa3, - 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x67, 0xda, 0xe5, 0xfc, 0x03, 0x00, 0x00, + // 656 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x4f, 0x13, 0x4f, + 0x18, 0xdf, 0x01, 0xca, 0x9f, 0x9d, 0x2d, 0x50, 0x96, 0xfe, 0xe3, 0x42, 0xe2, 0x50, 0xc4, 0xc4, + 0x1e, 0xcc, 0x62, 0xf0, 0x62, 0x34, 0x31, 0xa4, 0xbc, 0x69, 0x62, 0x8c, 0xb4, 0x24, 0x24, 0x5c, + 0x26, 0x43, 0x3b, 0x2c, 0x13, 0xf6, 0xa5, 0x74, 0x67, 0xa1, 0xbd, 0x19, 0x3f, 0x81, 0x47, 0x3f, + 0x82, 0x1f, 0x85, 0x23, 0x47, 0x4e, 0x28, 0xcb, 0xc5, 0x23, 0x1f, 0xc1, 0xcc, 0x33, 0xb3, 0xa0, + 0x24, 0x9a, 0x7a, 0xdb, 0x79, 0x7e, 0x2f, 0x99, 0xdf, 0xef, 0xc9, 0x0e, 0x76, 0x52, 0xc9, 0x64, + 0xea, 0x77, 0x7b, 0x89, 0x4c, 0xdc, 0x12, 0x1c, 0xe6, 0xab, 0x41, 0x12, 0x24, 0x30, 0x59, 0x56, + 0x5f, 0x1a, 0x9c, 0x27, 0x41, 0x92, 0x04, 0x21, 0x5f, 0x86, 0xd3, 0x7e, 0x76, 0xb0, 0xdc, 0xc9, + 0x7a, 0x4c, 0x8a, 0x24, 0x36, 0xf8, 0xdc, 0x7d, 0x9c, 0xc5, 0x03, 0x0d, 0x3d, 0xfa, 0x64, 0xe3, + 0x52, 0x4b, 0x59, 0xbb, 0xab, 0xd8, 0x3e, 0x65, 0x61, 0x48, 0xa5, 
0x88, 0xb8, 0x87, 0x6a, 0xa8, + 0xee, 0xac, 0xcc, 0xf9, 0x5a, 0xe8, 0x17, 0x42, 0x7f, 0xdd, 0x18, 0x37, 0x26, 0xce, 0x2e, 0x17, + 0xac, 0x2f, 0xdf, 0x16, 0x50, 0x73, 0x42, 0xa9, 0x76, 0x44, 0xc4, 0xdd, 0x67, 0xb8, 0x7a, 0xc0, + 0x65, 0xfb, 0x90, 0x77, 0x68, 0xca, 0x7b, 0x82, 0xa7, 0xb4, 0x9d, 0x64, 0xb1, 0xf4, 0x46, 0x6a, + 0xa8, 0x3e, 0xd6, 0x74, 0x0d, 0xd6, 0x02, 0x68, 0x4d, 0x21, 0xae, 0x8f, 0x67, 0x0b, 0x45, 0xfb, + 0x30, 0x8b, 0x8f, 0xe8, 0xfe, 0x40, 0xf2, 0xd4, 0x1b, 0x05, 0xc1, 0x8c, 0x81, 0xd6, 0x14, 0xd2, + 0x50, 0x80, 0xfb, 0x14, 0x17, 0x2e, 0xb4, 0xc3, 0x24, 0x33, 0xf4, 0x31, 0xa0, 0x57, 0x0c, 0xb2, + 0xce, 0x24, 0xd3, 0xec, 0x55, 0x5c, 0xe6, 0x7d, 0xd9, 0x63, 0xf4, 0x40, 0xf0, 0xb0, 0x93, 0x7a, + 0xa5, 0xda, 0x68, 0xdd, 0x59, 0x79, 0xe8, 0xeb, 0x5e, 0x21, 0xb5, 0xbf, 0xa1, 0x08, 0x9b, 0x80, + 0x6f, 0xc4, 0xb2, 0x37, 0x68, 0x3a, 0xfc, 0x6e, 0xf2, 0x6b, 0x22, 0xb8, 0x5f, 0x91, 0x68, 0xfc, + 0xb7, 0x44, 0x70, 0x41, 0x93, 0x68, 0x05, 0xff, 0x7f, 0xdb, 0x01, 0x8b, 0xba, 0xe1, 0x6d, 0x09, + 0xff, 0x81, 0xa4, 0x88, 0xdb, 0xd2, 0x98, 0xd6, 0x2c, 0x62, 0x3b, 0x14, 0x91, 0x90, 0xf4, 0x50, + 0x48, 0x6f, 0xa2, 0x86, 0xea, 0x76, 0x63, 0xec, 0xec, 0x52, 0x55, 0x0b, 0xe3, 0x37, 0x42, 0xba, + 0x4b, 0x78, 0x32, 0xed, 0x86, 0x42, 0xd2, 0xe3, 0x0c, 0xea, 0xf3, 0x6c, 0xb0, 0x2b, 0xc3, 0x70, + 0x5b, 0xcf, 0xdc, 0x3d, 0xfc, 0x40, 0xc1, 0x03, 0x9a, 0xca, 0xa4, 0xc7, 0x02, 0x4e, 0xef, 0xf6, + 0x89, 0x87, 0xdf, 0x67, 0x15, 0x3c, 0x5a, 0xda, 0x62, 0xb7, 0xd8, 0xed, 0x7b, 0xfc, 0x58, 0xb9, + 0x72, 0x1a, 0x30, 0xc9, 0x4f, 0xd9, 0x80, 0xca, 0x24, 0x83, 0x94, 0xdd, 0x24, 0x95, 0x22, 0x0e, + 0x8a, 0x98, 0x0e, 0xdc, 0xab, 0x06, 0xdc, 0x2d, 0x4d, 0xdd, 0xd1, 0xcc, 0x0f, 0x86, 0xa8, 0x33, + 0xbf, 0xc3, 0x4b, 0x7f, 0xf5, 0x33, 0xab, 0x2d, 0x83, 0xdd, 0xc2, 0x9f, 0xed, 0xf4, 0xa6, 0x9f, + 0xe0, 0xe9, 0xb4, 0xcd, 0xe2, 0xf8, 0xae, 0x75, 0x6f, 0x12, 0x94, 0x53, 0x66, 0x6c, 0xfa, 0x76, + 0x17, 0x71, 0xb9, 0xcb, 0xd9, 0xd1, 0x2d, 0x6b, 0x0a, 0x58, 0x8e, 0x9a, 0x15, 0x94, 0xb7, 0x78, + 0x2a, 
0x62, 0x7d, 0x0a, 0x8b, 0xd2, 0xe5, 0x4d, 0x0f, 0x5f, 0x5e, 0x39, 0x62, 0xfd, 0x4d, 0xa5, + 0x84, 0xd2, 0xb6, 0xf0, 0xa4, 0xb2, 0xe2, 0x27, 0xcc, 0xac, 0xa1, 0x32, 0xbc, 0x93, 0x13, 0xb1, + 0xfe, 0xc6, 0x09, 0xd3, 0xed, 0x37, 0xf1, 0xac, 0x32, 0x3a, 0xce, 0x78, 0xa6, 0xb6, 0x2a, 0xa4, + 0xb6, 0x9b, 0x19, 0xde, 0xae, 0x12, 0xb1, 0xfe, 0xb6, 0x92, 0xef, 0x32, 0x21, 0xc1, 0xd3, 0xe4, + 0x94, 0x89, 0x2c, 0x6e, 0xe7, 0xfe, 0x5b, 0xce, 0x1d, 0xa5, 0x54, 0x56, 0xf3, 0xaf, 0x71, 0xe5, + 0xfe, 0x7f, 0xe4, 0x56, 0xf0, 0xe8, 0x11, 0x1f, 0xc0, 0x43, 0x62, 0x37, 0xd5, 0xa7, 0x5b, 0xc5, + 0xa5, 0x13, 0x16, 0x66, 0x1c, 0xde, 0x03, 0xbb, 0xa9, 0x0f, 0x2f, 0x47, 0x5e, 0xa0, 0xc6, 0xab, + 0xf3, 0x2b, 0x62, 0x5d, 0x5c, 0x11, 0xeb, 0xe6, 0x8a, 0xa0, 0x8f, 0x39, 0x41, 0x5f, 0x73, 0x82, + 0xce, 0x72, 0x82, 0xce, 0x73, 0x82, 0xbe, 0xe7, 0x04, 0xfd, 0xc8, 0x89, 0x75, 0x93, 0x13, 0xf4, + 0xf9, 0x9a, 0x58, 0xe7, 0xd7, 0xc4, 0xba, 0xb8, 0x26, 0xd6, 0x9e, 0x7e, 0x12, 0xf7, 0xc7, 0xe1, + 0x9e, 0xcf, 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0x5b, 0x42, 0x1f, 0xae, 0x2f, 0x05, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -310,13 +349,25 @@ func (this *Stats) Equal(that interface{}) bool { if this.PeakSamples != that1.PeakSamples { return false } + if this.MaxFetchTime != that1.MaxFetchTime { + return false + } + if this.MaxEvalTime != that1.MaxEvalTime { + return false + } + if this.MaxQueueWaitTime != that1.MaxQueueWaitTime { + return false + } + if this.MaxTotalTime != that1.MaxTotalTime { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 18) + s := make([]string, 0, 22) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -344,6 +395,10 @@ func (this *Stats) GoString() string { s = append(s, "StoreGatewayTouchedPostingBytes: "+fmt.Sprintf("%#v", 
this.StoreGatewayTouchedPostingBytes)+",\n") s = append(s, "ScannedSamples: "+fmt.Sprintf("%#v", this.ScannedSamples)+",\n") s = append(s, "PeakSamples: "+fmt.Sprintf("%#v", this.PeakSamples)+",\n") + s = append(s, "MaxFetchTime: "+fmt.Sprintf("%#v", this.MaxFetchTime)+",\n") + s = append(s, "MaxEvalTime: "+fmt.Sprintf("%#v", this.MaxEvalTime)+",\n") + s = append(s, "MaxQueueWaitTime: "+fmt.Sprintf("%#v", this.MaxQueueWaitTime)+",\n") + s = append(s, "MaxTotalTime: "+fmt.Sprintf("%#v", this.MaxTotalTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -375,6 +430,44 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxTotalTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxTotalTime):]) + if err1 != nil { + return 0, err1 + } + i -= n1 + i = encodeVarintStats(dAtA, i, uint64(n1)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x92 + n2, err2 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxQueueWaitTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxQueueWaitTime):]) + if err2 != nil { + return 0, err2 + } + i -= n2 + i = encodeVarintStats(dAtA, i, uint64(n2)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x8a + n3, err3 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxEvalTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxEvalTime):]) + if err3 != nil { + return 0, err3 + } + i -= n3 + i = encodeVarintStats(dAtA, i, uint64(n3)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x82 + n4, err4 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MaxFetchTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxFetchTime):]) + if err4 != nil { + return 0, err4 + } + i -= n4 + i = encodeVarintStats(dAtA, i, uint64(n4)) + i-- + dAtA[i] = 0x7a if m.PeakSamples != 0 { i = encodeVarintStats(dAtA, i, uint64(m.PeakSamples)) i-- @@ -395,12 +488,12 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) 
(int, error) { i-- dAtA[i] = 0x58 } - n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.QueryStorageWallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.QueryStorageWallTime):]) - if err1 != nil { - return 0, err1 + n5, err5 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.QueryStorageWallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.QueryStorageWallTime):]) + if err5 != nil { + return 0, err5 } - i -= n1 - i = encodeVarintStats(dAtA, i, uint64(n1)) + i -= n5 + i = encodeVarintStats(dAtA, i, uint64(n5)) i-- dAtA[i] = 0x52 if m.SplitQueries != 0 { @@ -459,12 +552,12 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x10 } - n2, err2 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) - if err2 != nil { - return 0, err2 + n6, err6 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) + if err6 != nil { + return 0, err6 } - i -= n2 - i = encodeVarintStats(dAtA, i, uint64(n2)) + i -= n6 + i = encodeVarintStats(dAtA, i, uint64(n6)) i-- dAtA[i] = 0xa return len(dAtA) - i, nil @@ -533,6 +626,14 @@ func (m *Stats) Size() (n int) { if m.PeakSamples != 0 { n += 1 + sovStats(uint64(m.PeakSamples)) } + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxFetchTime) + n += 1 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxEvalTime) + n += 2 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxQueueWaitTime) + n += 2 + l + sovStats(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MaxTotalTime) + n += 2 + l + sovStats(uint64(l)) return n } @@ -571,6 +672,10 @@ func (this *Stats) String() string { `StoreGatewayTouchedPostingBytes:` + fmt.Sprintf("%v", this.StoreGatewayTouchedPostingBytes) + `,`, `ScannedSamples:` + fmt.Sprintf("%v", 
this.ScannedSamples) + `,`, `PeakSamples:` + fmt.Sprintf("%v", this.PeakSamples) + `,`, + `MaxFetchTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxFetchTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxEvalTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxEvalTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxQueueWaitTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxQueueWaitTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `MaxTotalTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.MaxTotalTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, `}`, }, "") return s @@ -1027,6 +1132,138 @@ func (m *Stats) Unmarshal(dAtA []byte) error { break } } + case 15: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxFetchTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxFetchTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 16: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxEvalTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return 
ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxEvalTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 17: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxQueueWaitTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxQueueWaitTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 18: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTotalTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MaxTotalTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 8e53e02ccc3..6d1fcf7457b 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -45,4 +45,10 @@ message Stats { // The highest count of samples 
considered while evaluating a query. // Equal to PeakSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go uint64 peak_samples = 14; + // Max timing breakdown across sub-queries for timeout classification. + // These use max() semantics during Merge rather than sum. + google.protobuf.Duration max_fetch_time = 15 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_eval_time = 16 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_queue_wait_time = 17 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration max_total_time = 18 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index 7908d06773d..f19c9fc389d 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -282,3 +282,96 @@ func checkExtraFields(t *testing.T, expected, actual []any) { assert.Equal(t, expectedMap, actualMap) } + +func TestStats_QueryStart(t *testing.T) { + t.Run("set and load query start", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueryStart(now) + + loaded := stats.LoadQueryStart() + assert.Equal(t, now.UnixNano(), loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueryStart(time.Now()) + + assert.True(t, stats.LoadQueryStart().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueryStart().IsZero()) + }) +} + +func TestStats_QueueJoinTime(t *testing.T) { + t.Run("set and load queue join time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueueJoinTime(now) + + loaded := stats.LoadQueueJoinTime() + assert.Equal(t, now.UnixNano(), 
loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueueJoinTime(time.Now()) + + assert.True(t, stats.LoadQueueJoinTime().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueueJoinTime().IsZero()) + }) +} + +func TestStats_QueueLeaveTime(t *testing.T) { + t.Run("set and load queue leave time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + now := time.Now() + stats.SetQueueLeaveTime(now) + + loaded := stats.LoadQueueLeaveTime() + assert.Equal(t, now.UnixNano(), loaded.UnixNano()) + }) + + t.Run("nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.SetQueueLeaveTime(time.Now()) + + assert.True(t, stats.LoadQueueLeaveTime().IsZero()) + }) + + t.Run("zero value when unset", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + + assert.True(t, stats.LoadQueueLeaveTime().IsZero()) + }) +} + +func TestStats_Merge_DoesNotCopyPhaseTrackingFields(t *testing.T) { + t.Run("merge does not copy phase tracking fields", func(t *testing.T) { + source := &QueryStats{} + source.SetQueryStart(time.Now()) + source.SetQueueJoinTime(time.Now()) + source.SetQueueLeaveTime(time.Now()) + source.AddWallTime(time.Second) + + target := &QueryStats{} + target.Merge(source) + + // Phase tracking fields should NOT be copied + assert.True(t, target.LoadQueryStart().IsZero()) + assert.True(t, target.LoadQueueJoinTime().IsZero()) + assert.True(t, target.LoadQueueLeaveTime().IsZero()) + + // Regular fields should still be merged + assert.Equal(t, time.Second, target.LoadWallTime()) + }) +} diff --git a/pkg/querier/stats/timeout_decision.go b/pkg/querier/stats/timeout_decision.go new file mode 100644 index 00000000000..0883259224e --- /dev/null +++ b/pkg/querier/stats/timeout_decision.go @@ -0,0 +1,54 @@ +package stats + +import "time" + +// TimeoutDecision 
represents the classification of a query timeout. +type TimeoutDecision int + +const ( + // Default5XX means return 503 (current behavior). + Default5XX TimeoutDecision = iota + // UserError4XX means the query is too expensive, return 422. + UserError4XX +) + +// PhaseTrackerConfig holds configurable thresholds for timeout classification. +type PhaseTrackerConfig struct { + // TotalTimeout is the total time before the querier cancels the query context. + TotalTimeout time.Duration + + // EvalTimeThreshold is the eval time above which a timeout is classified as user error (4XX). + EvalTimeThreshold time.Duration + + // Enabled controls whether the 5XX-to-4XX conversion is active. + Enabled bool +} + +// DecideTimeoutResponse inspects QueryStats phase timings and returns a TimeoutDecision. +// It returns UserError4XX if eval time exceeds the threshold, Default5XX otherwise. +// It is a pure function that does not modify stats or cfg. +func DecideTimeoutResponse(stats *QueryStats, cfg PhaseTrackerConfig) TimeoutDecision { + if stats == nil { + return Default5XX + } + + queryStart := stats.LoadQueryStart() + if queryStart.IsZero() { + return Default5XX + } + + queryEnd := stats.LoadQueryEnd() + var totalTime time.Duration + if !queryEnd.IsZero() { + totalTime = queryEnd.Sub(queryStart) + } else { + totalTime = time.Since(queryStart) + } + evalTime := totalTime - stats.LoadQueryStorageWallTime() + + if evalTime > cfg.EvalTimeThreshold { + return UserError4XX + } + + return Default5XX +} diff --git a/pkg/querier/stats/timeout_decision_test.go b/pkg/querier/stats/timeout_decision_test.go new file mode 100644 index 00000000000..d6f9511ceac --- /dev/null +++ b/pkg/querier/stats/timeout_decision_test.go @@ -0,0 +1,49 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDecideTimeoutResponse_NilStats(t *testing.T) { + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, 
DecideTimeoutResponse(nil, cfg)) +} + +func TestDecideTimeoutResponse_ZeroQueryStart(t *testing.T) { + stats := &QueryStats{} + // queryStart is zero by default (never set). + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, DecideTimeoutResponse(stats, cfg)) +} + +func TestDecideTimeoutResponse_EvalBelowThreshold(t *testing.T) { + stats := &QueryStats{} + // Set queryStart to now so evalTime ≈ 0, well below threshold. + stats.SetQueryStart(time.Now()) + // No storage wall time, so evalTime = time.Since(now) - 0 ≈ 0. + + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, Default5XX, DecideTimeoutResponse(stats, cfg)) +} + +func TestDecideTimeoutResponse_EvalAboveThreshold(t *testing.T) { + stats := &QueryStats{} + // Set queryStart 60s in the past so time.Since(queryStart) ≈ 60s. + stats.SetQueryStart(time.Now().Add(-60 * time.Second)) + // Set storageWallTime to 5s so evalTime ≈ 60s - 5s = 55s > 45s threshold. + stats.AddQueryStorageWallTime(5 * time.Second) + + cfg := PhaseTrackerConfig{ + EvalTimeThreshold: 45 * time.Second, + } + assert.Equal(t, UserError4XX, DecideTimeoutResponse(stats, cfg)) +} diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 88f7f311393..923c9576fc4 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -156,6 +156,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + // Ensure responses that are too big are not retried. 
if len(response.Body) >= fp.maxMessageSize { errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 3bba5980442..8a31ad72118 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -176,6 +176,7 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, var stats *querier_stats.QueryStats if statsEnabled { stats, ctx = querier_stats.ContextWithEmptyStats(ctx) + querier_stats.ExtractQueueTimeHeader(request, stats) } response, err := sp.handler.Handle(ctx, request) @@ -193,6 +194,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + if err = ctx.Err(); err != nil { return } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 882ed166174..3539c01e6bc 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -527,6 +527,10 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL r := req.(*schedulerRequest) + // Propagate the enqueue timestamp to the querier via an HTTP header. + // The querier will use its own wall-clock as the dequeue time. 
+ stats.InjectQueueTimeHeader(r.request, r.enqueueTime) + s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds()) r.queueSpan.Finish() diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 20cf970c35b..9344ccca7d9 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -6156,6 +6156,26 @@ "type": "string", "x-cli-flag": "querier.timeout", "x-format": "duration" + }, + "timeout_classification_deadline": { + "default": "59s", + "description": "The total time before the querier proactively cancels a query for timeout classification.", + "type": "string", + "x-cli-flag": "querier.timeout-classification-deadline", + "x-format": "duration" + }, + "timeout_classification_enabled": { + "default": false, + "description": "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.", + "type": "boolean", + "x-cli-flag": "querier.timeout-classification-enabled" + }, + "timeout_classification_eval_threshold": { + "default": "40s", + "description": "Eval time threshold above which a timeout is classified as user error (4XX).", + "type": "string", + "x-cli-flag": "querier.timeout-classification-eval-threshold", + "x-format": "duration" } }, "type": "object"