# Flink Upgrade (Cross-Version)

## 1. Delete the ConfigMaps and Clean Up the Flink Job List

Execute the script below to batch-delete the `md-flink` ConfigMaps. If no output such as `configmap "md-flink" deleted` appears, the namespace is incorrect or the ConfigMaps do not exist; double-check before continuing.

```shell
# Template: replace <namespace> with the actual namespace where Flink is deployed
for i in $(kubectl -n <namespace> get cm | awk '$1~"md-flink"{print $1}'); do kubectl -n <namespace> delete cm $i; done

# Example (namespace is flink):
for i in $(kubectl -n flink get cm | awk '$1~"md-flink"{print $1}'); do kubectl -n flink delete cm $i; done
```
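Because the loop is destructive, it can help to sanity-check what the `awk` filter selects before deleting anything. The sketch below runs the same filter against an inline sample of `kubectl get cm` output; the ConfigMap names in the sample are hypothetical.

```shell
# Hypothetical `kubectl -n flink get cm` output, used only to show what
# the awk filter in the delete loop would select.
sample='NAME                              DATA   AGE
md-flink-dispatcher-leader        1      90d
md-flink-resourcemanager-leader   1      90d
kube-root-ca.crt                  1      90d'

# Same filter as the delete loop: keep only names containing "md-flink".
echo "$sample" | awk '$1~"md-flink"{print $1}'
```

Only the two `md-flink` names are printed; unrelated ConfigMaps such as `kube-root-ca.crt` are left untouched.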
## 2. Import the Corresponding Image for Each Architecture on All Flink Nodes (Offline Package Download)

Pull the image that matches each node's CPU architecture:

```shell
# AMD64
crictl pull nocoly/flink:version

# ARM64
crictl pull nocoly/flink-arm64:version
```

Replace `version` with the target release tag.
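Since the image name differs by architecture, a small helper can pick the right one per node. This is a sketch; `VERSION` is a placeholder for the actual target tag, and the final line only prints the command (drop the `echo` to really pull).

```shell
# Sketch: map a node's architecture to the image to pull.
# VERSION is a placeholder for the target release tag.
image_for_arch() {
  case "$1" in
    x86_64|amd64)  echo "nocoly/flink:${VERSION}" ;;
    aarch64|arm64) echo "nocoly/flink-arm64:${VERSION}" ;;
    *) echo "unsupported architecture: $1" >&2; return 1 ;;
  esac
}

VERSION=1.19.710
echo "crictl pull $(image_for_arch "$(uname -m)")"  # drop the echo to actually pull
```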
## 3. Modify the Configuration File

Update the image version used by the `flink-jobmanager` and `flink-taskmanager` services in `flink.yaml`:

```yaml
- name: jobmanager
  image: nocoly/flink:version
- name: taskmanager
  image: nocoly/flink:version
```

For the upgrade to v1.19.710, the following additional adjustments are required in `flink.yaml`.
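The image-tag bump can also be done mechanically with `sed`. The sketch below runs the substitution on an inline sample rather than the real file; the old tag `1.18.0` is hypothetical, and in practice you would back up `flink.yaml` first and run the same substitution with `sed -i` on the file.

```shell
NEW=1.19.710
sample='- name: jobmanager
  image: nocoly/flink:1.18.0
- name: taskmanager
  image: nocoly/flink:1.18.0'

# Rewrite every nocoly/flink image tag to $NEW.
echo "$sample" | sed "s#\(image: nocoly/flink\):.*#\1:${NEW}#"
```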
Remove all existing configuration entries starting with `metrics`, and add the following Kafka metrics reporting configuration (replace the Kafka addresses to match the actual environment):

```yaml
metrics.job.status.enable: STATE
metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
metrics.reporter.kafka_reporter.chunk.size: 20000
metrics.reporter.kafka_reporter.interval: 60s
metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
metrics.reporter.kafka_reporter.topic: flink_metrics_counter
metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
metrics.reporter.kafka_reporter_running.chunk.size: 20000
metrics.reporter.kafka_reporter_running.interval: 60s
metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
metrics.reporter.kafka_reporter2.chunk.size: 20000
metrics.reporter.kafka_reporter2.interval: 60s
metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
metrics.reporter.kafka_reporter_running2.chunk.size: 20000
metrics.reporter.kafka_reporter_running2.interval: 60s
metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
```
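Removing every existing entry that starts with `metrics` can also be scripted. The sketch below shows the idea on an inline sample (the old entries are hypothetical); on the real configuration file you would run the same `sed '/^metrics/d'` with `-i` after taking a backup, then paste in the Kafka reporter block above.

```shell
# Inline sample of a config with old metrics entries (hypothetical values).
sample='taskmanager.numberOfTaskSlots: 4
metrics.reporters: prom
metrics.reporter.prom.port: 9249
parallelism.default: 2'

# Delete every line that starts with "metrics"; the new Kafka reporter
# block is then added in its place.
echo "$sample" | sed '/^metrics/d'
```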
Locate the `kind: Role` configuration section and add the `patch` permission for `configmaps` under the `rules.verbs` field:

```yaml
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"] # "patch" added here
```
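After editing, you can quickly confirm the `patch` verb is present. The sketch below checks an inline copy of the verbs line; against a live cluster, `kubectl auth can-i` is an alternative (the service-account name in the comment is hypothetical).

```shell
# Check the edited verbs list contains "patch" (sketch; reads an inline
# copy of the line rather than the live cluster).
role_verbs='verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"]'
echo "$role_verbs" | grep -q '"patch"' && echo "patch verb present"

# Against a live cluster you could instead run, e.g. (service account
# name is a placeholder):
#   kubectl -n default auth can-i patch configmaps --as=system:serviceaccount:default:flink
```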
## 4. Restart the Service

```shell
kubectl apply -f flink.yaml
```
## 5. Restore the Sync Tasks

After the upgrade is complete, manually restart and/or republish the related tasks in the sync task list to restore their normal operation.