spotify/flink-on-k8s-operator

QUESTION: how to get sample app WordCount.jar to run with version 1.15.3 and 2 taskmanager replicas

pzim opened this issue · 3 comments

pzim commented

Hi all,

I am attempting to deploy the sample WordCount.jar app using 2 taskmanager replicas; however, only 1 taskmanager is able to register successfully with the ResourceManager at any one time. When deploying the same app with a single taskmanager replica, it works fine.

flink UI showing running job using a single taskmanager replica setup:

However when deploying a 2 replica taskmanager configuration, one of the taskmanagers gets an akka gated exception.

logs from the failed taskmanager pod in a 2 replica setup:

2023-01-31 21:20:42,582 WARN  akka.remote.Remoting                                         [] - Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
2023-01-31 21:20:42,583 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms.
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*.
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/), Path(/user/rpc/resourcemanager_*)]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$11(AkkaRpcService.java:602) ~[?:?]
	at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59) ~[?:?]
	at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	at java.lang.Thread.run(Unknown Source) ~[?:?]

flink UI showing job with 1 taskmanager registered and running healthy, although 2 taskmanager pods are running:

same flink UI observed a few minutes later, showing the job failed:

I created a custom Flink 1.15.3 image using the Dockerfile below:

FROM flink:1.15.3-scala_2.12-java11

# Stage the job jar in both locations the operator may look for it
RUN mkdir -p ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib
COPY target/wc.jar ${FLINK_HOME}/flink-web-upload/app.jar

RUN ln -s ${FLINK_HOME}/flink-web-upload/app.jar ${FLINK_HOME}/usrlib/app.jar
# The official Flink images run as UID/GID 9999 (the "flink" user)
RUN chown -R 9999:9999 ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib

USER 9999

Where wc.jar was downloaded from here:
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-WordCount.jar

Below is the FlinkCluster CR I'm using to deploy this setup:

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  flinkVersion: "1.15"
  image:
    name: <redacted_path_to_ECR>/streaming-analytics-app:flink-1-15-test
  jobManager:
    accessScope: Cluster
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  job:
    jarFile: ./examples/streaming/WordCount.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "./README.txt"]
    parallelism: 2
    restartPolicy: Never
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"

  logConfig:
    log4j-console.properties: |
      # This affects logging for both user code and Flink
      rootLogger.level = DEBUG
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender

      # Uncomment this if you want to _only_ change Flink's logging
      logger.flink.name = org.apache.flink
      logger.flink.level = DEBUG

      # The following lines keep the log level of common libraries/connectors on
      # log level DEBUG. The root logger does not override this. You have to manually
      # change the log levels here.
      logger.akka.name = akka
      logger.akka.level = DEBUG
      logger.kafka.name = org.apache.kafka
      logger.kafka.level = DEBUG
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = DEBUG

      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = DEBUG

      logger.kafkacoordinator.name = org.apache.kafka.clients.consumer.internals.AbstractCoordinator
      logger.kafkacoordinator.level = DEBUG

      # Log all info to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

      # Log all info in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
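
As a sanity check on sizing: the scheduler sees replicas × taskmanager.numberOfTaskSlots slots in total, so the CR above offers 2 × 1 = 2 slots, which exactly covers parallelism: 2 — the job should fit once both taskmanagers register. For comparison, a single-replica layout with the same total capacity (assuming nothing else constrains placement) would look like:

```yaml
taskManager:
  replicas: 1
flinkProperties:
  # one TM pod, two slots — same total slot count as 2 replicas x 1 slot
  taskmanager.numberOfTaskSlots: "2"
```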


I would welcome any input on getting both taskmanagers registered and able to run the job healthily.
jto commented

Hey! This is rather odd. It looks like there may be a network issue somewhere. Could you look at your namespace events to see if anything pops up?

pzim commented

@jto - appreciate you responding so quickly. I believe the networking is fine. I deployed an ubuntu container with networking tools in the same namespace and am able to reach each of the jobmanager/taskmanager pods and ports. The only interesting events are related to the taskmanager experiencing the gating issue, whose readiness probes are failing with a 500 response.
Logs in this failing taskmanager still show:

Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

Another data point: if I delete the failing taskmanager pod, when it comes back up it becomes the healthy one, and the other, previously healthy taskmanager pod becomes unhealthy with the gated message above.

pzim commented

After doing some pcaps, the network traffic between the taskmanagers and the jobmanager looks pretty healthy/normal.

Seeing heartbeat timeouts like the one below led to the thought that our Envoy proxies were blocking the heartbeat communication.

org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException: The heartbeat of ResourceManager with id cf8116891cd5aec6f77ac3f897359ff6 timed out.

After adding an istio annotation to exclude traffic for port 6123, both taskmanagers were able to register and stay up.
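For reference, the exclusion was an Istio sidecar annotation on the Flink pods along these lines; the `traffic.sidecar.istio.io/*` annotation names are standard Istio, but the `podAnnotations` field and its exact placement in the FlinkCluster CR are an assumption and may differ by operator version:

```yaml
taskManager:
  replicas: 2
  podAnnotations:
    # bypass the Envoy sidecar for Flink's Akka RPC port
    traffic.sidecar.istio.io/excludeInboundPorts: "6123"
    traffic.sidecar.istio.io/excludeOutboundPorts: "6123"
```

The jobManager side would presumably need the same annotations if it also runs a sidecar.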
Closing this issue now, as this resolved the problem.