mardambey/mypipe

stdout pipe shows nothing


I have MySQL set up with binary logging enabled and binlog_format = row.
My application.conf:

mypipe {
  # consumers represent sources for mysql binary logs
  consumers {

    database1  {
      # database "host:port:user:pass"
      source = "localhost:3306:root:root"
    }
  }

  producers {
    stdout {
      class = "mypipe.producer.stdout.StdoutProducer"
    }
  }

  pipes {

    # prints queries to standard out
    stdout {
      consumers = ["database1"]
      producer {
        stdout {}
      }
    }
  }
}
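
For readers unfamiliar with the source setting, here is a minimal sketch of how a "host:port:user:pass" string breaks down into its four fields. SourceInfo and SourceParser are hypothetical names made up for illustration; this is not mypipe's own parsing code, and the handling of colons inside the password is an assumption.

```scala
import scala.util.Try

// Hypothetical container for the four fields of a "host:port:user:pass" string.
final case class SourceInfo(host: String, port: Int, user: String, password: String)

// Hypothetical helper for illustration only; not part of mypipe.
object SourceParser {
  // Split into at most 4 parts so a password containing ':' stays intact
  // (an assumption about the desired behaviour, not confirmed for mypipe).
  def parse(source: String): Option[SourceInfo] =
    source.split(":", 4) match {
      case Array(host, port, user, password) if host.nonEmpty && user.nonEmpty =>
        // A non-numeric port yields None instead of throwing.
        Try(port.toInt).toOption.map(p => SourceInfo(host, p, user, password))
      case _ =>
        None
    }
}
```

With this sketch, SourceParser.parse("localhost:3306:root:root") returns a SourceInfo with host "localhost", port 3306, and "root" as both user and password, while a string with fewer than four fields or a non-numeric port returns None.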

All I see when I start mypipe is that it is connected, but it keeps losing the connection:

INFO: Trying to restore lost connection to localhost:3306
Feb 04, 2016 2:47:11 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:96)

In my MySQL database I have a table named users, but when I update it or add new entries, I see nothing on stdout.

Has anyone else experienced this?

Some progress: it now gives me this:

INFO: Trying to restore lost connection to localhost:3306
Feb 04, 2016 10:10:17 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mysql-bin.000002/107 (sid:1457, cid:140)

Is this stock MySQL?
Could you also try turning up the log levels to see if that reveals any additional information?
Also, can you push anything into Kafka or not?

kubum commented

This is my first time trying mypipe and I have run into the same issue. However, I am using MariaDB 10.0.21. Do you know of any difference between MariaDB and MySQL that could affect mypipe?

Do you know the best way to turn on debugging?

Thank you!

@jb-san @kubum with #33 and #36 addressed (2cd84a9), logging should be better. Let me know if this helps.

kubum commented

Hi @mardambey

The steps I have taken are the following:

1.) Edit my.cnf

[server]
server-id         = 112233
log_bin           = mysql-bin
expire_logs_days  = 1
binlog_format     = row

2.) git clone
3.) Edited mypipe-runner/src/main/resources/application.conf

include "application.overrides"

mypipe {
  # consumers represent sources for mysql binary logs
  consumers {

    database1  {
        # database "host:port:user:pass"
        source = "localhost:3306:mypipe:mypipe"
    }
  }

  producers {
    stdout {
       class = "mypipe.producer.stdout.StdoutProducer"
    }
  }

  pipes {

    # prints queries to standard out
    stdout {
      consumers = ["database1"]
      producer {
        stdout {}
      }
    }
  }
}

4.) Package

$ ./sbt package

5.) Added permissions for mypipe:mypipe into MariaDB

6.) Run

$ ./sbt "project runner" "runMain mypipe.runner.PipeRunner"

and then I see the log:

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] Loading project definition from /Users/andreyfadeyev/Desktop/mypipe/project
[warn] Multiple resolvers having different access mechanism configured with same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Set current project to mypipe (in build file:/Users/andreyfadeyev/Desktop/mypipe/)
[info] Set current project to runner (in build file:/Users/andreyfadeyev/Desktop/mypipe/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] Packaging /Users/andreyfadeyev/Desktop/mypipe/mypipe-runner/target/scala-2.11/runner_2.11-0.0.1.jar ...
[info] Done packaging.
[info] Running mypipe.runner.PipeRunner
19:05:13 INFO  [mypipe.runner.PipeRunnerUtil$           ] Loading configuration for stdout pipe
19:05:14 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Resuming binlog consumption from file=null pos=4 for localhost:3306
19:05:14 INFO  [mypipe.runner.PipeRunner$               ] Connecting 1 pipes...
19:05:14 INFO  [mypipe.pipe.Pipe                        ] Connecting pipe between localhost:3306 -> StdoutProducer
19:05:14 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Connecting client to com.github.shyiko.mysql.binlog.BinaryLogClient@60fd263d.get:localhost:3306
Feb 14, 2016 7:05:14 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:91)
19:05:14 INFO  [mypipe.pipe.Pipe                        ] Pipe stdout connected!
19:05:24 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:05:34 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:05:44 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:05:54 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:06:04 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
Feb 14, 2016 7:06:14 PM com.github.shyiko.mysql.binlog.BinaryLogClient$2 run
INFO: Trying to restore lost connection to localhost:3306
Feb 14, 2016 7:06:14 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:94)
19:06:14 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:06:24 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:06:34 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:06:44 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:06:54 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:07:04 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
Feb 14, 2016 7:07:14 PM com.github.shyiko.mysql.binlog.BinaryLogClient$2 run
INFO: Trying to restore lost connection to localhost:3306
Feb 14, 2016 7:07:14 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:95)
19:07:14 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:07:24 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
19:07:34 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
^C19:07:42 INFO  [mypipe.runner.PipeRunner$               ] Shutting down...
19:07:42 INFO  [mypipe.pipe.Pipe                        ] Disconnecting pipe between localhost:3306 -> StdoutProducer
19:07:42 INFO  [mypipe.pipe.Pipe                        ] Pipe stdout disconnected!
19:07:42 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4

I tried making some changes in the database, but I see nothing on stdout.

Do you have any idea what could be going wrong?

Thanks!

@kubum the main difference here is MariaDB. I haven't tried mypipe against it recently, and I think it did not work properly last time I tried it. I'll have to give it a try and let you know, unless you can repeat the above steps for a "stock" MySQL server and let us know what you find out.

Another thing I noticed in your logs was:

stdout/localhost-3306 -> null:4

I wonder why mypipe can't determine the binary log file name. Usually, it looks like this:

stdout-00/localhost-3306 -> mysql-bin.000004:50989

Thanks for following up!

I am encountering a similar situation.

application.conf

include "application.overrides"

mypipe {

  # consumers represent sources for mysql binary logs
  consumers {

    database1  {
      # database "host:port:user:pass"
      source = "localhost:3306:mypipe:mypipe"
    }
  }

  producers {
    stdout {
      class = "mypipe.kafka.producer.stdout.StdoutProducer"
    }

    kaka-generic {
      class = "mypipe.kafka.producer.KafkaMutationGenericAvroProducer"
    }
  }

  pipes {

    # prints queries to standard out
    stdout {
      consumers = ["database1"]
      producer {
        stdout {}
      }
    }

    kaka-generic {
      enabled = true
      consumers = ["database1"]
      producer {
        kaka-generic {
          metadata-brokers = "localhost:9092"
        }
      }
    }
  }

}

I am using the containers within this project: Kafka, MySQL, and ZooKeeper.
Output from running ./sbt "project runner" "runMain mypipe.runner.PipeRunner":

01:36:45 INFO  [mypipe.runner.PipeRunnerUtil$           ] Loading configuration for stdout pipe
01:36:46 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Resuming binlog consumption from file=null pos=4 for localhost:3306
01:36:46 INFO  [mypipe.runner.PipeRunnerUtil$           ] Loading configuration for kaka-generic pipe
01:36:46 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Resuming binlog consumption from file=null pos=4 for localhost:3306
01:36:46 INFO  [o.a.k.clients.producer.ProducerConfig   ] ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    acks = 1
    batch.size = 16384
    reconnect.backoff.ms = 10
    bootstrap.servers = [localhost:9092]
    receive.buffer.bytes = 32768
    retry.backoff.ms = 100
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    retries = 0
    max.request.size = 1048576
    block.on.buffer.full = true
    value.serializer = class mypipe.kafka.producer.KafkaGenericAvroSerializer
    metrics.sample.window.ms = 30000
    send.buffer.bytes = 131072
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    linger.ms = 0
    client.id =

[WARNING] Avro: Invalid default for field txid: "null" not a ["null",{"type":"fixed","name":"Guid","namespace":"mypipe.avro","size":16}]
[WARNING] Avro: Invalid default for field txid: "null" not a ["null",{"type":"fixed","name":"Guid","namespace":"mypipe.avro","size":16}]
[WARNING] Avro: Invalid default for field txid: "null" not a ["null",{"type":"fixed","name":"Guid","namespace":"mypipe.avro","size":16}]
01:36:46 WARN  [o.a.k.clients.producer.ProducerConfig   ] The configuration schema-repo-client = null was supplied but isn't a known config.
01:36:46 INFO  [mypipe.runner.PipeRunner$               ] Connecting 2 pipes...
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Connecting pipe between localhost:3306 -> StdoutProducer
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Connecting pipe between localhost:3306 -> kafka-avro-producer-localhost:9092
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Disconnecting pipe between localhost:3306 -> StdoutProducer
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Pipe stdout disconnected!
01:36:46 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout/localhost-3306 -> null:4
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Disconnecting pipe between localhost:3306 -> kafka-avro-producer-localhost:9092
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Pipe kaka-generic disconnected!
01:36:46 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Connecting client to com.github.shyiko.mysql.binlog.BinaryLogClient@947ca00:localhost:3306
01:36:46 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Connecting client to com.github.shyiko.mysql.binlog.BinaryLogClient@42b45f2f:localhost:3306
01:36:46 INFO  [m.a.r.FileBasedBinaryLogPositionRepository] Saving binlog position for pipe kafka-generic/localhost-3306 -> null:4
01:36:46 INFO  [mypipe.runner.PipeRunner$               ] Shutting down actor system...
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:43)
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Pipe stdout connected!
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1458, cid:44)
01:36:46 INFO  [mypipe.runner.PipeRunner$               ] Bye bye...
[debug]     Thread run-main-0 exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Interrupting thread ForkJoinPool-2-worker-15
[debug]     Interrupted ForkJoinPool-2-worker-15
[debug] Interrupting thread ForkJoinPool-2-worker-11
01:36:46 INFO  [mypipe.pipe.Pipe                        ] Pipe kaka-generic connected!
[debug]     Interrupted ForkJoinPool-2-worker-11
[debug] Interrupting thread Thread-1
[debug]     Interrupted Thread-1
[debug] Interrupting thread kafka-producer-network-thread | producer-1
[debug]     Interrupted kafka-producer-network-thread | producer-1
[debug] Interrupting thread ForkJoinPool-2-worker-9
[debug]     Interrupted ForkJoinPool-2-worker-9
[debug] Interrupting thread blc-keepalive-localhost:3306
[debug]     Interrupted blc-keepalive-localhost:3306
[debug] Interrupting thread shutdownHook1
[debug]     Interrupted shutdownHook1
[debug] Interrupting thread ForkJoinPool-2-worker-13
[debug]     Interrupted ForkJoinPool-2-worker-13
[debug] Interrupting thread blc-keepalive-localhost:3306
[debug]     Interrupted blc-keepalive-localhost:3306
[debug] Sandboxed run complete..
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient$2 run
INFO: Trying to restore lost connection to localhost:3306
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient$2 run
INFO: Trying to restore lost connection to localhost:3306
[debug] Exited with code 0
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1457, cid:45)
Aug 03, 2016 1:36:46 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at null/4 (sid:1458, cid:46)
[success] Total time: 2 s, completed Aug 3, 2016 1:36:46 AM

I had the same problem, and commenting out the shutdown() call seemed to fix it:

diff --git a/mypipe-runner/src/main/scala/mypipe/runner/PipeRunner.scala b/mypipe-runner/src/main/scala/mypipe/runner/PipeRunner.scala
index b3cf449..d0ec4b7 100644
--- a/mypipe-runner/src/main/scala/mypipe/runner/PipeRunner.scala
+++ b/mypipe-runner/src/main/scala/mypipe/runner/PipeRunner.scala
@@ -52,7 +57,7 @@ object PipeRunner extends App {
   log.info(s"Connecting ${pipes.size} pipes...")
   pipes.foreach(_.connect())

-  shutdown()
+  //shutdown()
 }

pensz commented

Same problem; commenting out shutdown() made it work.
@mardambey why is shutdown() called at the end?
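
In the log above, the pipes are disconnected right after "Connecting 2 pipes...", which is consistent with shutdown() running as soon as connect() returns. An alternative to commenting the call out is to block the main thread until a stop signal arrives and only then disconnect. Below is a minimal sketch of that pattern, not mypipe's actual code: SketchPipe and KeepAliveRunner are hypothetical stand-ins, and the real Pipe class and shutdown() logic are not reproduced here.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a mypipe pipe; the real Pipe class is not shown.
final class SketchPipe(val id: String) {
  @volatile var connected: Boolean = false
  def connect(): Unit = { connected = true }
  def disconnect(): Unit = { connected = false }
}

object KeepAliveRunner {
  // Connects every pipe, blocks until `stop` is released, then disconnects.
  def run(pipes: Seq[SketchPipe], stop: CountDownLatch): Unit = {
    pipes.foreach(_.connect())
    stop.await() // keeps the calling thread alive while the pipes run
    pipes.foreach(_.disconnect())
  }

  def main(args: Array[String]): Unit = {
    val stop  = new CountDownLatch(1)
    val done  = new CountDownLatch(1)
    val pipes = Seq(new SketchPipe("stdout"))

    // On Ctrl-C, release the main thread and wait until it has disconnected.
    sys.addShutdownHook {
      stop.countDown()
      done.await()
    }

    run(pipes, stop)
    done.countDown()
  }
}
```

The design choice here is that disconnecting happens in exactly one place, after the latch is released, so the pipes stay connected for as long as the process is meant to run. Whether this fits PipeRunner depends on how its existing shutdown hook is wired, which this thread does not show.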