Flume 数据采集系统安装与基础应用实践
1 环境说明与准备
CPU:12th Gen Intel(R) Core(TM) i7-12700F
内存:32GB 3600MT/s
操作系统:Windows11
VMware: VMware® Workstation 17 Pro处理器:4 (处理器数量1,每个处理器内核数量4,处理器内核总数4)
内存:4GB
存储:30GB
操作系统:Ubuntu 24.04.3 LTS
网络适配器:NAT
组件:Apache Flume 1.9.0
JDK 1.8.0_171本实验基于 Hadoop 分布式集群环境
Flume 用于实现日志采集与数据传输管道(Data Ingestion Pipeline)
启动三台虚拟机(master、slave1、slave2),确保 Hadoop 集群环境正常运行。
本实验仅需在 master 节点完成 Flume 的安装与配置,其余节点无需进行额外操作。
实验全程在 root 用户下进行
Flume 基本作用
Flume 是一个分布式日志采集系统,其核心能力:
数据采集(Source)
数据缓冲(Channel)
数据输出(Sink)
典型链路:数据源 → Source → Channel → Sink → 存储系统(HDFS)
1.1 上传并解压组件
cd /root将apache-flume-1.9.0-bin.tar.gz上传至/root 目录并解压:
tar -xzvf apache-flume-1.9.0-bin.tar.gz1.2 进入配置目录
cd /root/apache-flume-1.9.0-bin/conf/2 环境变量配置
2.1 配置 flume-env.sh 文件
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
------
export JAVA_HOME=/root/jdk1.8.0_171
------2.2 配置全局环境变量
vim /root/.bashrc
------
# Flume
export FLUME_HOME=/root/apache-flume-1.9.0-bin
export PATH=$FLUME_HOME/bin:$PATH
------应用配置
source /root/.bashrc验证 Flume
flume-ng -version✔ 正常输出版本信息即为成功
3 Flume 配置
创建 flume.conf 配置文件并编辑内容:
vim flume.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------架构说明
该配置构成最小 Flume 数据链路:Netcat → Memory Channel → Logger Sink
用于验证 Flume 基本数据流机制
4 启动 Flume
flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/flume.conf \
-n a1 \
-Dflume.root.logger=INFO,console5 数据验证(关键)
5.1 发送数据
新开终端执行并输入内容:
telnet localhost 44444
Hello World验证,在 Flume 启动窗口中查看:
Event: { headers:{} body: Hello World }Event 结构说明
Flume 中 Event 是最小数据单元:
headers:元数据(默认可为空)
body:实际数据内容
6 Source 组件应用
任务1:采集本地目录 /opt/logs 数据并输出到 Logger
6.1 配置文件
vim spool-memory-logger.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs
a1.sources.r1.fileHeader = false
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------6.2 创建监听目录
mkdir -p /opt/logs6.3 启动 Flume
flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/spool-memory-logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console测试:
vim nametest.txt
------
VincentCassano
------将文件放入监听目录:
cp nametest.txt /opt/logs7 Channel 组件应用
任务2:使用 File Channel 实现本地磁盘缓存,从 44444 端口采集数据到 Logger
7.1 配置文件
vim netcat-file-logger.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/chk
a1.channels.c1.dataDirs = /root/flume/data
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------7.2 创建缓存目录
mkdir -p /root/flume/chk /root/flume/data7.3 启动 Flume
flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/netcat-file-logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console测试:
telnet localhost 44444
Hello WorldFile Channel 机制说明
数据先写磁盘
支持断点恢复
比 Memory Channel 更安全
8 Sink 组件应用
任务3:Netcat → Memory Channel → HDFS Sink
8.1 配置文件
vim netcat-memory-hdfs.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/root/flume/netcat_data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 5
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------8.2 创建 HDFS 目录
hdfs dfs -mkdir -p /root/flume/netcat_data8.3 启动 Flume
flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/netcat-memory-hdfs.conf \
-n a1 \
-Dflume.root.logger=INFO,console测试:
telnet localhost 44444
Hello World验证:
访问http://<master-ip>:50070 ,点击Utilities 选择 Browse the file system 前往/root/flume/netcat_data目录,查看生成文件内容即可验证数据写入成功。
HDFS Sink 机制说明
数据按批次写入HDFS
依赖滚动策略(roll)
属于最终持久化层
9 本地文件 → HDFS 全链路采集
任务实现:Spooldir → File Channel → HDFS Sink
9.1 配置文件
vim spool-file-hdfs.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs
a1.sources.r1.fileHeader = false
a1.sources.r1.batchSize = 250
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/root/flume/logs/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = access_log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 100
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/chk
a1.channels.c1.dataDirs = /root/flume/data
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------9.2 创建 HDFS 目录
hdfs dfs -mkdir -p /root/flume/logs/9.3 启动 Flume
flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/spool-file-hdfs.conf \
-n a1 \
-Dflume.root.logger=INFO,console9.4 测试数据写入
echo "log1" > /opt/logs/1.log
echo "log2" > /opt/logs/2.log
# 等待至少 1 分钟(rollInterval 控制滚动)
echo "log3" > /opt/logs/3.log
echo "log4" > /opt/logs/4.log总体说明
本实验通过 Flume 分别实现了:
本地文件采集(Spooldir Source)
网络数据采集(Netcat Source)
多种 Channel 缓存机制(Memory / File)
HDFS 持久化存储(HDFS Sink)
构建了完整的数据采集与传输链路,为后续大数据日志处理提供基础。