Open
Description
Deployed a Kafka cluster using strimzi on the cloud.
Used this KafkaConnect config to get the elasticsearch plugin files.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: es-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.6.0
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
build:
output:
type: docker
image: registry/elasticsearch-kafka-connect:latest
pushSecret: es-connect-secret
plugins:
- name: elasticsearch-connector
artifacts:
- type: maven
repository: https://packages.confluent.io/maven
group: io.confluent
artifact: kafka-connect-elasticsearch
version: 14.0.3
template:
pod:
imagePullSecrets:
- name: es-connect-secret
The created connect clusters contains the jar files needed for elasticsearch connect. I can also see the plugin loaded using the REST API.
[kafka@es-connect-cluster-connect-0 kafka]$ curl localhost:8083/connector-plugins
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"14.0.3"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.ErrantRecordSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.integration.MonitorableSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.runtime.TestSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.tools.MockSinkConnector","type":"sink","version":"3.6.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.MonitorableSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.runtime.TestSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.runtime.WorkerTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.runtime.WorkerWithTopicCreationTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.tools.MockSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.SchemaSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSinkConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSourceConnector","type":"source","version":"3.6.0"}]
But whenever I create the Kafka Connector class. I get this error:
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.6.0', encodedVersion=3.6.0, type=sourcHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}
Using this connector config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: es-connector
labels:
strimzi.io/cluster: es-connect-cluster
spec:
class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
config:
topics: topic_here
connection.url: elastic_host_here
connection.username: elastic
connection.password: password_here
I have also tried manually creating the connector using the REST API but I still get the same error.
Metadata
Metadata
Assignees
Labels
No labels