Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 83 additions & 37 deletions lib/hammer/atomic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,34 @@ defmodule Hammer.Atomic do
This provides fast, atomic counter operations without the overhead of ETS or process messaging.
Requires Erlang/OTP 21.2 or later.

defmodule MyApp.RateLimit do
use Hammer, backend: :atomic
end

MyApp.RateLimit.start_link(clean_period: :timer.minutes(1))

# Allow 10 requests per second
MyApp.RateLimit.hit("user_123", 1000, 10)

Runtime configuration:
- `:clean_period` - (in milliseconds) period to clean up expired entries, defaults to 1 minute
- `:key_older_than` - (in milliseconds) maximum age for entries before they are cleaned up, defaults to 24 hours
- `:algorithm` - the rate limiting algorithm to use, one of: `:fix_window`, `:leaky_bucket`, `:token_bucket`. Defaults to `:fix_window`
- `:before_clean` - optional callback invoked with expired entries before they are deleted.
Accepts a function `(algorithm :: atom(), entries :: [map()]) -> any()` or an MFA tuple
`{module, function, extra_args}`. Each entry is a map with `:key`, `:value`, and `:expired_at` (ms).
If the callback raises, entries are still deleted and a warning is logged.

MyApp.RateLimit.start_link(
clean_period: :timer.minutes(1),
key_older_than: :timer.hours(24),
before_clean: fn algorithm, entries ->
Enum.each(entries, fn entry ->
MyApp.Telemetry.emit_expired(algorithm, entry)
end)
end
)

The atomic backend supports the following algorithms:

- `:fix_window` - Fixed window rate limiting (default)
Expand All @@ -23,14 +51,16 @@ defmodule Hammer.Atomic do
@type start_option ::
{:clean_period, pos_integer()}
| {:key_older_than, pos_integer()}
| {:before_clean, Hammer.CleanUtils.before_clean()}
| GenServer.option()

@type config :: %{
table: atom(),
table_opts: list(),
clean_period: pos_integer(),
key_older_than: pos_integer(),
algorithm: module()
algorithm_module: module(),
before_clean: Hammer.CleanUtils.before_clean() | nil
}

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
Expand Down Expand Up @@ -129,6 +159,10 @@ defmodule Hammer.Atomic do
Options:
- `:clean_period` - How often to run cleanup (ms). Default 1 minute.
- `:key_older_than` - Max age for entries (ms). Default 24 hours.
- `:before_clean` - Optional callback invoked with expired entries before deletion.
Accepts a function `(algorithm :: atom(), entries :: [map()]) -> any()` or an MFA tuple
`{module, function, extra_args}`. Each entry is a map with `:key`, `:value`, and `:expired_at` (ms).
If the callback raises, entries are still deleted and a warning is logged.
"""
@spec start_link([start_option]) :: GenServer.on_start()
def start_link(opts) do
Expand All @@ -138,6 +172,7 @@ defmodule Hammer.Atomic do
{table, opts} = Keyword.pop!(opts, :table)
{algorithm_module, opts} = Keyword.pop!(opts, :algorithm_module)
{key_older_than, opts} = Keyword.pop(opts, :key_older_than, :timer.hours(24))
{before_clean, opts} = Keyword.pop(opts, :before_clean)

case opts do
[] ->
Expand All @@ -154,7 +189,8 @@ defmodule Hammer.Atomic do
table_opts: algorithm_module.ets_opts(),
clean_period: clean_period,
key_older_than: key_older_than,
algorithm_module: algorithm_module
algorithm_module: algorithm_module,
before_clean: before_clean
}

GenServer.start_link(__MODULE__, config, gen_opts)
Expand All @@ -179,60 +215,70 @@ defmodule Hammer.Atomic do

@impl GenServer
def handle_info(:clean, config) do
clean(config)

schedule(config.clean_period)
{:noreply, config}
end

defp clean(config) do
case config.algorithm_module do
Hammer.Atomic.FixWindow -> clean_fix_window(config)
_ -> clean_bucket(config)
end

schedule(config.clean_period)
{:noreply, config}
end

# FixWindow stores expires_at in milliseconds in slot 2
defp clean_fix_window(config) do
now = now()

:ets.foldl(
fn {_key, atomic} = term, deleted ->
expires_at = :atomics.get(atomic, 2)

if now - expires_at > config.key_older_than do
:ets.delete_object(config.table, term)
deleted + 1
else
deleted
end
end,
0,
config.table
)
algo_module = config.algorithm_module

expired_terms =
:ets.foldl(
fn {_key, atomic} = term, acc ->
expires_at = :atomics.get(atomic, 2)
if now - expires_at > config.key_older_than, do: [term | acc], else: acc
end,
[],
config.table
)

maybe_invoke_before_clean(config.before_clean, algo_module, expired_terms)
Enum.each(expired_terms, fn term -> :ets.delete_object(config.table, term) end)
end

# TokenBucket and LeakyBucket store last_update in seconds in slot 2
defp clean_bucket(config) do
now = System.system_time(:second)
older_than = now - div(config.key_older_than, 1000)
algo_module = config.algorithm_module

expired_terms =
:ets.foldl(
fn {_key, atomic} = term, acc ->
last_update = :atomics.get(atomic, 2)
if last_update < older_than, do: [term | acc], else: acc
end,
[],
config.table
)

maybe_invoke_before_clean(config.before_clean, algo_module, expired_terms)
Enum.each(expired_terms, fn term -> :ets.delete_object(config.table, term) end)
end

:ets.foldl(
fn {_key, atomic} = term, deleted ->
last_update = :atomics.get(atomic, 2)
defp maybe_invoke_before_clean(nil, _algo_module, _expired_terms), do: :ok
defp maybe_invoke_before_clean(_callback, _algo_module, []), do: :ok

if last_update < older_than do
:ets.delete_object(config.table, term)
deleted + 1
else
deleted
end
end,
0,
config.table
)
defp maybe_invoke_before_clean(callback, algo_module, expired_terms) do
entries =
Enum.map(expired_terms, fn {key, atomic} ->
algo_module.normalize_entry(key, atomic)
end)

Hammer.CleanUtils.invoke_before_clean(callback, algorithm_name(algo_module), entries)
end

defp algorithm_name(Hammer.Atomic.FixWindow), do: :fix_window
defp algorithm_name(Hammer.Atomic.TokenBucket), do: :token_bucket
defp algorithm_name(Hammer.Atomic.LeakyBucket), do: :leaky_bucket

defp schedule(clean_period) do
Process.send_after(self(), :clean, clean_period)
end
Expand Down
8 changes: 8 additions & 0 deletions lib/hammer/atomic/fix_window.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,12 @@ defmodule Hammer.Atomic.FixWindow do
[] -> 0
end
end

@doc false
@spec normalize_entry(key :: term(), atomic :: reference()) :: map()
def normalize_entry(key, atomic) do
count = :atomics.get(atomic, 1)
expires_at = :atomics.get(atomic, 2)
%{key: key, value: count, expired_at: expires_at}
end
end
9 changes: 9 additions & 0 deletions lib/hammer/atomic/leaky_bucket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,13 @@ defmodule Hammer.Atomic.LeakyBucket do
0
end
end

@doc false
@spec normalize_entry(key :: term(), atomic :: reference()) :: map()
def normalize_entry(key, atomic) do
packed = :atomics.get(atomic, 1)
{_timestamp, fill} = unpack(packed)
last_update = :atomics.get(atomic, 2)
%{key: key, value: fill, expired_at: last_update * 1000}
end
end
9 changes: 9 additions & 0 deletions lib/hammer/atomic/token_bucket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,13 @@ defmodule Hammer.Atomic.TokenBucket do
0
end
end

@doc false
@spec normalize_entry(key :: term(), atomic :: reference()) :: map()
def normalize_entry(key, atomic) do
packed = :atomics.get(atomic, 1)
{_timestamp, fill} = unpack(packed)
last_update = :atomics.get(atomic, 2)
%{key: key, value: fill, expired_at: last_update * 1000}
end
end
28 changes: 28 additions & 0 deletions lib/hammer/clean_utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Hammer.CleanUtils do
@moduledoc false

require Logger

@type before_clean :: (atom(), [map()] -> any()) | {module(), atom(), list()}

@doc false
def invoke_before_clean(callback, algorithm, entries) do
case callback do
{mod, fun, extra_args} -> apply(mod, fun, [algorithm, entries | extra_args])
fun when is_function(fun, 2) -> fun.(algorithm, entries)
end
rescue
e ->
Logger.warning(
"before_clean callback raised: #{Exception.format(:error, e, __STACKTRACE__)}"
)
catch
kind, reason ->
Logger.warning("before_clean callback failed: #{inspect({kind, reason})}")
end

@doc false
def delete_expired(table, expired) do
Enum.each(expired, fn entry -> :ets.delete_object(table, entry) end)
end
end
50 changes: 47 additions & 3 deletions lib/hammer/ets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@ defmodule Hammer.ETS do
- `:clean_period` - (in milliseconds) period to clean up expired entries, defaults to 1 minute
- `:key_older_than` - (in milliseconds) maximum age for entries before they are cleaned up, defaults to 1 hour
- `:algorithm` - the rate limiting algorithm to use, one of: `:fix_window`, `:sliding_window`, `:leaky_bucket`, `:token_bucket`. Defaults to `:fix_window`
- `:before_clean` - optional callback invoked with expired entries before they are deleted.
Accepts a function `(algorithm :: atom(), entries :: [map()]) -> any()` or an MFA tuple
`{module, function, extra_args}`. Each entry is a map with `:key`, `:value`, and `:expired_at` (ms).
If the callback raises, entries are still deleted and a warning is logged.

MyApp.RateLimit.start_link(
clean_period: :timer.minutes(1),
before_clean: fn algorithm, entries ->
Enum.each(entries, fn entry ->
MyApp.Telemetry.emit_expired(algorithm, entry)
end)
end
)

The ETS backend supports the following algorithms:
- `:fix_window` - Fixed window rate limiting (default)
Expand All @@ -37,14 +50,16 @@ defmodule Hammer.ETS do
| {:table, atom()}
| {:algorithm, module()}
| {:key_older_than, pos_integer()}
| {:before_clean, Hammer.CleanUtils.before_clean()}
| GenServer.option()

@type config :: %{
table: atom(),
table_opts: list(),
clean_period: pos_integer(),
key_older_than: pos_integer(),
algorithm: module()
algorithm: module(),
before_clean: Hammer.CleanUtils.before_clean() | nil
}

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
Expand Down Expand Up @@ -144,6 +159,10 @@ defmodule Hammer.ETS do
- `:clean_period` - How often to run the cleanup process (in milliseconds). Defaults to 1 minute.
- `:key_older_than` - Optional maximum age for bucket entries (in milliseconds). Defaults to 24 hours.
Entries older than this will be removed during cleanup.
- `:before_clean` - Optional callback invoked with expired entries before deletion.
Accepts a function `(algorithm :: atom(), entries :: [map()]) -> any()` or an MFA tuple
`{module, function, extra_args}`. Each entry is a map with `:key`, `:value`, and `:expired_at` (ms).
If the callback raises, entries are still deleted and a warning is logged.
- optional `:debug`, `:spawn_opts`, and `:hibernate_after` GenServer options
"""
@spec start_link([start_option]) :: GenServer.on_start()
Expand All @@ -154,6 +173,7 @@ defmodule Hammer.ETS do
{table, opts} = Keyword.pop!(opts, :table)
{algorithm, opts} = Keyword.pop!(opts, :algorithm)
{key_older_than, opts} = Keyword.pop(opts, :key_older_than, :timer.hours(24))
{before_clean, opts} = Keyword.pop(opts, :before_clean)

case opts do
[] ->
Expand All @@ -170,7 +190,8 @@ defmodule Hammer.ETS do
table_opts: algorithm.ets_opts(),
clean_period: clean_period,
key_older_than: key_older_than,
algorithm: algorithm
algorithm: algorithm,
before_clean: before_clean
}

GenServer.start_link(__MODULE__, config, gen_opts)
Expand Down Expand Up @@ -198,11 +219,34 @@ defmodule Hammer.ETS do
@impl GenServer
def handle_info(:clean, config) do
algorithm = config.algorithm
algorithm.clean(config)

if config.before_clean do
expired = algorithm.select_expired(config)

if expired != [] do
entries = algorithm.normalize_expired(expired)

Hammer.CleanUtils.invoke_before_clean(
config.before_clean,
algorithm_name(algorithm),
entries
)
end

Hammer.CleanUtils.delete_expired(config.table, expired)
else
algorithm.clean(config)
end

schedule(config.clean_period)
{:noreply, config}
end

defp algorithm_name(Hammer.ETS.FixWindow), do: :fix_window
defp algorithm_name(Hammer.ETS.SlidingWindow), do: :sliding_window
defp algorithm_name(Hammer.ETS.TokenBucket), do: :token_bucket
defp algorithm_name(Hammer.ETS.LeakyBucket), do: :leaky_bucket

defp schedule(clean_period) do
Process.send_after(self(), :clean, clean_period)
end
Expand Down
15 changes: 15 additions & 0 deletions lib/hammer/ets/fix_window.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,19 @@ defmodule Hammer.ETS.FixWindow do
match_spec = [{{{:_, :_}, :_, :"$1"}, [], [{:<, :"$1", {:const, ETS.now()}}]}]
:ets.select_delete(table, match_spec)
end

@doc false
@spec select_expired(config :: ETS.config()) :: list()
def select_expired(config) do
match_spec = [{{{:_, :_}, :_, :"$1"}, [{:<, :"$1", {:const, ETS.now()}}], [:"$_"]}]
:ets.select(config.table, match_spec)
end

@doc false
@spec normalize_expired(expired :: list()) :: list(map())
def normalize_expired(expired) do
Enum.map(expired, fn {{key, _window}, count, expires_at} ->
%{key: key, value: count, expired_at: expires_at}
end)
end
end
Loading