Note
This document is in progress.
The module loads FISPACT JSON output files and converts them to Polars dataframes with minor data normalization. This allows efficient data extraction and aggregation. Multiple JSON files can be combined by adding simple identifiers that distinguish the individual FISPACT runs. So far we use just a two-dimensional identification, by material and case. The case usually identifies a particular neutron flux.
- export to DuckDB
- export to parquet files
Note
The currently available FISPACT v.5 API uses a rather old Python version (3.6). That prevents direct use of their API in our package (>=3.10). Check whether our own Python integration with FISPACT is reasonable and feasible, or provide our own FISPACT Python binding.
From PyPI
pip install xpypact
As dependency
poetry add xpypact
From source
pip install git+https://github.com/MC-kit/xpypact.git
# Usage example: collect several FISPACT JSON outputs into one dataset,
# then export the result to parquet files or to a DuckDB database.
from concurrent import futures
from pathlib import Path

from xpypact import FullDataCollector, Inventory


def get_material_id(p: Path) -> int: ...


def get_case_id(p: Path) -> int: ...


jsons = [path1, path2, ...]
material_ids = {p: get_material_id(p) for p in jsons}
# Keyed by path, exactly like material_ids, so both can be looked up per file.
case_ids = {p: get_case_id(p) for p in jsons}

collector = FullDataCollector()

if sequential_load:

    for json_path in jsons:
        inventory = Inventory.from_json(json_path)
        collector.append(
            inventory,
            material_id=material_ids[json_path],
            case_id=case_ids[json_path],
        )

else:  # multithreading is allowed for collector as well

    task_list = ...  # list of tuples[directory, case_id, tasks_sequence]
    threads = 16  # whatever

    # NOTE(review): FindPathError, _LEN_INVENTORY and _LEN_CASE are not defined
    # in this example; they are presumably provided by the calling code.
    def _find_path(arg) -> tuple[int, int, Path]:
        """Resolve one task to (material_id, case_id, json_path) or raise FindPathError."""
        _case, path, inventory = arg
        json_path: Path = (Path(path) / inventory).with_suffix(".json")
        if not json_path.exists():
            msg = f"Cannot find file {json_path}"
            raise FindPathError(msg)
        try:
            # The ids are parsed from the inventory name and the parent directory name.
            material_id = int(inventory[_LEN_INVENTORY:])
            case_str = json_path.parent.parts[-1]
            case_id = int(case_str[_LEN_CASE:])
        except (ValueError, IndexError) as x:
            msg = f"Cannot define material_id and case_id from {json_path}"
            raise FindPathError(msg) from x
        if case_id != _case:
            msg = f"Contradicting values of case_id in case path and database: {case_id} != {_case}"
            raise FindPathError(msg)
        return material_id, case_id, json_path

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        mcp_futures = [
            executor.submit(_find_path, arg)
            for arg in (
                (task_case[0], task_case[1], task)
                for task_case in task_list
                for task in task_case[2].split(",")
                if task.startswith("inventory-")
            )
        ]

    mips = [x.result() for x in futures.as_completed(mcp_futures)]
    mips.sort(key=lambda x: x[0:2])  # sort by material_id, case_id

    # NOTE(review): `from_json` is not imported in this example; confirm which
    # module exports it before copying this snippet.
    def _load_json(arg) -> None:
        """Parse one JSON file and append it to the shared collector."""
        collector, material_id, case_id, json_path = arg
        collector.append(from_json(json_path.read_text(encoding="utf8")), material_id, case_id)

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        # Consume the iterator: Executor.map only re-raises a worker's exception
        # when its result is retrieved, so an unconsumed map hides load failures.
        list(executor.map(_load_json, ((collector, *mip) for mip in mips)))

collected = collector.get_result()

# save to parquet files

collected.save_to_parquets(Path.cwd() / "parquets")

# or use DuckDB database

from xpypact.dao import save
import duckdb as db

con = db.connect()
save(con, collected)

gamma_from_db = con.sql(
    """
    select
        g, rate
    from timestep_gamma
    where material_id = 1 and case_id = 54 and time_step_number = 7
    order by g
    """,
).fetchall()
Just follow ordinary practice:
Note
Add references to FISPACT, pypact, and the tools used (Poetry, etc.).