Skip to content

Commit 9bb4889

Browse files
edgurgelfilipecabaco
authored andcommitted
fix: ensure that messages partitions are created for self hosted environment. (#1383)
* chore: extract Janitor.MaintenanceTask * fix: run Janitor.MaintenanceTask on seeds * chore: fix containers set-up
1 parent 90b9a8e commit 9bb4889

File tree

6 files changed

+171
-48
lines changed

6 files changed

+171
-48
lines changed

lib/realtime/tenants/janitor.ex

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@ defmodule Realtime.Tenants.Janitor do
88

99
import Realtime.Logs
1010

11-
alias Realtime.Api.Tenant
12-
alias Realtime.Database
13-
alias Realtime.Messages
14-
alias Realtime.Tenants
15-
alias Realtime.Tenants.Migrations
11+
alias Realtime.Tenants.Janitor.MaintenanceTask
1612

1713
@type t :: %__MODULE__{
1814
timer: pos_integer() | nil,
@@ -77,7 +73,7 @@ defmodule Realtime.Tenants.Janitor do
7773
task =
7874
Task.Supervisor.async_nolink(
7975
__MODULE__.TaskSupervisor,
80-
fn -> perform_mantaince_tasks(chunks) end,
76+
fn -> perform_maintenance_tasks(chunks) end,
8177
ordered: false
8278
)
8379

@@ -120,20 +116,16 @@ defmodule Realtime.Tenants.Janitor do
120116

121117
defp timer(%{timer: timer}), do: timer
122118

123-
defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)
119+
defp perform_maintenance_tasks(tenants), do: Enum.map(tenants, &perform_maintenance_task/1)
124120

125-
defp perform_mantaince_task(tenant_external_id) do
121+
defp perform_maintenance_task(tenant_external_id) do
126122
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
127123
Logger.info("Janitor starting realtime.messages cleanup")
128124
:ets.delete(@table_name, tenant_external_id)
129125

130-
with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
131-
{:ok, conn} <- Database.connect(tenant, "realtime_janitor"),
132-
:ok <- Messages.delete_old_messages(conn),
133-
:ok <- Migrations.create_partitions(conn) do
126+
with :ok <- MaintenanceTask.run(tenant_external_id) do
134127
Logger.info("Janitor finished")
135128

136-
GenServer.stop(conn)
137129
:ok
138130
end
139131
end
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
defmodule Realtime.Tenants.Janitor.MaintenanceTask do
2+
@moduledoc """
3+
Perform maintenance on the messages table.
4+
* Delete old messages
5+
* Create new partitions
6+
"""
7+
8+
@spec run(String.t()) :: :ok | {:error, any}
9+
def run(tenant_external_id) do
10+
with %Realtime.Api.Tenant{} = tenant <- Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
11+
{:ok, conn} <- Realtime.Database.connect(tenant, "realtime_janitor"),
12+
:ok <- Realtime.Messages.delete_old_messages(conn),
13+
:ok <- Realtime.Tenants.Migrations.create_partitions(conn) do
14+
GenServer.stop(conn)
15+
:ok
16+
end
17+
end
18+
end

priv/repo/seeds.exs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,45 @@ alias Realtime.Tenants
99
tenant_name = System.get_env("SELF_HOST_TENANT_NAME", "realtime-dev")
1010
default_db_host = "host.docker.internal"
1111

12-
Repo.transaction(fn ->
13-
case Repo.get_by(Tenant, external_id: tenant_name) do
14-
%Tenant{} = tenant -> Repo.delete!(tenant)
15-
nil -> {:ok, nil}
16-
end
12+
{:ok, tenant} =
13+
Repo.transaction(fn ->
14+
case Repo.get_by(Tenant, external_id: tenant_name) do
15+
%Tenant{} = tenant -> Repo.delete!(tenant)
16+
nil -> {:ok, nil}
17+
end
1718

18-
%Tenant{}
19-
|> Tenant.changeset(%{
20-
"name" => tenant_name,
21-
"external_id" => tenant_name,
22-
"jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"),
23-
"jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end),
24-
"extensions" => [
25-
%{
26-
"type" => "postgres_cdc_rls",
27-
"settings" => %{
28-
"db_name" => System.get_env("DB_NAME", "postgres"),
29-
"db_host" => System.get_env("DB_HOST", default_db_host),
30-
"db_user" => System.get_env("DB_USER", "supabase_admin"),
31-
"db_password" => System.get_env("DB_PASSWORD", "postgres"),
32-
"db_port" => System.get_env("DB_PORT", "5433"),
33-
"region" => "us-east-1",
34-
"poll_interval_ms" => 100,
35-
"poll_max_record_bytes" => 1_048_576,
36-
"ssl_enforced" => false
19+
%Tenant{}
20+
|> Tenant.changeset(%{
21+
"name" => tenant_name,
22+
"external_id" => tenant_name,
23+
"jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"),
24+
"jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end),
25+
"extensions" => [
26+
%{
27+
"type" => "postgres_cdc_rls",
28+
"settings" => %{
29+
"db_name" => System.get_env("DB_NAME", "postgres"),
30+
"db_host" => System.get_env("DB_HOST", default_db_host),
31+
"db_user" => System.get_env("DB_USER", "supabase_admin"),
32+
"db_password" => System.get_env("DB_PASSWORD", "postgres"),
33+
"db_port" => System.get_env("DB_PORT", "5433"),
34+
"region" => "us-east-1",
35+
"poll_interval_ms" => 100,
36+
"poll_max_record_bytes" => 1_048_576,
37+
"ssl_enforced" => false
38+
}
3739
}
38-
}
39-
]
40-
})
41-
|> Repo.insert!()
40+
]
41+
})
42+
|> Repo.insert!()
43+
end)
4244

43-
tenant = Tenants.get_tenant_by_external_id(tenant_name)
44-
Tenants.Migrations.run_migrations(tenant)
45-
end)
45+
tenant = Tenants.get_tenant_by_external_id(tenant_name)
46+
47+
with res when res in [:noop, :ok] <- Tenants.Migrations.run_migrations(tenant),
48+
:ok <- Tenants.Janitor.MaintenanceTask.run(tenant.external_id) do
49+
Logger.info("Tenant set-up successfully")
50+
else
51+
error ->
52+
Logger.info("Failed to set-up tenant: #{inspect(error)}")
53+
end
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do
2+
use Realtime.DataCase, async: true
3+
4+
alias Realtime.Tenants.Janitor.MaintenanceTask
5+
alias Realtime.Api.Message
6+
alias Realtime.Database
7+
alias Realtime.Repo
8+
9+
setup do
10+
tenant = Containers.checkout_tenant(run_migrations: true)
11+
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
12+
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
13+
14+
%{tenant: tenant}
15+
end
16+
17+
test "cleans messages older than 72 hours and creates partitions", %{tenant: tenant} do
18+
utc_now = NaiveDateTime.utc_now()
19+
limit = NaiveDateTime.add(utc_now, -72, :hour)
20+
21+
messages =
22+
for days <- -5..0 do
23+
inserted_at = NaiveDateTime.add(utc_now, days, :day)
24+
message_fixture(tenant, %{inserted_at: inserted_at})
25+
end
26+
|> MapSet.new()
27+
28+
to_keep =
29+
messages
30+
|> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
31+
|> MapSet.new()
32+
33+
assert MaintenanceTask.run(tenant.external_id) == :ok
34+
35+
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
36+
{:ok, res} = Repo.all(conn, from(m in Message), Message)
37+
38+
verify_partitions(conn)
39+
40+
current = MapSet.new(res)
41+
42+
assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
43+
end
44+
45+
test "exits if fails to remove old messages" do
46+
extensions = [
47+
%{
48+
"type" => "postgres_cdc_rls",
49+
"settings" => %{
50+
"db_host" => "127.0.0.1",
51+
"db_name" => "postgres",
52+
"db_user" => "supabase_admin",
53+
"db_password" => "postgres",
54+
"db_port" => "11111",
55+
"poll_interval" => 100,
56+
"poll_max_changes" => 100,
57+
"poll_max_record_bytes" => 1_048_576,
58+
"region" => "us-east-1",
59+
"ssl_enforced" => false
60+
}
61+
}
62+
]
63+
64+
tenant = tenant_fixture(%{extensions: extensions})
65+
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
66+
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
67+
68+
Process.flag(:trap_exit, true)
69+
70+
t =
71+
Task.async(fn ->
72+
MaintenanceTask.run(tenant.external_id)
73+
end)
74+
75+
pid = t.pid
76+
ref = t.ref
77+
assert_receive {:EXIT, ^pid, :killed}
78+
assert_receive {:DOWN, ^ref, :process, ^pid, :killed}
79+
end
80+
81+
defp verify_partitions(conn) do
82+
today = Date.utc_today()
83+
yesterday = Date.add(today, -1)
84+
future = Date.add(today, 3)
85+
dates = Date.range(yesterday, future)
86+
87+
%{rows: rows} =
88+
Postgrex.query!(
89+
conn,
90+
"SELECT tablename from pg_catalog.pg_tables where schemaname = 'realtime' and tablename like 'messages_%'",
91+
[]
92+
)
93+
94+
partitions = MapSet.new(rows, fn [name] -> name end)
95+
96+
expected_names =
97+
MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)
98+
99+
assert MapSet.equal?(partitions, expected_names)
100+
end
101+
end

test/realtime/tenants/janitor_test.exs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,11 @@ defmodule Realtime.Tenants.JanitorTest do
173173
[]
174174
)
175175

176-
partitions = Enum.map(rows, fn [name] -> name end)
177-
expected_names = Enum.map(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)
178-
assert expected_names == partitions
176+
partitions = MapSet.new(rows, fn [name] -> name end)
177+
178+
expected_names =
179+
MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)
180+
181+
assert MapSet.equal?(partitions, expected_names)
179182
end
180183
end

test/support/containers.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ defmodule Containers do
177177
end
178178

179179
defp existing_containers(pattern) do
180-
{containers, 0} = System.cmd("docker", ["ps", "-a", "--format", "{{json .}}", "--filter", "name=#{pattern}"])
180+
{containers, 0} = System.cmd("docker", ["ps", "--format", "{{json .}}", "--filter", "name=#{pattern}"])
181181

182182
containers
183183
|> String.split("\n", trim: true)
@@ -245,6 +245,7 @@ defmodule Containers do
245245
System.cmd("docker", [
246246
"run",
247247
"-d",
248+
"--rm",
248249
"--name",
249250
name,
250251
"-e",

0 commit comments

Comments
 (0)