Quick Start for Python
Closed this issue · 6 comments
msashokkumar commented
Can you please provide quick start guide in Python?
horkhe commented
Give me a couple of days.
dlanderson commented
Python quick start would be great. I'm trying to use KafkaPixy grpc with Ruby.
horkhe commented
@msashokkumar check it out and let me know if something is not clear, wrong, or does not make sense. @dlanderson you should be able to use the python quick start for inspiration.
msashokkumar commented
Thanks @horkhe . I will try it out and let you know. Appreciate the quick turnaround.
dlanderson commented
Thanks for the example @horkhe ! Easy to get going in ruby based off the python example. I'll make a PR for a ruby quick start guide that mirrors the python example. The consumer looks like this for anyone curious:
require 'grpc'
require_relative 'kafkapixy_grpc_pb.rb'
require_relative 'kafkapixy_grpc_services_pb.rb'
$kp = KafkaPixy::Stub.new('localhost:19091', :this_channel_is_insecure)
def run_consume_n_ack(topic, group)
ack_partition = nil
ack_offset = nil
rq = ConsNAckRq.new(topic: topic, group: group)
keep_running = true
while keep_running
if ack_offset
rq.no_ack = false
rq.ack_partition = ack_partition
rq.ack_offset = ack_offset
else
rq.no_ack = true
rq.ack_partition = 0
rq.ack_offset = 0
end
begin
rs = $kp.consume_n_ack(rq)
rescue GRPC::NotFound
# Long polling timeout, no new messsages. Make another request.
ack_offset = nil
next
rescue Exception => e
# Unexpected errors can be generated in rapid succession e.g.
# when a Kafka-Pixy is down. So it makes sense to back off.
sleep 10
next
end
begin
# Do something with the message.
puts "Handled message!"
ack_partition = rs.partition
ack_offset = rs.offset
rescue Exception => e
ack_offset = nil
# The message handler raised an exception, it is up to you what
# to do in this case.
end
# If there is nothing to acknowledge continue looping
next if ack_offset.nil?
begin
ack_rq = AckRq.new(topic: topic,
group: group,
partition: rq.ack_partition,
offset: rq.ack_offset)
$kp.ack(ack_rq)
rescue Exception => e
#logger.error("Failed to ack last message: topic=#{topic},
# partition=#{ack_partition}, offset=#{ack_offset}"
end
end
end
run_consume_n_ack('my_topic', 'my_group')
horkhe commented
@dlanderson thanks a lot!. I will gladly accept your PR.