empty mutation messages
tomconnors opened this issue · 9 comments
I'm seeing a problem where inserts, updates, and deletes are all picked up by mypipe, but the VALUES and WHERE clauses are empty. For instance, if I start mypipe with
sbt "project runner" "runMain mypipe.runner.PipeRunner"
then do a few inserts in mysql, for instance something along these lines (illustrative columns and values; the exact statement didn't seem to matter):
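insert into db.table_name (id, name) values (1, 'foo');  -- hypothetical column names
I'll see this output: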
INSERT INTO db.table_name () VALUES ()
This is my config:
mypipe {
  # consumers represent sources for mysql binary logs
  consumers {
    database1 {
      # database "host:port:user:pass" array
      source = "192.168.50.19:3306:repl:analytics"
    }
  }

  # data producers export data out (stdout, other stores, external services, etc.)
  producers {
    stdout {
      class = "mypipe.producer.stdout.StdoutProducer"
    }
    kafka-generic {
      class = "mypipe.producer.KafkaMutationGenericAvroProducer"
    }
  }

  # pipes join consumers and producers
  pipes {
    stdout {
      consumers = ["database1"]
      producer {
        stdout {}
      }
    }
    kafka-generic {
      enabled = true
      consumers = ["database1"]
      producer {
        kafka-generic {
          metadata-brokers = "localhost:9092"
        }
      }
    }
  }
}
include "application.overrides"
And here's my java version:
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) Client VM (build 25.45-b02, mixed mode)
Any idea what I'm doing wrong?
Can you include the queries you're running and a minimal schema for the table they're operating on, like #16 did, to verify this isn't the same issue as reported there?
@ralph-tice I think it's a completely different problem: in #16 we see empty transactions that mypipe fails to commit successfully (there is an explicit check for that), while here the mutation comes through but with very little information about the query attached.
I meant that more information is needed to reproduce this, and that the other issue's reproduction steps contain exactly what I think is missing here; I didn't mean that this issue is a duplicate.
I had a very similar config when I was testing earlier this week and didn't hit the same problem, so it's probably down to the schema, the queries being issued, or maybe the mysql config. If it's the mysql config, a couple of server settings are worth ruling out first. Here's a sketch of what I'd check; the expected values are general requirements for row-based binlog consumers, not anything I've verified against mypipe specifically:
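SHOW VARIABLES LIKE 'binlog_format';     -- should be ROW for a row-based reader
SHOW VARIABLES LIKE 'binlog_row_image';  -- should be FULL (5.6+); MINIMAL strips
                                         -- non-PK columns from row events, which
                                         -- would look a lot like empty VALUES/WHERE
SHOW GRANTS FOR 'repl'@'%';              -- adjust the host; the binlog user also
                                         -- needs enough access to read table metadata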
I tested some more yesterday: UPDATE statements weren't pulling in their WHERE clauses, but INSERTs still came through fine.
Sorry I haven't commented back about this yet! My project ended up going another direction but I'll try to get some time soon to get you a minimal reproducible example.
@tomconnors np, let us know when you have those examples (=
Tested with the latest master of mypipe, ZK 3.4.6, Kafka 0.8.2, MySQL 5.6.25.
SQL issued:
mysql> create table foo (id int);
Query OK, 0 rows affected (0.02 sec)
mysql> insert into foo values(1);
Query OK, 1 row affected (0.00 sec)
# mypipe stdout: INSERT INTO test.foo (id) VALUES (1)
# GenericConsoleConsumer:
{"database": "test", "table": "foo", "tableId": 73, "txid": [32, -38, -107, -15, 48, 36, 17, -27, -92, 78, -66, 112, -75, -61, -4, -113], "integers": {"id": 1}, "strings": {}, "longs": {}}
mysql> update foo set id = 2 where id = 1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
# mypipe stdout: UPDATE test.foo SET (id=2) Some(WHERE ())
# GenericConsoleConsumer:
{"database": "test", "table": "foo", "tableId": 73, "txid": [56, -105, 23, -29, 48, 36, 17, -27, -92, 78, -66, 112, -75, -61, -4, -113], "old_integers": {"id": 1}, "old_strings": {}, "old_longs": {}, "new_integers": {"id": 2}, "new_strings": {}, "new_longs": {}}
mysql> delete from foo where id = 2;
Query OK, 1 row affected (0.00 sec)
# mypipe stdout: DELETE FROM test.foo WHERE ()
# GenericConsoleConsumer:
{"database": "test", "table": "foo", "tableId": 73, "txid": [92, 35, -28, -107, 48, 36, 17, -27, -92, 78, -66, 112, -75, -61, -4, -113], "integers": {"id": 2}, "strings": {}, "longs": {}}
So there are a couple of display bugs in the stdout producer on delete and update, but I'm also not sure what's supposed to indicate that the generic delete entity is a delete rather than an insert; the delete record has exactly the same shape as the insert record. The update Avro looks pretty OK to me, though. For comparison, one common way to make the mutation type explicit is to carry it in an envelope around the Avro payload. This is just a sketch of that general pattern in Scala, not a claim about how mypipe actually encodes its Kafka messages:
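sealed trait MutationType
case object InsertMutation extends MutationType
case object UpdateMutation extends MutationType
case object DeleteMutation extends MutationType

// Envelope published alongside the serialized Avro record so consumers
// can tell a DELETE from an INSERT even when the row payloads are
// byte-for-byte identical.
case class MutationEnvelope(
  mutationType: MutationType,
  database:     String,
  table:        String,
  payload:      Array[Byte]
)

// A consumer then branches on the type instead of guessing from the shape:
def describe(m: MutationEnvelope): String = m.mutationType match {
  case InsertMutation => s"INSERT into ${m.database}.${m.table}"
  case UpdateMutation => s"UPDATE of ${m.database}.${m.table}"
  case DeleteMutation => s"DELETE from ${m.database}.${m.table}"
}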
Here is my application.conf as well:
mypipe {
  # consumers represent sources for mysql binary logs
  consumers {
    database1 {
      # database "host:port:user:pass" array
      source = "localhost:3306:root:foobar"
    }
  }

  # data producers export data out (stdout, other stores, external services, etc.)
  producers {
    stdout {
      class = "mypipe.producer.stdout.StdoutProducer"
    }
    kafka-generic {
      class = "mypipe.producer.KafkaMutationGenericAvroProducer"
    }
  }

  # pipes join consumers and producers
  pipes {
    stdout {
      consumers = ["database1"]
      producer {
        stdout {}
      }
    }
    kafka-generic {
      enabled = true
      consumers = ["database1"]
      producer {
        kafka-generic {
          metadata-brokers = "boot2docker:9092"
        }
      }
    }
  }
}
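To double-check that messages are actually landing in Kafka, the stock 0.8.2 console consumer works as a liveness check (the topic name below is a guess; substitute whatever topic mypipe actually creates for test.foo, and expect raw Avro bytes rather than readable output):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_foo_generic --from-beginning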
@ralph-tice In 64e452a I've improved primary key retrieval, which should make things better for your case. That said, there's not much to be done about what I mention in the commit re: the stdout producer. The Avro output also contains 3 identical objects; not much to do there either.
I'm still interested in seeing how to reproduce @tomconnors' original issue with blank column names and values.
@tomconnors @ralph-tice guys, I'm closing this for now until we have a way to reproduce. Thanks for the effort so far (=