diff --git a/lib/realtime/tenants/janitor.ex b/lib/realtime/tenants/janitor.ex index 1a41897f7..ec278aeeb 100644 --- a/lib/realtime/tenants/janitor.ex +++ b/lib/realtime/tenants/janitor.ex @@ -8,11 +8,7 @@ defmodule Realtime.Tenants.Janitor do import Realtime.Logs - alias Realtime.Api.Tenant - alias Realtime.Database - alias Realtime.Messages - alias Realtime.Tenants - alias Realtime.Tenants.Migrations + alias Realtime.Tenants.Janitor.MaintenanceTask @type t :: %__MODULE__{ timer: pos_integer() | nil, @@ -77,7 +73,7 @@ defmodule Realtime.Tenants.Janitor do task = Task.Supervisor.async_nolink( __MODULE__.TaskSupervisor, - fn -> perform_mantaince_tasks(chunks) end, + fn -> perform_maintenance_tasks(chunks) end, ordered: false ) @@ -120,20 +116,16 @@ defmodule Realtime.Tenants.Janitor do defp timer(%{timer: timer}), do: timer - defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1) + defp perform_maintenance_tasks(tenants), do: Enum.map(tenants, &perform_maintenance_task/1) - defp perform_mantaince_task(tenant_external_id) do + defp perform_maintenance_task(tenant_external_id) do Logger.metadata(project: tenant_external_id, external_id: tenant_external_id) Logger.info("Janitor starting realtime.messages cleanup") :ets.delete(@table_name, tenant_external_id) - with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id), - {:ok, conn} <- Database.connect(tenant, "realtime_janitor"), - :ok <- Messages.delete_old_messages(conn), - :ok <- Migrations.create_partitions(conn) do + with :ok <- MaintenanceTask.run(tenant_external_id) do Logger.info("Janitor finished") - GenServer.stop(conn) :ok end end diff --git a/lib/realtime/tenants/janitor/maintenance_task.ex b/lib/realtime/tenants/janitor/maintenance_task.ex new file mode 100644 index 000000000..4a01432c5 --- /dev/null +++ b/lib/realtime/tenants/janitor/maintenance_task.ex @@ -0,0 +1,18 @@ +defmodule Realtime.Tenants.Janitor.MaintenanceTask do + @moduledoc """ + Perform maintenance on the messages table. + * Delete old messages + * Create new partitions + """ + + @spec run(String.t()) :: :ok | {:error, any} + def run(tenant_external_id) do + with %Realtime.Api.Tenant{} = tenant <- Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_external_id), + {:ok, conn} <- Realtime.Database.connect(tenant, "realtime_janitor"), + :ok <- Realtime.Messages.delete_old_messages(conn), + :ok <- Realtime.Tenants.Migrations.create_partitions(conn) do + GenServer.stop(conn) + :ok + end + end +end diff --git a/mix.exs b/mix.exs index 8102543bb..8c651e5a5 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.55.4", + version: "2.55.5", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index 78942dccd..95715f77b 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -9,37 +9,45 @@ alias Realtime.Tenants tenant_name = System.get_env("SELF_HOST_TENANT_NAME", "realtime-dev") default_db_host = "host.docker.internal" -Repo.transaction(fn -> - case Repo.get_by(Tenant, external_id: tenant_name) do - %Tenant{} = tenant -> Repo.delete!(tenant) - nil -> {:ok, nil} - end +{:ok, tenant} = + Repo.transaction(fn -> + case Repo.get_by(Tenant, external_id: tenant_name) do + %Tenant{} = tenant -> Repo.delete!(tenant) + nil -> {:ok, nil} + end - %Tenant{} - |> Tenant.changeset(%{ - "name" => tenant_name, - "external_id" => tenant_name, - "jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"), - "jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end), - "extensions" => [ - %{ - "type" => "postgres_cdc_rls", - "settings" => %{ - "db_name" => System.get_env("DB_NAME", "postgres"), - "db_host" => System.get_env("DB_HOST", default_db_host), - "db_user" => System.get_env("DB_USER", "supabase_admin"), - "db_password" => System.get_env("DB_PASSWORD", "postgres"), - "db_port" => System.get_env("DB_PORT", "5433"), - "region" => "us-east-1", - "poll_interval_ms" => 100, - "poll_max_record_bytes" => 1_048_576, - "ssl_enforced" => false + %Tenant{} + |> Tenant.changeset(%{ + "name" => tenant_name, + "external_id" => tenant_name, + "jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"), + "jwt_jwks" => System.get_env("API_JWT_JWKS") |> then(fn v -> if v, do: Jason.decode!(v) end), + "extensions" => [ + %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_name" => System.get_env("DB_NAME", "postgres"), + "db_host" => System.get_env("DB_HOST", default_db_host), + "db_user" => System.get_env("DB_USER", "supabase_admin"), + "db_password" => System.get_env("DB_PASSWORD", "postgres"), + "db_port" => System.get_env("DB_PORT", "5433"), + "region" => "us-east-1", + "poll_interval_ms" => 100, + "poll_max_record_bytes" => 1_048_576, + "ssl_enforced" => false + } } - } - ] - }) - |> Repo.insert!() + ] + }) + |> Repo.insert!() + end) - tenant = Tenants.get_tenant_by_external_id(tenant_name) - Tenants.Migrations.run_migrations(tenant) -end) +tenant = Tenants.get_tenant_by_external_id(tenant_name) + +with res when res in [:noop, :ok] <- Tenants.Migrations.run_migrations(tenant), + :ok <- Tenants.Janitor.MaintenanceTask.run(tenant.external_id) do + Logger.info("Tenant set-up successfully") +else + error -> + Logger.info("Failed to set-up tenant: #{inspect(error)}") +end diff --git a/test/realtime/tenants/janitor/maintenance_task_test.exs b/test/realtime/tenants/janitor/maintenance_task_test.exs new file mode 100644 index 000000000..f4c51436e --- /dev/null +++ b/test/realtime/tenants/janitor/maintenance_task_test.exs @@ -0,0 +1,101 @@ +defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do + use Realtime.DataCase, async: true + + alias Realtime.Tenants.Janitor.MaintenanceTask + alias Realtime.Api.Message + alias Realtime.Database + alias Realtime.Repo + + setup do + tenant = Containers.checkout_tenant(run_migrations: true) + # Warm cache to avoid Cachex and Ecto.Sandbox ownership issues + Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant}) + + %{tenant: tenant} + end + + test "cleans messages older than 72 hours and creates partitions", %{tenant: tenant} do + utc_now = NaiveDateTime.utc_now() + limit = NaiveDateTime.add(utc_now, -72, :hour) + + messages = + for days <- -5..0 do + inserted_at = NaiveDateTime.add(utc_now, days, :day) + message_fixture(tenant, %{inserted_at: inserted_at}) + end + |> MapSet.new() + + to_keep = + messages + |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) + |> MapSet.new() + + assert MaintenanceTask.run(tenant.external_id) == :ok + + {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) + {:ok, res} = Repo.all(conn, from(m in Message), Message) + + verify_partitions(conn) + + current = MapSet.new(res) + + assert MapSet.difference(current, to_keep) |> MapSet.size() == 0 + end + + test "exits if fails to remove old messages" do + extensions = [ + %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => "127.0.0.1", + "db_name" => "postgres", + "db_user" => "supabase_admin", + "db_password" => "postgres", + "db_port" => "11111", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "us-east-1", + "ssl_enforced" => false + } + } + ] + + tenant = tenant_fixture(%{extensions: extensions}) + # Warm cache to avoid Cachex and Ecto.Sandbox ownership issues + Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant}) + + Process.flag(:trap_exit, true) + + t = + Task.async(fn -> + MaintenanceTask.run(tenant.external_id) + end) + + pid = t.pid + ref = t.ref + assert_receive {:EXIT, ^pid, :killed} + assert_receive {:DOWN, ^ref, :process, ^pid, :killed} + end + + defp verify_partitions(conn) do + today = Date.utc_today() + yesterday = Date.add(today, -1) + future = Date.add(today, 3) + dates = Date.range(yesterday, future) + + %{rows: rows} = + Postgrex.query!( + conn, + "SELECT tablename from pg_catalog.pg_tables where schemaname = 'realtime' and tablename like 'messages_%'", + [] + ) + + partitions = MapSet.new(rows, fn [name] -> name end) + + expected_names = + MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end) + + assert MapSet.equal?(partitions, expected_names) + end +end diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs index 7cef814d3..5f6963c88 100644 --- a/test/realtime/tenants/janitor_test.exs +++ b/test/realtime/tenants/janitor_test.exs @@ -173,8 +173,11 @@ defmodule Realtime.Tenants.JanitorTest do [] ) - partitions = Enum.map(rows, fn [name] -> name end) - expected_names = Enum.map(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end) - assert expected_names == partitions + partitions = MapSet.new(rows, fn [name] -> name end) + + expected_names = + MapSet.new(dates, fn date -> "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}" end) + + assert MapSet.equal?(partitions, expected_names) end end diff --git a/test/support/containers.ex b/test/support/containers.ex index 0f2ba45f9..3aee8ad18 100644 --- a/test/support/containers.ex +++ b/test/support/containers.ex @@ -165,7 +165,7 @@ defmodule Containers do end defp existing_containers(pattern) do - {containers, 0} = System.cmd("docker", ["ps", "-a", "--format", "{{json .}}", "--filter", "name=#{pattern}"]) + {containers, 0} = System.cmd("docker", ["ps", "--format", "{{json .}}", "--filter", "name=#{pattern}"]) containers |> String.split("\n", trim: true) @@ -233,6 +233,7 @@ defmodule Containers do System.cmd("docker", [ "run", "-d", + "--rm", "--name", name, "-e",