pystorm/streamparse

Implementing a Spout from a Kafka Consumer

SioKCronin opened this issue · 0 comments

I'm trying to build a topology that streams records from a Kafka consumer (Spout) to Elasticsearch workers (Bolt) for search.

When I run sparse run, I get the following output:

23416 [Thread-19-search_bolt-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt search_bolt:(3)
23471 [Thread-17-docs_spout-executor[2 2]] WARN  o.a.s.s.ShellSpout - ShellLog pid:18630, name:docs_spout 2018-08-13 14:59:24,785 - kafka.coordinator.consumer - group_id is None: disabling auto-commit.
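As far as I can tell, the last line is a WARN from kafka-python rather than a Storm error: when group_id is None, the consumer just disables offset auto-commit. A minimal sketch of consumer settings that should avoid the warning (the broker address and group name here are placeholders, not values from my setup):

```python
# Hypothetical consumer settings: with a non-None group_id,
# kafka-python enables offset auto-commit and the WARN goes away.
consumer_kwargs = {
    'bootstrap_servers': 'localhost:9092',  # placeholder broker address
    'group_id': 'docs_spout_group',         # any stable consumer-group name
    'enable_auto_commit': True,
}

# Usage (requires a reachable broker):
# consumer = KafkaConsumer('docs', **consumer_kwargs)
```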

I'm running this on Ubuntu (with Zookeeper and Kafka running as services). I saw that Issue 181 mentions a native Kafka spout is in progress. Perhaps there is a solution already in place that I'm overlooking?

For reference, I'm attaching my Spout and Topology below:

from streamparse.spout import Spout
from kafka import KafkaConsumer

class DocsSpout(Spout):
    outputs = ['doc']

    def initialize(self, stormconf, context):
        c = 'ec2-XX-XX-XXX-XXX.us-west-2.compute.amazonaws.com'
        self.docs = KafkaConsumer('docs', bootstrap_servers=c)

    def next_tuple(self):
        doc = next(self.docs)
        self.emit([doc])

from streamparse import Topology
from spouts.docs import DocsSpout
from bolts.search import SearchBolt

class Search(Topology):
    docs_spout = DocsSpout.spec()
    search_bolt = SearchBolt.spec(inputs=[docs_spout])
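Two things in my spout may also bite, independent of the warning: next(self.docs) blocks inside next_tuple when no messages are pending, and self.emit([doc]) emits the raw ConsumerRecord, which isn't JSON-serializable over Storm's multi-lang protocol. A hedged sketch of two helpers (the names are mine, not streamparse or kafka-python APIs) that decode the payload and flatten what KafkaConsumer.poll() returns:

```python
def decode_payload(value, encoding='utf-8'):
    """Decode a raw Kafka payload (bytes) to str so the emitted
    tuple is JSON-serializable; pass other types through unchanged."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value


def flatten_poll(batches):
    """Flatten the {TopicPartition: [ConsumerRecord, ...]} mapping
    returned by KafkaConsumer.poll() into a single list of records."""
    return [record for records in batches.values() for record in records]


# The spout's next_tuple could then poll with a timeout instead of
# blocking on next():
#
#     def next_tuple(self):
#         for record in flatten_poll(self.docs.poll(timeout_ms=100)):
#             self.emit([decode_payload(record.value)])
```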