dibbhatt/kafka-spark-consumer

It works well in local mode, but when I submit it in cluster mode, the fixed rate is too small

yangcong3643 opened this issue · 5 comments

Spark: 2.3.0, Kafka: 0.10.0

Here is the log from cluster mode:

```
processingRate rate 25.068939583855602
fixed rate 50.0
P 24.931060416144398
H 398.30784657808977
2018-12-03 16:17:07 INFO PIDController:88 - ======== Rate Revision Starts ========
2018-12-03 16:17:07 INFO PIDController:89 - Current Fetch Size : 50
2018-12-03 16:17:07 INFO PIDController:90 - Fill Freq : 1000
2018-12-03 16:17:07 INFO PIDController:91 - Batch Duration : 2000
2018-12-03 16:17:07 INFO PIDController:92 - Scheduling Delay : 31777
2018-12-03 16:17:07 INFO PIDController:93 - Processing Delay : 3989
2018-12-03 16:17:07 INFO PIDController:94 - Fixed Rate : 50
2018-12-03 16:17:07 INFO PIDController:95 - Processing rate : 25
2018-12-03 16:17:07 INFO PIDController:96 - Proportional Error : 24
2018-12-03 16:17:07 INFO PIDController:97 - HistoricalError : 398
2018-12-03 16:17:07 INFO PIDController:98 - DifferentialError : 0
2018-12-03 16:17:07 INFO PIDController:99 - Reviced Rate : 25
2018-12-03 16:17:07 INFO PIDController:107 - Reviced FetchSize : 25
2018-12-03 16:17:07 INFO PIDController:108 - ======== Rate Revision Ends ========
2018-12-03 16:17:07 INFO ReceiverStreamListener:129 - Modified Rate by Controller : 25
2018-12-03 16:17:07 WARN ReceiverStreamListener:139 - Controller rate not applied as waiting queue is greater than throttle queue
2018-12-03 16:17:07 INFO ClientCnxn:512 - EventThread shut down
2018-12-03 16:17:08 INFO ShuffledDStream:54 - Time 1543825028000 ms is invalid as zeroTime is 1543824956000 ms , slideDuration is 120000 ms and difference is 72000 ms
2018-12-03 16:17:08 INFO BlockManagerInfo:54 - Added input-0-1543825026000 in memory on 10.221.150.71:38632 (size: 32.4 KB, free: 351.6 MB)
2018-12-03 16:17:08 INFO JobScheduler:54 - Added jobs for time 1543825028000 ms
2018-12-03 16:17:08 WARN ReceiverStreamListener:98 - stop consumer as pending queue 18 greater than configured limit 3
2018-12-03 16:17:08 INFO ZkState:85 - Starting curator service
2018-12-03 16:17:08 INFO CuratorFrameworkImpl:224 - Starting
```
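For reference, the P and H values above are consistent with the usual PID rate-estimator definitions (assuming this PIDController follows the same convention as Spark's PIDRateEstimator: proportional error = fixed rate - processing rate, historical error = scheduling delay * processing rate / batch duration). Plugging in the logged values:

```
P = fixedRate - processingRate
  = 50.0 - 25.069                          ≈ 24.931   (matches "P 24.931...")
H = schedulingDelay * processingRate / batchDuration
  = 31777 * 25.069 / 2000                  ≈ 398.31   (matches "H 398.307...")
```

With a roughly 32 s scheduling delay against a 2 s batch duration, batches queue up faster than they drain, which is why the listener then rejects the revised rate (waiting queue greater than throttle queue) and pauses the consumer (pending queue 18 greater than the configured limit of 3).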


But in local mode, everything works fine:

```
processingRate rate 1562500.0
fixed rate 50000.0
P -1512500.0
H 5468.75
2018-12-03 16:35:42 INFO PIDController:88 - ======== Rate Revision Starts ========
2018-12-03 16:35:42 INFO PIDController:89 - Current Fetch Size : 50000
2018-12-03 16:35:42 INFO PIDController:90 - Fill Freq : 1000
2018-12-03 16:35:42 INFO PIDController:91 - Batch Duration : 2000
2018-12-03 16:35:42 INFO PIDController:92 - Scheduling Delay : 7
2018-12-03 16:35:42 INFO PIDController:93 - Processing Delay : 64
2018-12-03 16:35:42 INFO PIDController:94 - Fixed Rate : 50000
2018-12-03 16:35:42 INFO PIDController:95 - Processing rate : 1562500
2018-12-03 16:35:42 INFO PIDController:96 - Proportional Error : -1512500
2018-12-03 16:35:42 INFO PIDController:97 - HistoricalError : 5468
2018-12-03 16:35:42 INFO PIDController:98 - DifferentialError : -884
2018-12-03 16:35:42 INFO PIDController:99 - Reviced Rate : 1562500
2018-12-03 16:35:42 INFO PIDController:107 - Reviced FetchSize : 1562500
2018-12-03 16:35:42 INFO PIDController:108 - ======== Rate Revision Ends ========
2018-12-03 16:35:42 INFO ReceiverStreamListener:129 - Modified Rate by Controller : 1562500
2018-12-03 16:35:42 INFO ZkState:85 - Starting curator service
2018-12-03 16:35:42 INFO CuratorFrameworkImpl:224 - Starting
```
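The same arithmetic applied to the local run shows the opposite regime: processing is roughly 31x faster than the fixed rate, so the proportional error is strongly negative and the controller revises the rate upward:

```
P = fixedRate - processingRate
  = 50000 - 1562500                        = -1512500
H = schedulingDelay * processingRate / batchDuration
  = 7 * 1562500 / 2000                     = 5468.75
revisedRate ≈ fixedRate - P = 1562500      (the measured processing rate)
```

In both logs the revised rate lands on the measured processing rate; the difference between the two runs is simply whether processing keeps up with ingestion.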

Local config:

```
--master local[2]
```

Cluster config:

```
--master spark://master:6066 --deploy-mode cluster \
  --total-executor-cores 2 --executor-cores 1 --executor-memory 1g
```

I think your Kafka topic has 2 partitions? So with 2 total executors of 1 core each, the receivers consume all the cores and you have no cores left for processing. You can increase --total-executor-cores and see if that works for you.
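For what it's worth, the partition count can be checked with the stock Kafka 0.10 tooling (the ZooKeeper address and topic name below are placeholders):

```
bin/kafka-topics.sh --describe --zookeeper <zk-host>:2181 --topic <your-topic>
```

and a submit line with more cores than receivers might look like this (values are illustrative; the rule of thumb is total executor cores strictly greater than the number of receivers, so at least one core is left for batch processing):

```
spark-submit --master spark://master:6066 --deploy-mode cluster \
  --total-executor-cores 4 --executor-cores 2 --executor-memory 1g \
  ...
```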

I have already set receiver.num=1.
Why does it behave differently in the two modes?

It's solved after I raised the batchDuration.

@yangcong3643 good to hear that. If things are fine, you can close the issue.