Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,23 @@ config :grpc, GRPC.Client.Adapters.Mint,

The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options).

#### Automatic Reconnection

The Mint adapter supports automatic reconnection when the underlying HTTP/2 connection drops (e.g. server restart, network interruption). To enable it, pass the `:retry` option via `adapter_opts`:

```elixir
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051",
...> adapter: GRPC.Client.Adapters.Mint,
...> adapter_opts: [retry: 5]
...> )
```

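When the connection drops, the adapter immediately tries to reconnect and, if that fails, keeps retrying up to `retry` times in total using **exponential backoff with jitter**. The base delay between attempts starts at 1 second, grows by a factor of 1.6 after each failed attempt, and is capped at 120 seconds; a random jitter of up to ±20% is applied on top, so the actual wait can slightly exceed the cap. If all attempts are exhausted, the parent process receives a `{:elixir_grpc, :connection_down, pid}` message.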

By default, `:retry` is `0` (no reconnection attempts).

> **Note:** Any in-flight requests at the time of the drop will fail immediately. Reconnection only re-establishes the transport connection — it does not replay requests.

---

### **HTTP Transcoding**
Expand Down
17 changes: 17 additions & 0 deletions grpc_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,23 @@ config :grpc, GRPC.Client.Adapters.Mint,

The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options).

#### Automatic Reconnection

The Mint adapter supports automatic reconnection when the underlying HTTP/2 connection drops (e.g. server restart, network interruption). To enable it, pass the `:retry` option via `adapter_opts`:

```elixir
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051",
...> adapter: GRPC.Client.Adapters.Mint,
...> adapter_opts: [retry: 5]
...> )
```

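When the connection drops, the adapter immediately tries to reconnect and, if that fails, keeps retrying up to `retry` times in total using **exponential backoff with jitter**. The base delay between attempts starts at 1 second, grows by a factor of 1.6 after each failed attempt, and is capped at 120 seconds; a random jitter of up to ±20% is applied on top, so the actual wait can slightly exceed the cap. If all attempts are exhausted, the parent process receives a `{:elixir_grpc, :connection_down, pid}` message.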

By default, `:retry` is `0` (no reconnection attempts).

> **Note:** Any in-flight requests at the time of the drop will fail immediately. Reconnection only re-establishes the transport connection — it does not replay requests.

---

## Contributing
Expand Down
10 changes: 8 additions & 2 deletions grpc_client/lib/grpc/client/adapters/mint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@ defmodule GRPC.Client.Adapters.Mint do
window size ensures that the number of packet exchanges is smaller, thus speeding up the requests by reducing the
number of network round trips, at the cost of having larger packets reaching the server per connection.
Check [Mint.HTTP2.setting() type](https://hexdocs.pm/mint/Mint.HTTP2.html#t:setting/0) for additional configs.
* `:retry`: Number of reconnection attempts when the connection drops. Defaults to `0` (no retries).
Uses exponential backoff with jitter between attempts.
"""
@impl true
def connect(%{host: host, port: port} = channel, opts \\ []) do
# Added :config_options to facilitate testing.
{config_opts, opts} = Keyword.pop(opts, :config_options, [])
{retry, opts} = Keyword.pop(opts, :retry, 0)
module_opts = Application.get_env(:grpc, __MODULE__, config_opts)

opts = connect_opts(channel, opts) |> merge_opts(module_opts)
opts =
channel
|> connect_opts(opts)
|> merge_opts(module_opts)
|> Keyword.put(:retry, retry)

Process.flag(:trap_exit, true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,20 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do

@impl true
def init({scheme, host, port, opts}) do
{retry, opts} = Keyword.pop(opts, :retry, 0)

case Mint.HTTP.connect(scheme, host, port, opts) do
{:ok, conn} ->
{:ok, State.new(conn, opts[:parent])}
state_opts = [
parent: opts[:parent],
scheme: scheme,
host: host,
port: port,
connect_opts: opts,
retry: retry
]

{:ok, State.new(conn, state_opts)}

{:error, reason} ->
Logger.error(
Expand Down Expand Up @@ -178,6 +189,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
end

@impl true
# Handles the `:reconnect` message by running another reconnection attempt
# with the current state.
def handle_info(:reconnect, state), do: attempt_reconnect(state)


def handle_info(message, state) do
case Mint.HTTP.stream(state.conn, message) do
:unknown ->
Expand Down Expand Up @@ -378,24 +393,76 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
new_state
end)

# Inform the parent that the connection is down
send(new_state.parent, {:elixir_grpc, :connection_down, self()})

new_state.requests
|> Enum.each(fn {ref, _} ->
new_state
|> State.stream_response_pid(ref)
|> send_connection_close_and_end_stream_response()
end)

{:noreply, State.update_request_stream_queue(%{new_state | requests: %{}}, :queue.new())}
clean_state = State.update_request_stream_queue(%{new_state | requests: %{}}, :queue.new())

if clean_state.retry > 0 do
attempt_reconnect(clean_state)
else
send(clean_state.parent, {:elixir_grpc, :connection_down, self()})
{:noreply, clean_state}
end
end

# Pushes the connection-closed error to the given stream response process and
# then signals that its stream is done. Both calls are asserted to return `:ok`;
# any other result crashes on the match.
defp send_connection_close_and_end_stream_response(stream_response_pid) do
  :ok = StreamResponseProcess.consume(stream_response_pid, :error, @connection_closed_error)
  :ok = StreamResponseProcess.done(stream_response_pid)
end

# Tries to re-establish the Mint connection with the scheme/host/port/options
# stored in the state.
#
# `state.retry` is the maximum number of attempts and `state.retry_attempt` is
# the number of consecutive failed attempts so far. Every clause returns
# `{:noreply, state}` so the result can be used directly from `handle_info/2`.

# Retry budget spent: log it and tell the parent that the connection is down.
defp attempt_reconnect(%{retry: max, retry_attempt: attempt} = state)
     when attempt >= max do
  Logger.warning(
    "Connection retry exhausted (#{attempt}/#{max}) for #{state.scheme}://#{state.host}:#{state.port}"
  )

  send(state.parent, {:elixir_grpc, :connection_down, self()})
  {:noreply, state}
end

defp attempt_reconnect(state) do
  next_attempt = state.retry_attempt + 1

  Logger.info(
    "Attempting reconnection #{next_attempt}/#{state.retry} to #{state.scheme}://#{state.host}:#{state.port}"
  )

  case Mint.HTTP.connect(state.scheme, state.host, state.port, state.connect_opts) do
    {:ok, conn} ->
      Logger.info("Reconnected successfully to #{state.scheme}://#{state.host}:#{state.port}")

      # A successful connection resets the failure counter so a later drop
      # gets the full retry budget again.
      {:noreply, %{state | conn: conn, retry_attempt: 0}}

    {:error, reason} ->
      Logger.warning(
        "Reconnection attempt #{next_attempt}/#{state.retry} failed: #{inspect(reason)}"
      )

      failed_state = %{state | retry_attempt: next_attempt}

      if next_attempt >= state.retry do
        # This was the last permitted attempt: report the failure right away
        # instead of sleeping through one more backoff period only to find
        # out that the budget is already exhausted.
        attempt_reconnect(failed_state)
      else
        Process.send_after(self(), :reconnect, retry_timeout(next_attempt))
        {:noreply, failed_state}
      end
  end
end

@doc false
# Returns the delay in milliseconds to wait after failed attempt number `attempt`.
# The base delay is 1000 * 1.6^(attempt - 1) while `attempt < 11` and a flat
# 120_000 afterwards; a random jitter of up to ±20% of the base is then added
# and the result is rounded to an integer.
def retry_timeout(attempt) do
  base =
    cond do
      attempt < 11 -> :math.pow(1.6, attempt - 1) * 1000
      true -> 120_000
    end

  spread = (:rand.uniform_real() - 0.5) / 2.5
  round(base + spread * base)
end

defp check_connection_status(state) do
if Mint.HTTP.open?(state.conn) do
check_request_stream_queue(state)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,42 @@
defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do
@moduledoc false

defstruct [:conn, :parent, requests: %{}, request_stream_queue: :queue.new()]
defstruct [
:conn,
:parent,
:scheme,
:host,
:port,
:connect_opts,
requests: %{},
request_stream_queue: :queue.new(),
retry: 0,
retry_attempt: 0
]

@type t :: %__MODULE__{
conn: Mint.HTTP.t(),
requests: map(),
parent: pid()
parent: pid(),
scheme: Mint.Types.scheme() | nil,
host: Mint.Types.address() | nil,
port: :inet.port_number() | nil,
connect_opts: keyword(),
retry: non_neg_integer(),
retry_attempt: non_neg_integer()
}

def new(conn, parent) do
%__MODULE__{conn: conn, request_stream_queue: :queue.new(), parent: parent}
# Builds the initial state for a freshly opened connection.
#
# `opts` is read with `opts[...]` for `:parent`, `:scheme`, `:host`, `:port`,
# `:connect_opts` (falls back to `[]`) and `:retry` (falls back to `0`).
# The request stream queue starts out empty.
def new(conn, opts) do
  connect_opts = opts[:connect_opts] || []
  max_retries = opts[:retry] || 0

  %__MODULE__{
    conn: conn,
    parent: opts[:parent],
    scheme: opts[:scheme],
    host: opts[:host],
    port: opts[:port],
    connect_opts: connect_opts,
    retry: max_retries,
    request_stream_queue: :queue.new()
  }
end

def update_conn(state, conn) do
Expand Down
13 changes: 0 additions & 13 deletions grpc_client/lib/grpc/stub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,6 @@ defmodule GRPC.Stub do
connect("#{ip_type}:#{host}:#{port}", opts)
end

# Returns the reconnection backoff in milliseconds for attempt number `curr`.
#
# The base delay is 1000 * 1.6^(curr - 1) for attempts below 11 and 120_000
# from attempt 11 on; a random jitter of up to ±20% of the base is added and
# the result is rounded to an integer.
#
# The previous `when curr < 11` guard made the 120_000 branch unreachable and
# raised `FunctionClauseError` for any attempt >= 11, so it has been dropped.
def retry_timeout(curr) do
  timeout =
    if curr < 11 do
      :math.pow(1.6, curr - 1) * 1000
    else
      120_000
    end

  jitter = (:rand.uniform_real() - 0.5) / 2.5

  round(timeout + jitter * timeout)
end

@doc """
Disconnects the adapter and frees any resources the adapter is consuming
"""
Expand Down
119 changes: 117 additions & 2 deletions grpc_client/test/grpc/adapters/mint/connection_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,53 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
assert_receive {:elixir_grpc, :connection_down, pid}, 500
assert pid == self()
end

test "does not attempt reconnect when retry is 0", %{state: state} do
  # Simulate the socket being closed on a connection whose state has `retry == 0`.
  closed_message = {:tcp_closed, state.conn.socket}

  assert {:noreply, new_state} = ConnectionProcess.handle_info(closed_message, state)

  # The connection stays closed and the parent is told about it.
  assert new_state.conn.state == :closed
  assert new_state.retry == 0
  assert_receive {:elixir_grpc, :connection_down, _pid}, 500
end
end

describe "handle_info - connection_closed - with retry" do
  setup :valid_connection_with_retry

  test "attempts reconnect when retry > 0 and connection drops", %{
    state: state
  } do
    socket = state.conn.socket
    tcp_message = {:tcp_closed, socket}

    # The reconnection is expected to succeed here: the returned state holds a
    # connection that is not closed, the failure counter is back at 0 and the
    # parent never hears about the drop.
    assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state)
    assert new_state.conn.state != :closed
    assert new_state.retry_attempt == 0
    refute_receive {:elixir_grpc, :connection_down, _pid}, 200
  end

  test "notifies parent when all retry attempts are exhausted", %{
    state: state
  } do
    # With `retry_attempt == retry` the process gives up without opening a
    # socket, so the shared server does not have to be stopped (and possibly
    # left down on an assertion failure) for this test.
    exhausted_state = %{state | retry: 1, retry_attempt: 1}

    logs =
      capture_log(fn ->
        assert {:noreply, _} = ConnectionProcess.handle_info(:reconnect, exhausted_state)
        assert_receive {:elixir_grpc, :connection_down, _pid}, 500
      end)

    assert logs =~ "Connection retry exhausted"
  end
end

describe "handle_info - connection_closed - with request" do
Expand Down Expand Up @@ -417,8 +464,73 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
end
end

defp valid_connection(%{port: port}) do
{:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port, protocols: [:http2])
describe "retry_timeout/1" do
  test "returns exponentially increasing timeouts" do
    # Attempts 1, 2 and 5 have base delays far enough apart that the ±20%
    # jitter ranges do not overlap, so strict ordering is safe to assert.
    [t1, t2, t5] = Enum.map([1, 2, 5], &ConnectionProcess.retry_timeout/1)

    assert t1 in 800..1200
    assert t2 > t1
    assert t5 > t2
  end

  test "caps at 120 seconds for attempt >= 11" do
    # 120_000 ms base with ±20% jitter.
    for attempt <- [11, 15] do
      timeout = ConnectionProcess.retry_timeout(attempt)
      assert timeout in 96_000..144_000
    end
  end
end

describe "handle_info :reconnect" do
  setup :valid_connection_with_retry

  test "successfully reconnects when server is available", %{
    state: state
  } do
    # Pretend one attempt has already failed; a successful reconnect must
    # reset the counter to 0.
    failed_state = %{state | retry_attempt: 1}

    logs =
      capture_log(fn ->
        assert {:noreply, new_state} = ConnectionProcess.handle_info(:reconnect, failed_state)
        assert Mint.HTTP.open?(new_state.conn)
        assert new_state.retry_attempt == 0
      end)

    assert logs =~ "Reconnected successfully"
  end

  test "schedules another reconnect when server is unavailable", %{
    state: state,
    port: port
  } do
    :ok = GRPC.Server.stop(FeatureServer)

    try do
      logs =
        capture_log(fn ->
          failed_state = %{state | retry_attempt: 0}

          assert {:noreply, new_state} =
                   ConnectionProcess.handle_info(:reconnect, failed_state)

          assert new_state.retry_attempt == 1
          # The retry is delivered to the calling process via `Process.send_after/3`.
          assert_receive :reconnect, 5_000
        end)

      assert logs =~ "Reconnection attempt 1/"
    after
      # Restart the server even when an assertion above fails, so a failure
      # here cannot leave it stopped for later tests.
      {:ok, _, _} = GRPC.Server.start(FeatureServer, port)
    end
  end
end

defp valid_connection(%{port: port}, opts \\ []) do
{:ok, pid} =
ConnectionProcess.start_link(
:http,
"localhost",
port,
Keyword.merge([protocols: [:http2]], opts)
)

state = :sys.get_state(pid)
version = Application.spec(:grpc_client) |> Keyword.get(:vsn)

Expand All @@ -431,6 +543,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
%{
process_pid: pid,
state: state,
port: port,
request: {"POST", "/routeguide.RouteGuide/RecordRoute", headers}
}
end
Expand All @@ -455,4 +568,6 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do

%{state | requests: %{request_ref => %{request_ref_state | stream_response_pid: test_pid}}}
end

# Setup helper: same as `valid_connection/2`, but passes `retry: 3` as the connection options.
defp valid_connection_with_retry(ctx) do
  valid_connection(ctx, retry: 3)
end
end
Loading
Loading