MySQL / MSSQL : numeric.mapping doesn't work for DECIMAL fields
rmoff opened this issue · 16 comments
I can't get numeric.mapping
to work with MySQL and Confluent Platform 5.1. Steps to reproduce below.
Create MySQL table:
use demo;
create table transactions (
txn_id INT,
customer_id INT,
amount DECIMAL(5,2),
currency VARCHAR(50),
txn_timestamp VARCHAR(50)
);
insert into transactions (txn_id, customer_id, amount, currency, txn_timestamp) values (3, 2, 17.13, 'EUR', '2018-04-30T21:30:39Z');
Inspect table:
mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id | int(11) | YES | | NULL | |
| customer_id | int(11) | YES | | NULL | |
| amount | decimal(5,2) | YES | | NULL | |
| currency | varchar(50) | YES | | NULL | |
| txn_timestamp | varchar(50) | YES | | NULL | |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_12a",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-12a-",
"numeric.mapping": "best_fit",
"table.whitelist" : "demo.transactions",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Even though "numeric.mapping": "best_fit"
, Kafka Connect stores the DECIMAL(5,2)
as a Decimal
, serialised to bytes in Avro:
$ curl -s "http://localhost:8081/subjects/mysql-12a-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
"name": "amount",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
Connect Worker log excerpt:
INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
…
INFO JdbcSourceTaskConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = [hidden]
connection.url = jdbc:mysql://mysql:3306/demo
connection.user = connect_user
dialect.name =
incrementing.column.name =
mode = bulk
numeric.mapping = best_fit
numeric.precision.mapping = false
poll.interval.ms = 3600000
query =
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = [demo.transactions]
tables = [`demo`.`transactions`]
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = mysql-12a-
validate.non.null = true
(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
…
DEBUG Checking for next block of results from BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
DEBUG BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} prepared SQL query: SELECT * FROM `demo`.`transactions` (io.confluent.connect.jdbc.source.BulkTableQuerier)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG Returning 100 records for BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
kafka-connect_1_8eb73e80dda1 | [2019-01-07 13:37:50,920] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"transactions\",\"fields\":[{\"name\":\"txn_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txn_timestamp\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"transactions\"}"} to http://schema-registry:8081/subjects/mysql-12a-transactions-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)
I've tried this with three different settings, each still results in the amount
field serialised to bytes in Avro:
"numeric.mapping": "best_fit"
"numeric.mapping": "precision_only"
"numeric.precision.mapping": true
Per docs I am expecting to see the decimal(5,2)
serialised to Avro FLOAT64
(I think - but at least, not bytes
)
does numeric.mapping
only apply to NUMERIC
types, not DECIMAL
?
https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
In MySQL, NUMERIC is implemented as DECIMAL
So the following DDL:
CREATE TABLE NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
Creates a table like this - note that AMOUNT_02
whilst declared as NUMERIC
is created as a DECIMAL
:
mysql> DESCRIBE NUM_TEST;
+-------------+---------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+---------------+------+-----+---------+-------+
| TXN_ID | int(11) | YES | | NULL | |
| CUSTOMER_ID | int(11) | YES | | NULL | |
| AMOUNT_01 | decimal(5,2) | YES | | NULL | |
| AMOUNT_02 | decimal(5,2) | YES | | NULL | |
| AMOUNT_03 | decimal(5,0) | YES | | NULL | |
| AMOUNT_04 | decimal(10,0) | YES | | NULL | |
+-------------+---------------+------+-----+---------+-------+
6 rows in set (0.01 sec)
(MySQL Server version: 8.0.13)
Contrast to Postgres:
CREATE TABLE NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
All columns are stored as NUMERIC
:
demo=# \d num_test
Table "public.num_test"
Column | Type | Collation | Nullable | Default
-------------+--------------+-----------+----------+---------
txn_id | integer | | |
customer_id | integer | | |
amount_01 | numeric(5,2) | | |
amount_02 | numeric(5,2) | | |
amount_03 | numeric(5,0) | | |
amount_04 | numeric | | |
col1 | col2 | col3 | col4 | |
---|---|---|---|---|
Postgres column definition | DECIMAL(5,2) |
NUMERIC(5,2) |
DECIMAL(5) |
DECIMAL |
Source data in Postgres | 100.01 |
100.02 |
100 |
100 |
numeric.mapping = none (same as leaving it unset) |
Bytes '\u0011 |
Bytes Øî |
Bytes d |
Bytes d |
numeric.mapping = best_fit |
Double 100.01 |
Double 100.02 |
Int 100 |
Int 100 |
numeric.mapping = precision_only |
Bytes '\u0011 |
Bytes Øî |
Int 100 |
Int 100 |
(Postgres 11.1)
Postgres notes: https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9
MS SQL notes : https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#testing-numericmapping-in-ms-sql-server-2017
col1 | col2 | col3 | col4 | |
---|---|---|---|---|
MSSQL column definition | DECIMAL(5,2) |
NUMERIC(5,2) |
DECIMAL(5) |
DECIMAL |
MSSQL created column | decimal length 5 precision 5 scale 2 |
numeric length 5 precision 5 scale 2 |
decimal length 5 precision 5 scale 0 |
decimal length 9 precision 18 scale 0 |
Source data in MSSQL | 100.01 |
100.02 |
100 |
100 |
numeric.mapping = none (same as leaving it unset) |
Bytes '\u0011 |
Bytes Øî |
Bytes d |
Bytes d |
numeric.mapping = best_fit |
Bytes '\u0011 |
Double 100.02 |
Bytes d |
Bytes d |
numeric.mapping = best_fit ( query used to CAST all DECIMAL fields to NUMERIC ) |
Double 100.01 |
Double 100.02 |
Int 100 |
Int 100 |
numeric.mapping = precision_only |
Bytes '\u0011 |
Bytes Øî |
Int 100 |
Int 100 |
The same problem exists with DECIMAL
fields being ignored. Since MS SQL accepts both DECIMAL
and NUMERIC
as native data types, use NUMERIC
for Kafka Connect to correctly ingest the values when using numeric.precision=best_fit
. If changing the source schema isn't an option then you can use query
mode, demonstrated here.
I am facing a problem with MySql and decimal data types. The values end up as corrupt strings in the Kafka topic. Without using schemas the values look like this when listing with console-consumer:
"revenue":"AfQ="
I tried if registering an Avro schema would help. I made the type of this revenue
field to be float
in the schema and created a JDBC source connector to fill the topic. But this connector fails with following
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
...
{\"name\":\"revenue\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}}
...
Seems like it tries to register a new schema that is incompatible with my previously created schema. It tries to use type bytes
for this revenue field (and for other decimal fields).
My table in MySQL looks like this:
mysql> describe v_game_transaction;
+-------------------+---------------+------+-----+---------------------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+---------------+------+-----+---------------------+-------+
| id | bigint(20) | NO | | 0 | |
| revenue | decimal(10,2) | NO | | NULL | |
...
Is there some way to work around this issue now?
DECIMAL
isn't supported for numeric.mapping
. There isn't a way to work around this that I'm aware of. The data isn't "corrupt", it's just a BigDecimal.
For more details see https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector#bytes-decimals-numerics
I tried to work around this issue by using a SMT cast. I changed the type of the column to varchar in the DB view i'm using here, and then casting it with
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "revenue:float64"
But now the connector fails with: [{"state":"FAILED","trace":"org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: \"started\
Adding this cast transform causes it to fail with datetime fields! Found this issue relating to cast transforms
My second attempt was to fix the the connector code:
I changed that line to include both DECIMAL and NUMERIC
case Types.DECIMAL:
case Types.NUMERIC: {
if (mapNumerics == NumericMapping.PRECISION_ONLY) {
.....
Using this hacked-up connector it produces a bit different kind of bytes data in the topic. But seems like that is not the correct way to fix the issue :-)
@anssip Here is a small code snippet that can help you in getting the data back in correct form (written in scala; you can change it to Java if you want). Essentially, it is not corrupt data it is just base64 encoded string of "unscaled" value of BigDecimal. Kafka Connect converts NUMERIC type having precision and scale to BigDecimal internally (and timestamps to long/epoch) when using AVRO (since its essentially a JSON.
As you already have schema available with you just get the precision and scale from there and pass it along while recreating a BigDecimal back and once you have the final BigDecimal you can get longValue or intValue or doubleValue from it.
Hope it helps !
//a bigdecimal with precision 4 and scale 3
val bd = new BigDecimal("1.234")
println(bd)
println(bd.precision()) //prints 4
println(bd.scale) //prints 3
val encoded = Base64.getEncoder.encodeToString(bd.unscaledValue().toByteArray())
println(encoded) // prints "BNI="
val decoded = Base64.getDecoder.decode(encoded)
val bi = new BigInteger(decoded)
println(bi) //prints 1234
val bd2 = new BigDecimal(bi, bd.scale)
println(bd2) //prints 1.234
Thanks, @aliasbadwolf for that tip. I am actually able to convert it to a valid number. I am now doing it with JavaScript as I'm doing the stream processing in Node.js
But my goal here was to streamline my data pipeline and not use any stream processing at all. I'd like to stream the data directly to Elasticsearch without doing any processing (and number conversion). Just one JDBC source connector pushing the data into a topic and from there one sink to push it to Elasticsearch.
Is there anything being done about this issue? I'm working with a large existing Oracle database, where the primary keys are all declared as NUMBER
(no precision/scale). As documented by @rmoff in https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#file-oracle-md, these end up as bytes no matter what numeric.mapping
is set to.
While specifying a custom query
is a workaround, it's an absolute pain since there are a few hundred tables. SMTs do seem to work in limited testing.
Hi Guys,
Is there any workaround on the above issues
Hi there guys!
I'm actualli working with a JDBC connector between Oracle and KafkaConnect.
I have some NUMBER(38,0) fields in my database table.
I'm actually having the same error:
- java.lang.IllegalArgumentException: Invalid decimal scale: 127 (greater than precision: 64)
It's some fix for this or a good workaround?
Or it's pushpavanthar PR (#725) adding and working in the last versions?
Thanks a lot!
Hi guys
Is there any update on this issue?
Is any way to take decimal type values from MySQL using JDBC source connector!!
in the above he said like that is solved and will fix this issue,is this fixed!!!
Hi,
I have the same question regarding MySQL and it seems not fixed yet.
I think the PR by pushpavanthar is not merged yet. :-(
I'm working with a large existing Oracle database, where the primary keys are all declared as
NUMBER
(no precision/scale). As documented by @rmoff in https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#file-oracle-md, these end up as bytes no matter whatnumeric.mapping
is set to.
Mid-2022: NUMBER
with missing precision scale will not be transformed, even using best_fit_eager_double
.
As mentioned by @kinghuang, SMTs work in limited fashion, e.g.
transforms.AdjustPrecision.type=com.github.jcustenborder.kafka.connect.transform.common.AdjustPrecisionAndScale$Value
transforms.AdjustPrecision.precision.value=38
transforms.AdjustPrecision.scale.value=10
transforms.AdjustPrecision.precision.mode=undefined
transforms.AdjustPrecision.scale.mode=undefined
but even then this seems to get applied in parallel or after numeric.mapping
, and so the resulting schema still ends up with decimal types!
I think that especially best_fit_eager_double
should also apply if scale/precision are missing.
numeric.mapping best_fit_eager_double works for numeric(20,2) (without precision it doesn't work) but till 7 digits eg. 1234567.11 but beyond this 12345678.11 will store something like this 1.234567811E7