# learn_flink **Repository Path**: echo-52hz/learn_flink ## Basic Information - **Project Name**: learn_flink - **Description**: Flink 框架从入门到精通学习指南 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-11-03 - **Last Updated**: 2025-12-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink 框架从入门到精通学习指南 📊 定位:面向大数据开发初学者/进阶者的体系化学习文档,涵盖 Flink 核心原理、实操技巧与生产实践,遵循「基础认知 → 核心组件 → 进阶特性 → 生产落地」的梯度学习路径。 🔧 技术栈适配:Scala/Java 开发基础、分布式系统认知、大数据流处理场景认知(前置知识建议) # 一、基础认知:Flink 核心定位与生态体系 ## 1.1 为什么是 Flink?—— 流处理领域的技术优势 - 📌 核心定位:**分布式、高性能、高可用、准确的流处理与批处理框架**(批流一体的核心特性) - ⚡ 技术优势: - 低延迟:基于状态的增量计算,毫秒级响应 - 高吞吐:支持大规模并行处理,适配海量数据场景 - 准确性:提供 Exactly-Once 语义保障,解决数据重复计算问题 - 高可用:基于 Checkpoint + 状态后端机制,支持故障自动恢复 - 🌍 应用场景:实时数据大屏、风控实时预警、实时推荐系统、日志实时分析、流批一体数据处理 ## 1.2 Flink 与主流框架对比 |框架|处理模式|延迟|语义保障|核心优势| |---|---|---|---|---| |Flink|流批一体|毫秒级|Exactly-Once|状态管理完善、容错机制强、实时性优| |Spark Streaming|微批处理|秒级|Exactly-Once|生态完善、批处理能力强、易于上手| |Storm|纯流处理|毫秒级|At-Least-Once|轻量、低延迟、部署简单| ## 1.3 Flink 生态组件概览 Flink 生态围绕核心计算引擎,提供了从数据接入、处理到输出的全链路工具链: - 🔹 核心引擎:Flink Core(分布式计算核心,负责任务调度、状态管理、容错) - 🔹 数据接入:Flink Connectors(支持 Kafka、RabbitMQ、HDFS、MySQL、Redis 等主流存储/消息中间件) - 🔹 流处理API:DataStream API(无界流处理,核心API) - 🔹 批处理API:DataSet API(有界批处理,Flink 1.12+ 推荐使用 BatchTable API 替代) - 🔹 高阶API:Table API & SQL(声明式API,降低开发门槛,支持批流统一语法) - 🔹 状态管理:State Backends(Memory、Fs、RocksDB 等状态存储方案) - 🔹 部署模式:Standalone、YARN、K8s、Mesos(适配不同集群环境) - 🔹 监控运维:Flink Web UI、Metrics、Logging(可视化监控与问题排查) ## 1.4 环境搭建:快速上手第一个 Flink 程序 ### 1.4.1 前置环境准备 - JDK 8+(Flink 1.17+ 支持 JDK 11) - Maven 3.6+(项目构建工具) - Scala 2.12/2.13(若使用 Scala 开发) - IDE:IntelliJ IDEA(推荐,搭配 Flink 插件) ### 1.4.2 本地环境搭建(以 1.17.0 版本为例) 1. 下载安装包:从 [Flink 官网](https://flink.apache.org/downloads.html) 下载 flink-1.17.0-bin-scala_2.12.tgz 2. 解压:tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /usr/local/ 3. 启动本地集群:cd /usr/local/flink-1.17.0 && ./bin/start-cluster.sh 4. 验证:访问 http://localhost:8081,查看 Flink Web UI(默认端口 8081) 5. 运行示例程序:./bin/flink run examples/streaming/WordCount.jar,查看输出结果(默认在 log/flink-*-taskexecutor-*.out) ### 1.4.3 IDE 项目搭建(Maven) 创建 Maven 项目,在 pom.xml 中添加 Flink 核心依赖: ```xml org.apache.flink flink-java 1.17.0 org.apache.flink flink-streaming-java 1.17.0 org.apache.flink flink-clients 1.17.0 ``` ### 1.4.4 第一个 Flink 程序:WordCount(流处理版) ```java import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建执行环境(流处理环境) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取数据(从本地端口 9999 读取文本流) DataStream inputStream = env.socketTextStream("localhost", 9999); // 3. 数据处理:分词 → 分组 → 聚合 DataStream> resultStream = inputStream .flatMap(new FlatMapFunction>() { @Override public void flatMap(String line, Collector> out) throws Exception { // 分词:将一行文本拆分为多个单词 String[] words = line.split(" "); for (String word : words) { // 输出 (单词, 1) out.collect(new Tuple2<>(word, 1)); } } }) .keyBy(tuple -> tuple.f0) // 按单词分组(keyBy 是逻辑分组,基于 key 哈希分区) .sum(1); // 对第二个字段(计数)求和 // 4. 输出结果(打印到控制台) resultStream.print(); // 5. 执行任务(流处理任务必须调用 execute() 启动) env.execute("Stream WordCount"); } } ``` 运行步骤: 1. 启动本地端口监听:nc -lk 9999(Linux/Mac),Windows 可使用 telnet localhost 9999 2. 运行 StreamWordCount 程序 3. 在端口监听窗口输入文本(如 "hello flink hello world"),查看程序控制台输出结果 # 二、核心组件:Flink 流处理核心原理 ## 2.1 执行环境:StreamExecutionEnvironment StreamExecutionEnvironment 是 Flink 流处理程序的入口,负责创建数据流、调度任务、管理资源,核心作用: - 📌 环境创建: - 本地环境:StreamExecutionEnvironment.createLocalEnvironment()(手动指定并行度) - 集群环境:StreamExecutionEnvironment.getExecutionEnvironment()(自动适配运行环境,本地/集群通用) - ⚙️ 核心配置: - 设置并行度:env.setParallelism(4)(默认并行度为 CPU 核心数) - 禁用 Operator Chain:env.disableOperatorChaining()(默认会将相邻的算子链化,减少数据传输开销) - 设置 Checkpoint 间隔:env.enableCheckpointing(5000)(每隔 5 秒执行一次 Checkpoint) ## 2.2 数据流:DataStream 核心概念 DataStream 是 Flink 流处理的核心数据结构,代表一个**无界的、连续的数据流**,具有以下特性: - 🔹 无界性:数据流无固定结束点,持续产生数据(批处理 DataSet 是有界的) - 🔹 并行性:DataStream 可被划分为多个并行子流(Subtask),并行处理提升吞吐量 - 🔹 不可变性:DataStream 本身不可修改,算子操作会生成新的 DataStream 核心数据流操作分类: ### 2.2.1 源操作(Source)—— 数据输入 Source 是 DataStream 的输入源,负责从外部系统读取数据,常见 Source: - 基础 Source: - socketTextStream:从 TCP 端口读取文本流(测试常用) - fromCollection:从 Java/Scala 集合读取数据(测试常用) - fromElements:从单个元素创建数据流 - 生产级 Source(通过 Connectors): - Kafka Source:org.apache.flink:flink-connector-kafka:1.17.0(最常用,实时数据接入) - HDFS Source:读取 HDFS 文本/文件数据 - MySQL Source:通过 JDBC Connector 读取数据库数据 ### 2.2.2 转换操作(Transformation)—— 数据处理 Transformation 是对 DataStream 的处理逻辑,将一个 DataStream 转换为另一个 DataStream,核心分类: 1. 单流转换(针对单个 DataStream 处理): - map:一对一转换,输入一个元素输出一个元素(如将 String 转换为 Tuple) - flatMap:一对多转换,输入一个元素输出多个元素(如分词) - filter:过滤,保留满足条件的元素(如过滤掉空值) - keyBy:逻辑分组,将 DataStream 按 key 划分为多个 KeyedStream(后续聚合操作的前提) - reduce:聚合,对 KeyedStream 按自定义逻辑聚合(如求和、求最大值) - sum/min/max:内置聚合,对 KeyedStream 的指定字段聚合 2. 多流转换(合并/连接多个 DataStream): - union:合并多个同类型的 DataStream,输出类型与输入一致(可合并多个) - connect:连接两个不同类型的 DataStream,保留各自的类型,可通过 CoMap/CoFlatMap 处理 - join:基于时间窗口的关联,将两个 DataStream 中满足条件的元素关联(如订单流与用户流关联) ### 2.2.3 汇操作(Sink)—— 数据输出 Sink 是 DataStream 的输出目的地,负责将处理后的数据写入外部系统,常见 Sink: - 基础 Sink: - print/printToErr:打印到控制台(测试常用) - writeAsText:写入文本文件(批处理场景常用) - 生产级 Sink(通过 Connectors): - Kafka Sink:写入 Kafka 主题 - Redis Sink:写入 Redis(如计数结果存储) - MySQL Sink:通过 JDBC Connector 写入数据库 - HDFS Sink:写入 HDFS 分布式文件系统 ## 2.3 并行度:Flink 任务的并行执行机制 并行度(Parallelism)是 Flink 任务并行执行的核心指标,代表一个算子被拆分为多少个并行子任务(Subtask)同时执行,影响吞吐量和资源占用。 ### 2.3.1 并行度的层级优先级(从高到低) 1. 算子级:单个算子设置并行度(最精细,如 resultStream.print().setParallelism(2)) 2. 环境级:全局设置并行度(env.setParallelism(4)) 3. 集群级:flink-conf.yaml 中配置(parallelism.default: 2,默认值) 4. 提交任务时指定:./bin/flink run -p 4 xxx.jar(通过 -p 参数指定) ### 2.3.2 并行任务的执行模型 - Task:Flink 任务的基本执行单元,一个 Task 对应一个 Subtask - TaskManager:Flink 集群的工作节点,负责运行 Task,每个 TaskManager 有多个 Task Slot(资源槽) - Slot:资源分配的最小单元,每个 Slot 对应一个 CPU 核心(默认),一个 TaskManager 的 Slot 数量决定其可运行的并行任务数 示例:一个并行度为 4 的 WordCount 程序,flatMap、keyBy、sum 算子各有 4 个 Subtask,共 12 个 Task,若集群有 2 个 TaskManager,每个有 2 个 Slot,则任务会分布式运行在 2 个节点上。 # 三、进阶特性:状态管理与容错机制 ## 3.1 状态管理:Flink 流处理的核心能力 状态(State)是 Flink 算子在处理数据过程中需要保存的中间结果(如聚合计数、窗口数据、关联缓存等),是实现复杂业务逻辑的基础。Flink 提供了完善的状态管理机制,支持状态的持久化、恢复和扩容。 ### 3.1.1 状态的分类 1. 按范围划分: - Keyed State(键控状态):与 key 绑定,每个 key 对应一个独立的状态,仅能在 KeyedStream 上使用(如 keyBy 后的聚合状态) - Operator State(算子状态):与算子实例绑定,每个 Subtask 对应一个状态,不依赖 key(如 Source 算子的偏移量状态) 2. 按数据结构划分(Keyed State 支持): - ValueState:单个值状态(如计数) - ListState:列表状态(如窗口内的所有数据) - MapState:映射状态(如 key-value 缓存) - ReducingState:聚合状态(按指定逻辑聚合) ### 3.1.2 状态的使用方式(以 Keyed State 为例) ```java import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; // 自定义 KeyedProcessFunction,统计每个单词的出现次数(使用 ValueState) public class WordCountProcessFunction extends KeyedProcessFunction> { // 定义 ValueState(存储单词计数) private ValueState countState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化状态:创建 ValueStateDescriptor(描述状态名称和类型) ValueStateDescriptor descriptor = new ValueStateDescriptor<>( "word-count-state", // 状态名称 Types.INT // 状态类型 ); // 从上下文获取状态 countState = getRuntimeContext().getState(descriptor); } @Override public void processElement(String word, Context ctx, Collector> out) throws Exception { // 获取当前状态值(若为 null,初始化为 0) Integer count = countState.value() == null ? 0 : countState.value(); // 更新状态(计数 +1) countState.update(count + 1); // 输出结果 out.collect(new Tuple2<>(word, countState.value())); } @Override public void close() throws Exception { super.close(); // 关闭资源(状态会自动持久化,无需手动关闭) countState.clear(); } } ``` ### 3.1.3 状态后端(State Backend)—— 状态的存储方案 状态后端负责状态的存储、持久化和访问,Flink 提供三种内置状态后端: |状态后端|存储位置|优点|缺点|适用场景| |---|---|---|---|---| |MemoryStateBackend|TaskManager 内存|访问速度快、轻量|状态大小受内存限制,故障后状态丢失|测试环境、无状态/小状态任务| |FsStateBackend|本地文件系统/HDFS(状态元数据在内存)|状态大小不受内存限制,支持持久化|访问速度较慢(依赖文件系统)|生产环境、中大规模状态任务| |RocksDBStateBackend|RocksDB 数据库(本地磁盘,支持持久化到 HDFS)|支持超大状态、读写性能优、支持增量 Checkpoint|相比内存后端有一定性能开销|生产环境、超大状态任务(TB 级)| 配置方式(以 RocksDBStateBackend 为例): ```java // 代码中配置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置 RocksDB 状态后端,持久化到 HDFS env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/flink/state", true)); // 或在 flink-conf.yaml 中配置(全局生效) state.backend: rocksdb state.backend.rocksdb.checkpoint.transfer.thread.num: 4 state.checkpoints.dir: hdfs://localhost:9000/flink/checkpoints ``` ## 3.2 容错机制:Checkpoint 与 Savepoint Flink 通过 Checkpoint 机制实现故障容错,核心思想是**定期对状态进行快照(Snapshot)**,当任务故障时,从最近的 Checkpoint 恢复状态,保证数据处理的准确性。 ### 3.2.1 Checkpoint 核心原理 1. 触发:由 JobManager 定期触发(可配置间隔),采用「异步快照」机制,不阻塞数据处理 2. 快照过程:基于 Chandy-Lamport 算法,从 Source 算子开始,向下游算子发送「屏障(Barrier)」,当算子收到所有输入流的屏障后,对状态进行快照并持久化到状态后端 3. 恢复:任务故障后,JobManager 重新部署任务,从状态后端读取最近的 Checkpoint 快照,恢复每个算子的状态,然后从 Checkpoint 对应的数据流位置继续处理 ### 3.2.2 Checkpoint 配置 ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 启用 Checkpoint(间隔 5 秒) env.enableCheckpointing(5000); // 2. 配置 Checkpoint 模式(Exactly-Once/At-Least-Once) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 3. 配置超时时间(Checkpoint 执行超时 60 秒则失败) env.getCheckpointConfig().setCheckpointTimeout(60000); // 4. 配置最大并发 Checkpoint 数(同一时间最多 1 个 Checkpoint 执行) env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 5. 配置 Checkpoint 失败后的处理策略(失败不终止任务) env.getCheckpointConfig().setFailOnCheckpointingErrors(false); // 6. 配置 Checkpoint 持久化(任务取消后保留 Checkpoint,默认删除) env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ``` ### 3.2.3 Checkpoint 与 Savepoint 的区别 |特性|Checkpoint|Savepoint| |---|---|---| |触发方式|自动触发(定期)|手动触发(用户主动)| |用途|故障恢复(自动)|版本升级、任务迁移、手动恢复| |生命周期|由 Flink 自动管理(过期删除)|用户手动管理(不自动删除)| |创建命令|代码/配置启用,自动创建|./bin/flink savepoint | # 四、时间与窗口:Flink 流处理的时序逻辑 流处理的核心是「时序性」,Flink 提供完善的时间语义和窗口机制,解决无界流的时序处理问题(如统计每 5 分钟的订单总量)。 ## 4.1 时间语义:三种核心时间类型 Flink 支持三种时间语义,适配不同的业务场景: 1. Event Time(事件时间): - 定义:事件产生的时间(如日志的 timestamp 字段) - 核心优势:不受处理延迟、乱序影响,能准确反映事件的真实时序关系 - 适用场景:对时序准确性要求高的场景(如风控、实时统计) - 关键配置:需要指定事件时间字段,并配置 Watermark 处理乱序数据 2. Processing Time(处理时间): - 定义:数据被 Flink 算子处理时的系统时间 - 核心优势:简单、无额外开销(无需解析事件时间、生成 Watermark) - 缺点:受处理延迟、集群负载影响,时序准确性差 - 适用场景:对时序准确性要求低的场景(如测试、实时监控粗略统计) 3. Ingestion Time(摄入时间): - 定义:数据被 Flink Source 算子摄入的时间 - 特点:介于 Event Time 和 Processing Time 之间,比 Processing Time 准确,比 Event Time 简单 - 适用场景:无法获取事件时间,但需要一定时序准确性的场景 配置时间语义: ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 配置 Event Time(推荐) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. 配置 Processing Time(默认) // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 3. 配置 Ingestion Time // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); ``` ## 4.2 Watermark:乱序数据的时序对齐机制 Event Time 场景下,数据可能因网络延迟、分布式传输等原因乱序到达(即后产生的事件先被处理)。Watermark 是 Flink 解决乱序问题的核心机制,本质是「一个带有时间戳的标记」,用于告知 Flink :「小于该时间戳的事件都已到达,后续不会再出现」。 ### 4.2.1 Watermark 的核心规则 - Watermark 时间戳 = 最大事件时间 - 乱序容忍度(允许延迟的时间) - 当 Watermark 时间戳 >= 窗口结束时间时,触发窗口计算 - Watermark 由 Source 算子生成,或通过 assignTimestampsAndWatermarks 算子生成 ### 4.2.2 Watermark 生成方式 Flink 提供两种内置的 Watermark 生成器,也支持自定义: 1. 周期性 Watermark 生成器(Periodic Watermarks): `DataStream logStream = env.addSource(new KafkaSource()); // 提取事件时间字段,并生成 Watermark DataStream eventTimeStream = logStream .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()) // 提取事件时间(毫秒级) );` - 特点:按固定周期(默认 200ms)生成 Watermark,适用于事件均匀产生的场景 - 示例(处理乱序数据,容忍延迟 5 秒): 2. 断点式 Watermark 生成器(Punctuated Watermarks): `WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((ctx) -> new PunctuatedWatermarkGenerator() { @Override public void onEvent(LogEvent event, long eventTimestamp, PunctuatedWatermarkOutput output) { // 当事件类型为 "MARKER" 时,生成 Watermark if ("MARKER".equals(event.getType())) { output.emitWatermark(new Watermark(eventTimestamp)); } } @Override public void onPeriodicEmit(PunctuatedWatermarkOutput output) { // 断点式生成器无需周期性触发,空实现 } }) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); DataStream eventTimeStream = logStream.assignTimestampsAndWatermarks(watermarkStrategy);` - 特点:基于特定事件触发 Watermark 生成(如遇到特定标记事件),适用于事件非均匀产生的场景 - 示例(遇到标记事件时生成 Watermark): ## 4.3 窗口机制:无界流的分段处理 窗口(Window)是将无界流划分为有界流的核心手段,Flink 按不同维度将窗口分为多种类型,适配不同的业务需求。 ### 4.3.1 窗口的核心分类(按划分方式) 1. Time Window(时间窗口):按时间划分窗口,最常用 - Tumbling Time Window(滚动时间窗口): `DataStream> resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) // 滚动事件时间窗口 .sum(1);` - 特点:窗口不重叠、连续,每个窗口有固定大小(如每 5 分钟一个窗口) - 适用场景:统计固定时间段内的聚合结果(如每小时订单总量) - 示例(5 分钟滚动窗口,Event Time): - Sliding Time Window(滑动时间窗口): `DataStream> resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))) // 滑动事件时间窗口 .sum(1);` - 特点:窗口有重叠,由窗口大小和滑动步长决定(如窗口大小 10 分钟,滑动步长 5 分钟,每 5 分钟生成一个窗口,每个窗口重叠 5 分钟) - 适用场景:需要连续统计的场景(如每 5 分钟统计最近 10 分钟的订单总量) - 示例(10 分钟窗口,5 分钟滑动,Event Time): - Session Window(会话窗口): `DataStream> resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(EventTimeSessionWindows.withGap(Time.seconds(30))) // 会话窗口 .sum(1);` - 特点:基于会话间隔划分,当两个事件的时间间隔超过会话间隔时,开启新窗口(无固定大小) - 适用场景:统计用户会话内的行为(如用户一次浏览会话的点击量,会话间隔 30 秒) - 示例(会话间隔 30 秒,Event Time): 2. Count Window(计数窗口):按数据量划分窗口 `DataStream> resultStream = inputStream .keyBy(tuple -> tuple.f0) .countWindow(10) // 滚动计数窗口 .sum(1);` - Tumbling Count Window(滚动计数窗口):每 N 个元素一个窗口(如每 100 个元素统计一次) - Sliding Count Window(滑动计数窗口):每 M 个元素滑动一次,窗口大小 N 个元素(如每 50 个元素统计最近 100 个元素) - 示例(滚动计数窗口,每 10 个元素一个窗口): ### 4.3.2 窗口函数:窗口内数据的处理逻辑 窗口函数定义了窗口触发后如何处理窗口内的数据,Flink 支持三种核心窗口函数: 1. 增量聚合函数(Incremental Aggregation Functions): `DataStream> resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .reduce(new ReduceFunction>() { @Override public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { // 对相同 key 的两个元素聚合(计数求和) return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } });` - 特点:窗口内数据到来时实时聚合,只保存聚合结果(不保存原始数据),效率高 - 常见函数:sum、min、max、reduce、aggregate - 示例(自定义 reduce 函数,统计窗口内单词出现次数): 2. 全窗口函数(Full Window Functions): `DataStream resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable> elements, Collector out) throws Exception { // 统计窗口内元素个数(即单词出现次数) int count = 0; for (Tuple2 element : elements) { count++; } // 获取窗口元数据(开始时间、结束时间) TimeWindow window = context.window(); long start = window.getStart(); long end = window.getEnd(); // 输出结果 out.collect("Key: " + key + ", 窗口时间: [" + start + "," + end + "), 出现次数: " + count); } });` - 特点:窗口触发后,一次性处理窗口内的所有原始数据,灵活性高(可实现复杂逻辑) - 常见函数:apply(通用全窗口函数)、ProcessWindowFunction(更强大,支持访问窗口元数据、状态) - 缺点:需要保存窗口内所有原始数据,内存开销大 - 示例(ProcessWindowFunction,统计窗口内单词出现次数及窗口信息): 3. 增量聚合 + 全窗口函数(组合使用): `DataStream resultStream = inputStream .keyBy(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .reduce( new ReduceFunction>() { @Override public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }, new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable> elements, Collector out) throws Exception { Tuple2 result = elements.iterator().next(); TimeWindow window = context.window(); out.collect("Key: " + key + ", 窗口时间: [" + window.getStart() + "," + window.getEnd() + "), 出现次数: " + result.f1); } } );` - 特点:结合两者优势,先通过增量聚合函数聚合数据(减少内存开销),再通过全窗口函数处理聚合结果(获取窗口元数据) - 示例: # 五、生产落地:部署、优化与监控 ## 5.1 部署模式:Flink 集群部署方案 Flink 支持多种部署模式,适配不同的集群环境和资源管理方案: ### 5.1.1 Standalone 模式(独立集群) 最基础的部署模式,不依赖外部资源管理系统,适用于测试/小型生产环境: 1. 集群规划:1 个 JobManager + N 个 TaskManager 2. 配置步骤: - 修改 flink-conf.yaml:配置 jobmanager.rpc.address(JobManager 地址)、taskmanager.numberOfTaskSlots(每个 TaskManager 的 Slot 数) - 修改 slaves 文件:添加所有 TaskManager 的主机名/IP - 分发配置文件到所有节点:scp -r flink-1.17.0/ node2:/usr/local/ - 启动集群:./bin/start-cluster.sh 3. 提交任务:./bin/flink run -m node1:8081 -p 4 xxx.jar(-m 指定 JobManager 地址) ### 5.1.2 YARN 模式(推荐生产环境) 依赖 Hadoop YARN 进行资源管理,支持动态资源分配,适配大规模集群: 1. 核心优势: - 资源动态分配:根据任务需求自动申请/释放资源 - 高可用:借助 YARN 的资源管理能力,支持 JobManager 高可用部署 - 集成 Hadoop 生态:方便读取 HDFS、HBase 等数据 2. 部署类型: - Session Mode(会话模式):启动一个 YARN Session,多个任务共享资源(适用于小任务) - Per-Job Mode(单任务模式):每个任务启动一个独立的 YARN Session,资源隔离性好(适用于大任务,推荐) - Application Mode(应用模式):任务直接在 YARN 上运行,无需客户端提交(减少客户端压力