Stepist. A framework for data processing.
The main goal of Stepist is to simplify working with data.
Use cases:
- Real-time distributed services
- ETL tasks
- Preparing data for AI models
So, what is Stepist?
It is a tool for building a sequence of functions (called steps) that represents an execution flow.
The result of each step is the input for the next step, so you end up with a graph (a data pipeline)
that can process data through streaming services (Celery, RQ, Redis) or batch-processing tools (Kafka).
Install
- set up Redis: https://redis.io/topics/quickstart
- pip install stepist
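Before running the examples, you can check that the Redis server is reachable using the redis-py client (this snippet assumes the default localhost:6379; adjust if your Redis runs elsewhere):

import redis

r = redis.Redis(host='localhost', port=6379)
print(r.ping())  # True when the server is reachable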
Base objects:
- App - collects all your objects and holds the full configuration of the system.
- Step - the basic object; connects multiple functions into a flow.
- Flow - a chain of steps that starts from a simple step and ends with a step whose next_step=None.
Simple step-by-step flow (the result of each step is the input for the next)
from stepist import App

app = App()

# Last step in the flow: next_step is None
@app.step(None)
def step2(a_plus_b, a_minus_b):
    return dict(result=a_plus_b * a_minus_b)

# First step: its result is passed to step2
@app.step(step2)
def step1(a, b):
    return dict(a_plus_b=a + b,
                a_minus_b=a - b)

print(step1(5, 5))
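To make the data flow concrete, here is a hand-traced sketch of what each step produces for sample inputs (pure Python, no Stepist involved; how the final value is wrapped when you call step1 directly is up to Stepist itself):

# Hand-computed trace of the flow above for a=5, b=3
step1_out = dict(a_plus_b=5 + 3, a_minus_b=5 - 3)                        # what step1 returns
step2_out = dict(result=step1_out['a_plus_b'] * step1_out['a_minus_b'])  # what step2 returns
assert step2_out == {'result': 16}                                       # (5+3) * (5-3)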
Simple step-by-step flow with workers
import sys

import requests
from stepist import App

app = App()

URLS = ['https://www.python.org/',
        'https://wikipedia.org/wiki/Python_(programming_language)']

# Last step: prints how many times 'python' occurs in the page
@app.step(None)
def step3(text, **kwargs):
    print(text.count('python'))

# Factory step executed by a worker process: downloads the page
@app.factory_step(step3, as_worker=True)
def step2(url):
    r = requests.get(url)
    return dict(url=url,
                text=r.text)

# First step: yields one job per URL
@app.step(step2)
def step1(urls):
    for url in urls:
        yield dict(url=url)

if sys.argv[1] == "worker":
    app.run(step2)  # run worker
else:
    step1(urls=URLS)

# Worker process:
# >>> 94
# >>> 264
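Conceptually, the factory step puts each yielded job on a queue, and the worker process pops jobs, runs the step, and passes the result along. A framework-free sketch of that pattern (illustration only, not Stepist's implementation; the queue and function names here are made up):

import queue
import threading

import requests

jobs = queue.Queue()

def producer(urls):
    # plays the role of step1: one job per URL
    for url in urls:
        jobs.put(dict(url=url))
    jobs.put(None)  # stop signal for the worker

def worker():
    # plays the role of step2 + step3: fetch the page, count 'python'
    while True:
        job = jobs.get()
        if job is None:
            break
        text = requests.get(job['url']).text
        print(text.count('python'))

t = threading.Thread(target=worker)
t.start()
producer(['https://www.python.org/'])
t.join()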
Call multiple steps at once (Map)
Define Hub(step_a, step_b, ...) as the next step.
import sys

import requests
from stepist import Hub
from stepist import App

app = App()

URLS = ['https://www.python.org/',
        'https://wikipedia.org/wiki/Python_(programming_language)']

# Last step: returns the number of 'python' occurrences
@app.step(None)
def step3(text, **kwargs):
    c = text.count('python')
    return c

@app.factory_step(step3, as_worker=True)
def step2_v2(url):
    r = requests.get(url)
    return dict(url=url,
                text=r.text)

@app.factory_step(step3, as_worker=True)
def step2(url):
    r = requests.get(url)
    return dict(url=url,
                text=r.text)

# The Hub fans each job out to both step2 and step2_v2
@app.step(Hub(step2, step2_v2))
def step1(urls):
    for url in urls:
        yield dict(url=url)

if sys.argv[1] == "worker":
    app.run()  # run workers
else:
    print(step1(urls=URLS))

# print, from main process
# >>> [94, 264]
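The effect of the Hub is a map: the same job goes to every listed branch, and the caller gets back one result per branch. A toy illustration of that fan-out semantics (plain Python, not Stepist internals):

def fan_out(job, branches):
    # send the same job to every branch and collect one result per branch
    return [branch(**job) for branch in branches]

def count_python(text):
    return text.count('python')

def count_www(text):
    return text.count('www')

print(fan_out(dict(text='www.python.org python'), [count_python, count_www]))
# >>> [2, 1]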
Combine data from multiple steps (Reduce)
Define @app.reducer_step and link it to the pipeline "leaves".
import sys

import requests
from stepist import Hub
from stepist import App

app = App()

URLS = ['https://www.python.org/',
        'https://wikipedia.org/wiki/Python_(programming_language)']

# Reducer step: receives the results of all pipeline leaves as one list
@app.reducer_step()
def step3(job_list):
    return dict(c1=job_list[0].count('python'),
                c2=job_list[1].count('python'))

@app.factory_step(step3, as_worker=True)
def step2_v2(url):
    r = requests.get(url)
    return dict(url=url,
                text=r.text)

@app.factory_step(step3, as_worker=True)
def step2(url):
    r = requests.get(url)
    return dict(url=url,
                text=r.text)

@app.step(Hub(step2, step2_v2))
def step1(urls):
    for url in urls:
        yield dict(url=url)

if sys.argv[1] == "worker":
    app.run()  # run workers
else:
    print(step1(urls=URLS))

# print, from main process
# >>> [94, 264]
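The reducer is the mirror image of the Hub: it waits for every leaf of the pipeline and receives all of their results as a single list. In isolation, the reducing function above behaves like this (hypothetical texts stand in for whatever the leaves actually produce, so the counts are small):

# job_list holds one entry per pipeline leaf (here: step2 and step2_v2)
job_list = ['python is python', 'just python']

result = dict(c1=job_list[0].count('python'),
              c2=job_list[1].count('python'))
print(result)  # {'c1': 2, 'c2': 1}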
Celery
Stepist is compatible with Celery.
from celery import Celery
from stepist import App
from stepist.flow.workers.adapters.celery_queue import CeleryAdapter

app = App()

# Route Stepist jobs through Celery, using Redis as the broker
celery = Celery(broker="redis://localhost:6379/0")
app.worker_engine = CeleryAdapter(app, celery)

@app.step(None, as_worker=True)
def step3(result):
    return dict(result=result[:2])

@app.step(step3, as_worker=True)
def step2(hello, world):
    return dict(result="%s %s" % (hello, world))

@app.step(step2)
def step1(hello, world):
    return dict(hello=hello.upper(),
                world=world.upper())

if __name__ == "__main__":
    print(step1(hello='hello',
                world='world'))
    app.run()
Custom streaming adapter
Just define the following methods in a subclass of the base adapter class and assign an instance to app.worker_engine.
from stepist import App
from stepist.workers.worker_engine import BaseWorkerEngine

class CustomWorkerEngine(BaseWorkerEngine):

    def add_job(self, step, data, result_reader, **kwargs):
        raise NotImplementedError()

    def jobs_count(self, *steps):
        raise NotImplementedError()

    def flush_queue(self, step):
        raise NotImplementedError()

    def process(self, *steps):
        raise NotImplementedError()

    def register_worker(self, handler):
        raise NotImplementedError()

app = App()
app.worker_engine = CustomWorkerEngine()