QUESTION: how to get sample app WordCount.jar to run with version 1.15.3 and 2 taskmanager replicas
pzim opened this issue · 3 comments
Hi all,
I am attempting to deploy the sample WordCount.jar app with 2 taskmanager replicas, but only 1 taskmanager is able to register successfully with the ResourceManager at any one time. Deploying the same app with a single taskmanager replica works fine.
flink UI showing running job using a single taskmanager replica setup:
However, when deploying a 2-replica taskmanager configuration, one of the taskmanagers hits an Akka gating error.
logs from the failed taskmanager pod in a 2 replica setup:
2023-01-31 21:20:42,582 WARN akka.remote.Remoting [] - Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
2023-01-31 21:20:42,583 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms.
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$11(AkkaRpcService.java:602) ~[?:?]
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59) ~[?:?]
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/), Path(/user/rpc/resourcemanager_*)]
flink UI showing job with 1 taskmanager registered and running healthy, although 2 taskmanager pods are running:
same flink UI observed a few minutes later, showing the job failed:
I created a custom flink 1.15.3 image using the below Dockerfile:
FROM flink:1.15.3-scala_2.12-java11
RUN mkdir -p ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib
COPY target/wc.jar ${FLINK_HOME}/flink-web-upload/app.jar
RUN ln -s ${FLINK_HOME}/flink-web-upload/app.jar ${FLINK_HOME}/usrlib/app.jar
RUN chown -R 9999:9999 ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib
USER 9999
Where wc.jar was downloaded from here:
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-WordCount.jar
Below is the flinkcluster CR I'm using to deploy this setup:
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  flinkVersion: "1.15"
  image:
    name: <redacted_path_to_ECR>/streaming-analytics-app:flink-1-15-test
  jobManager:
    accessScope: Cluster
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  job:
    jarFile: ./examples/streaming/WordCount.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "./README.txt"]
    parallelism: 2
    restartPolicy: Never
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"
  logConfig:
    log4j-console.properties: |
      # This affects logging for both user code and Flink
      rootLogger.level = DEBUG
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender
      # Uncomment this if you want to _only_ change Flink's logging
      logger.flink.name = org.apache.flink
      logger.flink.level = DEBUG
      # The following lines keep the log level of common libraries/connectors on
      # log level DEBUG. The root logger does not override this. You have to manually
      # change the log levels here.
      logger.akka.name = akka
      logger.akka.level = DEBUG
      logger.kafka.name = org.apache.kafka
      logger.kafka.level = DEBUG
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = DEBUG
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = DEBUG
      logger.kafkacoordinator.name = org.apache.kafka.clients.consumer.internals.AbstractCoordinator
      logger.kafkacoordinator.level = DEBUG
      # Log all info to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      # Log all info in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
Since each taskmanager has a single task slot and the job parallelism is 2, the job needs both taskmanagers to register. I would welcome any input on getting both taskmanagers registered and the job running healthily.
Hey! This is rather odd. It looks like there may be a network issue somewhere. Could you look into your namespace events and see if anything pops up?
@jto - appreciate you responding so quickly. I believe the networking is fine: I deployed an ubuntu container with networking tools in the same namespace (a minimal sketch of such a debug pod is included after the log excerpt below) and am able to reach each of the jobmanager/taskmanager pods and ports. The only interesting events relate to the taskmanager experiencing the gating issue, whose readiness probes are failing with a 500 response.
The logs in this failing taskmanager are still showing:
Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
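For reference, a debug pod along these lines is enough for that kind of in-namespace connectivity check. This is only a minimal sketch: the pod name and image are illustrative rather than the exact ones used here, and tools such as curl, netcat, or dig can be installed with apt once inside.
apiVersion: v1
kind: Pod
metadata:
  name: net-debug            # illustrative name
spec:
  containers:
    - name: debug
      image: ubuntu:22.04    # any image with (or able to install) basic network tools
      command: ["sleep", "infinity"]
From a shell in this pod, reaching flinkjobcluster-sample-jobmanager on ports 6123/8081 and each taskmanager pod confirms basic pod-to-pod connectivity.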
Another data point: if I delete the failing taskmanager pod, it comes back up as the healthy one, and the other, previously healthy taskmanager pod then becomes unhealthy with the same gating message.
After doing some pcaps, the network traffic between the taskmanagers and the jobmanager looks pretty healthy/normal.
Seeing heartbeat timeouts like the one below led to the thought that our Envoy proxies were blocking the heartbeat communication:
org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException: The heartbeat of ResourceManager with id cf8116891cd5aec6f77ac3f897359ff6 timed out.
After adding an Istio annotation to exclude traffic on port 6123, both taskmanagers were able to register and stay up.
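For anyone who hits the same thing, the change amounts to pod annotations on the FlinkCluster CR, roughly as below. This is a minimal sketch, not the exact manifest used here: it assumes the CRD version in use exposes podAnnotations on the jobManager and taskManager specs, and it uses the standard Istio sidecar annotations traffic.sidecar.istio.io/excludeInboundPorts and traffic.sidecar.istio.io/excludeOutboundPorts to keep port 6123 out of the mesh.
spec:
  jobManager:
    podAnnotations:
      # Accept Akka RPC on 6123 directly, bypassing the Envoy sidecar
      traffic.sidecar.istio.io/excludeInboundPorts: "6123"
  taskManager:
    replicas: 2
    podAnnotations:
      # Reach flinkjobcluster-sample-jobmanager:6123 without going through the sidecar
      traffic.sidecar.istio.io/excludeOutboundPorts: "6123"
With 6123 excluded on both sides, the Akka association between the taskmanagers and the jobmanager is no longer proxied, which is what stopped the quarantining/gating here.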
Closing this now, as the annotation resolved the issue.