Spark SQL
Spark SQL est un module Apache Spark qui offre une interface de programmation pour le traitement de données structurées à l'aide du langage SQL (Structured Query Language). Il permet aux utilisateurs d'exécuter des requêtes SQL sur des données stockées dans des formats variés tels que JSON, CSV et d'autres, ainsi que dans des sources de données externes telles que des bases de données relationnelles.
Structure de project
La structure de projet :
C:.
├───.idea
├───spark-warehouse
├───src
│ ├───main
├───classes
│ ├───org
│ │ ├───DataFrameFromCSVandJSON
│ │ ├───DataFrameFromDataBase
│ │ └───entities
│ └───TP_SparkSQL
│ └───Entities
└───generated-sources
└───annotations
Exercice 1
On souhaite développer pour une entreprise industrielle une application Spark qui traite les
incidents de chaque service. Les incidents sont stockés dans un fichier csv.
Voir le fichier en format CSV incidents.csv
dans le project.
1. Afficher le nombre d’incidents par service.
SparkSession sparkSession= SparkSession.builder()
.appName("SPARK SQL EXO1")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = sparkSession.read().option("header", true).csv("incidents.csv");
dataset.groupBy(col("service")).count().show();
2. Afficher les deux années où il a y avait plus d’incidents.
SparkSession sparkSession= SparkSession.builder()
.appName("SPARK SQL EXO1")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = sparkSession.read().option("header", true).csv("incidents.csv");
dataset.groupBy(col("date")).count().limit(2).select(col("date").as("JOUR"),col("count").as("NOMBRE DE INCIDENTS")).show();
Exercice 2
L’hôpital national souhaite traiter ces données au moyen d’une application Spark d’une manière parallèle est distribuée. L’hôpital possède des données stockées dans une base de données relationnel et des fichiers csv. L’objectif est de traiter ces données en utilisant Spark SQL à travers les APIs DataFrame et Dataset pour extraire des informations utiles afin de prendre des décisions.
Creation des trois tables dans la base de donnes relationnelles MYSQL
DB_HOSPITAL
Structure de
TABLE PATIENTS
TABLE MEDECINS
TABLE CONSULTATIONS
Travail a faire
1- Afficher le nombre de consultations par jour.
SparkSession sparkSession= SparkSession.builder()
.appName("SPARK SQL")
.master("local[*]")
.getOrCreate();
Map<String, String> options = Map.of("driver", "com.mysql.jdbc.Driver",
"url", "jdbc:mysql://localhost:3306/DB_HOSPITAL?createDatabaseIfNotExist=true",
"user","root",
"password","");
Dataset<Row> dataframe=sparkSession.read().format("jdbc")
.options(options)
.option("query","select * from consultations")
.load();
dataframe.groupBy(col("date_consultation").as("JOUR")).count().show();
Afficher le nombre de consultation par médecin.
NOM | PRENOM | NOMBRE DE CONSULTATION
SparkSession sparkSession= SparkSession.builder()
.appName("SPARK SQL")
.master("local[*]")
.getOrCreate();
Map<String, String> options = Map.of("driver", "com.mysql.jdbc.Driver",
"url", "jdbc:mysql://localhost:3306/DB_HOSPITAL?createDatabaseIfNotExist=true",
"user","root",
"password","");
Dataset<Row> dfConsultations=sparkSession.read().format("jdbc")
.options(options)
.option("query","select * from consultations")
.load();
Dataset<Row> dfPatients=sparkSession.read().format("jdbc")
.options(options)
.option("query","select * from patients")
.load();
Dataset<Row> dfMedecins=sparkSession.read().format("jdbc")
.options(options)
.option("query","select * from medecins")
.load();
// This tables contient two Columns
// ID_MEDECIN | COUNT
Dataset<Row> table1 = dfConsultations.groupBy(col("id_medecin")).count();
// ID | NOM | PRENOM
Dataset<Row> table2 = dfMedecins.select("ID","NOM","PRENOM");
Dataset<Row> joinTables = table2.join(table1, table1.col("id_medecin").equalTo(table2.col("id")), "inner");
joinTables.select(col("NOM"),col("PRENOM"),col("count").as("NOMBRE DE CONSULTATION")).show();
Afficher pour chaque médecin, le nombre de patients qu’il a assisté.
dataframe.groupBy(col("id_medecin").as("Num de MEDICINE")).count().show();