/RxPY

Reactive Extensions for Python

Primary LanguagePythonApache License 2.0Apache-2.0

Travis Build Status Coveralls Coverage Status PyPI Downloads per month

The Reactive Extensions for Python (RxPY)

A library for composing asynchronous and event-based programs using observable collections and LINQ-style query operators in Python

The main repository is at ReactiveX/RxPY. There are currently outdated mirrors at Reactive-Extensions/RxPy and CodePlex. Please register any issues to ReactiveX/RxPY/issues, and make sure your pull requests is made against the develop branch.

About the Reactive Extensions

The Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

Whether you are authoring a client-side or server-side application in Python, you have to deal with asynchronous and event-based programming as a matter of course.

Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quote, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer object. The Observable notifies the subscribed Observer instance whenever an event occurs.

Because observable sequences are data streams, you can query them using standard LINQ query operators implemented by the Observable type. Thus you can filter, map, reduce, compose and perform time-based operations on multiple events easily by using these static LINQ operators. In addition, there are a number of other reactive stream specific operators that allow powerful queries to be written. Cancellation, exceptions, and synchronization are also handled gracefully by using the methods on the Observable object.

Install

RxPy runs on Python 2.7, 3.4, PyPy and IronPython

To install RxPY:

pip install rx

Note that pip may be called pip3 if your are using Python3.

Tutorials

Differences from .NET and RxJS

RxPY is a fairly complete implementation of Rx v2.2 with more than 134 query operators, and over 1100 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.

RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.

Thus .NET code such as:

var group = source.GroupBy(i => i % 3);

need to be written with an _ in Python:

group = source.group_by(lambda i: i % 3)

With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).

res = Observable.timer(5000) # Yes
res = Observable.timer(5000, 1000) # Yes
res = Observable.timer(5000, 1000, Scheduler.timeout) # Yes
res = Observable.timer(5000, scheduler=Scheduler.timeout) # Yes, but must name

res = Observable.timer(5000, Scheduler.timeout) # No, this is an error

Thus when an operator like Observable.timeout has multiple optional arguments you should name your arguments. At least the arguments marked as optional.

Python Alignment

Disposables implements a context manager so you may use them in with statements.

Observable sequences may be concatenated using +, so you can write:

xs = Observable.from_([1,2,3])
ys = Observable.from_([4,5,6])
zs = xs + ys  # Concatenate observables

Observable sequences may be repeated using *=, so you can write:

xs = Observable.from_([1,2,3])
ys = xs * 4

Observable sequences may be sliced using [start:stop:step], so you can write:

xs = Observable.from_([1,2,3,4,5,6])
ys = xs[1:-1]

Observable sequences may be turned into an iterator so you can use generator expressions, or iterate over them (uses queueing and blocking).

xs = Observable.from_([1,2,3,4,5,6])
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
    print(x)

Schedulers

In RxPY you can choose to run fully asynchronously or you may decide to schedule work and timeouts using threads.

For time and scheduler handing you will need to supply datetime for absolute time values and timedelta for relative time. You may also use int to represent milliseconds.

RxPY also comes with batteries included, and has a number of Python specific mainloop schedulers to make it easier for you to use RxPY with your favorite Python framework.

  • AsyncIOScheduler for use with AsyncIO. (requires Python 3.4 or trollius, a port of asyncio compatible with Python 2.6-3.5).
  • IOLoopScheduler for use with Tornado IOLoop. See the autocomplete and konamicode examples for how to use RxPY with your Tonado application.
  • GEventScheduler for use with GEvent. (Python 2.7 only).
  • TwistedScheduler for use with Twisted.
  • TkinterScheduler for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.
  • PyGameScheduler for use with PyGame. See the chess example for how to use RxPY with your PyGame application.
  • QtScheduler for use with PyQt4, PyQt5, and PySide. See the timeflies example for how to use RxPY with your Qt application.
  • WxScheduler for use with wxPython. See the timeflies example for how to use RxPY with your wx application.

Contributing

You can contribute by reviewing and sending feedback on code checkins, suggesting and trying out new features as they are implemented, register issues and help us verify fixes as they are checked in, as well as submit code fixes or code contributions of your own.

Note that the master branch is for releases only, so please submit any pull requests against the develop branch at ReactiveX/RxPY.

License

Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. Microsoft Open Technologies would like to thank its contributors, a list of whom are at http://rx.codeplex.com/wikipage?title=Contributors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.