/CIRCA

Causal Inference-based Root Cause Analysis

Primary LanguagePythonBSD 3-Clause "New" or "Revised" LicenseBSD-3-Clause

CIRCA

Code style: black Actions PyPI version Downloads

This project contains the code of baselines and simulation data generation for the KDD '22 paper, Causal Inference-Based Root Cause Analysis for Online Service Systems with Intervention Recognition. Experiment results can be found in figshare, where the code is corresponding to the commit 1522ddd7efd16db55e9f351fd70324501ce9134e.

Usage

This repository contains a Dockerfile to describe the necessary steps to setup the environment. To install this project as a package with pip, R package pcalg has to be installed manually.

Simulation Data Generation

python -m circa.experiment generate

Simulation Study

# Explore parameter combinations
python -m circa.experiment --max-workers 16 --model-params params-sim-tune.json tune
# Explore all the datasets with pre-defined parameters
python -m circa.experiment --model-params params-sim-run.json run
# Robustness evaluation
python -m circa.experiment robustness

Execute Rscript img/draw.sim.R to produce summaries under img/output.

  • params-sim-run.json is created according to img/output/best-sim-tuning.tex
  • To create parameter template, execute the following command
python -m circa.experiment params > default.json

Toolbox

CIRCA is designed as a toolbox with a set of interfaces.

Basic

Each root cause analysis algorithm is separated into two steps, namely graph construction and scoring.

The graph construction step should implement circa.graph.GraphFactory. GraphFactory.create takes data for analysis (an instance of circa.model.case.CaseData) and timestamp (float) when the algorithm is triggered. The output is a graph (an instance of circa.model.graph.Graph) for the fault under analysis.

The scoring step contains a sequence of scorers (instances of circa.alg.base.Scorer). Scorer.score of each scorer needs the following information:

  • The graph produced in the graph construction step,
  • data for analysis (an instance of circa.model.case.CaseData),
  • timestamp (float) when the algorithm is triggered, and
  • (optional) output of the previous scorer.

Scorer.score will generate a mapping from a node in the input graph to its score (circa.alg.base.Score). The design of the scorer sequence enables reusing scorers, i.e., two algorithms can share one scorer as a common step. Note that a scorer may drop some nodes in the input graph, performing as a filter.

circa.alg.common provides some common utilizations. For example, circa.alg.common.Model combines a graph factory and a sequence of scorers as a whole with optional names. Model.analyze will forward data and timestamp for them and produce an ordered sequence of scores. circa.alg.common.evaluate will further evaluate a model with a set of cases (instances of circa.model.case.Case, each of which combines data and the corresponding answers).

"""
An example showing the basic usage of CIRCA
"""
from collections import defaultdict
from typing import Dict
from typing import Sequence
from typing import Tuple

import networkx as nx
from sklearn.linear_model import LinearRegression

from circa.alg.ci import RHTScorer
from circa.alg.ci.anm import ANMRegressor
from circa.alg.common import Model
from circa.graph.common import StaticGraphFactory
from circa.model.case import CaseData
from circa.model.data_loader import MemoryDataLoader
from circa.model.graph import MemoryGraph
from circa.model.graph import Node


latency = Node("DB", "Latency")
traffic = Node("DB", "Traffic")
saturation = Node("DB", "Saturation")
# circa.model.graph.MemoryGraph is derived from circa.model.graph.Graph
graph = MemoryGraph(
    nx.DiGraph(
        {
            traffic: [latency, saturation],
            saturation: [latency],
        }
    )
)

# 1. Assemble an algorithm
# circa.graph.common.StaticGraphFactory is derived from circa.graph.GraphFactory
graph_factory = StaticGraphFactory(graph)
scorers = [
    # circa.alg.ci.RHTScorer is derived from circa.alg.common.DecomposableScorer,
    # which is further derived from circa.alg.base.Scorer
    RHTScorer(regressor=ANMRegressor(regressor=LinearRegression())),
]
model = Model(graph_factory=graph_factory, scorers=scorers)

# 2. Prepare data
mock_data = {
    latency: (10, 12, 11, 9, 100, 90),
    traffic: (100, 110, 90, 105, 200, 150),
    saturation: (5, 4, 5, 6, 90, 85),
}
mock_data_with_time: Dict[str, Dict[str, Sequence[Tuple[float, float]]]] = defaultdict(
    dict
)
for node, values in mock_data.items():
    mock_data_with_time[node.entity][node.metric] = [
        (index * 60, value) for index, value in enumerate(values)
    ]
data = CaseData(
    # circa.model.data_loader.MemoryDataLoader is derived from
    # circa.model.data_loader.DataLoader, which manages data with configurations
    data_loader=MemoryDataLoader(mock_data_with_time),
    sli=latency,
    detect_time=240,
    lookup_window=4,
    detect_window=2,
)

# 3. Conduct root cause analysis one minute after a fault is detected
print(model.analyze(data=data, current=data.detect_time + 60))

Advanced

circa.experiment supports comparison among models and parameter exploration, as mentioned for the simulation study. To conduct experiments with your own dataset, start from the following code named example.py. Execute python -m example -s run-new --output-dir output/test --report-dir report/test and find the report in report/test/report.csv. Find more command line parameters with python -m example -h.

"""
An example showing the advanced usage of CIRCA
"""
import argparse
import logging
import os
from typing import List

from circa.experiment import comparison
from circa.experiment.comparison.models import get_models
from circa.experiment.__main__ import get_parser
from circa.graph.structural import StructuralGraph
from circa.model.case import Case
from circa.utils import silence_third_party


BASE_DIR = os.path.abspath(os.path.dirname(__file__))


def run(args: argparse.Namespace):
    """
    Evaluate multiple models
    """
    data_dir: str = args.data_dir
    report_dir: str = args.report_dir
    os.makedirs(report_dir, exist_ok=True)

    logger = logging.getLogger(__package__)

    logger.info("Loading from %s", data_dir)
    # TODO: Prepare your data with answers here
    cases: List[Case] = []

    models, graph_factories = get_models(
        # TODO: Configure your own structural graph here
        # structural_graph_params=dict(
        #     structural_graph=StructuralGraph(filename="tests/alg/sgraph/index.yml"),
        # ),
        params=args.model_params,
        seed=args.seed,
        cuda=args.cuda,
        max_workers=1,
    )

    logger.info("Start running on %s with #models=%d", data_dir, len(models))
    comparison.run(
        models=models,
        cases=cases,
        graph_factories=graph_factories,
        output_dir=args.output_dir,
        report_filename=os.path.join(report_dir, "report.csv"),
        max_workers=1 if args.cuda else args.max_workers,
    )


def wrap_parsers(subparsers: argparse._SubParsersAction):
    """
    Add argparser for your own experiments
    """
    parser_params = dict(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser = argparse.ArgumentParser(add_help=False, **parser_params)
    parser.add_argument(
        "--data-dir",
        type=str,
        default=os.path.join(BASE_DIR, "data"),
        help="Data directory",
    )

    parser_run: argparse.ArgumentParser = subparsers.add_parser(
        "run-new",
        parents=[parser],
        help="Explore all combinations of model parameters",
        **parser_params,
    )
    parser_run.add_argument(
        "--output-dir", type=str, default="output", help="Output directory"
    )
    parser_run.add_argument(
        "--report-dir", type=str, default="report", help="Report directory"
    )
    parser_run.set_defaults(func=run)


def _main():
    parser, subparsers = get_parser()
    wrap_parsers(subparsers)
    parameters = parser.parse_args()

    if parameters.S:
        logging.basicConfig(level=logging.ERROR)
    elif parameters.V:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    if not parameters.V:
        silence_third_party()

    if "func" in parameters:
        parameters.func(parameters)
    else:
        parser.print_usage()


if __name__ == "__main__":
    _main()