DarioBalinzo/kafka-connect-elasticsearch-source

No mapping found for [] in order to sort on

karayel opened this issue · 1 comment

Hi,

When I try to run the connector with the following config, I get an exception over and over again:

{
    "name": "elastic-source-10",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "elasticsearch",
        "es.port": "9200",
        "index.prefix": "index-1",
        "topic.prefix": "elastic-"
    }
}

Here is the exception stack trace:

2021-06-21 21:39:34,699 INFO   ||  Initializing: org.apache.kafka.connect.runtime.TransformationChain{}   [org.apache.kafka.connect.runtime.Worker]
2021-06-21 21:39:34,699 INFO   ||  ProducerConfig values:
	acks = all
	batch.size = 16384
	bootstrap.servers = [kafka:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = connector-producer-elastic-source-10-0
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 2147483647
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 9223372036854775807
	max.in.flight.requests.per.connection = 1
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 2147483647
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
   [org.apache.kafka.clients.producer.ProducerConfig]
2021-06-21 21:39:34,703 INFO   ||  Kafka version: 2.4.0   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO   ||  Kafka commitId: 77a89fcf8d7fa018   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO   ||  Kafka startTimeMs: 1624311574703   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,717 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-06-21 21:39:34,717 INFO   ||  ElasticSourceTaskConfig values:
	batch.max.rows = 10000
	connection.attempts = 3
	connection.backoff.ms = 10000
	es.host = elasticsearch
	es.indices = index-1
	es.password = null
	es.port = 9200
	es.scheme = http
	es.user = null
	fieldname_converter = avro
	filters.json_cast = null
	filters.whitelist = null
	incrementing.field.name =
	index.prefix = index-1
	mode =
	poll.interval.ms = 5000
	topic.prefix = elastic-
   [com.github.dariobalinzo.task.ElasticSourceTaskConfig]
2021-06-21 21:39:34,717 INFO   ||  elastic auth disabled   [com.github.dariobalinzo.elastic.ElasticConnection]
2021-06-21 21:39:34,719 INFO   ||  WorkerSourceTask{id=elastic-source-10-0} Source task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-06-21 21:39:34,719 INFO   ||  fetching from index-1   [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,818 INFO   ||  [Producer clientId=connector-producer-elastic-source-10-0] Cluster ID: v38SOhq8SU-h4JvAGoDgTg   [org.apache.kafka.clients.Metadata]
2021-06-21 21:39:34,882 INFO   ||  found last value null   [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,887 ERROR  ||  error   [com.github.dariobalinzo.task.ElasticSourceTask]
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:618)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:594)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:501)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:474)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:391)
	at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:89)
	at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:72)
	at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:169)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://elasticsearch:9200], URI [/index-1/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"index-1","node":"jTlMT-4gS2Gu66gtHyuG9g","reason":{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}}]},"status":400}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more

I also sent the same request manually and there was no problem with it; I got a successful response.
After that, I checked the connector status: it is in the RUNNING state, but I think it should be FAILED.
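
For reference, this is roughly what I tested by hand (the second request is only my guess at what the connector sends, based on the empty field name in the "No mapping found for []" error). A plain search works:

POST /index-1/_search
{
    "query": { "match_all": {} }
}

But adding a sort on an empty field name reproduces the same 400 response:

POST /index-1/_search
{
    "size": 10000,
    "sort": [ { "": "asc" } ]
}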

Do you have any idea what could be causing the problem?

Thanks 🖖

Thanks for the issue; I will make sure that in a case like this the connector changes its state to FAILED.

However, the cause of the problem is that the mandatory incrementing.field.name config is missing.
The connector sorts documents on this field in order to fetch new data incrementally; with no field configured, the search is issued with an empty sort field, which Elasticsearch rejects (hence "No mapping found for []").
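
For example, assuming your documents contain an increasing field named @timestamp (a placeholder here, use whatever cursor field actually exists in your index), the config would look like this:

{
    "name": "elastic-source-10",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "elasticsearch",
        "es.port": "9200",
        "index.prefix": "index-1",
        "incrementing.field.name": "@timestamp",
        "topic.prefix": "elastic-"
    }
}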

Dario