黑客24小时在线接单的网站

黑客24小时在线接单的网站

一口气搞懂Flink Metrics监控指标和性能优化,全靠这33张图和7千字

本文转载自微信公众号「3了解大数据几分钟秒」,作者在IT中穿梭旅行。转载本文请联系3了解大数据几分钟秒公众号。

前言

大家好,我是土哥。

最近在公司做 Flink 推理任务的性能测试应用于 job 监控和调整全链路吞吐、全链路延迟和吞吐延迟指标,其中使用 Flink Metrics 监控指标。

下一篇文章,干货满满,我将带领读者全面了解 Flink Metrics 指标监控,通过实战案例优化全链路吞吐、全链路延迟、吞吐延迟指标的性能,彻底掌握 Flink Metrics 性能调整和 Metrics 的使用。大纲目录如下:

1 Flink Metrics 简介

Flink Metrics 是 Flink 集群运行中的指标包括机器系统指标,如:CPU、内存、线程、JVM、网络、IO、GC 以及任务运行组件(JM、TM、Slot、操作、算子等相关指标。

Flink Metrics 包括两个功能:

                   
  • 实时收集监控数据。Flink 的 UI 界面上,用户可以看到任务状态、延迟、监控信息等。
  •                
  • 提供外部数据收集接口。用户可以整个 Flink 集群监控数据主动向第三方监控系统报告,如:prometheus、grafana 等,下面就介绍一下。

1.1 Flink Metric Types

Flink 分别提供了四个监测指标: Counter、Gauge、Histogram、Meter。

1. Count 计数器

统计一个 指标的总量。写过 MapReduce 开发人员应熟悉 Counter,其实意思是一样的,就是累积一个计数器,就是一直向上添加多个数据和多兆数据的过程。Flink 算子总接收记录 (numRecordsIn) 发送记录总数 (numRecordsOut) 属于 Counter 类型。

使用方法:可通过调用 counter(String name)创建和注册 MetricGroup

2. Gauge 指标瞬时值

Gauge 是最简单的 Metrics ,它反映了一个指标的瞬时值。例如,现在 TaskManager 的 JVM heap 内存使用多少,每次都可以实时暴露一个 Gauge,Gauge 目前的值是 heap 使用量。

首先,在使用前创建一个实现 org.apache.flink.metrics.Gauge 接口的类。返回值的类型没有限制。您可以通过在 MetricGroup 调用 gauge。

3. Meter 平均值

用于在一定时间内记录指标的平均值。Flink 中的指标有 Task 算子中的 numRecordsInPerSecond,记录此 Task 或算子每秒接收的记录数。

使用方法:通过 markEvent() 注册事件的发生。markEvent(long n) 同时注册多个事件。

4. Histogram 直方图

Histogram 用于统计一些数据的分布,如 Quantile、Mean、StdDev、Max、Min 等,最重要的是统计算子的延迟。该指标将记录数据处理的延迟信息,并在任务监控中发挥重要作用。

使用方法:通过调用 histogram(String name,Histogram histogram) 注册一个 MetricGroup。

1.2 Scope

Flink 指标系统按树形结构划分,域相当于树顶分支,表示指标分类大。每个指标将分配一个基于 3 组件的标志符:

                   
  • 用户在注册指标时提供的名称;
  •                
  • 可选用户自定义域;
  •                
  • 系统提供的域。

例如,如果 A.B 是系统域,C.D 是用户域,E 是名字,所以指标的标识符将是 A.B.C.D.E.你可以通过设置 conf/flink-conf.yam 里面的 metrics.scope.delimiter 参数配置标识符的分隔符(默认“.”)。

例如:以算子的指标组结构为例,默认为:

.taskmanager....

算子输入记录数指标为:

hlinkui.taskmanager.1234.wordcount.flatmap.0.numRecordsIn

1.3 Metrics 运行机制

在生产环境中,为了保证对Flink监控集群和作业的运行状态,Flink 提供两种集成方式:

1.3.1 主动方式 MetricReport

Flink Metrics 通过在 conf/flink-conf.yaml 配置一个或一些 reporters,将指标暴露给外部系统.这些 reporters 将在每个 job 和 task manager 启动时实例化。

1.3.2 被动模式 RestAPI

通过提供 Rest 接口,被动接收外部系统调用,可返回集群、组件、操作Task、算子状态。Rest API 实现类是 WebMonitorEndpoint

2 Flink Metrics 建立监控系统

Flink 主动提供 8 Report。

我们使用 PrometheusPushGatewayReporter 通过 prometheus pushgateway grafana 组件搭建 Flink On Yarn 可视化监控。

用户 使用 Flink 通过 session 模式向 yarn 集群提交一个 job 后,Flink 会通过 PrometheusPushGatewayReporter 将 metrics push 到 pushgateway 外部系统 prometheus 从 pushgateway 进行 pull通过 操作,收集指标Grafana显示可视化工具。原理图如下:

首先,我们先在 Flink On Yarn 集群中提交一个 Job 任务,让它运行,然后执行以下操作。

2.1 配置 Reporter

以下所有工具,jar 包全部下载,需要的朋友在微信官方账号后台回复:02,可以全部获取。

2.1.1 导包

将 flink-metrics-prometheus_2.11-1.13.2.jar 包导入 flink-1.13.2/bin 目录下。

2.1.2 配置 Reporter

选取 PrometheusPushGatewayReporter 通过在官网查询 Flink 1.13.2 Metrics 配置后, flink-conf.yaml 设置,配置如下:

                   
  • metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  •                
  • metrics.reporter.promgateway.host: 192.168.244.129
  •                
  • metrics.reporter.promgateway.port: 9091
  •                
  • metrics.reporter.promgateway.jobName: myJob
  •                
  • metrics.reporter.promgateway.randomJobNameSuffix: true
  •                
  • metrics.reporter.promgateway.deleteOnShutdown: false
  •                
  • metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
  •                
  • metrics.reporter.promgateway.interval: 60 SECONDS

2.2 部署 pushgateway

Pushgateway 是一种独立的服务,Pushgateway 位于应用程序发送指标和 Prometheus 服务器之间。

Pushgateway 接收指标,然后将其作为目标Prometheus 提取服务器。可视为代理服务,也可视为 blackbox exporter相反, 的行为接收测量,而不是探测。

2.2.1 解压 pushgateway

2.2.2. 启动 pushgateway

进入到 pushgateway-1.4.1 目录下

  • ./pushgateway&
  • 检查后台是否成功启动

  • psaux|greppushgateway
  • 2.2.3. 登录 pushgateway webui

    2.3 部署 prometheus

    Prometheus(普罗米修斯)最初是 SoundCloud 上构建的监控系统。自2012年以来, 已成为一个非常活跃的开发人员和用户社区的社区开源项目。强调开源和独立维护,Prometheus 2016年 年加入云原云计算基金会(CNCF),成为继Kubernetes 之后的第二个托管项目。

    2.3.1 解压prometheus-2.30.0

    2.3.2 编写配置文件

  • scrape_configs:
  • -job_name:'prometheus'
  • static_configs:
  • -targets:['192.168.244.129:9090']
  • labels:
  • instance:'prometheus'
  • -job_name:'linux'
  • static_configs:
  • -targets:['192.168.244.129:9100']
  • labels:
  • instance:'localhost'
  • -job_name:'pushgateway'
  • static_configs:
  • -targets:['192.168.244.129:9091']
  • labels:
  • instance:'pushgateway'
  • 2.3.3 启动prometheus

  • ./prometheus--config.file=prometheus.yml&
  • 启动完后,可以通过 ps 检查端口:

  • psaux|grepprometheus
  • 2.3.4 登录prometheus webui

    2.4 部署 grafana

    Grafana 是一种跨平台开源的测量分析和可视化工具,可以查询收集到的数据,然后可视化显示并及时通知。它主要具有以下六个特点:

                     
    • 显示方式:快速灵活的客户端图表,面板插件有许多不同方式的可视化指标和日志,官方图书馆有丰富的仪表板插件,如热图、折线图、图表等显示方式;
    •                
    • 数据源:Graphite,InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等;
    •                
    • 通知提醒:定义最重要指标的警报规则,Grafana在数据达到阈值时,通过 计算并发送通知Slack、PagerDuty 等通知;
    •                
    • 混合显示:将不同的数据源混合在同一图表中,可以根据每个查询指定的数据源,甚至自定义数据源;
    •                
    • 注:使用来自不同数据源的丰富事件注释图,在事件上悬挂鼠标将显示完整的事件元数据和标记;
    •                
    • 过滤器:Ad-hoc 过滤器允许动态创建新的键/值过滤器,这些过滤器将自动应用于源的所有查询。

    2.4.1 解压grafana-8.1.5

    2.4.2 启动grafana-8.1.5

  • ./bin/grafana-serverweb&
  • 2.4.3 登录 grafana

    用户名和密码登录 admin

    grafana 配置中文教程:

    https://grafana.com/docs/grafana/latest/datasources/prometheus/

    2.4.4 配置数据源,创建系统负载监控

    要访问 Prometheus 设置,请将鼠标悬挂在配置(齿轮)图标上,然后单击数据源,然后单击 Prometheus 数据源,按下图操作。

    操作完成后,点击验证。

    2.4.5 添加仪表板

    点击最左边的 号,选择 DashBoard,选择新的 pannel。

    至此,Flink 的 metrics 的指标展示在 Grafana 中了。

    flink 指标对应的指标名称较长,可在 Legend 配置显示内容{{key}} 将 key 可以用需要显示的相应字段代替,如:{{job_name}},{{operator_name}}。

    3 指标性能测试

    建立上述监控系统后,我们可以监控性能指标。现在介绍一个实际的战斗案例:

    3.1 业务场景介绍

    金融风险控制场景

    3.1.1 业务需求:

    Flink Source 从 data kafka topic 通过 中读取推理数据sql 预处理成模型推理要求的数据格式,在进行 keyBy connect 算子,模型 connect 后进入 Co-FlatMap 算子再推理,原来理图如下:

    3.1.2 业务要求:

    根据模型的复杂性,要求推理延迟到达 20ms 以内,全链路耗时 50ms 以内吞吐量达到每秒 1.2w 条以上。

    3.1.3 业务数据:

    推理数据:3000w,推理字段 495 ,机器学习 Xgboost 模型字段:495。

    3.2 指标解析

    性能测试要求全链路耗时50ms 以内,应使用 Flink Metrics 的 Latency Marker 计算。

    3.2.1 全链路时延计算方法 :

    全链路延迟是指进入 的推理数据source 从算子到数据预处理算子直到最后一个算子输出结果耗时,即处理数据需要多长时间,包括算子内处理逻辑时间、算子间数据传输时间

    使用 全链路时延latency metric 计算。latency metric 是由 source 算子是根据当前当地时间生成的 marker ,并不参与各个算子的逻辑计算,仅仅跟着数据往下游算子流动,每到达一个算子则算出当前本地时间戳并与 source 生成时间戳减少,得到 source 当计算器到达当前计算器的耗时时sink 算子或最后一个算子时,计算当前本地时间戳和 source 算子生成的时间戳减少,即全链路延迟。原理图如下:

    使用 Lateny marker,所有需要在 flink-conf.yaml 配置参数。

  • latency.metrics.interval
  • 系统配置截图如下:

    3.2.2 全链路吞吐计算方法 :

    全链路吞吐 = 单位时间处理数据数量 / 单位时间。

    3.3 提交任务Flink on Yarn集群

    **3.3.1 直接提交 Job **

  • #-mjobmanager的地址
  • #-yjm1024指定jobmanager的内存信息
  • #-ytm1024指定taskmanager的内存信息
  • bin/flinkrun\
  • -tyarn-per-job-yjm4096-ytm8800-s96\
  • --detached-ccom.threeknowbigdata.datastream.XgboostModelPrediction\
  • examples/batch/WordCount.jar\
  • 提交完成后,我们通过 Flink WEBUI 可见 job 操作任务结果如下:

    因为推理模型只是 model,在存在状态下,所以全链路吞吐考虑每秒有多少条推理数据进入 source 算子流出倒数第二个算子(最后一个算子只是指标汇总),这个条数是全链路吞吐。

    处理 可以看到2000W 条数据时,代码直接统计输出值和 flink webUI 统计值基本相同,统计值可信。

    Flink WEBUI 跑步结果数据

    打开 Prometheus 在对话框中输入全链路时延计算公式

  • 计算公式:
  • avg(flink_taskmanager_job_latency_source_id_
  • operator_id_operator_subtask_index_latency{
  • source_id="cbc357ccb763df2852fee8c4fc7d55f2",
  • operator_id="c9c0ca46716e76f6b700eddf4366d243",quantile="0.999"})
  • 3.4 优化前性能分析

    将任务提交给集群后,优化前的结果延迟指标统计图如下:

    吞吐指标统计图如下:

    测试结束后,从图中可以发现:

    时延指标:加并行度,吞吐量也跟随高,但是全链路时延大幅增长( 1并行至32并行,时延从 110ms 增加至 3287ms )

    这远未达到要求。

    3.5 问题分析

    通过 Prometheus分析结果如下:

    3.5.1 并行度问题 :

    反压现象: Flink WEB-UI 在上面,可以看到应用程序有非常严重的反压力,这表明链路中有更耗时的算子,阻塞了整个链路;

    数据处理慢于拉数据:数据源消耗数据的速度大于下游数据处理的速度;

    增加计算并行性:因此,推理算子并行性将在下一次测试中得到调整,相当于提高下游数据处理能力。

    3.5.2 Buffer 超时问题 :

    Flink 虽然是纯流框架,但缓存机制默认开启(上游积累的部分数据发送到下游);

    缓存机制可以增加应用的吞吐量,但也会增加延迟;

    推理场景:为获得最佳延迟指标,第二轮超时时间 0测试,记录吞吐量。

    3.5.3 Buffer 数量问题 :

    同上,Flink 中的 Buffer 可配置数量;

    Buffer 数量越多,可以缓存的数据就越多;

    推理场景:为了获得最佳延迟指标,第二轮测试:减少 Flink 的 Buffer 数量优化延迟指标。

    3.5.4 调整参数配置

    SOURCE 与 COFLATMAP 并行度按 1:12 配置;

    Buffer 超时配置为 0ms (默认100ms);

  • ///设置代码
  • senv.setBufferTimeout(0);
  • Buffer 数量配置如下:

    修改flink-conf.yaml

  • memory.buffers-per-channel:2
  • memory.float-buffers-per-gate:2
  • memory.max-buffers-per-channel:2
  • 配置截图如下:

    3.6 优化后性能分析

    修改配置后,任务再次提交给集群后,通过全链路延迟计算公式和吞吐延迟计算公式,最终得到优化结果。

    延迟指标统计图如下:

    吞吐指标统计图如下:

    优化后 LGB 推理测试总结 :

    延迟指标:平行度增加,延迟也会增加,但范围很小(可接受)。事实上,在测试过程中有一定的反向压力,如果增加 SOURCE 与 COFLATMAP 的并行比可以进一步降低整个链路的延迟;吞吐量指标:随着并行度的增加,当并行度增加到 96 时,吞吐量可以达到 1.3W,此时的延迟保持在 50ms 左右(比较稳定)。

    3.7 优化前后 LGB 分析总结

    如下图所示:

    3.7.1吞吐量---影响因素:

    内存:对吞吐和延迟没有影响,并行与吞吐成正相关。

                     
    • 增大 kafka 分区,吞吐量增加
    •                
    • 增大 source、维表 source 并行度
    •                
    • 增大 flatmap 并行推理

    3.7.2全链路时延---影响因素:

                     
    • Buffer 时间越短,数量越少,时间越低。
    •                
    • 整个链路是否有算子堵塞(车道排队模型)。
    •                
    • 增加推理算子并行度,减少时延,增加吞吐量(即增加推理处理能力)。

       
    • 评论列表:
    •  澄萌忿咬
       发布于 2022-06-23 07:17:15  回复该评论
    • romgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter                metrics.reporter.promgateway.host: 192.168

    发表评论:

    Powered By

    Copyright Your WebSite.Some Rights Reserved.