1 change: 1 addition & 0 deletions build.gradle
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.guava

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -77,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="com.google.common.hash" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.HdrHistogram" />
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
classgraph: "4.8.173",
gradle: "8.10.2",
grgit: "4.1.1",
guava: "33.4.0-jre",
httpclient: "4.5.14",
Comment on lines +64 to 65
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verification script for Guava dependency review

echo "=== 1. Checking for Guava in LICENSE-binary ==="
if [ -f "LICENSE-binary" ]; then
  echo "LICENSE-binary exists. Searching for Guava..."
  rg -i "guava" LICENSE-binary -n || echo "Guava not found in LICENSE-binary"
else
  echo "LICENSE-binary does not exist"
fi

echo ""
echo "=== 2. Confirming libs.guava wire-up ==="
rg "libs\.guava" . -n

echo ""
echo "=== 3. Checking gradle/dependencies.gradle structure ==="
sed -n '60,70p' gradle/dependencies.gradle
echo "---"
sed -n '148,155p' gradle/dependencies.gradle

Repository: coderabbit-test/kafka

Length of output: 1247


Add Guava license entry to LICENSE-binary

Guava 33.4.0-jre is wired correctly in gradle/dependencies.gradle and is already in use (build.gradle:1423). However, per the dependency file comment requiring LICENSE-binary updates for new third-party dependencies, Guava is missing from LICENSE-binary. Add an entry with the correct Apache 2.0 license text for Guava 33.4.0-jre.

🤖 Prompt for AI Agents
In gradle/dependencies.gradle around lines 64-65, Guava 33.4.0-jre is declared
but missing from LICENSE-binary; add a new entry to LICENSE-binary for "Guava
33.4.0-jre" that includes the full Apache License, Version 2.0 text (or an exact
copy per project convention), the copyright/attribution for Google LLC (or the
upstream copyright listed in Guava's distribution), a reference to the Guava
project and version (e.g., https://github.com/google/guava and 33.4.0-jre), and
any SPDX identifier (Apache-2.0) so the LICENSE-binary accurately reflects this
third-party dependency.
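
For completeness, a sketch of the kind of entry expected, assuming LICENSE-binary lists Apache-2.0 dependencies by artifact name as Kafka's license files commonly do (exact wording and placement should follow the file's existing convention):

guava-33.4.0-jre, Apache License, Version 2.0 (SPDX: Apache-2.0)
Copyright Google LLC, https://github.com/google/guava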

jackson: "2.16.2",
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
guava: "com.google.guava:guava:$versions.guava",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,11 +19,21 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -209,4 +219,50 @@ void validateOffsetFetch(
    default boolean shouldExpire() {
        return true;
    }

    /**
     * Computes the hash of the topics in a group.
     *
     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
     * @return The hash of the group.
     */
    static long computeGroupHash(Map<String, Long> topicHashes) {
        return Hashing.combineOrdered(
            topicHashes.entrySet()
                .stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> HashCode.fromLong(e.getValue()))
                .toList()
        ).asLong();
Comment on lines +230 to +236
Suggestion: When the topic hash map is empty, calling Hashing.combineOrdered with an empty list will throw an IllegalArgumentException at runtime, so the group hash computation must handle the empty-case explicitly and return a deterministic value instead of delegating to combineOrdered. [logic error]

Severity Level: Minor ⚠️

Suggested change
-        return Hashing.combineOrdered(
-            topicHashes.entrySet()
-                .stream()
-                .sorted(Map.Entry.comparingByKey())
-                .map(e -> HashCode.fromLong(e.getValue()))
-                .toList()
-        ).asLong();
+        List<HashCode> hashCodes = topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList();
+        if (hashCodes.isEmpty()) {
+            return 0L;
+        }
+        return Hashing.combineOrdered(hashCodes).asLong();
Why it matters? ⭐

The suggestion is correct: Guava's Hashing.combineOrdered throws if given an empty iterable, so the current implementation can throw at runtime for an empty topicHashes map. The proposed change makes the behavior deterministic for that edge case and prevents a potential IllegalArgumentException. This is a real bug fix, not a mere stylistic change.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
**Line:** 230:236
**Comment:**
	*Logic Error: When the topic hash map is empty, calling `Hashing.combineOrdered` with an empty list will throw an IllegalArgumentException at runtime, so the group hash computation must handle the empty-case explicitly and return a deterministic value instead of delegating to `combineOrdered`.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
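
As a quick check on the fix, a minimal JUnit sketch of the empty-map case (hypothetical test name; it assumes the suggested guard returning 0L has been applied):

    @Test
    void testComputeGroupHashWithEmptyMap() {
        // With the guard applied, an empty topic-hash map yields a fixed,
        // deterministic value instead of an IllegalArgumentException from
        // Hashing.combineOrdered.
        assertEquals(0L, Group.computeGroupHash(Map.of()));
    }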

    }

    /**
     * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
     *
     * @param topicImage The topic image.
     * @param clusterImage The cluster image.
     * @return The hash of the topic.
     */
    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(topicImage.id().hashCode()) // topic Id

critical

Using topicImage.id().hashCode() for hashing the Uuid can lead to an increased chance of collisions. Uuid.hashCode() returns an int, which is then cast to a long, but it discards information from the full 128-bit UUID. To create a more robust hash, you should use the full UUID value.

Consider changing the hasher chain to include both parts of the UUID:

Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().getMostSignificantBits())
    .putLong(topicImage.id().getLeastSignificantBits())
    .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
    .putInt(topicImage.partitions().size()); // number of partitions

This will also require updating the corresponding tests in GroupTest.java.

UUID Hashing Weakness

The code uses topicImage.id().hashCode(), which converts a 128-bit UUID to a 32-bit integer before hashing. This discards 96 bits of information, significantly increasing the probability of hash collisions for different topic UUIDs, potentially causing incorrect topic state detection.

            .putLong(topicImage.id().getMostSignificantBits())
            .putLong(topicImage.id().getLeastSignificantBits()) // full UUID bits
Committable suggestion
Suggested change
-            .putLong(topicImage.id().hashCode()) // topic Id
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits()) // full UUID bits
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Fault-Tolerance
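
For reference, a self-contained sketch of the proposed full-UUID hashing (hypothetical helper name; it assumes Guava's Hasher API and Kafka's Uuid accessors getMostSignificantBits()/getLeastSignificantBits(), and is not the committed implementation):

    // Hypothetical standalone illustration of hashing the full 128-bit id.
    static long fullUuidTopicHash(Uuid id, String name, int numPartitions) {
        return Hashing.murmur3_128().newHasher()
            .putByte((byte) 0)                     // magic byte
            .putLong(id.getMostSignificantBits())  // high 64 bits of the UUID
            .putLong(id.getLeastSignificantBits()) // low 64 bits of the UUID
            .putString(name, StandardCharsets.UTF_8)
            .putInt(numPartitions)
            .hash().asLong();
    }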

            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
            .putInt(topicImage.partitions().size()); // number of partitions

        topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
            topicHasher.putInt(entry.getKey()); // partition id
            String racks = Arrays.stream(entry.getValue().replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .filter(Optional::isPresent)
                .map(Optional::get)
Comment on lines +260 to +261

medium

For improved readability and conciseness, the stream operation .filter(Optional::isPresent).map(Optional::get) can be simplified using flatMap.

//...
.map(BrokerRegistration::rack)
.flatMap(Optional::stream)
.sorted()
//...

This requires Java 9+.
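
Extracted into a helper for illustration, the full rack-joining step would read roughly as follows (hypothetical method name and signature; BrokerRegistration.rack() is assumed to return Optional<String> as used in this diff):

    // Hypothetical extraction of the rack-string computation using flatMap.
    static String sortedRacks(int[] replicas, ClusterImage clusterImage) {
        return Arrays.stream(replicas)
            .mapToObj(clusterImage::broker)    // may be null for unknown ids
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)     // Optional<String>
            .flatMap(Optional::stream)         // Java 9+: drops empty Optionals
            .sorted()
            .collect(Collectors.joining(";"));
    }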

                .sorted()
                .collect(Collectors.joining(";"));
            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
        });
        return topicHasher.hash().asLong();
    }
}
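
Stepping back, the two helpers are meant to compose roughly as follows (a hedged sketch; subscribedTopicNames, metadataImage, and the surrounding plumbing are hypothetical stand-ins, not code from this PR):

    // For one group: hash each subscribed topic that exists in the metadata
    // image, then fold the per-topic hashes together. computeGroupHash sorts
    // entries by topic name, so the result is independent of map order.
    Map<String, Long> topicHashes = new HashMap<>();
    for (String topic : subscribedTopicNames) {
        TopicImage image = metadataImage.topics().getTopic(topic);
        if (image != null) {
            topicHashes.put(topic, Group.computeTopicHash(image, metadataImage.cluster()));
        }
    }
    long groupHash = Group.computeGroupHash(topicHashes);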
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class GroupTest {
    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
    private static final String FOO_TOPIC_NAME = "foo";
    private static final String BAR_TOPIC_NAME = "bar";
    private static final int FOO_NUM_PARTITIONS = 2;
    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
        .addRacks()
        .build();

    @Test
    void testComputeTopicHash() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id

medium

Following the recommended change in Group.computeTopicHash to use the full 128-bit UUID for hashing, this line and similar lines in other tests (testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, testComputeTopicHashWithDifferentRackOrder) should be updated.

Instead of putLong(FOO_TOPIC_ID.hashCode()), you should use:

.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())

            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentMagicByte() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 1) // different magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentPartitionOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            // different partition order
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentRackOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @ParameterizedTest
    @MethodSource("differentFieldGenerator")
    void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        assertNotEquals(
            Group.computeTopicHash(
                differentImage.topics().getTopic(topicId),
                differentImage.cluster()
            ),
            result
        );
    }

    private static Stream<Arguments> differentFieldGenerator() {
        Uuid differentTopicId = Uuid.randomUuid();
        return Stream.of(
            Arguments.of(new MetadataImageBuilder() // different topic id
                .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                differentTopicId
            ),
            Arguments.of(new MetadataImageBuilder() // different topic name
                .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different partitions
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different racks
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .build(),
                FOO_TOPIC_ID
            )
        );
    }

    @Test
    void testComputeGroupHash() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long expected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(123L),
            HashCode.fromLong(456L)
        )).asLong();
        assertEquals(expected, result);
    }

    @Test
    void testComputeGroupHashWithDifferentOrder() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long unexpected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(456L),
            HashCode.fromLong(123L)
        )).asLong();
        assertNotEquals(unexpected, result);
    }
}