Skip to content

BE: SR: Add compatibility w/ GCP SR #1153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

magnusdriver
Copy link

@magnusdriver magnusdriver commented Jun 26, 2025

What changes did you make? (Give an overview)

Added new cluster property "schemaRegistryAuth.bearerAuthCustomProviderClass" to enable compatibility with new GCP Schema Registries.
This property can be used in config.yaml file like this:

kafka:
  clusters:
    - name: local  # Unique name identifier for the Kafka cluster
       bootstrap-servers: kafka1:9092,kafka2:9092  # List of Kafka broker addresses

       schemaRegistry: https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
       schema-registry-auth:
         bearer-auth-custom-provider-class: com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider

Or can be enabled with the env variable: KAFKA_CLUSTERS_0_SCHEMA_REGISTRY_AUTH_BEARER_AUTH_CUSTOM_PROVIDER_CLASS="com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"

or KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_BEARERAUTHCUSTOMPROVIDERCLASS is also valid.

Important changes in code are:

  • Add bearer token generation for auth with GCP Schema Registry in WebClientConfigurator
  • Add custom bearer token provider in SchemaRegistrySerde
  • Add the new bearerAuthCustomProviderClass config property in kafbat-ui-api
  • Add new schema fields kafka-sr-api for compatibility level as GCP Schema Registry return different format.
  • Add condition to use compatibility or compatiblityLevel field based on bearerAuthCustomProviderClass
    property in SchemaRegistryService

It's needed to use gcloud auth application-default login to get the credentials to connect to GCP Schema
Registries or run kafka-ui in a compute engine instance or GKE with a service account with the needed permissions.

Note: This functionality only works now with Avro schema types as these are the ones I work with.
It could work with Protobuf schemas, but not with JSON ones as GCP Schema Registries don't
support them.

How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)

  • Manually (please, describe, if necessary)

    Changes tested connecting to GCP and Confluent Schema Registries to check:

    • All Schemas registered are shown in Schema Registry dashboard
    • Topic events are deserialized correctly in the topics dashboards.

    It was impossible to pass unit tests even before adding any change :'(

Checklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation (e.g. ENVIRONMENT VARIABLES)
  • My changes generate no new warnings (e.g. Sonar is happy)
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

Check out Contributing and Code of Conduct

@magnusdriver magnusdriver requested a review from a team as a code owner June 26, 2025 15:52
@kapybro kapybro bot added status/triage Issues pending maintainers triage status/triage/manual Manual triage in progress status/triage/completed Automatic triage completed and removed status/triage Issues pending maintainers triage labels Jun 26, 2025
Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi magnusdriver! 👋

Welcome, and thank you for opening your first PR in the repo!

Please wait for triaging by our maintainers.

Please take a look at our contributing guide.

@magnusdriver magnusdriver changed the title Add all changes to connect with GCP Schema Registries Add compatibility with GCP Schema Registries Jun 26, 2025
Copy link
Member

@germanosin germanosin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution! I believe it would be more effective to implement this in a more generic manner by exposing the BEARER_SCOPE and BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameters through SchemaRegistryAuth. Don't you mind to change it?

@magnusdriver
Copy link
Author

I finally exposed the BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameter. I didn't need the bearer scope for this use case. If you think it's still interesting to expose that parameter I'll add it. Maybe pass it to WebClientConfigurator.configureBearerTokenAuth as parameter for future use cases?

@magnusdriver magnusdriver requested a review from germanosin July 8, 2025 10:16
Copy link
Member

@Haarolean Haarolean left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few inline comments. Once we decide on a higher level of abstractions, we can dive deep into reviewing the concrete implementations. Right now, it's a nailed-down solution not taking future maintainability (or possible expansion towards other implementations) into account.

@@ -381,8 +381,13 @@ components:
properties:
compatibilityLevel:
$ref: '#/components/schemas/Compatibility'
required:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has compatibility become no longer required?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to change it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I finally added a new compatibilityConfigGcp schema detached from the existent compatibilityConfig. And I had to modify code in SchemaRegistryService class.

If this new modification is not ok then please let me know if you have any recommendation about how to address this as the only alternative I can think of is to add a new kafka-gcp-sr-api and a new GcpSchemaRegistryService class.

@@ -166,6 +178,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
keyStorePassword);
}

if (bearerAuthCustomProviderClass != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called a class? The implementation rather retrieves a token via custom implementation from GCP which is later used as a bearer token value, there are no "classes" in use per se.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use this to reference this class. And I changed the bearer token implementation for the WebClient using this class too. Sorry 🙏 .

@Haarolean Haarolean changed the title Add compatibility with GCP Schema Registries BE: SR: Add compatibility w/ GCP SR Jul 8, 2025
@Haarolean Haarolean added type/enhancement En enhancement/improvement to an already existing feature scope/backend Related to backend changes area/sr Schema Registry and removed status/triage/manual Manual triage in progress labels Jul 8, 2025
@magnusdriver magnusdriver requested a review from Haarolean July 9, 2025 16:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/sr Schema Registry scope/backend Related to backend changes status/triage/completed Automatic triage completed type/enhancement En enhancement/improvement to an already existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants