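1 Environment and Preparation

Host machine:
CPU: 12th Gen Intel(R) Core(TM) i7-12700F
Memory: 32 GB, 3600 MT/s
Operating system: Windows 11
VMware: VMware® Workstation 17 Pro

Virtual machine:
Processors: 4 (1 processor, 4 cores per processor, 4 cores in total)
Memory: 4 GB
Storage: 30 GB
Operating system: Ubuntu 24.04.3 LTS
Network adapter: NAT
Components: Apache Flume 1.9.0
            JDK 1.8.0_171

This lab runs on a Hadoop distributed cluster.
Flume serves as the log collection and data transport layer (data ingestion pipeline).

Start the three virtual machines (master, slave1, slave2) and make sure the Hadoop cluster is running normally.
Flume only needs to be installed and configured on the master node; the other nodes require no additional steps.

All steps are performed as the root user.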


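What Flume Does

Flume is a distributed log collection system built from three core components:

  • Source: data collection

  • Channel: data buffering

  • Sink: data output

Typical pipeline: data source → Source → Channel → Sink → storage system (HDFS)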

1.1 Upload and Extract the Package

cd /root

Upload apache-flume-1.9.0-bin.tar.gz to the /root directory and extract it:

tar -xzvf apache-flume-1.9.0-bin.tar.gz

1.2 Enter the Configuration Directory

cd /root/apache-flume-1.9.0-bin/conf/

2 Environment Variables

2.1 Configure flume-env.sh

cp flume-env.sh.template flume-env.sh
vim flume-env.sh
------
export JAVA_HOME=/root/jdk1.8.0_171
------

2.2 Configure Global Environment Variables

vim /root/.bashrc
------
# Flume
export FLUME_HOME=/root/apache-flume-1.9.0-bin
export PATH=$FLUME_HOME/bin:$PATH
------

Apply the configuration:

source /root/.bashrc

Verify Flume:

flume-ng version

✔ If version information is printed, the installation succeeded.
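
If the version command is not found, the usual cause is that the Flume bin directory did not make it onto PATH. The helper below is a minimal sketch for checking that; on_path is a hypothetical name introduced here, not part of Flume.

```shell
# on_path DIR [PATHLIST]
# Succeeds if DIR is one of the colon-separated entries of PATHLIST.
# PATHLIST defaults to the current $PATH.
on_path() {
  case ":${2-$PATH}:" in
    *":$1:"*) return 0 ;;
    *)        return 1 ;;
  esac
}

# Example use after "source /root/.bashrc":
#   on_path "$FLUME_HOME/bin" && echo "Flume bin directory is on PATH"
```

The check compares entries literally, so a trailing slash in the PATH entry would count as a different directory.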


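3 Flume Configuration

Create flume.conf in /root/apache-flume-1.9.0-bin/conf/ (the directory entered in step 1.2) and edit its contents: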

vim flume.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------

Architecture

This configuration forms a minimal Flume pipeline: Netcat Source → Memory Channel → Logger Sink

It is used to verify Flume's basic data flow.
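
A common mistake in these files is a source or sink that points at a channel name that was never declared. The following is a rough sketch of a wiring check, assuming the simple "key = value" layout used in this lab; check_wiring is a hypothetical helper, not a Flume tool.

```shell
# check_wiring CONF AGENT
# Prints "ok" if every channel referenced by a source or sink of AGENT
# is listed in "AGENT.channels"; otherwise prints the undeclared names.
check_wiring() (
  conf=$1 agent=$2
  awk -v a="$agent" '
    # Skip comments and lines without a key/value pair.
    /^[ \t]*#/ || !/=/ { next }
    {
      # Split the line into a trimmed key and a trimmed value.
      key = $0; sub(/[ \t]*=.*/, "", key); gsub(/^[ \t]+/, "", key)
      val = $0; sub(/^[^=]*=[ \t]*/, "", val); gsub(/[ \t]+$/, "", val)
    }
    # Channels declared for the agent, e.g. "a1.channels = c1".
    key == a ".channels" {
      n = split(val, d, /[ \t]+/)
      for (i = 1; i <= n; i++) declared[d[i]] = 1
    }
    # Channels referenced by sources (".channels") and sinks (".channel").
    key ~ ("^" a "\\.sources\\.[^.]+\\.channels$") ||
    key ~ ("^" a "\\.sinks\\.[^.]+\\.channel$") {
      n = split(val, u, /[ \t]+/)
      for (i = 1; i <= n; i++) used[u[i]] = 1
    }
    END {
      for (c in used) if (!(c in declared)) missing = missing " " c
      if (missing == "") print "ok"; else print "undeclared:" missing
    }
  ' "$conf"
)

# Example use:
#   check_wiring /root/apache-flume-1.9.0-bin/conf/flume.conf a1
```

Note the asymmetry the check relies on: a source may feed several channels (property "channels"), while a sink reads from exactly one (property "channel").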


4 Start Flume

flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/flume.conf \
-n a1 \
-Dflume.root.logger=INFO,console
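
In this command, -c names the configuration directory, -f the agent configuration file, -n the agent name (it must match the a1 prefix used in the file), and -Dflume.root.logger=INFO,console sends the log to the terminal. The same command is reused in every later task with only the file name changed, so it can be wrapped once. This is a sketch only; run_agent and the DRY_RUN switch are hypothetical names introduced here.

```shell
# run_agent CONF_NAME
# Starts agent a1 with a config file stored in $FLUME_HOME/conf.
# If DRY_RUN is set to a non-empty value, the command is printed instead of run.
run_agent() {
  home=${FLUME_HOME:-/root/apache-flume-1.9.0-bin}
  set -- flume-ng agent -c "$home/conf/" -f "$home/conf/$1" \
         -n a1 -Dflume.root.logger=INFO,console
  if [ -n "${DRY_RUN-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Example use:
#   DRY_RUN=1 run_agent flume.conf   # show the command
#   run_agent flume.conf             # start the agent in the foreground
```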

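5 Data Verification (Key Step)

5.1 Send Data

Open a new terminal, run the command below, and type a line of text:

telnet localhost 44444
Hello World

To verify, look in the terminal where Flume is running for a line like the following (simplified here; the logger sink also prints the body as hex bytes before the text):

Event: { headers:{} body: Hello World }

Event Structure

An Event is the smallest unit of data in Flume:

headers: metadata (may be empty by default)

body: the actual data payload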
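
The body is a byte array, which is why the logger sink shows it in hex alongside the readable text. As a small illustration, the sketch below prints the bytes of a string in the same upper-case hex notation; body_hex is a hypothetical helper, and od is assumed to be the usual coreutils version.

```shell
# body_hex TEXT
# Prints the bytes of TEXT as space-separated upper-case hex pairs.
body_hex() (
  hex=$(printf '%s' "$1" | od -An -v -tx1 | tr -s ' \n' ' ' | tr 'a-f' 'A-F')
  hex=${hex# }   # drop the leading space left by od
  hex=${hex% }   # drop the trailing space left by the newline translation
  printf '%s\n' "$hex"
)

body_hex 'Hello World'   # prints: 48 65 6C 6C 6F 20 57 6F 72 6C 64
```

When the line is typed in telnet, it is sent with a carriage return before the newline, so the body seen by Flume typically ends with an extra 0D byte.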


6 Using Source Components

Task 1: collect data from the local directory /opt/logs and output it to the Logger sink

6.1 Configuration File

vim spool-memory-logger.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs
a1.sources.r1.fileHeader = false

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------

6.2 Create the Watched Directory

mkdir -p /opt/logs

6.3 Start Flume

flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/spool-memory-logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console

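Test: in a new terminal, create a test file:

vim nametest.txt
------
VincentCassano
------

Copy the file into the watched directory. The Flume console should then print an Event whose body is VincentCassano, and the file in /opt/logs is renamed with the default .COMPLETED suffix once it has been fully read:

cp nametest.txt /opt/logs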
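
The spooling directory source expects a file to be complete and unchanged once it appears in the watched directory. A plain cp is fine for a one-line test file; for larger files, one cautious pattern is to copy into a staging directory first and then move the finished file in. The sketch below assumes a hypothetical staging directory on the same filesystem as the spool directory; spool_put and consumed are names introduced here.

```shell
# spool_put SRC STAGING SPOOL
# Copies SRC into STAGING, then moves the finished copy into SPOOL.
# STAGING must be on the same filesystem as SPOOL (so mv is a rename)
# and must not be the spool directory itself.
spool_put() (
  src=$1 staging=$2 spool=$3
  name=$(basename "$src")
  mkdir -p "$staging" &&
  cp "$src" "$staging/$name" &&
  mv "$staging/$name" "$spool/$name"
)

# consumed SPOOL NAME
# Succeeds once the file has been renamed with the default ".COMPLETED" suffix.
consumed() { [ -e "$1/$2.COMPLETED" ]; }

# Example use (the staging path is an assumption):
#   spool_put nametest.txt /opt/logs-staging /opt/logs
#   consumed /opt/logs nametest.txt && echo "file was ingested"
```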

7 Using Channel Components

Task 2: use a File Channel to buffer events on local disk while collecting data from port 44444 to the Logger sink

7.1 Configuration File

vim netcat-file-logger.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/chk
a1.channels.c1.dataDirs = /root/flume/data

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------

7.2 Create the Buffer Directories

mkdir -p /root/flume/chk /root/flume/data

7.3 Start Flume

flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/netcat-file-logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console

Test:

telnet localhost 44444
Hello World

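How the File Channel Works

Events are written to disk first (dataDirs), with channel state tracked in checkpointDir

Events still in the channel can be recovered after an agent restart

Safer than the Memory Channel, which loses buffered events when the agent process stops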


8 Using Sink Components

Task 3: Netcat Source → Memory Channel → HDFS Sink

8.1 Configuration File

vim netcat-memory-hdfs.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/root/flume/netcat_data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 5

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------

8.2 Create the HDFS Directory

hdfs dfs -mkdir -p /root/flume/netcat_data

8.3 Start Flume

flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/netcat-memory-hdfs.conf \
-n a1 \
-Dflume.root.logger=INFO,console

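Test:

telnet localhost 44444
Hello World

Verify:

Open http://<master-ip>:50070 (the NameNode web UI on Hadoop 2.x; Hadoop 3.x uses port 9870 by default), click Utilities, choose Browse the file system, and go to /root/flume/netcat_data. Inspect the generated file to confirm that the data was written. With this configuration rollCount = 5 is the only active roll trigger, so a file keeps its .tmp suffix while it is still open; send more than five lines or stop the agent to see a finalized file.

How the HDFS Sink Works

Events are written to HDFS in batches

File boundaries depend on the roll policy (rollInterval, rollSize, rollCount; a value of 0 disables that trigger)

It is the final persistence layer of the pipeline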
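
With count-based rolling as the only trigger, the number of files follows directly from the number of events. The arithmetic can be sketched as below; files_for_events is a hypothetical helper, and the result counts every file created, including the last one, which may still be open.

```shell
# files_for_events N ROLL_COUNT
# Number of HDFS files needed for N events when each file holds
# at most ROLL_COUNT events (ceiling division).
files_for_events() { echo $(( ($1 + $2 - 1) / $2 )); }

files_for_events 12 5   # prints 3: two files of 5 events, one file of 2
files_for_events 5 5    # prints 1
```

With rollCount = 5 as in this task, twelve lines typed into telnet would therefore be spread over three files.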


9 End-to-End Collection: Local Files → HDFS

Task: Spooldir Source → File Channel → HDFS Sink

9.1 Configuration File

vim spool-file-hdfs.conf
------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs
a1.sources.r1.fileHeader = false
a1.sources.r1.batchSize = 250

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/root/flume/logs/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = access_log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 100

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/chk
a1.channels.c1.dataDirs = /root/flume/data

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
------

9.2 Create the HDFS Directory

hdfs dfs -mkdir -p /root/flume/logs/

9.3 Start Flume

flume-ng agent \
-c /root/apache-flume-1.9.0-bin/conf/ \
-f /root/apache-flume-1.9.0-bin/conf/spool-file-hdfs.conf \
-n a1 \
-Dflume.root.logger=INFO,console

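9.4 Test Data Writes

echo "log1" > /opt/logs/1.log
echo "log2" > /opt/logs/2.log

# Wait at least 1 minute so that the next files land in a new %H-%M directory
# (rollInterval = 0 here, so time-based rolling is disabled)
echo "log3" > /opt/logs/3.log
echo "log4" > /opt/logs/4.log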
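
The escape sequences in hdfs.path (%y-%m-%d/%H-%M) use the same letters as the date command, and useLocalTimeStamp = true makes the sink take the time from the agent's local clock. The directory for the current minute can therefore be predicted on the master node. This is a sketch; bucket_dir is a hypothetical helper.

```shell
# bucket_dir BASE
# Prints the directory that the pattern BASE/%y-%m-%d/%H-%M
# resolves to for the current local time.
bucket_dir() { printf '%s/%s\n' "$1" "$(date '+%y-%m-%d/%H-%M')"; }

bucket_dir /root/flume/logs

# Example use, right after writing a test file:
#   hdfs dfs -ls "$(bucket_dir /root/flume/logs)"
```

If the listing is run after the minute has changed, the directory of the previous minute has to be checked instead.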

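Summary

This lab used Flume to implement:

  • Local file collection (Spooldir Source)

  • Network data collection (Netcat Source)

  • Two channel buffering mechanisms (Memory / File)

  • Persistent storage in HDFS (HDFS Sink)

Together these form a complete data collection and transport pipeline and provide a foundation for later big data log processing.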