51zero/eel-sdk

limit parallelism of ParquetSource to reduce memory footprint


FloWi commented

Hi!

I set up a ParquetSource with a JDBCSink and ran into memory issues.
The Parquet files are stored in an S3 bucket and were written by Spark (Snappy-compressed, ~500 MByte).

Spark writes one file per partition (default: 200). This causes eel to use a lot of memory, because many subscriptions are submitted to the executor, even though I set up the stream like this: `source.toDataStream().to(sink, parallelism = 1)`

In the code I found that you initialize the executor like this: `val executor = Executors.newCachedThreadPool()`, which creates an unbounded thread pool.
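To illustrate the difference, here is a minimal, self-contained sketch that uses only `java.util.concurrent`. It is not eel's code: `BoundedPoolSketch`, `peakConcurrency`, and the sleeping tasks are hypothetical stand-ins for the per-file subscriptions. It submits the same number of blocking tasks to a cached pool and to a fixed-size pool and records how many ran at the same time.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator

// Hypothetical demo object, not part of eel.
object BoundedPoolSketch {

  private val maxOp = new IntBinaryOperator {
    override def applyAsInt(a: Int, b: Int): Int = math.max(a, b)
  }

  // Submits `tasks` short blocking jobs (stand-ins for per-file reads) and
  // returns the highest number of jobs that were running at the same time.
  def peakConcurrency(executor: ExecutorService, tasks: Int): Int = {
    val running = new AtomicInteger(0)
    val peak    = new AtomicInteger(0)
    val done    = new CountDownLatch(tasks)

    (1 to tasks).foreach { _ =>
      executor.execute(new Runnable {
        override def run(): Unit = {
          val now = running.incrementAndGet()
          peak.accumulateAndGet(now, maxOp)
          try Thread.sleep(50) // simulate a slow read
          finally {
            running.decrementAndGet()
            done.countDown()
          }
        }
      })
    }

    done.await(30, TimeUnit.SECONDS)
    executor.shutdown()
    peak.get()
  }

  def main(args: Array[String]): Unit = {
    val files = 20
    // A cached pool starts a new thread whenever no idle thread is available.
    val cachedPeak = peakConcurrency(Executors.newCachedThreadPool(), files)
    // A fixed pool queues the extra tasks instead of starting more threads.
    val fixedPeak = peakConcurrency(Executors.newFixedThreadPool(2), files)
    println(s"cached pool peak concurrency:   $cachedPeak")
    println(s"fixed(2) pool peak concurrency: $fixedPeak")
  }
}
```

With the fixed pool the peak can never exceed the pool size, while the cached pool is free to run every submitted task at once. Whether swapping the executor alone would be enough inside eel is an assumption I have not verified.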

I did some experiments: I repartitioned the Spark DataFrame to 1 partition and stored it again. Here's the comparison (see screenshots below):
memory usage of the 200-file Parquet source: >1.2 GByte (*)
memory usage of the 1-file Parquet source: a constant 83 MByte

(*) I ran this on my local machine over a normal DSL internet connection. On the server it ran out of memory pretty quickly, meaning it used more than 2 GByte (my -Xmx setting for the app).

image

image

Can you think of a way to limit the amount of parallelism? Happy to provide a merge request if you point me in the right direction.