Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/INTERNALS.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,13 @@ Three classes for async result handling:
- `await` — first call resolves via the internal context's `resolve_handle(handle)`, subsequent calls return cached value
- `completed?` — non-blocking check via the internal context's `completed?(handle)`
- `handle` — the raw VM notification handle (Integer)
- `or_timeout(duration)` — races `self` against `Restate.sleep(duration)` via `Restate.wait_any`. Returns the future's value if it completes first; raises `Restate::TimeoutError` if the sleep wins. The sleep handle is **not** cancelled when this future wins — `restate-sdk-shared-core` 0.7 exposes no `sys_cancel_handle` primitive (only `sys_cancel_invocation` on a different invocation), so the journal entry remains until the timer fires. Same footprint as TS `RestatePromise.orTimeout` and Java `DurableFuture.withTimeout`.

**`DurableCallFuture` < `DurableFuture`** — returned by `Restate.service_call`, `Restate.object_call`, `Restate.workflow_call`.
- Two handles: `result_handle` (for await) and `invocation_id_handle` (for ID)
- `invocation_id` — lazily resolved on first access
- `cancel` — calls `Restate.cancel_invocation(invocation_id)`
- Inherits `or_timeout` from the parent — does **not** auto-cancel the remote invocation on timeout (matches TS/Java SDKs). Callers rescue `Restate::TimeoutError` and invoke `#cancel` themselves if they want the callee stopped.

**`SendHandle`** — returned by `Restate.service_send`, `Restate.object_send`, `Restate.workflow_send`.
- `invocation_id` — lazily resolved
Expand Down
61 changes: 60 additions & 1 deletion docs/USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,46 @@ Restate.sleep(5.0).await # Sleep for 5 seconds (durable timer)

The timer survives crashes — if the handler restarts, it resumes waiting for the remaining time.

### Timeouts

Race any `DurableFuture` against a deadline via `#or_timeout(seconds)`.
Returns the future's value if it wins; raises `Restate::TimeoutError`
(a `TerminalError` subclass, HTTP 408) if the sleep wins.

```ruby
# Bound a service call to 5 seconds.
result = Worker.call.process(task).or_timeout(5)

# Works on any DurableFuture — sleeps, run-blocks, etc.
Restate.run('expensive') { compute }.or_timeout(10)
```

Timeout does **not** cancel the underlying work — matches the TS and
Java SDKs. To stop the remote invocation on a service call, rescue
the error and call `#cancel`:

```ruby
future = Worker.call.process(task)
begin
future.or_timeout(5)
rescue Restate::TimeoutError
future.cancel
raise
end
```

**Caveat — orphan sleep**: the underlying shared-core VM has no
primitive to cancel an in-flight sleep handle (only
`sys_cancel_invocation` for a separate invocation), so when the
future wins the race the sleep stays in the journal until the
duration elapses. The wake-up is a no-op against a completed
handler but keeps the invocation row alive in Restate's state for
the deadline window. For long deadlines on workflows whose
retention you care about, route the timer through a separate
cancellable invocation (delayed `Restate.service_send` to a small
trigger service that resolves an awakeable) and cancel the
`SendHandle` on success.

### Service Communication

#### Fluent Call API (Recommended)
Expand Down Expand Up @@ -912,6 +952,25 @@ rescue Restate::TerminalError => e
end
```

#### TimeoutError

`Restate::TimeoutError` is a `TerminalError` subclass raised by
`DurableFuture#or_timeout` when the deadline elapses before the
future completes (HTTP status 408). Because it inherits from
`TerminalError`, the same `rescue` block catches it uniformly:

```ruby
begin
Worker.call.process(task).or_timeout(5)
rescue Restate::TimeoutError => e
# specific timeout handling
rescue Restate::TerminalError => e
# any other terminal failure
end
```

See the [Timeouts](#timeouts) section above for the full API.

### Transient Errors

Any `StandardError` (other than `TerminalError`) triggers a retry of the entire invocation.
Expand Down Expand Up @@ -1137,7 +1196,7 @@ The `examples/` directory contains runnable examples:
| `durable_execution.rb` | `Restate.run`, `Restate.run_sync`, `background: true`, `RunRetryPolicy`, `TerminalError` |
| `virtual_objects.rb` | Declarative state, `handler` vs `shared`, `state_keys`, `clear_all` |
| `workflow.rb` | Declarative state, promises, signals |
| `service_communication.rb` | Fluent call API, fan-out/fan-in, `wait_any`, awakeables |
| `service_communication.rb` | Fluent call API, fan-out/fan-in, `wait_any`, `or_timeout`, awakeables |
| `typed_handlers.rb` | `input:`/`output:` with `Dry::Struct`, JSON Schema generation |
| `service_configuration.rb` | Service-level config: timeouts, retention, retry policy, lazy state |
| `deadlock_detection.rb` | Built-in deadlock detection middleware for VirtualObjects |
Expand Down
13 changes: 13 additions & 0 deletions examples/service_communication.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# - Restate.service_call / Restate.service_send — explicit RPC (same thing, verbose)
# - Fan-out/fan-in — launch concurrent calls, collect results
# - Restate.wait_any — race multiple futures, handle first completer
# - future.or_timeout(seconds) — bound a single future with a deadline
# - Restate.awakeable — pause until an external system calls back
#
# Try it:
Expand Down Expand Up @@ -59,6 +60,18 @@ class FanOut < Restate::Service
completed.first.await
end

# Bound a single call with a deadline. +or_timeout+ races the call
# against a +Restate.sleep+ and raises +Restate::TimeoutError+ if the
# sleep wins. The remote invocation keeps running unless you cancel
# it explicitly — see the rescue below.
handler def with_deadline(task)
future = Worker.call.process(task)
future.or_timeout(5)
rescue Restate::TimeoutError => e
future.cancel
{ 'task' => task, 'error' => e.message, 'status_code' => e.status_code }
end

# Awakeable: pause until an external system resolves the callback.
handler def with_callback(task)
awakeable_id, future = Restate.awakeable
Expand Down
13 changes: 13 additions & 0 deletions lib/restate/durable_future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ def await
def completed?
@resolved || @ctx.completed?(@handle)
end

# Race +self+ against +Restate.sleep(duration)+. Returns the value
# if the future wins; raises {Restate::TimeoutError} otherwise.
# Does not cancel the underlying work (matches TS/Java SDKs); on
# a {DurableCallFuture}, call +#cancel+ in the rescue if you want
# the remote invocation stopped.
def or_timeout(duration)
Comment thread
junyuanz1 marked this conversation as resolved.
sleep_future = Restate.sleep(duration)
Restate.wait_any(self, sleep_future)
return await if completed?

raise TimeoutError
end
end

# A durable future for service/object/workflow calls.
Expand Down
12 changes: 12 additions & 0 deletions lib/restate/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ def initialize(message = 'Internal Server Error', status_code: 500, metadata: ni
end
end

# Raised by {DurableFuture#or_timeout} when the timeout elapses
# before the future completes. Mirrors +TimeoutError+ in the
# TypeScript SDK (408 Request Timeout) and +TimeoutException+ in
# the Java SDK. Inherits from {TerminalError} so the same
# +rescue Restate::TerminalError+ block in user handlers catches
# timeouts alongside other terminal failures.
class TimeoutError < TerminalError
def initialize(message = 'Timeout occurred', metadata: nil)
super(message, status_code: 408, metadata: metadata)
end
end

# Internal: raised when the VM suspends execution.
# User code should NOT catch this.
class SuspendedError < StandardError
Expand Down
5 changes: 5 additions & 0 deletions sig/restate.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ module Restate
def initialize: (?String message, ?status_code: Integer, ?metadata: Hash[String, String]?) -> void
end

class TimeoutError < TerminalError
def initialize: (?String message, ?metadata: Hash[String, String]?) -> void
end

class SuspendedError < StandardError
def initialize: () -> void
end
Expand All @@ -92,6 +96,7 @@ module Restate
def initialize: (untyped ctx, Integer handle, ?serde: untyped) -> void
def await: () -> untyped
def completed?: () -> bool
def or_timeout: (Numeric duration) -> untyped
end

class DurableCallFuture < DurableFuture
Expand Down
125 changes: 125 additions & 0 deletions spec/or_timeout_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# frozen_string_literal: true

require 'spec_helper'
require 'restate'

# Unit coverage for +DurableFuture#or_timeout+ and
# +DurableCallFuture#or_timeout+. Stubs the +Restate+ module-level
# helpers (+sleep+, +wait_any+) so the spec doesn't need a live VM —
# this is intentionally an SDK-shape check, not an end-to-end
# integration. The +test-services/+ suite covers the live-VM path.
RSpec.describe Restate::DurableFuture do
# Minimal fake context that just records calls. Real Server::Context
# is too heavy for this spec — we only need +resolve_handle+ and
# +completed?+ to respond.
let(:ctx) { double('ctx', resolve_handle: 'ignored', completed?: false) }

describe '#or_timeout' do
context 'when the future completes before the sleep' do
it "returns the future's value" do
future = described_class.new(ctx, :handle_a)
sleep_future = described_class.new(ctx, :handle_sleep)

allow(Restate).to receive(:sleep).with(5).and_return(sleep_future)
allow(Restate).to receive(:wait_any) do
# Race outcome: the work future wins.
allow(future).to receive(:completed?).and_return(true)
allow(future).to receive(:await).and_return('done')
nil
end

expect(future.or_timeout(5)).to eq('done')
end
end

context 'when the sleep wins the race' do
it 'raises Restate::TimeoutError' do
future = described_class.new(ctx, :handle_a)
sleep_future = described_class.new(ctx, :handle_sleep)

allow(Restate).to receive(:sleep).with(5).and_return(sleep_future)
allow(Restate).to receive(:wait_any) do
# Race outcome: the sleep wins; the work future stays incomplete.
allow(future).to receive(:completed?).and_return(false)
nil
end

expect { future.or_timeout(5) }.to raise_error(Restate::TimeoutError)
end

it 'attaches HTTP status 408 (Request Timeout)' do
future = described_class.new(ctx, :handle_a)
allow(Restate).to receive(:sleep).and_return(described_class.new(ctx, :sleep))
allow(Restate).to receive(:wait_any) do
allow(future).to receive(:completed?).and_return(false)
nil
end

future.or_timeout(5)
rescue Restate::TimeoutError => e
expect(e.status_code).to eq(408)
end
end
end
end

RSpec.describe Restate::DurableCallFuture do
let(:ctx) do
double('ctx', resolve_handle: 'inv_xyz', completed?: false, cancel_invocation: nil)
end

def build_call_future
described_class.new(ctx, :result_handle, :invocation_id_handle, output_serde: nil)
end

# Matches TS/Java SDKs: timeout never auto-cancels the underlying call.
# Users who want that behavior rescue +TimeoutError+ and call +#cancel+.
describe '#or_timeout' do
context 'when the call completes before the sleep' do
it "returns the call's value and does not cancel" do
future = build_call_future
sleep_future = Restate::DurableFuture.new(ctx, :handle_sleep)

allow(Restate).to receive(:sleep).with(5).and_return(sleep_future)
allow(Restate).to receive(:wait_any) do
allow(future).to receive(:completed?).and_return(true)
allow(future).to receive(:await).and_return({ 'ok' => true })
nil
end

expect(future.or_timeout(5)).to eq({ 'ok' => true })
expect(ctx).not_to have_received(:cancel_invocation)
end
end

context 'when the sleep wins the race' do
it 'raises TimeoutError without cancelling the remote invocation' do
future = build_call_future
sleep_future = Restate::DurableFuture.new(ctx, :handle_sleep)

allow(Restate).to receive(:sleep).with(5).and_return(sleep_future)
allow(Restate).to receive(:wait_any) do
allow(future).to receive(:completed?).and_return(false)
nil
end

expect { future.or_timeout(5) }.to raise_error(Restate::TimeoutError)
expect(ctx).not_to have_received(:cancel_invocation)
end
end
end
end

RSpec.describe Restate::TimeoutError do
it 'inherits from TerminalError so user rescue blocks catch it uniformly' do
expect(described_class.ancestors).to include(Restate::TerminalError)
end

it 'defaults to HTTP status 408 (Request Timeout) matching the TS SDK' do
expect(described_class.new.status_code).to eq(408)
end

it 'has a meaningful default message' do
expect(described_class.new.message).to eq('Timeout occurred')
end
end
Loading