
Flink upgrade (cross version)

  1. Delete the ConfigMaps. Clear the job list, then find the ConfigMap settings in the configuration, as in the following example:

    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default

    Run the script to batch-delete the ConfigMaps.

    If the output contains no lines like configmap "md-flink" deleted, the namespace or the ConfigMap prefix is incorrect; check both values again.

    # for i in $(kubectl -n [replace with the value of kubernetes.namespace] get cm | awk '$1~"[replace with the value of kubernetes.cluster-id]"{print $1}');do kubectl -n [replace with the value of kubernetes.namespace] delete cm $i;done

    for i in $(kubectl -n default get cm | awk '$1~"md-flink"{print $1}');do kubectl -n default delete cm $i;done
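
Before deleting anything, it can help to preview which ConfigMaps will be matched. The sketch below is one possible way to do that, not part of the official procedure. `match_cm_names` is a hypothetical helper name. It assumes input in the form printed by `kubectl get cm -o name`, and unlike the awk pattern above it matches only names that start with the cluster-id rather than names that contain it anywhere.

```shell
# Hypothetical helper: keep only ConfigMap names that START with the given
# cluster-id. Reads one name per line on stdin, with or without the
# "configmap/" prefix that `kubectl get cm -o name` prints.
match_cm_names() {
  cluster_id="$1"
  sed 's|^configmap/||' | awk -v id="$cluster_id" 'index($0, id) == 1 { print }'
}

# Preview only (nothing is deleted):
#   kubectl -n default get cm -o name | match_cm_names md-flink
#
# Delete once the preview looks right:
#   kubectl -n default get cm -o name | match_cm_names md-flink \
#     | while read -r name; do kubectl -n default delete cm "$name"; done
```

If the preview prints nothing, the namespace or the cluster-id is probably wrong, which is the same condition the note above describes.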
  2. Download the image of the new version

    Run this command on every node in the Kubernetes cluster.

    crictl pull nocoly/flink:<version>
  3. Modify Configuration File

    Update the flink.yaml file to specify the image versions used by the flink-jobmanager and flink-taskmanager services:

    - name: jobmanager
      image: nocoly/flink:VERSION
    - name: taskmanager
      image: nocoly/flink:VERSION
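
The image tag can be edited by hand, or with a small script when several manifests need the same change. The following is a minimal sketch under stated assumptions: `set_flink_image_version` is a hypothetical helper name, and it assumes each image is written on its own line as `image: nocoly/flink:<tag>`.

```shell
# Hypothetical helper: rewrite the tag on every "image: nocoly/flink:<tag>"
# line of a manifest, leaving indentation and all other images untouched.
set_flink_image_version() {
  file="$1"
  version="$2"
  tmp="$(mktemp)" || return 1
  # Capture everything up to the repository name; replace only the tag.
  if sed -E "s|^([[:space:]]*image:[[:space:]]*nocoly/flink):.*|\1:${version}|" "$file" > "$tmp"; then
    # Write back through redirection so the file keeps its permissions.
    cat "$tmp" > "$file"
  fi
  rm -f "$tmp"
}

# Usage (replace <version> with the target release):
#   set_flink_image_version flink.yaml <version>
#   grep -n 'image:' flink.yaml   # confirm both containers were updated
```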
    When upgrading to v1.19.710, make the following additional adjustments in flink.yaml:
    1. Remove all existing configurations starting with metrics, and add the following Kafka metric reporting configurations (replace Kafka addresses as per your environment):

      metrics.job.status.enable: STATE
      metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
      metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
      metrics.reporter.kafka_reporter.chunk.size: 20000
      metrics.reporter.kafka_reporter.interval: 60s
      metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
      metrics.reporter.kafka_reporter.topic: flink_metrics_counter
      metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
      metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
      metrics.reporter.kafka_reporter_running.chunk.size: 20000
      metrics.reporter.kafka_reporter_running.interval: 60s
      metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
      metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
      metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
      metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
      metrics.reporter.kafka_reporter2.chunk.size: 20000
      metrics.reporter.kafka_reporter2.interval: 60s
      metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
      metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
      metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
      metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
      metrics.reporter.kafka_reporter_running2.chunk.size: 20000
      metrics.reporter.kafka_reporter_running2.interval: 60s
      metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
      metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
      metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
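
Removing the old metrics settings by hand is easy to get wrong when there are many of them. As a rough sketch, the removal can be done with a filter that writes to a new file for review. `strip_metrics_settings` is a hypothetical helper name, and the sketch assumes every setting occupies a single line whose key begins with `metrics.`.

```shell
# Hypothetical helper: print a file without the lines whose key starts with
# "metrics." (leading indentation is allowed). Comment lines are kept.
# Assumes one setting per line, with no multi-line values.
strip_metrics_settings() {
  awk '!/^[ \t]*metrics\./' "$1"
}

# Usage: write to a new file and review the difference before replacing
# the original, then paste in the Kafka reporter settings listed above.
#   strip_metrics_settings flink.yaml > flink.yaml.new
#   diff flink.yaml flink.yaml.new
```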
    2. Locate the kind: Role configuration section, and add patch permission for the configmaps resource under the rules.verbs field:

      kind: Role
      apiVersion: rbac.authorization.k8s.io/v1
      metadata:
        name: configmap-access
        namespace: default
      rules:
      - apiGroups: [""]
        resources: ["configmaps"]
        verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"] # Newly added patch permission
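
A quick check before applying the file can catch a forgotten patch verb. The sketch below is illustrative only: `role_allows_configmap_patch` is a hypothetical helper name, and it assumes the inline list style shown above, where a `resources:` line is followed by the `verbs:` line of the same rule.

```shell
# Hypothetical pre-apply check: succeed only if a rule that lists
# "configmaps" under resources also lists "patch" under verbs.
# The quoted form "patch" is required, so a trailing comment that merely
# mentions the word patch does not count.
role_allows_configmap_patch() {
  awk '
    /resources:/ { in_cm = ($0 ~ /"configmaps"/) }
    /verbs:/     { if (in_cm && $0 ~ /"patch"/) found = 1; in_cm = 0 }
    END          { exit(found ? 0 : 1) }
  ' "$1"
}

# Usage:
#   role_allows_configmap_patch flink.yaml && echo "patch verb present"
```

After the file has been applied, `kubectl auth can-i patch configmaps -n default --as=system:serviceaccount:default:<service-account>` can confirm the permission on the cluster itself. The service account name is a placeholder and depends on your deployment.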
  4. Restart the Service

    kubectl apply -f flink.yaml
  5. Restart or publish tasks on the synchronization task list 💥 💥 💥