commoncrawl/cc-pyspark

How To: process CC-NEWS WARC files, most recent first

lgov opened this issue · 3 comments

lgov commented

This is more a 'how would I do...' question than an issue report.

I am processing the Common Crawl news WARC files with multiple Python processes on a Kubernetes cluster. I am now in the process of switching to cc-pyspark to improve the parallelism.

Now, my main issue is that the cluster should process the most recent WARC file first (new files are released roughly every hour?) and only process older files when there is no new file pending.
Both my own setup and - as far as I know - cc-pyspark only work on a fixed set of WARC files. For instance, I can store a list of all 14183 news WARC files in a DataFrame to be dispatched to all executors, but I cannot dynamically add new WARC files to the front of that DataFrame.

One idea was to use Spark Structured Streaming with two queues, where the list of existing files is added to one queueStream and new incoming files are appended to a second queueStream. That second one then needs to be consumed first, i.e. it has higher priority.
I'm not sure whether that's possible with standard PySpark, or only with Spark/Scala.
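
For illustration, a minimal sketch of the two-queue idea, assuming the legacy DStream API (pyspark.streaming), which is where queueStream lives rather than in Structured Streaming. The file lists and app name are placeholders, and the comments note a PySpark-specific caveat:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="cc-news-two-queues")
ssc = StreamingContext(sc, batchDuration=60)

backlog_paths = ["path/to/old-1.warc.gz", "path/to/old-2.warc.gz"]  # older WARCs
new_paths = []  # freshly released WARC paths would be appended here

backlog_stream = ssc.queueStream([sc.parallelize(backlog_paths, 1)])
new_stream = ssc.queueStream([sc.parallelize(new_paths, 1)])

# Caveat: in PySpark (unlike Scala) RDDs appended to the queue *after*
# queueStream() was called are not picked up, and union() merges the two
# streams per batch without any built-in notion of priority.
merged = new_stream.union(backlog_stream)
merged.pprint()

ssc.start()
ssc.awaitTermination()
```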

Is this something you (or any cc-pyspark user) have encountered? I'm looking for directions, but working code is also fine. :)
Thanks.

lgov commented

Thinking a bit further myself: in my current approach each process handles one WARC file completely.

If I read your code correctly, the records in a WARC file are processed in parallel by the executors (using mapPartitionsWithIndex over the ArchiveIterator). So if I adapt the outer loop in the process_warcs method that iterates over the WARC files to always pick the newest WARC file if available, that should solve my requirement.
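
A hypothetical sketch of that adapted outer loop, for the sake of discussion. cc-pyspark's real process_warcs fetches and opens WARCs differently, and newest_pending_warc() is an assumed helper that returns the path of a freshly released WARC (or None); it is not something the project provides:

```python
from warcio.archiveiterator import ArchiveIterator

def process_warcs(self, _id, iterator):
    backlog = list(iterator)  # WARC paths assigned to this input partition
    while backlog:
        # prefer a freshly released WARC over the backlog, if one is pending;
        # newest_pending_warc() is an assumed helper, not part of cc-pyspark
        path = self.newest_pending_warc() or backlog.pop(0)
        with open(path, 'rb') as stream:  # simplification: WARC is available locally
            for record in ArchiveIterator(stream):
                # process_record is cc-pyspark's per-record hook implemented by the job
                yield from self.process_record(record)
```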

sebastian-nagel commented

Hi @lgov, with Spark streaming you could write the path of a new WARC file into a text file and then move it into a directory which is monitored for the appearance of new files (see textFileStream). But then you'd still need to mix in the paths of older WARC files to be processed with lower priority.
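
A minimal sketch of that directory-monitoring approach; the monitored path, app name, and per-batch handling are placeholders:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="cc-news-watch")
ssc = StreamingContext(sc, batchDuration=60)

# each text file moved into this (placeholder) directory lists newly
# released WARC paths, one per line; Spark picks it up in the next batch
announced = ssc.textFileStream("hdfs:///cc-news/incoming-warc-paths")

def handle_batch(rdd):
    # hand the announced WARC paths over to the actual WARC processing
    for path in rdd.collect():
        print("new WARC announced:", path)

announced.foreachRDD(handle_batch)

ssc.start()
ssc.awaitTermination()
```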

adapt the outer loop in the process_warcs method that iterates over the warc files to always use the newest warc file if available

Yes, but you then need to make sure that the newest WARC is processed only once which isn't trivial with parallel executors.

lgov commented

Thanks for the feedback @sebastian-nagel.

Looks like my problem can be simplified to:

  • one Spark job running continuously to follow the hourly news dumps; for this job the number of input partitions can be set to 1 to ensure each file is processed only once (see the sketch below).
  • a separate Spark job with many input partitions to process the backlog once.
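
A sketch of how the input parallelism could differ between the two jobs. The paths, file names, and partition counts are illustrative only, and the two inputs are shown side by side for brevity although they would live in two separate Spark jobs; in cc-pyspark the partition count would presumably be set via its option for the number of input partitions rather than by hand:

```python
from pyspark import SparkContext

sc = SparkContext(appName="cc-news-jobs")

# Continuous follower job: a single input partition, so the newest hourly
# WARC is handled by exactly one task and therefore processed only once.
follower_input = sc.parallelize(["path/to/newest.warc.gz"], numSlices=1)

# One-off backlog job: spread the ~14k older WARC paths over many partitions.
backlog_paths = sc.textFile("warc-paths-backlog.txt")  # one WARC path per line
backlog_input = backlog_paths.repartition(400)
```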