Kurulum işlemi Ubuntu 16.04 üzerinden yapılmıştır.
Aşağıdaki komutu çalıştırarak sunucunuzu güncelleyin
$ sudo apt-get update -y
$ sudo apt-get upgrade -y
Kafka'yı kurmadan önce Java'yı sisteminize kurmanız gerekecektir. Webupd8 daki PPA kullanarak Oracle JDK 8'i yükleyebilirsiniz.
Repository eklemek için aşağıdaki komutu çalıştırın
$ sudo add-apt-repository -y ppa:webupd8team/java
Aşağıdaki çıktıyı görmelisiniz
gpg: keyring `/tmp/tmpkjrm4mnm/secring.gpg' created
gpg: keyring `/tmp/tmpkjrm4mnm/pubring.gpg' created
gpg: requesting key EEA14886 from hkp server keyserver.ubuntu.com
gpg: /tmp/tmpkjrm4mnm/trustdb.gpg: trustdb created
gpg: key EEA14886: public key "Launchpad VLC" imported
gpg: no ultimately trusted keys found
gpg: Total number processed: 1
gpg: imported: 1 (RSA: 1)
OK
Ardından, aşağıdaki komutu çalıştırarak yeni repository meta verilerini güncelleyin
$ sudo apt-get update
İşlem bittiğınde, JDK 8'i yüklemek için aşağıdaki komutu çalıştırın
$ sudo apt-get install oracle-java8-installer -y
Aşağıdaki komutu çalıştırarak JDK 8'in düzgün şekilde yüklendiğini de doğrulayabilirsiniz.
$ sudo java -version
Çıktıyı bu şekilde görmelisiniz
java version "1.8.0_66"
Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
Apache Kafka'yı kurmadan önce, ZooKeeper hazır ve çalışıyor olması gerekir. ZooKeeper, yapılandırma bilgilerini korumak, dağıtılmış senkronizasyon sağlamak, grup hizmetleri adlandırmak ve sağlamak için açık kaynaklı bir hizmettir.
Varsayılan olarak ZooKeeper paketi Ubuntu'nun varsayılan repositorysinde bulunur, aşağıdaki komutu çalıştırarak kurabilirsiniz
$ sudo apt-get install zookeeperd
Kurulum tamamlandığında, otomatik olarak bir daemon olarak başlatılacaktır. ZooKeeper varsayılan olarak 2181 numaralı bağlantı noktasında çalışacaktır.
Aşağıdaki komutu çalıştırarak test edebilirsiniz.
netstat -ant | grep :2181
Her şey yolundaysa, aşağıdaki Çıktıyı görmelisiniz
tcp6 0 0 :::2181 :::* LISTEN
Artık Java ve ZooKeeper yüklendiğine göre, Kafka'yı Apache web sitesinden indirip çıkarmanın zamanı geldi. Kafka'yı indirmek için wget kullanabilirsiniz.
$ wget http://ftp.itu.edu.tr/Mirror/Apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
Ardından, Kafka kurulumu için bir dizin oluşturun
$ sudo mkdir /opt/Kafka
İndirilen arşivi /opt/Kafka dizinindeki tar komutunu kullanarak açın
$ sudo tar -xvf kafka_2.11-2.1.0.tgz -C /opt/Kafka/
Bir sonraki adım Kafka sunucusunu başlatmak, /opt/Kafka/kafka_2.10-0.10.0.1/bin/ dizininde bulunan kafka-server-start.sh scriptini çalıştırarak başlatabilirsiniz.
$ cd /opt/Kafka/kafka_2.11-2.1.0/
$ sudo bin/kafka-server-start.sh config/server.properties
Sunucu başarıyla başlatılmışsa, aşağıdaki çıktıyı görmelisiniz
[2016-08-22 21:43:48,279] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-08-22 21:43:48,516] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-08-22 21:43:48,525] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2016-08-22 21:43:48,527] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-08-22 21:43:48,555] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
Her şeyin çalışır durumda olduğunu test edin, yeni bir terminal açıp aşağıdaki komutları deneyin
$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testing
Aşağıdaki gibi cıktı alınmalıdır
Created topic "testing".
Kafka sunucusunu arka plan işlemi olarak başlatmak için nohup komut dosyasını komut ile kullanabilirsiniz.
$ sudo nohup /opt/Kafka/kafka_2.10-0.10.0.1/bin/kafka-server-start.sh /opt/Kafka/kafka_2.11-2.1.0/config/server.properties /tmp/kafka.log 2>&1 &
Artık 9092 numaralı bağlantı noktasında çalışan ve dinleyen bir Kafka sunucunuz var.
prepare topics
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
create a project with maven
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.1.0 \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
content of
WordCount.java
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* In this example, we implement a simple WordCount program using the high-level Streams DSL
* that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
* split each text line into words and then compute the word occurence histogram, write the continuous updated histogram
* into a topic "streams-wordcount-output" where each record is an updated count of a single word.
*/
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
build and run
cd streams.examples/
mvn clean package
mvn exec:java -Dexec.mainClass=myapps.WordCount
run producer
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
run consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Öncelikle projemiz için yeni bir dizin oluşturacağız.
$ mkdir kafkaDemo
$ cd kafkaDemo
Sanal ortamımızı çalıştırıyoruz.
$ virtualenv env
$ . env/bin/activate
Kafka client çalışır duruma getirmek Kafka-Python projesine ihtiyacımız var.
Ayrıca, video dağıtımı için OpenCV'a ve “dağıtılmış” Consumer için Flask'a ihtiyacımız olacak.
$ pip install kafka-python opencv-contrib-python Flask
Son bölümde kurduğumuz Kafka Sunucusu 9092 numaralı portuna bağlı. İki Kafka clientini kurarken bu değeri kullanacağız.
Kafka clientinin ilki, producer mesajı olacaktır. Burada videoyu bir JPEG görüntü akışına dönüştürmek işlemi yapılacaktır.
import sys
import time
import cv2
from kafka import KafkaProducer
servers=['localhost:9092']
def publish_video(video_file):
"""
Publish given video file to a specified Kafka topic.
Kafka Server is expected to be running on the localhost. Not partitioned.
:param video_file: path to video file <string>
"""
# Start up producer
producer = KafkaProducer(bootstrap_servers=servers)
# Open file
video = cv2.VideoCapture(video_file)
print('publishing video...')
while(video.isOpened()):
success, frame = video.read()
# Ensure file was read successfully
if not success:
print("bad read!")
break
# Convert image to png
ret, buffer = cv2.imencode('.jpg', frame)
# Convert to bytes and send to kafka
producer.send("video", buffer.tobytes())
time.sleep(0.04)
video.release()
print('publish complete')
def publish_camera():
"""
Publish camera video stream to specified Kafka topic.
Kafka Server is expected to be running on the localhost. Not partitioned.
"""
# Start up producer
producer = KafkaProducer(bootstrap_servers=servers)
camera = cv2.VideoCapture(0)
try:
while(True):
success, frame = camera.read()
ret, buffer = cv2.imencode('.jpg', frame)
producer.send("webcam", buffer.tobytes())
# Choppier stream, reduced load on processor
time.sleep(0.2)
except:
print("\nExiting.")
sys.exit(1)
camera.release()
if __name__ == '__main__':
"""
Producer will publish to Kafka Server a video file given as a system arg.
Otherwise it will default by streaming webcam feed.
"""
if(len(sys.argv) > 1):
video_path = sys.argv[1]
publish_video(video_path)
else:
print("publishing feed!")
publish_camera()
Producer doğrudan bir web kamerasından video akışı yaparak varsayılan ayarlara sahiptir; Bir video dosyasından çekme, stiliniz daha fazlaysa, Producer bir dosya adını komut satırı argümanı olarak kabul eder.
Yeni yayınlanan akışımızı okumak için Kafka konusuna erişen bir Consumer'e ihtiyacımız olacak. Mesaj akış sistemimiz dağıtık bir sistem için tasarlandığından, projemizi bu bölüm içinde tutacağız ve Tüketici'yi Flask servisi olarak açacağız.
import os
import time
#import datetime
from threading import Thread
from flask import Flask, Response, render_template
from kafka import KafkaConsumer
# Fire up the Kafka Consumer
topics = ["webcam", "video"]
servers=['localhost:9092']
consumers = []
buffer = []
for topic in topics:
consumers.append(KafkaConsumer(topic, bootstrap_servers=servers))
# Frame Buffer
buffer.append(b'')
# Set the consumer in a Flask App
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/webcam')
def webcam():
return render_template('webcam.html')
@app.route('/video')
def video():
return render_template('video.html')
@app.route('/webcam_feed', methods=['GET'])
def webcam_feed():
return Response(get_video_stream(topic = 0),
mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/video_feed', methods=['GET'])
def video_feed():
"""
This is the heart of our video display. Notice we set the mimetype to
multipart/x-mixed-replace. This tells Flask to replace any old images with
new values streaming through the pipeline.
"""
return Response(get_video_stream(topic = 1),
mimetype='multipart/x-mixed-replace; boundary=frame')
def get_video_stream(topic):
"""
Here is where we recieve streamed images from the Kafka Server and convert
them to a Flask-readable format.
"""
while True:
time.sleep(0.04) #0.2=5fps, 0.04=25fps
yield (b'--frame\r\n'
b'Content-Type: image/jpg\r\n\r\n' + buffer[topic] + b'\r\n\r\n')
def consumer_thread(topic, delay = 3):
global topics
global buffer
index = topics.index(topic)
while True:
for msg in consumers[index]:
buffer[index] = msg.value
time.sleep(delay)
if __name__ == "__main__":
threads = []
for topic in topics:
threads.append(Thread(target=consumer_thread,args=(topic,)))
for t in threads:
t.start()
PORT = int(os.environ.get("PORT", 5000))
app.run(host='0.0.0.0', port=PORT, debug=True)
Şimdi her şeyi bir araya getirireceğiz. Sırasıyla, Kafka'yı, Consumer'i ve nihayetinde Prducer'i, her birinin kendi terminalinde başlatlır.
Kafka' yı başlatcağız
$ cd /opt/Kafka/kafka_2.11-2.1.0/
$ sudo bin/kafka-server-start.sh config/server.properties
Yeni bir terminalde sanal ortamını ve Consumer projesini başlatacağız.
$ cd ~/KafkaDemo
$ . env/bin/activate
$ python consumer.py
Her şey çalışıyorsa terminaliniz okumalısınız
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
Tarayıcıda, http://0.0.0.0:5000/video adresine gidin. Burada henüz bir şey görmeyeceksin, bir kaç adım sonra gelecek
Consumer için de aynı. İlk önce yeni bir terminal açın. Burada, web kamerasından akış yapacağız, bu nedenle ek argümana gerek yok.
$ cd ~/KafkaDemo
$ . env/bin/activate
$ python producer.py
Kısa bir video yayınlamak istiyorsak, son komutu aşağıdaki gibi yazabiliriz.
python producer.py videos/Countdown1.mp4