Skip to content

Can't create a connector even if its loaded in Strimzi #730

Open
@victorcasignia

Description

@victorcasignia

Deployed a Kafka cluster using strimzi on the cloud.

Used this KafkaConnect config to get the elasticsearch plugin files.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: es-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.6.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build: 
    output: 
      type: docker
      image: registry/elasticsearch-kafka-connect:latest
      pushSecret: es-connect-secret
    plugins: 
      - name: elasticsearch-connector
        artifacts:
            - type: maven
              repository: https://packages.confluent.io/maven
              group: io.confluent
              artifact: kafka-connect-elasticsearch
              version: 14.0.3
  template:
    pod:
      imagePullSecrets:
        - name: es-connect-secret

The created connect clusters contains the jar files needed for elasticsearch connect. I can also see the plugin loaded using the REST API.

[kafka@es-connect-cluster-connect-0 kafka]$ curl localhost:8083/connector-plugins
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"14.0.3"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.ErrantRecordSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.integration.MonitorableSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.runtime.TestSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.tools.MockSinkConnector","type":"sink","version":"3.6.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.MonitorableSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.runtime.TestSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.runtime.WorkerTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.runtime.WorkerWithTopicCreationTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.tools.MockSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.SchemaSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSinkConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSourceConnector","type":"source","version":"3.6.0"}]

But whenever I create the Kafka Connector class. I get this error:

org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.6.0', encodedVersion=3.6.0, type=sourcHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}

Using this connector config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: es-connector
  labels:
    strimzi.io/cluster: es-connect-cluster
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    topics: topic_here
    connection.url: elastic_host_here
    connection.username: elastic
    connection.password: password_here

I have also tried manually creating the connector using the REST API but I still get the same error.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions