Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,20 @@ querier:
# mixed block types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on phase timing.
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time before the querier proactively cancels a query for timeout
# classification.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 40s]
```

### `blocks_storage_config`
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4959,6 +4959,20 @@ thanos_engine:
# types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on phase timing.
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time before the querier proactively cancels a query for timeout
# classification.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 40s]
```

### `query_frontend_config`
Expand Down
6 changes: 5 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{
TotalTimeout: querierCfg.TimeoutClassificationDeadline,
EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold,
Enabled: querierCfg.TimeoutClassificationEnabled,
})

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
Expand Down
138 changes: 124 additions & 14 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import (
"github.com/cortexproject/cortex/pkg/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

// QueryAPI serves the Prometheus-compatible instant and range query
// endpoints for the querier. It wraps a query engine and queryable with
// response codecs, CORS handling, and timeout-classification settings.
// NOTE(review): this span was diff residue listing every field twice
// (pre- and post-change); only the post-change field set is kept.
type QueryAPI struct {
	queryable     storage.SampleAndChunkQueryable
	queryEngine   engine.QueryEngine
	now           func() time.Time
	statsRenderer v1.StatsRenderer
	logger        log.Logger
	codecs        []v1.Codec
	CORSOrigin    *regexp.Regexp
	// timeoutClassification configures proactive query cancellation and
	// 4XX/5XX classification of timeouts (see applyTimeoutClassification).
	timeoutClassification stats.PhaseTrackerConfig
}

func NewQueryAPI(
Expand All @@ -42,15 +45,17 @@ func NewQueryAPI(
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
timeoutClassification stats.PhaseTrackerConfig,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
timeoutClassification: timeoutClassification,
}
}

Expand Down Expand Up @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand All @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand Down Expand Up @@ -281,6 +328,69 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w
}
}

// applyTimeoutClassification wraps ctx with a proactive deadline that fires
// before the PromQL engine's own timeout, after subtracting any time the
// query already spent waiting in the scheduler queue. It returns the
// (possibly wrapped) context, a cancel func when a deadline was attached,
// and a non-nil early-exit result when the queue alone consumed the whole
// timeout budget. A nil result with nil cancel means classification is off.
func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) {
	if !cfg.Enabled {
		return ctx, nil, nil
	}

	// Queue wait only counts when both join and leave timestamps were recorded.
	var waited time.Duration
	if join, leave := queryStats.LoadQueueJoinTime(), queryStats.LoadQueueLeaveTime(); !join.IsZero() && !leave.IsZero() {
		waited = leave.Sub(join)
	}

	remaining := cfg.TotalTimeout - waited
	if remaining <= 0 {
		// The entire budget was burned before execution started: fail fast.
		early := apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable,
			"query timed out: query spent too long in scheduler queue")}, nil, nil}
		return ctx, nil, &early
	}

	deadlineCtx, cancel := context.WithTimeout(ctx, remaining)
	return deadlineCtx, cancel, nil
}

// classifyTimeout runs after a context cancellation/timeout: it logs the
// per-phase timings and, when classification is enabled, converts the
// timeout into either a 4XX user error (too long in evaluation) or a 504
// system error. A nil return means the caller should fall through to the
// default error path.
func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult {
	decision := stats.DecideTimeoutResponse(queryStats, cfg)

	start := queryStats.LoadQueryStart()
	fetch := queryStats.LoadQueryStorageWallTime()
	total := time.Since(start)

	// Queue wait is only meaningful when both timestamps were recorded.
	var queued time.Duration
	if join, leave := queryStats.LoadQueueJoinTime(), queryStats.LoadQueueLeaveTime(); !join.IsZero() && !leave.IsZero() {
		queued = leave.Sub(join)
	}

	level.Warn(q.logger).Log(
		"msg", "query timed out with classification",
		"request_id", requestmeta.RequestIdFromContext(ctx),
		"query_start", start,
		"queue_wait_time", queued,
		"fetch_time", fetch,
		"eval_time", total-fetch,
		"total_time", total,
		"decision", decision,
		"conversion_enabled", cfg.Enabled,
	)

	// Logging above is unconditional; conversion only applies when enabled.
	if !cfg.Enabled {
		return nil
	}

	if decision == stats.UserError4XX {
		return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity,
			"query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer}
	}

	return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout,
		"%s", ErrUpstreamRequestTimeout)}, warnings, closer}
}

func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
for _, codec := range q.codecs {
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/queryapi/query_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
Expand Down Expand Up @@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))

Expand Down Expand Up @@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
Expand Down Expand Up @@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/queryapi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

var (
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrUpstreamRequestTimeout = "upstream request timeout"
)

func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,19 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if maxFetch := stats.LoadMaxFetchTime(); maxFetch > 0 {
logMessage = append(logMessage, "max_fetch_time", maxFetch)
}
if maxEval := stats.LoadMaxEvalTime(); maxEval > 0 {
logMessage = append(logMessage, "max_eval_time", maxEval)
}
if maxQueue := stats.LoadMaxQueueWaitTime(); maxQueue > 0 {
logMessage = append(logMessage, "max_queue_wait_time", maxQueue)
}
if maxTotal := stats.LoadMaxTotalTime(); maxTotal > 0 {
logMessage = append(logMessage, "max_total_time", maxTotal)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/thanos-io/thanos/pkg/pool"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

Expand Down Expand Up @@ -79,7 +80,15 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
// isBodyRetryable reports whether a failed response body describes an error
// that is worth retrying at the query frontend.
// NOTE(review): the original span was diff residue keeping the removed
// single-return line above the added body (unreachable code); only the
// post-change body is kept. Also fixes the "isnt" typo in a comment.
func isBodyRetryable(body string) bool {
	// If pool exhausted, retry at query frontend might make things worse.
	// Rely on retries at querier level only.
	if strings.Contains(body, pool.ErrPoolExhausted.Error()) {
		return false
	}

	// If request timed out upstream, there isn't enough time to retry.
	if strings.Contains(body, queryapi.ErrUpstreamRequestTimeout) {
		return false
	}

	return true
}

func yoloString(b []byte) string {
Expand Down
Loading
Loading