Skip to content

Commit adc6495

Browse files
authored
Merge pull request #276 from nash8114/master
Adding TTL to proxied messages
2 parents 613d851 + 15ac1f9 commit adc6495

File tree

5 files changed

+201
-9
lines changed

5 files changed

+201
-9
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
# Changelog
22

3+
## 0.10.0
4+
5+
- Added optional TTL to Horde.DynamicSupervisor's `:proxy_operation` messages. The Time-to-Live defaults to :infinity for full backwards compatibility. This TTL helps prevent potential issues where messages could loop forever between a set of nodes which disagree on which node should execute the task.
6+
- [BREAKING] Horde.DynamicSupervisor's new `:proxy_message_ttl` option configures the maximum TTL for proxy messages. It takes an integer denoting the maximum number of hops a message can travel, or the atom :infinity (default). This can be a breaking change: when upgrading do not set this option to an integer. You can explicity set it to :infinity or leave it default. If this is set to an integer, upgraded nodes won't be able to proxy to non-upgrade nodes.
7+
38
## 0.9.1
49

510
- Fix race condition in registry when node disconnects
611
- Pass `extra_arguments` flag to the ProcessSupervisor
712
- Updating libring dependency to ~> 1.7. Needed for upgrade to OTP 27. See [this PR to libring](https://github.com/bitwalker/libring/pull/37) for details.
813

14+
915
## 0.9.0
1016

1117
- Bugfixes for scenarios causing Horde to crash. See [#266](https://github.com/derekkraan/horde/pull/266) and [#263](https://github.com/derekkraan/horde/pull/263).

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ Daniel Azuma gave [a great talk](https://www.youtube.com/watch?v=nLApFANtkHs) at
1313

1414
Since Horde is built on CRDTs, it is eventually (as opposed to immediately) consistent, although it does sync its state with its neighbours rather aggressively. Cluster membership in Horde is fully dynamic; nodes can be added and removed at any time and Horde will continue to operate as expected. `Horde.DynamicSupervisor` also uses a hash ring to limit any possible race conditions to times when cluster membership is changing.
1515

16-
`Horde.Registry` is API-compatible with Elixir's own Registry, although it does not yet support the `keys: :duplicate` option. For many use cases, it will be a drop-in replacement. `Horde.DynamicSupervisor` follows the API and behaviour of `DynamicSupervisor` as closely as possible. There will always be some difference between Horde and its standard library equivalents, if not in their APIs, then in their functionality. This is a necessary consequence of Horde's distributed nature.
16+
`Horde.Registry` and `Horde.DynamicSupervisor` are both designed to stay as close as possible to the API and behavior of their counterparts in Elixir’s standard library. For most scenarios, they can be used as drop-in replacements with minimal changes required.
17+
18+
Some differences do exist — such as the current lack of support for keys: :duplicate in Horde.Registry — but these divergences occur only when standard library behavior does not translate well to a system that is inherently distributed.
19+
20+
Our goal is to keep these differences to the absolute minimum necessary, while ensuring that Horde remains reliable, consistent, and optimized for distributed environments. See [documentation of Horde.DynamicSupervisor.start_link/1](https://hexdocs.pm/horde/Horde.DynamicSupervisor.html#start_link/1) for details.
1721

1822
## Running a single global process
1923

@@ -100,6 +104,7 @@ Horde.Cluster.set_members(:distributed_supervisor_1, [:distributed_supervisor_1,
100104
# supervisor_1, supervisor_2 and supervisor_3 will be joined in a single cluster.
101105
```
102106

107+
103108
# Other projects
104109

105110
Useful libraries that use or extend Horde functionalities.

lib/horde/dynamic_supervisor.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ defmodule Horde.DynamicSupervisor do
6060
| {:max_seconds, integer()}
6161
| {:extra_arguments, [term()]}
6262
| {:distribution_strategy, Horde.DistributionStrategy.t()}
63+
| {:proxy_message_ttl, integer() | :infinity}
6364
| {:shutdown, integer()}
6465
| {:members, [Horde.Cluster.member()] | :auto}
6566
| {:delta_crdt_options, [DeltaCrdt.crdt_option()]}
@@ -109,6 +110,7 @@ defmodule Horde.DynamicSupervisor do
109110
@doc """
110111
Works like `DynamicSupervisor.start_link/1`. Extra options are documented here:
111112
- `:distribution_strategy`, defaults to `Horde.UniformDistribution`, but more are available - see `Horde.DistributionStrategy`
113+
- `:proxy_message_ttl`, defaults to `:infinity`. Can be set to an integer indicating the maximum number of times a message may be forwarded in a Horde.DynamicSupervisor cluster. Leaving it at infinity is genrally fine when using a stable distribution strategy such as `Horde.UniformDistribution`. Setting a TTL is helpful when migrating to a different distribution_strategy, or when using an algorithm with random distribution such as `Horde.UniformRandomDistribution`, as it will prevent messages from looping (near) infinitely.
112114
"""
113115
def start_link(options) when is_list(options) do
114116
keys = [
@@ -119,6 +121,7 @@ defmodule Horde.DynamicSupervisor do
119121
:strategy,
120122
:distribution_strategy,
121123
:process_redistribution,
124+
:proxy_message_ttl,
122125
:members,
123126
:delta_crdt_options
124127
]
@@ -156,13 +159,21 @@ defmodule Horde.DynamicSupervisor do
156159
Horde.UniformDistribution
157160
)
158161

162+
proxy_message_ttl =
163+
Keyword.get(
164+
options,
165+
:proxy_message_ttl,
166+
:infinity
167+
)
168+
159169
flags = %{
160170
strategy: strategy,
161171
max_restarts: max_restarts,
162172
max_seconds: max_seconds,
163173
max_children: max_children,
164174
extra_arguments: extra_arguments,
165175
distribution_strategy: distribution_strategy,
176+
proxy_message_ttl: proxy_message_ttl,
166177
members: members,
167178
delta_crdt_options: delta_crdt_options(delta_crdt_options),
168179
process_redistribution: process_redistribution
@@ -194,6 +205,7 @@ defmodule Horde.DynamicSupervisor do
194205
period: flags.max_seconds,
195206
distribution_strategy: flags.distribution_strategy,
196207
process_redistribution: flags.process_redistribution,
208+
proxy_message_ttl: flags.proxy_message_ttl,
197209
members: members(flags.members, name)
198210
]},
199211
{Horde.ProcessesSupervisor,

lib/horde/dynamic_supervisor_impl.ex

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ defmodule Horde.DynamicSupervisorImpl do
2222
name_to_supervisor_ref: %{},
2323
shutting_down: false,
2424
supervisor_options: [],
25+
proxy_message_ttl: :infinity,
26+
proxy_operation_ttl: nil,
2527
distribution_strategy: Horde.UniformDistribution
2628

2729
def start_link(opts) do
@@ -50,7 +52,7 @@ defmodule Horde.DynamicSupervisorImpl do
5052
process_pid_to_id: new_table(:process_pid_to_id),
5153
name: name
5254
}
53-
|> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy])))
55+
|> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy, :proxy_message_ttl])))
5456

5557
state = set_own_node_status(state)
5658

@@ -145,15 +147,14 @@ defmodule Horde.DynamicSupervisorImpl do
145147
def handle_call({:start_child, _child_spec}, _from, %{shutting_down: true} = state),
146148
do: {:reply, {:error, {:shutting_down, "this node is shutting down."}}, state}
147149

148-
@big_number round(:math.pow(2, 128))
149-
150150
def handle_call({:start_child, child_spec} = msg, from, state) do
151151
this_name = fully_qualified_name(state.name)
152+
proxy_ttl_expired? = proxy_message_ttl(state, from) == 0
152153

153154
child_spec = randomize_child_id(child_spec)
154155

155156
case choose_node(child_spec, state) do
156-
{:ok, %{name: ^this_name}} ->
157+
{:ok, %{name: node_name}} when node_name == this_name or proxy_ttl_expired? ->
157158
{reply, new_state} = add_child(child_spec, state)
158159
{:reply, reply, new_state}
159160

@@ -277,14 +278,27 @@ defmodule Horde.DynamicSupervisorImpl do
277278
end
278279
end
279280

281+
@big_number round(:math.pow(2, 128))
282+
280283
defp randomize_child_id(child) do
281284
Map.put(child, :id, :rand.uniform(@big_number))
282285
end
283286

287+
defp proxy_to_node(_node_name, message, reply_to, %{proxy_operation_ttl: {reply_to, 0}} = state) do
288+
message_type = elem(message, 0)
289+
290+
{:reply,
291+
{:error, :proxy_operation_ttl_expired,
292+
"a proxied #{message_type} message was discard because its TTL expired"}, state}
293+
end
294+
284295
defp proxy_to_node(node_name, message, reply_to, state) do
285296
case Map.get(members(state), node_name) do
286297
%{status: :alive} ->
287-
send(node_name, {:proxy_operation, message, reply_to})
298+
case(proxy_message_ttl(state, reply_to)) do
299+
:infinity -> send(node_name, {:proxy_operation, message, reply_to})
300+
ttl -> send(node_name, {:proxy_operation, message, reply_to, ttl})
301+
end
288302
{:noreply, state}
289303

290304
_ ->
@@ -296,6 +310,12 @@ defmodule Horde.DynamicSupervisorImpl do
296310
end
297311
end
298312

313+
defp proxy_message_ttl(%{proxy_operation_ttl: {reply_to, ttl}} = _state, reply_to), do: ttl
314+
defp proxy_message_ttl(%{proxy_message_ttl: ttl} = _state, _reply_to), do: ttl
315+
316+
defp decrement_ttl(:infinity), do: :infinity
317+
defp decrement_ttl(n) when is_integer(n), do: n - 1
318+
299319
defp set_own_node_status(state, force \\ false)
300320

301321
defp set_own_node_status(state, false) do
@@ -336,6 +356,12 @@ defmodule Horde.DynamicSupervisorImpl do
336356
end
337357

338358
def handle_info({:proxy_operation, msg, reply_to}, state) do
359+
handle_info({:proxy_operation, msg, reply_to, :infinity}, state)
360+
end
361+
362+
def handle_info({:proxy_operation, msg, reply_to, ttl}, state) do
363+
state = %{state | proxy_operation_ttl: {reply_to, decrement_ttl(ttl)}}
364+
339365
case handle_call(msg, reply_to, state) do
340366
{:reply, reply, new_state} ->
341367
GenServer.reply(reply_to, reply)

test/dynamic_supervisor_test.exs

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule DynamicSupervisorTest do
22
require Logger
33
use ExUnit.Case
44

5-
defp do_setup() do
5+
defp do_setup(_context) do
66
n1 = :"horde_#{:rand.uniform(100_000_000)}"
77
n2 = :"horde_#{:rand.uniform(100_000_000)}"
88
n3 = :"horde_#{:rand.uniform(100_000_000)}"
@@ -143,16 +143,64 @@ defmodule DynamicSupervisorTest do
143143
]
144144
end
145145

146-
setup %{describe: describe} do
146+
defp proxy_ttl_spec(name, redist, %{distribution_strategy: dist_strat, proxy_message_ttl: ttl}) do
147+
%{
148+
id: name,
149+
start: {
150+
Horde.DynamicSupervisor,
151+
:start_link,
152+
[
153+
[
154+
name: name,
155+
strategy: :one_for_one,
156+
process_redistribution: redist,
157+
delta_crdt_options: [sync_interval: 20],
158+
distribution_strategy: dist_strat,
159+
proxy_message_ttl: ttl
160+
]
161+
]
162+
},
163+
restart: :transient
164+
}
165+
end
166+
167+
defp proxy_ttl_setup(context) do
168+
n1 = :horde_proxy_ttl_1
169+
n2 = :horde_proxy_ttl_2
170+
n3 = :horde_proxy_ttl_3
171+
172+
members = [{n1, Node.self()}, {n2, Node.self()}, {n3, Node.self()}]
173+
174+
# Spawn one supervisor and add processes to it, then spawn another and redistribute
175+
# the processes between the two.
176+
{:ok, _pid_n1} = start_supervised(proxy_ttl_spec(n1, :active, context))
177+
{:ok, _pid_n2} = start_supervised(proxy_ttl_spec(n2, :active, context))
178+
{:ok, _pid_n3} = start_supervised(proxy_ttl_spec(n3, :active, context))
179+
180+
Horde.Cluster.set_members(n1, [n1, n2, n3])
181+
182+
[
183+
n1: n1,
184+
n2: n2,
185+
n3: n3,
186+
names: [n1, n2, n3],
187+
members: members
188+
]
189+
end
190+
191+
setup %{describe: describe} = context do
147192
case describe do
148193
"redistribute" ->
149194
redistribute_setup()
150195

151196
"graceful shutdown" ->
152197
Logger.info("Skip setup for \"#{describe}\" context")
153198

199+
"proxy ttl" ->
200+
proxy_ttl_setup(context)
201+
154202
_ ->
155-
do_setup()
203+
do_setup(context)
156204
end
157205
end
158206

@@ -635,4 +683,99 @@ defmodule DynamicSupervisorTest do
635683
)
636684
end
637685
end
686+
687+
defp wait_until_cluster_synced(names) do
688+
Stream.iterate(false, fn _ ->
689+
Process.sleep(100)
690+
691+
1 ==
692+
names
693+
|> Enum.map(&Horde.Cluster.members(&1))
694+
|> Enum.uniq()
695+
|> length
696+
end)
697+
|> Enum.find(&(&1 == true))
698+
end
699+
700+
defmodule HotPotatoDistribution do
701+
@behaviour Horde.DistributionStrategy
702+
703+
@moduledoc """
704+
Distributes processes to any alive node except itself
705+
"""
706+
707+
def choose_node(_child_spec, members) do
708+
members
709+
|> Enum.filter(&match?(%{status: :alive}, &1))
710+
|> Enum.find(fn %{name: {name, _node}} -> Process.whereis(name) != self() end)
711+
|> case do
712+
nil ->
713+
{:error, :no_other_nodes_alive}
714+
715+
member ->
716+
{:ok, member}
717+
end
718+
end
719+
720+
def has_quorum?(_members), do: true
721+
end
722+
723+
defp async_simple_task(name) do
724+
exunit = self()
725+
task_spec = Task.child_spec(fn -> send(exunit, "child alive") end)
726+
727+
Task.async(fn ->
728+
Horde.DynamicSupervisor.start_child(name, task_spec)
729+
end)
730+
end
731+
732+
describe "proxy ttl" do
733+
@tag proxy_message_ttl: :infinity
734+
@tag distribution_strategy: HotPotatoDistribution
735+
test "message will proxy forever if nodes disagree on member", context do
736+
wait_until_cluster_synced(context.names)
737+
738+
assert %Task{} = task = async_simple_task(context.n1)
739+
740+
refute_receive "child alive", 200
741+
assert nil == Task.shutdown(task)
742+
end
743+
744+
@tag proxy_message_ttl: 10
745+
@tag distribution_strategy: HotPotatoDistribution
746+
test "start_child will add child on any node when proxy TTL expires", context do
747+
wait_until_cluster_synced(context.names)
748+
749+
assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
750+
assert_receive "child alive", 200
751+
end
752+
753+
@tag proxy_message_ttl: 1
754+
@tag distribution_strategy: Horde.UniformDistribution
755+
test "message will be picked up if nodes agree on member", context do
756+
wait_until_cluster_synced(context.names)
757+
758+
assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
759+
assert_receive "child alive", 200
760+
end
761+
762+
@tag proxy_message_ttl: 1
763+
@tag distribution_strategy: Horde.UniformRandomDistribution
764+
test "random distribution start_child with TTL of 1 will not bounce around", context do
765+
wait_until_cluster_synced(context.names)
766+
767+
process_count = 200
768+
769+
result_count =
770+
Enum.map(1..process_count, fn _ -> async_simple_task(context.n1) end)
771+
|> Task.yield_many(1_000)
772+
|> Enum.frequencies_by(fn {_task, {:ok, outcome}} -> elem(outcome, 0) end)
773+
774+
Enum.each(1..process_count, fn _n ->
775+
assert_receive "child alive", 200
776+
end)
777+
778+
assert %{ok: process_count} == result_count
779+
end
780+
end
638781
end

0 commit comments

Comments
 (0)