KAFKA-17747: Add compute topic and group hash #53

Conversation
Signed-off-by: PoAn Yang <[email protected]>
@gemini-code-assist

Summary of Changes

This pull request enhances the group coordinator by introducing a mechanism to compute hashes for Kafka topics and consumer groups. This functionality is designed to provide a quick and reliable way to detect changes in topic configurations or the composition of consumer groups, which is vital for maintaining consistent internal state and optimizing operations. The implementation leverages the Guava Hashing library for its proven hashing capabilities.
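For orientation before the inline comments, here is a minimal, self-contained sketch of the two helpers as they can be pieced together from the code excerpts quoted in the reviews below. The class name, the simplified parameters, and the use of java.util.UUID in place of Kafka's Uuid are illustrative assumptions, not the actual patch; the real methods live on Group, take TopicImage/ClusterImage arguments, and also fold sorted per-partition rack data into the topic hash.

```java
import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

public final class GroupHashingSketch {

    // Per-topic hash: magic byte, topic id, topic name, partition count.
    public static long computeTopicHash(UUID topicId, String topicName, int numPartitions) {
        Hasher topicHasher = Hashing.murmur3_128().newHasher()
            .putByte((byte) 0)                            // magic byte
            .putLong(topicId.hashCode())                  // topic id, as in the PR under review
            .putString(topicName, StandardCharsets.UTF_8) // topic name
            .putInt(numPartitions);                       // number of partitions
        return topicHasher.hash().asLong();
    }

    // Group hash: per-topic hashes combined in topic-name order so the result
    // does not depend on map iteration order.
    public static long computeGroupHash(Map<String, Long> topicHashes) {
        return Hashing.combineOrdered(
            topicHashes.entrySet()
                .stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> HashCode.fromLong(e.getValue()))
                .toList()
        ).asLong();
    }
}
```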
CodeRabbit Walkthrough

This pull request adds Guava 33.4.0-jre as a dependency to the group-coordinator module and introduces two static helper methods—computeTopicHash and computeGroupHash—for deterministic hashing of topic metadata and of a group's set of topic hashes.
Code Review
This pull request introduces utility methods to compute hashes for topics and groups, which involves adding Guava as a dependency. The implementation of computeTopicHash has a potential issue with UUID hashing that could lead to collisions, and there's a minor opportunity for code simplification in the stream processing. The new tests will also need updates to reflect the suggested change in the hashing logic. Otherwise, the changes look good and the tests are comprehensive.
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
```
Using topicImage.id().hashCode() for hashing the Uuid can lead to an increased chance of collisions: Uuid.hashCode() returns an int, which is then widened to a long, discarding most of the information in the full 128-bit UUID. To create a more robust hash, use the full UUID value.

Consider changing the hasher chain to include both parts of the UUID:

```java
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().getMostSignificantBits())
    .putLong(topicImage.id().getLeastSignificantBits())
    .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
    .putInt(topicImage.partitions().size()); // number of partitions
```

This will also require updating the corresponding tests in GroupTest.java.
```java
.filter(Optional::isPresent)
.map(Optional::get)
```
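The review summary above mentions a minor opportunity for code simplification in this stream pipeline. A likely candidate (an assumption on my part, not a quoted suggestion from the review) is collapsing the filter/map pair with Optional::stream, available since Java 9:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class OptionalStreamExample {
    public static void main(String[] args) {
        List<String> racks = Stream.of(Optional.of("rack-a"), Optional.<String>empty(), Optional.of("rack-b"))
            // Equivalent to .filter(Optional::isPresent).map(Optional::get):
            // empty Optionals become empty streams and vanish.
            .flatMap(Optional::stream)
            .toList();
        System.out.println(racks); // [rack-a, rack-b]
    }
}
```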
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
```
Following the recommended change in Group.computeTopicHash to use the full 128-bit UUID for hashing, this line and similar lines in other tests (testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, testComputeTopicHashWithDifferentRackOrder) should be updated.

Instead of putLong(FOO_TOPIC_ID.hashCode()), you should use:

```java
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())
```
@refacto-visz
Nitpicks 🔍
Code Review: UUID Hashing Implementation

PR Confidence Score: 🟨 4 / 5

👍 Well Done
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
```
UUID Hashing Weakness

The code uses topicImage.id().hashCode(), which converts a 128-bit UUID to a 32-bit integer before hashing. This discards 96 bits of information, significantly increasing the probability of hash collisions between different topic UUIDs and potentially causing incorrect topic state detection.

```java
.putLong(topicImage.id().getMostSignificantBits())
.putLong(topicImage.id().getLeastSignificantBits()) // full UUID bits
```

Committable Suggestion

```diff
-    .putLong(topicImage.id().hashCode()) // topic Id
+    .putLong(topicImage.id().getMostSignificantBits())
+    .putLong(topicImage.id().getLeastSignificantBits()) // full UUID bits
```
Standards
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- ISO-IEC-25010-Reliability-Fault-Tolerance
Actionable comments posted: 1
🧹 Nitpick comments (2)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (1)

239-267: Topic hash design looks sound; consider using full UUID bits instead of hashCode() and reusing the HashFunction

The overall structure of computeTopicHash (fixed magic byte, topic id + name + partition count, partitions sorted by id, racks sorted per partition) gives:

- Deterministic hashes for a given metadata/rack layout.
- Sensitivity to changes in topic identity, partitioning, or rack distribution, while ignoring replica order within a partition.

Two possible refinements (a sketch of the second follows this comment):

1. Use the full UUID instead of hashCode()

   Currently you do:

   ```java
   .putLong(topicImage.id().hashCode()) // topic Id
   ```

   Uuid.hashCode() collapses a 128-bit UUID down to 32 bits, which is then sign-extended into a long. If you ever rely on differences in topic IDs alone (e.g., same name/partitions/racks but a different ID across images), this unnecessarily increases collision risk. You could instead feed both UUID halves:

   ```java
   .putLong(topicImage.id().getMostSignificantBits())
   .putLong(topicImage.id().getLeastSignificantBits())
   ```

   (Adjust if TopicImage.id() is not Uuid but exposes equivalent accessors.) This keeps the intent ("include topic id in the hash") while preserving the full 128 bits.

2. Optional: reuse a static HashFunction instance

   Hashing.murmur3_128() is pure and thread-safe; you could store it in a private static final HashFunction and call MURMUR3_128.newHasher() here to avoid re-creating the function object on every invocation. The impact is small but easy to adopt if this runs in a hot path.

These are behaviour-compatible improvements except for (1), which changes the hash definition. If the topic hash is persisted or part of any cross-version protocol, you'd need to decide now whether you want the stronger ID contribution and, if so, lock it in before the feature ships.
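A minimal sketch of refinement (2), assuming a shared constant is acceptable where the helpers live; the holder name GroupConstants and the constant name MURMUR3_128 are illustrative, not from the patch:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public interface GroupConstants {
    // Pure and thread-safe, so one shared instance can serve all hash
    // computations instead of calling Hashing.murmur3_128() per invocation.
    HashFunction MURMUR3_128 = Hashing.murmur3_128();
}
```

computeTopicHash would then start each chain with MURMUR3_128.newHasher() instead of Hashing.murmur3_128().newHasher().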
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1)

40-188: Strong coverage of hash behaviour; consider a couple of small robustness tweaks

The tests exercise the key properties of both hashing helpers well:

- testComputeTopicHash matches the exact Murmur3 input sequence.
- Variant tests ensure changes in magic byte, partition order, rack order, and individual fields all perturb the hash.
- testComputeGroupHash and the order-variant test correctly lock in the "sorted by topic name, then combineOrdered" semantics.

Two optional improvements you might consider (a sketch of the empty-map case follows this comment):

1. Make the topic ID deterministic in tests

   Using Uuid.randomUuid() for FOO_TOPIC_ID is fine, but swapping it for a fixed UUID (e.g., Uuid.fromString("...") or a constructor) would make failures easier to reproduce and reason about if someone later needs to compare expected hash values across runs.

2. Add explicit edge-case tests

   - computeGroupHash with an empty topicHashes map, to document and lock in the expected result.
   - computeTopicHash for a topic where brokers have no rack information, to clarify that the hash only depends on racks when present and what the behaviour is when all racks are missing.

These would make the contract of the helpers clearer without changing the existing behaviour.
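For the empty-map case, a sketch of what such a test could pin down, assuming computeGroupHash takes a Map<String, Long> keyed by topic name (as the quoted code suggests). With the implementation quoted later in this review, Guava's combineOrdered rejects an empty input, so the expectation below is itself a design decision to lock in:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Map;
import org.junit.jupiter.api.Test;

class GroupHashEdgeCaseTest {

    @Test
    void testComputeGroupHashWithEmptyMap() {
        // Hashing.combineOrdered throws on an empty iterable, so today an
        // empty topicHashes map fails fast. If the helper is later changed
        // to return a sentinel (e.g., 0L), replace this with an equality check.
        assertThrows(IllegalArgumentException.class,
            () -> Group.computeGroupHash(Map.of()));
    }
}
```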
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (5)

- build.gradle (1 hunks)
- checkstyle/import-control-group-coordinator.xml (1 hunks)
- gradle/dependencies.gradle (2 hunks)
- group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2 hunks)
- group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1 hunks)
🧰 Additional context used

🧬 Code graph analysis (2)

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)

- clients/src/main/java/org/apache/kafka/common/Uuid.java (1): Uuid (32-196)
- metadata/src/main/java/org/apache/kafka/image/MetadataImage.java (1): MetadataImage (33-200)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (5)

- metadata/src/main/java/org/apache/kafka/image/ClusterImage.java (1): ClusterImage (36-108)
- metadata/src/main/java/org/apache/kafka/image/TopicImage.java (1): TopicImage (37-92)
- metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java (1): BrokerRegistration (44-398)
- core/src/test/scala/unit/kafka/log/LogManagerTest.scala (1): topicImage (1390-1422)
- core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala (1): hashCode (51-53)
🔇 Additional comments (3)

checkstyle/import-control-group-coordinator.xml (1)

80-80: Import-control allowance for Guava hashing looks correct

Allowing com.google.common.hash under the coordinator.group subpackage is consistent with the new usage in Group and GroupTest and keeps the scope tight.

build.gradle (1)

1410-1424: Guava dependency scope for group-coordinator is reasonable

Adding implementation libs.guava only to :group-coordinator is appropriate since the new helpers return primitives and don't expose Guava types. Please just double-check that no public API in this module (or its -api sibling) directly uses Guava types, so we keep Guava as an internal implementation detail.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (1)

223-237: Deterministic group hash via sorted topics + combineOrdered

Sorting the topicHashes entries by topic name before calling Hashing.combineOrdered gives a stable group hash independent of Map iteration order, and the API returns a primitive long, so Guava stays internal here. This looks correct for the intended "ordered combination of per-topic hashes" use case.
```groovy
guava: "33.4.0-jre",
httpclient: "4.5.14",
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Verification script for Guava dependency review
echo "=== 1. Checking for Guava in LICENSE-binary ==="
if [ -f "LICENSE-binary" ]; then
  echo "LICENSE-binary exists. Searching for Guava..."
  rg -i "guava" LICENSE-binary -n || echo "Guava not found in LICENSE-binary"
else
  echo "LICENSE-binary does not exist"
fi
echo ""
echo "=== 2. Confirming libs.guava wire-up ==="
rg "libs\.guava" . -n
echo ""
echo "=== 3. Checking gradle/dependencies.gradle structure ==="
sed -n '60,70p' gradle/dependencies.gradle
echo "---"
sed -n '148,155p' gradle/dependencies.gradle
```

Repository: coderabbit-test/kafka

Length of output: 1247
Add Guava license entry to LICENSE-binary
Guava 33.4.0-jre is wired correctly in gradle/dependencies.gradle and is already in use (build.gradle:1423). However, per the dependency file comment requiring LICENSE-binary updates for new third-party dependencies, Guava is missing from LICENSE-binary. Add an entry with the correct Apache 2.0 license text for Guava 33.4.0-jre.
🤖 Prompt for AI Agents
In gradle/dependencies.gradle around lines 64-65, Guava 33.4.0-jre is declared
but missing from LICENSE-binary; add a new entry to LICENSE-binary for "Guava
33.4.0-jre" that includes the full Apache License, Version 2.0 text (or an exact
copy per project convention), the copyright/attribution for Google LLC (or the
upstream copyright listed in Guava's distribution), a reference to the Guava
project and version (e.g., https://github.com/google/guava and 33.4.0-jre), and
any SPDX identifier (Apache-2.0) so the LICENSE-binary accurately reflects this
third-party dependency.
```java
return Hashing.combineOrdered(
    topicHashes.entrySet()
        .stream()
        .sorted(Map.Entry.comparingByKey())
        .map(e -> HashCode.fromLong(e.getValue()))
        .toList()
).asLong();
```
Suggestion: When the topic hash map is empty, calling Hashing.combineOrdered with an empty list throws an IllegalArgumentException at runtime, so the group hash computation should handle the empty case explicitly and return a deterministic value instead of delegating to combineOrdered. [logic error]

Severity Level: Minor
```diff
-        return Hashing.combineOrdered(
-            topicHashes.entrySet()
-                .stream()
-                .sorted(Map.Entry.comparingByKey())
-                .map(e -> HashCode.fromLong(e.getValue()))
-                .toList()
-        ).asLong();
+        List<HashCode> hashCodes = topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList();
+        if (hashCodes.isEmpty()) {
+            return 0L;
+        }
+        return Hashing.combineOrdered(hashCodes).asLong();
```
Why it matters? ⭐
The suggestion is correct: Guava's Hashing.combineOrdered throws if given an empty iterable, so the current implementation can throw at runtime for an empty topicHashes map. The proposed change makes the behavior deterministic for that edge case and prevents a potential IllegalArgumentException. This is a real bug fix, not a mere stylistic change.
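A standalone repro of the Guava behaviour being discussed (not part of the PR):

```java
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.util.List;

public class CombineOrderedRepro {
    public static void main(String[] args) {
        // Works: one or more hash codes of equal bit length.
        System.out.println(Hashing.combineOrdered(List.of(HashCode.fromLong(42L))).asLong());

        // Throws IllegalArgumentException: combineOrdered requires at least one hash code.
        Hashing.combineOrdered(List.<HashCode>of());
    }
}
```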
Prompt for AI Agent 🤖

This is a comment left during a code review.

Path: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
Line: 230:236

Comment: Logic Error: When the topic hash map is empty, calling Hashing.combineOrdered with an empty list will throw an IllegalArgumentException at runtime, so the group hash computation must handle the empty case explicitly and return a deterministic value instead of delegating to combineOrdered.

Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
CodeAnt-AI Description

Add deterministic topic and group hashing used by the group coordinator.

Impact

- ✅ Detects topic metadata changes affecting the group hash
- ✅ Stable group hashing regardless of map or partition iteration order
- ✅ Fewer incorrect group equality matches due to nondeterministic ordering
Summary by CodeRabbit

- Chores: Added Guava 33.4.0-jre as an internal dependency of the group-coordinator module.
- Tests: Added GroupTest coverage for the new topic and group hash computations.