This repository shows how to load Tweets from the twint library into Kafka, and then from Kafka into Neo4j using the Kafka Connect Sink.
git clone git@github.com:mneedham/kafka-connect-neo4j.git && cd kafka-connect-neo4j
docker-compose up
pip install --upgrade -e git+https://github.com/twintproject/twint.git@origin/master#egg=twint
pip install confluent-kafka[avro]
import twint
import sys
import json
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{
"namespace": "my.test",
"name": "value",
"type": "record",
"fields" : [
{ "name": "id", "type": "long" },
{ "name": "tweet", "type": "string" },
{ "name": "datetime", "type": "long" },
{ "name": "username", "type": "string" },
{ "name": "user_id", "type": "long" },
{ "name": "hashtags", "type": {"type": "array", "items": "string"} }
]
}
"""
key_schema_str = """
{
"namespace": "my.test",
"name": "key",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""
kafka_broker = 'localhost:9092'
schema_registry = 'http://localhost:8081'
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
producer = AvroProducer({
'bootstrap.servers': kafka_broker,
'schema.registry.url': schema_registry
}, default_key_schema=key_schema, default_value_schema=value_schema)
module = sys.modules["twint.storage.write"]
def Json(obj, config):
tweet = obj.__dict__
print(tweet)
producer.produce(topic='tweets10', value=tweet, key={"name": "Key"})
producer.flush()
module.Json = Json
c = twint.Config()
c.Search = "neo4j OR \"graph database\" OR \"graph databases\" OR graphdb OR graphconnect OR @neoquestions OR @Neo4jDE OR @Neo4jFr OR neotechnology"
c.Store_json = True
c.Custom["user"] = ["id", "tweet", "user_id", "username", "hashtags", "mentions"]
c.User_full = True
c.Output = "tweets.json"
c.Since = "2019-05-20"
c.Hide_output = True
twint.run.Search(c)
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "connect.sink.neo4j.tweets",
"config": {
"topics": "tweets10",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "neo",
"neo4j.topic.cypher.tweets10": "WITH event AS data MERGE (t:Tweet {id: data.id}) SET t.text = data.tweet, t.createdAt = datetime({epochmillis:data.datetime}) MERGE (u:User {username: data.username}) SET u.id = data.user_id MERGE (u)-[:POSTED]->(t) FOREACH (ht IN data.hashtags | MERGE (hashtag:HashTag {value: ht}) MERGE (t)-[:HAS_HASHTAG]->(hashtag))"
}
}'
Navigate to http://localhost:7474
and paste the following into the query pane:
MATCH path = (u:User)-[:POSTED]->(t:Tweet)-[:HAS_HASHTAG]->(ht)
RETURN path
LIMIT 100