Mars is a tensor-based unified framework for large-scale data computation which scales Numpy, Pandas and Scikit-learn. Documentation.
Mars is easy to install by
pip install pymars
When you need to install dependencies needed by the distributed version, you can use the command below.
pip install 'pymars[distributed]'
For now, distributed version is only available on Linux and Mac OS.
When you want to contribute code to Mars, you can follow the instructions below to install Mars for development:
git clone https://github.com/mars-project/mars.git
cd mars
pip install -e ".[dev]"
More details about installing Mars can be found at getting started section in Mars document.
Mars tensor provides a familiar interface like Numpy.
Numpy | Mars tensor |
import numpy as np
a = np.random.rand(1000, 2000)
(a + 1).sum(axis=1) |
import mars.tensor as mt
a = mt.random.rand(1000, 2000)
(a + 1).sum(axis=1).execute() |
The following is a brief overview of supported subset of Numpy interface.
- Arithmetic and mathematics:
+
,-
,*
,/
,exp
,log
, etc. - Reduction along axes (
sum
,max
,argmax
, etc). - Most of the array creation routines
(
empty
,ones_like
,diag
, etc). What's more, Mars does not only support create array/tensor on GPU, but also support create sparse tensor. - Most of the array manipulation routines
(
reshape
,rollaxis
,concatenate
, etc.) - Basic indexing (indexing by ints, slices, newaxes, and Ellipsis)
- Advanced indexing (except combing boolean array indexing and integer array indexing)
- universal functions for elementwise operations.
- Linear algebra functions,
including product (
dot
,matmul
, etc.) and decomposition (cholesky
,svd
, etc.).
However, Mars has not implemented entire Numpy interface, either the time limitation or difficulty is the main handicap. Any contribution from community is sincerely welcomed. The main feature not implemented are listed below:
- Tensor with unknown shape does not support all operations.
- Only small subset of
np.linalg
are implemented. - Mars tensor doesn't implement interface like
tolist
andnditer
etc, because the iteration or loops over a large tensor is very inefficient.
Mars DataFrame provides a familiar interface like pandas.
Pandas | Mars DataFrame |
import numpy as np
import pandas as pd
df = pd.DataFrame(np.random.rand(100000000, 4),
columns=list('abcd')
print(df.sum()) |
import mars.tensor as mt
import mars.dataframe as md
df = md.DataFrame(mt.random.rand(100000000, 4),
columns=list('abcd')
print(df.sum().execute()) |
Mars learn provides a familiar interface like scikit-learn.
Scikit-learn | Mars learn |
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
centers=[[3, 3, 3], [0, 0, 0],
[1, 1, 1], [2, 2, 2]],
cluster_std=[0.2, 0.1, 0.2, 0.2],
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_) |
from mars.learn.datasets import make_blobs
from mars.learn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
centers=[[3, 3, 3], [0, 0, 0],
[1, 1, 1], [2, 2, 2]],
cluster_std=[0.2, 0.1, 0.2, 0.2],
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_.execute())
print(pca.explained_variance_.execute()) |
Mars supports eager mode which makes it friendly for developing and easy to debug.
Users can enable the eager mode by options, set options at the beginning of the program or console session.
>>> from mars.config import options
>>> options.eager_mode = True
Or use a context.
>>> from mars.config import option_context
>>> with option_context() as options:
>>> options.eager_mode = True
>>> # the eager mode is on only for the with statement
>>> ...
If eager mode is on, tensor will be executed immediately by default session once it is created.
>>> import mars.tensor as mt
>>> from mars.config import options
>>> options.eager_mode = True
>>> t = mt.arange(6).reshape((2, 3))
>>> print(t)
Tensor(op=TensorRand, shape=(2, 3), data=
[[0 1 2]
[3 4 5]])
Mars can scale in to a single machine, and scale out to a cluster with thousands of machines. Both the local and distributed version share the same piece of code, it's fairly simple to migrate from a single machine to a cluster due to the increase of data.
Running on a single machine including thread-based scheduling, local cluster scheduling which bundles the whole distributed components. Mars is also easy to scale out to a cluster by starting different components of mars distributed runtime on different machines in the cluster.
execute
method will by default run on the thread-based scheduler on a single machine.
>>> import mars.tensor as mt
>>> a = mt.ones((10, 10))
>>> a.execute()
Users can create a session explicitly.
>>> from mars.session import new_session
>>> session = new_session()
>>> session.run(a + 1)
>>> (a * 2).execute(session=session)
>>> # session will be released when out of with statement
>>> with new_session() as session2:
>>> session2.run(a / 3)
Users can start the local cluster bundled with the distributed runtime on a single machine. Local cluster mode requires mars distributed version.
>>> from mars.deploy.local import new_cluster
>>> # cluster will create a session and set it as default
>>> cluster = new_cluster()
>>> # run on the local cluster
>>> (a + 1).execute()
>>> # create a session explicitly by specifying the cluster's endpoint
>>> session = new_session(cluster.endpoint)
>>> session.run(a * 3)
After installing the distributed version on every node in the cluster, A node can be selected as scheduler and another as web service, leaving other nodes as workers. The scheduler can be started with the following command:
mars-scheduler -a <scheduler_ip> -p <scheduler_port>
Web service can be started with the following command:
mars-web -a <web_ip> -s <scheduler_endpoint> --ui-port <ui_port_exposed_to_user>
Workers can be started with the following command:
mars-worker -a <worker_ip> -p <worker_port> -s <scheduler_endpoint>
After all mars processes are started, users can run
>>> sess = new_session('http://<web_ip>:<ui_port>')
>>> a = mt.ones((2000, 2000), chunk_size=200)
>>> b = mt.inner(a, a)
>>> sess.run(b)
- Read contribution guide.
- Join the mailing list: send an email to mars-dev@googlegroups.com.
- Please report bugs by submitting a GitHub issue.
- Submit contributions using pull requests.
Thank you in advance for your contributions!