How to calculate the mean (and similar aggregates) over multiple columns
SteveRosam opened this issue · 2 comments
SteveRosam commented
import os
from quixstreams import Application, State
import uuid
from dotenv import load_dotenv
from datetime import datetime
load_dotenv("./.env")

# A brand-new consumer group is generated on every run. Combined with
# auto_offset_reset="earliest", this means there are never committed offsets
# for the group, so (per Kafka's offset-reset semantics) every run starts
# reading the input topic from the beginning.
consumer_group = str(uuid.uuid4())
app = Application.Quix(consumer_group, auto_offset_reset="earliest")

# Topic names come from the environment (loaded from ./.env above).
input_topic = app.topic(os.environ["input"], value_deserializer='json')
#output_topic = app.topic(os.environ["output"], value_serializer='json')

sdf = app.dataframe(input_topic)
# Drop messages without a "location-speed" field, then keep only the two
# columns the aggregation below needs.
sdf = sdf[sdf.contains("location-speed")]
sdf = sdf[["Timestamp", "location-speed"]]
def reduce_mean(state: dict, row: dict) -> dict:
    """Fold one message into a window's running speed aggregate.

    Used as the reducer for ``tumbling_window(...).reduce(...)``. Keeping a
    running sum and count (rather than a mean) lets the mean be computed
    exactly once the window closes.

    Args:
        state: The aggregate so far, as produced by ``init_mean`` or a
            previous call. Must have "sum_speed" and "count_speed" keys.
            It is updated in place.
        row: The incoming message. Must contain a "location-speed" value.

    Returns:
        The same ``state`` dict, after the update.
    """
    state["sum_speed"] += row["location-speed"]
    state["count_speed"] += 1
    return state
def init_mean(row: dict) -> dict:
    """Create a window's speed aggregate from the first message in it.

    Used as the initializer for ``tumbling_window(...).reduce(...)``.
    Subsequent messages in the same window are folded in by ``reduce_mean``.

    Args:
        row: The first message of the window. Must contain a
            "location-speed" value.

    Returns:
        A new dict with "sum_speed" set to that speed and "count_speed"
        set to 1.
    """
    return {
        "sum_speed": row["location-speed"],
        "count_speed": 1,
    }
def _window_to_mean(window: dict) -> dict:
    """Turn a closed window's result into a {timestamp, mean_speed} record.

    ``window["value"]`` is the aggregate built by ``init_mean``/``reduce_mean``.
    The count is always at least 1, so the division is safe.
    """
    start = window["start"]
    totals = window["value"]
    return {
        "timestamp": start,
        "mean_speed": totals["sum_speed"] / totals["count_speed"],
    }


def _print_row(row):
    """Debug sink: print each outgoing record."""
    print(row)


# Tumbling windows of 60000 ms (quixstreams takes the duration in ms),
# aggregated with the custom sum/count reducer. .final() emits each
# window's result only once, when the window closes.
windowed = sdf.tumbling_window(60000).reduce(reduce_mean, init_mean)
sdf = windowed.final()
sdf = sdf.apply(_window_to_mean)
sdf = sdf.update(_print_row)
#sdf = sdf.to_topic(output_topic)
if __name__ == "__main__":
    # Only start consuming when executed as a script, not when imported.
    app.run(sdf)
Discussion on internal Slack:
https://quixworld.slack.com/archives/C04KXLCGQ94/p1709129051812289
daniil-quix commented
Hey @SteveRosam !
Thanks for the example.
Question: why not use `.mean()` instead in your case?
Here is what it could look like:
sdf = sdf[sdf.contains("location-speed")]
sdf = sdf[["Timestamp", "location-speed"]]

# Reduce each message to its bare speed value, so the built-in .mean()
# aggregation can run directly over the 60000 ms tumbling windows.
speeds = sdf.apply(lambda value: value['location-speed'])
sdf = speeds.tumbling_window(60000).mean().final()

sdf = sdf.apply(
    lambda window: {"timestamp": window["start"], "mean_speed": window["value"]}
)
sdf = sdf.update(lambda row: print(row))
daniil-quix commented
A similar example is available in the library docs: https://quix.io/docs/quix-streams/windowing.html#reduce