# Hadoop Quick Start and Hands-On Guide

## 1. The Hadoop Ecosystem at a Glance

### 1.1 Core Component Architecture

The Hadoop ecosystem follows a master/worker architecture: **HDFS (Hadoop Distributed File System)** provides distributed storage, **YARN (Yet Another Resource Negotiator)** handles resource scheduling, and **MapReduce** supplies the distributed computing framework.
```mermaid
graph TD
    A[Client Application] --> B[YARN ResourceManager]
    B --> C[NodeManager Node1]
    B --> D[NodeManager Node2]
    B --> E[NodeManager Node3]
    C --> F[HDFS DataNode1]
    D --> G[HDFS DataNode2]
    E --> H[HDFS DataNode3]
    I[HDFS NameNode] --> F
    I --> G
    I --> H
    J[Secondary NameNode] --> I
    style A fill:#f9f,stroke:#333
    style B fill:#bbf,stroke:#333
    style I fill:#9f9,stroke:#333
```
### 1.2 Core Components in Detail

| Component | Role | Key Characteristics | Production Configuration Advice |
|---|---|---|---|
| NameNode | Metadata management | Manages the file system namespace | 8-16 GB RAM, SSD storage |
| DataNode | Data storage | Stores the actual data blocks | 4-8 TB disks, gigabit network |
| ResourceManager | Global resource scheduling | Manages cluster-wide resource allocation | High-availability setup with ZooKeeper |
| NodeManager | Per-node resource management | Monitors container resource usage | Dynamic resource adjustment |
### 1.3 Data Storage Principles

HDFS uses **block storage** with a default block size of 128 MB and relies on a **replication mechanism** for data reliability. You can inspect how a file is split into blocks and where each replica lives:

```bash
# Show the blocks and replica locations of a file
hdfs fsck /user/data/largefile.txt -files -blocks -locations
```
**Replica placement policy** (for the default replication factor of 3; the sketch after this list shows how to verify placement programmatically):

- 1st replica: on the local node (the node running the writer, if it is a DataNode)
- 2nd replica: on a node in a different rack
- 3rd replica: on a different node in the same rack as the 2nd replica
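As a minimal sketch (assuming the Hadoop client libraries are on the classpath and the configuration files point at your cluster; the file path is the same example path used above), the HDFS Java API can print which hosts and racks hold each block of a file:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationInspector {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();           // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/user/data/largefile.txt");   // example path from above

        FileStatus status = fs.getFileStatus(file);
        BlockLocation[] blocks =
                fs.getFileBlockLocations(status, 0, status.getLen());

        for (int i = 0; i < blocks.length; i++) {
            System.out.printf("block %d: hosts=%s racks=%s%n",
                    i,
                    String.join(",", blocks[i].getHosts()),
                    String.join(",", blocks[i].getTopologyPaths()));
        }
        fs.close();
    }
}
```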
## 2. Hands-On Environment Setup

### 2.1 Single-Node Deployment

#### 2.1.1 Installing Prerequisites

```bash
sudo apt update
sudo apt install openjdk-8-jdk ssh pdsh

# Verify the installation
java -version
ssh localhost
```
#### 2.1.2 Installing and Configuring Hadoop

```bash
cd /opt
sudo wget https://downloads.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
sudo tar -xzf hadoop-3.3.4.tar.gz
sudo mv hadoop-3.3.4 hadoop

# Add Hadoop to the environment
echo 'export HADOOP_HOME=/opt/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
source ~/.bashrc
```
#### 2.1.3 Core Configuration Files

**core-site.xml** - core configuration:

```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
    <description>Default file system URI</description>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/tmp</value>
    <description>Temporary directory</description>
  </property>
</configuration>
```
**hdfs-site.xml** - HDFS configuration:

```xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Replication factor of 1 for single-node mode</description>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop/data/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop/data/datanode</value>
  </property>
</configuration>
```
### 2.2 Pseudo-Distributed Cluster

#### 2.2.1 Configuration Adjustments

**mapred-site.xml** - MapReduce configuration:

```xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>
```
**yarn-site.xml** - YARN configuration:

```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>4096</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>4096</value>
  </property>
</configuration>
```
#### 2.2.2 Starting and Verifying the Cluster

```bash
# Format the NameNode (first start only)
hdfs namenode -format

# Start HDFS and YARN
start-dfs.sh
start-yarn.sh

# Verify that the daemons are running
jps
```
### 2.3 Production-Grade Cluster Configuration

#### 2.3.1 Cluster Planning Template

| Node Type | Recommended Spec | Roles | Network |
|---|---|---|---|
| Master node | 16 cores, 32 GB RAM | NameNode + ResourceManager | 10 GbE |
| Worker node | 8 cores, 16 GB RAM | DataNode + NodeManager | 1 GbE |
| Edge node | 4 cores, 8 GB RAM | Client access | 1 GbE |
#### 2.3.2 High-Availability Configuration

**hdfs-site.xml** HA settings:

```xml
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>master2:8020</value>
</property>
```
## 3. HDFS in Practice

### 3.1 Basic File System Operations

#### 3.1.1 Managing Files and Directories

```bash
# Create directories
hdfs dfs -mkdir /user/hadoop/data
hdfs dfs -mkdir -p /user/hadoop/input/2024

# Upload local files
hdfs dfs -put localfile.txt /user/hadoop/data/
hadoop fs -copyFromLocal localfile.txt /user/hadoop/data/

# Download files to the local file system
hdfs dfs -get /user/hadoop/data/file.txt ./local_copy.txt
hdfs dfs -copyToLocal /user/hadoop/data/file.txt ./

# View file contents
hdfs dfs -cat /user/hadoop/data/file.txt
hdfs dfs -tail -f /user/hadoop/logs/app.log

# List directories
hdfs dfs -ls /user/hadoop/data
hdfs dfs -ls -R /user/hadoop/
```
#### 3.1.2 Advanced File Operations

```bash
# Disk usage and counts
hdfs dfs -du -h /user/hadoop/data/
hadoop fs -count /user/hadoop/data/

# Permissions and ownership
hdfs dfs -chmod 755 /user/hadoop/data/
hadoop fs -chown hadoop:hadoop /user/hadoop/data/file.txt
hdfs dfs -chgrp hadoop /user/hadoop/data/

# Move and copy
hdfs dfs -mv /user/hadoop/data/old.txt /user/hadoop/data/new.txt
hdfs dfs -cp /user/hadoop/data/file.txt /user/hadoop/backup/

# Delete
hdfs dfs -rm /user/hadoop/data/temp.txt
hdfs dfs -rm -r /user/hadoop/data/old_directory/
hdfs dfs -rm -skipTrash /user/hadoop/data/large_file.txt
```
### 3.2 Data Backup and Recovery

#### 3.2.1 Distributed Copies with DistCp

```bash
# Copy between clusters
hadoop distcp hdfs://source-cluster:8020/user/data \
    hdfs://target-cluster:8020/user/data

# Incremental copy, preserving attributes and skipping CRC checks
hadoop distcp -update -skipcrccheck -p \
    hdfs://namenode1:8020/user/data \
    hdfs://namenode2:8020/user/backup

# Limit to 10 map tasks and 100 MB/s per map
hadoop distcp -m 10 -bandwidth 100 \
    /user/hadoop/data /user/hadoop/backup
```
#### 3.2.2 Snapshot Management

```bash
# Allow snapshots on a directory
hdfs dfsadmin -allowSnapshot /user/hadoop/data

# Create a snapshot
hdfs dfs -createSnapshot /user/hadoop/data backup_20241225

# List snapshots
hdfs dfs -ls /user/hadoop/data/.snapshot/

# Restore a file from a snapshot
hdfs dfs -cp /user/hadoop/data/.snapshot/backup_20241225/file.txt \
    /user/hadoop/data/current/
```
### 3.3 Storage Policy Optimization

#### 3.3.1 Configuring Storage Types

```bash
# Move cold data to the COLD policy (archive storage)
hdfs storagepolicies -setStoragePolicy -path /user/hadoop/archive -policy COLD

# Check the current policy
hdfs storagepolicies -getStoragePolicy -path /user/hadoop/data/
```
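As a rough sketch (assuming a Hadoop 2.8+ client; the directory path is hypothetical), the same built-in policies can also be assigned and read from application code through the `FileSystem` API:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StoragePolicyExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        Path archive = new Path("/user/hadoop/archive");   // hypothetical path

        // Assign the built-in COLD policy to the archive directory
        fs.setStoragePolicy(archive, "COLD");

        // Read back the effective policy
        System.out.println("policy on " + archive + ": "
                + fs.getStoragePolicy(archive).getName());

        fs.close();
    }
}
```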
## 4. The MapReduce Programming Model in Depth

### 4.1 The Classic WordCount Example

#### 4.1.1 Mapper Implementation

```java
package com.hadoop.tutorial;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Normalize: lower-case and strip non-letter characters
            word.set(itr.nextToken().toLowerCase().replaceAll("[^a-zA-Z]", ""));
            if (word.getLength() > 0) {
                context.write(word, one);
            }
        }
    }
}
```
#### 4.1.2 Reducer Implementation

```java
package com.hadoop.tutorial;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
#### 4.1.3 Driver Program

```java
package com.hadoop.tutorial;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        // The reducer is associative and commutative, so it can double as a combiner
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(2);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
### 4.2 Building and Running

#### 4.2.1 Compiling and Packaging

```bash
# Generate a Maven project skeleton
mvn archetype:generate -DgroupId=com.hadoop.tutorial \
    -DartifactId=wordcount \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false

# Build the jar
mvn clean package -DskipTests

# Prepare input data
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /user/hadoop/input/

# Submit the job
hadoop jar wordcount-1.0-SNAPSHOT.jar \
    com.hadoop.tutorial.WordCountDriver \
    /user/hadoop/input \
    /user/hadoop/output/wordcount
```
#### 4.2.2 Job Monitoring

```bash
# List running MapReduce jobs
mapred job -list

# Check the status of a specific job
mapred job -status <job_id>

# Fetch the aggregated logs of a YARN application
yarn logs -applicationId <application_id>
```
### 4.3 Advanced MapReduce Examples

#### 4.3.1 Building an Inverted Index

```java
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text word = new Text();
    private Text documentId = new Text();

    @Override
    protected void setup(Context context) {
        // Use the input file name as the document identifier
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        documentId.set(fileName);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            if (w.length() > 0) {
                word.set(w.toLowerCase());
                context.write(word, documentId);
            }
        }
    }
}
```
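The original only shows the mapper; a minimal reducer sketch (my addition, not from the source) that groups the document IDs per word and counts occurrences per document might look like this:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private Text posting = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count how many times the word appears in each document
        Map<String, Integer> docCounts = new HashMap<>();
        for (Text docId : values) {
            docCounts.merge(docId.toString(), 1, Integer::sum);
        }

        // Emit: word -> doc1:count1;doc2:count2;...
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : docCounts.entrySet()) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        posting.set(sb.toString());
        context.write(key, posting);
    }
}
```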
#### 4.3.2 Secondary Sort

```java
public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {

    private CompositeKey compositeKey = new CompositeKey();
    private IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {
        // Input lines: naturalKey,secondaryField,value
        String[] fields = line.toString().split(",");
        compositeKey.set(fields[0], Integer.parseInt(fields[1]));
        value.set(Integer.parseInt(fields[2]));
        context.write(compositeKey, value);
    }
}
```
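The `CompositeKey` type is not shown in the source; a minimal sketch of what it might look like as a `WritableComparable` (sorting by the natural key first, then by the numeric secondary field) is given below. A complete secondary-sort job would additionally need a grouping comparator and a partitioner that consider only the natural key.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {

    private String naturalKey = "";
    private int secondaryKey;

    public void set(String naturalKey, int secondaryKey) {
        this.naturalKey = naturalKey;
        this.secondaryKey = secondaryKey;
    }

    public String getNaturalKey() {
        return naturalKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(naturalKey);
        out.writeInt(secondaryKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        naturalKey = in.readUTF();
        secondaryKey = in.readInt();
    }

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = naturalKey.compareTo(other.naturalKey);
        return cmp != 0 ? cmp : Integer.compare(secondaryKey, other.secondaryKey);
    }

    @Override
    public int hashCode() {
        // Hash on the natural key only so equal natural keys land in the same partition
        return naturalKey.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeKey
                && naturalKey.equals(((CompositeKey) o).naturalKey)
                && secondaryKey == ((CompositeKey) o).secondaryKey;
    }
}
```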
## 5. Hands-On Performance Tuning

### 5.1 Data Locality Optimization

#### 5.1.1 Monitoring Locality Levels

```bash
# Inspect how many map tasks ran data-local
mapred job -status <job_id> | grep "Data-local"
```

Related task-level tuning parameters (mapred-site.xml):

```xml
<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>256</value>   <!-- Larger sort buffer -->
</property>
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>  <!-- Map task memory -->
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>  <!-- Reduce task memory -->
</property>
```
#### 5.1.2 Rack Awareness Configuration

```bash
# Topology script that maps IPs to racks
cat > /etc/hadoop/conf/rack-topology.sh << 'EOF'
#!/bin/bash
if [ $# -eq 0 ]; then
  echo "/default-rack"
else
  ip=$1
  case $ip in
    192.168.1.*) echo "/rack1" ;;
    192.168.2.*) echo "/rack2" ;;
    *) echo "/default-rack" ;;
  esac
fi
EOF
chmod +x /etc/hadoop/conf/rack-topology.sh
```

Register the script in core-site.xml:

```xml
<property>
  <name>net.topology.script.file.name</name>
  <value>/etc/hadoop/conf/rack-topology.sh</value>
</property>
```
### 5.2 Combiner Optimization

#### 5.2.1 When to Use a Combiner

The helper below is illustrative pseudocode: `getInputSize` and `estimateUniqueKeys` are not Hadoop APIs and would have to be implemented against your own job metadata.

```java
public class CombinerAnalyzer {

    public static boolean shouldUseCombiner(Job job) throws Exception {
        // A combiner is only safe when the reduce function is associative and commutative
        if (!isAssociative(job.getReducerClass())) {
            return false;
        }

        // Heuristic: a combiner pays off when many values share each key
        long inputSize = getInputSize(job);        // illustrative helper
        long uniqueKeys = estimateUniqueKeys(job); // illustrative helper
        return (double) inputSize / uniqueKeys > 100;
    }

    private static boolean isAssociative(Class<?> reducerClass) {
        return reducerClass.equals(IntSumReducer.class)
                || reducerClass.equals(LongSumReducer.class);
    }
}
```
#### 5.2.2 A Custom Combiner

Note that a combiner cannot pre-average values directly; this example keeps the partial sum under the original key and the partial count under a derived key so the reducer can still compute an exact average.

```java
public class AverageCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int count = 0;
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        // Emit the partial sum and the partial count as separate keys
        context.write(key, new IntWritable(sum));
        context.write(new Text(key.toString() + "_count"), new IntWritable(count));
    }
}
```
### 5.3 Choosing and Configuring Compression

#### 5.3.1 Compression Algorithm Comparison

| Algorithm | Compression Ratio | Compression Speed | Decompression Speed | Typical Use Case |
|---|---|---|---|---|
| Snappy | 20-50% | Very fast | Very fast | Real-time processing |
| LZ4 | 20-50% | Fast | Very fast | General purpose |
| Gzip | 60-70% | Medium | Medium | Archival storage |
| Bzip2 | 80-90% | Slow | Medium | Maximum compression |
#### 5.3.2 Production Compression Configuration

```xml
<configuration>
  <!-- Compress intermediate map output -->
  <property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
  </property>
  <property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>

  <!-- Compress final job output -->
  <property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
  </property>
  <property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.Lz4Codec</value>
  </property>
  <property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
  </property>
</configuration>
```
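When a job cannot rely on cluster-wide defaults, the same settings can be applied per job from the driver. A small sketch (my addition, using standard MapReduce APIs; the class name is hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionJobSetup {

    // Apply per-job compression settings before submitting the job
    public static void configureCompression(Job job) {
        Configuration conf = job.getConfiguration();

        // Compress intermediate map output with Snappy
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);

        // Compress the final output with LZ4
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);
    }
}
```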
#### 5.3.3 Benchmark Script

```bash
#!/bin/bash
# Compare job output sizes across compression codecs

ALGORITHMS=("org.apache.hadoop.io.compress.DefaultCodec"
            "org.apache.hadoop.io.compress.GzipCodec"
            "org.apache.hadoop.io.compress.BZip2Codec"
            "org.apache.hadoop.io.compress.SnappyCodec"
            "org.apache.hadoop.io.compress.Lz4Codec")

TEST_FILE="/user/hadoop/test_data/large_file.txt"

for codec in "${ALGORITHMS[@]}"; do
    echo "Testing codec: $codec"
    hadoop jar wordcount.jar \
        -D mapreduce.output.fileoutputformat.compress=true \
        -D mapreduce.output.fileoutputformat.compress.codec=$codec \
        $TEST_FILE /user/hadoop/output/$(basename $codec)

    hdfs dfs -du -h /user/hadoop/output/$(basename $codec)
done
```
## 6. Common Problems and How to Fix Them

### 6.1 NameNode Troubleshooting

#### 6.1.1 NameNode Fails to Start

**Symptom**: the NameNode will not start and the logs show a port conflict.

```bash
# Find out what is holding the port
netstat -tlnp | grep :8020
lsof -i :8020
```

If the port cannot be freed, move the NameNode RPC address to another port:

```xml
<property>
  <name>dfs.namenode.rpc-address</name>
  <value>namenode-host:8021</value>
</property>
```
#### 6.1.2 Safe Mode Issues

```bash
# Check safe mode status
hdfs dfsadmin -safemode get

# Force the NameNode to leave safe mode
hdfs dfsadmin -safemode leave

# Confirm via the cluster report
hdfs dfsadmin -report | grep "Safe mode is ON"
```
### 6.2 DataNode Connectivity Problems

#### 6.2.1 Diagnosing Network Issues

```bash
# Overall cluster report, including dead DataNodes
hdfs dfsadmin -report

# Show the rack topology as seen by the NameNode
hdfs dfsadmin -printTopology

# Query a specific DataNode over its IPC port (9867 by default in Hadoop 3)
hdfs dfsadmin -getDatanodeInfo datanode-host:9867
```
#### 6.2.2 Disk Space Problems

```bash
# Check DFS usage
hdfs dfsadmin -report | grep "DFS Used"
```

Reserve non-DFS space on each DataNode disk (hdfs-site.xml):

```xml
<property>
  <name>dfs.datanode.du.reserved</name>
  <value>10737418240</value> <!-- Reserve 10 GB -->
</property>
```

```bash
# Re-read the include/exclude files after node changes
hdfs dfsadmin -refreshNodes
```
### 6.3 MapReduce Job Failures

#### 6.3.1 Analyzing Out-of-Memory Errors

```bash
# Pull the stderr of the failed containers
yarn logs -applicationId application_xxx -log_files stderr
```

Raise the JVM heap for map and reduce tasks (mapred-site.xml):

```xml
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx2048m</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx4096m</value>
</property>
```
#### 6.3.2 Detecting Data Skew

```java
public class SkewDetector {

    public static class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text keyOut = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // extractKey() stands for whatever field the job groups on
            String keyStr = extractKey(value);
            keyOut.set(keyStr);
            context.write(keyOut, one);
            // Track per-key counts via counters (practical only for low-cardinality keys)
            context.getCounter("Skew", keyStr).increment(1);
        }
    }

    public static void analyzeSkew(Job job) throws Exception {
        Counters counters = job.getCounters();
        long maxCount = 0;
        long totalCount = 0;
        int keyCount = 0;

        for (CounterGroup group : counters) {
            if (group.getName().equals("Skew")) {
                for (Counter counter : group) {
                    maxCount = Math.max(maxCount, counter.getValue());
                    totalCount += counter.getValue();
                    keyCount++;
                }
            }
        }

        // Ratio of the heaviest key to the average key
        double skewRatio = (double) maxCount / ((double) totalCount / keyCount);
        System.out.println("Skew ratio: " + skewRatio);
    }
}
```
### 6.4 Monitoring Tools

#### 6.4.1 Cluster Health-Check Script

```bash
#!/bin/bash
echo "=== Hadoop cluster health check ==="

# HDFS daemons
if jps | grep -q NameNode; then
    echo "✓ NameNode is running"
else
    echo "✗ NameNode is not running"
fi

data_nodes=$(hdfs dfsadmin -report | grep "Live datanodes" | awk '{print $3}')
echo "Live DataNodes: $data_nodes"

# Storage usage
hdfs dfsadmin -report | grep "DFS Used%" | head -5

# YARN daemons
if jps | grep -q ResourceManager; then
    echo "✓ ResourceManager is running"
    active_nodes=$(yarn node -list | grep -c RUNNING)
    echo "Active NodeManagers: $active_nodes"
else
    echo "✗ ResourceManager is not running"
fi

# Local memory
free -h
```
#### 6.4.2 Log Analysis

Enable log aggregation in yarn-site.xml:

```xml
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

```bash
# Fetch aggregated application logs
yarn logs -applicationId application_xxx

# Tail the daemon logs on a node
tail -f $HADOOP_HOME/logs/hadoop-*.log
```
## 7. Case Study: E-Commerce Log Analytics

### 7.1 Business Scenario

#### 7.1.1 Data Model

**Log format**: Apache Combined Log Format

```
127.0.0.1 - - [25/Dec/2024:10:15:32 +0000] "GET /product/12345 HTTP/1.1" 200 1024 "http://example.com" "Mozilla/5.0"
```
#### 7.1.2 Analysis Requirements

| Dimension | Metric | Implementation |
|---|---|---|
| PV | Page views | MapReduce counting |
| UV | Unique visitors | Deduplication by IP |
| Conversion rate | Purchases / visits | Join across datasets |
| Hot products | Top-N products | Sorted aggregation |
### 7.2 Data Preprocessing

#### 7.2.1 Log Cleaning with MapReduce

```java
public class LogCleaner {

    public static class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Apache Combined Log Format:
        // ip ident user [timestamp] "method url protocol" status size "referer" "user-agent"
        private static final Pattern LOG_PATTERN = Pattern.compile(
                "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+) \"([^\"]*)\" \"([^\"]*)\"");

        private Text cleanedLog = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String logLine = value.toString();
            Matcher matcher = LOG_PATTERN.matcher(logLine);

            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                String status = matcher.group(6);
                String size = matcher.group(7);
                String referer = matcher.group(8);
                String userAgent = matcher.group(9);

                // Re-emit the record as tab-separated fields; malformed lines are dropped
                String cleaned = String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",
                        ip, timestamp, method, url, status, size, referer, userAgent);
                cleanedLog.set(cleaned);
                context.write(cleanedLog, NullWritable.get());
            }
        }
    }
}
```
### 7.3 Computing Business Metrics

#### 7.3.1 PV/UV Statistics

```java
public class PVUVAnalyzer {

    public static class PVMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text dateKey = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input: cleaned, tab-separated log records (ip, timestamp, method, url, ...)
            String[] fields = value.toString().split("\t");
            if (fields.length >= 4) {
                String date = extractDate(fields[1]);   // extractDate() parses the day out of the timestamp
                String url = fields[3];

                // PV: one record per (date, url)
                dateKey.set(date + "_" + url);
                context.write(dateKey, one);

                // UV: one record per (date, ip); distinct keys are handled in the reducer
                dateKey.set(date + "_" + fields[0]);
                context.write(dateKey, one);
            }
        }
    }

    public static class PVUVReducer extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] parts = key.toString().split("_", 2);
            String date = parts[0];
            String dimension = parts[1];

            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }

            if (dimension.matches("\\d+\\.\\d+\\.\\d+\\.\\d+")) {
                // Each distinct IP contributes 1; summing these rows downstream yields the UV per date
                context.write(new Text(date + "_UV"), new Text(String.valueOf(1)));
            } else {
                context.write(new Text(date + "_PV_" + dimension), new Text(String.valueOf(count)));
            }
        }
    }
}
```
#### 7.3.2 Hot Product Analysis

For a global Top-N, the job should run with a single reducer (or feed the per-reducer results into a second aggregation pass).

```java
public class HotProductAnalyzer {

    public static class ProductMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text productId = new Text();
        private IntWritable count = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            String url = fields[3];

            // Only count product detail pages, e.g. /product/12345
            if (url.startsWith("/product/")) {
                String pid = url.substring("/product/".length());
                productId.set(pid);
                count.set(1);
                context.write(productId, count);
            }
        }
    }

    public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Keyed by visit count; keeps only the TOP_N largest counts
        private TreeMap<Integer, List<String>> topProducts = new TreeMap<>();
        private static final int TOP_N = 100;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            topProducts.computeIfAbsent(sum, k -> new ArrayList<>()).add(key.toString());
            if (topProducts.size() > TOP_N) {
                topProducts.remove(topProducts.firstKey());   // evict the smallest count
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit products from the highest count down
            for (Map.Entry<Integer, List<String>> entry : topProducts.descendingMap().entrySet()) {
                for (String product : entry.getValue()) {
                    context.write(new Text(product), new IntWritable(entry.getKey()));
                }
            }
        }
    }
}
```
### 7.4 Benchmarking

#### 7.4.1 Test Data Sets

| Data Set | File Size | Records | Processing Time |
|---|---|---|---|
| Small | 1 GB | 1 million | 5 minutes |
| Medium | 10 GB | 10 million | 25 minutes |
| Large | 100 GB | 100 million | 3 hours |
#### 7.4.2 Performance Comparison Script

```bash
#!/bin/bash
DATA_SIZES=("1GB" "10GB" "100GB")
WORKERS=(1 3 5)

for size in "${DATA_SIZES[@]}"; do
    for workers in "${WORKERS[@]}"; do
        echo "Testing $size data with $workers workers"

        hadoop jar analytics.jar \
            -D mapreduce.job.reduces=$workers \
            /user/hadoop/logs/${size} \
            /user/hadoop/results/${size}_${workers}workers

        yarn application -list -appStates FINISHED | grep "Total Time"
    done
done
```
### 7.5 Production Deployment Advice

#### 7.5.1 Sizing the Cluster

| Daily Active Users | Log Volume per Day | Recommended Cluster | Estimated Cost |
|---|---|---|---|
| 10 thousand | 10 GB | 3 nodes | ~500 CNY/month |
| 100 thousand | 100 GB | 5 nodes | ~2,000 CNY/month |
| 1 million | 1 TB | 10 nodes | ~8,000 CNY/month |
#### 7.5.2 Monitoring and Alerting Configuration

```xml
<property>
  <name>dfs.datanode.disk.check.timeout</name>
  <value>600000</value>
</property>
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
  <value>10737418240</value> <!-- 10 GB threshold -->
</property>
```
## 8. Summary and Best Practices

### 8.1 Key Takeaways

- **Architecture**: understand how HDFS, YARN, and MapReduce work together
- **Performance tuning**: combine data locality, combiners, and compression
- **Troubleshooting**: build a solid monitoring and alerting setup
- **Business applications**: end-to-end solutions from log analytics to real-time computation
### 8.2 Suggested Learning Path

```mermaid
journey
    title Hadoop Learning Path
    section Basics
      Environment setup: 5: Single-node / pseudo-distributed
      HDFS operations: 4: File system commands
      MapReduce: 3: WordCount example
    section Intermediate
      Performance tuning: 4: Tuning parameters
      Troubleshooting: 3: Log analysis
      Monitoring and alerting: 4: Cluster health checks
    section Hands-on
      Business cases: 5: E-commerce log analytics
      Production deployment: 4: Cluster planning
      Performance tuning: 5: Benchmark-driven optimization
```
### 8.3 Community Resources

*This guide is based on Hadoop 3.3.4 and will be kept up to date as the technology evolves.*