# Hadoop Quick Start and Hands-On Guide

## 1. The Hadoop Ecosystem at a Glance

### 1.1 Core Component Architecture

The Hadoop ecosystem follows a master/slave architecture: **HDFS (Hadoop Distributed File System)** provides distributed storage, **YARN (Yet Another Resource Negotiator)** handles resource scheduling, and **MapReduce** supplies the distributed computing framework.
```mermaid
graph TD
    A[Client Application] --> B[YARN ResourceManager]
    B --> C[NodeManager Node1]
    B --> D[NodeManager Node2]
    B --> E[NodeManager Node3]

    C --> F[HDFS DataNode1]
    D --> G[HDFS DataNode2]
    E --> H[HDFS DataNode3]

    I[HDFS NameNode] --> F
    I --> G
    I --> H

    J[Secondary NameNode] --> I

    style A fill:#f9f,stroke:#333
    style B fill:#bbf,stroke:#333
    style I fill:#9f9,stroke:#333
```
### 1.2 Core Components in Detail

| Component | Role | Key responsibility | Production sizing advice |
|---|---|---|---|
| NameNode | Metadata management | Manages the file system namespace | 8-16 GB RAM, SSD storage |
| DataNode | Data storage | Stores the actual data blocks | 4-8 TB disks, gigabit network |
| ResourceManager | Global resource scheduling | Manages cluster-wide resource allocation | HA deployment with ZooKeeper |
| NodeManager | Per-node resource management | Monitors container resource usage | Dynamic resource adjustment |
### 1.3 How Data Is Stored

HDFS uses **block storage** with a default block size of 128 MB and relies on **replication** to keep data reliable:

```bash
# Show the blocks and replica locations of a file
hdfs fsck /user/data/largefile.txt -files -blocks -locations
```
**Replica placement policy** (a Java sketch below shows how to inspect the actual placement of a file's blocks):

- 1st replica: the local node (the writer's node)
- 2nd replica: a node on a different rack
- 3rd replica: a different node on the same rack as the 2nd replica
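To complement `hdfs fsck`, here is a minimal Java sketch (a hypothetical `BlockLocationInspector`, assuming the cluster configuration files are on the classpath) that uses the public `FileSystem` API to print each block of a file and the DataNodes holding its replicas:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationInspector {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path(args[0]);              // e.g. /user/data/largefile.txt
        FileStatus status = fs.getFileStatus(file);

        // One BlockLocation per block; each lists the DataNodes holding a replica.
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (int i = 0; i < blocks.length; i++) {
            System.out.printf("block %d (offset=%d, length=%d): %s%n",
                    i, blocks[i].getOffset(), blocks[i].getLength(),
                    String.join(", ", blocks[i].getHosts()));
        }
        fs.close();
    }
}
```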
 
## 2. Environment Setup in Practice

### 2.1 Single-Node Deployment

#### 2.1.1 Installing Prerequisites

```bash
sudo apt update
sudo apt install openjdk-8-jdk ssh pdsh

# Verify
java -version
ssh localhost
```
#### 2.1.2 Installing and Configuring Hadoop

```bash
cd /opt
sudo wget https://downloads.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
sudo tar -xzf hadoop-3.3.4.tar.gz
sudo mv hadoop-3.3.4 hadoop

echo 'export HADOOP_HOME=/opt/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
source ~/.bashrc
```
#### 2.1.3 Core Configuration Files

**core-site.xml** - core settings:

```xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
        <description>Default filesystem URI</description>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop/tmp</value>
        <description>Temporary directory</description>
    </property>
</configuration>
```
**hdfs-site.xml** - HDFS settings:

```xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
        <description>Replication factor of 1 for single-node mode</description>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hadoop/data/namenode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop/data/datanode</value>
    </property>
</configuration>
```
### 2.2 Pseudo-Distributed Cluster

#### 2.2.1 Additional Configuration

**mapred-site.xml** - MapReduce settings:

```xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
</configuration>
```
**yarn-site.xml** - YARN settings:

```xml
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
</configuration>
```
#### 2.2.2 Starting and Verifying the Cluster

```bash
# Format the NameNode (first start only)
hdfs namenode -format

# Start HDFS and YARN
start-dfs.sh
start-yarn.sh

# Verify that the daemons are running
jps
```
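As a quick sanity check beyond `jps`, the sketch below (an illustrative `HdfsHealthCheck`, assuming the default configuration is on the classpath) connects to the freshly started HDFS and prints overall capacity; it fails fast if the NameNode is not answering:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;

public class HdfsHealthCheck {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FsStatus status = fs.getStatus();   // throws if the NameNode is unreachable
        System.out.printf("capacity=%d bytes, used=%d bytes, remaining=%d bytes%n",
                status.getCapacity(), status.getUsed(), status.getRemaining());
        fs.close();
    }
}
```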
### 2.3 Production-Grade Cluster Configuration

#### 2.3.1 Cluster Planning Template

| Node type | Recommended hardware | Roles | Network |
|---|---|---|---|
| Master nodes | 16 cores, 32 GB RAM | NameNode + ResourceManager | 10 GbE |
| Worker nodes | 8 cores, 16 GB RAM | DataNode + NodeManager | 1 GbE |
| Edge nodes | 4 cores, 8 GB RAM | Client access | 1 GbE |
#### 2.3.2 High-Availability Configuration

**hdfs-site.xml** HA settings:

```xml
<property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
</property>
<property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>master1:8020</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>master2:8020</value>
</property>
```

Note that a complete HA setup also needs shared edit-log storage (JournalNodes), a client failover proxy provider, and fencing configuration, which this snippet omits.
## 3. HDFS Operations in Practice

### 3.1 Basic File System Operations

#### 3.1.1 File and Directory Management

```bash
# Create directories
hdfs dfs -mkdir /user/hadoop/data
hdfs dfs -mkdir -p /user/hadoop/input/2024

# Upload local files
hdfs dfs -put localfile.txt /user/hadoop/data/
hadoop fs -copyFromLocal localfile.txt /user/hadoop/data/

# Download files
hdfs dfs -get /user/hadoop/data/file.txt ./local_copy.txt
hdfs dfs -copyToLocal /user/hadoop/data/file.txt ./

# View file contents
hdfs dfs -cat /user/hadoop/data/file.txt
hdfs dfs -tail -f /user/hadoop/logs/app.log

# List directories
hdfs dfs -ls /user/hadoop/data
hdfs dfs -ls -R /user/hadoop/
```
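The same operations are available programmatically through the `FileSystem` Java API; the following is a minimal sketch (paths and file names are just examples, and the cluster configuration is assumed to be on the classpath) mirroring the commands above:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsBasicOps {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // mkdir -p /user/hadoop/input/2024
        fs.mkdirs(new Path("/user/hadoop/input/2024"));

        // put localfile.txt /user/hadoop/data/
        fs.copyFromLocalFile(new Path("localfile.txt"), new Path("/user/hadoop/data/"));

        // cat /user/hadoop/data/localfile.txt
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/user/hadoop/data/localfile.txt"))))) {
            reader.lines().forEach(System.out::println);
        }

        // get /user/hadoop/data/localfile.txt ./local_copy.txt
        fs.copyToLocalFile(new Path("/user/hadoop/data/localfile.txt"), new Path("local_copy.txt"));

        fs.close();
    }
}
```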
#### 3.1.2 Advanced File Operations

```bash
# Disk usage and counts
hdfs dfs -du -h /user/hadoop/data/
hadoop fs -count /user/hadoop/data/

# Permissions and ownership
hdfs dfs -chmod 755 /user/hadoop/data/
hadoop fs -chown hadoop:hadoop /user/hadoop/data/file.txt
hdfs dfs -chgrp hadoop /user/hadoop/data/

# Move and copy
hdfs dfs -mv /user/hadoop/data/old.txt /user/hadoop/data/new.txt
hdfs dfs -cp /user/hadoop/data/file.txt /user/hadoop/backup/

# Delete
hdfs dfs -rm /user/hadoop/data/temp.txt
hdfs dfs -rm -r /user/hadoop/data/old_directory/
hdfs dfs -rm -skipTrash /user/hadoop/data/large_file.txt
```
### 3.2 Backup and Recovery

#### 3.2.1 Distributed Copy with DistCp

```bash
# Copy between clusters
hadoop distcp hdfs://source-cluster:8020/user/data \
              hdfs://target-cluster:8020/user/data

# Incremental copy, preserving file attributes
hadoop distcp -update -skipcrccheck -p \
    hdfs://namenode1:8020/user/data \
    hdfs://namenode2:8020/user/backup

# Limit to 10 mappers and 100 MB/s per mapper
hadoop distcp -m 10 -bandwidth 100 \
    /user/hadoop/data /user/hadoop/backup
```
#### 3.2.2 Snapshot Management

```bash
# Allow snapshots on a directory
hdfs dfsadmin -allowSnapshot /user/hadoop/data

# Create a snapshot
hdfs dfs -createSnapshot /user/hadoop/data backup_20241225

# List snapshots
hdfs dfs -ls /user/hadoop/data/.snapshot/

# Restore a file from a snapshot
hdfs dfs -cp /user/hadoop/data/.snapshot/backup_20241225/file.txt \
         /user/hadoop/data/current/
```
### 3.3 Storage Policy Optimization

#### 3.3.1 Storage Type Configuration

```bash
# Move cold data to the COLD storage policy
hdfs storagepolicies -setStoragePolicy -path /user/hadoop/archive -policy COLD

# Check the policy on a path
hdfs storagepolicies -getStoragePolicy -path /user/hadoop/data/
```
## 4. The MapReduce Programming Model in Depth

### 4.1 The Classic WordCount Example

#### 4.1.1 Mapper Implementation

```java
package com.hadoop.tutorial;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Normalize tokens: lower-case and strip non-letter characters
            word.set(itr.nextToken().toLowerCase().replaceAll("[^a-zA-Z]", ""));
            if (word.getLength() > 0) {
                context.write(word, one);
            }
        }
    }
}
```
#### 4.1.2 Reducer Implementation

```java
package com.hadoop.tutorial;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
#### 4.1.3 Driver Program

```java
package com.hadoop.tutorial;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        // Reusing the reducer as a combiner is safe here because summation
        // is associative and commutative
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(2);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
### 4.2 Building and Running

#### 4.2.1 Compiling and Packaging

```bash
# Generate a Maven project skeleton
mvn archetype:generate -DgroupId=com.hadoop.tutorial \
  -DartifactId=wordcount \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

# Build the jar
mvn clean package -DskipTests

# Prepare input data
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /user/hadoop/input/

# Submit the job
hadoop jar wordcount-1.0-SNAPSHOT.jar \
  com.hadoop.tutorial.WordCountDriver \
  /user/hadoop/input \
  /user/hadoop/output/wordcount
```
#### 4.2.2 Job Monitoring

```bash
# List running jobs
mapred job -list

# Check the status of a specific job
mapred job -status job_id

# Fetch aggregated application logs
yarn logs -applicationId application_id
```
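For programmatic monitoring, the MapReduce client API exposes the same information as `mapred job -list`; here is a minimal sketch (a hypothetical `JobMonitor`, assuming the cluster configuration is on the classpath):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobStatus;

public class JobMonitor {
    public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        // One JobStatus per job known to the cluster
        for (JobStatus status : cluster.getAllJobStatuses()) {
            System.out.printf("%s\t%s\t%.0f%% map\t%.0f%% reduce%n",
                    status.getJobID(), status.getState(),
                    status.getMapProgress() * 100, status.getReduceProgress() * 100);
        }
        cluster.close();
    }
}
```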
### 4.3 Advanced MapReduce Examples

#### 4.3.1 Building an Inverted Index

```java
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text word = new Text();
    private Text documentId = new Text();

    @Override
    protected void setup(Context context) {
        // Use the input file name as the document ID
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        documentId.set(fileName);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            if (w.length() > 0) {
                word.set(w.toLowerCase());
                context.write(word, documentId);
            }
        }
    }
}
```
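The original example shows only the Mapper; below is a minimal matching Reducer sketch (one possible implementation, not the only one) that deduplicates document IDs and emits a comma-separated posting list per word:

```java
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect distinct document IDs for this word, preserving first-seen order
        Set<String> docs = new LinkedHashSet<>();
        for (Text docId : values) {
            docs.add(docId.toString());
        }
        context.write(key, new Text(String.join(",", docs)));
    }
}
```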
#### 4.3.2 Secondary Sort

```java
public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {

    private CompositeKey compositeKey = new CompositeKey();
    private IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {

        // Input format: naturalKey,sortField,value
        String[] fields = line.toString().split(",");
        compositeKey.set(fields[0], Integer.parseInt(fields[1]));
        value.set(Integer.parseInt(fields[2]));
        context.write(compositeKey, value);
    }
}
```
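The mapper above relies on a `CompositeKey` type that the original does not show; here is a minimal sketch of such a `WritableComparable` (natural key plus the numeric field to sort on). A complete secondary sort also needs a partitioner and grouping comparator that consider only the natural key, which are omitted here:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    private String naturalKey;
    private int secondaryKey;

    public void set(String naturalKey, int secondaryKey) {
        this.naturalKey = naturalKey;
        this.secondaryKey = secondaryKey;
    }

    public String getNaturalKey() { return naturalKey; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(naturalKey);
        out.writeInt(secondaryKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        naturalKey = in.readUTF();
        secondaryKey = in.readInt();
    }

    @Override
    public int compareTo(CompositeKey other) {
        // Sort by the natural key first, then by the secondary field ascending
        int cmp = naturalKey.compareTo(other.naturalKey);
        return (cmp != 0) ? cmp : Integer.compare(secondaryKey, other.secondaryKey);
    }
}
```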
## 5. Performance Tuning in Practice

### 5.1 Data Locality Optimization

#### 5.1.1 Monitoring Locality Levels

```bash
# Check how many tasks ran data-local
mapred job -status job_id | grep "Data-local"
```

Memory and sort-buffer tuning in **mapred-site.xml**:

```xml
<property>
    <name>mapreduce.task.io.sort.mb</name>
    <value>256</value>  <!-- Larger sort buffer -->
</property>
<property>
    <name>mapreduce.map.memory.mb</name>
    <value>2048</value>  <!-- Memory per map task -->
</property>
<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>  <!-- Memory per reduce task -->
</property>
```
#### 5.1.2 Rack Awareness Configuration

```bash
# Topology script mapping IP addresses to racks
cat > /etc/hadoop/conf/rack-topology.sh << 'EOF'
#!/bin/bash
if [ $# -eq 0 ]; then
    echo "/default-rack"
else
    ip=$1
    case $ip in
        192.168.1.*) echo "/rack1" ;;
        192.168.2.*) echo "/rack2" ;;
        *) echo "/default-rack" ;;
    esac
fi
EOF
chmod +x /etc/hadoop/conf/rack-topology.sh
```

Register the script in **core-site.xml**:

```xml
<property>
    <name>net.topology.script.file.name</name>
    <value>/etc/hadoop/conf/rack-topology.sh</value>
</property>
```
### 5.2 Combiner Optimization Strategy

#### 5.2.1 When a Combiner Helps

```java
// Illustrative pseudocode: a combiner only pays off when the reduce function is
// associative/commutative and map output contains many repeated keys.
public class CombinerAnalyzer {

    public static boolean shouldUseCombiner(Job job) throws ClassNotFoundException {
        // The operation must be safe to apply repeatedly to partial results
        if (!isAssociative(job.getReducerClass())) {
            return false;
        }

        long inputSize = getInputSize(job);
        long uniqueKeys = estimateUniqueKeys(job);

        // Rough heuristic: worthwhile when each key appears many times on average
        return (double) inputSize / uniqueKeys > 100;
    }

    private static boolean isAssociative(Class<?> reducerClass) {
        return reducerClass.equals(IntSumReducer.class) ||
               reducerClass.equals(LongSumReducer.class);
    }

    // Placeholder estimates; a real implementation would inspect the input and sample keys
    private static long getInputSize(Job job) { return 1_000_000L; }
    private static long estimateUniqueKeys(Job job) { return 1_000L; }
}
```
#### 5.2.2 Implementing a Custom Combiner

A Combiner's output must use the same key/value types the Reducer consumes and must be a valid partial result, because the framework may run it zero, one, or many times. For averages this means the Combiner has to pass along a `(sum, count)` pair under the original key instead of emitting separate `*_count` records, which would be shuffled to a different reduce group:

```java
// Values are encoded as "sum,count" Text pairs: the Mapper emits "value,1",
// and the Combiner merges pairs without ever computing an average itself.
public class AverageCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        long sum = 0;
        long count = 0;

        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }

        // Re-emit the partial aggregate in the same "sum,count" format
        context.write(key, new Text(sum + "," + count));
    }
}
```
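A matching reducer sketch (assuming the `sum,count` encoding above, with the Mapper emitting `value,1` pairs as `Text`): it merges the partial pairs and only divides at the very end.

```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        // Only the final reducer divides; the combiner never computes an average
        context.write(key, new DoubleWritable((double) sum / count));
    }
}
```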
### 5.3 Choosing and Configuring Compression

#### 5.3.1 Compression Algorithm Comparison

| Codec | Compression ratio | Compression speed | Decompression speed | Typical use case |
|---|---|---|---|---|
| Snappy | 20-50% | Very fast | Very fast | Real-time processing |
| LZ4 | 20-50% | Fast | Very fast | General purpose |
| Gzip | 60-70% | Medium | Medium | Archival storage |
| Bzip2 | 80-90% | Slow | Medium | Maximum compression |
#### 5.3.2 Production-Grade Compression Settings

```xml
<configuration>
    <!-- Compress intermediate map output -->
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>

    <!-- Compress final job output -->
    <property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.Lz4Codec</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.type</name>
        <value>BLOCK</value>
    </property>
</configuration>
```
#### 5.3.3 Benchmark Script

```bash
#!/bin/bash
# Compare output sizes across codecs with the same job
# (assumes the driver uses ToolRunner so that -D options are honored)

ALGORITHMS=("org.apache.hadoop.io.compress.DefaultCodec"
            "org.apache.hadoop.io.compress.GzipCodec"
            "org.apache.hadoop.io.compress.BZip2Codec"
            "org.apache.hadoop.io.compress.SnappyCodec"
            "org.apache.hadoop.io.compress.Lz4Codec")

TEST_FILE="/user/hadoop/test_data/large_file.txt"

for codec in "${ALGORITHMS[@]}"; do
    echo "Testing codec: $codec"

    hadoop jar wordcount.jar \
        -D mapreduce.output.fileoutputformat.compress=true \
        -D mapreduce.output.fileoutputformat.compress.codec=$codec \
        $TEST_FILE /user/hadoop/output/$(basename $codec)

    hdfs dfs -du -h /user/hadoop/output/$(basename $codec)
done
```
## 6. Troubleshooting Common Problems

### 6.1 NameNode Diagnostics

#### 6.1.1 NameNode Fails to Start

**Symptom**: the NameNode does not start and the log shows a port conflict.

```bash
# Find out what is holding the port
netstat -tlnp | grep :8020
lsof -i :8020
```

If another process owns the port, either stop it or move the NameNode RPC address in **hdfs-site.xml**:

```xml
<property>
    <name>dfs.namenode.rpc-address</name>
    <value>namenode-host:8021</value>
</property>
```
#### 6.1.2 Safe Mode Issues

```bash
# Check safe mode status
hdfs dfsadmin -safemode get

# Leave safe mode manually (only after confirming block reports are healthy)
hdfs dfsadmin -safemode leave

# The cluster report also flags safe mode
hdfs dfsadmin -report | grep "Safe mode is ON"
```
### 6.2 DataNode Connectivity Problems

#### 6.2.1 Network Diagnostics

```bash
# Overall cluster report, including dead or decommissioned nodes
hdfs dfsadmin -report

# Show the rack topology as seen by the NameNode
hdfs dfsadmin -printTopology

# Query a specific DataNode over its IPC port (9867 by default in Hadoop 3)
hdfs dfsadmin -getDatanodeInfo datanode-host:9867
```
#### 6.2.2 Disk Space Problems

```bash
# Check DFS usage per node
hdfs dfsadmin -report | grep "DFS Used"
```

Reserve space for non-HDFS use in **hdfs-site.xml** (takes effect after the DataNodes restart):

```xml
<property>
    <name>dfs.datanode.du.reserved</name>
    <value>10737418240</value>  <!-- Reserve 10 GB -->
</property>
```

```bash
# Re-read the include/exclude host lists if nodes were added or removed
hdfs dfsadmin -refreshNodes
```
### 6.3 MapReduce Job Failures

#### 6.3.1 Out-of-Memory Analysis

```bash
# Inspect task stderr for OOM messages
yarn logs -applicationId application_xxx -log_files stderr
```

Raise the JVM heap sizes in **mapred-site.xml** (keep them below the container memory limits):

```xml
<property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx2048m</value>
</property>
<property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx4096m</value>
</property>
```
#### 6.3.2 Detecting Data Skew

```java
public class SkewDetector {

    public static class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text keyOut = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // extractKey is a placeholder for application-specific key extraction.
            // Note: per-key counters only scale to a small number of distinct keys.
            String keyStr = extractKey(value);
            keyOut.set(keyStr);
            context.getCounter("Skew", keyStr).increment(1);
            context.write(keyOut, one);
        }
    }

    public static void analyzeSkew(Job job) throws IOException {
        Counters counters = job.getCounters();
        long maxCount = 0;
        long totalCount = 0;
        long numKeys = 0;

        for (CounterGroup group : counters) {
            if (group.getName().equals("Skew")) {
                for (Counter counter : group) {
                    maxCount = Math.max(maxCount, counter.getValue());
                    totalCount += counter.getValue();
                    numKeys++;
                }
            }
        }

        // Ratio of the hottest key to the average key frequency
        double skewRatio = (double) maxCount / ((double) totalCount / numKeys);
        System.out.println("Skew ratio: " + skewRatio);
    }
}
```
### 6.4 Monitoring Tools

#### 6.4.1 Cluster Monitoring Script

```bash
#!/bin/bash
echo "=== Hadoop cluster health check ==="

# NameNode process
if jps | grep -q NameNode; then
    echo "✓ NameNode is running"
else
    echo "✗ NameNode is not running"
fi

# Live DataNodes
data_nodes=$(hdfs dfsadmin -report | grep "Live datanodes" | awk '{print $3}')
echo "Live DataNodes: $data_nodes"

# Per-node DFS usage
hdfs dfsadmin -report | grep "DFS Used%" | head -5

# ResourceManager and NodeManagers
if jps | grep -q ResourceManager; then
    echo "✓ ResourceManager is running"
    active_nodes=$(yarn node -list | grep -c RUNNING)
    echo "Active NodeManagers: $active_nodes"
else
    echo "✗ ResourceManager is not running"
fi

# Local memory
free -h
```
#### 6.4.2 Log Analysis

Enable log aggregation in **yarn-site.xml**, then fetch logs per application:

```xml
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
```

```bash
# Aggregated container logs for an application
yarn logs -applicationId application_xxx

# Follow daemon logs on the local node
tail -f $HADOOP_HOME/logs/hadoop-*.log
```
## 7. Case Study: E-Commerce Log Analytics

### 7.1 Business Scenario

#### 7.1.1 Data Model

**Log format**: Apache Combined Log Format

```
127.0.0.1 - - [25/Dec/2024:10:15:32 +0000] "GET /product/12345 HTTP/1.1" 200 1024 "http://example.com" "Mozilla/5.0"
```
#### 7.1.2 Analysis Requirements

| Dimension | Metric definition | Technique |
|---|---|---|
| PV | Page views | MapReduce counting |
| UV | Unique visitors | Deduplication by IP |
| Conversion rate | Purchases / visits | Multi-dataset join |
| Hot products | Top-N products | Sort and rank |
### 7.2 Data Preprocessing

#### 7.2.1 Log Cleaning with MapReduce

```java
public class LogCleaner {

    public static class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Compile the Combined Log Format pattern once, not per record
        private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+) \"([^\"]*)\" \"([^\"]*)\"");

        private Text cleanedLog = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String logLine = value.toString();
            Matcher matcher = LOG_PATTERN.matcher(logLine);
            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                String status = matcher.group(6);
                String size = matcher.group(7);
                String referer = matcher.group(8);
                String userAgent = matcher.group(9);

                // Emit a tab-separated, cleaned record; malformed lines are dropped
                String cleaned = String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",
                        ip, timestamp, method, url, status, size, referer, userAgent);

                cleanedLog.set(cleaned);
                context.write(cleanedLog, NullWritable.get());
            }
        }
    }
}
```
### 7.3 Computing Business Metrics

#### 7.3.1 PV/UV Statistics

```java
public class PVUVAnalyzer {

    public static class PVMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text dateKey = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length >= 4) {
                // extractDate is a placeholder that turns the log timestamp into yyyy-MM-dd
                String date = extractDate(fields[1]);
                String url = fields[3];

                // PV: one record per (date, url) hit
                dateKey.set(date + "_" + url);
                context.write(dateKey, one);

                // UV: one record per (date, ip) hit; distinct IPs are counted downstream
                dateKey.set(date + "_" + fields[0]);
                context.write(dateKey, one);
            }
        }
    }

    public static class PVUVReducer extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Split only on the first "_" so URLs containing underscores stay intact
            String[] parts = key.toString().split("_", 2);
            String date = parts[0];
            String dimension = parts[1];

            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }

            if (dimension.matches("\\d+\\.\\d+\\.\\d+\\.\\d+")) {
                // The key is an IP: each distinct IP contributes 1 to that day's UV;
                // a follow-up aggregation sums these records into the final UV count
                context.write(new Text(date + "_UV"), new Text(String.valueOf(1)));
            } else {
                // The key is a URL: the count is that page's PV for the day
                context.write(new Text(date + "_PV_" + dimension), new Text(String.valueOf(count)));
            }
        }
    }
}
```
#### 7.3.2 Hot Product Analysis

```java
public class HotProductAnalyzer {

    public static class ProductMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text productId = new Text();
        private IntWritable count = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length < 4) {
                return;   // skip malformed records
            }
            String url = fields[3];

            // Only count product detail pages, e.g. /product/12345
            if (url.startsWith("/product/")) {
                String pid = url.substring("/product/".length());
                productId.set(pid);
                count.set(1);
                context.write(productId, count);
            }
        }
    }

    // Note: this reducer keeps an in-memory Top-N, so the job must run with a
    // single reduce task (job.setNumReduceTasks(1)) to produce a global ranking.
    public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private TreeMap<Integer, List<String>> topProducts = new TreeMap<>();
        private final int TOP_N = 100;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            topProducts.computeIfAbsent(sum, k -> new ArrayList<>()).add(key.toString());

            // Evict the smallest count once more than TOP_N distinct counts are tracked
            if (topProducts.size() > TOP_N) {
                topProducts.remove(topProducts.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit products from the highest count down
            for (Map.Entry<Integer, List<String>> entry : topProducts.descendingMap().entrySet()) {
                for (String product : entry.getValue()) {
                    context.write(new Text(product), new IntWritable(entry.getKey()));
                }
            }
        }
    }
}
```
### 7.4 Benchmarking

#### 7.4.1 Test Data Sets

| Data set | File size | Records | Processing time |
|---|---|---|---|
| Small | 1 GB | 1 million | 5 minutes |
| Medium | 10 GB | 10 million | 25 minutes |
| Large | 100 GB | 100 million | 3 hours |
#### 7.4.2 Benchmark Script

```bash
#!/bin/bash
DATA_SIZES=("1GB" "10GB" "100GB")
WORKERS=(1 3 5)

for size in "${DATA_SIZES[@]}"; do
    for workers in "${WORKERS[@]}"; do
        echo "Testing $size data with $workers reducers"

        hadoop jar analytics.jar \
            -D mapreduce.job.reduces=$workers \
            /user/hadoop/logs/${size} \
            /user/hadoop/results/${size}_${workers}workers

        # Elapsed time can be read with `yarn application -status <application_id>`
        yarn application -list -appStates FINISHED | tail -5
    done
done
```
### 7.5 Production Deployment Recommendations

#### 7.5.1 Cluster Sizing Estimates

| Daily active users | Log volume / day | Recommended cluster | Estimated cost |
|---|---|---|---|
| 10 thousand | 10 GB | 3 nodes | ~500 CNY/month |
| 100 thousand | 100 GB | 5 nodes | ~2,000 CNY/month |
| 1 million | 1 TB | 10 nodes | ~8,000 CNY/month |
#### 7.5.2 Monitoring and Alerting Configuration

```xml
<property>
    <name>dfs.datanode.disk.check.timeout</name>
    <value>600000</value>  <!-- Disk check timeout: 10 minutes -->
</property>
<property>
    <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
    <value>10737418240</value>  <!-- 10 GB threshold -->
</property>
```
## 8. Summary and Best Practices

### 8.1 Key Takeaways

- **Architecture**: understand how HDFS, YARN, and MapReduce work together
- **Performance tuning**: combine data locality, Combiners, and compression
- **Troubleshooting**: build a solid monitoring and alerting workflow
- **Business applications**: a complete path from log analytics to real-time computation
### 8.2 Suggested Learning Path

```mermaid
journey
    title Hadoop learning path
    section Foundations
      Environment setup: 5: Single-node / pseudo-distributed
      HDFS operations: 4: File system commands
      MapReduce: 3: WordCount example
    section Intermediate
      Performance tuning: 4: Configuration parameters
      Troubleshooting: 3: Log analysis
      Monitoring and alerting: 4: Cluster health checks
    section Hands-on
      Business case: 5: E-commerce log analytics
      Production deployment: 4: Cluster planning
      Performance tuning: 5: Benchmark-driven optimization
```
### 8.3 Community Resources

*This guide is written against Hadoop 3.3.4 and will be kept up to date as the technology evolves.*