Implementing a Spout from a Kafka Consumer
SioKCronin opened this issue · 0 comments
SioKCronin commented
I'm trying to build a topology that streams records from a Kafka consumer (spout) to Elasticsearch workers for a search process (bolt).
When I run `sparse run`, I get the following warning:
```
23416 [Thread-19-search_bolt-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt search_bolt:(3)
23471 [Thread-17-docs_spout-executor[2 2]] WARN o.a.s.s.ShellSpout - ShellLog pid:18630, name:docs_spout 2018-08-13 14:59:24,785 - kafka.coordinator.consumer - group_id is None: disabling auto-commit.
```
I'm running this on Ubuntu (with Zookeeper and Kafka running as services). I saw issue #181 mentioned that a native Kafka spout was in progress. Perhaps there is a solution in place that I'm overlooking?
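From the message, the warning itself just means the `KafkaConsumer` was constructed without a `group_id`, so kafka-python disables offset auto-commit. A sketch of consumer settings that would address it — the group name here is an assumed placeholder, and the actual `KafkaConsumer` call is left commented out since it needs a reachable broker:

```python
# Placeholder consumer settings; 'docs-search' is an assumed group name.
consumer_kwargs = dict(
    bootstrap_servers='ec2-XX-XX-XXX-XXX.us-west-2.compute.amazonaws.com',
    group_id='docs-search',        # a non-None group enables auto-commit
    enable_auto_commit=True,       # kafka-python's default once a group is set
    auto_offset_reset='earliest',  # start from the beginning on first run
)

# Needs a live broker, so left commented here:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer('docs', **consumer_kwargs)

print(consumer_kwargs['group_id'])
```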
For reference, I'm attaching my Spout and Topology below:
```python
import itertools

from streamparse.spout import Spout
from kafka import KafkaConsumer


class DocsSpout(Spout):
    outputs = ['doc']

    def initialize(self, stormconf, context):
        c = 'ec2-XX-XX-XXX-XXX.us-west-2.compute.amazonaws.com'
        self.docs = KafkaConsumer('docs', bootstrap_servers=c)

    def next_tuple(self):
        doc = next(self.docs)
        self.emit([doc])
```
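One thing I suspect in the spout above, separate from the warning: `next(self.docs)` returns a kafka-python `ConsumerRecord`, whose `value` field holds raw bytes, and streamparse serializes emitted tuples as JSON, which cannot encode bytes. A minimal sketch of decoding the payload before `emit` — the `Record` stand-in and sample payload are illustrative, not part of my topology:

```python
from collections import namedtuple

# Illustrative stand-in for kafka-python's ConsumerRecord; only the
# `value` field (raw bytes from the broker) matters here.
Record = namedtuple('Record', ['value'])

def to_emittable(record):
    # streamparse serializes emitted tuples as JSON, which cannot
    # encode bytes, so decode the payload to str first.
    return record.value.decode('utf-8')

print(to_emittable(Record(value=b'{"title": "storm"}')))
```

With that, `next_tuple` would emit `[to_emittable(doc)]` rather than the raw record.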
```python
from streamparse import Grouping, Topology

from spouts.docs import DocsSpout
from bolts.search import SearchBolt


class Search(Topology):
    docs_spout = DocsSpout.spec()
    search_bolt = SearchBolt.spec(inputs=[docs_spout])
```