thbar/kiba

Add support for "close" on transforms

thbar opened this issue · 8 comments

thbar commented

Sources and destinations both support some form of "close":

  • sources because they are only called once via #each (so one can close as part of this)
  • destinations via #close

In contrast, transforms cannot currently close.

This will be interesting in particular with Kiba v2 StreamingRunner, to implement some forms of buffering in the transform itself (e.g: keeping N records before doing a grouped query, aggregating N records), in which case close is a way to ensure we flush out the buffer.

See https://stackoverflow.com/questions/49422860/transforming-a-table-into-a-hash-of-sets-using-kiba-etl for an example of use.
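A minimal sketch of the buffering case described above, assuming the runner calls `process(row)` with a block for each row and then calls `close` with a block once the source is exhausted. `BatchTransform` and `batch_size` are hypothetical names, and the loop at the bottom only stands in for the runner so the snippet runs without Kiba:

```ruby
# Hypothetical transform: groups incoming rows into batches of `batch_size`.
class BatchTransform
  def initialize(batch_size:)
    @batch_size = batch_size
    @buffer = []
  end

  # Buffer the row and yield a full batch once enough rows are collected.
  def process(row)
    @buffer << row
    flush { |batch| yield batch } if @buffer.size >= @batch_size
    nil # nothing is passed downstream through the return value
  end

  # Called once at the end: yield whatever is still in the buffer.
  def close
    flush { |batch| yield batch }
  end

  private

  def flush
    return if @buffer.empty?
    yield @buffer
    @buffer = [] # start a fresh array so the yielded batch is not mutated
  end
end

# Stand-in for the runner (an assumption, not Kiba's actual code).
transform = BatchTransform.new(batch_size: 2)
batches = []
[1, 2, 3, 4, 5].each { |row| transform.process(row) { |b| batches << b } }
transform.close { |b| batches << b }
# batches is now [[1, 2], [3, 4], [5]]
```

Without the final `close` call, the trailing `[5]` would never leave the buffer, which is the gap this issue is about.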

I'm not sure this is really needed, because you can always "cache" rows inside an instance variable (or some other cache store), return nil (or call next if inside a block) while aggregating, and flush by returning the contents of the cache. So it can be easily accomplished with the current interface.
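For illustration, the workaround could look roughly like the sketch below. `CachingTransform` and `size` are hypothetical names, and the `map` call only stands in for the runner:

```ruby
# Hypothetical transform relying on `process` alone: rows are cached in an
# instance variable and returned as one aggregated row every `size` rows.
class CachingTransform
  attr_reader :cache

  def initialize(size:)
    @size = size
    @cache = []
  end

  def process(row)
    @cache << row
    return nil if @cache.size < @size # nil drops the row from the pipeline
    flushed = @cache
    @cache = []
    flushed
  end
end

transform = CachingTransform.new(size: 2)
out = [1, 2, 3].map { |row| transform.process(row) }.compact
leftover = transform.cache
# out is [[1, 2]] and leftover is [3]
```

Note that the trailing row stays in the cache here, so something outside the transform (for instance a post_process block) still has to read it at the end.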

thbar commented

@sirfilip thanks for your input! I'm still thinking this through, and rest assured (since I'm always quite aggressively removing features) that I'll wait for an actual production requirement showing it's really needed before implementing this :-)

thbar commented

A first experimental implementation is available in #57. I'll give myself some time to play around with it and see how necessary it is for various scenarios.

thbar commented

FYI @kmayer, #57 may be of interest to you.

thbar commented

It just occurred to me (while working on another ETL) that #57 also makes sorting quite easy to achieve (here in memory):

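class SortTransform
  # sort_by: assumed to be a callable (e.g. a lambda) returning a row's sort key
  def initialize(sort_by:)
    @sort_by = sort_by
    @buffer = []
  end

  # Buffer every row; returning nil passes nothing downstream yet
  def process(row)
    @buffer << row
    nil
  end

  # Once all rows have been seen, emit them in sorted order
  def close
    @buffer.sort_by(&@sort_by).each do |row|
      yield row
    end
  end
end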

I will experiment more, but this looks promising.

thbar commented

#57 has been merged into master. It will ship with the next release (Kiba 2.5.0).

@thbar in addition to sorting, this is going to help me solve the following problem:

A set of snapshot data might include multiple fact values from the same product. Some of my exports expect me to pick a specific value for each product, such as the min, avg, max, or most recent price. This will allow me to bake that into the middle of the pipeline, still using other transforms, and actual destinations, instead of the ole' instance-var-in-the-pipeline-doing--its-thing-in-the-post-process trick.

Cool addition, thanks.
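A rough sketch of the "pick one value per product" case, here keeping the highest price. `MaxPricePerProduct` and the `:product_id` / `:price` keys are hypothetical, and the calls at the bottom only stand in for the runner:

```ruby
# Hypothetical transform: keeps the row with the highest :price per :product_id.
class MaxPricePerProduct
  def initialize
    @best = {}
  end

  # Remember the row if it beats the current best for its product.
  def process(row)
    current = @best[row[:product_id]]
    @best[row[:product_id]] = row if current.nil? || row[:price] > current[:price]
    nil # nothing goes downstream until close
  end

  # Emit one row per product once every row has been seen.
  def close
    @best.each_value { |row| yield row }
  end
end

rows = [
  { product_id: "a", price: 10 },
  { product_id: "b", price: 7 },
  { product_id: "a", price: 12 }
]
transform = MaxPricePerProduct.new
rows.each { |row| transform.process(row) }
picked = []
transform.close { |row| picked << row }
# picked holds the price-12 row for "a" and the price-7 row for "b"
```

Swapping the comparison (or tracking a running sum and count) would give min or avg instead.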

thbar commented

instead of the ole' instance-var-in-the-pipeline-doing--its-thing-in-the-post-process trick

@ttilberg you made me laugh with this one 😄 it's such a common thing that it's worth having a long word like this!

Thanks for the feedback, and I can say I definitely share your findings.

I've been using this construct in various places now, and it is useful for a wide range of use cases (e.g. data profiling that you can switch on or off conditionally without touching the rest of the pipeline).

And on occasion, you can easily plug things in (still relying on instance variables though), like the upcoming LambdaTransform that I will soon add to kiba-common:

transform LambdaTransform,
  on_init: -> { @count = 0 },
  on_process: -> (r) { @count += 1; r },
  on_close: -> { logger.info "Total #{@count} rows read from source" }

This is very convenient for quick profiling while iterating on the data at development time.
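One way such a class might be put together is sketched below. This is a guess at the shape, not the actual kiba-common code: it assumes each lambda is evaluated with `instance_exec` so that instance variables like `@count` live on the transform itself.

```ruby
# Hypothetical sketch of a LambdaTransform (not the kiba-common implementation).
class LambdaTransform
  def initialize(on_init: nil, on_process: nil, on_close: nil)
    @on_process = on_process
    @on_close = on_close
    # Run the init lambda with self set to this transform instance.
    instance_exec(&on_init) if on_init
  end

  # Pass the row through unchanged when no on_process lambda is given.
  def process(row)
    @on_process ? instance_exec(row, &@on_process) : row
  end

  def close
    instance_exec(&@on_close) if @on_close
  end
end

lambda_transform = LambdaTransform.new(
  on_init: -> { @count = 0 },
  on_process: ->(r) { @count += 1; r },
  on_close: -> { "Total #{@count} rows read from source" }
)
passed = %w[x y z].map { |row| lambda_transform.process(row) }
summary = lambda_transform.close
# passed is ["x", "y", "z"]; summary is "Total 3 rows read from source"
```

The `on_close` lambda returns a string here instead of calling `logger`, since no logger is defined in this standalone snippet.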

In all cases, thanks for taking the time to comment & provide feedback, appreciated!