✨ push-based execution
Closed this issue · 5 comments
as a stepping stone for parallel execution, refactor operators to support push-based execution.
have been playing with writing a push-based version of the execution engine
from opteryx.third_party.travers import Graph
import time
import opteryx
from orso.tools import random_string
from orso import DataFrame
from typing import Optional
from opteryx.compiled.structures.hash_table import hash_join_map
from opteryx.utils.arrow import align_tables
EOS = object()  # end-of-stream sentinel; always compare with `is`, never `==`


class Node:
    """Base operator in the push-based execution graph.

    Wraps `execute` with sensor accounting: call count, cumulative wall
    time (nanoseconds), and morsels in/out. Subclasses override `execute`.
    """

    def __init__(self, node_type: str):
        self.node_type = node_type
        self.execution_time = 0  # cumulative nanoseconds spent inside execute()
        self.calls = 0
        self.records_in = 0
        self.records_out = 0
        self.identity = random_string()

    def __call__(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        # Sentinel checks must use identity (`is`): `==`/`!=` on a DataFrame
        # dispatches to its own equality semantics rather than comparing
        # against the EOS singleton.
        if morsel is not None and morsel is not EOS:
            self.records_in += 1
        start = time.time_ns()
        result = self.execute(morsel)
        self.execution_time += time.time_ns() - start
        self.calls += 1
        if result is not None and result is not EOS:
            self.records_out += 1
        return result

    def execute(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        # Default (demo) behavior: pass EOS through unchanged, otherwise
        # tag the morsel with this node's type.
        if morsel is EOS:
            return morsel
        return morsel + self.node_type

    def __str__(self):
        return f"{self.node_type} ({self.sensors()})"

    def sensors(self):
        return {
            "calls": self.calls,
            "execution_time": self.execution_time,
            "records_in": self.records_in,
            "records_out": self.records_out,
        }
class PumpNode(Node):
    """Source operator: yields the morsel(s) for one relation, then EOS.

    Unlike the other operators, calling a pump returns a generator rather
    than a single morsel.
    """

    def __init__(self, node_type: str, data):
        Node.__init__(self, node_type)
        self.data = data  # name of the relation this pump reads (e.g. "$planets")

    def __call__(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        self.calls += 1
        # Time the actual query. The original sampled the clock twice before
        # doing any work (and left a dead `start =` after the yield), so
        # execution_time was always ~0.
        start = time.time_ns()
        result = opteryx.query(f"SELECT * FROM {self.data}")
        self.execution_time += time.time_ns() - start
        self.records_out += 1
        yield result
        yield EOS
class GreedyNode(Node):
    """Blocking operator: buffers every incoming morsel, emits only on EOS.

    Demo stand-in for an aggregation — on EOS it groups the collected rows
    by `id` and takes the max of `name`.
    """

    def __init__(self, node_type: str):
        Node.__init__(self, node_type)
        self.collector: DataFrame = None  # accumulated rows; None until first morsel

    def execute(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        if morsel is EOS:  # identity check — `==` on a DataFrame is not a sentinel test
            if self.collector is None:
                # No rows ever arrived; nothing to aggregate. Avoids the
                # AttributeError the original raised on an empty stream.
                return None
            return self.collector.group_by(["id"]).max(["name"])
        if self.collector is None:
            self.collector = morsel
        else:
            self.collector.append([row for row in morsel])
        return None  # nothing to emit until EOS is received
class FilterNode(Node):
    """Stateless operator: keeps rows whose first column value is even."""

    def __init__(self, node_type: str):
        Node.__init__(self, node_type)

    def execute(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        # Identity check against the sentinel — `==` would invoke the
        # DataFrame's own equality semantics.
        if morsel is EOS:
            return EOS  # propagate end-of-stream downstream
        return morsel.query(lambda row: row[0] % 2 == 0)
class JoinNode(Node):
    """Hash-join operator: buffers the left relation, then streams the right.

    Assumes the left stream is fully drained (terminated by EOS) before any
    right-stream morsels arrive — the engine must schedule pumps accordingly.

    Known limitation (tracked in this issue): only the LAST left morsel is
    kept; earlier left morsels are overwritten, not accumulated.
    """

    def __init__(self, node_type: str):
        Node.__init__(self, node_type)
        self.left_buffer = None   # latest left morsel, then its hash map after left EOS
        self.left_relation = None # arrow table of the left relation, kept for alignment
        self.stream = 'left'      # which input we are currently consuming

    def execute(self, morsel: Optional[DataFrame]) -> Optional[DataFrame]:
        if self.stream == 'left':
            if morsel is EOS:
                # Left side exhausted: keep the arrow table (needed to build
                # the joined output) and index it for probing.
                self.left_relation = self.left_buffer.arrow()
                self.left_buffer = hash_join_map(self.left_relation, ["id"])
                self.stream = 'right'
            else:
                self.left_buffer = morsel  # NOTE(review): overwrites, does not accumulate
            return None
        if morsel is EOS:
            return EOS
        # Probe: hash the right morsel and collect matching row indexes.
        l_indexes = []
        r_indexes = []
        right_hash_map = hash_join_map(morsel.arrow(), ["id"])
        for key, right_rows in right_hash_map.hash_table.items():
            left_rows = self.left_buffer.get(key)
            if left_rows:
                l_indexes.extend(left_rows)
                r_indexes.extend(right_rows)
        # Bug fix: the original passed morsel.arrow() as BOTH tables, joining
        # the right relation against itself. Align left vs right instead.
        # (assumes align_tables(left_table, right_table, left_idx, right_idx)
        #  — TODO confirm argument order against opteryx.utils.arrow)
        return DataFrame.from_arrow(
            align_tables(self.left_relation, morsel.arrow(), l_indexes, r_indexes)
        )
import string  # NOTE(review): appears unused in this script — confirm before removing

# Build the demo execution plan as a DAG:
#   pump A -> filter A \
#                       join -> greedy group-by (sink)
#   pump B -> filter B /
et = Graph()
# Nodes definitions
et.add_node("data_source_a", PumpNode("DataSourcePumpA", "$planets"))
et.add_node("data_source_b", PumpNode("DataSourcePumpB", "$satellites"))
et.add_node("transform_fruits_a", FilterNode("TransformFruitsA"))
et.add_node("transform_fruits_b", FilterNode("TransformFruitsB"))
et.add_node("join_fruits", JoinNode("JoinFruits"))
et.add_node("batch_group", GreedyNode("BatchGroup"))
et.add_node("enrich_nutrients", FilterNode("EnrichNutrients"))  # NOTE(review): never wired into the plan below
# Edges between nodes to create the execution plan
et.add_edge("data_source_a", "transform_fruits_a")
et.add_edge("data_source_b", "transform_fruits_b")
et.add_edge("transform_fruits_a", "join_fruits")
et.add_edge("transform_fruits_b", "join_fruits")
et.add_edge("join_fruits", "batch_group")
class SerialExecutionEngine:
    """Single-threaded driver for a push-based plan.

    Drains each pump node in turn, pushing every morsel depth-first through
    that pump's downstream operators and yielding whatever reaches a node
    with no children (a sink).

    NOTE(review): the traversal does not record which edge (left/right)
    feeds a join — the issue thread identifies this as a known gap; joins
    only work here because pumps happen to be drained sequentially.
    """

    def __init__(self, plan: Graph):
        self.plan = plan

    def execute(self):
        # Pumps are the entry points of the DAG; each call to a pump
        # returns a generator of morsels terminated by EOS.
        pump_nodes = self.plan.get_entry_points()
        for pump_node in pump_nodes:
            pump_instance = self.plan[pump_node]
            for morsel in pump_instance(None):
                yield from self.process_node(pump_node, morsel)

    def process_node(self, nid, morsel):
        """Push one morsel into node `nid` and recurse into its children."""
        node = self.plan[nid]
        if isinstance(node, PumpNode):
            # The pump already produced this morsel — just forward it.
            children = [t for s, t, r in self.plan.outgoing_edges(nid)]
            for child in children:
                yield from self.process_node(child, morsel)
        else:
            result = node(morsel)
            # None means the operator absorbed the morsel (e.g. a blocking
            # node still buffering); nothing to push downstream.
            if result is not None:
                children = [t for s, t, r in self.plan.outgoing_edges(nid)]
                for child in children:
                    yield from self.process_node(child, result)
                if len(children) == 0:
                    # Leaf node: surface the result to the caller.
                    yield result
# Drive the plan to completion and show the final result plus the
# annotated plan (per-node sensor readings).
se = SerialExecutionEngine(et)
for _ in se.execute():
    pass
# NOTE(review): relies on the loop variable leaking out of the for-loop, so
# only the LAST yielded result is printed; raises NameError if execute()
# yields nothing — confirm this is intentional.
print(_)
print(et.draw())
About 80% of tests are passing in the initial set of changes; not all operators have been rewritten, and some must be rewritten before release — particularly the CROSS JOIN,
which will be very memory hungry in its initial rewritten form.
This isn't intended to be an improvement initially, this is an enabling change for concurrent processing.
Most of the operators have had an initial rewrite; two things have been observed:
The approach to joins is broken and needs rewriting.
The approach to traversing the plan needs updating, it doesn't keep track of left or right relations into joins.
It was suspected that joins would be the problem function.
20 failing tests; still needs a lot of work for operators like CROSS JOIN that can create datasets larger than memory
5 failing tests
FAILURES
SELECT * FROM $planets CROSS JOIN UNNEST(('Earth', 'Moon')) AS n
Query failed with error <class 'opteryx.exceptions.InvalidInternalStateError'> but error None was expected
SELECT name, mission FROM $astronauts INNER JOIN UNNEST(missions) AS mission ON mission = name
Query failed with error <class 'opteryx.exceptions.InconsistentSchemaError'> but error None was expected
SELECT number FROM $astronauts CROSS JOIN UNNEST((1, 2, 3, 4, 5)) AS number
Query failed with error <class 'KeyError'> but error None was expected
SELECT * FROM $astronauts INNER JOIN UNNEST(alma_mater) AS n ON name = n
Query failed with error <class 'opteryx.exceptions.InconsistentSchemaError'> but error None was expected
SELECT * FROM $astronauts INNER JOIN UNNEST(alma_mater) AS n ON name = n WHERE GROUP = 10
Query failed with error <class 'opteryx.exceptions.InconsistentSchemaError'> but error None was expected