
Performance issue while building per-session function registry #22614

@fred1268

Description


Describe the bug

Problem Statement

In DataFusion, functions are registered into the session state every time a session is created. All three function categories (scalar, aggregate, and window) are collected into fresh Vecs and then inserted one by one into new HashMaps. This work is identical for every session, so its total cost grows linearly with the number of sessions created.

Function counts (at time of writing)

| Category | Count | Source | Config-dependent |
|---|---|---|---|
| Scalar UDFs | 115 | all_default_functions() (math, string, datetime, regex, crypto, unicode, encoding, core) | 6 |
| Aggregate UDFs | 38 | all_default_aggregate_functions() | 0 |
| Window UDFs | 12 | all_default_window_functions() | 0 |
| Total | 165 | | 6 |

Per-session initialization cost

Each call to SessionStateBuilder::build() — which happens on every session creation — pays the following costs:

| Operation | Count | Detail |
|---|---|---|
| Arc::clone | ~220–250 | into Vecs + into HashMaps + alias clones |
| HashMap::insert | ~180+ | primary names + aliases across all 3 maps |
| String allocs | ~180+ | one heap key per HashMap entry |
| Vec/HashMap allocs | ~6 | 3 collector Vecs + 3 result HashMaps |

The TaskContext double-copy

Before any physical plan is executed, DataFusion calls session_state.task_ctx(), which creates a TaskContext from the session state via From<&SessionState>. TaskContext owns its own independent copies of all three function maps, cloned in full from the session state:

// session_state.rs:2288 — full clone of all three HashMaps
impl From<&SessionState> for TaskContext {
    fn from(state: &SessionState) -> Self {
        TaskContext::new(
            ...,
            state.scalar_functions.clone(),     // full HashMap clone
            state.aggregate_functions.clone(),  // full HashMap clone
            state.window_functions.clone(),     // full HashMap clone
            ...
        )
    }
}

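When a session is created per query, every query therefore pays for the function maps twice: once when build() constructs them for the SessionState, and again when task_ctx() clones them into the TaskContext. Even a long-lived session repeats the full clone on every task_ctx() call.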
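The std-only sketch below makes the cost difference concrete by comparing a full HashMap clone with sharing the map behind an Arc. The Udf struct and the helper functions are simplified stand-ins, not DataFusion types. The only point is that a full clone touches every entry, while Arc::clone touches none.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for a UDF (not DataFusion's `ScalarUDF`).
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

/// Builds a name-keyed registry, like the per-session function maps.
pub fn build_registry(names: &[&str]) -> HashMap<String, Arc<Udf>> {
    names
        .iter()
        .map(|n| (n.to_string(), Arc::new(Udf { name: n.to_string() })))
        .collect()
}

/// Today's behaviour: a full clone allocates a new table and a new `String`
/// for every key, and bumps the refcount of every value.
pub fn clone_full(map: &HashMap<String, Arc<Udf>>) -> HashMap<String, Arc<Udf>> {
    map.clone()
}

/// Proposed behaviour: the map is frozen behind an `Arc`, so a "copy" is a
/// single refcount increment with no per-entry work.
pub fn clone_shared(map: &Arc<HashMap<String, Arc<Udf>>>) -> Arc<HashMap<String, Arc<Udf>>> {
    Arc::clone(map)
}
```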

The waste: 6 session-dependent functions out of 165

Only 6 scalar UDFs are actually session-dependent, meaning their return type changes with session configuration (specifically the timezone setting):

| Function | Reason for session dependency |
|---|---|
| now / current_timestamp | Return type is Timestamp(ns, timezone); the timezone is per-session |
| to_timestamp | Same |
| to_timestamp_seconds | Same |
| to_timestamp_millis | Same |
| to_timestamp_micros | Same |
| to_timestamp_nanos | Same |

Aggregate and window functions are structurally incapable of being session-dependent: their impl traits (AggregateUDFImpl, WindowUDFImpl) do not define a with_updated_config method at all. This is not a temporary limitation — it is a design boundary.

⚠️ Summary: every session creation rebuilds three function registries holding 165 functions, and every query execution clones them again into a TaskContext. Yet only 6 of those 165 functions actually differ between sessions.

To Reproduce

No response

Expected behavior

Session creation should not rebuild function registries from scratch on every call.

The 165 built-in functions are identical across all sessions. Only 6 scalar UDFs (now, to_timestamp, and 4 variants) carry session-specific state (the configured timezone in their return type). The remaining 159 functions never change.

Today — per session creation:

| Operation | Count |
|---|---|
| Arc::clone | ~220–250 |
| HashMap::insert | ~180+ |
| String heap allocations | ~180+ |
| Vec/HashMap allocations | ~6 |

The same cost is paid a second time when task_ctx() converts the SessionState into a TaskContext before each physical plan execution.

Expected — per session creation:

| Operation | Count |
|---|---|
| Arc::clone | 3 (one per shared registry) |
| HashMap::insert | ~8 (dynamic map for the 6 config-dependent functions only) |
| String heap allocations | ~8 |
| Vec/HashMap allocations | 1 (the small dynamic map) |

The 3 shared registries (Arc<HashMap>) are built once per process and reused across all sessions. TaskContext creation likewise becomes 3 Arc::clones plus a clone of the small dynamic map, instead of three full HashMap clones.

Additional context

Proposed Design

The idea is to split function initialization into two phases: a system initialization that runs once per process, and a session initialization that is reduced to a handful of cheap pointer copies.

System initialization (once): build and freeze shared, immutable registries
  • all config-independent scalar UDFs → Arc<HashMap>
  • all aggregate UDFs → Arc<HashMap>
  • all window UDFs → Arc<HashMap>
  • prototype instances of the 6 config-dependent scalars → Vec<Arc<ScalarUDF>>

Session initialization (per session)
  • 3 Arc::clones, one per shared map.
  • Call with_updated_config(session_config) on each prototype to produce 6 per-session scalar instances, and insert them into a small per-session HashMap.
  • TaskContext creation: 3 more Arc::clones plus a clone of the small dynamic map.

The routing signal

The ScalarUDFImpl::with_updated_config method already exists and already expresses exactly the config-dependency concept. The build() loop already calls it on every scalar UDF. No new trait method is needed: if with_updated_config returns Some, the function is config-dependent and routes to the per-session dynamic map; if it returns None, it routes to the shared static map.

// Routing logic inside build()
for udf in scalar_functions_vec {
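    match udf.inner().with_updated_config(config_options) {
        Some(session_udf) => {
            // config-dependent: with_updated_config returns an owned ScalarUDF, so wrap it
            dynamic_map.insert(..., Arc::new(session_udf));
        }
        None => {
            static_map.insert(..., udf); // config-independent: reuse the existing Arc
        }
    }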
}
let static_arc = Arc::new(static_map);  // freeze once build() is done
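The routing rule can be exercised outside DataFusion with a small std-only model. Config, ScalarFn, Abs and Now are hypothetical stand-ins for ConfigOptions, ScalarUDFImpl and two built-ins, and aliases are ignored. This is a sketch of the idea under those assumptions, not the proposed implementation.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for `ConfigOptions`; only the time zone matters here.
#[derive(Clone, Debug, Default)]
pub struct Config {
    pub time_zone: Option<String>,
}

/// Hypothetical stand-in for `ScalarUDFImpl`. The default
/// `with_updated_config` returns `None`, meaning "config-independent".
pub trait ScalarFn: Send + Sync {
    fn name(&self) -> &str;
    fn with_updated_config(&self, _config: &Config) -> Option<Arc<dyn ScalarFn>> {
        None
    }
}

/// A config-independent function.
pub struct Abs;
impl ScalarFn for Abs {
    fn name(&self) -> &str {
        "abs"
    }
}

/// A `now`-like function whose instance carries the session time zone.
pub struct Now {
    pub time_zone: Option<String>,
}
impl ScalarFn for Now {
    fn name(&self) -> &str {
        "now"
    }
    fn with_updated_config(&self, config: &Config) -> Option<Arc<dyn ScalarFn>> {
        Some(Arc::new(Now { time_zone: config.time_zone.clone() }))
    }
}

pub type Registry = HashMap<String, Arc<dyn ScalarFn>>;

/// Routes each function by whether `with_updated_config` returns `Some`,
/// then freezes the static side into an `Arc`.
pub fn route(fns: Vec<Arc<dyn ScalarFn>>, config: &Config) -> (Arc<Registry>, Registry) {
    let mut static_map = Registry::new();
    let mut dynamic_map = Registry::new();
    for f in fns {
        match f.with_updated_config(config) {
            Some(session_fn) => {
                dynamic_map.insert(session_fn.name().to_string(), session_fn);
            }
            None => {
                static_map.insert(f.name().to_string(), f);
            }
        }
    }
    (Arc::new(static_map), dynamic_map)
}
```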

The two scalar data structures

| Field | Type | Contents | Lifetime |
|---|---|---|---|
| scalar_functions_static | Arc<HashMap<String, Arc<ScalarUDF>>> | All config-independent scalar UDFs (109 built-in defaults + any user-registered static UDFs). Never written after construction. | Process-wide, shared |
| scalar_functions_dynamic | HashMap<String, Arc<ScalarUDF>> | Config-dependent scalar UDFs only. Today: 6 entries (now, to_timestamp, and 4 variants). Mutated on SET/RESET. | Per session |

Aggregate and window functions require no split: they are entirely config-independent by design. Each becomes a single shared Arc<HashMap> with no dynamic counterpart.

Function lookup

FunctionRegistry::udf(name) probes the static map first (hits ~98% of lookups), then falls back to the dynamic map:

fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
    self.scalar_functions_static
        .get(name)
        .or_else(|| self.scalar_functions_dynamic.get(name))
        .cloned()
        .ok_or_else(|| plan_datafusion_err!(...))
}
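Here is a self-contained version of the two-tier lookup. It assumes simplified types: Udf stands in for ScalarUDF, and a String error replaces DataFusion's error type.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for `ScalarUDF`.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
    pub time_zone: Option<String>,
}

pub struct SessionScalars {
    /// Shared across sessions, never mutated after construction.
    pub static_map: Arc<HashMap<String, Arc<Udf>>>,
    /// Per-session, config-dependent functions only.
    pub dynamic_map: HashMap<String, Arc<Udf>>,
}

impl SessionScalars {
    /// Probes the static map first, then the dynamic map.
    pub fn udf(&self, name: &str) -> Result<Arc<Udf>, String> {
        self.static_map
            .get(name)
            .or_else(|| self.dynamic_map.get(name))
            .cloned()
            .ok_or_else(|| format!("unknown scalar function: {name}"))
    }
}
```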

Initialization

System initialization

Before

No system initialization exists today. Everything is deferred to session creation time. The LazyLock singletons for individual function objects are initialized on first use, but the containers (Vecs and HashMaps) that hold them are always built fresh per session.

After

Three LazyLock-backed static registries are introduced in SessionStateDefaults, built once on first access and shared across all sessions:

// New module-level statics alongside SessionStateDefaults (Rust does not allow statics inside impl blocks)
static DEFAULT_SCALAR_REGISTRY: LazyLock<Arc<HashMap<String, Arc<ScalarUDF>>>> =
    LazyLock::new(|| build_static_scalar_map());  // NEW private fn: iterates all_default_functions(),
                                                   //   filters out config-dependent ones, builds HashMap

static DEFAULT_AGGREGATE_REGISTRY: LazyLock<Arc<HashMap<String, Arc<AggregateUDF>>>> =
    LazyLock::new(|| build_aggregate_map());  // NEW private fn: wraps all_default_aggregate_functions() into a HashMap

static DEFAULT_WINDOW_REGISTRY: LazyLock<Arc<HashMap<String, Arc<WindowUDF>>>> =
    LazyLock::new(|| build_window_map());  // NEW private fn: wraps all_default_window_functions() into a HashMap

// Public accessors — each call is one Arc::clone
pub fn default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>> {
    Arc::clone(&DEFAULT_SCALAR_REGISTRY)
}
// default_aggregate_registry() and default_window_registry() follow the same pattern
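The same LazyLock pattern can be tried with the standard library alone (std::sync::LazyLock, stable since Rust 1.80). In this sketch, AggregateUdf and all_default_aggregates() are hypothetical stand-ins for AggregateUDF and all_default_aggregate_functions(). The accessor returns the same allocation on every call.

```rust
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

/// Hypothetical stand-in for `AggregateUDF`.
#[derive(Debug)]
pub struct AggregateUdf {
    pub name: &'static str,
}

/// Hypothetical stand-in for `all_default_aggregate_functions()`.
fn all_default_aggregates() -> Vec<Arc<AggregateUdf>> {
    ["count", "sum", "avg"]
        .into_iter()
        .map(|name| Arc::new(AggregateUdf { name }))
        .collect()
}

/// Built once, on first access, then shared by every caller.
static DEFAULT_AGGREGATE_REGISTRY: LazyLock<Arc<HashMap<String, Arc<AggregateUdf>>>> =
    LazyLock::new(|| {
        let map: HashMap<String, Arc<AggregateUdf>> = all_default_aggregates()
            .into_iter()
            .map(|udf| (udf.name.to_string(), udf))
            .collect();
        Arc::new(map)
    });

/// Each call is a single refcount increment.
pub fn default_aggregate_registry() -> Arc<HashMap<String, Arc<AggregateUdf>>> {
    Arc::clone(&*DEFAULT_AGGREGATE_REGISTRY)
}
```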

A separate Vec<Arc<ScalarUDF>> of prototype instances is also kept for the 6 config-dependent scalar functions (built once with ConfigOptions::default()). These are used at session creation time to produce per-session instances cheaply via with_updated_config.
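The prototype step can be sketched with a simplified TzScalar type whose only config-dependent state is the time zone. For brevity the sketch uses three of the six function names. The prototypes are built once with the default config, and each session derives fresh instances from them without modifying them.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the session config; only the time zone matters here.
#[derive(Clone, Debug, Default)]
pub struct Config {
    pub time_zone: Option<String>,
}

/// Hypothetical stand-in for a config-dependent scalar such as `now`:
/// the time zone is baked into the instance, as it is into the return type.
#[derive(Debug)]
pub struct TzScalar {
    pub name: &'static str,
    pub time_zone: Option<String>,
}

impl TzScalar {
    /// Mirrors the shape of `with_updated_config`: a new instance per config.
    pub fn with_updated_config(&self, config: &Config) -> Option<TzScalar> {
        Some(TzScalar { name: self.name, time_zone: config.time_zone.clone() })
    }
}

/// Built once, with the default config.
pub fn prototypes() -> Vec<Arc<TzScalar>> {
    let defaults = Config::default();
    ["now", "to_timestamp", "to_timestamp_seconds"]
        .into_iter()
        .map(|name| Arc::new(TzScalar { name, time_zone: defaults.time_zone.clone() }))
        .collect()
}

/// Per session: one new instance per prototype, collected into a small map.
pub fn session_dynamic_map(
    protos: &[Arc<TzScalar>],
    config: &Config,
) -> HashMap<String, Arc<TzScalar>> {
    protos
        .iter()
        .filter_map(|p| p.with_updated_config(config))
        .map(|udf| (udf.name.to_string(), Arc::new(udf)))
        .collect()
}
```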

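Callers who add custom functions to the defaults perform their one-time setup here. For config-independent functions (Scenario 2), they clone the relevant inner HashMap, insert their functions, and freeze the result into their own Arc. For config-dependent functions (Scenario 3), they create a default-config prototype instance once.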

Session initialization

Before

with_default_features() calls SessionStateDefaults::default_scalar_functions(), default_aggregate_functions(), and default_window_functions() — three uncached functions that allocate fresh Vecs and perform ~165 Arc::clones on every call. build() then walks those Vecs, calls with_updated_config on every scalar UDF (~115 calls), and inserts every function into fresh HashMaps with heap-allocated String keys.

Additionally, task_ctx() — called once per physical plan execution — clones all three HashMaps in full into a new TaskContext, doubling the allocation cost.

After

with_default_features() internally calls default_scalar_registry(), default_aggregate_registry(), and default_window_registry(), assigning the resulting Arcs directly to the builder's static fields. The Vec collection and HashMap construction loops are skipped entirely for config-independent functions. The build() loop only runs over the 6 prototype entries to produce the per-session dynamic map.

task_ctx() now performs 3 Arc::clones for the static maps and a small HashMap::clone of ~6–8 entries for the dynamic map — instead of cloning three full HashMaps.
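Here is a minimal model of the new task_ctx() conversion. The hypothetical SessionFns and TaskFns structs stand in for the function-related fields of SessionState and TaskContext.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
pub struct ScalarUdf {
    pub name: String,
}
#[derive(Debug)]
pub struct AggregateUdf {
    pub name: String,
}
#[derive(Debug)]
pub struct WindowUdf {
    pub name: String,
}

pub type Map<T> = HashMap<String, Arc<T>>;

/// Hypothetical stand-in for the function fields of `SessionState`.
pub struct SessionFns {
    pub scalar_static: Arc<Map<ScalarUdf>>,
    pub scalar_dynamic: Map<ScalarUdf>,
    pub aggregates: Arc<Map<AggregateUdf>>,
    pub windows: Arc<Map<WindowUdf>>,
}

/// Hypothetical stand-in for the same fields on `TaskContext`.
pub struct TaskFns {
    pub scalar_static: Arc<Map<ScalarUdf>>,
    pub scalar_dynamic: Map<ScalarUdf>,
    pub aggregates: Arc<Map<AggregateUdf>>,
    pub windows: Arc<Map<WindowUdf>>,
}

impl From<&SessionFns> for TaskFns {
    fn from(state: &SessionFns) -> Self {
        TaskFns {
            scalar_static: Arc::clone(&state.scalar_static), // refcount bump only
            scalar_dynamic: state.scalar_dynamic.clone(),    // small map, ~6-8 entries
            aggregates: Arc::clone(&state.aggregates),
            windows: Arc::clone(&state.windows),
        }
    }
}
```

Only the dynamic map is copied entry by entry. The three shared maps cost one refcount increment each.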


Scenarios

Scenario 1 — Standard: no custom functions

A caller uses only DataFusion's built-in functions. No startup step is needed in either version. The caller code is identical before and after — the optimisation is entirely internal to with_default_features().

Before, per session:
  • 3 Vec allocations
  • ~165 Arc::clone into Vecs
  • 3 HashMap allocations
  • ~180+ HashMap::insert (names + aliases)
  • ~180+ String key allocations
  • ~115 with_updated_config calls
  • ❌ + TaskContext: 3 more HashMap clones

After, per session:
  • 3 Arc::clone (static maps)
  • 6 new ScalarUDF instances (from prototypes)
  • 1 small HashMap allocation (~8 entries)
  • ~8 String key allocations
  • 6 with_updated_config calls
  • ✅ + TaskContext: 3 Arc::clone + 1 small map clone

Before & After — per session

// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing in both versions)

// ── Per session — BEFORE ─────────────────────────────────────────────────────
let state = SessionStateBuilder::new()
    .with_default_features()   // ~165 Arc::clones + fresh Vecs + HashMaps
    .with_config(config)
    .build();

// ── Per session — AFTER (caller code identical) ───────────────────────────────
let state = SessionStateBuilder::new()
    .with_default_features()   // now: 3 Arc::clones + 6 dynamic instances
    .with_config(config)
    .build();

Scenario 2 — Custom immutable functions

A caller registers one custom config-independent scalar UDF (my_scalar_udf) and one custom aggregate UDAF (my_aggregate_udaf). In the new design these are merged into the shared registry once at startup so that per-session cost is identical to Scenario 1.

Before, per session:
  • Same as Scenario 1 before
  • +1 Arc::clone (custom UDF into Vec)
  • +1 Arc::clone (custom UDAF into Vec)
  • +2 HashMap::insert
  • +2 String allocs
  • ❌ All repeated on every session

After:
  • Startup (once): clone inner HashMaps, insert custom functions, freeze into new Arcs
  • Per session: identical to Scenario 1 after; the custom functions are already in the shared maps

Before

// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing)

// ── Per session ──────────────────────────────────────────────────────────────
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default()
    .push(my_scalar_udf());      // appended to the ~115 DF defaults
builder.aggregate_functions().get_or_insert_default()
    .push(my_aggregate_udaf());  // appended to the ~38 DF defaults
let state = builder.build();    // ~116 scalar + ~39 aggregate functions inserted, plus aliases

After

// ── Startup (once) ────────────────────────────────────────────────────────────
let mut scalar_functions = SessionStateDefaults::default_scalar_registry().as_ref().clone();
register_into_map(&mut scalar_functions, my_scalar_udf());  // None from with_updated_config
let scalar_functions = Arc::new(scalar_functions);  // shadow: HashMap → Arc (frozen)

let mut aggregate_functions = SessionStateDefaults::default_aggregate_registry().as_ref().clone();
register_into_map(&mut aggregate_functions, my_aggregate_udaf());
let aggregate_functions = Arc::new(aggregate_functions);  // shadow: HashMap → Arc (frozen)

let window_functions = SessionStateDefaults::default_window_registry();  // no custom window UDFs

// ── Per session ───────────────────────────────────────────────────────────────
// 3 Arc::clones — all 167 functions available, zero HashMap allocs
let state = SessionStateBuilder::new()
    .with_shared_scalar_functions(Arc::clone(&scalar_functions))
    .with_shared_aggregate_functions(Arc::clone(&aggregate_functions))
    .with_shared_window_functions(Arc::clone(&window_functions))
    .with_config(config)
    .build();
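This proposal does not define register_into_map. One plausible shape for it assumes that UDF types expose a primary name and a list of aliases (modelled here by a hypothetical NamedFunction trait). The helper then inserts the function under every name before the map is frozen.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical trait: a primary name plus aliases, as UDF types expose.
pub trait NamedFunction {
    fn name(&self) -> &str;
    fn aliases(&self) -> &[String];
}

/// Hypothetical helper: inserts `f` under every alias and its primary name,
/// returning any previous entry stored under the primary name.
pub fn register_into_map<T: NamedFunction>(
    map: &mut HashMap<String, Arc<T>>,
    f: Arc<T>,
) -> Option<Arc<T>> {
    for alias in f.aliases() {
        map.insert(alias.clone(), Arc::clone(&f));
    }
    map.insert(f.name().to_string(), f)
}
```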

Scenario 3 — Custom mutable (config-dependent) function

A caller provides a custom scalar UDF format_timestamp whose return type depends on the session locale. It overrides with_updated_config. The routing signal routes it to the dynamic map automatically. In the new design the build() loop runs only over this one function, not all ~165 defaults.

Before, per session:
  • Same as Scenario 1 before
  • +1 Arc::clone (into Vec)
  • +1 new ScalarUDF instance (from with_updated_config)
  • +1 HashMap::insert
  • +1 String alloc
  • with_updated_config called needlessly on all ~115 scalar UDFs

After, per session:
  • Same as Scenario 1 after
  • +1 new ScalarUDF instance (from prototype)
  • +1 HashMap::insert
  • +1 String alloc
  • with_updated_config called on 7 functions only (6 built-in + 1 custom)

Before

// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing)

// ── Per session ──────────────────────────────────────────────────────────────
// build() calls with_updated_config on all ~115 scalar UDFs
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default().push(format_timestamp_udf());
let state = builder.build();

After

// ── Startup (once) ────────────────────────────────────────────────────────────
let prototype = format_timestamp_udf();  // default-config instance, stored on the runtime

// ── Per session ──────────────────────────────────────────────────────────────
// with_default_features() uses Arc::clone for static maps;
// build() loop runs over prototype only — gets Some, routes to dynamic map
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default().push(Arc::clone(&prototype));
let state = builder.build();
// format_timestamp is in scalar_functions_dynamic, baked with the session config
// SET statement refreshes it via with_updated_config
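A SET statement might refresh only the dynamic map, as sketched below. Session, Udf, Config and set_time_zone are hypothetical, and rebuilding each entry from its prototype stands in for calling with_updated_config. The shared static map is never touched.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
pub struct Config {
    pub time_zone: Option<String>,
}

#[derive(Debug)]
pub struct Udf {
    pub name: String,
    pub time_zone: Option<String>,
}

pub struct Session {
    pub config: Config,
    pub static_map: Arc<HashMap<String, Arc<Udf>>>,
    pub dynamic_map: HashMap<String, Arc<Udf>>,
    /// Default-config instances kept for rebuilding `dynamic_map`.
    pub prototypes: Vec<Arc<Udf>>,
}

impl Session {
    /// Hypothetical SET handler: updates the config, then rebuilds only the
    /// small dynamic map from the prototypes.
    pub fn set_time_zone(&mut self, tz: Option<String>) {
        self.config.time_zone = tz;
        self.dynamic_map = self
            .prototypes
            .iter()
            .map(|p| {
                let udf = Udf { name: p.name.clone(), time_zone: self.config.time_zone.clone() };
                (udf.name.clone(), Arc::new(udf))
            })
            .collect();
    }
}
```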

API Changes

Internal struct changes

Private fields on SessionState and TaskContext change as follows. These are not directly part of the public API but drive all downstream changes.

| Struct | Field | Before | After |
|---|---|---|---|
| SessionState | scalar_functions | HashMap<String, Arc<ScalarUDF>> | split into scalar_functions_static: Arc<HashMap> and scalar_functions_dynamic: HashMap |
| SessionState | aggregate_functions | HashMap<String, Arc<AggregateUDF>> | Arc<HashMap<String, Arc<AggregateUDF>>> |
| SessionState | window_functions | HashMap<String, Arc<WindowUDF>> | Arc<HashMap<String, Arc<WindowUDF>>> |
| TaskContext | scalar_functions | HashMap<String, Arc<ScalarUDF>> | same split as SessionState |
| TaskContext | aggregate_functions | HashMap<String, Arc<AggregateUDF>> | Arc<HashMap<String, Arc<AggregateUDF>>> |
| TaskContext | window_functions | HashMap<String, Arc<WindowUDF>> | Arc<HashMap<String, Arc<WindowUDF>>> |
| SessionStateBuilder | build() scalar loop | routes all scalar UDFs into one HashMap | routes via with_updated_config into the static or dynamic map; freezes the static map into an Arc |

New public APIs (additive — no breaking change)

| Location | New symbol | Purpose |
|---|---|---|
| SessionStateDefaults | default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>> | LazyLock-backed process-wide static scalar registry (config-independent only). One Arc::clone per call. |
| SessionStateDefaults | default_aggregate_registry() -> Arc<HashMap<String, Arc<AggregateUDF>>> | Same for aggregates. |
| SessionStateDefaults | default_window_registry() -> Arc<HashMap<String, Arc<WindowUDF>>> | Same for window functions. |
| SessionStateBuilder | with_shared_scalar_functions(Arc<HashMap>) | Inject a pre-built static scalar registry. When set, build() skips the static construction loop entirely. |
| SessionStateBuilder | with_shared_aggregate_functions(Arc<HashMap>) | Same for aggregates. |
| SessionStateBuilder | with_shared_window_functions(Arc<HashMap>) | Same for window functions. |

Breaking public API changes

ℹ️ Rationale for bundling all breaking changes in one PR: the mandatory breaks (scalar_functions() and TaskContext::new()) cannot be avoided cleanly. Since the PR is already a breaking change, removing the three default_*_functions() methods in the same PR avoids a separate deprecation PR later and leaves the API in a cleaner final state. The three removals are not mandatory — deprecation is a valid alternative path — but the cost of bundling them now is low given the context.

⚠️ Breaking surface is narrow — normal query-serving code uses FunctionRegistry::udf(name), which is unchanged. Only code that iterates all scalar functions, constructs TaskContext directly, or calls the default_*_functions() helpers is affected.

| Location | Symbol | Mandatory? | Change | Migration |
|---|---|---|---|---|
| SessionState | scalar_functions() -> &HashMap<…> | 🔴 Mandatory | Cannot return a unified &HashMap across static and dynamic maps without allocation. Split into static_scalar_functions() and dynamic_scalar_functions(). | Update callers to use the appropriate getter. Most callers only need FunctionRegistry::udf(name) and are unaffected. |
| TaskContext | scalar_functions() -> &HashMap<…> | 🔴 Mandatory | Same split as SessionState. | Same as above. |
| TaskContext | TaskContext::new(…) | 🔴 Mandatory | Scalar parameter splits into two; aggregate and window parameters become Arc<HashMap>. | Update call sites (tests, custom executors) to pass the split parameters. |
| SessionStateDefaults | default_scalar_functions() -> Vec<…> | 🟡 Not mandatory | Semantically incompatible with the new design: it returns config-dependent functions that are no longer in the static registry. Removing it avoids a silent semantic trap for callers who expect all defaults. | Replace with default_scalar_registry(). Could be deprecated instead of removed; removal is recommended given the mandatory breaks already present. |
| SessionStateDefaults | default_aggregate_functions() -> Vec<…> | 🟡 Not mandatory | Semantically equivalent to default_aggregate_registry() but returns a Vec instead of an Arc<HashMap>. | Replace with default_aggregate_registry(). Could be deprecated instead of removed; removal is recommended for consistency. |
| SessionStateDefaults | default_window_functions() -> Vec<…> | 🟡 Not mandatory | Same as default_aggregate_functions() above. | Replace with default_window_registry(). Could be deprecated instead of removed; removal is recommended for consistency. |
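Code that currently iterates scalar_functions() could migrate by chaining the two maps instead of merging them. This possible migration assumes that static_scalar_functions() and dynamic_scalar_functions() both return &HashMap<String, Arc<ScalarUDF>>. The sketch below uses a stand-in Udf type and skips dynamic entries whose name already exists in the static map, so iteration agrees with the static-first order of udf(name).

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for `ScalarUDF`.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

/// Hypothetical caller-side helper: iterates both maps without allocating
/// a merged HashMap; static entries win on name collisions.
pub fn all_scalar_functions<'a>(
    static_map: &'a HashMap<String, Arc<Udf>>,
    dynamic_map: &'a HashMap<String, Arc<Udf>>,
) -> impl Iterator<Item = (&'a String, &'a Arc<Udf>)> + 'a {
    let from_dynamic = dynamic_map
        .iter()
        .filter(move |(name, _)| !static_map.contains_key(*name));
    static_map.iter().chain(from_dynamic)
}
```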

Labels: bug (Something isn't working)
