/python-stream

更优雅的流式数据处理方式

Primary LanguagePython

python-stream

说明

数据流式框架, 可用作数据清洗, 数据预处理, 数据迁移等应用场景

更优雅的流式数据处理方式

安装


pip install git+https://github.com/sandabuliu/python-stream.git

or

git clone https://github.com/sandabuliu/python-stream.git
cd python-agent
python setup.py install

QuickStart


Examples

Word Count
from pystream.executor.source import Memory
from pystream.executor.executor import Map, Iterator, ReducebyKey
    
data = Memory([
    'Wikipedia is a free online encyclopedia, created and edited by volunteers around the world and hosted by the Wikimedia Foundation.',
    'Search thousands of wikis, start a free wiki, compare wiki software.',
    'The official Wikipedia Android app is designed to help you find, discover, and explore knowledge on Wikipedia.'
])
p = data | Map(lambda x: x.split(' ')) | Iterator(lambda x: (x.strip('.,'), 1)) | ReducebyKey(lambda x, y: x+y)
result = {}
for key, value in p:
    result[key] = value
print result.items()

执行结果

[('and', 3), ('wiki', 2), ('compare', 1), ('help', 1), ('is', 2), ('Wikipedia', 3), ('discover', 1), ('hosted', 1), ('Android', 1), ('find', 1), ('Foundation', 1), ('knowledge', 1), ('to', 1), ('by', 2), ('start', 1), ('online', 1), ('you', 1), ('thousands', 1), ('app', 1), ('edited', 1), ('Search', 1), ('around', 1), ('free', 2), ('explore', 1), ('designed', 1), ('world', 1), ('The', 1), ('the', 2), ('a', 2), ('on', 1), ('created', 1), ('Wikimedia', 1), ('official', 1), ('encyclopedia', 1), ('of', 1), ('wikis', 1), ('volunteers', 1), ('software', 1)]
计算π
from random import random
from pystream.executor.source import Faker
from pystream.executor.executor import Executor, Map, Group

class Pi(Executor):
    def __init__(self, **kwargs):
        super(Pi, self).__init__(**kwargs)
        self.counter = 0
        self.result = 0

    def handle(self, item):
        self.counter += 1
        self.result += item
        return 4.0*self.result/self.counter

s = Faker(lambda: random(), 100000) | Map(lambda x: x*2-1) | Group(size=2) | Map(lambda x: 1 if x[0]**2+x[1]**2 <= 1 else 0) | Pi()

res = None
for _ in s:
    res = _
print res

执行结果

3.14728
排序
from random import randint
from pystream.executor.source import Memory
from pystream.executor.executor import Sort
m = Memory([randint(0, 100) for i in range(10)]) | Sort()

for i in m:
    print list(i)

执行结果

[94]
[94, 99]
[18, 94, 99]
[18, 40, 94, 99]
[18, 26, 40, 94, 99]
[18, 26, 40, 63, 94, 99]
[18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 83, 94, 99]
[3, 16, 18, 26, 40, 63, 83, 83, 94, 99]
在 hadoop 中使用
wordcount
mapper.py
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, Iterator
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split()) | Iterator(lambda x: "%s\t1" % x) | Stdout()
s.start()
reducer.py
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, ReducebySortedKey
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split('\t')) | ReducebySortedKey(lambda x, y: int(x)+int(y)) | Map(lambda x: '%s\t%s' % x) | Stdout()
s.start()
解析 NGINX 日志
from pystream.config import rule
from pystream.executor.source import File
from pystream.executor.executor import Parser
s = File('/var/log/nginx/access.log') | Parser(rule('nginx'))

for item in s:
    print item

执行结果

{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
导出数据库数据
from sqlalchemy import create_engine
from pystream.executor.source import SQL
from pystream.executor.output import Csv
from pystream.executor.wraps import Batch

engine = create_engine('mysql://root:123456@127.0.0.1:3306/test')  
conn = engine.connect()
s = SQL(conn, 'select * from faker') | Batch(Csv('/tmp/output'))

for item in s:
    print item['data']
    print item['exception']
conn.close()

数据源

读取文件数据
from pystream.executor.source import Tail, File, Csv
Tail('/var/log/nginx/access.log')
File('/var/log/nginx/*.log')
Csv('/tmp/test*.csv')
读取 TCP 流数据
from pystream.executor.source import TCPClient
TCPClient('/tmp/pystream.sock')
TCPClient(('127.0.0.1', 10000))
读取 python 数据
from Queue import Queue as Q
from random import randint
from pystream.executor.source import Memory, Faker, Queue
queue = Q(10)

Memory([1, 2, 3, 4])
Faker(randint, 1000)
Queue(queue)
读取常用模块数据
from pystream.executor.source import SQL, Kafka
SQL(conn, 'select * from faker')   # 读取数据库数据
Kafka('topic1', '127.0.0.1:9092')  # 读取 kafka 数据

数据输出

输出到文件
from pystream.executor.output import File, Csv
File('/tmp/output')
Csv('/tmp/output.csv')
通过HTTP输出
from pystream.executor.output import HTTPRequest
HTTPRequest('http://127.0.0.1/api/data')
输出到kafka
from pystream.executor.output import Kafka
Kafka('topic', '127.0.0.1:9092')

中间件

队列
from pystream.executor.source import Tail
from pystream.executor.output import Stdout
from pystream.executor.middleware import Queue

s = Tail('/Users/tongbin01/PycharmProjects/python-stream/README.md') | Queue() | Stdout()
s.start()
订阅
from random import randint
from pystream.executor.source import Tail
from pystream.executor.executor import Map
from pystream.executor.output import Stdout

from pystream.executor.middleware import Subscribe
from pystream.executor.wraps import Daemonic

sub = Tail('/var/log/messages') | Map(lambda x: (str(randint(1, 2)), x.strip())) | Subscribe()
Daemonic(sub).start()

s = sub['1'] | Map(lambda x: x.strip()) | Stdout()
s.start()

TodoList

  • 订阅器(Subscribe)客户端超时处理
  • 并行计算
  • HTTP 异步输出/异步源
  • 添加其他基础输出/基础源
  • 添加对其他常用模块的支持, 如 redis, kafka, flume, log-stash, 各种数据库等

Copyright © 2017 g_tongbin@foxmail.com