Design Document: Distributed Migration Locks
Objective
Implement a distributed locking mechanism in the gofr migration system to prevent race conditions when multiple application instances start simultaneously.
Architecture Overview
We will introduce a Locker interface that datasources can implement. The migration runner will attempt to acquire locks from all available datasources before proceeding with any migration logic.
Code Changes
1. New Interface: Locker
We will define a new interface in pkg/gofr/migration/interface.go.
// Locker is an interface that datasources can implement to support distributed locking.
type Locker interface {
    // Lock acquires a distributed lock. It should block or retry until the lock is acquired
    // or the context is cancelled.
    // The lock identifier should be standard (e.g., "gofr_migration_lock").
    Lock(ctx context.Context) error

    // Unlock releases the distributed lock.
    Unlock(ctx context.Context) error
}
2. Update Run Function
Modify pkg/gofr/migration/migration.go:
func Run(migrationsMap map[int64]Migrate, c *container.Container) {
    // ... existing initialization code ...
    ds, mg, ok := getMigrator(c)

    // [NEW] Lock Acquisition
    // Identify all datasources that implement Locker.
    lockers := getLockers(ds)

    // Acquire locks sequentially.
    // CRITICAL: Locks must be acquired in a deterministic order to prevent deadlocks,
    // e.g., SQL -> Redis -> Mongo.
    var acquiredLocks []Locker

    for _, l := range lockers {
        if err := l.Lock(context.Background()); err != nil {
            // If we fail to acquire a lock, we must release any locks we already hold.
            releaseLocks(acquiredLocks)
            c.Fatalf("failed to acquire migration lock: %v", err)
        }

        acquiredLocks = append(acquiredLocks, l)

        // Ensure we unlock on exit (deferred calls run in reverse order of acquisition).
        defer l.Unlock(context.Background())
    }

    // ... existing migration logic (getLastMigration, loop, etc.) ...
}
3. Datasource Implementations
We will implement Locker for the primary datasources.
SQL (pkg/gofr/migration/sql.go)
- MySQL: Use GET_LOCK('gofr_migration_lock', timeout).
- Postgres: Use pg_advisory_lock(key).
- SQLite: File locking is handled by the driver, but we can implement a no-op or an explicit table lock if needed.
func (d sqlMigrator) Lock(ctx context.Context) error {
    // Detect the dialect and execute the appropriate lock query.
}

func (d sqlMigrator) Unlock(ctx context.Context) error {
    // Release the lock.
}
Redis (pkg/gofr/migration/redis.go)
- Use SETNX on the key gofr_migration_lock with a TTL (e.g., 1 minute) to prevent deadlocks if the process crashes.
- Implement a retry loop (spin lock) to wait for the lock.
func (r redisMigrator) Lock(ctx context.Context) error {
    // Loop: SETNX "gofr_migration_lock" "1" EX 60
    // If success -> return nil
    // If fail -> sleep and retry
}
Other Datasources (Mongo, Cassandra, etc.)
- Mongo: findAndModify on a special lock document in the gofr_migrations collection.
- Cassandra: Lightweight Transactions (LWT) using INSERT ... IF NOT EXISTS.
4. Lock Acquisition Strategy
To answer the critical question of concurrency: yes, a single pod will acquire ALL locks for ALL datasources.
- All-or-Nothing: The migration process cannot proceed unless it holds exclusive locks for every configured datasource. This ensures that no other pod can modify any part of the system state while migrations are running.
- Deterministic Ordering: To prevent deadlocks (where Pod A holds SQL and wants Redis, while Pod B holds Redis and wants SQL), locks will always be acquired in a fixed, deterministic order (e.g., alphabetical by datasource type or a hardcoded precedence), as sketched below.
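A minimal sketch of the ordering rule, assuming a hypothetical namedLocker wrapper (the name field and sortLockers helper are illustrative, not part of the current gofr API):
import "sort"

// namedLocker pairs a Locker with a stable name used only for ordering.
type namedLocker struct {
    name string // e.g. "cassandra", "mongo", "redis", "sql"
    Locker
}

// sortLockers gives every pod the same global acquisition sequence,
// which is what rules out lock-ordering deadlocks.
func sortLockers(lockers []namedLocker) {
    sort.Slice(lockers, func(i, j int) bool { return lockers[i].name < lockers[j].name })
}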
5. Edge Cases & Solutions
A. Deadlock Prevention (The "Cross-Lock" Problem)
Scenario: Pod A holds Lock 1 and waits for Lock 2. Pod B holds Lock 2 and waits for Lock 1.
Solution: Deterministic Ordering.
We enforce a strict global order for acquiring locks. For example: SQL -> Redis -> Mongo -> Cassandra.
- Pod A acquires SQL.
- Pod B tries to acquire SQL -> BLOCKED.
- Pod A acquires Redis, Mongo, etc.
- Pod A finishes and releases.
- Pod B acquires SQL.
- Result: Cycles are mathematically impossible.
B. Lock Expiry (The "Zombie" Problem)
Scenario: Pod A acquires a lock but crashes (OOM, power failure) before releasing it.
Solution: Time-To-Live (TTL).
All locks must have an expiration time (e.g., 5-10 minutes).
- Redis: SETNX with EX.
- Mongo/SQL: Store an expires_at timestamp.
- Logic: If a lock is found but expires_at < now, it is considered a "zombie" and can be overwritten (stolen) by the active pod, as sketched below.
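A hedged sketch of the zombie check for Mongo (the document fields and the zombieAwareLockQuery helper are illustrative): one filter matches either a free lock or an expired one, so a single atomic UpdateOne both acquires fresh locks and steals zombie ones.
import (
    "time"

    "go.mongodb.org/mongo-driver/bson"
)

// zombieAwareLockQuery builds a filter/update pair for UpdateOne: the filter
// matches a lock that is free OR expired; the update claims it with a new TTL.
func zombieAwareLockQuery(now time.Time) (filter, update bson.M) {
    filter = bson.M{"_id": "lock", "$or": []bson.M{
        {"locked": false},                 // free
        {"expiresAt": bson.M{"$lt": now}}, // expired "zombie"
    }}
    update = bson.M{"$set": bson.M{"locked": true, "expiresAt": now.Add(5 * time.Minute)}}

    return filter, update
}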
C. Partial Failure (The "Stuck" Problem)
Scenario: Pod A acquires SQL lock, but fails to acquire Redis lock (e.g., Redis is down).
Solution: Immediate Release & Retry.
If any lock acquisition fails:
- Release all currently held locks immediately (in reverse order).
- Wait (backoff).
- Retry the entire sequence.
This prevents Pod A from holding the SQL lock indefinitely while waiting for Redis to come back.
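A minimal sketch of this release-and-retry loop, assuming the Locker interface above (acquireAll is illustrative, not a proposed public API):
import (
    "context"
    "time"
)

// acquireAll takes every lock in order; on any failure it releases what it
// holds (in reverse order), backs off, and retries the whole sequence.
func acquireAll(ctx context.Context, lockers []Locker) error {
    backoff := time.Second

    for {
        var acquired []Locker

        failed := false

        for _, l := range lockers {
            if err := l.Lock(ctx); err != nil {
                failed = true
                break
            }

            acquired = append(acquired, l)
        }

        if !failed {
            return nil
        }

        // Release in reverse order so we never sit on a partial set of locks.
        for i := len(acquired) - 1; i >= 0; i-- {
            _ = acquired[i].Unlock(ctx)
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            backoff *= 2 // exponential backoff before the next full attempt
        }
    }
}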
D. Clock Skew
Scenario: Pod A and Pod B have different system times, leading to incorrect expiry checks.
Solution: Server-Side Time.
Where possible, rely on the datasource's internal mechanism (e.g., Redis TTL). If storing timestamps (SQL/Mongo), ensure a generous buffer (e.g., 5 minutes) to account for reasonable skew, or use the database's NOW() function instead of client time, as in the sketch below.
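For example, a MySQL-flavoured sketch (the gofr_migration_lock table, its columns, and the tryAcquire helper are illustrative) where the server's clock performs both the expiry check and sets the new deadline:
import (
    "context"
    "database/sql"
)

// tryAcquire lets the database evaluate both NOW() calls, so pods with skewed
// clocks still agree on whether the lock is expired.
func tryAcquire(ctx context.Context, db *sql.DB, podID string) (bool, error) {
    res, err := db.ExecContext(ctx, `
        UPDATE gofr_migration_lock
           SET owner_id = ?, expires_at = NOW() + INTERVAL 5 MINUTE
         WHERE id = 'lock' AND (owner_id IS NULL OR expires_at < NOW())`, podID)
    if err != nil {
        return false, err
    }

    n, err := res.RowsAffected()

    return n == 1, err // 1 row changed => acquired (or stolen from an expired holder)
}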
Low-Level Design (LLD)
1. Locker Interface
Located in pkg/gofr/migration/interface.go.
type Locker interface {
    // Lock acquires the lock. It blocks until acquired or the context is cancelled.
    Lock(ctx context.Context) error

    // Unlock releases the lock.
    Unlock(ctx context.Context) error
}
2. SQL Implementation (sqlMigrator)
Challenge: SQL locks (like MySQL GET_LOCK) are often session-bound. If we use a connection pool, we must ensure the lock is held on a dedicated connection.
Solution: We will use a dedicated Transaction (Begin()) to hold the connection for the duration of the lock.
type sqlMigrator struct {
    // ... existing fields
    lockTx *gofrSql.Tx // Store the transaction holding the lock
}

func (d *sqlMigrator) Lock(ctx context.Context) error {
    // 1. Start a transaction to pin a dedicated connection, since these locks are session-bound.
    tx, err := d.SQL.Begin()
    if err != nil {
        return err
    }

    d.lockTx = tx

    // 2. Execute the dialect-specific lock query.
    switch d.SQL.Dialect() {
    case "mysql":
        // GET_LOCK returns 1 on success and 0 on timeout without raising an
        // error, so the result must be checked explicitly.
        var acquired int
        if err := tx.QueryRowContext(ctx, "SELECT GET_LOCK('gofr_migrations', 10)").Scan(&acquired); err != nil {
            d.lockTx.Rollback()
            return err
        }

        if acquired != 1 {
            d.lockTx.Rollback()
            return fmt.Errorf("timed out acquiring migration lock")
        }
    case "postgres":
        // pg_advisory_lock blocks until granted; the key is arbitrary but fixed.
        if _, err := tx.ExecContext(ctx, "SELECT pg_advisory_lock(123456789)"); err != nil {
            d.lockTx.Rollback()
            return err
        }
    default:
        // For SQLite or others, we rely on the transaction itself or skip locking.
    }

    return nil
}

func (d *sqlMigrator) Unlock(ctx context.Context) error {
    if d.lockTx == nil {
        return nil
    }

    // Session-level locks (GET_LOCK, pg_advisory_lock) survive a rollback, so
    // release them explicitly before returning the connection to the pool.
    switch d.SQL.Dialect() {
    case "mysql":
        _, _ = d.lockTx.ExecContext(ctx, "SELECT RELEASE_LOCK('gofr_migrations')")
    case "postgres":
        _, _ = d.lockTx.ExecContext(ctx, "SELECT pg_advisory_unlock(123456789)")
    }

    // Rolling back ends the transaction and returns the dedicated connection.
    return d.lockTx.Rollback()
}
3. Redis Implementation (redisMigrator)
Challenge: Redis SETNX is atomic but requires a TTL to prevent deadlocks if the pod crashes.
Solution: Use SetNX with a reasonable TTL (e.g., 5 minutes). We must also avoid deleting a lock held by another instance; since Run blocks while holding the lock this is unlikely, but a unique token can be used if strict correctness is needed (see the sketch after the code below).
func (r *redisMigrator) Lock(ctx context.Context) error {
    key := "gofr_migrations_lock"
    ttl := 5 * time.Minute

    // Simple spin lock
    for {
        // Try to acquire
        ok, err := r.Redis.SetNX(ctx, key, "locked", ttl).Result()
        if err != nil {
            return err
        }

        if ok {
            return nil // Acquired
        }

        // Wait and retry
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
            continue
        }
    }
}
func (r *redisMigrator) Unlock(ctx context.Context) error {
    return r.Redis.Del(ctx, "gofr_migrations_lock").Err()
}
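For the unique-token variant, a sketch assuming r.Redis satisfies go-redis's Scripter interface (the unlockSafe method and the token are illustrative; the same token would be stored as the SetNX value at acquisition time):
import "github.com/redis/go-redis/v9"

// unlockScript deletes the lock only if we still own it: the GET and DEL run
// atomically in Lua, so another pod's lock can never be removed by mistake.
var unlockScript = redis.NewScript(`
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    end
    return 0`)

func (r *redisMigrator) unlockSafe(ctx context.Context, token string) error {
    return unlockScript.Run(ctx, r.Redis, []string{"gofr_migrations_lock"}, token).Err()
}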
4. Migration Runner (Run)
The Run function orchestrates the locking.
func Run(migrationsMap map[int64]Migrate, c *container.Container) {
    // ... init ...

    // 1. Identify Lockers
    var lockers []Locker
    if c.SQL != nil { lockers = append(lockers, &sqlMigrator{...}) }
    if c.Redis != nil { lockers = append(lockers, &redisMigrator{...}) }
    // ... others ...

    // 2. Acquire All Locks
    var acquiredLockers []Locker

    for _, l := range lockers {
        if err := l.Lock(context.Background()); err != nil {
            // Release already acquired locks in reverse order.
            for i := len(acquiredLockers) - 1; i >= 0; i-- {
                acquiredLockers[i].Unlock(context.Background())
            }
            c.Fatalf("Migration lock failed: %v", err)
        }

        acquiredLockers = append(acquiredLockers, l)
    }

    // 3. Defer Release (reverse order of acquisition)
    defer func() {
        for i := len(acquiredLockers) - 1; i >= 0; i-- {
            acquiredLockers[i].Unlock(context.Background())
        }
    }()

    // ... Run Migrations ...
}
5. Mongo Implementation (mongoMigrator)
Mechanism: Use FindOneAndUpdate (or UpdateOne with a filter) on a dedicated lock document in the gofr_migrations collection.
Logic:
- Lock: db.gofr_migrations.updateOne({_id: "lock", locked: false}, {$set: {locked: true, expiresAt: Now() + TTL}})
  - If ModifiedCount > 0, we acquired the lock.
  - If ModifiedCount == 0, check whether the existing lock is expired. If expired, overwrite it.
- Unlock: db.gofr_migrations.updateOne({_id: "lock"}, {$set: {locked: false}})
func (m *mongoMigrator) Lock(ctx context.Context) error {
    filter := bson.M{"_id": "lock", "locked": false}
    update := bson.M{"$set": bson.M{"locked": true, "expiresAt": time.Now().Add(5 * time.Minute)}}

    // Loop with timeout
    for {
        res, err := m.Mongo.UpdateOne(ctx, "gofr_migrations", filter, update)
        if err != nil {
            return err
        }

        if res.ModifiedCount > 0 {
            return nil // Acquired
        }
        // Held by another pod: steal it if expiresAt < now, otherwise sleep
        // briefly and retry, respecting ctx cancellation.
    }
}
6. Cassandra/ScyllaDB Implementation (cassandraMigrator)
Mechanism: Lightweight Transactions (LWT) using INSERT ... IF NOT EXISTS or UPDATE ... IF ....
Logic:
- Lock: INSERT INTO gofr_migrations_lock (id, locked, lease_id) VALUES ('lock', true, uuid()) IF NOT EXISTS
  - Or: UPDATE gofr_migrations_lock SET locked = true, lease_id = uuid() WHERE id = 'lock' IF locked = false
- Unlock: UPDATE gofr_migrations_lock SET locked = false WHERE id = 'lock' IF lease_id = my_uuid
func (c *cassandraMigrator) Lock(ctx context.Context) error {
    // Use ExecCAS (Compare And Set): applied reports whether the IF clause matched.
    var dest bool

    applied, err := c.Cassandra.ExecCASWithCtx(ctx, &dest,
        "UPDATE gofr_migrations SET locked = true WHERE id = 'lock' IF locked = false")

    // If !applied, another pod holds the lock: back off and retry until ctx is cancelled.
    // ... retry logic ...
}
7. Elasticsearch Implementation (elasticsearchMigrator)
Mechanism: Optimistic Concurrency Control using op_type=create or versioning.
Logic:
- Lock: Try to create a document with ID lock: PUT /gofr_migrations/_create/lock
  - If 201 Created -> Acquired.
  - If 409 Conflict -> Check if expired.
- Unlock: Delete the lock document.
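A hedged sketch over the raw REST API (no particular gofr Elasticsearch client method is assumed; esURL and tryLock are placeholders):
import (
    "context"
    "net/http"
    "strings"
)

// tryLock attempts the optimistic create; _create fails with 409 if the document exists.
func tryLock(ctx context.Context, esURL string) (bool, error) {
    body := strings.NewReader(`{"lockedAt": "now"}`) // expiry payload elided for brevity

    req, err := http.NewRequestWithContext(ctx, http.MethodPut, esURL+"/gofr_migrations/_create/lock", body)
    if err != nil {
        return false, err
    }

    req.Header.Set("Content-Type", "application/json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()

    // 201 Created => acquired; 409 Conflict => already held (check expiry before retrying).
    return resp.StatusCode == http.StatusCreated, nil
}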
8. Clickhouse Implementation (clickhouseMigrator)
Challenge: Clickhouse is not designed for row-level locking.
Solution:
- If using ClickHouse Keeper (ZooKeeper compatible), use SELECT getLock('gofr_migrations', 10).
- If not, we might have to skip locking for ClickHouse, or use a "best effort" approach via a separate table with the Memory engine, though that is not distributed-safe without Keeper.
- Recommendation: Use SELECT getLock(...) and assume Keeper/ZooKeeper is present for distributed setups. If it fails (function not found), log a warning and proceed (soft failure), as sketched below.
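A sketch of that soft-failure behaviour (the Exec method and logger field on the migrator are assumptions for illustration):
func (ch *clickHouseMigrator) Lock(ctx context.Context) error {
    // Best effort: getLock requires Keeper/ZooKeeper, so degrade gracefully without it.
    err := ch.ClickHouse.Exec(ctx, "SELECT getLock('gofr_migrations', 10)")
    if err != nil {
        ch.logger.Warnf("clickhouse migration lock unavailable, proceeding without it: %v", err)
    }

    return nil // soft failure: never block migrations on a missing lock function
}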
9. ArangoDB Implementation (arangoMigrator)
Mechanism: Document locking within a transaction.
Logic:
- Lock: Try to insert a lock document {_key: "lock"}.
  - If success -> Acquired.
  - If conflict -> Check timestamp. If expired, replace.
- Unlock: Remove the lock document.
10. DGraph Implementation (dgraphMigrator)
Mechanism: Distributed Transactions (ACID).
Logic:
- Lock: Mutate a node with dgraph.type: "Lock" and id: "migration_lock".
  - Use an upsert block with @if to check that locked is false (see the sketch below).
- Unlock: Mutate locked to false.
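For illustration, the DQL upsert might look like the following (predicate names are assumptions; dgo client wiring is omitted):
// lockUpsert applies the mutation only when an unlocked "migration_lock" node
// exists, making acquisition conditional and atomic on the server side.
const lockUpsert = `upsert {
  query {
    l as var(func: eq(id, "migration_lock")) @filter(eq(locked, false))
  }
  mutation @if(gt(len(l), 0)) {
    set {
      uid(l) <locked> "true" .
    }
  }
}`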
11. Oracle Implementation (oracleMigrator)
Mechanism: DBMS_LOCK package.
Logic:
- Lock: DBMS_LOCK.REQUEST(id => 123456, lockmode => DBMS_LOCK.X_MODE, timeout => 10, release_on_commit => FALSE)
  - Note: release_on_commit must be FALSE to hold the lock across the migration transaction.
- Unlock: DBMS_LOCK.RELEASE(id => 123456)
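A sketch issuing the request through an anonymous PL/SQL block (db and the oracleLock helper are placeholders; DBMS_LOCK.REQUEST returns 0 for granted and 4 for already owned):
import (
    "context"
    "database/sql"
)

// oracleLock converts DBMS_LOCK.REQUEST's status code into a visible error,
// since the function reports failure via its return value rather than raising.
func oracleLock(ctx context.Context, db *sql.DB) error {
    _, err := db.ExecContext(ctx, `
        DECLARE
            l_status INTEGER;
        BEGIN
            l_status := DBMS_LOCK.REQUEST(id => 123456, lockmode => DBMS_LOCK.X_MODE,
                                          timeout => 10, release_on_commit => FALSE);
            IF l_status NOT IN (0, 4) THEN
                RAISE_APPLICATION_ERROR(-20001, 'migration lock not acquired, status ' || l_status);
            END IF;
        END;`)

    return err
}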
12. SurrealDB Implementation (surrealMigrator)
Mechanism: Transactions with CREATE or UPDATE.
Logic:
- Lock: CREATE gofr_migrations:lock SET locked = true
  - Handle the error if the record already exists.
- Unlock: DELETE gofr_migrations:lock
13. OpenTSDB & PubSub
Status: These datasources do not support strong consistency primitives suitable for distributed locking.
Strategy:
- OpenTSDB: Skip locking. Warn user if it's the only datasource.
- PubSub: Skip locking. Topic creation is usually idempotent or handled by the broker.
Implementation Plan
- Define Interface: Add Locker to interface.go.
- Implement SQL Lock: Add Lock/Unlock to sqlMigrator.
- Implement Redis Lock: Add Lock/Unlock to redisMigrator.
- Update Runner: Modify Run in migration.go to use the locks.
- Testing:
  - Unit tests for Lock/Unlock methods.
  - Integration test: Spawn multiple goroutines/processes trying to run migrations concurrently and verify sequential execution (see the sketch below).
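A minimal sketch of that integration test; runWithLocks is a hypothetical stand-in for invoking Run with all lockers wired up:
import (
    "sync"
    "sync/atomic"
    "testing"
    "time"
)

func TestMigrationsRunSequentially(t *testing.T) {
    var active int32 // goroutines currently inside the "locked" migration section

    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            runWithLocks(func() { // hypothetical: body runs while holding all locks
                if atomic.AddInt32(&active, 1) > 1 {
                    t.Error("two instances entered the migration section at once")
                }

                time.Sleep(10 * time.Millisecond) // simulate migration work
                atomic.AddInt32(&active, -1)
            })
        }()
    }

    wg.Wait()
}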