Groupe : PAFFME
## Récupération des tweets
On a fait un utilitaire en Node.js qui utilise l'API Streaming de Twitter :
const Twit = require('twit')
const T = new Twit({
consumer_key: '',
consumer_secret: '',
access_token: '',
access_token_secret: '',
timeout_ms: 60*1000,
strictSSL: true
})
const fs = require('fs');
const stream = T.stream('statuses/sample', { language: 'en' })
let i = 0;
stream.on('tweet', function (tweet) {
console.log(++i);
fs.appendFileSync('tweets.txt', JSON.stringify({created_at: tweet.created_at, text: tweet.text}) + '\n');
})
stream.on('error', function(err) {
console.error(err);
});
Cela créé un fichier JSON qui est mis à jour pour chaque nouveau tweet reçu. On a galéré à mettre ce JSON dans Hive, du coup on a fait un autre utilitaire pour mettre ça dans un CSV :
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const csvWriter = createCsvWriter({
path: 'out.csv',
header: [
{id: 'text', title: 'text'},
{id: 'tdate', title: 'tdate'},
{id: 'hashtags', title: 'hashtags'},
]
});
const fs = require('fs');
const readline = require('readline');
const lineReader = readline.createInterface({
input: fs.createReadStream('tweets.txt'),
});
const data = [];
lineReader.on('line', function(line) {
const parsedLine = JSON.parse(line);
data.push({
tdate: Math.round(new Date(parsedLine.created_at).getTime() / 1000),
text: parsedLine.text.replace(/(\r\n|\n|\r)/gm, ""),
hashtags: parsedLine.text.match(/#\w+/gm)
});
});
lineReader.on('close', function() {
csvWriter.writeRecords(data).then(() => console.log('done'));
});
Dans ce script pour transformer le JSON en CSV on en profite pour extraire les hashtags et retirer les \n du texte du tweet (sinon il y a beaucoup de ligne à NULL dans Hive).
On a dû récuperer 180k tweets en 5/6h qui sont aujourd'hui dans Hive.
Se connecter au SSH :
ssh paffme@edge3.sagean.fr
Copie du .csv en local vers le serveur distant :
scp ./final_out.csv paffme@edge3.sagean.fr:~/
hdfs dfs -mkdir data && hdfs dfs -mkdir data/tweet_v2
hdfs dfs -put final_out.csv /user/paffme/data/tweet_v2
En-tête :
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
.builder \
.appName("Twitter-App") \
.enableHiveSupport() \
.getOrCreate()
Creation de la table pour les tweets
text STRING
tdate STRING
hashtags STRING
spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS paffme.tweet_v7(text STRING, tdate STRING, hashtags STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE Location '/user/paffme/data/tweet_v2'")
Creation de la table pour les statistiques
text STRING
tdate STRING
hashtags STRING
spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS paffme.tweet_v9(hashtag STRING, occurences INT, from BIGINT, to BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
Si ca fonctionne pas, passer par l'interface HUE
Cf le projet dans le dossier spark
On prend en paramètre du programme l'intervalle à utiliser (from/to)
Ensuite une recherche est faite dans Hive pour filtrer les tweets
Il y a ensuite un comptage et classement des hashtags
On insère dans la table statistics de la base les données qu'on vient de calculer
On affiche le top 10 sur l'intervalle demandé
spark-submit --deploy-mode client --class fr.xebia.xke.SparkMetricsExample spark_metrics-1.0-SNAPSHOT-shaded.jar
pour lancer le jar, on peut aussi le faire en mode cluster pour faire ça bien
- Récupérer les tweet en anglais et les stocker sur HDFS Node.js
- Préparer les tweet pour un traitement optimal par la suite Node.js
- implémenter l’algorithme de kmeans (interdit d’utiliser celui fourni dans spark)
- Chaque Heure -> on arrive pas à vérifier via les logs que c'est ok, la config est faite
- Calculer le Top 10 des hashtag les plus utilisé dans le code Spark
- Pour chaque hashtag calculer son évolution par rapport à l’heure précédente On a les infos dans la table statistics, on a pas eu le temps de créer le code pour calculer l'évolution
- Appliquer le kmeans sur la latitude et longitude des tweet et/ou appliquer le kmeans sur le nombre de mots et le nombre de hashtag dans les tweet
Merci pour le cours, c'était intéressant!
C'est dur d'arriver à comprendre comment s'inter-connecte toutes les briques donc on a mis du temps à démarrer
Pas de chance c'était la semaine des soutenances de mémoire donc le groupe n'était pas forcément focus à 100%