
Coalesce big table before - AND AFTER - join


Calculating fts_data for weekly analysis

# Unpersist the previous run's DataFrame if it exists
if 'raw_fraud_fts' in locals():
    raw_fraud_fts.unpersist()

# Read the feature partitions for the analysis window, rename the partition
# column to match the join key, and coalesce the big table to 3000 partitions
raw_fraud_fts = (sqlContext.read.options(basePath="/user/hdfs/tc/par_new/fraud/features_at_fwd/")
    .parquet("/user/hdfs/tc/par_new/fraud/features_at_fwd/update_date=*")
    .where("update_date >= '{}' and update_date <= '{}'".format(start_date, end_date))
    .withColumnRenamed("update_date", "request_date_1d_ago")
    .coalesce(3000)
)
print raw_fraud_fts.first()

if 'ocb_fts_data' in locals():
    ocb_fts_data.unpersist()

# Join the matched applications with the fraud features on phone number and
# the day-before request date, then repartition the result
ocb_fts_data = (con_rep_app_match_all24
    .selectExpr("phone_number", "app_date",
                "app_date request_date",  # alias: app_date AS request_date
                "request_date_1d_ago", "match_score", "match_level",
                "is_self_init_sms", "is_avay_sms", "s2f_days", "f2d_days",
                "f2d_days_2", "is_self_initiated")
    .join(raw_fraud_fts, ["phone_number", "request_date_1d_ago"])
    .repartition(1000)
)
# print ocb_fts_data.count()
# print ocb_fts_data.take(1)

# Write the intermediate results
(ocb_fts_data.write.mode("overwrite")
    .parquet("{}/{}".format(PROJECT_BASE, FI_FTS_DATA))
)

The coalesce adds an extra step to the job:
[screenshot: Spark UI showing the extra coalesce stage]

Coalescing takes nearly 1 hour.
Then joining con_rep_app_match_all24 and raw_fraud_fts takes 1.9 hours (details below).
The repartition took only 1.9 minutes (the data is already sorted by the join above).

[screenshot: Spark UI stage timings for the coalesce and join]
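To reproduce these timings outside the UI, a minimal sketch (the timed helper is hypothetical, not from the original notebook):

import time
from datetime import timedelta

def timed(label, action):
    # Hypothetical helper: wall-clock a Spark action that forces evaluation
    start = time.time()
    result = action()
    print("{}: {}".format(label, timedelta(seconds=time.time() - start)))
    return result

# Forcing an action on each DataFrame approximates the per-step costs
# shown in the Spark UI stages above.
timed("read + coalesce(3000)", lambda: raw_fraud_fts.first())
timed("join + repartition(1000)", lambda: ocb_fts_data.count())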

However, 1,000 partitions are far too many for some 40K records (a sketch of a smaller write follows the listing):

7.4 K  22.2 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00000-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.7 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00001-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.3 K  21.8 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00002-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.3 K  22.0 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00003-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.6 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00004-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.4 K  22.2 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00005-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.5 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00006-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.3 K  21.9 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00007-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.4 K  22.3 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00008-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.6 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00009-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.7 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00010-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.3 K  21.8 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00011-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.5 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00012-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
7.2 K  21.7 K  /user/hdfs/working/thangnguyen/ocb/fraud_investigation/1day_delay/ocb_fts_data_/part-r-00013-3e1692ed-d9d9-41a8-8f22-2e6120c6f052.gz.parquet
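Given the tiny output, a sketch of a smaller write; the single-file target is an assumption based on the ~40K records:

# With ~40K records, one output file is plenty. Note that coalesce(1)
# folds into the previous stage and shrinks its parallelism, whereas
# repartition(1) would keep the join parallel at the cost of one more shuffle.
(ocb_fts_data.coalesce(1)
    .write.mode("overwrite")
    .parquet("{}/{}".format(PROJECT_BASE, FI_FTS_DATA))
)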

Coalesce
[screenshot: Spark UI detail of the coalesce stage]

Joining a 200-partition DataFrame with a 3,000-partition one (after coalesce):

[screenshot: Spark UI, join stages]

[screenshot: Spark UI, join task metrics]
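A quick check of the partition counts on each side of the join (assuming the same variables as above):

# Partition counts going into the join
print(con_rep_app_match_all24.rdd.getNumPartitions())  # 200 here
print(raw_fraud_fts.rdd.getNumPartitions())            # 3000 after coalesce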

Try without coalesce

First, the number of tasks goes up and tasks fail more easily.

[screenshot: Spark UI showing the increased task count]

Failed tasks
[screenshot: Spark UI listing the failed tasks]

This time, the join takes a bit longer, about 11%:
runtime = 3:06:05.367592
[screenshot: Spark UI stage timings without coalesce]

The DAG and executor statistics look much the same as with coalesce,
but Input Size and Shuffle Size are distributed much more evenly across executors when coalescing:
[screenshot: executor statistics]

[screenshot: executor statistics]

On HCVN, I tried coalescing raw_fts without repartitioning hcvn_fts_data after the join.
=> So I will certainly need to coalesce(1) afterward.

runtime = 1:20:21.195657

#35

Actually, I need to coalesce con_rep_app_match_all24 before the join as well (see the sketch below).

Runtime without coalescing either con_rep_app or raw_fts: 3:06:05.367592
=> New runtime:
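A sketch of coalescing the left side too, assuming the same variables as above; the partition count is a guess, not a measured value:

# Hypothetical: give con_rep_app_match_all24 an explicit partition count
# before the join, mirroring the coalesce(3000) applied to raw_fraud_fts.
con_rep_app_coalesced = con_rep_app_match_all24.coalesce(100)  # count is a guess

ocb_fts_data = (con_rep_app_coalesced
    .join(raw_fraud_fts, ["phone_number", "request_date_1d_ago"])
    .repartition(1000)
)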

### About the partitioning after a join
Even when joining two DataFrames that have one partition each, the resulting DataFrame will have 200 partitions by default.
=> This really needs attention; the number is set by spark.sql.shuffle.partitions.
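A minimal sketch demonstrating the default and how to change it (the value 400 is only an example):

# Force a shuffle join so the default is visible (tiny local
# DataFrames would otherwise be broadcast).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

a = sqlContext.createDataFrame([(1, "x")], ["id", "v1"]).coalesce(1)
b = sqlContext.createDataFrame([(1, "y")], ["id", "v2"]).coalesce(1)

joined = a.join(b, "id")
print(joined.rdd.getNumPartitions())  # 200: the spark.sql.shuffle.partitions default

# Lower the setting before a join to avoid thousands of tiny tasks:
sqlContext.setConf("spark.sql.shuffle.partitions", "400")  # example value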

=> The ideal partition size is slightly smaller than 128MB.
=> See tc_call_histories, the 10-week graphs, and other big files for more details.
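One way to pick a partition count from the input size, assuming the ~128MB target above (the helper and the 370GB input are illustrative):

def target_partitions(total_bytes, target_mb=128):
    # Hypothetical helper: size each partition slightly under target_mb
    part_bytes = target_mb * 1024 * 1024
    return max(1, (total_bytes + part_bytes - 1) // part_bytes)

# e.g. a 370GB input -> 2960 partitions, close to the coalesce(3000) used above
print(target_partitions(370 * 1024 ** 3))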