-
Notifications
You must be signed in to change notification settings - Fork 15
Add support for schema subject naming strategies #581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Add support for schema subject naming strategies #581
Conversation
36f2d5d to
9db47c1
Compare
0d70c20 to
5272cda
Compare
910f0a5 to
990a391
Compare
cfc225d to
5edb5ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Athosone @monitorpattern Here is my review.
I have mainly reviewed the schema validation for now (basically what's inside schemaService.validateSchema).
I've added "
src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java
Outdated
Show resolved
Hide resolved
src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java
Outdated
Show resolved
Hide resolved
src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java
Outdated
Show resolved
Hide resolved
| List<String> validationErrors = new ArrayList<>(); | ||
| List<SubjectNameStrategy> namingStrategies = getValidSubjectNameStrategies(namespace); | ||
| String subjectName = schema.getMetadata().getName(); | ||
| boolean isValid = SchemaSubjectNameValidator.validateSubjectName( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pass the whole schema object here instead of these 3 parameters schema.getMetadata().getName(), schema.getSpec().getSchema(), schema.getSpec().getSchemaType(). It'll make it clearer.
Also, we use to define validation functions in services, so validateSubjectName can just be a function of the SchemaService.
| public static boolean validateSubjectNameWithStrategy( | ||
| String subjectName, SubjectNameStrategy strategy, String schemaContent, Schema.SchemaType schemaType) { | ||
| // https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject | ||
| switch (strategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this switch-case could be written using the return-switch pattern:
return switch (strategy) {
case TOPIC_NAME -> {
String topicName = extractTopicName(subjectName, strategy).orElse("");
yield subjectName.equals(topicName + "-key") || subjectName.equals(topicName + "-value");
}
case TOPIC_RECORD_NAME -> {
String topicName = extractTopicName(subjectName, strategy).orElse("");
Optional<String> recordName = extractRecordName(schemaContent, schemaType);
yield recordName.isPresent() && subjectName.equals(topicName + "-" + recordName.get());
}
case RECORD_NAME -> {
Optional<String> recordName = extractRecordName(schemaContent, schemaType);
yield recordName.isPresent() && subjectName.equals(recordName.get());
}
};Also, feel free to pass the whole schema in parameter to avoid having so many parameters.
src/main/java/com/michelin/ns4kafka/validation/SchemaSubjectNameValidator.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| try { | ||
| switch (schemaType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IDE is yelling that a single condition switch-case should be replaced with an if, so let's make it happy.
extractAvroRecordName function can be replaced with new AvroSchema(schemaContent).name() which return namespace + name for us 😅.
So overall, extracting the record name from the schema can just be:
if (schemaContent == null || schemaContent.trim().isEmpty()) {
return Optional.empty();
}
if (schemaType == Schema.SchemaType.AVRO) {
return Optional.of(new AvroSchema(schemaContent).name());
}
return Optional.empty();There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it throws an exception when the references aren't resolved which break existing tests and current behavior.
Also it would throw when parsing union of reference :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hence, we hesitate between setting a constant value for the record name in the case of union references that do not have namespace/name, and letting the namespace/name parsing. In either case, we need to use the json parser. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be a big issue. All functions are available in the SchemaService to build a new AvroSchema even with some references:
getSchemaReferences(schema, namespace)
.map(schemaRefs -> new AvroSchema(
schema.getSpec().getSchema(),
getReferences(schema),
schemaRefs,
null
)
.name())- Call the
getSchemaReferences()function. The schema and namespace are needed - Build a
new AvroSchema(...)with the references
As getSchemaReferences is asynchronous, extractRecordName should now return a Mono<String> as well as all calling methods
| public final class SchemaSubjectNameValidator { | ||
| private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
|
|
||
| private SchemaSubjectNameValidator() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the @NoArgsConstructor(access = AccessLevel.PRIVATE) convention in this project instead
|
|
||
| public List<SubjectNameStrategy> getValidSubjectNameStrategies() { | ||
| ResourceValidator.Validator namingStrategies = | ||
| getValidationConstraints().get(VALUE_SUBJECT_NAME_STRATEGY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- If the given subject name ends with
-keywe should verify it against theKEY_SUBJECT_NAME_STRATEGY. - If the given subject name is
RecordNameorTopicRecordName, we should verify it against bothKEY_SUBJECT_NAME_STRATEGYorVALUE_SUBJECT_NAME_STRATEGY.
Currently, if a namespace has the following topic validation rules:
VALUE_SUBJECT_NAME_STRATEGYauthorized forTopicNameKEY_SUBJECT_NAME_STRATEGYauthorized forRecordName
And a schema following theRecordNamestrategy is being deployed, the user will be denied even though they're authorized to deploy that schema, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's my review, feel free to challenge :)
src/main/java/com/michelin/ns4kafka/model/schema/SubjectNameStrategy.java
Outdated
Show resolved
Hide resolved
src/main/java/com/michelin/ns4kafka/model/schema/SubjectNameStrategy.java
Show resolved
Hide resolved
src/main/java/com/michelin/ns4kafka/model/schema/SubjectNameStrategy.java
Outdated
Show resolved
Hide resolved
src/main/java/com/michelin/ns4kafka/validation/SchemaSubjectNameValidator.java
Outdated
Show resolved
Hide resolved
| * @param strategy The naming strategy | ||
| * @return The topic name if it can be determined | ||
| */ | ||
| public static Optional<String> extractTopicName(String subjectName, SubjectNameStrategy strategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the feedback on validateSubjectNameWithStrategy(), this method would be used only once, you might consider putting its content in extractTopicName(String subjectName, List<SubjectNameStrategy> strategies)
src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java
Outdated
Show resolved
Hide resolved
| class SchemaSubjectNameValidatorTest { | ||
|
|
||
| @Test | ||
| void testValidateSubjectName_TopicNameStrategy_Valid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you follow the naming convention shouldActLikeThisWhenThatCondition() or shouldActLikeThisIfThatCondition() or shouldActLikeThisForThatConstraint() for test methods?
I suggest shouldValidateSubjectForTopicNameStrategy() and shouldNotValidateSubjectForTopicNameStrategy()
| subject, List.of(SubjectNameStrategy.RECORD_NAME), schemaContent, Schema.SchemaType.AVRO); | ||
| assertTrue(result); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be better to add an "OK" test with multiple strategies and a "KO" test with multiple strategies
| boolean result = SchemaSubjectNameValidator.validateSubjectName( | ||
| subject, List.of(SubjectNameStrategy.TOPIC_RECORD_NAME), schemaContent, Schema.SchemaType.AVRO); | ||
| assertTrue(result); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For TopicRecordName, might be good to test:
- KO when the subject name has no
- - OK when the subject name has multiple
-
In addition to the simple cases: - OK when the subject name has one
- - KO when the subject name does not match the schema namespace / name
| @@ -0,0 +1,160 @@ | |||
| /* | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure the content of this file should be in a "Validator" class, it makes more sense in the "SchemaService". What do you think?
missing doc: michelin#581 (comment) useless constraints: michelin#581 (comment)
missing doc: michelin#581 (comment) useless constraints: michelin#581 (comment)
| TOPIC_RECORD_NAME("io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"), | ||
| RECORD_NAME("io.confluent.kafka.serializers.subject.RecordNameStrategy"); | ||
|
|
||
| private final String STRATEGY_PREFIX = "io.confluent.kafka.serializers.subject."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static is missing: private static final
| * @return The format for subject value (i.e. the SchemaResource metadata name) according to subject name strategy | ||
| */ | ||
| public String toExpectedFormat() { | ||
| switch (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use return-switch here like:
return switch (this) {
case TOPIC_NAME -> "{topic}-{key|value}";
case TOPIC_RECORD_NAME -> "{topic}-{recordName}";
case RECORD_NAME -> "{recordName}";
};| */ | ||
| @AllArgsConstructor | ||
| public class ValidSubjectNameStrategies { | ||
| public List<SubjectNameStrategy> validValueStrategies; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make both attributes private and define a @Getter on the class
| */ | ||
| public static boolean validateSubjectName(ValidSubjectNameStrategies validStrategies, Schema schema) { | ||
| if (schema.getMetadata().getName().endsWith("-key")) { | ||
| return validStrategies.validKeyStrategies.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use getter here, based on my comment above: validStrategies.getValidKeyStrategies()
| if (subjectName == null || subjectName.trim().isEmpty()) { | ||
| return false; | ||
| } | ||
| switch (strategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return-switch here like:
return switch (strategy) {
case TOPIC_NAME -> subjectName.endsWith("-key") || subjectName.endsWith("-value");
case TOPIC_RECORD_NAME -> {
Optional<String> recordName = extractRecordName(schemaContent, schemaType);
yield recordName.isPresent()
&& (recordName.get() == UNION_AVRO_RECORD_CLASS_NAME
|| subjectName.endsWith("-" + recordName.get()));
}
case RECORD_NAME -> {
Optional<String> recordNameOnly = extractRecordName(schemaContent, schemaType);
yield recordNameOnly.isPresent() && subjectName.equals(recordNameOnly.get());
}
};| } | ||
|
|
||
| try { | ||
| switch (schemaType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be a big issue. All functions are available in the SchemaService to build a new AvroSchema even with some references:
getSchemaReferences(schema, namespace)
.map(schemaRefs -> new AvroSchema(
schema.getSpec().getSchema(),
getReferences(schema),
schemaRefs,
null
)
.name())- Call the
getSchemaReferences()function. The schema and namespace are needed - Build a
new AvroSchema(...)with the references
As getSchemaReferences is asynchronous, extractRecordName should now return a Mono<String> as well as all calling methods
… null Strategy is set as optional, so it can be null
Co-authored-by: monitorpattern <[email protected]>
missing doc: michelin#581 (comment) useless constraints: michelin#581 (comment)
missing doc: michelin#581 (comment) useless constraints: michelin#581 (comment)
460d310 to
afbadec
Compare
Summary
This pull request implements support for configurable schema subject naming strategies in NS4Kafka, allowing namespaces to enforce different schema naming conventions beyond the default TopicNameStrategy.
Changes Made
Core Implementation
New enum
SubjectNameStrategy: Defines the three main Confluent schema naming strategies:TOPIC_NAME(default): {topic}-key/{topic}-value formatTOPIC_RECORD_NAME: {topic}-{recordName} formatRECORD_NAME: {recordName} formatEnhanced validation framework: Added
SchemaSubjectNameValidatorwith comprehensive validation logic for all naming strategies, including:Integration Points
value.subject.name.strategyconfiguration supportFiles Modified
SubjectNameStrategy.java(new): Core enum with strategy definitions and format helpersSchemaSubjectNameValidator.java(new): Validation logic for all naming strategiesTopicValidator.java: Added naming strategy configuration validationSchemaService.java: Integrated new validation logicFormatErrorUtils.java: Enhanced error messages for naming strategy violationsTopic.java: Added subject name strategy configuration supportSchemaSubjectNameValidatorTest.javaBenefits
Testing
This implementation provides the foundation for flexible schema management while maintaining full backward compatibility with existing deployments.
Co-authors
Co-authored-by: monitorpattern [email protected]