A library for composing asynchronous and event-based programs using observable collections and LINQ-style query operators in Python
Python Reactive Programming is in the making. Finally there will be a book about reactive programming in Python and RxPY. The book is scheduled for fall 2016, and you may pre-order it at Amazon, or buy it directly from Packt Publishing when available.
The Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
Whether you are authoring a client-side or server-side application in Python, you have to deal with asynchronous and event-based programming as a matter of course.
Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quotes, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer object. The Observable notifies the subscribed Observer instance whenever an event occurs.
Because observable sequences are data streams, you can query them using standard LINQ query operators implemented by the Observable type. Thus you can filter, map, reduce, compose and perform time-based operations on multiple events easily by using these static LINQ operators. In addition, there are a number of other reactive stream specific operators that allow powerful queries to be written. Cancellation, exceptions, and synchronization are also handled gracefully by using the methods on the Observable object.
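To make the push-based model concrete, here is a minimal sketch in plain Python. It is not the RxPY implementation: ToyObservable and its methods are hypothetical names invented for this illustration, and it only shows how filter and map can be chained on a stream that pushes values to a subscriber.

```python
class ToyObservable:
    """Hypothetical push-based stream; NOT the RxPY Observable class."""

    def __init__(self, subscribe):
        # subscribe is a function that takes an on_next callback
        self._subscribe = subscribe

    @classmethod
    def from_iterable(cls, items):
        def subscribe(on_next):
            for item in items:
                on_next(item)  # push each value to the observer
        return cls(subscribe)

    def subscribe(self, on_next):
        self._subscribe(on_next)

    def filter(self, predicate):
        # Forward only the values that satisfy the predicate
        def subscribe(on_next):
            self._subscribe(lambda x: on_next(x) if predicate(x) else None)
        return ToyObservable(subscribe)

    def map(self, mapper):
        # Forward a transformed copy of every value
        def subscribe(on_next):
            self._subscribe(lambda x: on_next(mapper(x)))
        return ToyObservable(subscribe)


received = []
source = ToyObservable.from_iterable([1, 2, 3, 4, 5, 6])
source.filter(lambda x: x % 2 == 0).map(lambda x: x * 10).subscribe(received.append)
print(received)  # [20, 40, 60]
```

Nothing runs until subscribe is called; each operator merely wraps the subscription function of the stream before it.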
RxPY runs on Python 2.7, 3.4, PyPy and IronPython.
To install RxPY:
pip install rx
Note that pip may be called pip3 if you are using Python 3.
RxPY is a fairly complete implementation of Rx v2.2 with more than 134 query operators, and over 1100 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.
RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.
Thus .NET code such as:
var group = source.GroupBy(i => i % 3);
needs to be written with an underscore (_) in Python:
group = source.group_by(lambda i: i % 3)
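When porting code from .NET, the naming rule can be applied mechanically as a first guess. The helper below is a hypothetical convenience written for this illustration, not part of RxPY, and the actual RxPY operator name may differ from the mechanical translation, so check the result against the library.

```python
import re


def to_snake_case(name):
    """Hypothetical helper: convert a PascalCase name to snake_case."""
    # Insert an underscore before every capital letter except the first one
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


print(to_snake_case("GroupBy"))     # group_by
print(to_snake_case("SelectMany"))  # select_many
```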
With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).
res = Observable.timer(5000) # Yes
res = Observable.timer(5000, 1000) # Yes
res = Observable.timer(5000, 1000, Scheduler.timeout) # Yes
res = Observable.timer(5000, scheduler=Scheduler.timeout) # Yes, but must name
res = Observable.timer(5000, Scheduler.timeout) # No, this is an error
Thus, when an operator like Observable.timeout has multiple optional arguments, you should name your arguments, or at least the ones marked as optional.
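The reason naming matters can be seen with any Python function that has several optional parameters. The sketch below uses toy_timer, a hypothetical stand-in that only reports how its arguments were bound; it does not reproduce the real Observable.timer.

```python
def toy_timer(duetime, period=None, scheduler=None):
    """Hypothetical stand-in; returns how the arguments were bound."""
    return {"duetime": duetime, "period": period, "scheduler": scheduler}


fake_scheduler = object()  # placeholder for a scheduler instance

named = toy_timer(5000, scheduler=fake_scheduler)
positional = toy_timer(5000, fake_scheduler)

print(named["scheduler"] is fake_scheduler)       # True
print(positional["scheduler"] is fake_scheduler)  # False
print(positional["period"] is fake_scheduler)     # True: bound to the wrong parameter
```

Python binds positional arguments strictly in order, so the scheduler silently lands in the period slot unless it is named.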
Disposables implement the context manager protocol, so you may use them in with statements.
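As a rough sketch of what this means, a disposable only needs __enter__ and __exit__ methods for Python to dispose of it automatically at the end of a with block. ToyDisposable below is a hypothetical class written for this illustration, not the RxPY Disposable.

```python
class ToyDisposable:
    """Hypothetical disposable; NOT the RxPY Disposable class."""

    def __init__(self, action):
        self._action = action
        self.is_disposed = False

    def dispose(self):
        # Run the cleanup action at most once
        if not self.is_disposed:
            self.is_disposed = True
            self._action()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.dispose()
        return False  # do not swallow exceptions raised inside the block


log = []
with ToyDisposable(lambda: log.append("disposed")) as subscription:
    log.append("working")

print(log)  # ['working', 'disposed']
```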
Observable sequences may be concatenated using +, so you can write:
xs = Observable.from_([1,2,3])
ys = Observable.from_([4,5,6])
zs = xs + ys # Concatenate observables
Observable sequences may be repeated using *, so you can write:
xs = Observable.from_([1,2,3])
ys = xs * 4
Observable sequences may be sliced using [start:stop:step], so you can write:
xs = Observable.from_([1,2,3,4,5,6])
ys = xs[1:-1]
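In Python, the +, * and slicing syntax is provided by the special methods __add__, __mul__ and __getitem__. The sketch below shows that mechanism on ToySequence, a hypothetical class backed by a plain list; it assumes a finite, already-known sequence and says nothing about how RxPY implements these operators on real streams.

```python
class ToySequence:
    """Hypothetical list-backed sequence; NOT the RxPY Observable class."""

    def __init__(self, items):
        self.items = list(items)

    def __add__(self, other):
        # xs + ys: concatenation
        return ToySequence(self.items + other.items)

    def __mul__(self, count):
        # xs * n: repetition
        return ToySequence(self.items * count)

    def __getitem__(self, key):
        # xs[start:stop:step]: slicing
        if isinstance(key, slice):
            return ToySequence(self.items[key])
        raise TypeError("only slices are supported in this sketch")


xs = ToySequence([1, 2, 3])
ys = ToySequence([4, 5, 6])
print((xs + ys).items)        # [1, 2, 3, 4, 5, 6]
print((xs * 2).items)         # [1, 2, 3, 1, 2, 3]
print((xs + ys)[1:-1].items)  # [2, 3, 4, 5]
```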
Observable sequences may be turned into an iterator so you can use generator expressions, or iterate over them (uses queueing and blocking).
xs = Observable.from_([1,2,3,4,5,6])
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
    print(x)
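The queueing and blocking mentioned above can be pictured as a producer thread pushing values into a queue while the consumer pulls from it. The following is a minimal sketch of that idea using only the standard library; to_blocking_iter and push_numbers are hypothetical names, and this is not how to_blocking is necessarily implemented in RxPY.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def to_blocking_iter(subscribe):
    """Hypothetical helper: subscribe(on_next, on_completed) pushes values."""
    buffer = queue.Queue()
    producer = threading.Thread(
        target=subscribe,
        args=(buffer.put, lambda: buffer.put(_DONE)),
        daemon=True,
    )
    producer.start()
    while True:
        item = buffer.get()  # blocks until the producer pushes something
        if item is _DONE:
            return
        yield item


def push_numbers(on_next, on_completed):
    # Stand-in for a push-based source
    for value in [1, 2, 3, 4, 5, 6]:
        on_next(value)
    on_completed()


squares = [x * x for x in to_blocking_iter(push_numbers) if x > 3]
print(squares)  # [16, 25, 36]
```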
In RxPY you can choose to run fully asynchronously or you may decide to schedule work and timeouts using threads.
For time and scheduler handling you will need to supply datetime for absolute time values and timedelta for relative time. You may also use int to represent milliseconds.
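One way to picture the three accepted time types is a helper that reduces each of them to a delay in milliseconds. The function below is a hypothetical sketch for illustration only, not an RxPY API; the now parameter is an assumption added so the absolute-time case can be computed deterministically.

```python
from datetime import datetime, timedelta


def to_milliseconds(value, now=None):
    """Hypothetical helper: reduce a time value to a delay in milliseconds."""
    if isinstance(value, timedelta):
        # Relative time
        return int(value.total_seconds() * 1000)
    if isinstance(value, datetime):
        # Absolute time: measure the distance from "now"
        now = now or datetime.now()
        return int((value - now).total_seconds() * 1000)
    if isinstance(value, int):
        # Already expressed in milliseconds
        return value
    raise TypeError("expected datetime, timedelta or int")


start = datetime(2016, 1, 1, 12, 0, 0)
print(to_milliseconds(5000))                                      # 5000
print(to_milliseconds(timedelta(seconds=1.5)))                    # 1500
print(to_milliseconds(start + timedelta(seconds=2), now=start))   # 2000
```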
RxPY also comes with batteries included, and has a number of Python specific mainloop schedulers to make it easier for you to use RxPY with your favorite Python framework.
AsyncIOScheduler for use with AsyncIO (requires Python 3.4 or trollius, a port of asyncio compatible with Python 2.6-3.5).
IOLoopScheduler for use with Tornado IOLoop. See the autocomplete and konamicode examples for how to use RxPY with your Tornado application.
GEventScheduler for use with GEvent (Python 2.7 only).
TwistedScheduler for use with Twisted.
TkinterScheduler for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.
PyGameScheduler for use with PyGame. See the chess example for how to use RxPY with your PyGame application.
QtScheduler for use with PyQt4, PyQt5, and PySide. See the timeflies example for how to use RxPY with your Qt application.
WxScheduler for use with wxPython. See the timeflies example for how to use RxPY with your wx application.
You can contribute by reviewing and sending feedback on code check-ins, suggesting and trying out new features as they are implemented, registering issues and helping us verify fixes as they are checked in, and submitting code fixes or code contributions of your own.
The main repository is at ReactiveX/RxPY. There are currently outdated mirrors at Reactive-Extensions/RxPy and CodePlex. Please report any issues at ReactiveX/RxPY/issues.
Note that the master branch is for releases only, so please submit any pull requests against the develop branch at ReactiveX/RxPY.
Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. Microsoft Open Technologies would like to thank its contributors, a list of whom are at http://rx.codeplex.com/wikipage?title=Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.