helgeho/ArchiveSpark

It's not clear how to use the mapEnrich function

borissmidt opened this issue · 2 comments

I have been able to use mapEnrich when I only needed a single field,
but when I want to use multiple fields, it is not clear which function signature I have to use.

I tried

def parsePayload(x: Any): ParsedData = {???}
rdd.mapEnrich(Seq("header", "payload"), "parsedData")(parsePayload)
rdd.foreach(println(_)) // force an action

But the function isn't called.
I noticed the function will only be called when it has the correct type,
so how can I debug this? Is there some way to log the required type of the function?

Would you please describe what you are trying to do?

The idea behind enrichments is pretty simple: every enrichment has a single parent / dependency and creates one or multiple fields under this parent. The aim of this is to maintain a clean lineage. mapEnrich is just a shortcut for enriching a record with a single value without creating an enrich function. The concept is still the same though: there is a single parent, and it creates one field under it.

However, this does not mean that you have to use enrichments; that depends on your use case. You can still use the regular Spark map. In that case, a previously applied enrich function can serve as a pointer to the value derived by it.
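
For instance, a minimal sketch of this pattern (assuming StringContent is applicable to your records) could look like this:

val enriched = rdd.enrich(StringContent)
val strings = enriched.map(record => record.value(StringContent)) // the enrich function points to its derived value
strings.take(1).foreach(println)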

Also, enforcing an action like in your example would not execute the mapEnrich, because mapEnrich never changes an RDD in place but creates a new enriched one, just like the regular Spark map.
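
So, in your snippet, you would need to keep the returned RDD and run the action on that one:

val enriched = rdd.mapEnrich(Seq("header", "payload"), "parsedData")(parsePayload)
enriched.foreach(println(_)) // the action now runs on the enriched RDD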

Instead, the way this usually works is as follows:
Let's say you want to enrich your records with the string length of their content. Then you start with the enrich function that gives you the string representation of your content, i.e., StringContent and "mapEnrich" the length, which will be a new field under the string. You can see this if you take a peek at the JSON representation of your records:

val rddWithStringLength = rdd.mapEnrich(StringContent, "length") { str => str.length }
rddWithStringLength.peekJson

After you enrich an RDD using rdd.enrich(...), you can also access the created fields in mapEnrich by providing it with the full path. In that case, you need to know the data type of this field, though, which needs to be specified explicitly. Doing it as shown above, you don't need that, as ArchiveSpark can derive the data type from the default field type of StringContent, so this would be the preferred way.
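
As a rough sketch (the field path here is hypothetical, so check it against the JSON representation of your records; the input type of your function has to match the actual type of the field at that path):

val enriched = rdd.enrich(StringContent)
val withLength = enriched.mapEnrich(Seq("payload", "string"), "length") { str: String => str.length }
withLength.peekJson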

If you want to map your records to a tuple of multiple values from an enriched RDD in order to do some additional processing, you could do it as shown in this example:

val headerPayloadTuples = rdd.enrich(WarcPayload).map{ record =>
   val httpHeader: Map[String, String] = record.value(WarcPayload, WarcPayload.HttpHeaderField)
   val httpPayload = record.value(WarcPayload)
   (httpHeader, httpPayload)
}

If you want to do more complex things, please implement real enrich functions instead of using mapEnrich. This way, the functions can be reused by others too, your own code becomes much cleaner as you abstract away complexity, and there is also the possibility to access multiple fields. An example would be StringContent, where we access the HTTP header to read the default charset in order to decode the payload as a string.
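
As a very rough sketch, a custom enrich function could look something like the following; the trait and method signatures here are assumptions based on the built-in functions and may differ between ArchiveSpark versions, so please refer to the source of StringContent as a template:

// hypothetical sketch; the EnrichFunc, TypedEnrichable and Derivatives signatures are assumptions
object ParsedPayload extends EnrichFunc[WarcRecord, Array[Byte]] {
   override def source: Seq[String] = Seq("payload") // the single parent / dependency
   override def fields: Seq[String] = Seq("parsedData") // the field(s) created under that parent
   override def derive(source: TypedEnrichable[Array[Byte]], derivatives: Derivatives): Unit = {
      // sibling fields, e.g., the HTTP header, can be navigated to from here,
      // similar to how StringContent reads the charset before decoding
      derivatives << parsePayload(source.get) // parsePayload is a placeholder for your own logic
   }
}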

One more thing to consider: you should always filter your records based on the metadata before you run an action that invokes an enrichment, because enriching your RDD accesses the actual records, which is expensive if you have not filtered it before to keep only the records of interest. During development, I would always recommend doing a peekJson, which will only access and apply your enrichments to the first record.
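
For example (the metadata accessors mime and status are assumptions here that depend on your record type):

val pages = rdd.filter(r => r.mime == "text/html" && r.status == 200) // cheap: metadata only, no record content is accessed
pages.enrich(StringContent).peekJson // applies the enrichment to the first record only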

Hope this helps!