mailgun/kafka-pixy

Quick Start for Python

Closed this issue · 6 comments

Can you please provide a quick start guide for Python?

Give me a couple of days.

A Python quick start would be great. I'm trying to use the Kafka-Pixy gRPC API with Ruby.

@msashokkumar check it out and let me know if something is not clear, wrong, or does not make sense. @dlanderson you should be able to use the python quick start for inspiration.

Thanks @horkhe. I will try it out and let you know. I appreciate the quick turnaround.

Thanks for the example @horkhe! It was easy to get going in Ruby based on the Python example. I'll make a PR for a Ruby quick start guide that mirrors the Python example. The consumer looks like this for anyone curious:

require 'grpc'
require_relative 'kafkapixy_grpc_pb.rb'
require_relative 'kafkapixy_grpc_services_pb.rb'

$kp = KafkaPixy::Stub.new('localhost:19091', :this_channel_is_insecure)

def run_consume_n_ack(topic, group)
  ack_partition = nil
  ack_offset = nil

  rq = ConsNAckRq.new(topic: topic, group: group)
  keep_running = true
  while keep_running
    if ack_offset
      rq.no_ack = false
      rq.ack_partition = ack_partition
      rq.ack_offset = ack_offset
    else
      rq.no_ack = true
      rq.ack_partition = 0
      rq.ack_offset = 0
    end
    
    begin
      rs = $kp.consume_n_ack(rq)
    rescue GRPC::NotFound
      # Long polling timeout, no new messages. Make another request.
      ack_offset = nil
      next
    rescue StandardError
      # Unexpected errors can be generated in rapid succession, e.g.
      # when Kafka-Pixy is down, so it makes sense to back off.
      # StandardError is rescued rather than Exception so that Ctrl-C
      # (Interrupt) can still stop the loop.
      sleep 10
      next
    end

    begin
      # Do something with the message.
      puts "Handled message!"
      ack_partition = rs.partition
      ack_offset = rs.offset
    rescue StandardError
      # The message handler raised an exception; what to do in this
      # case is up to you. Here the message is left unacknowledged.
      ack_offset = nil
    end

    # If there is nothing to acknowledge, continue looping.
    next if ack_offset.nil?

    begin
      # Acknowledge the message that was just handled. The rq.ack_* fields
      # still describe the previous message, so use the local variables.
      ack_rq = AckRq.new(topic: topic,
                         group: group,
                         partition: ack_partition,
                         offset: ack_offset)
      $kp.ack(ack_rq)
    rescue StandardError => e
      # logger.error("Failed to ack last message: topic=#{topic}, " \
      #              "partition=#{ack_partition}, offset=#{ack_offset}")
    end
  end
end

run_consume_n_ack('my_topic', 'my_group')
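
For anyone adapting this, the part that is easiest to get wrong is how the acknowledgement of the previous message is piggybacked on the next consume request. Below is a minimal sketch of just that decision, with no gRPC dependency so it can be tried on its own. `AckFields` and `next_ack_fields` are hypothetical names made up for illustration; they are not part of the Kafka-Pixy API. Only the `no_ack`, `ack_partition`, and `ack_offset` field names come from the consumer above.

```ruby
# Hypothetical helper for illustration; not part of the Kafka-Pixy API.
AckFields = Struct.new(:no_ack, :ack_partition, :ack_offset)

# Returns the ack-related fields for the next consume request, given the
# partition and offset of the last successfully handled message
# (both nil when there is nothing to acknowledge).
def next_ack_fields(last_partition, last_offset)
  if last_offset.nil?
    # Nothing handled yet, or the handler failed: ask for a message
    # without acknowledging anything.
    AckFields.new(true, 0, 0)
  else
    # Acknowledge the previous message while fetching the next one.
    AckFields.new(false, last_partition, last_offset)
  end
end

puts next_ack_fields(nil, nil).to_a.inspect
puts next_ack_fields(3, 42).to_a.inspect
```

Checking with `nil?` rather than plain truthiness makes the intent explicit. In Ruby `0` is truthy, so offset 0 is acknowledged correctly either way, but the same logic ported to a language where 0 is falsy would silently skip it.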

@dlanderson thanks a lot! I will gladly accept your PR.