Flink Upgrade (Cross-Version)

  1. Delete the ConfigMaps and clean up the job list. Locate the ConfigMap-related configuration, as in the example below:

    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default

    Run the script below to delete the matching ConfigMaps in bulk.

    If the output does not contain lines such as configmap "md-flink" deleted, the namespace or the ConfigMap prefix is wrong and must be rechecked.

    # for i in $(kubectl -n <replace with the value of kubernetes.namespace> get cm | awk '$1~"<replace with the value of kubernetes.cluster-id>"{print $1}');do kubectl -n <replace with the value of kubernetes.namespace> delete cm $i;done

    for i in $(kubectl -n default get cm | awk '$1~"md-flink"{print $1}');do kubectl -n default delete cm $i;done
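The awk pattern in the loop above matches any ConfigMap whose first column contains the cluster-id. Before running the delete loop, you can preview what it will match; the sketch below simulates `kubectl -n default get cm --no-headers` output with illustrative names (replace the sample variable with the real kubectl call):

```shell
#!/bin/sh
# Stand-in for `kubectl -n default get cm --no-headers` output; the names
# below are illustrative examples, not real ConfigMaps from this guide.
sample='md-flink-cluster-config-map       1      3d
md-flink-resourcemanager-leader   1      3d
unrelated-configmap               1      3d'

# Same filter the delete loop uses: first column containing "md-flink".
printf '%s\n' "$sample" | awk '$1~"md-flink"{print $1}'
```

Only when the printed names are exactly the ConfigMaps you expect to remove should you run the actual delete loop.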
  2. Pull the new image version (offline package download)

    This must be done on every node server in the Kubernetes cluster.

    crictl pull nocoly/flink:<version>
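After pulling, it is worth confirming on each node that the image actually landed. A minimal sketch, counting matching rows in `crictl images` output; the variable below simulates that output with assumed column layout and an illustrative tag, so on a real node you would pipe the actual command instead:

```shell
#!/bin/sh
# Stand-in for `crictl images` output; on a node, replace the variable with
# the real command. Tag and IDs here are illustrative assumptions.
images='IMAGE                       TAG        IMAGE ID       SIZE
docker.io/nocoly/flink      1.19.710   abc123def456   700MB
docker.io/library/busybox   latest     789aaa000bbb   4.9MB'

# Count rows whose repository column contains nocoly/flink;
# a count of 0 means the pull did not succeed on this node.
count=$(printf '%s\n' "$images" | awk '$1~"nocoly/flink"{c++} END{print c+0}')
echo "$count"
```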
  3. Edit the configuration file

    In flink.yaml, update the image version used by the flink-jobmanager and flink-taskmanager services:

    - name: jobmanager
      image: nocoly/flink:<version>
    - name: taskmanager
      image: nocoly/flink:<version>
    When upgrading to v1.19.710, make the following additional adjustments to flink.yaml:
    1. Remove every existing configuration entry that starts with metrics, then add the following Kafka metrics reporting configuration (replace the Kafka addresses with those of your environment):

      metrics.job.status.enable: STATE
      metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
      metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter.chunk.size: 20000
      metrics.reporter.kafka_reporter.interval: 60s
      metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
      metrics.reporter.kafka_reporter.topic: flink_metrics_counter
      metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
      metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter_running.chunk.size: 20000
      metrics.reporter.kafka_reporter_running.interval: 60s
      metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
      metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
      metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
      metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter2.chunk.size: 20000
      metrics.reporter.kafka_reporter2.interval: 60s
      metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
      metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
      metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
      metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter_running2.chunk.size: 20000
      metrics.reporter.kafka_reporter_running2.interval: 60s
      metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
      metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
      metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
    2. Find the kind: Role section and, under the rules.verbs field, add the patch verb for the configmaps resource:

      kind: Role
      apiVersion: rbac.authorization.k8s.io/v1
      metadata:
        name: configmap-access
        namespace: default
      rules:
        - apiGroups: [""]
          resources: ["configmaps"]
          verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"] # patch verb added here
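Before applying the manifest, a quick grep can confirm the patch verb actually made it into the Role. A minimal sketch, operating on a temporary file that stands in for flink.yaml (in practice, point manifest at your real file):

```shell
#!/bin/sh
# Write a sample Role fragment; in practice set manifest=flink.yaml instead.
manifest=$(mktemp)
cat > "$manifest" <<'EOF'
kind: Role
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"]
EOF

# Look for the patch verb in the lines following the configmaps resource entry.
if grep -A2 'resources: \["configmaps"\]' "$manifest" | grep -q '"patch"'; then
  echo "patch verb present"
else
  echo "patch verb missing" >&2
fi
rm -f "$manifest"
```

On a running cluster, `kubectl auth can-i patch configmaps -n default` (executed as the service account the Flink pods use) gives an end-to-end check of the same permission.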
  4. Restart the services

    kubectl apply -f flink.yaml
  5. Re-enable/republish the tasks in the sync task list 💥 💥 💥