CodeForAfrica/sensors.AFRICA-api

Use TimeSeries DB

karimkawambwa opened this issue · 1 comment

Convert the API to use a time-series database; see PR #63.

Confirm the priority level of this with @chegejames, and whether we still want to move in this direction.

The app will still keep track of sensors, owners, and locations, but the sensor readings themselves will live in the time-series database.

cc @kilemensi
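
For illustration, a single reading could be written roughly like this, with the relational metadata denormalised into tags and the measured value as the field. Host, credentials, and values here are placeholders; the tag and field names mirror the migration script further down:

from influxdb import InfluxDBClient

client = InfluxDBClient("localhost", 8086, "sensorsafrica", "sensorsafrica", "sensorsafrica")
client.write_points([{
    "measurement": "air",
    "tags": {"node_uid": "aq_01", "city": "Nairobi", "sensor_type": "SDS011"},  # placeholder metadata
    "time": "2018-10-01T00:00:00Z",
    "fields": {"P2": 12.5},  # e.g. a PM2.5 reading
}])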


After this change, none of the management commands should be needed except the CKAN upload, including the ones that cache JSON files, since InfluxDB is fast enough to answer these queries directly (see the sketch below). However, make sure the API routes don't change, so nothing breaks for existing consumers.
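
For example, the statistics currently served from the cached JSON files could instead be computed by InfluxDB at request time. A rough sketch, assuming the "air" measurement and a "P2" field as used in the migration script below:

from influxdb import InfluxDBClient

client = InfluxDBClient("localhost", 8086, "sensorsafrica", "sensorsafrica", "sensorsafrica")

# Hourly PM2.5 averages over the last day, aggregated by InfluxDB itself.
result = client.query('SELECT MEAN("P2") FROM "air" WHERE time > now() - 1d GROUP BY time(1h)')
for point in result.get_points():
    print(point["time"], point["mean"])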

Here is the script to pull data from production and move to influxdb:

from influxdb import InfluxDBClient
import dj_database_url
import psycopg2
import psycopg2.extras
from geohash import encode

### PostgreSQL DB info ###
# dj_database_url parses a DATABASE_URL-style connection string into a
# Django-style settings dict (NAME, USER, PASSWORD, HOST, PORT).
result = dj_database_url.parse("<db url>")

conn = psycopg2.connect(
    database=result["NAME"],
    user=result["USER"],
    password=result["PASSWORD"],
    host=result["HOST"],
    port=result["PORT"]
)

### InfluxDB info ###
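# The positional arguments below are host, port, username, password and
# database, in that order (influxdb-python client, InfluxDB 1.x).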
influxClient = InfluxDBClient(
    'localhost',
    8086,
    'sensorsafrica',
    'sensorsafrica',
    'sensorsafrica',
    timeout=10,
    ssl=False,
    verify_ssl=False,
)
# The target database must already exist; uncomment to recreate it from scratch:
# influxClient.drop_database("sensorsafrica")
# influxClient.create_database("sensorsafrica")

# Drop any points from a previous run so the migration is repeatable.
influxClient.drop_measurement("air")

BATCH_SIZE = 1000

# Query the relational DB for all records, using a named server-side
# cursor so the whole table is never loaded into memory at once.
curr = conn.cursor('datavalues', cursor_factory=psycopg2.extras.RealDictCursor)
curr.execute("select * from sensors_sensordatavalue")
row_count = 0

# Generate a collection of InfluxDB points from the SQL records,
# processing BATCH_SIZE records at a time.
while True:
    sensor_datavalues = curr.fetchmany(BATCH_SIZE)
    influx_points = []
    for sensor_datavalue in sensor_datavalues:
        # N+1 lookups: each reading pulls its sensor, sensor type, node
        # and location. Acceptable for a one-off migration; cache these
        # lookups if it proves too slow.
        c = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        c.execute("select * from sensors_sensordata where id = %s",
                  [sensor_datavalue["sensordata_id"]])
        sensor_data = c.fetchone()
        c.execute("select * from sensors_sensor where id = %s",
                  [sensor_data["sensor_id"]])
        sensor = c.fetchone()
        c.execute("select * from sensors_sensortype where id = %s",
                  [sensor["sensor_type_id"]])
        sensor_type = c.fetchone()
        c.execute("select * from sensors_node where id = %s",
                  [sensor["node_id"]])
        node = c.fetchone()
        c.execute("select * from sensors_sensorlocation where id = %s",
                  [node["location_id"]])
        location = c.fetchone()
        try:
            influx_points.append({
                "measurement": 'air',
                "tags": {
                    "node_uid": node["uid"],
                    "node_owner": node["owner_id"],
                    "node_is_indoors": node["indoor"],
                    "node_is_at_height": node["height"],
                    "sensor_type": sensor_type["uid"],
                    "sensor_value_type": sensor_datavalue["value_type"],
                    "sensor_manufacturer": sensor_type["manufacturer"],
                    "lat": location["latitude"],
                    "lng": location["longitude"],
                    "city": location["city"],
                    "country": location["country"],
                    "industry_in_area": location["industry_in_area"],
                    "geohash": encode(location["latitude"], location["longitude"])
                },
                "time": sensor_datavalue["created"],
                "fields": {
                    sensor_datavalue["value_type"]: float(
                        sensor_datavalue["value"])
                }
            })
        except (KeyError, TypeError, ValueError) as e:
            # Skip readings with missing relations or non-numeric values.
            print("skipping record: %s" % e)
    if influx_points:
        influxClient.write_points(influx_points)
    row_count += len(sensor_datavalues)
    print("written %d rows so far" % row_count)
    if len(sensor_datavalues) < BATCH_SIZE:
        break
conn.close()
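
As a sanity check once the script finishes, the counts on both sides can be compared; a rough sketch (note that InfluxQL's COUNT(*) reports one count per field, i.e. per value_type):

result = influxClient.query('SELECT COUNT(*) FROM "air"')
for point in result.get_points():
    print(point)
print("PostgreSQL rows processed: %d" % row_count)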