LITE is an auto-tuning system for various Spark applications on large-scale datasets.
The project is implemented in Python 3.6 and tested in a Linux environment. Our system environment and CUDA version are as follows:
Ubuntu 18.04
Hadoop 2.7.7
Spark 2.4.7
HiBench 7.0
Java 1.8
Scala 2.12.10
Python 3.6
Maven 4.15
CUDA 10.1
To generate the training data for your own cluster environment, first install SparkBench (https://github.com/CODAIT/spark-bench).
Run SparkBench applications using the following command:
python scripts/bo_sample.py <path_to_spark_bench_folders>
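The sampling logic lives in scripts/bo_sample.py. For orientation, the following is a minimal sketch of the kind of driver loop such a script implements; the parameter grid, the conf-file writer, and the bin/spark-bench.sh invocation are illustrative assumptions, not the actual implementation.

```python
import random
import subprocess
import sys

# Minimal sketch of a sampling driver (the real logic is in scripts/bo_sample.py).
# It assumes each run is described by a spark-bench .conf file into which the
# sampled Spark parameters are written; rendering that file is elided here.
SPARK_BENCH_HOME = sys.argv[1]  # <path_to_spark_bench_folders>

PARAM_SPACE = {  # illustrative subset of the tunable knobs
    "spark.executor.cores": [2, 4, 6],
    "spark.executor.memory": ["2g", "4g", "6g"],
    "spark.default.parallelism": [4, 8, 16],
}

def sample_conf():
    """Draw one random value for every parameter."""
    return {k: random.choice(v) for k, v in PARAM_SPACE.items()}

def write_conf_file(params):
    """Placeholder: render `params` into a spark-bench workload .conf file."""
    conf_path = "/tmp/sampled_workload.conf"
    # ... fill in your workload template with `params` here ...
    return conf_path

for i in range(10):  # number of sampled runs
    params = sample_conf()
    conf_path = write_conf_file(params)
    print(f"run {i}: {params}")
    subprocess.run([f"{SPARK_BENCH_HOME}/bin/spark-bench.sh", conf_path], check=True)
```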
The log files will be saved on your Spark history server; you can download them with the following command:
hdfs dfs -get <path_to_spark_history_server_folders_on_hdfs> <local_folders>
Parse the log files generated by spark-bench to obtain the stage status:
python scripts/history_by_stage.py <spark_bench_conf_path> <history_dir> <result_path>
Note that the log file does not contain the data volume features of the workload; these features are added from the configuration file in <spark_bench_conf_path>. The result looks like the following (a sketch of this parsing and merging step appears after the example):
{
  "AppId": "application_1616731661908_1527",
  "AppName": "SVM Classifier Example",
  "Duration": 23656,
  "SparkParameters": ["spark.default.parallelism=6", "spark.driver.cores=6"......],
  "StageInfo": {
    "0": {
      "duration": 3234,
      "input": 134283264,
      "output": 0,
      "read": 0,
      "write": 0
    }......
  },
  "WorkloadConf": ["NUM_OF_EXAMPLES=1600000", "NUM_OF_FEATURES=100"]
}
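scripts/history_by_stage.py already performs this parsing and merging; the snippet below is only a rough sketch of the idea, assuming the standard JSON-lines Spark event-log format and a simple KEY=VALUE workload configuration file (the function names here are illustrative, not those of the real script).

```python
import json
import sys

def stage_durations(event_log_path):
    """Extract per-stage durations (ms) from a Spark event log (JSON lines).

    The real script also aggregates input/output and shuffle read/write bytes
    from the task-level events; that part is omitted to keep the sketch short.
    """
    stages = {}
    with open(event_log_path) as f:
        for line in f:
            event = json.loads(line)
            if event.get("Event") == "SparkListenerStageCompleted":
                info = event["Stage Info"]
                stages[str(info["Stage ID"])] = {
                    "duration": info["Completion Time"] - info["Submission Time"],
                }
    return stages

def load_workload_conf(conf_path):
    """Read KEY=VALUE lines (e.g. NUM_OF_EXAMPLES=1600000) from the workload conf."""
    with open(conf_path) as f:
        return [l.strip() for l in f
                if "=" in l and not l.lstrip().startswith("#")]

if __name__ == "__main__":
    record = {
        "StageInfo": stage_durations(sys.argv[1]),        # one file under <history_dir>
        "WorkloadConf": load_workload_conf(sys.argv[2]),  # <spark_bench_conf_path>
    }
    print(json.dumps(record, indent=2))
```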
Save the results as a training data file:
python scripts/build_dataset.py <result_path> <dataset_path>
The dataset files are written as comma-separated value (CSV) files with a single header row containing the following columns:
AppId AppName Duration spark.default.parallelism spark.driver.cores spark.driver.memory spark.driver.maxResultSize spark.executor.instances spark.executor.cores spark.executor.memory spark.executor.memoryOverhead spark.files.maxPartitionBytes spark.memory.fraction spark.memory.storageFraction spark.reducer.maxSizeInFlight spark.shuffle.compress spark.shuffle.file.buffer spark.shuffle.spill.compress rows cols itr partitions stage_id duration input output read write code node_num cpu_cores cpu_freq mem_size mem_speed net_width
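Once generated, the dataset can be inspected like any other CSV; for example, assuming pandas is installed and <dataset_path> points at the file produced by build_dataset.py:

```python
import pandas as pd

# Load the generated training data and take a quick look at the per-stage rows.
df = pd.read_csv("<dataset_path>")  # file produced by build_dataset.py
print(df.shape)
print(df[["AppName", "stage_id", "duration", "input", "output"]].head())

# Example: average stage duration per application.
print(df.groupby("AppName")["duration"].mean())
```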
Obtain the stage code characteristics: enter the instrumentation folder and use Maven to build it into a jar package named preMain-1.0.jar:
cd instrumentation
mvn clean package
Then add the package to your spark-submit shell command as follows:
spark-submit --class <workload_class> --master yarn --conf "spark.executor.cores=4" --conf "spark.executor.memory=5g" --conf "spark.driver.extraJavaOptions=-javaagent:<path_to_your_instrumentation_jar>/preMain-1.0.jar" <path_to_spark_bench>/<workload>/target/spark-example-1.0-SNAPSHOT.jar
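If you submit many workloads this way, a small wrapper can inject the agent option for you; the sketch below simply rebuilds the command shown above (the jar paths and class name are placeholders):

```python
import subprocess

# Placeholder paths: point these at your own jar locations.
AGENT_JAR = "<path_to_your_instrumentation_jar>/preMain-1.0.jar"
WORKLOAD_JAR = "<path_to_spark_bench>/<workload>/target/spark-example-1.0-SNAPSHOT.jar"

def submit_with_agent(workload_class, extra_conf=None):
    """Run spark-submit with the instrumentation agent attached to the driver."""
    cmd = [
        "spark-submit",
        "--class", workload_class,
        "--master", "yarn",
        "--conf", f"spark.driver.extraJavaOptions=-javaagent:{AGENT_JAR}",
    ]
    for kv in (extra_conf or []):
        cmd += ["--conf", kv]
    cmd.append(WORKLOAD_JAR)
    subprocess.run(cmd, check=True)

# Example: the executor settings from the command above.
submit_with_agent("<workload_class>",
                  ["spark.executor.cores=4", "spark.executor.memory=5g"])
```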
The stage code is written to /inst_log by default; you can change this location yourself. The files produced by instrumentation should be parsed with prediction_ml/spark_tuning/by_stage/instrumentation/all_code_by_stage/get_all_code.py:
python get_all_code.py <folder_of_instrumentation> <code_result_folder>
Our model is saved in prediction_nn. Use data_process_text.py, dag2data.py, and dataset_process.py to extract the dictionary information, process the edge and node information of the graph, and integrate all features:
python data_process_text.py <code_result_folder>
python dag2data.py <log_folder>
python dataset_process.py
The dictionary information and graph information are saved in dag_data and integrated by dataset_process.py; the final dataset is placed in the dataset folder.
Then use fast_train.py to train the model:
python fast_train.py
You can change the model configuration through config.py; the trained model will be saved in the model_save folder.
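The exact fields live in config.py in the repository; the excerpt below is only a hypothetical illustration of the kind of hyper-parameters you might adjust there (the names are placeholders, not the real attribute names):

```python
# Hypothetical config.py-style settings; check prediction_nn/config.py for the
# real attribute names before editing.
batch_size = 64                 # training batch size
learning_rate = 1e-3            # optimiser step size
epochs = 50                     # passes over the dataset folder
model_save_dir = "model_save"   # where fast_train.py writes checkpoints
```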
You can also use trans_learn.py to fine-tune the model:
python trans_learn.py
nn_pred_1.py can test the model; we use predict_first_cold.py to predict the best combination of parameters and evaluate the performance:
python nn_pred_8.py
python predict_first_cold.py
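Conceptually, the prediction step scores candidate parameter combinations with the trained model and keeps the one with the lowest predicted runtime; predict_first_cold.py implements the actual search. A simplified sketch of that idea, with a placeholder scoring function and a reduced search space:

```python
import itertools

# Candidate values for a few knobs; the real search space is much larger.
SEARCH_SPACE = {
    "spark.executor.cores": [2, 4, 6],
    "spark.executor.memory": ["2g", "4g", "6g"],
    "spark.default.parallelism": [4, 8, 16],
}

def predict_duration(model, workload_features, params):
    """Placeholder: encode (workload, params) and return the model's predicted runtime."""
    raise NotImplementedError

def best_configuration(model, workload_features):
    """Exhaustively score every candidate and return the fastest predicted one."""
    best_params, best_pred = None, float("inf")
    keys = list(SEARCH_SPACE)
    for values in itertools.product(*(SEARCH_SPACE[k] for k in keys)):
        params = dict(zip(keys, values))
        pred = predict_duration(model, workload_features, params)
        if pred < best_pred:
            best_params, best_pred = params, pred
    return best_params, best_pred
```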