@fuziontech

Summary

Replace the single global ReentrantLock with per-partition locks to allow parallel processing across partitions.

Problem

The connector had a single global lock that serialized ALL operations:

  • With 16 tasks, only 1 could process at a time
  • This caused severe lock contention warnings: "Time-based flush check skipped 200+ consecutive times"
  • Throughput was bottlenecked by the single lock
  • Consumer lag was growing instead of shrinking (~18.6 billion messages behind)

Solution

Implement per-partition locking:

// BEFORE:
private final ReentrantLock flushLock = new ReentrantLock();

// AFTER:
private final ConcurrentHashMap<TopicPartition, ReentrantLock> partitionLocks = new ConcurrentHashMap<>();
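
For readers unfamiliar with the pattern, here is a minimal sketch of how a lock map like the one above is typically used. It is not the connector's code: the class name `PartitionLocks` and the methods `lockFor` and `withLock` are hypothetical, and the type parameter `K` stands in for Kafka's `TopicPartition` so the example compiles without the Kafka client on the classpath.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper, not taken from the PR; K stands in for TopicPartition. */
final class PartitionLocks<K> {
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Returns the lock for a partition, creating it atomically on first use. */
    ReentrantLock lockFor(K partition) {
        return locks.computeIfAbsent(partition, p -> new ReentrantLock());
    }

    /** Runs the action while holding only that partition's lock. */
    <T> T withLock(K partition, Supplier<T> action) {
        ReentrantLock lock = lockFor(partition);
        lock.lock();
        try {
            return action.get();
        } finally {
            // Always release, even if the action throws.
            lock.unlock();
        }
    }
}
```

Because `computeIfAbsent` is atomic on `ConcurrentHashMap`, two tasks that ask for the same partition at the same time get the same `ReentrantLock`, while tasks working on different partitions never contend with each other.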

Changes

  • Replace single flushLock with ConcurrentHashMap<TopicPartition, ReentrantLock>
  • Update put() to group records by partition first, then acquire per-partition locks
  • Update checkTimeBasedFlush() to iterate partitions with per-partition locks
  • Add per-partition buffer methods
  • Track flush skips per-partition instead of globally
  • Update stop() to acquire per-partition locks during shutdown
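
The list above can be illustrated with a rough sketch of the grouping and skip-tracking logic. Everything here is an assumption made for illustration rather than the merged implementation: the class `PartitionedBuffer`, the helpers `skipCount` and `bufferedCount`, and the type parameters `P` (standing in for `TopicPartition`) and `R` (standing in for a sink record) are hypothetical. The elapsed-time condition is omitted, so this `checkTimeBasedFlush()` flushes any non-empty buffer, and "flushing" simply clears the buffer where a real connector would write to the sink.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch; P stands in for TopicPartition, R for a sink record. */
final class PartitionedBuffer<P, R> {
    private final ConcurrentHashMap<P, ReentrantLock> partitionLocks = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<P, List<R>> buffers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<P, AtomicInteger> flushSkips = new ConcurrentHashMap<>();
    private final Function<R, P> partitionOf;

    PartitionedBuffer(Function<R, P> partitionOf) {
        this.partitionOf = partitionOf;
    }

    ReentrantLock lockFor(P partition) {
        return partitionLocks.computeIfAbsent(partition, p -> new ReentrantLock());
    }

    /** Groups records by partition first, then locks each partition separately. */
    void put(Collection<R> records) {
        Map<P, List<R>> byPartition = records.stream().collect(Collectors.groupingBy(partitionOf));
        for (Map.Entry<P, List<R>> entry : byPartition.entrySet()) {
            ReentrantLock lock = lockFor(entry.getKey());
            lock.lock();
            try {
                buffers.computeIfAbsent(entry.getKey(), p -> new ArrayList<>()).addAll(entry.getValue());
            } finally {
                lock.unlock();
            }
        }
    }

    /** Tries each partition without blocking; returns how many buffers were flushed. */
    int checkTimeBasedFlush() {
        int flushed = 0;
        for (P partition : buffers.keySet()) {
            AtomicInteger skips = flushSkips.computeIfAbsent(partition, p -> new AtomicInteger());
            ReentrantLock lock = lockFor(partition);
            if (!lock.tryLock()) {
                // Partition is busy: count the skip for this partition only.
                skips.incrementAndGet();
                continue;
            }
            try {
                List<R> buffer = buffers.get(partition);
                if (!buffer.isEmpty()) {
                    buffer.clear(); // placeholder for the real sink write
                    flushed++;
                }
                skips.set(0); // lock was acquired, so the skip streak ends
            } finally {
                lock.unlock();
            }
        }
        return flushed;
    }

    int skipCount(P partition) {
        AtomicInteger skips = flushSkips.get(partition);
        return skips == null ? 0 : skips.get();
    }

    int bufferedCount(P partition) {
        ReentrantLock lock = lockFor(partition);
        lock.lock();
        try {
            List<R> buffer = buffers.get(partition);
            return buffer == null ? 0 : buffer.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Keeping the skip counter per partition means a busy partition no longer hides whether the others are being flushed on time, which is the concern raised in the linked issue about `tryLock()` silently skipping flushes.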

Expected Impact

  • Before: All 16 tasks serialize on one lock
  • After: Each partition has its own lock - parallel processing
  • Expected improvement: 2-4x throughput, depending on partition distribution

Test Plan

  • Code compiles
  • Existing tests pass
  • Deploy and verify reduced lock contention
  • Verify consumer lag decreases

Fixes #2

🤖 Generated with Claude Code

fuziontech and others added 3 commits December 6, 2025 12:27
Replace the single global ReentrantLock with per-partition locks to
allow parallel processing across partitions. This significantly
improves throughput when multiple tasks are handling different
partitions.

Changes:
- Replace single `flushLock` with `ConcurrentHashMap<TopicPartition, ReentrantLock>`
- Update `put()` to acquire only the lock for partitions being processed
- Update `checkTimeBasedFlush()` to use per-partition locks
- Add per-partition buffer methods for cleaner code
- Track flush skips per-partition instead of globally

Before: All 16 tasks serialized on one lock - only 1 could process at a time
After: Each partition has its own lock - parallel processing across partitions

Expected improvement: 2-4x throughput depending on partition distribution

Fixes #2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>

The SARIF files are named spotbugsMain.sarif and spotbugsTest.sarif,
not sarif.sarif. Use the directory path to upload all SARIF files.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>

Shorten log message and reformat lambda to stay under 120 char limit.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@fuziontech fuziontech merged commit 0d9b424 into main Dec 6, 2025
17 checks passed
Development

Successfully merging this pull request may close these issues.

Thread safety: tryLock() in checkTimeBasedFlush may silently skip flushes
