AbsaOSS/ABRiS

Can't use the from_avro_abris_config function --> TypeError: 'JavaPackage' object is not callable

Closed this issue · 3 comments

AkilG commented

Hello,

I took the code from https://github.com/AbsaOSS/ABRiS/blob/master/documentation/python-documentation.md and added it to my own code. The problem is that when I run `from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': 'http://localhost:8081'}, 'topic_name', False)` with my topic name, I get a TypeError saying the `JavaPackage` object is not callable. I am using PySpark 3.0.0 in PyCharm. I would appreciate any clue about this problem.
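
For context, the helper I copied from the linked python-documentation.md looks roughly like this (paraphrased, so the exact code in the docs may differ slightly):

```python
from pyspark.context import SparkContext

def from_avro_abris_config(config_map, topic, is_key):
    # Grab the running JVM gateway and convert the Python dict to a Scala Map
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    # Build the ABRiS FromAvroConfig via its fluent Java/Scala API
    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)
```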

Best Regards,
Akil

Hi @AkilG, I'm not well-versed in PySpark, but from the error message it looks like a dependency is missing. Did you forget to add ABRiS as an additional jar in PySpark?
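
If it helps, a minimal sketch of how that can be done when building the SparkSession from PySpark (the coordinates and versions below are examples; adjust them to your Spark and Scala versions):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("abris-consumer")
    # Pull ABRiS (and its transitive dependencies) onto the classpath.
    # Pick the artifact matching your Scala version (2.11 vs 2.12).
    .config("spark.jars.packages", "za.co.absa:abris_2.12:5.0.0")
    # ABRiS needs Confluent artifacts that are not on Maven Central.
    .config("spark.jars.repositories", "https://packages.confluent.io/maven/")
    .getOrCreate()
)
```

The same coordinates can be passed on the command line via `spark-submit --packages ... --repositories ...`. Without the jar on the classpath, py4j resolves `za.co.absa.abris.config.AbrisConfig` to a `JavaPackage` placeholder instead of a class, which would explain the TypeError you are seeing.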

AkilG commented

Yes, I solved it, but now I get this error message:

Traceback (most recent call last):
  File "/home/student/GIT/ameise-data-handling/Code/Kafka_Stream/Consumer_ABRIS.py", line 80, in <module>
    from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': registry_url}, conf['kafka']['topic'], False)
  File "/home/student/GIT/ameise-data-handling/Code/Kafka_Stream/Consumer_ABRIS.py", line 36, in from_avro_abris_config
    return jvm_gateway.za.co.absa.abris.config \
  File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o48.usingSchemaRegistry.
: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unrecognized field "type" (class io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage), not marked as ignorable (2 known properties: "error_code", "message"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 141] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage["type"]); error code: 50005
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
	at za.co.absa.abris.avro.registry.ConfluentRegistryClient.getLatestSchemaMetadata(ConfluentRegistryClient.scala:39)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaMetadataBySubjectAndVersion(SchemaManager.scala:66)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaBySubjectAndVersion(SchemaManager.scala:55)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchema(SchemaManager.scala:46)
	at za.co.absa.abris.config.FromSchemaDownloadingConfigFragment.usingSchemaRegistry(Config.scala:250)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)

I am using the following versions: PySpark 3.1.2, abris_2.12:5.0.0, and Confluent Schema Registry 5.3.1.

As I understand the error, it happened while parsing another error response as JSON, because `type` is not an expected property of `ErrorMessage`.

Can you double-check that the schema registry is really running on http://localhost:8081 and that the `schema.registry.url` property is set correctly?
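
One quick sanity check (a sketch; substitute whatever `registry_url` actually resolves to in your config) is to call the registry's REST API directly and see whether it answers with the JSON the Confluent client expects:

```python
import json
from urllib.request import urlopen

registry_url = "http://localhost:8081"  # whatever schema.registry.url points to

# A real Schema Registry answers GET /subjects with a JSON array of subject
# names, e.g. ["my-topic-value", ...]. Anything else (HTML, a different JSON
# shape, a connection error) suggests the URL points at something other than
# the registry.
with urlopen(f"{registry_url}/subjects") as response:
    print(json.load(response))
```

If that call does not return a plain JSON array, the registry is most likely not what is listening on that address, and the `Unrecognized field "type"` error above is the Confluent client failing to parse the unexpected error response.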