diff --git a/docs/INTERNALS.md b/docs/INTERNALS.md index b7aa312..09fef9e 100644 --- a/docs/INTERNALS.md +++ b/docs/INTERNALS.md @@ -227,11 +227,13 @@ Three classes for async result handling: - `await` — first call resolves via the internal context's `resolve_handle(handle)`, subsequent calls return cached value - `completed?` — non-blocking check via the internal context's `completed?(handle)` - `handle` — the raw VM notification handle (Integer) +- `or_timeout(duration)` — races `self` against `Restate.sleep(duration)` via `Restate.wait_any`. Returns the future's value if it completes first; raises `Restate::TimeoutError` if the sleep wins. The sleep handle is **not** cancelled when this future wins — `restate-sdk-shared-core` 0.7 exposes no `sys_cancel_handle` primitive (only `sys_cancel_invocation` on a different invocation), so the journal entry remains until the timer fires. Same footprint as TS `RestatePromise.orTimeout` and Java `DurableFuture.withTimeout`. **`DurableCallFuture` < `DurableFuture`** — returned by `Restate.service_call`, `Restate.object_call`, `Restate.workflow_call`. - Two handles: `result_handle` (for await) and `invocation_id_handle` (for ID) - `invocation_id` — lazily resolved on first access - `cancel` — calls `Restate.cancel_invocation(invocation_id)` +- Inherits `or_timeout` from the parent — does **not** auto-cancel the remote invocation on timeout (matches TS/Java SDKs). Callers rescue `Restate::TimeoutError` and invoke `#cancel` themselves if they want the callee stopped. **`SendHandle`** — returned by `Restate.service_send`, `Restate.object_send`, `Restate.workflow_send`. - `invocation_id` — lazily resolved diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 9601903..b9b54c3 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -270,6 +270,46 @@ Restate.sleep(5.0).await # Sleep for 5 seconds (durable timer) The timer survives crashes — if the handler restarts, it resumes waiting for the remaining time. +### Timeouts + +Race any `DurableFuture` against a deadline via `#or_timeout(seconds)`. +Returns the future's value if it wins; raises `Restate::TimeoutError` +(a `TerminalError` subclass, HTTP 408) if the sleep wins. + +```ruby +# Bound a service call to 5 seconds. +result = Worker.call.process(task).or_timeout(5) + +# Works on any DurableFuture — sleeps, run-blocks, etc. +Restate.run('expensive') { compute }.or_timeout(10) +``` + +Timeout does **not** cancel the underlying work — matches the TS and +Java SDKs. To stop the remote invocation on a service call, rescue +the error and call `#cancel`: + +```ruby +future = Worker.call.process(task) +begin + future.or_timeout(5) +rescue Restate::TimeoutError + future.cancel + raise +end +``` + +**Caveat — orphan sleep**: the underlying shared-core VM has no +primitive to cancel an in-flight sleep handle (only +`sys_cancel_invocation` for a separate invocation), so when the +future wins the race the sleep stays in the journal until the +duration elapses. The wake-up is a no-op against a completed +handler but keeps the invocation row alive in Restate's state for +the deadline window. For long deadlines on workflows whose +retention you care about, route the timer through a separate +cancellable invocation (delayed `Restate.service_send` to a small +trigger service that resolves an awakeable) and cancel the +`SendHandle` on success. + ### Service Communication #### Fluent Call API (Recommended) @@ -912,6 +952,25 @@ rescue Restate::TerminalError => e end ``` +#### TimeoutError + +`Restate::TimeoutError` is a `TerminalError` subclass raised by +`DurableFuture#or_timeout` when the deadline elapses before the +future completes (HTTP status 408). Because it inherits from +`TerminalError`, the same `rescue` block catches it uniformly: + +```ruby +begin + Worker.call.process(task).or_timeout(5) +rescue Restate::TimeoutError => e + # specific timeout handling +rescue Restate::TerminalError => e + # any other terminal failure +end +``` + +See the [Timeouts](#timeouts) section above for the full API. + ### Transient Errors Any `StandardError` (other than `TerminalError`) triggers a retry of the entire invocation. @@ -1137,7 +1196,7 @@ The `examples/` directory contains runnable examples: | `durable_execution.rb` | `Restate.run`, `Restate.run_sync`, `background: true`, `RunRetryPolicy`, `TerminalError` | | `virtual_objects.rb` | Declarative state, `handler` vs `shared`, `state_keys`, `clear_all` | | `workflow.rb` | Declarative state, promises, signals | -| `service_communication.rb` | Fluent call API, fan-out/fan-in, `wait_any`, awakeables | +| `service_communication.rb` | Fluent call API, fan-out/fan-in, `wait_any`, `or_timeout`, awakeables | | `typed_handlers.rb` | `input:`/`output:` with `Dry::Struct`, JSON Schema generation | | `service_configuration.rb` | Service-level config: timeouts, retention, retry policy, lazy state | | `deadlock_detection.rb` | Built-in deadlock detection middleware for VirtualObjects | diff --git a/examples/service_communication.rb b/examples/service_communication.rb index d65c3a3..6af8cb1 100644 --- a/examples/service_communication.rb +++ b/examples/service_communication.rb @@ -13,6 +13,7 @@ # - Restate.service_call / Restate.service_send — explicit RPC (same thing, verbose) # - Fan-out/fan-in — launch concurrent calls, collect results # - Restate.wait_any — race multiple futures, handle first completer +# - future.or_timeout(seconds) — bound a single future with a deadline # - Restate.awakeable — pause until an external system calls back # # Try it: @@ -59,6 +60,18 @@ class FanOut < Restate::Service completed.first.await end + # Bound a single call with a deadline. +or_timeout+ races the call + # against a +Restate.sleep+ and raises +Restate::TimeoutError+ if the + # sleep wins. The remote invocation keeps running unless you cancel + # it explicitly — see the rescue below. + handler def with_deadline(task) + future = Worker.call.process(task) + future.or_timeout(5) + rescue Restate::TimeoutError => e + future.cancel + { 'task' => task, 'error' => e.message, 'status_code' => e.status_code } + end + # Awakeable: pause until an external system resolves the callback. handler def with_callback(task) awakeable_id, future = Restate.awakeable diff --git a/lib/restate/durable_future.rb b/lib/restate/durable_future.rb index 422235f..85887c3 100644 --- a/lib/restate/durable_future.rb +++ b/lib/restate/durable_future.rb @@ -33,6 +33,19 @@ def await def completed? @resolved || @ctx.completed?(@handle) end + + # Race +self+ against +Restate.sleep(duration)+. Returns the value + # if the future wins; raises {Restate::TimeoutError} otherwise. + # Does not cancel the underlying work (matches TS/Java SDKs); on + # a {DurableCallFuture}, call +#cancel+ in the rescue if you want + # the remote invocation stopped. + def or_timeout(duration) + sleep_future = Restate.sleep(duration) + Restate.wait_any(self, sleep_future) + return await if completed? + + raise TimeoutError + end end # A durable future for service/object/workflow calls. diff --git a/lib/restate/errors.rb b/lib/restate/errors.rb index 6853124..60ded74 100644 --- a/lib/restate/errors.rb +++ b/lib/restate/errors.rb @@ -17,6 +17,18 @@ def initialize(message = 'Internal Server Error', status_code: 500, metadata: ni end end + # Raised by {DurableFuture#or_timeout} when the timeout elapses + # before the future completes. Mirrors +TimeoutError+ in the + # TypeScript SDK (408 Request Timeout) and +TimeoutException+ in + # the Java SDK. Inherits from {TerminalError} so the same + # +rescue Restate::TerminalError+ block in user handlers catches + # timeouts alongside other terminal failures. + class TimeoutError < TerminalError + def initialize(message = 'Timeout occurred', metadata: nil) + super(message, status_code: 408, metadata: metadata) + end + end + # Internal: raised when the VM suspends execution. # User code should NOT catch this. class SuspendedError < StandardError diff --git a/sig/restate.rbs b/sig/restate.rbs index 66c2c89..4ed8d4c 100644 --- a/sig/restate.rbs +++ b/sig/restate.rbs @@ -73,6 +73,10 @@ module Restate def initialize: (?String message, ?status_code: Integer, ?metadata: Hash[String, String]?) -> void end + class TimeoutError < TerminalError + def initialize: (?String message, ?metadata: Hash[String, String]?) -> void + end + class SuspendedError < StandardError def initialize: () -> void end @@ -92,6 +96,7 @@ module Restate def initialize: (untyped ctx, Integer handle, ?serde: untyped) -> void def await: () -> untyped def completed?: () -> bool + def or_timeout: (Numeric duration) -> untyped end class DurableCallFuture < DurableFuture diff --git a/spec/or_timeout_spec.rb b/spec/or_timeout_spec.rb new file mode 100644 index 0000000..710c0c4 --- /dev/null +++ b/spec/or_timeout_spec.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'restate' + +# Unit coverage for +DurableFuture#or_timeout+ and +# +DurableCallFuture#or_timeout+. Stubs the +Restate+ module-level +# helpers (+sleep+, +wait_any+) so the spec doesn't need a live VM — +# this is intentionally an SDK-shape check, not an end-to-end +# integration. The +test-services/+ suite covers the live-VM path. +RSpec.describe Restate::DurableFuture do + # Minimal fake context that just records calls. Real Server::Context + # is too heavy for this spec — we only need +resolve_handle+ and + # +completed?+ to respond. + let(:ctx) { double('ctx', resolve_handle: 'ignored', completed?: false) } + + describe '#or_timeout' do + context 'when the future completes before the sleep' do + it "returns the future's value" do + future = described_class.new(ctx, :handle_a) + sleep_future = described_class.new(ctx, :handle_sleep) + + allow(Restate).to receive(:sleep).with(5).and_return(sleep_future) + allow(Restate).to receive(:wait_any) do + # Race outcome: the work future wins. + allow(future).to receive(:completed?).and_return(true) + allow(future).to receive(:await).and_return('done') + nil + end + + expect(future.or_timeout(5)).to eq('done') + end + end + + context 'when the sleep wins the race' do + it 'raises Restate::TimeoutError' do + future = described_class.new(ctx, :handle_a) + sleep_future = described_class.new(ctx, :handle_sleep) + + allow(Restate).to receive(:sleep).with(5).and_return(sleep_future) + allow(Restate).to receive(:wait_any) do + # Race outcome: the sleep wins; the work future stays incomplete. + allow(future).to receive(:completed?).and_return(false) + nil + end + + expect { future.or_timeout(5) }.to raise_error(Restate::TimeoutError) + end + + it 'attaches HTTP status 408 (Request Timeout)' do + future = described_class.new(ctx, :handle_a) + allow(Restate).to receive(:sleep).and_return(described_class.new(ctx, :sleep)) + allow(Restate).to receive(:wait_any) do + allow(future).to receive(:completed?).and_return(false) + nil + end + + future.or_timeout(5) + rescue Restate::TimeoutError => e + expect(e.status_code).to eq(408) + end + end + end +end + +RSpec.describe Restate::DurableCallFuture do + let(:ctx) do + double('ctx', resolve_handle: 'inv_xyz', completed?: false, cancel_invocation: nil) + end + + def build_call_future + described_class.new(ctx, :result_handle, :invocation_id_handle, output_serde: nil) + end + + # Matches TS/Java SDKs: timeout never auto-cancels the underlying call. + # Users who want that behavior rescue +TimeoutError+ and call +#cancel+. + describe '#or_timeout' do + context 'when the call completes before the sleep' do + it "returns the call's value and does not cancel" do + future = build_call_future + sleep_future = Restate::DurableFuture.new(ctx, :handle_sleep) + + allow(Restate).to receive(:sleep).with(5).and_return(sleep_future) + allow(Restate).to receive(:wait_any) do + allow(future).to receive(:completed?).and_return(true) + allow(future).to receive(:await).and_return({ 'ok' => true }) + nil + end + + expect(future.or_timeout(5)).to eq({ 'ok' => true }) + expect(ctx).not_to have_received(:cancel_invocation) + end + end + + context 'when the sleep wins the race' do + it 'raises TimeoutError without cancelling the remote invocation' do + future = build_call_future + sleep_future = Restate::DurableFuture.new(ctx, :handle_sleep) + + allow(Restate).to receive(:sleep).with(5).and_return(sleep_future) + allow(Restate).to receive(:wait_any) do + allow(future).to receive(:completed?).and_return(false) + nil + end + + expect { future.or_timeout(5) }.to raise_error(Restate::TimeoutError) + expect(ctx).not_to have_received(:cancel_invocation) + end + end + end +end + +RSpec.describe Restate::TimeoutError do + it 'inherits from TerminalError so user rescue blocks catch it uniformly' do + expect(described_class.ancestors).to include(Restate::TerminalError) + end + + it 'defaults to HTTP status 408 (Request Timeout) matching the TS SDK' do + expect(described_class.new.status_code).to eq(408) + end + + it 'has a meaningful default message' do + expect(described_class.new.message).to eq('Timeout occurred') + end +end