# learn_flink
**Repository Path**: echo-52hz/learn_flink
## Basic Information
- **Project Name**: learn_flink
- **Description**: Flink 框架从入门到精通学习指南
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-11-03
- **Last Updated**: 2025-12-29
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Flink 框架从入门到精通学习指南
📊 定位:面向大数据开发初学者/进阶者的体系化学习文档,涵盖 Flink 核心原理、实操技巧与生产实践,遵循「基础认知 → 核心组件 → 进阶特性 → 生产落地」的梯度学习路径。
🔧 技术栈适配:Scala/Java 开发基础、分布式系统认知、大数据流处理场景认知(前置知识建议)
# 一、基础认知:Flink 核心定位与生态体系
## 1.1 为什么是 Flink?—— 流处理领域的技术优势
- 📌 核心定位:**分布式、高性能、高可用、准确的流处理与批处理框架**(批流一体的核心特性)
- ⚡ 技术优势:
- 低延迟:基于状态的增量计算,毫秒级响应
- 高吞吐:支持大规模并行处理,适配海量数据场景
- 准确性:提供 Exactly-Once 语义保障,解决数据重复计算问题
- 高可用:基于 Checkpoint + 状态后端机制,支持故障自动恢复
- 🌍 应用场景:实时数据大屏、风控实时预警、实时推荐系统、日志实时分析、流批一体数据处理
## 1.2 Flink 与主流框架对比
|框架|处理模式|延迟|语义保障|核心优势|
|---|---|---|---|---|
|Flink|流批一体|毫秒级|Exactly-Once|状态管理完善、容错机制强、实时性优|
|Spark Streaming|微批处理|秒级|Exactly-Once|生态完善、批处理能力强、易于上手|
|Storm|纯流处理|毫秒级|At-Least-Once|轻量、低延迟、部署简单|
## 1.3 Flink 生态组件概览
Flink 生态围绕核心计算引擎,提供了从数据接入、处理到输出的全链路工具链:
- 🔹 核心引擎:Flink Core(分布式计算核心,负责任务调度、状态管理、容错)
- 🔹 数据接入:Flink Connectors(支持 Kafka、RabbitMQ、HDFS、MySQL、Redis 等主流存储/消息中间件)
- 🔹 流处理API:DataStream API(无界流处理,核心API)
- 🔹 批处理API:DataSet API(有界批处理,Flink 1.12+ 推荐使用 BatchTable API 替代)
- 🔹 高阶API:Table API & SQL(声明式API,降低开发门槛,支持批流统一语法)
- 🔹 状态管理:State Backends(Memory、Fs、RocksDB 等状态存储方案)
- 🔹 部署模式:Standalone、YARN、K8s、Mesos(适配不同集群环境)
- 🔹 监控运维:Flink Web UI、Metrics、Logging(可视化监控与问题排查)
## 1.4 环境搭建:快速上手第一个 Flink 程序
### 1.4.1 前置环境准备
- JDK 8+(Flink 1.17+ 支持 JDK 11)
- Maven 3.6+(项目构建工具)
- Scala 2.12/2.13(若使用 Scala 开发)
- IDE:IntelliJ IDEA(推荐,搭配 Flink 插件)
### 1.4.2 本地环境搭建(以 1.17.0 版本为例)
1. 下载安装包:从 [Flink 官网](https://flink.apache.org/downloads.html) 下载 flink-1.17.0-bin-scala_2.12.tgz
2. 解压:tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /usr/local/
3. 启动本地集群:cd /usr/local/flink-1.17.0 && ./bin/start-cluster.sh
4. 验证:访问 http://localhost:8081,查看 Flink Web UI(默认端口 8081)
5. 运行示例程序:./bin/flink run examples/streaming/WordCount.jar,查看输出结果(默认在 log/flink-*-taskexecutor-*.out)
### 1.4.3 IDE 项目搭建(Maven)
创建 Maven 项目,在 pom.xml 中添加 Flink 核心依赖:
```xml
org.apache.flink
flink-java
1.17.0
org.apache.flink
flink-streaming-java
1.17.0
org.apache.flink
flink-clients
1.17.0
```
### 1.4.4 第一个 Flink 程序:WordCount(流处理版)
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境(流处理环境)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取数据(从本地端口 9999 读取文本流)
DataStream inputStream = env.socketTextStream("localhost", 9999);
// 3. 数据处理:分词 → 分组 → 聚合
DataStream> resultStream = inputStream
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String line, Collector> out) throws Exception {
// 分词:将一行文本拆分为多个单词
String[] words = line.split(" ");
for (String word : words) {
// 输出 (单词, 1)
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy(tuple -> tuple.f0) // 按单词分组(keyBy 是逻辑分组,基于 key 哈希分区)
.sum(1); // 对第二个字段(计数)求和
// 4. 输出结果(打印到控制台)
resultStream.print();
// 5. 执行任务(流处理任务必须调用 execute() 启动)
env.execute("Stream WordCount");
}
}
```
运行步骤:
1. 启动本地端口监听:nc -lk 9999(Linux/Mac),Windows 可使用 telnet localhost 9999
2. 运行 StreamWordCount 程序
3. 在端口监听窗口输入文本(如 "hello flink hello world"),查看程序控制台输出结果
# 二、核心组件:Flink 流处理核心原理
## 2.1 执行环境:StreamExecutionEnvironment
StreamExecutionEnvironment 是 Flink 流处理程序的入口,负责创建数据流、调度任务、管理资源,核心作用:
- 📌 环境创建:
- 本地环境:StreamExecutionEnvironment.createLocalEnvironment()(手动指定并行度)
- 集群环境:StreamExecutionEnvironment.getExecutionEnvironment()(自动适配运行环境,本地/集群通用)
- ⚙️ 核心配置:
- 设置并行度:env.setParallelism(4)(默认并行度为 CPU 核心数)
- 禁用 Operator Chain:env.disableOperatorChaining()(默认会将相邻的算子链化,减少数据传输开销)
- 设置 Checkpoint 间隔:env.enableCheckpointing(5000)(每隔 5 秒执行一次 Checkpoint)
## 2.2 数据流:DataStream 核心概念
DataStream 是 Flink 流处理的核心数据结构,代表一个**无界的、连续的数据流**,具有以下特性:
- 🔹 无界性:数据流无固定结束点,持续产生数据(批处理 DataSet 是有界的)
- 🔹 并行性:DataStream 可被划分为多个并行子流(Subtask),并行处理提升吞吐量
- 🔹 不可变性:DataStream 本身不可修改,算子操作会生成新的 DataStream
核心数据流操作分类:
### 2.2.1 源操作(Source)—— 数据输入
Source 是 DataStream 的输入源,负责从外部系统读取数据,常见 Source:
- 基础 Source:
- socketTextStream:从 TCP 端口读取文本流(测试常用)
- fromCollection:从 Java/Scala 集合读取数据(测试常用)
- fromElements:从单个元素创建数据流
- 生产级 Source(通过 Connectors):
- Kafka Source:org.apache.flink:flink-connector-kafka:1.17.0(最常用,实时数据接入)
- HDFS Source:读取 HDFS 文本/文件数据
- MySQL Source:通过 JDBC Connector 读取数据库数据
### 2.2.2 转换操作(Transformation)—— 数据处理
Transformation 是对 DataStream 的处理逻辑,将一个 DataStream 转换为另一个 DataStream,核心分类:
1. 单流转换(针对单个 DataStream 处理):
- map:一对一转换,输入一个元素输出一个元素(如将 String 转换为 Tuple)
- flatMap:一对多转换,输入一个元素输出多个元素(如分词)
- filter:过滤,保留满足条件的元素(如过滤掉空值)
- keyBy:逻辑分组,将 DataStream 按 key 划分为多个 KeyedStream(后续聚合操作的前提)
- reduce:聚合,对 KeyedStream 按自定义逻辑聚合(如求和、求最大值)
- sum/min/max:内置聚合,对 KeyedStream 的指定字段聚合
2. 多流转换(合并/连接多个 DataStream):
- union:合并多个同类型的 DataStream,输出类型与输入一致(可合并多个)
- connect:连接两个不同类型的 DataStream,保留各自的类型,可通过 CoMap/CoFlatMap 处理
- join:基于时间窗口的关联,将两个 DataStream 中满足条件的元素关联(如订单流与用户流关联)
### 2.2.3 汇操作(Sink)—— 数据输出
Sink 是 DataStream 的输出目的地,负责将处理后的数据写入外部系统,常见 Sink:
- 基础 Sink:
- print/printToErr:打印到控制台(测试常用)
- writeAsText:写入文本文件(批处理场景常用)
- 生产级 Sink(通过 Connectors):
- Kafka Sink:写入 Kafka 主题
- Redis Sink:写入 Redis(如计数结果存储)
- MySQL Sink:通过 JDBC Connector 写入数据库
- HDFS Sink:写入 HDFS 分布式文件系统
## 2.3 并行度:Flink 任务的并行执行机制
并行度(Parallelism)是 Flink 任务并行执行的核心指标,代表一个算子被拆分为多少个并行子任务(Subtask)同时执行,影响吞吐量和资源占用。
### 2.3.1 并行度的层级优先级(从高到低)
1. 算子级:单个算子设置并行度(最精细,如 resultStream.print().setParallelism(2))
2. 环境级:全局设置并行度(env.setParallelism(4))
3. 集群级:flink-conf.yaml 中配置(parallelism.default: 2,默认值)
4. 提交任务时指定:./bin/flink run -p 4 xxx.jar(通过 -p 参数指定)
### 2.3.2 并行任务的执行模型
- Task:Flink 任务的基本执行单元,一个 Task 对应一个 Subtask
- TaskManager:Flink 集群的工作节点,负责运行 Task,每个 TaskManager 有多个 Task Slot(资源槽)
- Slot:资源分配的最小单元,每个 Slot 对应一个 CPU 核心(默认),一个 TaskManager 的 Slot 数量决定其可运行的并行任务数
示例:一个并行度为 4 的 WordCount 程序,flatMap、keyBy、sum 算子各有 4 个 Subtask,共 12 个 Task,若集群有 2 个 TaskManager,每个有 2 个 Slot,则任务会分布式运行在 2 个节点上。
# 三、进阶特性:状态管理与容错机制
## 3.1 状态管理:Flink 流处理的核心能力
状态(State)是 Flink 算子在处理数据过程中需要保存的中间结果(如聚合计数、窗口数据、关联缓存等),是实现复杂业务逻辑的基础。Flink 提供了完善的状态管理机制,支持状态的持久化、恢复和扩容。
### 3.1.1 状态的分类
1. 按范围划分:
- Keyed State(键控状态):与 key 绑定,每个 key 对应一个独立的状态,仅能在 KeyedStream 上使用(如 keyBy 后的聚合状态)
- Operator State(算子状态):与算子实例绑定,每个 Subtask 对应一个状态,不依赖 key(如 Source 算子的偏移量状态)
2. 按数据结构划分(Keyed State 支持):
- ValueState:单个值状态(如计数)
- ListState:列表状态(如窗口内的所有数据)
- MapState:映射状态(如 key-value 缓存)
- ReducingState:聚合状态(按指定逻辑聚合)
### 3.1.2 状态的使用方式(以 Keyed State 为例)
```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
// 自定义 KeyedProcessFunction,统计每个单词的出现次数(使用 ValueState)
public class WordCountProcessFunction extends KeyedProcessFunction> {
// 定义 ValueState(存储单词计数)
private ValueState countState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化状态:创建 ValueStateDescriptor(描述状态名称和类型)
ValueStateDescriptor descriptor = new ValueStateDescriptor<>(
"word-count-state", // 状态名称
Types.INT // 状态类型
);
// 从上下文获取状态
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(String word, Context ctx, Collector> out) throws Exception {
// 获取当前状态值(若为 null,初始化为 0)
Integer count = countState.value() == null ? 0 : countState.value();
// 更新状态(计数 +1)
countState.update(count + 1);
// 输出结果
out.collect(new Tuple2<>(word, countState.value()));
}
@Override
public void close() throws Exception {
super.close();
// 关闭资源(状态会自动持久化,无需手动关闭)
countState.clear();
}
}
```
### 3.1.3 状态后端(State Backend)—— 状态的存储方案
状态后端负责状态的存储、持久化和访问,Flink 提供三种内置状态后端:
|状态后端|存储位置|优点|缺点|适用场景|
|---|---|---|---|---|
|MemoryStateBackend|TaskManager 内存|访问速度快、轻量|状态大小受内存限制,故障后状态丢失|测试环境、无状态/小状态任务|
|FsStateBackend|本地文件系统/HDFS(状态元数据在内存)|状态大小不受内存限制,支持持久化|访问速度较慢(依赖文件系统)|生产环境、中大规模状态任务|
|RocksDBStateBackend|RocksDB 数据库(本地磁盘,支持持久化到 HDFS)|支持超大状态、读写性能优、支持增量 Checkpoint|相比内存后端有一定性能开销|生产环境、超大状态任务(TB 级)|
配置方式(以 RocksDBStateBackend 为例):
```java
// 代码中配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 RocksDB 状态后端,持久化到 HDFS
env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/flink/state", true));
// 或在 flink-conf.yaml 中配置(全局生效)
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.checkpoints.dir: hdfs://localhost:9000/flink/checkpoints
```
## 3.2 容错机制:Checkpoint 与 Savepoint
Flink 通过 Checkpoint 机制实现故障容错,核心思想是**定期对状态进行快照(Snapshot)**,当任务故障时,从最近的 Checkpoint 恢复状态,保证数据处理的准确性。
### 3.2.1 Checkpoint 核心原理
1. 触发:由 JobManager 定期触发(可配置间隔),采用「异步快照」机制,不阻塞数据处理
2. 快照过程:基于 Chandy-Lamport 算法,从 Source 算子开始,向下游算子发送「屏障(Barrier)」,当算子收到所有输入流的屏障后,对状态进行快照并持久化到状态后端
3. 恢复:任务故障后,JobManager 重新部署任务,从状态后端读取最近的 Checkpoint 快照,恢复每个算子的状态,然后从 Checkpoint 对应的数据流位置继续处理
### 3.2.2 Checkpoint 配置
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 启用 Checkpoint(间隔 5 秒)
env.enableCheckpointing(5000);
// 2. 配置 Checkpoint 模式(Exactly-Once/At-Least-Once)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 3. 配置超时时间(Checkpoint 执行超时 60 秒则失败)
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 4. 配置最大并发 Checkpoint 数(同一时间最多 1 个 Checkpoint 执行)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 5. 配置 Checkpoint 失败后的处理策略(失败不终止任务)
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
// 6. 配置 Checkpoint 持久化(任务取消后保留 Checkpoint,默认删除)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
### 3.2.3 Checkpoint 与 Savepoint 的区别
|特性|Checkpoint|Savepoint|
|---|---|---|
|触发方式|自动触发(定期)|手动触发(用户主动)|
|用途|故障恢复(自动)|版本升级、任务迁移、手动恢复|
|生命周期|由 Flink 自动管理(过期删除)|用户手动管理(不自动删除)|
|创建命令|代码/配置启用,自动创建|./bin/flink savepoint |
# 四、时间与窗口:Flink 流处理的时序逻辑
流处理的核心是「时序性」,Flink 提供完善的时间语义和窗口机制,解决无界流的时序处理问题(如统计每 5 分钟的订单总量)。
## 4.1 时间语义:三种核心时间类型
Flink 支持三种时间语义,适配不同的业务场景:
1. Event Time(事件时间):
- 定义:事件产生的时间(如日志的 timestamp 字段)
- 核心优势:不受处理延迟、乱序影响,能准确反映事件的真实时序关系
- 适用场景:对时序准确性要求高的场景(如风控、实时统计)
- 关键配置:需要指定事件时间字段,并配置 Watermark 处理乱序数据
2. Processing Time(处理时间):
- 定义:数据被 Flink 算子处理时的系统时间
- 核心优势:简单、无额外开销(无需解析事件时间、生成 Watermark)
- 缺点:受处理延迟、集群负载影响,时序准确性差
- 适用场景:对时序准确性要求低的场景(如测试、实时监控粗略统计)
3. Ingestion Time(摄入时间):
- 定义:数据被 Flink Source 算子摄入的时间
- 特点:介于 Event Time 和 Processing Time 之间,比 Processing Time 准确,比 Event Time 简单
- 适用场景:无法获取事件时间,但需要一定时序准确性的场景
配置时间语义:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 配置 Event Time(推荐)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 配置 Processing Time(默认)
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 3. 配置 Ingestion Time
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
```
## 4.2 Watermark:乱序数据的时序对齐机制
Event Time 场景下,数据可能因网络延迟、分布式传输等原因乱序到达(即后产生的事件先被处理)。Watermark 是 Flink 解决乱序问题的核心机制,本质是「一个带有时间戳的标记」,用于告知 Flink :「小于该时间戳的事件都已到达,后续不会再出现」。
### 4.2.1 Watermark 的核心规则
- Watermark 时间戳 = 最大事件时间 - 乱序容忍度(允许延迟的时间)
- 当 Watermark 时间戳 >= 窗口结束时间时,触发窗口计算
- Watermark 由 Source 算子生成,或通过 assignTimestampsAndWatermarks 算子生成
### 4.2.2 Watermark 生成方式
Flink 提供两种内置的 Watermark 生成器,也支持自定义:
1. 周期性 Watermark 生成器(Periodic Watermarks):
`DataStream logStream = env.addSource(new KafkaSource());
// 提取事件时间字段,并生成 Watermark
DataStream eventTimeStream = logStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp()) // 提取事件时间(毫秒级)
);`
- 特点:按固定周期(默认 200ms)生成 Watermark,适用于事件均匀产生的场景
- 示例(处理乱序数据,容忍延迟 5 秒):
2. 断点式 Watermark 生成器(Punctuated Watermarks):
`WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((ctx) ->
new PunctuatedWatermarkGenerator() {
@Override
public void onEvent(LogEvent event, long eventTimestamp, PunctuatedWatermarkOutput output) {
// 当事件类型为 "MARKER" 时,生成 Watermark
if ("MARKER".equals(event.getType())) {
output.emitWatermark(new Watermark(eventTimestamp));
}
}
@Override
public void onPeriodicEmit(PunctuatedWatermarkOutput output) {
// 断点式生成器无需周期性触发,空实现
}
})
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream eventTimeStream = logStream.assignTimestampsAndWatermarks(watermarkStrategy);`
- 特点:基于特定事件触发 Watermark 生成(如遇到特定标记事件),适用于事件非均匀产生的场景
- 示例(遇到标记事件时生成 Watermark):
## 4.3 窗口机制:无界流的分段处理
窗口(Window)是将无界流划分为有界流的核心手段,Flink 按不同维度将窗口分为多种类型,适配不同的业务需求。
### 4.3.1 窗口的核心分类(按划分方式)
1. Time Window(时间窗口):按时间划分窗口,最常用
- Tumbling Time Window(滚动时间窗口):
`DataStream> resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5))) // 滚动事件时间窗口
.sum(1);`
- 特点:窗口不重叠、连续,每个窗口有固定大小(如每 5 分钟一个窗口)
- 适用场景:统计固定时间段内的聚合结果(如每小时订单总量)
- 示例(5 分钟滚动窗口,Event Time):
- Sliding Time Window(滑动时间窗口):
`DataStream> resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))) // 滑动事件时间窗口
.sum(1);`
- 特点:窗口有重叠,由窗口大小和滑动步长决定(如窗口大小 10 分钟,滑动步长 5 分钟,每 5 分钟生成一个窗口,每个窗口重叠 5 分钟)
- 适用场景:需要连续统计的场景(如每 5 分钟统计最近 10 分钟的订单总量)
- 示例(10 分钟窗口,5 分钟滑动,Event Time):
- Session Window(会话窗口):
`DataStream> resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(EventTimeSessionWindows.withGap(Time.seconds(30))) // 会话窗口
.sum(1);`
- 特点:基于会话间隔划分,当两个事件的时间间隔超过会话间隔时,开启新窗口(无固定大小)
- 适用场景:统计用户会话内的行为(如用户一次浏览会话的点击量,会话间隔 30 秒)
- 示例(会话间隔 30 秒,Event Time):
2. Count Window(计数窗口):按数据量划分窗口
`DataStream> resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.countWindow(10) // 滚动计数窗口
.sum(1);`
- Tumbling Count Window(滚动计数窗口):每 N 个元素一个窗口(如每 100 个元素统计一次)
- Sliding Count Window(滑动计数窗口):每 M 个元素滑动一次,窗口大小 N 个元素(如每 50 个元素统计最近 100 个元素)
- 示例(滚动计数窗口,每 10 个元素一个窗口):
### 4.3.2 窗口函数:窗口内数据的处理逻辑
窗口函数定义了窗口触发后如何处理窗口内的数据,Flink 支持三种核心窗口函数:
1. 增量聚合函数(Incremental Aggregation Functions):
`DataStream> resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
// 对相同 key 的两个元素聚合(计数求和)
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});`
- 特点:窗口内数据到来时实时聚合,只保存聚合结果(不保存原始数据),效率高
- 常见函数:sum、min、max、reduce、aggregate
- 示例(自定义 reduce 函数,统计窗口内单词出现次数):
2. 全窗口函数(Full Window Functions):
`DataStream resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new ProcessWindowFunction, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable> elements, Collector out) throws Exception {
// 统计窗口内元素个数(即单词出现次数)
int count = 0;
for (Tuple2 element : elements) {
count++;
}
// 获取窗口元数据(开始时间、结束时间)
TimeWindow window = context.window();
long start = window.getStart();
long end = window.getEnd();
// 输出结果
out.collect("Key: " + key + ", 窗口时间: [" + start + "," + end + "), 出现次数: " + count);
}
});`
- 特点:窗口触发后,一次性处理窗口内的所有原始数据,灵活性高(可实现复杂逻辑)
- 常见函数:apply(通用全窗口函数)、ProcessWindowFunction(更强大,支持访问窗口元数据、状态)
- 缺点:需要保存窗口内所有原始数据,内存开销大
- 示例(ProcessWindowFunction,统计窗口内单词出现次数及窗口信息):
3. 增量聚合 + 全窗口函数(组合使用):
`DataStream resultStream = inputStream
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce(
new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
},
new ProcessWindowFunction, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable> elements, Collector out) throws Exception {
Tuple2 result = elements.iterator().next();
TimeWindow window = context.window();
out.collect("Key: " + key + ", 窗口时间: [" + window.getStart() + "," + window.getEnd() + "), 出现次数: " + result.f1);
}
}
);`
- 特点:结合两者优势,先通过增量聚合函数聚合数据(减少内存开销),再通过全窗口函数处理聚合结果(获取窗口元数据)
- 示例:
# 五、生产落地:部署、优化与监控
## 5.1 部署模式:Flink 集群部署方案
Flink 支持多种部署模式,适配不同的集群环境和资源管理方案:
### 5.1.1 Standalone 模式(独立集群)
最基础的部署模式,不依赖外部资源管理系统,适用于测试/小型生产环境:
1. 集群规划:1 个 JobManager + N 个 TaskManager
2. 配置步骤:
- 修改 flink-conf.yaml:配置 jobmanager.rpc.address(JobManager 地址)、taskmanager.numberOfTaskSlots(每个 TaskManager 的 Slot 数)
- 修改 slaves 文件:添加所有 TaskManager 的主机名/IP
- 分发配置文件到所有节点:scp -r flink-1.17.0/ node2:/usr/local/
- 启动集群:./bin/start-cluster.sh
3. 提交任务:./bin/flink run -m node1:8081 -p 4 xxx.jar(-m 指定 JobManager 地址)
### 5.1.2 YARN 模式(推荐生产环境)
依赖 Hadoop YARN 进行资源管理,支持动态资源分配,适配大规模集群:
1. 核心优势:
- 资源动态分配:根据任务需求自动申请/释放资源
- 高可用:借助 YARN 的资源管理能力,支持 JobManager 高可用部署
- 集成 Hadoop 生态:方便读取 HDFS、HBase 等数据
2. 部署类型:
- Session Mode(会话模式):启动一个 YARN Session,多个任务共享资源(适用于小任务)
- Per-Job Mode(单任务模式):每个任务启动一个独立的 YARN Session,资源隔离性好(适用于大任务,推荐)
- Application Mode(应用模式):任务直接在 YARN 上运行,无需客户端提交(减少客户端压力