/influxable

A lightweight python ORM / ODM / Client for InfluxDB

Primary LanguagePythonMIT LicenseMIT

Influxable

pypi version build status code coverage license: MIT

A lightweight python ORM / ODM / Client for InfluxDB

Table of Contents

Note

This project is currently in development.

A better documentation and testing scripts will be added in the next release.

Genesis

I worked on a project with InfluxDB. I needed to build an API for InfluxDB and to plug with Python libraries (scipy, pandas, etc ...).

That's why I decided to create this repository in order to deal with InfluxDB in a smooth way and to manipulate Python object.

Changelog

1.4.0

  • Add integration with Influxdb OSS 2.0 Authentication (Experimental)

1.3.0

  • Add group_by() method for GROUP BY tags instructions
  • Add range() method for GROUP BY time() instructions
  • Add into() method for INTO instructions
  • Add tz() method

1.2.1

  • Handle chinese characters.

Features

  • Add automation for measurement class generation (command: autogenerate)
  • Admin commands allowing to manage the database (ex: create_user(), show_series()).
  • Measurement class allowing to make queries in order to fetch/save points (ex: Measurement.where(), Measurement.bulk_save()).
  • Group by commands
  • Different serializers for easy data manipulation (ex: PandasSerializer).

Dependencies

  • Python 3 (Tested with Python 3.7.3)
  • InfluxDB (Tested with InfluxDB 1.5.4)

Installation

The package is available in pypi. You can install it via pip :

pip install influxable

Getting started

Connection

You can set your environment variable for the connection of InfluxDB in order to override the default values :

INFLUXDB_URL=http://localhost:8086
INFLUXDB_DATABASE_NAME=default

#Optional
INFLUXDB_USER=admin
INFLUXDB_PASSWORD=changme
INFLUXDB_PASSWORD=changme

# OSS 2.0
INFLUXDB_AUTH_TOKEN=mytoken

Then you just have to import the influxable package and create an instance of Influxable :

from influxable import Influxable

client = Influxable()

You can also set connection variable in Influxable constructor :

# Without authentication

client = Influxable(
    base_url='http://localhost:8086',
    database_name='default',
)

# With authentication

client = Influxable(
    base_url='http://localhost:8086',
    database_name='default',
    user='admin',
    password='changeme',
)

# With token authentication

client = Influxable(
    base_url='http://localhost:8086',
    database_name='default',
    token='my_token',
)

Measurement

from influxable import attributes, serializers
from influxable.measurement import Measurement

class TemperatureMeasurement(Measurement):
    parser_class = serializers.MeasurementPointSerializer # Default
    measurement_name = 'temperature'

    time = attributes.TimestampFieldAttribute()
    phase = attributes.TagFieldAttribute()
    value = attributes.FloatFieldAttribute()

Fields :

  • GenericFieldAttribute (IntegerFieldAttribute, FloatFieldAttribute, StringFieldAttribute, BooleanFieldAttribute)
  • TagFieldAttribute
  • TimestampFieldAttribute, DateTimeFieldAttribute

Parser Classes :

  • MeasurementPointSerializer (default)
  • JsonSerializer
  • FormattedSerieSerializer
  • FlatFormattedSerieSerializer
  • FlatSimpleResultSerializer
  • PandasSerializer

Simple Measurement

from influxable.measurement import SimpleMeasurement

my_measurement = SimpleMeasurement('temperature', ['value'], ['phase'])

Instanciation

point = TemperatureMeasurement(
  time=1568970572,
  phase="HOT",
  value=23.5,
)

Query

You can query with Measurement.get_query() :

from influxable.db import Field

points = TemperatureMeasurement\
  .get_query()\
  .select('phase', 'value')\
  .where(
     Field('value') > 15.2,
     Field('value') < 30.5,
  )\
  .limit(100)
  .evaluate()

You can also query with Query :

from influxable.db import Query, Field

points = Query()\
  .select('phase', 'value')\
  .from_measurements('temperature')\
  .where(
     Field('value') > 15.2,
     Field('value') < 30.5,
  )\
  .limit(100)
  .execute()

Saving Data

You can create data by using Measurement.bulk_save()

points = [
    TemperatureMeasurement(phase="HOT",value=10,time=1463289075),
    TemperatureMeasurement(phase="COLD",value=10,time=1463289076),
]
TemperatureMeasurement.bulk_save(points)

You can also create data with BulkInsertQuery

str_query = '''
temperature,phase=HOT value=10 1463289075000000000
temperature,phase=COLD value=10 1463289076000000000
'''

raw_query = BulkInsertQuery(str_query)
res = raw_query.execute()

Integration with OSS 2.0 (Experimental)

# Create the user
influx user create --name admin -- password admin

# Create the auth (in order to retrieve the token)
influx auth create --user admin --operator

# List yours tokens
influx auth list

# (Optional) export the INFLUXDB_AUTH_TOKEN
INFLUXDB_AUTH_TOKEN=my-token

# Create the config
influx config create --config-name defaut --host-url http://localhost:8086 --token NjIYagimNbX5MaZfisDsvuGGvtULdqIY-Wt8EP4eGk-3P9KftDtZjxXU4GocTMTfM0eglkuFJQyA9uF82ZeEoA== --org MyOrganisation

# Create the bucket
influx bucket create --name default

# List your bucket
influx bucket list

ID          Name        Retention   Shard group duration    Organization ID     Schema Type
4688727b9c388f5f    default     infinite    168h0m0s        b89cbec9670f29f8    implicit

# Create the dbrp (link database api v1 with bucket api v2)
influx v1 dbrp create --db default --rp default --bucket-id  4688727b9c388f5f --default

Auto Generation of Measurements

You can automatically generate measurement classes file with the bash command autogenerate

influxable autogenerate #(default to auto_generate_measurement.py)
influxable autogenerate -o measurement.py

Here is the output generated file :

# auto_generate_measurement.py

from influxable import attributes
from influxable.measurement import Measurement


class CpuMeasurement(Measurement):
    measurement_name = 'cpu'

    time = attributes.TimestampFieldAttribute(precision='s')
    value = attributes.FloatFieldAttribute()
    host = attributes.TagFieldAttribute()

Influxable commands

  • autogenerate : automatic generation of measurement classes
influxable autogenerate #(default to auto_generate_measurement.py)
influxable autogenerate -o measurement.py
  • populate : create a measurement filled with a set of random data
influxable populate
influxable populate --min_value 5 --max_value 35 -s 2011-01-01T00:00:00 -id 1
influxable populate --help

Influxable API

Influxable Class

The Influxable main app class is a singleton. You can access it via the method Influxable.get_instance()

__init__():
  • base_url : url to connect to the InfluxDB server (default = 'http://localhost:8086')
  • user : authentication user name (default = 'admin')
  • password : authentication user password (default = 'changeme')
  • database_name : name of the database (default = 'default')
create_connection() -> Connection:
  • base_url : url to connect to the InfluxDB server (default = 'http://localhost:8086')
  • user : authentication user name (default = 'admin')
  • password : authentication user password (default = 'changeme')
  • database_name : name of the database (default = 'default')
ping() -> bool:
  • verbose : enables verbose mode (default = True)
execute_query() -> json():
  • query: influxdb query to execute
  • method: http method of the request (default='get')
  • chunked: if enabled, responses will be chunked by series or by every 10,000 points (default=False)
  • epoch: specified precision of the timestamp [ns,u,µ,ms,s,m,h] (default='ns')
  • pretty: if enadble, the json response is pretty-printed (default=False)
write_points() -> bool:
  • points: data to write in InfluxDB line protocol format

ex: mymeas,mytag1=1 value=21 1463689680000000000

  • precision: specified precision of the timestamp [ns,u,µ,ms,s,m,h] (default='ns')
  • consistency: sets the write consistency for the point [any,one,quorum,all] (default='all')
  • retention_policy_name: sets the target retention policy for the write (default='DEFAULT')

InfluxDBApi Class

get_debug_requests() -> bool:
  • request : instance of InfluxDBRequest
get_debug_vars() -> bool:
  • request : instance of InfluxDBRequest
ping() -> bool:
  • request : instance of InfluxDBRequest
  • verbose : enables verbose mode (default = True)
execute_query() -> json():
  • request : instance of InfluxDBRequest
  • query: influxdb query to execute
  • method: http method of the request (default='get')
  • chunked: if enabled, responses will be chunked by series or by every 10,000 points (default=False)
  • epoch: specified precision of the timestamp [ns,u,µ,ms,s,m,h] (default='ns')
  • pretty: if enadble, the json response is pretty-printed (default=False)
write_points() -> bool:
  • request : instance of InfluxDBRequest
  • points: data to write in InfluxDB line protocol format

ex: mymeas,mytag1=1 value=21 1463689680000000000

  • precision: specified precision of the timestamp [ns,u,µ,ms,s,m,h] (default='ns')
  • consistency: sets the write consistency for the point [any,one,quorum,all] (default='all')
  • retention_policy_name: sets the target retention policy for the write (default='DEFAULT')

Connection Class

__init__():
  • base_url : url to connect to the InfluxDB server (default = 'http://localhost:8086')
  • user : authentication user name (default = 'admin')
  • password : authentication user password (default = 'changeme')
  • database_name : name of the database (default = 'default')
create() -> Connection:
  • base_url : url to connect to the InfluxDB server (default = 'http://localhost:8086')
  • user : authentication user name (default = 'admin')
  • password : authentication user password (default = 'changeme')
  • database_name : name of the database (default = 'default')

Measurement Class

fields

Must be an instance of class located in influxable.attributes

  • GenericFieldAttribute
  • IntegerFieldAttribute
  • FloatFieldAttribute
  • StringFieldAttribute
  • BooleanFieldAttribute
  • TagFieldAttribute
  • TimestampFieldAttribute
  • DateTimeFieldAttribute

Example :

class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    time = TimestampFieldAttribute(auto_now=True, precision='s')
    phase = TagFieldAttribute()
    value = IntegerFieldAttribute()
parser_class

Must be a class of influxable.serializers :

  • MeasurementPointSerializer (default)
  • JsonSerializer
  • FormattedSerieSerializer
  • FlatFormattedSerieSerializer
  • FlatSimpleResultSerializer
  • PandasSerializer
measurement_name

Name of the measurement in InfluxDB

__init__():

Set the attribute value of a Measurement

Example

point = MySensorMeasurement(value=0.5, phase="MOON")
get_query() -> Query:

Return an instance of Query which

Example

points = MySensorMeasurement\
  .get_query()\
  .select()\
  .where()\
  .limit()\
  .evaluate()
dict()

Return a dict of the point values

Example

point = MySensorMeasurement(value=0.5, phase="MOON")

point.dict()

# {'time': Decimal('1568970572'), 'phase': 'MOON', 'value': 0.5}
items()

Return an item list of the point values

Example

point = MySensorMeasurement(value=0.5, phase="MOON")

point.items()

# dict_items([('time', Decimal('1568970572')), ('phase', 'MOON'), ('value', 0.5)])
bulk_save()

Save a list of measurement point

points = [
    MySensorMeasurement(phase="moon",value=5,time=1463489075),
    MySensorMeasurement(phase="moon",value=7,time=1463489076),
    MySensorMeasurement(phase="sun",value=8,time=1463489077),
]
MySensorMeasurement.bulk_save(points)

Attributes

GenericFieldAttribute
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    temperature_value = GenericFieldAttribute(
      attribute_name="temp_v1",
      default="15",
      is_nullable=True,
      enforce_cast=False,
    )
IntegerFieldAttribute
  • min_value : an error is raised if the value is less than the min_value
  • max_value : an error is raised if the value is greater than the max_value
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    temperature_value = IntegerFieldAttribute(
      min_value=10,
      max_value=30,
    )
FloatFieldAttribute
  • max_nb_decimals : set the maximal number of decimals to display
  • min_value : an error is raised if the value is less than the min_value
  • max_value : an error is raised if the value is greater than the max_value
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    value = FloatFieldAttribute(
      max_nb_decimals=5,
    )
StringFieldAttribute
  • choices : an error is raised if the value is not in the list of string options
  • max_length : an error is raised if the string value length is greater than the max_length
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    position = FloatFieldAttribute(
      choices=['first', 'last'],
      max_length=7,
    )
BooleanFieldAttribute
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    is_marked = BooleanFieldAttribute(
      default=False,
    )
TagFieldAttribute
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    phase = TagFieldAttribute(
      default='MOON',
    )
TimestampFieldAttribute
  • auto_now : Set automatically the current date (default=False)
  • precision : Set the timestamp precision which must be one of [ns,u,ms,s,m,h] (default= 'ns')
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    time = TimestampFieldAttribute(
      auto_now=True,
      precision='s',
    )
DateTimeFieldAttribute
  • str_format : Set the arrow format of the timestamp to display (default: "YYYY-MM-DD HH:mm:ss")
  • auto_now : Set automatically the current date
  • precision : Set the timestamp precision which must be one of [ns,u,ms,s,m,h]
  • attribute_name : real name of the measurement attribute in database
  • default : set a default value if it is not filled at the instanciation
  • is_nullable : if False, it will raise an error if the value is null (default=True)
  • enforce_cast : if False, it will not raise an error when the value has not the desired type without casting (default=True).
class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'

    date = DateTimeFieldAttribute(
      attribute_name='time',
      auto_now=True,
      str_format='YYYY-MM-DD',
    )

InfluxDBResponse

__init__():
  • raw_json : the raw json response object
raw

Return the raw_json value

main_serie

Return the first serie from the series field in the raw_json value

series

Return the series field in the raw_json value

error

Return the error field in the raw_json value

Example of json raw response :

{
   "results":[
      {
         "statement_id":0,
         "series":[
            {
               "name":"mymeas",
               "columns":[
                  "time",
                  "myfield",
                  "mytag1",
                  "mytag2"
               ],
               "values":[
                  [
                     "2017-03-01T00:16:18Z",
                     33.1,
                     null,
                     null
                  ],
                  [
                     "2017-03-01T00:17:18Z",
                     12.4,
                     "12",
                     "14"
                  ]
               ]
            }
         ]
      }
   ]
}

Serializers

Serializers can be used in parser_class field of Measurement class.

class MySensorMeasurement(Measurement):
    measurement_name = 'mysensor'
    parser_class = serializers.BaseSerializer

It allow to change the output response format of a influxb request

# res is formatted with BaseSerializer
res = MySensorMeasurement.get_query().limit(10).evaluate()
BaseSerializer
# res is formatted with BaseSerializer
res
{'results': [{'statement_id': 0, 'series': [{'name': 'mysamplemeasurement', 'columns': ['time', 'value'], 'values': [[1570481055000000000, 10], [1570481065000000000, 20], [1570481075000000000, 30]]}]}]}
JsonSerializer
# res is formatted with JsonSerializer
res
'{"results": [{"statement_id": 0, "series": [{"name": "mysamplemeasurement", "columns": ["time", "value"], "values": [[1570481055000000000, 10], [1570481065000000000, 20], [1570481075000000000, 30]]}]}]}'
FormattedSerieSerializer
# res is formatted with FormattedSerieSerializer
res
[{'mysamplemeasurement': [{'time': 1570481055000000000, 'value': 10}, {'time': 1570481065000000000, 'value': 20}, {'time': 1570481075000000000, 'value': 30}]}]
FlatFormattedSerieSerializer
# res is formatted with FlatFormattedSerieSerializer
[{'time': 1570481055000000000, 'value': 10}, {'time': 1570481065000000000, 'value': 20}, {'time': 1570481075000000000, 'value': 30}]
FlatSimpleResultSerializer

This serializer is used only when the result set contains only one column

res = InfluxDBAdmin.show_databases()

# res is formatted with FlatSimpleResultSerializer
res
['_internal', 'example', 'test', 'telegraf', 'mydb', ...]
FlatSingleValueSerializer

This serializer is used only when the result set contains only one value

res = InfluxDBAdmin.show_measurement_cardinality()

# res is formatted with FlatSingleValueSerializer
res
2
PandasSerializer
# res is formatted with PandasSerializer
res                   time  value
0  1570481055000000000     10
1  1570481065000000000     20
2  1570481075000000000     30
MeasurementPointSerializer

This is the default serializer class for Measurement

[<MySensorMeasurement object at 0x7f49a16227f0>, <MySensorMeasurement object at 0x7f49a16228d0>, <MySensorMeasurement object at 0x7f49a1622438>]

Raw Query

  • str_query

Example :

from influxable.db import RawQuery
str_query = 'SHOW DATABASES'
res = RawQuery(str_query).execute()
from influxable.db import RawQuery
str_query = 'SELECT * FROM temperature LIMIT 10'
res = RawQuery(str_query).execute()

Query Class

You can generate an instance of Query via the initial Query constructor or from a measurement.

Example :

from influxable.db import Query
query = Query()
...
query = MySensorMeasurement.get_query()
...

Methods :

from_measurements()
  • *measurements

Example :

query = Query()\
  .from_measurements('measurement1', 'measurement2')

Render :

FROM measurement1, measurement2
select()
  • *fields

Example :

query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')

Render :

SELECT value, phase
where()
  • *criteria

Example :

from influxable.db import Query, Field
query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )

Render :

WHERE param1 > 800 AND param1 < 900
limit()
  • value

Example :

from influxable.db import Query, Field
query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .limit(10)

Render :

LIMIT 10
slimit()
  • value

Example :

from influxable.db import Query, Field
query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .limit(10)\
  .slimit(5)

Render :

SLIMIT 5
offset()
  • value

Example :

from influxable.db import Query, Field
query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .offset(10)

Render :

OFFSET 10
soffset()
  • value

Example :

from influxable.db import Query, Field
query = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .offset(10)\
  .soffset(5)

Render :

SOFFSET 5
into()
  • *measurement

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .into('measurement2')

Render :

SELECT param1 INTO measurement2 FROM measurement1
asc()

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .asc()

Render :

SELECT param1 FROM measurement1 ORDER BY ASC
desc()

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .desc()

Render :

SELECT param1 FROM measurement1 ORDER BY DESC
tz()

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .tz('Europe/Paris')

Render :

SELECT param1 FROM measurement1 tz('Europe/Paris')
group_by()
  • *tags

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .group_by('tag_1')

Render :

SELECT param1 FROM measurement1 GROUP BY tag_1
range_by()
  • *interval
  • *shift
  • *fill
  • *tags

Example :

query = Query()\
  .select('param1')\
  .from_measurements('measurement1')\
  .range_by('12s', shift='1d', tags=['tag1'], fill=3)

Render :

SELECT param1 FROM measurement1 GROUP BY time(12s,1d),tag1 fill(3)'
execute()

Execute the query and return the response

Example :

from influxable.db import Query, Field
res = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .execute()
res

Result :

{'results': [{'statement_id': 0, 'series': [{'name': 'measurement1', 'columns': ['time', 'value'], 'values': [[1570481055000000000, 10], [1570481065000000000, 20], [1570481075000000000, 30]]}]}]}
evaluate()

Execute the query and return the serialized response

  • parser_class (default=BaseSerializer for Query and MeasurementPointSerializer for Measurement)

Example with Query :

from influxable.db import Query, Field
res = Query()\
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .evaluate()
res

Result :

{'results': [{'statement_id': 0, 'series': [{'name': 'measurement1', 'columns': ['time', 'value'], 'values': [[1570481055000000000, 10], [1570481065000000000, 20], [1570481075000000000, 30]]}]}]}

Example with Measurement :

from influxable.db import Field
points = MySensorMeasurement.get_query()
  .select('param1', 'param2')\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .evaluate()
points

Result :

[<MySensorMeasurement object at 0x7f49a16227f0>, <MySensorMeasurement object at 0x7f49a16228d0>, <MySensorMeasurement object at 0x7f49a1622438>]
count()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .count()

Render :

SELECT COUNT(*)
distinct()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .distinct()

Render :

SELECT DISTINCT(*)
integral()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .integral()

Render :

SELECT INTEGRAL(*)
mean()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .mean()

Render :

SELECT MEAN(*)
median()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .median()

Render :

SELECT MEDIAN(*)
mode()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .mode()

Render :

SELECT MODE(*)
spread()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .spread()

Render :

SELECT SPREAD(*)
std_dev()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .std_dev()

Render :

SELECT STDDEV(*)
sum()
  • value (default='*')

Example :

from influxable.db import Query, Field
query = Query()\
  .from_measurements('measurement1')\
  .where(
      Field('param1') > 800,
      Field('param1') < 900,
  )\
  .sum()

Render :

SELECT SUM(*)

Query aggregations function

Usage :

from influxable.db.function import aggregations
res = Query()\
    .select(aggregations.Sum('value'))\
    .from_measurements('param1')\
    .execute()
Count
Distinct
Integral
Mean
Median
Mode
Spread
StdDev
Sum

Query selectors function

Usage :

from influxable.db.function import selectors
res = Query()\
    .select(selectors.Min('value'), selectors.Max('value'))\
    .from_measurements('param1')\
    .execute()
Bottom
First
Last
Max
Min
Percentile
Sample
Top

Query transformations function

Usage :

from influxable.db.function import selectors, transformations
res = Query()\
    .select(transformations.Abs('value'))\
    .from_measurements('param1')\
    .execute()
from influxable.db.function.selectors import Min, Max
from influxable.db.function.transformations import Abs
res = Query()\
    .select(Abs(Min('value')), Abs(Max('value')))\
    .from_measurements('param1')\
    .execute()
Abs
ACos
ASin
ATan
ATan2
Ceil
Cos
CumulativeSum
Derivative
Difference
Elapsed
Exp
Floor
Histogram
Ln
Log
Log2
Log10
MovingAverage
NonNegativeDerivative
NonNegativeDifference
Pow
Round
Sin
Sqrt
Tan

InfluxDBAdmin

alter_retention_policy()
  • policy_name
  • duration (default=None)
  • replication (default=None)
  • shard_duration (default=None)
  • is_default (default=False)
ALTER RETENTION POLICY {policy_name} ON {database_name} [DURATION {duration} REPLICATION {replication} SHARD DURATION {shard_duration} DEFAULT]
create_database()
  • new_database_name
  • duration (default=None)
  • replication (default=None)
  • shard_duration (default=None)
  • policy_name (default=False)
CREATE DATABASE {new_database_name} [WITH DURATION {duration} REPLICATION {replication} SHARD DURATION {shard_duration} NAME {policy_name}]
create_retention_policy()
  • policy_name
  • duration (default=None)
  • replication (default=None)
  • shard_duration (default=None)
  • is_default (default=False)
CREATE RETENTION POLICY {policy_name} ON {database_name} [DURATION {duration} REPLICATION {replication} SHARD DURATION {shard_duration} DEFAULT]
create_subscription()
  • subscription_name
  • hosts
  • any (default=False)
CREATE SUBSCRIPTION {subscription_name} ON {database_name} DESTINATIONS ANY/ALL {hosts}
create_user()
  • user_name
  • password
  • with_privileges (default=False)
CREATE USER {user_name} WITH PASSWORD {password} [WITH ALL PRIVILEGES]
delete()
  • measurements (default=[])
  • criteria (default=[])
DELETE FROM {measurements} WHERE {criteria}
drop_continuous_query()
  • query_name
DROP CONTINUOUS QUERY {query_name} ON {database_name}
drop_database()
  • database_name_to_delete
DROP DATABASE {database_name_to_delete}
drop_measurement()
  • measurement_name
DROP MEASUREMENT {measurement_name}
drop_retention_policy()
  • policy_name
DROP RETENTION POLICY {policy_name} ON {database_name}
drop_series()
  • measurements (default=[])
  • criteria (default=[])
DROP SERIES FROM {measurements} WHERE {criteria}
drop_subscription()
  • subscription_name
DROP SUBSCRIPTION {subscription_name} ON {full_database_name}
drop_user()
  • user_name
DROP USER {user_name}
explain()
  • query
  • analyze (default=False)
EXPLAIN [ANALYZE] {query}
grant()
  • privilege
  • user_name
GRANT {privilege} ON {database_name} TO {user_name}
kill()
  • query_id
KILL QUERY {query_id}
revoke()
  • privilege
  • user_name
REVOKE {privilege} ON {database_name} FROM {user_name}
show_field_key_cardinality()
  • exact (default=False)
SHOW FIELD KEY [EXACT] CARDINALITY
show_measurement_cardinality()
  • exact (default=False)
SHOW MEASUREMENT [EXACT] CARDINALITY
show_series_cardinality()
  • exact (default=False)
SHOW SERIES [EXACT] CARDINALITY
show_tag_key_cardinality()
  • key
  • exact (default=False)
SHOW TAG VALUES [EXACT] CARDINALITY WITH KEY = {key}
show_continuous_queries()
SHOW CONTINUOUS QUERIES
show_diagnostics()
SHOW DIAGNOSTICS
show_field_keys()
  • measurements (default=[])
SHOW FIELD KEYS FROM {measurements}
show_grants()
  • user_name
SHOW GRANTS FOR {user_name}
show_databases()
SHOW DATABASES
show_measurements()
  • criteria (default=[])
SHOW MEASUREMENTS WHERE {criteria}
show_queries()
SHOW QUERIES
show_retention_policies()
SHOW RETENTION POLICIES
show_series()
  • measurements (default=[])
  • criteria (default=[])
  • limit (default=None)
  • offset (default=None)
SHOW SERIES ON {database_name} [FROM {measurements} WHERE {criteria} LIMIT {limit} OFFSET {offset}]
show_stats()
SHOW STATS
show_shards()
SHOW SHARDS
show_shard_groups()
SHOW SHARD GROUPS
show_subscriptions()
SHOW SUBSCRIPTIONS
show_tag_keys()
  • measurements (default=[])
SHOW TAG KEYS [FROM {measurements}]
show_tag_values()
  • key
  • measurements (default=[])
SHOW TAG VALUES [FROM {measurements}] WITH KEY = {key}
show_users()
SHOW USERS

Exceptions

InfluxDBException
InfluxDBError
InfluxDBConnectionError
InfluxDBInvalidResponseError
InfluxDBInvalidChoiceError
InfluxDBInvalidTypeError
InfluxDBInvalidURLError
InfluxDBBadRequestError
InfluxDBBadQueryError
InfluxDBInvalidNumberError
InfluxDBInvalidTimestampError
InfluxDBUnauthorizedError
InfluxDBAttributeValueError

Testing

First, you need to install pytest via the file requirements-test.txt

pip install -r requirements-test.txt

Then, you can launch the pytest command.

pytest -v

Supporting

Feel free to post issues your feedback or if you reach a problem with influxable library.

If you want to contribute, please use the pull requests section.

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository

Contributors

Credits

References

License

MIT