/FLiP-Pi-DeltaLake-Thermal

Apache Pulsar -> Sink -> DeltaLake

Primary LanguageHTMLApache License 2.0Apache-2.0

FLiP-Pi-DeltaLake-Thermal

Apache Pulsar -> Sink -> DeltaLake

Diagram

Python Pulsar Record

*** Note: current version requires no nulls, no maps. ***

class thermal(Record):
    uuid = String(required=True)
    ipaddress = String(required=True)
    cputempf = Integer(required=True)
    runtime = Integer(required=True)
    host = String(required=True)
    hostname = String(required=True)
    macaddress = String(required=True)
    endtime = String(required=True)
    te = String(required=True)
    cpu = Float(required=True)
    diskusage = String(required=True)
    memory = Float(required=True)
    rowid = String(required=True)
    systemtime = String(required=True)
    ts = Integer(required=True)
    starttime = String(required=True)
    datetimestamp = String(required=True)
    temperature = Float(required=True)
    humidity = Float(required=True)
    co2 =  Float(required=True)

Raspberry Pi Sensor Python Run

2022-07-15 09:08:06.115 INFO  [3034530880] HandlerBase:64 | [persistent://public/default/pi-sensors-partition-0, ] Getting connection from pool
2022-07-15 09:08:06.117 INFO  [3034530880] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-07-15 09:08:06.117 INFO  [3034530880] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650
2022-07-15 09:08:06.120 INFO  [3034530880] ClientConnection:370 | [192.168.1.204:47626 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2022-07-15 09:08:06.127 INFO  [3034530880] ProducerImpl:189 | [persistent://public/default/pi-sensors-partition-0, ] Created producer on broker [192.168.1.204:47626 -> 192.168.1.230:6650]
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_fui_20220715130806', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890486.6318262', 'te': '0.0005743503570556641', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130806_60188a3a-57f0-4bc3-819b-5b643e0de5b9', 'systemtime': '07/15/2022 09:08:12', 'ts': 1657890492, 'starttime': '07/15/2022 09:08:06', 'datetimestamp': '2022-07-15 13:08:11.308836+00:00', 'temperature': 27.6959, 'humidity': 31.7, 'co2': 1360.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_wcc_20220715130812', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890492.320293', 'te': '0.0007376670837402344', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130812_c59a07d1-8994-49e8-888a-44f82f7ed441', 'systemtime': '07/15/2022 09:08:17', 'ts': 1657890497, 'starttime': '07/15/2022 09:08:12', 'datetimestamp': '2022-07-15 13:08:16.084772+00:00', 'temperature': 27.2527, 'humidity': 32.3, 'co2': 1365.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_wvx_20220715130817', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890497.0999746', 'te': '0.0007336139678955078', 'cpu': 3.3, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130817_af0783b4-12f4-49f3-b5e2-d4ad912d369b', 'systemtime': '07/15/2022 09:08:21', 'ts': 1657890501, 'starttime': '07/15/2022 09:08:17', 'datetimestamp': '2022-07-15 13:08:20.862417+00:00', 'temperature': 26.983, 'humidity': 32.78, 'co2': 1381.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_cgk_20220715130821', 'ipaddress': '192.168.1.204', 'cputempf': 105, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890501.8731027', 'te': '0.0007915496826171875', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130821_bb918b7c-f074-4cd0-8315-ab7d773cd9d2', 'systemtime': '07/15/2022 09:08:26', 'ts': 1657890506, 'starttime': '07/15/2022 09:08:21', 'datetimestamp': '2022-07-15 13:08:25.636997+00:00', 'temperature': 26.7052, 'humidity': 33.3, 'co2': 1349.0}
^C2022-07-15 09:08:30.165 INFO  [3069380992] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-07-15 09:08:30.166 INFO  [3069380992] ProducerImpl:686 | [persistent://public/default/pi-sensors-partition-0, standalone-1-22] Closing producer for topic persistent://public/default/pi-sensors-partition-0
2022-07-15 09:08:30.169 INFO  [3034530880] ProducerImpl:729 | [persistent://public/default/pi-sensors-partition-0, standalone-1-22] Closed producer
2022-07-15 09:08:30.170 INFO  [3034530880] ClientConnection:1548 | [192.168.1.204:47626 -> 192.168.1.230:6650] Connection closed
2022-07-15 09:08:30.171 INFO  [3034530880] ClientConnection:1548 | [192.168.1.204:47624 -> 192.168.1.230:6650] Connection closed

Example Message


{
 "uuid": "thrml_zda_20220715182748",
 "ipaddress": "192.168.1.204",
 "cputempf": 108,
 "runtime": 0,
 "host": "thermal",
 "hostname": "thermal",
 "macaddress": "e4:5f:01:7c:3f:34",
 "endtime": "1657909668.7279365",
 "te": "0.0007398128509521484",
 "cpu": 1.8,
 "diskusage": "105078.0 MB",
 "memory": 9.0,
 "rowid": "20220715182748_fc4cbbb1-79da-4c1a-8991-78bd23c9f221",
 "systemtime": "07/15/2022 14:27:53",
 "ts": 1657909673,
 "starttime": "07/15/2022 14:27:48",
 "datetimestamp": "2022-07-15 18:27:52.492469+00:00",
 "temperature": 28.238,
 "humidity": 29.61,
 "co2": 992.0
}

Pulsar Delta Sink Configuration

  • conf/deltalakesink.yml


configs:
        type: "delta"
        maxCommitInterval: 120
        maxRecordsPerCommit: 10_000_000
        tablePath: "file:///opt/demo/lakehouse"
        processingGuarantees: "EXACTLY_ONCE"
        deltaFileType: "parquet"
        subscriptionType: "Failover"

Pulsar 2.9.* Sink Setup


bin/pulsar-admin topics delete "persistent://public/default/pi-sensors" --force
bin/pulsar-admin schemas delete "persistent://public/default/pi-sensors"
bin/pulsar-admin topics unload "persistent://public/default/pi-sensors"

bin/pulsar-admin sink stop --name delta_sink --namespace default --tenant public
bin/pulsar-admin sinks delete --tenant public --namespace default --name delta_sink

bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisensorwatch" --subscription-type Failover

        
bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-lakehouse-2.9.2.24.nar --tenant public --namespace default --name delta_sink --sink-config-file conf/deltalakesink.yml --inputs "persistent://public/default/pi-sensors" --parallelism 1  --subs-name pisensorwatch --processing-guarantees EFFECTIVELY_ONCE


bin/pulsar-admin sinks get --tenant public --namespace default --name delta_sink

{
  "tenant": "public",
  "namespace": "default",
  "name": "delta_sink",
  "className": "org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector",
  "sourceSubscriptionName": "pisensorwatch",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "persistent://public/default/pi-sensors"
  ],
  "inputSpecs": {
    "persistent://public/default/pi-sensors": {
      "isRegexPattern": false,
      "schemaProperties": {},
      "consumerProperties": {},
      "poolMessages": false
    }
  },
  "configs": {
    "maxRecordsPerCommit": 10000000,
    "processingGuarantees": "EXACTLY_ONCE",
    "tablePath": "file:///opt/demo/lakehouse",
    "subscriptionType": "Failover",
    "deltaFileType": "parquet",
    "type": "delta",
    "maxCommitInterval": 120
  },
  "parallelism": 1,
  "processingGuarantees": "EFFECTIVELY_ONCE",
  "retainOrdering": true,
  "autoAck": true
}

bin/pulsar-admin sinks status --tenant public --namespace default --name delta_sink
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 66,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 66,
      "lastReceivedTime" : 1657890506653,
      "workerId" : "c-standalone-fw-127.0.0.1-8080"
    }
  } ]
}

bin/pulsar-admin sinks list

[
  "scylla-airquality-sink",
  "delta_sink"
]


Output


/opt/demo/lakehouse/
total 12
-rw-r--r-- 1 root root    0 Jul 15 09:00 part-0000-f1bfd140-8da9-4507-8e8d-1d98c5faf67a-c000.snappy.parquet
drwxr-xr-x 2 root root 4096 Jul 15 09:00 _delta_log
-rw-r--r-- 1 root root 6968 Jul 15 09:00 part-0000-de100d45-4aae-444f-b078-85fd54662a8f-c000.snappy.parquet

cat /opt/demo/lakehouse/_delta_log/00000000000000000001.json
{"commitInfo":{"timestamp":1657890016312,"operation":"WRITE","operationParameters":{},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"pulsar-sink-connector-version-2.9.1 Delta-Standalone/0.3.0"}}
{"txn":{"appId":"pulsar-delta-sink-connector","version":1,"lastUpdated":1657890016310}}
{"add":{"path":"part-0000-de100d45-4aae-444f-b078-85fd54662a8f-c000.snappy.parquet","partitionValues":{},"size":4460,"modificationTime":1657890016310,"dataChange":true,"stats":"{}"}}

Querying Delta Lake (Scala Spark Shell)


bin/spark-shell --packages io.delta:delta-core_2.12:1.2.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

import io.delta.tables._
import org.apache.spark.sql.functions._

val df = spark.read.format("delta").load("/opt/demo/lakehouse/")
df.printSchema()

df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").orderBy(col("datetimestamp").desc).show(5,100)

Querying Delta Lake pyspark


pyspark --packages io.delta:delta-core_2.12:1.2.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

>>> df = spark.read.format("delta").load("/opt/demo/lakehouse/")
>>> df.show(10)
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
|                uuid|    ipaddress|cputempf|runtime|   host|hostname|       macaddress|           endtime|                  te|cpu|  diskusage|memory|               rowid|         systemtime|        ts|          starttime|       datetimestamp|temperature|humidity|   co2|
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
|thrml_aoi_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890015.3147886|0.000545024871826...|0.0|105078.3 MB|   9.0|20220715130015_dc...|07/15/2022 09:00:20|1657890020|07/15/2022 09:00:15|2022-07-15 13:00:...|    24.7132|    37.3|1332.0|
|thrml_crz_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890020.0944705|0.000758171081542...|0.0|105078.3 MB|   9.0|20220715130020_b1...|07/15/2022 09:00:24|1657890024|07/15/2022 09:00:20|2022-07-15 13:00:...|    24.6491|   37.46|1334.0|
|thrml_nre_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890024.8735719| 0.00072479248046875|0.0|105078.3 MB|   9.0|20220715130024_48...|07/15/2022 09:00:29|1657890029|07/15/2022 09:00:24|2022-07-15 13:00:...|    24.5797|   37.66|1336.0|
|thrml_umx_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890029.6493442|0.000723361968994...|0.0|105078.3 MB|   9.0|20220715130029_3d...|07/15/2022 09:00:34|1657890034|07/15/2022 09:00:29|2022-07-15 13:00:...|    24.4462|   37.83|1339.0|
|thrml_ijc_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890034.5295844|0.000723361968994...|6.5|105078.3 MB|   9.0|20220715130034_f9...|07/15/2022 09:00:39|1657890039|07/15/2022 09:00:34|2022-07-15 13:00:...|    24.3901|   38.07|1343.0|
|thrml_yfy_2022071...|192.168.1.204|     107|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890039.3061981|  0.0005035400390625|0.0|105078.3 MB|   9.0|20220715130039_f6...|07/15/2022 09:00:44|1657890044|07/15/2022 09:00:39|2022-07-15 13:00:...|      24.31|   38.23|1343.0|
|thrml_jzo_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890044.0833697|0.000773191452026...|0.0|105078.3 MB|   9.0|20220715130044_78...|07/15/2022 09:00:48|1657890048|07/15/2022 09:00:44|2022-07-15 13:00:...|    24.2486|   38.31|1345.0|
|thrml_onk_2022071...|192.168.1.204|     105|      0|thermal| thermal|e4:5f:01:7c:3f:34|  1657890048.86295|0.000727176666259...|4.9|105078.3 MB|   9.0|20220715130048_5b...|07/15/2022 09:00:53|1657890053|07/15/2022 09:00:48|2022-07-15 13:00:...|    24.1578|   38.41|1346.0|
|thrml_lrx_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890053.6392615|0.000728368759155...|0.0|105078.3 MB|   9.0|20220715130053_35...|07/15/2022 09:00:58|1657890058|07/15/2022 09:00:53|2022-07-15 13:00:...|    24.1311|   38.49|1347.0|
|thrml_wqr_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890058.4192047|0.000722885131835...|0.0|105078.3 MB|   9.0|20220715130058_4e...|07/15/2022 09:01:03|1657890063|07/15/2022 09:00:58|2022-07-15 13:01:...|    24.0536|   38.62|1348.0|
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
only showing top 10 rows

df.printSchema()
root
 |-- uuid: string (nullable = false)
 |-- ipaddress: string (nullable = false)
 |-- cputempf: integer (nullable = false)
 |-- runtime: integer (nullable = false)
 |-- host: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- macaddress: string (nullable = false)
 |-- endtime: string (nullable = false)
 |-- te: string (nullable = false)
 |-- cpu: float (nullable = false)
 |-- diskusage: string (nullable = false)
 |-- memory: float (nullable = false)
 |-- rowid: string (nullable = false)
 |-- systemtime: string (nullable = false)
 |-- ts: integer (nullable = false)
 |-- starttime: string (nullable = false)
 |-- datetimestamp: string (nullable = false)
 |-- temperature: float (nullable = false)
 |-- humidity: float (nullable = false)
 |-- co2: float (nullable = false)
 
 
df.select("humidity","co2","datetimestamp","cputempf","ts", "uuid").show(100)

df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").show(5,100)

df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").orderBy("datetimestamp").show(5,100)

Python3 Setup

pip3 install -U pip
pip3 install delta-spark==2.0.0
pip3 install pyspark==3.2.0

Example Python3 App

from delta.tables import *
from pyspark.sql.functions import *
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("PulsarApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

df = spark.read.format("delta").load("/opt/demo/lakehouse/")
df.printSchema()
df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").orderBy("datetimestamp").show(5,100)


Validating Parquet Files


pip3 install parquet-tools -U

parquet-tools inspect /opt/demo/lakehouse/part-0000-f2185319-1e50-4ca9-a8dd-69b7e54e552e-c000.snappy.parquet

############ file meta data ############
created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
num_columns: 20
num_rows: 3
num_row_groups: 1
format_version: 1.0
serialized_size: 3289


############ Columns ############
uuid
ipaddress
cputempf
runtime
host
hostname
macaddress
endtime
te
cpu
diskusage
memory
rowid
systemtime
ts
starttime
datetimestamp
temperature
humidity
co2

############ Column(uuid) ############
name: uuid
path: uuid
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ipaddress) ############
name: ipaddress
path: ipaddress
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(cputempf) ############
name: cputempf
path: cputempf
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(runtime) ############
name: runtime
path: runtime
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(host) ############
name: host
path: host
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(hostname) ############
name: hostname
path: hostname
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(macaddress) ############
name: macaddress
path: macaddress
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(endtime) ############
name: endtime
path: endtime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(te) ############
name: te
path: te
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(cpu) ############
name: cpu
path: cpu
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(diskusage) ############
name: diskusage
path: diskusage
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(memory) ############
name: memory
path: memory
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(rowid) ############
name: rowid
path: rowid
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(systemtime) ############
name: systemtime
path: systemtime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ts) ############
name: ts
path: ts
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(starttime) ############
name: starttime
path: starttime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(datetimestamp) ############
name: datetimestamp
path: datetimestamp
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(temperature) ############
name: temperature
path: temperature
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(humidity) ############
name: humidity
path: humidity
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(co2) ############
name: co2
path: co2
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

Flink SQL


CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

use catalog pulsar;

show tables;

describe `pi-sensors`

select uuid, cputempf, temperature, humidity, co2, ts, datetimestamp from `pi-sensors`;

select max(cputempf) as MaxCPU, max(temperature) as MaxTemp, max(humidity) as MaxHumidity, max(co2) as MaxCO2 
from `pi-sensors` ;

FlinkSQLTableDef

FlinkSQLResults

FlinkSQLCluster

Pulsar SQL


presto> desc pulsar."public/default"."pi-sensors";
      Column       |   Type    | Extra |                                   Comment
-------------------+-----------+-------+-----------------------------------------------------------------------------
 uuid              | varchar   |       | "string"
 ipaddress         | varchar   |       | "string"
 cputempf          | integer   |       | "int"
 runtime           | integer   |       | "int"
 host              | varchar   |       | "string"
 hostname          | varchar   |       | "string"
 macaddress        | varchar   |       | "string"
 endtime           | varchar   |       | "string"
 te                | varchar   |       | "string"
 cpu               | real      |       | "float"
 diskusage         | varchar   |       | "string"
 memory            | real      |       | "float"
 rowid             | varchar   |       | "string"
 systemtime        | varchar   |       | "string"
 ts                | integer   |       | "int"
 starttime         | varchar   |       | "string"
 datetimestamp     | varchar   |       | "string"
 temperature       | real      |       | "float"
 humidity          | real      |       | "float"
 co2               | real      |       | "float"
 __partition__     | integer   |       | The partition number which the message belongs to
 __event_time__    | timestamp |       | Application defined timestamp in milliseconds of when the event occurred
 __publish_time__  | timestamp |       | The timestamp in milliseconds of when event as published
 __message_id__    | varchar   |       | The message ID of the message used to generate this row
 __sequence_id__   | bigint    |       | The sequence ID of the message used to generate this row
 __producer_name__ | varchar   |       | The name of the producer that publish the message used to generate this row
 __key__           | varchar   |       | The partition key for the topic
 __properties__    | varchar   |       | User defined properties
(28 rows)

presto> select co2, humidity,temperature, datetimestamp, ts, uuid, __publish_time__ from pulsar."public/default"."pi-sensors";
  co2   | humidity | temperature |          datetimestamp           |     ts     |           uuid           |    __publish_time__
--------+----------+-------------+----------------------------------+------------+--------------------------+-------------------------
 1042.0 |    32.55 |     26.4275 | 2022-07-15 18:44:34.803190+00:00 | 1657910675 | thrml_ska_20220715184430 | 2022-07-15 14:44:35.807
 1045.0 |    32.44 |     26.4676 | 2022-07-15 18:44:39.582550+00:00 | 1657910680 | thrml_udc_20220715184435 | 2022-07-15 14:44:40.587
 1046.0 |    32.49 |     26.4596 | 2022-07-15 18:44:44.358047+00:00 | 1657910685 | thrml_sqj_20220715184440 | 2022-07-15 14:44:45.362
 1047.0 |    32.48 |     26.4729 | 2022-07-15 18:44:49.133060+00:00 | 1657910690 | thrml_kff_20220715184445 | 2022-07-15 14:44:50.137
 1047.0 |    32.49 |     26.4649 | 2022-07-15 18:44:53.912035+00:00 | 1657910694 | thrml_xll_20220715184450 | 2022-07-15 14:44:54.916
 1048.0 |    32.43 |     26.4783 | 2022-07-15 18:44:58.790554+00:00 | 1657910699 | thrml_zkr_20220715184454 | 2022-07-15 14:44:59.793
 1049.0 |     32.5 |     26.4569 | 2022-07-15 18:45:03.567302+00:00 | 1657910704 | thrml_ogp_20220715184459 | 2022-07-15 14:45:04.571
 1050.0 |    32.53 |     26.4409 | 2022-07-15 18:45:08.347039+00:00 | 1657910709 | thrml_xto_20220715184504 | 2022-07-15 14:45:09.351
 1050.0 |    32.51 |     26.4409 | 2022-07-15 18:45:13.118426+00:00 | 1657910714 | thrml_kpz_20220715184509 | 2022-07-15 14:45:14.123
 1051.0 |    32.48 |     26.4382 | 2022-07-15 18:45:17.897249+00:00 | 1657910718 | thrml_ghy_20220715184514 | 2022-07-15 14:45:18.902
 1052.0 |     32.5 |     26.4676 | 2022-07-15 18:45:22.676866+00:00 | 1657910723 | thrml_uei_20220715184518 | 2022-07-15 14:45:23.680
 1052.0 |    32.41 |     26.4676 | 2022-07-15 18:45:27.552872+00:00 | 1657910728 | thrml_mzi_20220715184523 | 2022-07-15 14:45:28.557
 1051.0 |    32.36 |     26.4569 | 2022-07-15 18:45:32.331788+00:00 | 1657910733 | thrml_luk_20220715184528 | 2022-07-15 14:45:33.336
 1052.0 |    32.33 |     26.4783 | 2022-07-15 18:45:37.104204+00:00 | 1657910738 | thrml_tbp_20220715184533 | 2022-07-15 14:45:38.107
 1051.0 |    32.33 |     26.5317 | 2022-07-15 18:45:41.881281+00:00 | 1657910742 | thrml_dqf_20220715184538 | 2022-07-15 14:45:42.887
 1050.0 |    32.43 |     26.5557 | 2022-07-15 18:45:46.659983+00:00 | 1657910747 | thrml_nal_20220715184542 | 2022-07-15 14:45:47.665
 1051.0 |    32.38 |     26.5824 | 2022-07-15 18:45:51.537531+00:00 | 1657910752 | thrml_bxo_20220715184547 | 2022-07-15 14:45:52.543
 1050.0 |    32.39 |      26.553 | 2022-07-15 18:45:56.316338+00:00 | 1657910757 | thrml_gtv_20220715184552 | 2022-07-15 14:45:57.322
 1050.0 |     32.4 |     26.5824 | 2022-07-15 18:46:01.095270+00:00 | 1657910762 | thrml_keo_20220715184557 | 2022-07-15 14:46:02.099
 1050.0 |    32.31 |     26.5664 | 2022-07-15 18:46:05.865263+00:00 | 1657910766 | thrml_jmz_20220715184602 | 2022-07-15 14:46:06.869
 1050.0 |     32.4 |      26.529 | 2022-07-15 18:46:10.643571+00:00 | 1657910771 | thrml_hfg_20220715184606 | 2022-07-15 14:46:11.648
 1048.0 |    32.33 |     26.5317 | 2022-07-15 18:46:15.416817+00:00 | 1657910776 | thrml_xtg_20220715184611 | 2022-07-15 14:46:16.420
 1048.0 |    32.29 |     26.5557 | 2022-07-15 18:46:20.295350+00:00 | 1657910781 | thrml_vzo_20220715184616 | 2022-07-15 14:46:21.304
 1048.0 |    32.33 |     26.5637 | 2022-07-15 18:46:25.074648+00:00 | 1657910786 | thrml_mds_20220715184621 | 2022-07-15 14:46:26.084
 1047.0 |    32.36 |     26.5611 | 2022-07-15 18:46:29.851385+00:00 | 1657910790 | thrml_qgv_20220715184626 | 2022-07-15 14:46:30.856
 1049.0 |    32.41 |     26.5691 | 2022-07-15 18:46:34.629965+00:00 | 1657910795 | thrml_taj_20220715184630 | 2022-07-15 14:46:35.639
 1049.0 |    32.38 |     26.5797 | 2022-07-15 18:46:39.409163+00:00 | 1657910800 | thrml_zdj_20220715184635 | 2022-07-15 14:46:40.413
 1049.0 |    32.24 |     26.6225 | 2022-07-15 18:46:44.281726+00:00 | 1657910805 | thrml_pri_20220715184640 | 2022-07-15 14:46:45.290
 1048.0 |    32.18 |     26.5958 | 2022-07-15 18:46:49.060992+00:00 | 1657910810 | thrml_mhu_20220715184645 | 2022-07-15 14:46:50.066
 

PulsarSQL

Apache Spark Structured Streaming SQL

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()

dfPulsar.printSchema()

root
 |-- uuid: string (nullable = false)
 |-- ipaddress: string (nullable = false)
 |-- cputempf: integer (nullable = false)
 |-- runtime: integer (nullable = false)
 |-- host: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- macaddress: string (nullable = false)
 |-- endtime: string (nullable = false)
 |-- te: string (nullable = false)
 |-- cpu: float (nullable = false)
 |-- diskusage: string (nullable = false)
 |-- memory: float (nullable = false)
 |-- rowid: string (nullable = false)
 |-- systemtime: string (nullable = false)
 |-- ts: integer (nullable = false)
 |-- starttime: string (nullable = false)
 |-- datetimestamp: string (nullable = false)
 |-- temperature: float (nullable = false)
 |-- humidity: float (nullable = false)
 |-- co2: float (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

val pQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()

HTML Table Display

  • To Run: python3 -m http.server 8000
  • See thermal.html

ThermalHTML

References