找回密码
 立即注册
首页 业界区 业界 从零开始学Flink:揭开实时计算的神秘面纱 ...

从零开始学Flink:揭开实时计算的神秘面纱

诘琅 昨天 16:35
一、为什么需要Flink?

当你在电商平台秒杀商品时,1毫秒的延迟可能导致交易失败;当自动驾驶汽车遇到障碍物时,10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实:数据的价值随时间呈指数级衰减。
传统批处理(如Hadoop)像老式火车,必须等所有乘客(数据)到齐才能发车;而流处理(如Flink)如同磁悬浮列车,每个乘客(数据)上车即刻出发。Flink的诞生,让数据从"考古材料"变为"新鲜血液"。
二、初识Flink

1. 定义

Apache Flink是由德国柏林工业大学于2009年启动的研究项目,2014年进入Apache孵化器,现已成为实时计算领域的事实标准。其核心能力可用一句话概括:对无界和有界数据流进行有状态计算。
2. 核心特性

流处理优先:批处理是流处理的特例(有界数据流)
事件时间语义:按数据真实发生时间处理(而非系统接收时间)
精确一次语义:确保计算结果100%准确
亚秒级延迟:处理延迟可控制在毫秒级
3. 技术架构

Flink运行时架构包含三个关键角色:

  • JobManager:大脑中枢,负责任务调度与检查点管理
  • TaskManager:肌肉组织,执行具体计算任务
  • Dispatcher:网关系统,提供REST接口提交作业
三、环境搭建

环境要求

​1. ​Windows 10 2004 或更高版本​​(建议使用 Windows 11)
​2. ​已启用 WSL 2​​
3. 存储空间:至少 1GB 可用空间
详细安装步骤

步骤 1:启用 WSL

在 PowerShell 中以管理员身份运行以下命令:
  1.   # 启用 WSL 功能
  2.   dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
  3.   # 启用虚拟机平台
  4.   dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
  5.   # 设置 WSL 2 为默认版本
  6.   wsl --set-default-version 2
  7.   # 重启电脑(必须步骤)
复制代码
步骤 2:安装 Ubuntu

​1. 打开 Microsoft Store
​2. 搜索安装 ​​Ubuntu 22.04 LTS​​
3. 启动 Ubuntu 并创建用户名和密码
步骤 3:安装 Java 17

在 Ubuntu 终端执行:
  1.   # 更新软件包列表
  2.   sudo apt update
  3.   # 安装 Java 17
  4.   sudo apt install -y openjdk-17-jdk
  5.   # 设置环境变量
  6.   echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >>  /etc/profile
  7.   echo 'export PATH=$PATH:$JAVA_HOME/bin' >> /etc/profile
  8.   source /etc/profile
  9.   # 验证安装
  10.   java -version
  11.   # 应显示类似:OpenJDK Runtime Environment (build 17.0.14+...)
复制代码
步骤 4:下载并安装 Flink 1.20.1
  1.   # 下载 Flink
  2.   wget https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
  3.   # 解压安装包
  4.   tar xzf flink-1.20.1-bin-scala_2.12.tgz
  5.   # 移动到安装目录
  6.   sudo mv flink-1.20.1 /opt/flink
  7.   # 设置环境变量
  8.   echo 'export FLINK_HOME=/opt/flink' >>  /etc/profile
  9.   echo 'export PATH=$PATH:$FLINK_HOME/bin' >> /etc/profile
  10.   source /etc/profile
复制代码
步骤 5:修改内存配置

编辑配置文件:
  1. vi /opt/flink/conf/conf.yaml
复制代码
修改以下关键参数:
  1.   jobmanager:
  2.     bind-host: 0.0.0.0
  3.     rpc:
  4.       address: localhost
  5.       port: 6123
  6.     memory:
  7.       process:
  8.         size: 1600m
  9.     execution:
  10.       failover-strategy: region
  11.   taskmanager:
  12.     bind-host: 0.0.0.0
  13.     host: localhost
  14.     numberOfTaskSlots: 2
  15.     memory:
  16.       process:
  17.         size: 2048m
  18.   parallelism:
  19.     default: 2
  20.   
  21.   rest:
  22.     address: localhost
  23.     bind-address: 0.0.0.0
  24.     port: 8081
复制代码
步骤 6:启动 Flink 集群
  1. # 启动集群(JobManager + TaskManager)
  2. $FLINK_HOME/bin/start-cluster.sh
  3. # 检查运行状态
  4. jps
复制代码
步骤 7:访问 Web UI

在 Windows 浏览器中访问:
http://localhost:8081
四、实战第一个Flink程序:BatchWordCount

下面将详细介绍如何在Flink环境中创建并运行第一个WordCount程序。这个经典示例将带你从项目创建到代码执行,全面体验Flink开发流程。
项目结构设计

采用多模块Gradle项目,结构清晰:
  1.   flink-learning/
  2.   ├── build.gradle                 # 根项目构建配置
  3.   ├── settings.gradle              # 多模块配置
  4.   ├── libraries.gradle            # 依赖统一管理
  5.   ├── data/                        # 数据文件夹
  6.   │   ├── input.txt               # 输入文件
  7.   │   └── output.txt              # 输出文件
  8.   └── wordcount/                  # WordCount模块
  9.       ├── build.gradle            # 模块构建配置
  10.       └── src/main/java           # 源代码目录
  11.           └── cn/com/daimajiangxin/flink/wordcount
  12.               └── BatchWordCount.java # 主程序
复制代码
核心文件配置

详细配置参考代码仓库:https://gitee.com/daimajiangxin/flink-learning.git
WordCount代码实现
  1. package cn.com.daimajiangxin.flink.wordcount;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.functions.FlatMapFunction;
  5. import org.apache.flink.api.common.functions.ReduceFunction;
  6. import org.apache.flink.api.common.typeinfo.TypeHint;
  7. import org.apache.flink.api.common.typeinfo.TypeInformation;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.connector.file.src.FileSource;
  10. import org.apache.flink.connector.file.src.reader.TextLineFormat;
  11. import org.apache.flink.core.fs.Path;
  12. import org.apache.flink.streaming.api.datastream.DataStream;
  13. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.util.Collector;
  16. import java.nio.charset.StandardCharsets;
  17. import java.time.Duration;
  18. import java.util.Arrays;
  19. public class BatchWordCount {
  20.     public static void main(String[] args) throws Exception {
  21.         // 转换Windows路径格式
  22.         args = convertWindowsPaths(args);
  23.         
  24.         // 参数校验
  25.         if (args.length < 2) {
  26.             System.err.println("Usage: BatchWordCount <input> <output> [--parallelism=N]");
  27.             System.err.println("Example: BatchWordCount input.txt output.txt --parallelism=4");
  28.             System.exit(1);
  29.         }
  30.         final String inputPath = args[0];
  31.         final String outputPath = args[1];
  32.         int parallelism = 1; // 默认并行度
  33.         
  34.         // 1. 创建流批一体执行环境
  35.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  36.         // 明确指定批处理模式
  37.         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  38.         // 设置并行度和作业名称
  39.         env.setParallelism(parallelism);
  40.         env.getConfig().enableObjectReuse();
  41.         // 2. 使用最新的FileSource API读取输入数据
  42.         DataStream<String> text = createFileSource(env, inputPath, parallelism);
  43.         // 3. 定义处理逻辑
  44.         SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
  45.                 .flatMap(new Tokenizer())
  46.                 .name("Tokenizer")
  47.                 .setParallelism(parallelism)
  48.                 .keyBy(value -> value.f0)
  49.                 .reduce(new SumReducer())
  50.                 .name("SumReducer")
  51.                 .setParallelism(parallelism)
  52.                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  53.         // 4. 输出结果到文件
  54.         counts.writeAsText(outputPath)
  55.                 .name("FileSink")
  56.                 .setParallelism(1);
  57.         // 5. 执行作业
  58.         try {
  59.             System.out.println("Starting Flink WordCount job...");
  60.             System.out.println("Input path: " + inputPath);
  61.             System.out.println("Output path: " + outputPath);
  62.             System.out.println("Parallelism: " + parallelism);
  63.             env.execute("Flink Batch WordCount Example");
  64.             System.out.println("Job completed successfully!");
  65.         } catch (Exception e) {
  66.             System.err.println("Job execution failed: " + e.getMessage());
  67.             e.printStackTrace();
  68.         }
  69.     }
  70.     // Windows路径转换
  71.     private static String[] convertWindowsPaths(String[] args) {
  72.         if (args.length >= 1) {
  73.             args[0] = "file:///" + args[0]
  74.                 .replace("\", "/")
  75.                 .replace(" ", "%20");
  76.         }
  77.         if (args.length >= 2) {
  78.             args[1] = "file:///" + args[1]
  79.                 .replace("\", "/")
  80.                 .replace(" ", "%20");
  81.         }
  82.         return args;
  83.     }
  84.     // 创建文件源
  85.     private static DataStream<String> createFileSource(
  86.             StreamExecutionEnvironment env,
  87.             String path,
  88.             int parallelism) {
  89.         // 使用file://前缀
  90.         Path filePath = new Path(path);
  91.         
  92.         System.out.println("Loading file from: " + filePath);
  93.         
  94.         TextLineFormat format = new TextLineFormat(StandardCharsets.UTF_8);
  95.         
  96.         FileSource<String> fileSource = FileSource
  97.                 .forRecordStreamFormat(format, filePath)
  98.                 .build();
  99.         
  100.         WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
  101.                 .<String>forMonotonousTimestamps()
  102.                 .withIdleness(Duration.ofSeconds(10));
  103.         
  104.         return env.fromSource(
  105.                 fileSource,
  106.                 watermarkStrategy,
  107.                 "FileSource"
  108.         )
  109.         .name("FileSource")
  110.         .setParallelism(1);
  111.     }
  112.     // 分词器
  113.     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  114.         @Override
  115.         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  116.             // 过滤空行
  117.             if (value == null || value.trim().isEmpty()) return;
  118.             
  119.             // 转换为小写并分割单词
  120.             String[] words = value.toLowerCase().split("\\W+");
  121.             
  122.             for (String word : words) {
  123.                 if (!word.isEmpty()) {
  124.                     out.collect(Tuple2.of(word, 1));
  125.                 }
  126.             }
  127.         }
  128.     }
  129.     // 累加器
  130.     public static final class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
  131.         @Override
  132.         public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
  133.             return Tuple2.of(v1.f0, v1.f1 + v2.f1);
  134.         }
  135.     }
  136. }
复制代码
输入文件示例 (input.txt)

input.txt参考代码仓库:https://gitee.com/daimajiangxin/flink-learning.git
运行Flink作业

这里讲述在IDEA中运行刚刚写的BatchWordCount 任务,配置IDEA的APPlication。
VM选项配置
  1.   --add-exports=java.base/sun.net.util=ALL-UNNAMED
  2.   --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
  3.   --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
  4.   --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
  5.   --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
  6.   --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
  7.   --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
  8.   --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
  9.   --add-opens=java.base/java.lang=ALL-UNNAMED
  10.   --add-opens=java.base/java.net=ALL-UNNAMED
  11.   --add-opens=java.base/java.io=ALL-UNNAMED
  12.   --add-opens=java.base/java.nio=ALL-UNNAMED
  13.   --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
  14.   --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
  15.   --add-opens=java.base/java.text=ALL-UNNAMED
  16.   --add-opens=java.base/java.time=ALL-UNNAMED
  17.   --add-opens=java.base/java.util=ALL-UNNAMED
  18.   --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
  19.   --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
  20.   --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
复制代码
程序参数
  1. 代码放置路径\\flink-learning\\data\\input.txt
  2. 代码放置路径\bigdata\\flink-learning\\data\\output.txt
复制代码
运行BatchWordCount类

Run 或者Debug BatchWordCount的 APPlication.
1.png

预期输出

运行成功data目录下会生成output的文件。
  1. (processing,1)
  2. (batch,2)
  3. (flink,2)
  4. (hello,2)
复制代码
2.png

五、技术要点解析


  • 流批一体API:Flink 1.20+使用StreamExecutionEnvironment统一处理批流
  • 文件源:使用FileSource API
  • 精确一次处理:批处理天然支持Exactly-Once语义
  • 并行度控制:通过setParallelism控制任务并行度
  • Windows路径适配:统一转换为file:///开头的URI格式
六、学习路线建议

完成WordCount后,可逐步探索:

  • 实时流处理(SocketWordCount)
  • 状态管理(StatefulProcessing)
  • 事件时间处理(EventTimeProcessing)
  • 窗口计算(TumblingWindow、SlidingWindow)
  • CEP复杂事件处理
  • Table API和SQL
    通过这个完整的BatchWordCount实例,你已经掌握了Flink项目的搭建、编码和运行全流程。随着Flink在实时数据处理领域的广泛应用,这些技能将成为大数据开发的宝贵资产。
源文来自:http://blog.daimajiangxin.com.cn
源码地址:https://gitee.com/daimajiangxin/flink-learning

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册