Skip to content

fix: ensure that messages partitions are created for self hosted environment. #1383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ defmodule Realtime.Tenants.Janitor do

import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Messages
alias Realtime.Tenants
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Janitor.MaintenanceTask

@type t :: %__MODULE__{
timer: pos_integer() | nil,
Expand Down Expand Up @@ -77,7 +73,7 @@ defmodule Realtime.Tenants.Janitor do
task =
Task.Supervisor.async_nolink(
__MODULE__.TaskSupervisor,
fn -> perform_mantaince_tasks(chunks) end,
fn -> perform_maintenance_tasks(chunks) end,
ordered: false
)

Expand Down Expand Up @@ -120,20 +116,16 @@ defmodule Realtime.Tenants.Janitor do

defp timer(%{timer: timer}), do: timer

defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)
defp perform_maintenance_tasks(tenants), do: Enum.map(tenants, &perform_maintenance_task/1)

defp perform_mantaince_task(tenant_external_id) do
defp perform_maintenance_task(tenant_external_id) do
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
Logger.info("Janitor starting realtime.messages cleanup")
:ets.delete(@table_name, tenant_external_id)

with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor"),
:ok <- Messages.delete_old_messages(conn),
:ok <- Migrations.create_partitions(conn) do
with :ok <- MaintenanceTask.run(tenant_external_id) do
Logger.info("Janitor finished")

GenServer.stop(conn)
:ok
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/realtime/tenants/janitor/maintenance_task.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Realtime.Tenants.Janitor.MaintenanceTask do
@moduledoc """
Perform maintenance on the messages table.
* Delete old messages
* Create new partitions
"""

@spec run(String.t()) :: :ok | {:error, any}
def run(tenant_external_id) do
with %Realtime.Api.Tenant{} = tenant <- Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Realtime.Database.connect(tenant, "realtime_janitor"),
:ok <- Realtime.Messages.delete_old_messages(conn),
:ok <- Realtime.Tenants.Migrations.create_partitions(conn) do
GenServer.stop(conn)
:ok
end
end
end
70 changes: 39 additions & 31 deletions priv/repo/seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,45 @@ alias Realtime.Tenants
tenant_name = System.get_env("SELF_HOST_TENANT_NAME", "realtime-dev")
default_db_host = "host.docker.internal"

Repo.transaction(fn ->
case Repo.get_by(Tenant, external_id: tenant_name) do
%Tenant{} = tenant -> Repo.delete!(tenant)
nil -> {:ok, nil}
end
{:ok, tenant} =
Repo.transaction(fn ->
case Repo.get_by(Tenant, external_id: tenant_name) do
%Tenant{} = tenant -> Repo.delete!(tenant)
nil -> {:ok, nil}
end

%Tenant{}
|> Tenant.changeset(%{
"name" => tenant_name,
"external_id" => tenant_name,
"jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"),
"jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end),
"extensions" => [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_name" => System.get_env("DB_NAME", "postgres"),
"db_host" => System.get_env("DB_HOST", default_db_host),
"db_user" => System.get_env("DB_USER", "supabase_admin"),
"db_password" => System.get_env("DB_PASSWORD", "postgres"),
"db_port" => System.get_env("DB_PORT", "5433"),
"region" => "us-east-1",
"poll_interval_ms" => 100,
"poll_max_record_bytes" => 1_048_576,
"ssl_enforced" => false
%Tenant{}
|> Tenant.changeset(%{
"name" => tenant_name,
"external_id" => tenant_name,
"jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"),
"jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end),
"extensions" => [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_name" => System.get_env("DB_NAME", "postgres"),
"db_host" => System.get_env("DB_HOST", default_db_host),
"db_user" => System.get_env("DB_USER", "supabase_admin"),
"db_password" => System.get_env("DB_PASSWORD", "postgres"),
"db_port" => System.get_env("DB_PORT", "5433"),
"region" => "us-east-1",
"poll_interval_ms" => 100,
"poll_max_record_bytes" => 1_048_576,
"ssl_enforced" => false
}
}
}
]
})
|> Repo.insert!()
]
})
|> Repo.insert!()
end)

tenant = Tenants.get_tenant_by_external_id(tenant_name)
Tenants.Migrations.run_migrations(tenant)
end)
tenant = Tenants.get_tenant_by_external_id(tenant_name)

with res when res in [:noop, :ok] <- Tenants.Migrations.run_migrations(tenant),
:ok <- Tenants.Janitor.MaintenanceTask.run(tenant.external_id) do
Logger.info("Tenant set-up successfully")
else
error ->
Logger.info("Failed to set-up tenant: #{inspect(error)}")
end
98 changes: 98 additions & 0 deletions test/realtime/tenants/janitor/maintenance_task_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do
use Realtime.DataCase, async: true

alias Realtime.Tenants.Janitor.MaintenanceTask
alias Realtime.Api.Message
alias Realtime.Database
alias Realtime.Repo

setup do
tenant = Containers.checkout_tenant(run_migrations: true)
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})

%{tenant: tenant}
end

test "cleans messages older than 72 hours and creates partitions", %{ tenant: tenant } do
utc_now = NaiveDateTime.utc_now()
limit = NaiveDateTime.add(utc_now, -72, :hour)

messages =
for days <- -5..0 do
inserted_at = NaiveDateTime.add(utc_now, days, :day)
message_fixture(tenant, %{inserted_at: inserted_at})
end
|> MapSet.new()

to_keep =
messages
|> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
|> MapSet.new()

assert MaintenanceTask.run(tenant.external_id) == :ok

{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
{:ok, res} = Repo.all(conn, from(m in Message), Message)

verify_partitions(conn)

current = MapSet.new(res)

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
end

test "exits if fails to remove old messages" do
extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "11111",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false
}
}
]

tenant = tenant_fixture(%{extensions: extensions})
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})

Process.flag(:trap_exit, true)
t = Task.async(fn ->
MaintenanceTask.run(tenant.external_id)
end)
pid = t.pid
ref = t.ref
assert_receive {:EXIT, ^pid, :killed}
assert_receive {:DOWN, ^ref, :process, ^pid, :killed}
end

defp verify_partitions(conn) do
today = Date.utc_today()
yesterday = Date.add(today, -1)
future = Date.add(today, 3)
dates = Date.range(yesterday, future)

%{rows: rows} =
Postgrex.query!(
conn,
"SELECT tablename from pg_catalog.pg_tables where schemaname = 'realtime' and tablename like 'messages_%'",
[]
)

partitions = MapSet.new(rows, fn [name] -> name end)

expected_names =
MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)

assert MapSet.equal?(partitions, expected_names)
end
end
9 changes: 6 additions & 3 deletions test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ defmodule Realtime.Tenants.JanitorTest do
[]
)

partitions = Enum.map(rows, fn [name] -> name end)
expected_names = Enum.map(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)
assert expected_names == partitions
partitions = MapSet.new(rows, fn [name] -> name end)

expected_names =
MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end)

assert MapSet.equal?(partitions, expected_names)
end
end
3 changes: 2 additions & 1 deletion test/support/containers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ defmodule Containers do
end

defp existing_containers(pattern) do
{containers, 0} = System.cmd("docker", ["ps", "-a", "--format", "{{json .}}", "--filter", "name=#{pattern}"])
{containers, 0} = System.cmd("docker", ["ps", "--format", "{{json .}}", "--filter", "name=#{pattern}"])

containers
|> String.split("\n", trim: true)
Expand Down Expand Up @@ -233,6 +233,7 @@ defmodule Containers do
System.cmd("docker", [
"run",
"-d",
"--rm",
"--name",
name,
"-e",
Expand Down
Loading