Hadoop Quick Start and Hands-On Guide

1. The Hadoop Ecosystem at a Glance

1.1 Core Component Architecture

The Hadoop ecosystem follows a master/worker architecture: **HDFS (Hadoop Distributed File System)** provides distributed storage, **YARN (Yet Another Resource Negotiator)** handles resource scheduling, and MapReduce supplies the distributed computation framework.

graph TD
A[Client Application] --> B[YARN ResourceManager]
B --> C[NodeManager Node1]
B --> D[NodeManager Node2]
B --> E[NodeManager Node3]

C --> F[HDFS DataNode1]
D --> G[HDFS DataNode2]
E --> H[HDFS DataNode3]

I[HDFS NameNode] --> F
I --> G
I --> H

J[Secondary NameNode] --> I

style A fill:#f9f,stroke:#333
style B fill:#bbf,stroke:#333
style I fill:#9f9,stroke:#333

1.2 Core Components in Detail

| Component | Role | Key Responsibility | Production Recommendation |
|---|---|---|---|
| NameNode | Metadata management | Manages the file system namespace | 8-16 GB RAM, SSD storage |
| DataNode | Data storage | Stores the actual data blocks | 4-8 TB disks, gigabit network |
| ResourceManager | Cluster-wide resource scheduling | Manages cluster resource allocation | HA deployment with ZooKeeper |
| NodeManager | Node-level resource management | Monitors container resource usage | Dynamic resource adjustment |

1.3 Data Storage Principles

HDFS stores files as blocks (128 MB by default) and relies on replication to keep data reliable:

# Show the block information for a file
hdfs fsck /user/data/largefile.txt -files -blocks -locations

Replica placement policy

  • 1st replica: on the local node (the writer's node)
  • 2nd replica: on a node in a different rack
  • 3rd replica: on a different node in the same rack as the 2nd replica
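
Building on the placement rules above, here is a minimal sketch (assuming a running HDFS and the example path used earlier) of checking and adjusting the replication factor from the shell:

# Set the replication factor of an existing file to 3 and wait for it to take effect
hdfs dfs -setrep -w 3 /user/hadoop/data/largefile.txt

# Verify block placement and replica health afterwards
hdfs fsck /user/hadoop/data/largefile.txt -files -blocks -locations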

2. Environment Setup in Practice

2.1 Single-Node Deployment

2.1.1 Installing Prerequisites

# Ubuntu/Debian systems
sudo apt update
sudo apt install openjdk-8-jdk ssh pdsh

# Verify the Java installation
java -version
ssh localhost # test passwordless SSH login

2.1.2 Installing and Configuring Hadoop

# Download and extract
cd /opt
sudo wget https://downloads.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
sudo tar -xzf hadoop-3.3.4.tar.gz
sudo mv hadoop-3.3.4 hadoop

# Configure environment variables
echo 'export HADOOP_HOME=/opt/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
source ~/.bashrc
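
One step that is easy to miss: Hadoop's startup scripts read JAVA_HOME from etc/hadoop/hadoop-env.sh. A minimal sketch, assuming the Ubuntu openjdk-8-jdk package installed above (adjust the JDK path for your system):

# Point Hadoop at the JDK installed earlier
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' | sudo tee -a /opt/hadoop/etc/hadoop/hadoop-env.sh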

2.1.3 Core Configuration Files

core-site.xml - core settings:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
        <description>Default file system URI</description>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop/tmp</value>
        <description>Base temporary directory</description>
    </property>
</configuration>

hdfs-site.xml - HDFS settings:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
        <description>Replication factor of 1 for a single-node setup</description>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hadoop/data/namenode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop/data/datanode</value>
    </property>
</configuration>
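
The directories referenced in the two files above must exist and be writable by the user that runs Hadoop; a minimal sketch, using the paths from the configuration above:

# Create the NameNode, DataNode, and temp directories declared above,
# and hand the installation over to the current user
sudo mkdir -p /opt/hadoop/data/namenode /opt/hadoop/data/datanode /opt/hadoop/tmp
sudo chown -R $USER:$USER /opt/hadoop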

2.2 Pseudo-Distributed Cluster

2.2.1 Adjusting the Configuration Files

mapred-site.xml - MapReduce settings:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
</configuration>

yarn-site.xml - YARN settings:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
</configuration>

2.2.2 Starting and Verifying the Cluster

# Format the NameNode (first start only)
hdfs namenode -format

# Start HDFS
start-dfs.sh

# Start YARN
start-yarn.sh

# Verify daemon status
jps # expect NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager

# Web UIs
# NameNode: http://localhost:9870
# ResourceManager: http://localhost:8088
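
As a quick smoke test, the example job bundled with the distribution can be submitted to the cluster; a minimal sketch, assuming the default Hadoop 3.3.4 tarball layout under $HADOOP_HOME:

# Run the bundled pi estimator to confirm that YARN can schedule MapReduce jobs
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi 2 10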

2.3 Production-Grade Cluster Configuration

2.3.1 Cluster Planning Template

| Node type | Recommended hardware | Roles | Network |
|---|---|---|---|
| Master node | 16 cores, 32 GB RAM | NameNode + ResourceManager | 10 GbE |
| Worker node | 8 cores, 16 GB RAM | DataNode + NodeManager | 1 GbE |
| Edge node | 4 cores, 8 GB RAM | Client access | 1 GbE |

2.3.2 High-Availability Configuration

HA settings in hdfs-site.xml:

<property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
</property>
<property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>master1:8020</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>master2:8020</value>
</property>
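
The snippet above is not a complete HA setup; it omits the shared edits directory (JournalNodes), the client failover proxy provider, fencing, and the ZooKeeper settings for automatic failover. Once those are in place, failover can be checked and exercised from the shell, using the NameNode IDs nn1/nn2 defined above:

# Check which NameNode is currently active
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

# Manually fail over from nn1 to nn2 (e.g. for planned maintenance)
hdfs haadmin -failover nn1 nn2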

3. Working with HDFS

3.1 Basic File System Operations

3.1.1 Managing Files and Directories

# Create directories
hdfs dfs -mkdir /user/hadoop/data
hdfs dfs -mkdir -p /user/hadoop/input/2024

# Upload files
hdfs dfs -put localfile.txt /user/hadoop/data/
hadoop fs -copyFromLocal localfile.txt /user/hadoop/data/

# Download files
hdfs dfs -get /user/hadoop/data/file.txt ./local_copy.txt
hdfs dfs -copyToLocal /user/hadoop/data/file.txt ./

# View file contents
hdfs dfs -cat /user/hadoop/data/file.txt
hdfs dfs -tail -f /user/hadoop/logs/app.log # follow the file as it grows

# List files
hdfs dfs -ls /user/hadoop/data
hdfs dfs -ls -R /user/hadoop/ # recursive listing

3.1.2 Advanced File Operations

# File statistics
hdfs dfs -du -h /user/hadoop/data/
hadoop fs -count /user/hadoop/data/

# Permission management
hdfs dfs -chmod 755 /user/hadoop/data/
hadoop fs -chown hadoop:hadoop /user/hadoop/data/file.txt
hdfs dfs -chgrp hadoop /user/hadoop/data/

# Moving, renaming, and copying
hdfs dfs -mv /user/hadoop/data/old.txt /user/hadoop/data/new.txt
hdfs dfs -cp /user/hadoop/data/file.txt /user/hadoop/backup/

# Deleting
hdfs dfs -rm /user/hadoop/data/temp.txt
hdfs dfs -rm -r /user/hadoop/data/old_directory/
hdfs dfs -rm -skipTrash /user/hadoop/data/large_file.txt # delete immediately, bypassing the trash

3.2 Data Backup and Recovery

3.2.1 Distributed Copy with DistCp

# Migrate data between clusters
hadoop distcp hdfs://source-cluster:8020/user/data \
hdfs://target-cluster:8020/user/data

# Incremental copy that preserves file attributes (-p); -skipcrccheck skips the CRC comparison
hadoop distcp -update -skipcrccheck -p \
hdfs://namenode1:8020/user/data \
hdfs://namenode2:8020/user/backup

# Limit the number of map tasks (-m) and the per-map bandwidth in MB/s (-bandwidth)
hadoop distcp -m 10 -bandwidth 100 \
/user/hadoop/data /user/hadoop/backup

3.2.2 Snapshot Management

# Allow snapshots on a directory
hdfs dfsadmin -allowSnapshot /user/hadoop/data

# Create a snapshot
hdfs dfs -createSnapshot /user/hadoop/data backup_20241225

# List snapshots
hdfs dfs -ls /user/hadoop/data/.snapshot/

# Restore data from a snapshot
hdfs dfs -cp /user/hadoop/data/.snapshot/backup_20241225/file.txt \
/user/hadoop/data/current/

3.3 Storage Policy Optimization

3.3.1 Configuring Storage Policies

# Set a storage policy
hdfs storagepolicies -setStoragePolicy -path /user/hadoop/archive -policy COLD

# Query a storage policy
hdfs storagepolicies -getStoragePolicy -path /user/hadoop/data/

# Common policies:
# HOT: all replicas on DISK (the default; frequently accessed data)
# WARM: replicas spread across DISK and ARCHIVE storage (moderately accessed data)
# COLD: all replicas on ARCHIVE storage (rarely accessed data)
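
Changing a policy only labels the path; existing replicas are not relocated until the HDFS Mover runs. A minimal sketch, using the path from the example above:

# Migrate existing blocks so that they satisfy the storage policy of the path
hdfs mover -p /user/hadoop/archive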

4. The MapReduce Programming Model in Depth

4.1 The Classic WordCount Example

4.1.1 Mapper Implementation

package com.hadoop.tutorial;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split the input line into whitespace-separated tokens
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Normalize to lowercase and strip non-letter characters
            word.set(itr.nextToken().toLowerCase().replaceAll("[^a-zA-Z]", ""));
            if (word.getLength() > 0) {
                context.write(word, one);
            }
        }
    }
}

4.1.2 Reducer Implementation

package com.hadoop.tutorial;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        // Sum the occurrences of each word
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

4.1.3 Driver Program

package com.hadoop.tutorial;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the job and set its name
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class); // local aggregation before the shuffle
        job.setReducerClass(WordCountReducer.class);

        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Set the number of reduce tasks
        job.setNumReduceTasks(2);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4.2 Building and Running

4.2.1 Compiling and Packaging

# Generate the Maven project skeleton
mvn archetype:generate -DgroupId=com.hadoop.tutorial \
-DartifactId=wordcount \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false

# Compile and package (the pom.xml must declare the hadoop-client dependency for the classes above to compile)
mvn clean package -DskipTests

# Stage some input data in HDFS
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /user/hadoop/input/

# Run the job
hadoop jar wordcount-1.0-SNAPSHOT.jar \
com.hadoop.tutorial.WordCountDriver \
/user/hadoop/input \
/user/hadoop/output/wordcount
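
Once the job finishes, the results can be inspected directly in HDFS; with two reduce tasks configured in the driver, the output is split across part-r-00000 and part-r-00001:

# List and preview the job output
hdfs dfs -ls /user/hadoop/output/wordcount
hdfs dfs -cat /user/hadoop/output/wordcount/part-r-00000 | head -20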

4.2.2 Job Monitoring

# Check job status
mapred job -list
mapred job -status job_id

# View job logs
yarn logs -applicationId application_id

# Web UIs
# ResourceManager: http://localhost:8088
# JobHistory Server: http://localhost:19888

4.3 Advanced MapReduce Examples

4.3.1 Building an Inverted Index

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text word = new Text();
    private Text documentId = new Text();

    @Override
    protected void setup(Context context) {
        // Use the input file name as the document ID
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        documentId.set(fileName);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            if (w.length() > 0) {
                word.set(w.toLowerCase());
                // Emit word -> document ID; the reducer builds the document list per word
                context.write(word, documentId);
            }
        }
    }
}

4.3.2 Secondary Sort Implementation

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// CompositeKey is a custom WritableComparable (natural key + secondary sort field),
// assumed to be defined elsewhere together with its partitioner and grouping comparator.
public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {
    private CompositeKey compositeKey = new CompositeKey();
    private IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {

        // Input lines look like: naturalKey,sortField,value
        String[] fields = line.toString().split(",");
        compositeKey.set(fields[0], Integer.parseInt(fields[1]));
        value.set(Integer.parseInt(fields[2]));
        context.write(compositeKey, value);
    }
}

5. Performance Tuning in Practice

5.1 Data Locality Optimization

5.1.1 Monitoring Locality Levels

# Inspect the job's data-locality counters
mapred job -status job_id | grep "Data-local"

# Tuning parameters (mapred-site.xml)
<property>
    <name>mapreduce.task.io.sort.mb</name>
    <value>256</value> <!-- larger sort buffer (MB) -->
</property>

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>2048</value> <!-- container memory per map task (MB) -->
</property>

<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value> <!-- container memory per reduce task (MB) -->
</property>

5.1.2 Rack Awareness Configuration

# Create the rack topology script
cat > /etc/hadoop/conf/rack-topology.sh << 'EOF'
#!/bin/bash
# Rack awareness script: print one rack path per argument
# (the NameNode may pass several hosts/IPs in a single invocation)
if [ $# -eq 0 ]; then
    echo "/default-rack"
else
    for ip in "$@"; do
        case $ip in
            192.168.1.*) echo "/rack1" ;;
            192.168.2.*) echo "/rack2" ;;
            *) echo "/default-rack" ;;
        esac
    done
fi
EOF

chmod +x /etc/hadoop/conf/rack-topology.sh

# Configure it in core-site.xml
<property>
    <name>net.topology.script.file.name</name>
    <value>/etc/hadoop/conf/rack-topology.sh</value>
</property>
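
After restarting the NameNode, the rack assignments it has resolved can be verified from the shell:

# Print the rack topology as seen by the NameNode
hdfs dfsadmin -printTopology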

5.2 Combiner Optimization Strategy

5.2.1 When to Use a Combiner

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;

// Heuristic check for whether a Combiner is worth enabling
public class CombinerAnalyzer {

    public static boolean shouldUseCombiner(Job job) throws ClassNotFoundException {
        // 1. The aggregation must be commutative and associative
        if (!isAssociative(job.getReducerClass())) {
            return false;
        }

        // 2. Estimate how much repetition there is per key
        //    (getInputSize and estimateUniqueKeys are placeholder helpers, assumed implemented elsewhere)
        long inputSize = getInputSize(job);
        long uniqueKeys = estimateUniqueKeys(job);

        // A combiner pays off when many records share the same key
        // (here: more than 100 records per key on average)
        return (double) inputSize / uniqueKeys > 100;
    }

    private static boolean isAssociative(Class<?> reducerClass) {
        return reducerClass.equals(IntSumReducer.class) ||
               reducerClass.equals(LongSumReducer.class);
    }
}

5.2.2 A Custom Combiner

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Note: an average is not directly combinable, so this combiner emits a partial sum under the
// original key and a partial count under "<key>_count"; the reducer must recombine both streams.
public class AverageCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        int count = 0;

        // Local aggregation: compute a partial sum and a count
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }

        // Emit the intermediate results (the output types must match the map output types)
        context.write(key, new IntWritable(sum));
        context.write(new Text(key.toString() + "_count"), new IntWritable(count));
    }
}

5.3 Choosing and Configuring Compression

5.3.1 Compression Codec Comparison

| Codec | Compression ratio | Compression speed | Decompression speed | Typical use |
|---|---|---|---|---|
| Snappy | 20-50% | Very fast | Very fast | Real-time processing |
| LZ4 | 20-50% | Very fast | Very fast | General purpose |
| Gzip | 60-70% | Medium | Medium | Archival storage |
| Bzip2 | 80-90% | Slow | Medium | Maximum compression |
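
The Snappy and LZ4 codecs depend on Hadoop's native library support, so before enabling them it is worth confirming that the native codecs actually load on your nodes:

# List which native compression libraries Hadoop can load
hadoop checknative -a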

5.3.2 Production Compression Settings

<!-- Compression settings in mapred-site.xml -->
<configuration>
    <!-- Compress intermediate map output -->
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>

    <!-- Compress the final job output -->
    <property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.Lz4Codec</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.type</name>
        <value>BLOCK</value>
    </property>
</configuration>

5.3.3 Benchmark Script

#!/bin/bash
# Compression codec benchmark script

ALGORITHMS=("org.apache.hadoop.io.compress.DefaultCodec"
"org.apache.hadoop.io.compress.GzipCodec"
"org.apache.hadoop.io.compress.BZip2Codec"
"org.apache.hadoop.io.compress.SnappyCodec"
"org.apache.hadoop.io.compress.Lz4Codec")

TEST_FILE="/user/hadoop/test_data/large_file.txt"

for codec in "${ALGORITHMS[@]}"; do
    echo "Testing codec: $codec"
    name=${codec##*.}  # simple class name, used for the output directory

    # Run the WordCount job (the -D generic options require the driver to use ToolRunner)
    hadoop jar wordcount.jar \
    -D mapreduce.output.fileoutputformat.compress=true \
    -D mapreduce.output.fileoutputformat.compress.codec=$codec \
    $TEST_FILE /user/hadoop/output/$name

    # Record the output size
    hdfs dfs -du -h /user/hadoop/output/$name
done

6. Troubleshooting Common Problems

6.1 NameNode Fault Diagnosis

6.1.1 NameNode Fails to Start

Symptom: the NameNode will not start and the log reports a port conflict.

# Check what is listening on the port
netstat -tlnp | grep :8020
lsof -i :8020

# Fix: free the port, or change the NameNode RPC address
# in hdfs-site.xml
<property>
    <name>dfs.namenode.rpc-address</name>
    <value>namenode-host:8021</value>
</property>

6.1.2 Safe Mode Issues

# Check safe mode status
hdfs dfsadmin -safemode get

# Force the NameNode out of safe mode (use with caution)
hdfs dfsadmin -safemode leave

# Check whether the NameNode still reports safe mode
hdfs dfsadmin -report | grep "Safe mode is ON"
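
In automation scripts it is usually better to wait for the NameNode to leave safe mode on its own rather than forcing it out:

# Block until the NameNode exits safe mode
hdfs dfsadmin -safemode wait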

6.2 DataNode Connectivity Problems

6.2.1 Network Diagnostics

# Check which DataNodes have registered
hdfs dfsadmin -report

# Show the rack topology as seen by the NameNode
hdfs dfsadmin -printTopology

# List DataNodes that have stopped heartbeating
hdfs dfsadmin -report -dead

6.2.2 Disk Space Problems

# Check disk usage
hdfs dfsadmin -report | grep "DFS Used"

# Reserve space per volume for non-HDFS use (hdfs-site.xml)
<property>
    <name>dfs.datanode.du.reserved</name>
    <value>10737418240</value> <!-- reserve 10 GB -->
</property>

# Re-read the include/exclude host files (e.g. after listing a DataNode in the exclude file to decommission it)
hdfs dfsadmin -refreshNodes

6.3 MapReduce Job Failures

6.3.1 Out-of-Memory Analysis

# Inspect the job's stderr logs
yarn logs -applicationId application_xxx -log_files stderr

# JVM heap tuning (keep -Xmx below the mapreduce.*.memory.mb container sizes)
<property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx2048m</value>
</property>
<property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx4096m</value>
</property>

6.3.2 Detecting Data Skew

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

// Simple data-skew monitoring helpers
public class SkewDetector {

    public static class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text keyOut = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // Count records per key via counters (only practical for a small key space:
            // the framework caps the number of counters, 120 by default).
            // extractKey is a placeholder helper, assumed implemented elsewhere.
            String keyStr = extractKey(value);
            keyOut.set(keyStr);
            context.getCounter("Skew", keyStr).increment(1);
        }
    }

    public static void analyzeSkew(Job job) throws IOException {
        Counters counters = job.getCounters();
        long maxCount = 0;
        long totalCount = 0;

        for (CounterGroup group : counters) {
            if (group.getName().equals("Skew")) {
                for (Counter counter : group) {
                    maxCount = Math.max(maxCount, counter.getValue());
                    totalCount += counter.getValue();
                }
            }
        }

        double skewRatio = (double) maxCount / (totalCount / counters.countCounters());
        System.out.println("Skew ratio: " + skewRatio);
    }
}

6.4 Performance Monitoring Tools

6.4.1 Cluster Monitoring Script

#!/bin/bash
# Hadoop cluster health-check script

echo "=== Hadoop cluster status check ==="

# NameNode status (-w avoids matching SecondaryNameNode)
if jps | grep -qw NameNode; then
    echo "✓ NameNode is running"
else
    echo "✗ NameNode is not running"
fi

# DataNode status
data_nodes=$(hdfs dfsadmin -report | grep "Live datanodes" | grep -o '[0-9]\+' | head -1)
echo "Live DataNodes: $data_nodes"

# Disk usage
hdfs dfsadmin -report | grep "DFS Used%" | head -5

# YARN status
if jps | grep -qw ResourceManager; then
    echo "✓ ResourceManager is running"
    active_nodes=$(yarn node -list | grep -c RUNNING)
    echo "Active NodeManagers: $active_nodes"
else
    echo "✗ ResourceManager is not running"
fi

# Memory usage on this host
free -h

6.4.2 Log Analysis

# Enable log aggregation (yarn-site.xml)
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

# View the aggregated logs of an application
yarn logs -applicationId application_xxx

# Tail the daemon logs in real time
tail -f $HADOOP_HOME/logs/hadoop-*.log

7. Case Study: E-Commerce Log Analysis

7.1 Business Scenario

7.1.1 Data Model

Log format: Apache Combined Log Format

127.0.0.1 - - [25/Dec/2024:10:15:32 +0000] "GET /product/12345 HTTP/1.1" 200 1024 "http://example.com" "Mozilla/5.0"

7.1.2 Analysis Requirements

| Dimension | Metric | Implementation |
|---|---|---|
| PV | Page views | MapReduce counting |
| UV | Unique visitors | Deduplication by IP |
| Conversion rate | Purchases / visits | Multi-dataset join |
| Hot products | Top-N products | Sorted aggregation |

7.2 Data Preprocessing

7.2.1 Log Cleaning with MapReduce

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogCleaner {

    public static class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Regular expression for the Apache Combined Log Format (compiled once per task)
        private static final Pattern LOG_PATTERN = Pattern.compile(
                "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"([^\\s]+) ([^\\s]+) ([^\\s]+)\" (\\d+) (\\d+) \"([^\"]*)\" \"([^\"]*)\"");

        private Text cleanedLog = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String logLine = value.toString();

            Matcher matcher = LOG_PATTERN.matcher(logLine);
            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                String status = matcher.group(6);
                String size = matcher.group(7);
                String referer = matcher.group(8);
                String userAgent = matcher.group(9);

                // Emit the cleaned, tab-separated record
                String cleaned = String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",
                        ip, timestamp, method, url, status, size, referer, userAgent);

                cleanedLog.set(cleaned);
                context.write(cleanedLog, NullWritable.get());
            }
        }
    }
}

7.3 Computing the Business Metrics

7.3.1 PV/UV Statistics

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class PVUVAnalyzer {

    public static class PVMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text dateKey = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length >= 4) {
                // Extract the date and the URL
                // (extractDate is a placeholder helper that parses the day out of the timestamp field)
                String date = extractDate(fields[1]);
                String url = fields[3];

                // Emit date_url -> 1 (PV)
                dateKey.set(date + "_" + url);
                context.write(dateKey, one);

                // Emit date_ip -> 1 (UV candidate)
                dateKey.set(date + "_" + fields[0]);
                context.write(dateKey, one);
            }
        }
    }

    public static class PVUVReducer extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Split only on the first underscore so URLs containing "_" stay intact
            String[] parts = key.toString().split("_", 2);
            String date = parts[0];
            String dimension = parts[1];

            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }

            // Distinguish UV keys (IP address suffix) from PV keys (URL suffix)
            if (dimension.matches("\\d+\\.\\d+\\.\\d+\\.\\d+")) {
                // One record per distinct (date, IP); a second aggregation sums these into the daily UV
                context.write(new Text(date + "_UV"), new Text(String.valueOf(1)));
            } else {
                // PV count for this (date, URL)
                context.write(new Text(date + "_PV_" + dimension), new Text(String.valueOf(count)));
            }
        }
    }
}

7.3.2 Hot Product Analysis

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class HotProductAnalyzer {

    public static class ProductMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text productId = new Text();
        private IntWritable count = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length < 4) {
                return; // skip malformed records
            }
            String url = fields[3];

            // Extract the product ID from /product/<id> URLs
            if (url.startsWith("/product/")) {
                String pid = url.substring("/product/".length());
                productId.set(pid);
                count.set(1);
                context.write(productId, count);
            }
        }
    }

    public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private TreeMap<Integer, List<String>> topProducts = new TreeMap<>();
        private final int TOP_N = 100;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            topProducts.computeIfAbsent(sum, k -> new ArrayList<>()).add(key.toString());

            // Keep only the highest TOP_N distinct counts
            if (topProducts.size() > TOP_N) {
                topProducts.remove(topProducts.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the Top-N results in descending order of count
            for (Map.Entry<Integer, List<String>> entry : topProducts.descendingMap().entrySet()) {
                for (String product : entry.getValue()) {
                    context.write(new Text(product), new IntWritable(entry.getKey()));
                }
            }
        }
    }
}

7.4 Performance Benchmarks

7.4.1 Test Datasets

| Dataset | File size | Records | Processing time |
|---|---|---|---|
| Small | 1 GB | 1 million | 5 minutes |
| Medium | 10 GB | 10 million | 25 minutes |
| Large | 100 GB | 100 million | 3 hours |

7.4.2 Performance Comparison

#!/bin/bash
# Benchmark execution script

DATA_SIZES=("1GB" "10GB" "100GB")
WORKERS=(1 3 5)

for size in "${DATA_SIZES[@]}"; do
    for workers in "${WORKERS[@]}"; do
        echo "Testing $size data with $workers workers"

        # Set the number of reduce tasks
        hadoop jar analytics.jar \
        -D mapreduce.job.reduces=$workers \
        /user/hadoop/logs/${size} \
        /user/hadoop/results/${size}_${workers}workers

        # Check the finished run; elapsed time is reported by `yarn application -status <appId>`
        yarn application -list -appStates FINISHED | tail -n 1
    done
done

7.5 Production Deployment Recommendations

7.5.1 Cluster Sizing Estimates

| Daily active users | Log volume per day | Recommended cluster | Estimated cost |
|---|---|---|---|
| 10,000 | 10 GB | 3 nodes | ~500 CNY/month |
| 100,000 | 100 GB | 5 nodes | ~2,000 CNY/month |
| 1,000,000 | 1 TB | 10 nodes | ~8,000 CNY/month |

7.5.2 Monitoring and Alerting Configuration

# DataNode disk checker timeout (hdfs-site.xml); note that these properties tune HDFS behaviour,
# they do not send alerts by themselves
<property>
    <name>dfs.datanode.disk.check.timeout</name>
    <value>600000</value>
</property>

# Threshold for the available-space volume-choosing policy
<property>
    <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
    <value>10737418240</value> <!-- 10 GB threshold -->
</property>
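
Actual alerting is usually handled outside Hadoop (Ambari, Prometheus, or even a plain cron job). A minimal cron-able sketch that warns when overall HDFS usage crosses a threshold (80% assumed; hook the echo into your mail or alerting system as needed):

#!/bin/bash
# Warn when overall HDFS usage exceeds a threshold (intended to be run from cron)
THRESHOLD=80
used=$(hdfs dfsadmin -report | awk -F': ' '/DFS Used%/ {gsub(/%/, "", $2); print $2; exit}')
if awk -v u="$used" -v t="$THRESHOLD" 'BEGIN { exit !(u > t) }'; then
    echo "WARNING: HDFS usage is at ${used}%"
fi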

8. Summary and Best Practices

8.1 Key Takeaways

  1. Architecture: understand how HDFS, YARN, and MapReduce cooperate
  2. Performance tuning: combine data locality, Combiners, and compression
  3. Fault handling: build a thorough monitoring and alerting setup
  4. Business applications: complete solutions from log analysis to real-time computation

8.2 Suggested Learning Path

journey
title Hadoop learning path
section Foundations
Environment setup: 5: Single node / pseudo-distributed
HDFS operations: 4: File system commands
MapReduce: 3: WordCount example
section Intermediate
Performance tuning: 4: Parameter tuning
Troubleshooting: 3: Log analysis
Monitoring and alerting: 4: Cluster health checks
section Hands-on
Business case study: 5: E-commerce log analysis
Production deployment: 4: Cluster planning
Performance tuning: 5: Benchmark-driven optimization

8.3 Community Resources

  • Official documentation: https://hadoop.apache.org/docs/
  • Technical blogs: the Cloudera engineering blog
  • Open source: the Apache Hadoop GitHub repository
  • Community: the Stack Overflow "hadoop" tag

This guide targets Hadoop 3.3.4 and will be updated as the technology evolves.