A simple Python client for asynchronously consuming the OpenTSDB HTTP API. The writing method (put) was initially implemented over a plain socket, and reads were made as HTTP requests through urllib3. Everything has since been refactored to use only HTTP, via grequests.
This package was made and tested with Python 2.7 and 3.5.
- grequests==0.3.0
Clone the repo for development purposes:
$ git clone https://github.com/venidera/otsdb_client.git
$ cd otsdb_client
$ virtualenv --python=python3.5 --prompt=" OTSDB Client " venv-3.5
$ source venv-3.5/bin/activate ; pip install pip setuptools --upgrade
$ python setup.py install
Install using pip:
$ pip install git+https://github.com/venidera/otsdb_client.git
To avoid errors, development can be done in the repo directory, but the tests must be run in an isolated directory, because __init__.py files can hide import errors.
An example of a test installation is shown below:
$ cd <repo>
$ mkdir tests
$ cd tests
$ virtualenv --python=python3.5 --prompt=" <package name> " venv-3.5
$ source venv-3.5/bin/activate
$ pip install pip setuptools --upgrade
$ cd .. ; python setup.py install ; cd -
The steps listed above will create a virtualenv with the package installed. If you change the code of the package, you must run cd .. ; python setup.py install ; cd - again. The tests must import the package, and its classes and functions must be tested. If everything works fine, commit the changes and run git pull && git push, or open a pull request. All files created inside the tests directory must be kept untracked in the repository.
- Check for limits and blocking impacts: Done - Solution: use grequests to make async http calls;
- Implement worker or multithread: pending (inspired by potsdb);
- Implement the reading with chunks: pending;
- Cover all endpoints listed in the OpenTSDB HTTP API doc.
Create objects of otsdb_client to execute read/write operations with OpenTSDB.
The __init__ method pings the server port and raises an exception if the ping fails.
class Connection(object):
    def __init__(self, server='localhost', port=4242):
        ...
Arguments:
- server (str): the IP address or URI of the server that will be accessed;
- port (int): the port the TSD is running on.
Example:
>>> from otsdb_client import Connection
>>> c = Connection()
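The connectivity check performed by __init__ could look roughly like the sketch below. It assumes that "pinging the server port" means attempting a plain TCP connection; port_is_open is a hypothetical helper written for illustration and is not part of otsdb_client.

```python
import socket


def port_is_open(server='localhost', port=4242, timeout=2.0):
    """Return True if a TCP connection to (server, port) succeeds.

    Hypothetical helper for illustration; not part of otsdb_client.
    """
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((server, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

A caller that wants the behaviour described above would raise an exception when this function returns False.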
The OpenTSDB HTTP API endpoints listed below can be consumed using this package:
Endpoint /api/version
>>> from otsdb_client import Connection
>>> c = Connection(server='192.168.30.80')
>>> c.version()
{'branch': 'next', 'version': '2.3.0-RC1', 'user': 'hobbes', 'repo': '/home/hobbes/opentsdb_OFFICIAL/build', 'short_revision': '306603c', 'full_revision': '306603c313fda191706479e7cc78a931f36ae89b', 'repo_status': 'MINT', 'host': 'clhbase', 'timestamp': '1462215755'}
>>>
Endpoint /api/config/filters
>>> from otsdb_client import Connection
>>> c = Connection(server='192.168.30.80')
>>> c.filters()
{'iwildcard': {'description': 'Performs ...'}, 'not_literal_or': {'description': 'Accepts ...'}, 'regexp': {'description': 'Provides full...'}, 'wildcard': {'description': 'Performs pre, post and in-fix ...'}, 'literal_or': {'description': 'Accepts one or ...'}, 'not_iliteral_or': {'description': 'Accepts one or more ...'}, 'iliteral_or': {'description': 'Accepts one ...'}}
>>>
Endpoint /api/stats
>>> from otsdb_client import Connection
>>> c = Connection(server='192.168.30.80')
>>> c.statistics()
[{'metric': 'tsd.connectionmgr.connections', 'timestamp': 1471268960, 'tags': {'type': 'open', 'host': 'opentsdb-server'}, 'value': '1'}, {'metric': 'tsd.connectionmgr.connections', 'timestamp': 1471268960, 'tags': {'type': 'rejected', 'host': 'opentsdb-server'}, 'value': '0'},...]
>>>
The aggregators attribute is a list() created in the __init__ method. It contains the aggregators returned by the /api/aggregators endpoint.
Endpoint /api/aggregators
>>> from otsdb_client import Connection
>>> c = Connection(server='192.168.30.80')
>>> c.aggregators
['mult', 'p90', 'zimsum', 'mimmax', 'sum', 'p50', 'none', 'p95', 'ep99r7', 'p75', 'p99', 'ep99r3', 'ep95r7', 'min', 'avg', 'ep75r7', 'dev', 'ep95r3', 'ep75r3', 'ep50r7', 'ep90r7', 'mimmin', 'p999', 'ep50r3', 'ep90r3', 'ep999r7', 'last', 'max', 'count', 'ep999r3', 'first']
>>>
Endpoint /api/put
Endpoint /api/suggest
>>> from otsdb_client import Connection
>>> c = Connection(server='192.168.30.80')
>>> c.suggest(type='metrics',q='',max='10')
['test_put', 'test_put1', 'test_put2']
>>> c.suggest(type='tagk',q='',max='10')
['key', 'tagk']
>>> c.suggest(type='tagv',q='',max='10')
['0', '1', 'tagv']
Endpoint /api/query
Endpoint /api/query/exp
Insert a point (timestamp + value) into a time series (metric + tags). At the moment, only one point can be added per call.
def put(self, metric, ts=None, value=0.0, tags=dict()):
    ...
It is worth taking a look at the OpenTSDB data model and naming schema first.
- metric (str): the name of the metric;
- ts (int - Optional): the point's integer timestamp (Epoch/Unix timestamp); if it is not passed, ts = int(mktime(datetime.now().timetuple())) is assumed;
- value (float): the value of the point;
- tags (dict()): a dictionary mapping tagn (tag name) to tagv (tag value). Example: tags={'tag1':'valtag1','tag2':'valtag2'}.
Store the memory currently used by a server, using the metric sys.mem.used and the tag host:
c.put(metric='sys.mem.used',value=4321,tags={'host':'server1'})
Now insert a point (ts/value) for a metric at 2012-01-10 13:00:
from datetime import datetime
from time import mktime
ts = int(mktime(datetime(2012, 1, 10, 13, 0).timetuple()))
c.put(metric='metric.name',ts=ts,value=321.20,tags={'tagname':'tagvalue'})
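For reference, the OpenTSDB /api/put endpoint accepts a JSON object with the fields metric, timestamp, value and tags. The sketch below shows how the arguments described above could map onto that body. build_put_payload is a hypothetical helper for illustration only; it does not describe how otsdb_client builds its requests internally.

```python
import json
from datetime import datetime
from time import mktime


def build_put_payload(metric, value=0.0, tags=None, ts=None):
    """Build the JSON-serializable body for one data point sent to /api/put.

    Hypothetical helper for illustration; not part of otsdb_client.
    """
    if ts is None:
        # Same default as described above: current local time as epoch seconds.
        ts = int(mktime(datetime.now().timetuple()))
    if not tags:
        # OpenTSDB requires at least one tag per data point.
        raise ValueError('at least one tag is required')
    return {'metric': metric, 'timestamp': ts, 'value': value, 'tags': dict(tags)}


# Body for the 2012-01-10 13:00 example, ready to be sent with an HTTP POST.
ts = int(mktime(datetime(2012, 1, 10, 13, 0).timetuple()))
body = json.dumps(build_put_payload('metric.name', 321.20, {'tagname': 'tagvalue'}, ts=ts))
```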
Read points (aggregated or not) of a time series. At the moment, only simple use of the /api/query endpoint is implemented. The given aggregator is validated against the results of the /api/aggregators endpoint.
def query(self, metric, aggr='sum', tags=dict(), start='1h-ago', end=None, nots=False,
          tsd=True, json=False, show_summary=False, union=False, chunked=False):
    ...
Supports only one metric.
- metric (str): the metric name;
- aggr (str): an aggregator (example: sum, take a look here);
- tags (dict): tags of the points that define a time series (Naming schema);
- start (str): start time for the query (look here);
- end (str - Optional): end time for the query (default = now);
- nots (bool): if True, results are returned without timestamps (NoTimeStamps);
- tsd (bool): if True, timestamps are returned as datetime objects; if False, as integer timestamps;
- json (bool): if True, the exact response of the HTTP request is returned;
- show_summary (bool): if True, a summary of the query is added to the results;
- union (bool): return the points of the time series (metric + tags) in one list, merging the different tags (be careful here);
- chunked: to be implemented in the future, to stream over urllib3.
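To make the tsd and nots options more concrete: OpenTSDB returns each series with a dps mapping of timestamp strings to values. The sketch below shows one way such a mapping could be split into the values and ts lists, honouring both flags. split_dps is a hypothetical helper; the package's actual implementation may differ.

```python
from datetime import datetime


def split_dps(dps, tsd=True, nots=False):
    """Turn an OpenTSDB 'dps' mapping into parallel lists.

    Hypothetical helper for illustration; not part of otsdb_client.
    """
    # JSON object keys arrive as strings, so sort by their numeric value.
    points = sorted((int(ts), value) for ts, value in dps.items())
    values = [value for _, value in points]
    if nots:
        # NoTimeStamps: return the values only.
        return {'values': values}
    stamps = [ts for ts, _ in points]
    if tsd:
        # Naive datetime objects in local time.
        stamps = [datetime.fromtimestamp(ts) for ts in stamps]
    return {'values': values, 'ts': stamps}
```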
Query a metric for values of all its tags since 1 day ago:
results = c.query(metric='metric_name',aggr='sum',tags={'tagn':'*'},start='1d-ago')
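Under the hood, a call like the one above corresponds to an OpenTSDB /api/query request whose m parameter has the form aggregator:metric{tagk=tagv}. The following Python 3 sketch builds such a URL and checks the aggregator against a known list first. build_query_url and its aggregators parameter are hypothetical names used only for illustration; this is not the package's own code.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_query_url(metric, aggr='sum', tags=None, start='1h-ago', end=None,
                    server='localhost', port=4242, aggregators=None):
    """Build an /api/query URL for a single metric.

    Hypothetical helper for illustration; not part of otsdb_client.
    """
    # Optional validation against the list returned by /api/aggregators.
    if aggregators is not None and aggr not in aggregators:
        raise ValueError('unknown aggregator: %s' % aggr)
    # Tag filters go inside braces as comma-separated key=value pairs.
    tag_part = ''
    if tags:
        pairs = ','.join('%s=%s' % (k, v) for k, v in sorted(tags.items()))
        tag_part = '{%s}' % pairs
    params = [('start', start), ('m', '%s:%s%s' % (aggr, metric, tag_part))]
    if end is not None:
        params.append(('end', end))
    return 'http://%s:%d/api/query?%s' % (server, port, urlencode(params))


url = build_query_url('metric_name', aggr='sum', tags={'tagn': '*'}, start='1d-ago')
# Decoding the query string shows the parameters as the server would see them.
print(parse_qs(urlsplit(url).query))
```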
>>> from datetime import datetime
>>> from time import mktime
>>> from otsdb_client import Connection
>>> c = Connection(server='localhost', port=4242)
>>> c.put(metric='test_put',timestamps=[int(mktime(datetime.now().timetuple()))],values=[2000.00],tags={'tagk':'tagv'})
>>> c.query(metric='test_put',aggr='sum',tags={'test':'*'},start='1h-ago')
{'results': [{'metric': u'test_put', 'values': [2000.0], 'ts': [datetime.datetime(2016, 2, 4, 0, 28, 19)], 'tags': {u'test': u'client', u'type': u'telnet'}}]}
>>> c.query(metric='test_put',aggr='sum',tags={'test':'*'},start='1h-ago',union=True)
{'results': {'values': [2000.0], 'ts': [datetime.datetime(2016, 2, 4, 0, 28, 19)]}}
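The difference between the two result shapes above can be illustrated with a small merge function. This is a minimal sketch that assumes union=True simply concatenates the points of every returned series and orders them by timestamp; union_results is a hypothetical name, and the package may merge differently.

```python
def union_results(results):
    """Merge per-tag series into one time-ordered series.

    Hypothetical helper for illustration; not part of otsdb_client.
    """
    points = []
    for series in results:
        # Pair each timestamp with its value so they stay together when sorted.
        points.extend(zip(series['ts'], series['values']))
    # Sort by timestamp only, so values never need to be comparable.
    points.sort(key=lambda point: point[0])
    return {'results': {'values': [v for _, v in points],
                        'ts': [t for t, _ in points]}}
```

In this sketch, points from different tag combinations that share a timestamp are all kept, and the merged result no longer says which tags a point came from. That is one reason to be careful with union.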
Venidera's development team:
This package is released and distributed under the license GNU GPL Version 3, 29 June 2007.