lintool/warcbase

K-Means Clustering


As discussed, we're interested in incorporating K-Means clustering into warcbase. Can we take a collection (part of GeoCities, for example, or a smaller Archive-It collection) and separate it into k clusters?

@yb1 has offered to tackle this.

Pull request (#230)

Usage example:

import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_SAMPLE", sc)

APIs:

ExtractClusters (records: RDD[ArchiveRecord], sc: SparkContext, k: Int = 20, numIterations: Int = 20, minDocThreshold: Int=5)

This performs stemming, stop-word removal, and TF-IDF weighting, then builds k-means clusters.
k is the number of clusters you want, and minDocThreshold ignores words that appear in only a few documents.
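
Based on the signature above, the defaults can be overridden when building the clusters; for example (the parameter values here are only illustrative):

val clusters = ExtractClusters(recs, sc, k = 30, numIterations = 40, minDocThreshold = 10)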

getSampleDocs(sc: SparkContext, numDocs: Int=10)

This function returns an RDD of the web page contents that are closest to the centroids of the clusters.
numDocs determines how many documents to return for each cluster.
Use this function when you want to apply further operations to the RDD (e.g. extracting data from the content).
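
For instance, a minimal sketch (assuming recs and sc are as in the usage example above, and that the object returned by ExtractClusters exposes getSampleDocs as described):

val samples = ExtractClusters(recs, sc).getSampleDocs(sc, numDocs = 5)
samples.take(5).foreach(println)   // peek at a few of the sampled page contents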

saveSampleDocs(output: String, sc: SparkContext, numDocs: Int=10)

This saves the result of calling getSampleDocs (after repartitioning).
Each output file contains the sample docs for the corresponding cluster (part-00000 for cluster 1, and so on).
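
For example, to keep more documents per cluster (the output directory name and numDocs value are illustrative):

ExtractClusters(recs, sc).saveSampleDocs("GEO_ENCHANTED_FOREST_SAMPLE", sc, numDocs = 20)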

computeLDA(output: String, sc: SparkContext, numTopics: Int = 3, numWordsPerTopic: Int = 10)
Saves the LDA results to the folder output.
numWordsPerTopic defines the number of most important words for each topic.
Each output file contains the LDA result for the corresponding cluster (part-00000 for cluster 1, and so on).
Output format: (topic, array of words, weight),
e.g. (0,WrappedArray(green),0.05061560896291224)
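
Once the job finishes, the saved part files can be read back in the same spark-shell session, e.g. (a sketch; the output directory name comes from the usage example above):

val lda = sc.textFile("GEO_ENCHANTED_FOREST_LDA/part-*")
lda.take(5).foreach(println)   // lines like (0,WrappedArray(green),0.05061560896291224)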

topNWords(output: String, sc: SparkContext, limit: Int = 10)
Saves the most frequent words in each cluster.
limit defines the number of words to show for each cluster.
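
For instance, to save the 25 most frequent words per cluster instead of the default 10 (the limit value is illustrative):

ExtractClusters(recs, sc).topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc, limit = 25)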

Great! Will test this.

I ran

import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_LDA", sc)

Using this command on camalon:

spark-shell --jars ~/git/warcbase/target/warcbase-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 20G --driver-memory 10G

and really haven't had any luck. Tons and tons of errors.

Did you get lots of verbose errors when running it, @yb1? Is this a Spark 1.5.1 vs. 1.6 error? Just double-checking.

I will keep it running a bit longer, but there are lots of failures.

The errors were deserialization errors. It's odd that they occurred, because I had seen those errors before and fixed them.

I've tried to run it again a couple of times, and it worked. (Except for LDA, which is still running)

Please try again with 1.6.1.
Also, to avoid memory errors, please add "--conf spark.yarn.executor.memoryOverhead=4096" when you start spark-shell.
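
Combined with the invocation above, the launch command would then look something like this (the flags are simply those quoted earlier in this thread):

spark-shell --jars ~/git/warcbase/target/warcbase-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 20G --driver-memory 10G --conf spark.yarn.executor.memoryOverhead=4096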

Note:

  1. (Addition) Testing for a good k.
    If you'd like to find a suitable k for k-means, I added an option to test against different k values. It will print the cost (the sum of squared distances of points to their nearest center) for each k. Then k can be picked using the elbow method or another approach. (For k-means, the cost always decreases as k increases, until k exceeds the size of the dataset. But there is a point where the decrease in cost slows down quite rapidly; the elbow method picks that point for k. For more details, see https://en.wikipedia.org/wiki/Determining_the_number_of_clusters_in_a_data_set.)
    val clusters = ExtractClusters(recs, sc, testK = true, maxK = 50, stepK = 10, minK = 5)
    It will then compute the cost for k from minK to maxK, increasing k by stepK each time (see the elbow-method sketch after the final example below).
  2. computeLDA takes significantly more time than the other operations. For the EnchantedForest dataset with 80 executors, computeLDA took more than 24 hours, whereas creating the k clusters, extracting the most frequent words, and extracting the samples closest to the centroids each took less than 2 hours.

import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_SAMPLE", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)
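
As a rough illustration of the elbow method mentioned in note 1 above (this sketch is not part of the warcbase API, and the (k, cost) values are made-up placeholders standing in for whatever a testK run actually prints):

// Illustrative only: hypothetical (k, cost) pairs as a testK run might report them.
val costs = Seq(5 -> 9200.0, 15 -> 4100.0, 25 -> 3300.0, 35 -> 3050.0, 45 -> 2950.0)
// The elbow is roughly where these per-step cost drops stop shrinking dramatically.
costs.sliding(2).foreach { case Seq((_, prevCost), (k, cost)) =>
  println(s"k=$k reduced cost by ${prevCost - cost}")
}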