@Athosone Athosone commented Jun 19, 2025

Summary

This pull request implements support for configurable schema subject naming strategies in NS4Kafka, allowing namespaces to enforce different schema naming conventions beyond the default TopicNameStrategy.

Changes Made

Core Implementation

  • New enum SubjectNameStrategy: Defines the three main Confluent schema naming strategies:

    • TOPIC_NAME (default): {topic}-key/{topic}-value format
    • TOPIC_RECORD_NAME: {topic}-{recordName} format
    • RECORD_NAME: {recordName} format
  • Enhanced validation framework: Added SchemaSubjectNameValidator with comprehensive validation logic for all naming strategies, including:

    • AVRO schema record name extraction
    • Pattern matching for each strategy type
    • Support for qualified record names
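
As a rough illustration of the three formats listed above, here is a minimal sketch. The enum name SubjectFormatSketch and the method expectedSubjects are hypothetical and are not taken from this pull request; the topic and record names are made-up sample values.

```java
import java.util.List;

// Hypothetical sketch of the three subject formats; not the SubjectNameStrategy enum from this PR.
enum SubjectFormatSketch {
    TOPIC_NAME,
    TOPIC_RECORD_NAME,
    RECORD_NAME;

    /** Returns the subject names this strategy would produce for the given topic and record name. */
    public List<String> expectedSubjects(String topic, String recordName) {
        return switch (this) {
            // {topic}-key / {topic}-value
            case TOPIC_NAME -> List.of(topic + "-key", topic + "-value");
            // {topic}-{recordName}
            case TOPIC_RECORD_NAME -> List.of(topic + "-" + recordName);
            // {recordName}
            case RECORD_NAME -> List.of(recordName);
        };
    }

    public static void main(String[] args) {
        // Sample values only: "orders" and "com.example.Order" are assumptions for the demo.
        for (SubjectFormatSketch strategy : values()) {
            System.out.println(strategy + " -> " + strategy.expectedSubjects("orders", "com.example.Order"));
        }
    }
}
```

Note that a qualified record name such as com.example.Order is used as-is, which is why the TOPIC_RECORD_NAME subject can contain dots after the hyphen.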

Integration Points

  • Topic configuration: Added value.subject.name.strategy configuration support
  • Namespace validation: Extended namespace specs to define valid naming strategies per namespace
  • Schema service: Updated schema validation to use configured naming strategies instead of hardcoded TopicNameStrategy
  • Error handling: Enhanced error messages to show expected formats for configured strategies

Files Modified

  • SubjectNameStrategy.java (new): Core enum with strategy definitions and format helpers
  • SchemaSubjectNameValidator.java (new): Validation logic for all naming strategies
  • TopicValidator.java: Added naming strategy configuration validation
  • SchemaService.java: Integrated new validation logic
  • FormatErrorUtils.java: Enhanced error messages for naming strategy violations
  • Topic.java: Added subject name strategy configuration support
  • Comprehensive test coverage in SchemaSubjectNameValidatorTest.java

Benefits

  • Flexibility: Supports all Confluent schema naming strategies
  • Backward compatibility: Default behavior unchanged (TopicNameStrategy)
  • Namespace-level control: Administrators can configure allowed strategies per namespace
  • Better validation: Clear error messages showing expected formats
  • Future-ready: Extensible architecture for additional naming strategies

Testing

  • Added comprehensive unit tests covering all naming strategies
  • Validated AVRO record name extraction with various schema formats
  • Integration tests ensure backward compatibility
  • Edge cases handled (empty schemas, malformed content, etc.)

This implementation provides the foundation for flexible schema management while maintaining full backward compatibility with existing deployments.

Co-authors

Co-authored-by: monitorpattern [email protected]

@monitorpattern monitorpattern force-pushed the feature/schema-naming-strategies branch from 36f2d5d to 9db47c1 Compare June 19, 2025 15:03
@Athosone Athosone force-pushed the feature/schema-naming-strategies branch 2 times, most recently from 0d70c20 to 5272cda Compare June 19, 2025 15:06
@monitorpattern monitorpattern force-pushed the feature/schema-naming-strategies branch from 910f0a5 to 990a391 Compare June 19, 2025 19:44
@Athosone Athosone closed this Jun 19, 2025
@Athosone Athosone reopened this Jun 19, 2025
@monitorpattern monitorpattern force-pushed the feature/schema-naming-strategies branch from cfc225d to 5edb5ba Compare June 30, 2025 11:09
@loicgreffier loicgreffier changed the title feat: Add support for schema subject naming strategies Add support for schema subject naming strategies Jul 8, 2025
@loicgreffier loicgreffier added the feature This issue or pull request contains a new feature label Jul 8, 2025
Collaborator

@loicgreffier loicgreffier left a comment

@Athosone @monitorpattern Here is my review.

I have mainly reviewed the schema validation for now (basically what's inside schemaService.validateSchema).

I've added "⚠️" in front of the points that need specific attention. The rest are minor code updates.

List<String> validationErrors = new ArrayList<>();
List<SubjectNameStrategy> namingStrategies = getValidSubjectNameStrategies(namespace);
String subjectName = schema.getMetadata().getName();
boolean isValid = SchemaSubjectNameValidator.validateSubjectName(
Collaborator

Just pass the whole schema object here instead of these 3 parameters schema.getMetadata().getName(), schema.getSpec().getSchema(), schema.getSpec().getSchemaType(). It'll make it clearer.

Also, we usually define validation functions in services, so validateSubjectName can simply be a method of the SchemaService.

public static boolean validateSubjectNameWithStrategy(
String subjectName, SubjectNameStrategy strategy, String schemaContent, Schema.SchemaType schemaType) {
// https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject
switch (strategy) {
Collaborator

I think this switch-case could be written using the return-switch pattern:

return switch (strategy) {
    case TOPIC_NAME -> {
        String topicName = extractTopicName(subjectName, strategy).orElse("");
        yield subjectName.equals(topicName + "-key") || subjectName.equals(topicName + "-value");
    }
    case TOPIC_RECORD_NAME -> {
        String topicName = extractTopicName(subjectName, strategy).orElse("");
        Optional<String> recordName = extractRecordName(schemaContent, schemaType);
        yield recordName.isPresent() && subjectName.equals(topicName + "-" + recordName.get());
    }
    case RECORD_NAME -> {
        Optional<String> recordName = extractRecordName(schemaContent, schemaType);
        yield recordName.isPresent() && subjectName.equals(recordName.get());
    }
};

Also, feel free to pass the whole schema as a parameter to avoid having so many parameters.

}

try {
switch (schemaType) {
Collaborator

The IDE is yelling that a single condition switch-case should be replaced with an if, so let's make it happy.

⚠️ I think the whole extractAvroRecordName function can be replaced with new AvroSchema(schemaContent).name(), which returns namespace + name for us 😅.

So overall, extracting the record name from the schema can just be:

if (schemaContent == null || schemaContent.trim().isEmpty()) {
    return Optional.empty();
}

if (schemaType == Schema.SchemaType.AVRO) {
    return Optional.of(new AvroSchema(schemaContent).name());
}

return Optional.empty();

Author

Actually, it throws an exception when the references aren't resolved, which breaks existing tests and current behavior.
It would also throw when parsing a union of references :(

Hence, we are hesitating between setting a constant value for the record name in the case of union references that have no namespace/name, and keeping the namespace/name parsing. In either case, we need to use the JSON parser. What do you think?

Collaborator

This should not be a big issue. All functions are available in the SchemaService to build a new AvroSchema even with some references:

getSchemaReferences(schema, namespace)
.map(schemaRefs -> new AvroSchema(
    schema.getSpec().getSchema(),
    getReferences(schema),
    schemaRefs,
    null
)
.name())
  • Call the getSchemaReferences() function. The schema and namespace are needed
  • Build a new AvroSchema(...) with the references

As getSchemaReferences is asynchronous, extractRecordName should now return a Mono<String>, and all calling methods should return a Mono as well.
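
To illustrate how an asynchronous record name propagates to callers, here is a minimal sketch. It deliberately substitutes java.util.concurrent.CompletableFuture for Reactor's Mono so that it runs with the JDK alone; with Mono, the equivalent of thenApply would be map. The class and method names are hypothetical, and the record name is a sample value.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: CompletableFuture stands in for Reactor's Mono here.
final class AsyncRecordNameSketch {
    /**
     * Once the record name is only available asynchronously, the RecordName check can no longer
     * return a plain boolean. It is chained onto the pending value instead.
     */
    public static CompletableFuture<Boolean> matchesRecordName(
            String subject, CompletableFuture<String> recordName) {
        return recordName.thenApply(subject::equals);
    }

    public static void main(String[] args) {
        // Sample value only: an already-completed future simulates a resolved schema.
        CompletableFuture<String> recordName = CompletableFuture.completedFuture("com.example.Order");
        System.out.println(matchesRecordName("com.example.Order", recordName).join());
    }
}
```

The same ripple applies one level up: whatever calls this check has to compose on the returned asynchronous value rather than read a boolean directly.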

public final class SchemaSubjectNameValidator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private SchemaSubjectNameValidator() {}
Collaborator

We use the @NoArgsConstructor(access = AccessLevel.PRIVATE) convention in this project instead


public List<SubjectNameStrategy> getValidSubjectNameStrategies() {
ResourceValidator.Validator namingStrategies =
getValidationConstraints().get(VALUE_SUBJECT_NAME_STRATEGY);
Collaborator

⚠️ Be careful here:

  • If the given subject name ends with -key we should verify it against the KEY_SUBJECT_NAME_STRATEGY.
  • If the given subject name is RecordName or TopicRecordName, we should verify it against both KEY_SUBJECT_NAME_STRATEGY and VALUE_SUBJECT_NAME_STRATEGY.

Currently, if a namespace has the following topic validation rules:

  • VALUE_SUBJECT_NAME_STRATEGY authorized for TopicName
  • KEY_SUBJECT_NAME_STRATEGY authorized for RecordName

and a schema following the RecordName strategy is being deployed, the user will be denied even though they're authorized to deploy that schema, right?
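
The rule described in this comment can be sketched as follows. This is a minimal illustration, not the project's code: StrategySelectionSketch, its nested Strategy enum, and candidateStrategies are hypothetical names, and treating a -value suffix as value-only is an assumption made by symmetry with the -key case.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of choosing which allowed-strategy lists a subject is checked against.
final class StrategySelectionSketch {
    enum Strategy { TOPIC_NAME, TOPIC_RECORD_NAME, RECORD_NAME }

    /**
     * Subjects ending in "-key" are checked against the key strategies only.
     * Subjects ending in "-value" are checked against the value strategies only (assumption).
     * Any other subject (RecordName or TopicRecordName shaped) may be allowed by either list.
     */
    public static Set<Strategy> candidateStrategies(
            String subject, List<Strategy> keyStrategies, List<Strategy> valueStrategies) {
        Set<Strategy> candidates = new LinkedHashSet<>();
        if (subject.endsWith("-key")) {
            candidates.addAll(keyStrategies);
        } else if (subject.endsWith("-value")) {
            candidates.addAll(valueStrategies);
        } else {
            candidates.addAll(keyStrategies);
            candidates.addAll(valueStrategies);
        }
        return candidates;
    }
}
```

With key strategies limited to RECORD_NAME and value strategies limited to TOPIC_NAME, a subject such as com.example.Order yields both strategies as candidates, so the RecordName schema from the scenario above would no longer be rejected on the strategy check alone.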

Collaborator

@ThomasCAI-mlv ThomasCAI-mlv left a comment

Here's my review, feel free to challenge :)

* @param strategy The naming strategy
* @return The topic name if it can be determined
*/
public static Optional<String> extractTopicName(String subjectName, SubjectNameStrategy strategy) {
Collaborator

With the feedback on validateSubjectNameWithStrategy(), this method would be used only once, so you might consider moving its content into extractTopicName(String subjectName, List<SubjectNameStrategy> strategies).

class SchemaSubjectNameValidatorTest {

@Test
void testValidateSubjectName_TopicNameStrategy_Valid() {
Collaborator

Can you follow the naming convention shouldActLikeThisWhenThatCondition() or shouldActLikeThisIfThatCondition() or shouldActLikeThisForThatConstraint() for test methods?

I suggest shouldValidateSubjectForTopicNameStrategy() and shouldNotValidateSubjectForTopicNameStrategy()

subject, List.of(SubjectNameStrategy.RECORD_NAME), schemaContent, Schema.SchemaType.AVRO);
assertTrue(result);
}

Collaborator

Might be better to add an "OK" test with multiple strategies and a "KO" test with multiple strategies

boolean result = SchemaSubjectNameValidator.validateSubjectName(
subject, List.of(SubjectNameStrategy.TOPIC_RECORD_NAME), schemaContent, Schema.SchemaType.AVRO);
assertTrue(result);
}
Collaborator

For TopicRecordName, might be good to test:

  • KO when the subject name has no -
  • OK when the subject name has multiple -

In addition to the simple cases:

  • OK when the subject name has one -
  • KO when the subject name does not match the schema namespace / name
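
The four cases listed above can be exercised against a small matcher. The sketch below is an assumption about how such a check might be written, not the validator from this pull request; TopicRecordNameSketch and matches are hypothetical names, and the subjects are sample values.

```java
// Hypothetical sketch of a TopicRecordName check covering the cases listed above.
final class TopicRecordNameSketch {
    /** True when the subject is a non-empty topic, then a hyphen, then exactly the record's full name. */
    public static boolean matches(String subject, String recordFullName) {
        String suffix = "-" + recordFullName;
        // The length check rejects a subject that is only "-" + recordFullName with no topic part.
        return subject.endsWith(suffix) && subject.length() > suffix.length();
    }

    public static void main(String[] args) {
        String record = "com.example.Order";
        System.out.println(matches("com.example.Order", record));              // no hyphen: false
        System.out.println(matches("orders-com.example.Order", record));       // one hyphen: true
        System.out.println(matches("eu-orders-v1-com.example.Order", record)); // several hyphens: true
        System.out.println(matches("orders-com.example.Other", record));       // record mismatch: false
    }
}
```

Matching on the suffix rather than splitting on the first hyphen is what lets topic names that themselves contain hyphens pass.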

@@ -0,0 +1,160 @@
/*
Collaborator

I'm not sure the content of this file should be in a "Validator" class; it makes more sense in the "SchemaService". What do you think?

monitorpattern added a commit to Athosone/ns4kafka that referenced this pull request Jul 18, 2025
monitorpattern added a commit to Athosone/ns4kafka that referenced this pull request Jul 24, 2025
TOPIC_RECORD_NAME("io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"),
RECORD_NAME("io.confluent.kafka.serializers.subject.RecordNameStrategy");

private final String STRATEGY_PREFIX = "io.confluent.kafka.serializers.subject.";
Collaborator

static is missing: private static final

* @return The format for subject value (i.e. the SchemaResource metadata name) according to subject name strategy
*/
public String toExpectedFormat() {
switch (this) {
Collaborator

You can use return-switch here like:

return switch (this) {
    case TOPIC_NAME -> "{topic}-{key|value}";
    case TOPIC_RECORD_NAME -> "{topic}-{recordName}";
    case RECORD_NAME -> "{recordName}";
};

*/
@AllArgsConstructor
public class ValidSubjectNameStrategies {
public List<SubjectNameStrategy> validValueStrategies;
Collaborator

Make both attributes private and define a @Getter on the class

*/
public static boolean validateSubjectName(ValidSubjectNameStrategies validStrategies, Schema schema) {
if (schema.getMetadata().getName().endsWith("-key")) {
return validStrategies.validKeyStrategies.stream()
Collaborator

Use the getter here, based on my comment above: validStrategies.getValidKeyStrategies()

if (subjectName == null || subjectName.trim().isEmpty()) {
return false;
}
switch (strategy) {
Collaborator

Use return-switch here, like:

return switch (strategy) {
    case TOPIC_NAME -> subjectName.endsWith("-key") || subjectName.endsWith("-value");
    case TOPIC_RECORD_NAME -> {
        Optional<String> recordName = extractRecordName(schemaContent, schemaType);
        yield recordName.isPresent()
            && (UNION_AVRO_RECORD_CLASS_NAME.equals(recordName.get())
            || subjectName.endsWith("-" + recordName.get()));
    }
    case RECORD_NAME -> {
        Optional<String> recordNameOnly = extractRecordName(schemaContent, schemaType);
        yield recordNameOnly.isPresent() && subjectName.equals(recordNameOnly.get());
    }
};

@loicgreffier loicgreffier force-pushed the feature/schema-naming-strategies branch from 460d310 to afbadec Compare October 24, 2025 21:48