>

1.2 机器网络环境... 3,• Hadoop快速入门对初次使

- 编辑:澳门博发娱乐官网 -

1.2 机器网络环境... 3,• Hadoop快速入门对初次使

hadoop中mapreduce的常用类(1)

写这么些文章的时候才开采到新旧API是还要设有于1.1.2的hadoop中的。此前还直选用闷儿为何不时候是jobClient提交职分,临时是Job...不管API是还是不是更新,上边那么些类也依旧存在于API中的,经过和谐追踪源码,开采原理还是那些。只不超过实际行了再度组织,实行了有的封装,使得扩大性更好。所以仍旧把这几个东西从记载本贴进来呢。

有关那几个类的介绍以至利用,有的是在协和debug中看看的,大多为纯翻译API的申明,然而翻译的经过收获颇丰。

GenericOptionsParser

parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行参数

GenericOptionsParser是为hadoop框架深入分析命令行参数的工具类。它能够分辨标准的命令行参数,使app能够轻巧钦定namenode,jobtracker,以至额外的配备能源或新闻等。它支持的职能有:

-conf 钦定布署文件;

-D 钦命陈设新闻;

-fs 指定namenode

-jt 指定jobtracker

-files 钦赐须求copy到MXC90集群的文件,以逗号分隔

-libjars钦命须求copy到M奥迪Q3集群的classpath的jar包,以逗号分隔

-archives钦点须要copy到M大切诺基集群的压缩文件,以逗号分隔,会活动解压缩

  1. String[] otherArgs = new GenericOptionsParser(job, args)

  2. .getRemainingArgs();

  3. if (otherArgs.length != 2) {

  4. System.err.println("Usage: wordcount ");

  5. System.exit(2);

  6. }

ToolRunner

用来跑实现Tool接口的工具。它与GenericOptionsParser合作来分析命令行参数,只在这里次运维中改换configuration的参数。

Tool

管理命令行参数的接口。Tool是M君越的此外tool/app的正规化。那一个完结应有代理对规范命令行参数的管理。上边是头角崭然实现:

public class MyApp extends Configured implements Tool {              public int run(String[] args) throws Exception {   // 即将被ToolRunner执行的Configuration   Configuration conf = getConf();               // 使用conf建立JobConf   JobConf job = new JobConf(conf, MyApp.class);           // 执行客户端参数   Path in = new Path(args[1]);   Path out = new Path(args[2]);               // 指定job相关的参数        job.setJobName("my-app");   job.setInputPath(in);   job.setOutputPath(out);   job.setMapperClass(MyApp.MyMapper.class);   job.setReducerClass(MyApp.MyReducer.class);   *   // 提交job,然后监视进度直到job完成   JobClient.runJob(job);   }              public static void main(String[] args) throws Exception {   // 让ToolRunner 处理命令行参数    int res = ToolRunner.run(new Configuration(), new Sort(), //这里封装了GenericOptionsParser解析args               System.exit(res);   }   }   

MultipleOutputFormat

自定义输出文件名称大概说名称格式。在jobconf中setOutputFormat(MultipleOutputFormat的子类)就行了。并不是那种part-r-00000吗的了。。。况兼能够分配结果到七个文本中。

MultipleOutputFormat承袭了FileOutputFormat, 允许将出口数据写进区别的出口文件中。有二种选择场景:

a. 最稀有叁个reducer的mapreduce任务。这几个reducer想要依据实际的key将出口写进区别的文件中。要是三个key编码了实在的key和为实际的key钦点的地点

b. 独有map的职分。这一个任务想要把输入文件或然输入内容的一对名称设为输出文件名。

c. 独有map的天职。这些职务为出口命名时,须要依据keys和输入文件名。 

//这里是根据key生成多个文件的地方,可以看到还有value,name等参数   @Override   protected String generateFileNameForKeyValue(Text key,   IntWritable value, String name) {   char c = key.toString().toLowerCase().charAt(0);   if (c >= 'a' && c <= 'z') {   return c + ".txt";   }   return "result.txt";   }   

DistributedCache

在集群中异常快分发大的只读文件。DistributedCache是M揽胜极光用来缓存app需求的诸如text,archive,jar等的文件的。app通过jobconf中的url来钦赐要求缓存的文书。它会假定钦赐的这些文件已经在url钦定的对应地方上了。在job在node上实行以前,DistributedCache会copy须求的文件到那些slave node。它的遵守正是为各个job只copy二次,並且copy到钦点地方,能够自行解压缩。

DistributedCache能够用来散发轻松的只读文件,恐怕部分目迷五色的举例archive,jar文件等。archive文件会活动解压缩,而jar文件会被机关放置到任务的classpath中(lib)。分发压缩archive时,能够钦赐解压名称如:dict.zip#dict。那样就能够解压到dict中,不然暗中认可是dict.zip中。

文本是有施行权限的。顾客可以选拔在任务的办事目录下创设针对DistributedCache的软链接。

DistributedCache.createSymlink(conf);     DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前职业目录下开创到缓存文件的标识链接。则在task的当前职业目录会有link-name的链接,也正是高效方法,链接到expr.txt文件,在setup方法应用的情景则要简明大多。可能经过设置配置文件属性mapred.create.symlink为yes。 布满式缓存会截取U昂科雷I的部分作为链接的名字。 举个例子,UOdysseyI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前专门的学问目录会著名字为lib.so的链接, 它会链接分布式缓存中的lib.so.1


图片 1


) 写那一个小说的时候才开采到新旧API是还要设有于1.1.2的hadoop中的。在此在此以前还间接选举择闷儿为何有的时候候是jobClient提交任...

版权注解:本文为博主原创小说,未经博主同意不得转发。

目的
那篇教程从客商的角度出发,周全地介绍了Hadoop Map/Reduce框架的各类方面。
先决条件
请先确认Hadoop被科学安装、配置和正规运行中。更多音信见:
• Hadoop连忙入门对第一使用者。
• Hadoop集群搭建对周围布满式集群。
概述
Hadoop Map/Reduce是二个应用简易的软件框架,基于它写出来的应用程序能够运维在由上千个商用机器组成的巨型集群上,并以一种保障容错的办法并行管理上T级其余数据集。
多少个Map/Reduce 作业(job) 平时会把输入的数量集切分为若干单身的数据块,由 map职分(task)以完全并行的方法处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce义务。平时作业的输入和输出都会被存放在文件系统中。 整个框架担当任务的调解和监察和控制,以至重新施行已经战败的职责。
普通,Map/Reduce框架和分布式文件系统是运作在一组相同的节点上的,约等于说,计算节点和存款和储蓄节点平常在一道。这种安顿允许框架在此多少个已经存好数据的节点上相当慢地调节职分,那能够使全部集群的互联网带宽被那一个连忙地使用。
Map/Reduce框架由一个独立的master JobTracker 和各样集群节点一个slave TaskTracker共同构成。master担当调整构成二个功课的持有职分,那些职分布满在差别的slave上,master监察和控制它们的实施,重新奉行业已倒闭的天职。而slave仅担当实践由master指使的职务。
应用程序起码应该指明输入/输出的岗位(路线),并由此兑现方便的接口或抽象类提供map和reduce函数。再增多别的作业的参数,就组成了课业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可实施程序等)和配备消息给JobTracker,前者担任分发那个软件和布置音信给slave、调治职责并监督它们的进行,同时提供情况和确诊消息给job-client。
虽说Hadoop框架是用JavaTM完毕的,但Map/Reduce应用程序则不必然要用 Java来写 。
• Hadoop Streaming是一种运转作业的实用工具,它同意顾客创建和周转任何可奉行程序 (譬喻:Shell工具)来做为mapper和reducer。
• Hadoop Pipes是二个与SWIG包容的C++ API (未有依赖JNITM技艺),它也可用以落实Map/Reduce应用程序。
输入与出口
Map/Reduce框架运营在<key, value> 键值对上,也正是说, 框架把作业的输入看为是一组<key, value> 键值对,同样也油然则生一组 <key, value> 键值对做为作业的出口,这两组键值对的连串也许分裂。
框架必要对key和value的类(classes)举行类别化操作, 由此,那么些类须要贯彻 Writable接口。 别的,为了便于框架推行排序操作,key类必得贯彻 WritableComparable接口。
贰个Map/Reduce 作业的输入和输出类型如下所示:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, Iterator<v2>> -> reduce -> <k3, v3> (output)
例子:WordCount v1.0
在浓重细节在此以前,让大家先看多个Map/Reduce的选择示范,以便对它们的做事格局有二个方始的认识。
WordCount是一个大致的使用,它能够总计出钦点数量汇总每一个单词出现的次数。
那些利用适用于 单机格局, 伪遍及式情势 或 完全分布式形式三种Hadoop安装格局。

1 configuration api

Hadoop 组件的配置利用 XML 方式的配置文件,何况能够利用 ${变量名} 的样式来使用此外品质的值,比方:

<?xml version="1.0"?>
<configuration>
  <property>
     <name>color</name> 
     <value>yellow</value>
     <description>Color</description>
  </property>
  <property>
    <name>size</name>
    <value>10</value>         
    <description>Size</description>
  </property>

  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>     
    <description>Weight</description>
  </property>

  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>       
    <description>Size and weight</description>
  </property>
</configuration>

如此那般可以接纳 Configuration 类来读取数据:

Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow")); 
assertThat(conf.getInt("size", 0), is(10)); 
assertThat(conf.get("breadth", "wide"), is("wide"));

也足以增进多少个布局文件:

Configuration conf = new Configuration(); 
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");

1  运作情状表达... 3

package org.myorg; 

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class WordCount { 

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 
     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
       String line = value.toString(); 
       StringTokenizer tokenizer = new StringTokenizer(line); 
       while (tokenizer.hasMoreTokens()) { 
         word.set(tokenizer.nextToken()); 
         output.collect(word, one); 
       } 
     } 
   } 

   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
       int sum = 0; 
       while (values.hasNext()) { 
         sum += values.next().get(); 
       } 
       output.collect(key, new IntWritable(sum)); 
     } 
   } 

   public static void main(String[] args) throws Exception { 
     JobConf conf = new JobConf(WordCount.class); 
     conf.setJobName("wordcount"); 

     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(IntWritable.class); 

     conf.setMapperClass(Map.class); 
     conf.setCombinerClass(Reduce.class); 
     conf.setReducerClass(Reduce.class); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(conf, new Path(args[0])); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

     JobClient.runJob(conf); 
   } 
} 

2 开垦情形的搭建

hadoop 的 IO 可以应用种种文件系统,所以能够允许在开垦景况、本地意况以至集群遇到。开荒条件下得以接纳Maven 方便获取相关的库:

 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.5.1</hadoop.version>
  </properties>
  <dependencies>
    <!-- Hadoop main client artifact -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Unit test artifacts -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop test artifact for running mini clusters -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-minicluster</artifactId>
      <version>${hadoop.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>hadoop-examples</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <outputDirectory>${basedir}</outputDirectory>
        </configuration>
      </plugin>
    </plugins>
  </build>

为了切换开荒、本地和集群遇到,大家来创建四个布局文件:

(1)使用当和姑件系统的费用条件:

<?xml version="1.0"?>
<configuration>
    <property> 
      <name>fs.defaultFS</name>
      <value>file:///</value>
    </property>
    <property>
      <name>mapreduce.framework.name</name>       
      <value>local</value>
    </property>
</configuration>

(2)本地伪单机:

<?xml version="1.0"?>
<configuration>
    <property> 
      <name>fs.defaultFS</name>
      <value>hdfs://localhost:9000/</value>
    </property>
    <property>
      <name>mapreduce.framework.name</name>       
      <value>yarn</value>
    </property>
    <property>
       <name>yarn.resourcemanager.address</name>         
       <value>localhost:8032</value>
    </property>
</configuration>

(3) 集群:

<configuration>
    <property> 
      <name>fs.defaultFS</name>
      <value>hdfs://namenode/</value>
    </property>
    <property>
      <name>mapreduce.framework.name</name>       
      <value>yarn</value>
    </property>
    <property>
       <name>yarn.resourcemanager.address</name>         
       <value>resourcemanager:8032</value>
    </property>
</configuration>

如此那般,就足以使用 -conf 选项选拔使用的配备文件了:

hadoop fs -conf conf/hadoop-localhost.xml -ls .

设若未有 -conf,则会读取 HADOOP_HOME 下的文书夹 etc/hadoop 下的配置文件。

除此以外一种方法是将 $HADOOP_HOME/etc/hadoop 下的文本拷贝到其余文件夹,然后设置 HADOOP_CONF_DIR 来切换情状。

1.1 硬软件景况... 3

如今在做多少剖判的时候。须要在mapreduce中调用c语言写的接口,此时就必得把动态链接库so文件分发到hadoop的逐条节点上,原本想自身来做那个分发,大致进度便是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地。但查询资料后发觉hadoop有相应的零部件来帮衬大家完结这么些操作,这一个组件便是DistributedCache,布满式缓存。运用那个东西能够变成第三方文件的分发和缓存成效,下边具体解释:

用法
如果条件变量HADOOP_HOME对应安装时的根目录,HADOOP_VEEscortSION对应Hadoop的近来设置版本,编写翻译WordCount.java来创立jar包,可正如操作:
$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
假设:
• /usr/joe/wordcount/input - 是HDFS中的输入路线
• /usr/joe/wordcount/output - 是HDFS中的输出路径
用示例文本文件做为输入:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

3 使用 mrunit 开荒单元测量试验

在 test 目录下生成测量检验类,先来测量检验二个不应当出口任何结果的,这里 MapperDriver 类不带别的withOutput,就是指未有出口,有多少个出口就对应几个withOutput:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureMapperTest {
  @Test
  public void ignoresMissingTemperatureRecord() throws IOException, InterruptedException {
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" + // Year
        "99999V0203201N00261220001CN9999999N9+99991+99999999999"); // Temperature
    // 没有 withOutput 所以该测试要通过必须没有输出
    new MapDriver<LongWritable, Text, Text, IntWritable>()
        .withMapper(new v2.MaxTemperatureMapper())
        .withInput(new LongWritable(0), value)
        .runTest();
  }
}

然后加上贰个 reducer 的测量试验:

@Test
@Test
  public void returnsMaximumIntegerInValues() throws IOException, InterruptedException {
    new ReduceDriver<Text, IntWritable, Text, IntWritable>().withReducer(
        new v2.MaxTemperatureReducer())
        .withInput(new Text("1950"),
            Arrays.asList(new IntWritable(10), new IntWritable(5)))
        .withOutput(new Text("1950"), new IntWritable(10))
        .runTest();
  }

给 一九四六 年输入 10 和 5,应该输出 一九五零 10 那样的结果(mapper 和 reducer 正是前方计算天气的)。

通过测量检验后,我们就能够在本地的小数码集上运维程序测量检验了,那样相比较好 debug。这里大家承接了 Configured 类,达成了 Tool 接口,并接纳 ToolRunner 运营:

public class MaxTemperatureDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    Job job = Job.getInstance(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}

此间 ToolRunner 主要基于命令行参数剖判出 Configuration,Configured 使该类能够 getConf 和 setConf,Tool 主借使提供了 run 方法。上边包车型大巴代码是 ToolRunner 调用 GenericOptionsParser 来剖判配置文件:

public static int run(Configuration conf, Tool tool, String[] args) 
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);

    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
  }

接下来就来执行了:

% mvn compile
% export HADOOP_CLASSPATH=target/classes/
% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml 
      input/ncdc/micro output

或不点名安排文件,使用 -fs 钦定文件系统, -jt 钦定 yarn实践:

//local 指使用本地开发环境
hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output

测试

@Test
  public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.setInt("mapreduce.task.io.sort.mb", 1);
    Path input = new Path("input/sample");
    Path output = new Path("output");
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output
    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);
    int exitCode = driver.run(new String[] {input.toString(), output.toString()});
    assertThat(exitCode, is(0));
    //一行一行检查输出是否正确
    checkOutput(conf, output);
  }

1.2 机器互连网景况... 3

假定大家供给在map之间分享一些数量,假使新闻量一点都不大,大家能够维持在conf中。不过要是大家不可能不分享一些布署文件,jar包之类的。此时DistributedCache能够知足大家的急需,使用DistributedCache的长河譬喻以下:

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World

4 在集群上运营 hadoop

通过了测验,获得了不利的结果之后,将要真枪真刀的在集群上运转程序了。要在集群上运维,必须先打成叁个jar 包,上面的 pom 文件中采纳 maven-jar-plugin 来进展包装,使用 maven 命令 mvn package -DskipTests 即可。借使不在 manifest 中 钦点 main class 的话,记得运维时在命令行中要钦定主类。为了有支持,能够像 war 包同样,把正视的 jar 全体包裹到 运行 jar 包的 lib 目录下,把布署文件打包到运营 jar 包的 classes 目录下,那一个用 maven 插件都非常粗大略。

顾客的 classpath 由上边三部分构成:

  1. job JAR;
  2. job JAR 下面的 lib 目录和 classes 目录;
  3. 碰到变量 HADOOP_CLASSPATH 钦点的目录。

在集群中,有所调换,HADOOP_CLASSPATH 不再生效,因为它独有对 driver 运转的 JVM 有效。:

  1. job JAR;
  2. job JAR 下面的 lib 目录和 classes 目录;
  3. 另外利用命令行参数 -libjars 参加布满式缓存的文本

因此运营时有几种方法:

  1. 把 jar 包解压缩然后打包进运维的 jar 中;
  2. 把 jar 包打包进 运营 jar 包的 lib 目录下;
  3. 使用 HADOOP_CLASSPATH 将依靠参加 client 的 classpath 中,然后用 -libjars 命令将其投入分布式缓存中。

在客户侧,可以安装境况变量 HADOOP_USER_CLASSPATH_FICRUISERST 来让客商选取的库被优用;在集群中,能够安装 mapreduce.job.user.classpath.first 为 true 来让客户的库被事先利用。

运营上面包车型大巴一声令下能够提交贰个职务到 hadoop 集群运维:

hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver -fs hdfs://192.168.0.133:9000 -jt 192.168.0.133:8032 file:///home/hadoop/input max-temp

是因为 input 是当半夏件,所以加上了 file:/// 前缀。

谈到底输出:

    File System Counters
        FILE: Number of bytes read=46341975
        FILE: Number of bytes written=317163
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=176
        HDFS: Number of bytes written=27
        HDFS: Number of read operations=7
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=27621
        Total time spent by all reduces in occupied slots (ms)=13410
        Total time spent by all map tasks (ms)=27621
        Total time spent by all reduce tasks (ms)=13410
        Total vcore-seconds taken by all map tasks=27621
        Total vcore-seconds taken by all reduce tasks=13410
        Total megabyte-seconds taken by all map tasks=28283904
        Total megabyte-seconds taken by all reduce tasks=13731840
    Map-Reduce Framework
        Map input records=211054
        Map output records=208834
        Map output bytes=1879506
        Map output materialized bytes=56
        Input split bytes=176
        Combine input records=208834
        Combine output records=4
        Reduce input groups=3
        Reduce shuffle bytes=56
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=456
        CPU time spent (ms)=8530
        Physical memory (bytes) snapshot=660267008
        Virtual memory (bytes) snapshot=5742837760
        Total committed heap usage (bytes)=464519168
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=46341925
    File Output Format Counters
        Bytes Written=27

2  封面作业1:总括职员和工人相关... 3

1.科学配置被分发的公文的不二等秘书诀

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运作应用程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出是:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用程序能够运用-files选项来钦命一个由逗号分隔的不二秘技列表,那么些渠道是task的当前职业目录。使用选拔-libjars能够向map和reduce的classpath中增加jar包。使用-archives选项顺序能够传递档案文件做为参数,那个档案文件会被解压而且在task的当前专门的工作目录下会创设四个对准解压生成的目录的标志链接(以压缩包的名字命名)。 有关命令行选项的更加的多细节请参见 Commands manual。
使用-libjars和-files运行wordcount例子:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output
解释
WordCount应用程序非常行动坚决果决。
Mapper(14-26行)中的map方法(18-25行)通过点名的 TextInputFormat(49行)叁回拍卖一行。然后,它经过StringTokenizer 以空格为分隔符将一行切分为若干tokens,之后,输出< <word>, 1> 情势的键值对。
对于示例中的首个输入,map输出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
其次个输入,map输出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
有关组成二个钦命作业的map数目标鲜明,以致如何以更加小巧的秘技去决定那个map,大家将在科指标接轨部分学习到越来越多的开始和结果。
WordCount还点名了三个combiner (46行)。由此,每趟map运营之后,会对输出依照key举行排序,然后把出口传递给本地的combiner(根据作业的配备与Reducer同样),举办地面聚合。
先是个map的输出是:
< Bye, 1>
< Hello, 1>
< World, 2>
其次个map的输出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
代码中的run方法中钦定了作业的多少个地点, 比如:通过命令行传递过来的输入/输出路线、key/value的体系、输入/输出的格式等等JobConf中的配置新闻。随后程序调用了JobClient.runJob(55行)来交付作业况且监察和控制它的实施。
咱俩将要本教程的存在延续部分学习越来越多的关于JobConf, JobClient, Tool和任何接口及类(class)。

2.1 书面作业1内容... 3

2.在和谐定义的mapper或reducer中获得文件下载到本地后的门路(linux文件系统路线);通常是重写configure可能重写setup

Map/Reduce 客商分界面
这一部分文书档案为顾客将会面对的Map/Reduce框架中的各类环节提供了妥善的细节。那应该会推推搡搡客商越来越细粒度地去落实、配置和调优作业。然而,请在意每一种类/接口的javadoc文书档案提供最完善的文书档案;本文只是想起到指南的效劳。
咱俩会先看看Mapper和Reducer接口。应用程序平日会通过提供map和reduce方法来促成它们。
接下来,我们会谈论别的的主干接口,此中囊括: JobConf,JobClient,Partitioner, OutputCollector,Reporter, InputFormat,OutputFormat等等。
末尾,大家将经过座谈框架中有的实用的作用点(比方:DistributedCache, IsolationRunner等等)来终结。
骨干作用描述
应用程序平常会通过提供map和reduce来完毕Mapper和Reducer接口,它们组成作业的骨干。
Mapper
Mapper将输入键值对(key/value pair)映射到一组中间格式的键值对聚集。
Map是一类将输入记录集调换为中等格式记录集的独门职责。 这种转移的中级格式记录集不需求与输入记录集的品种一致。三个加以的输入键值对能够映射成0个或多少个出口键值对。
Hadoop Map/Reduce框架为每贰个InputSplit爆发三个map职责,而各种InputSplit是由该学业的InputFormat产生的。
归纳地说,对Mapper的实现者供给重写 JobConfigurable.configure(JobConf)方法,那个措施必要传递三个JobConf参数,指标是成就Mapper的开头化工作。然后,框架为那几个职分的InputSplit中各个键值对调用二遍map(WritableComparable, Writable, OutputCollector, Reporter)操作。应用程序能够经过重写Closeable.close()方法来实行相应的清理专门的工作。
出口键值对无需与输入键值对的门类一致。多个加以的输入键值对能够映射成0个或几个出口键值对。通过调用 OutputCollector.collect(WritableComparable,Writable)可以搜罗输出的键值对。
应用程序可以接纳Reporter报告进程,设定应用等第的情况新闻,更新Counters(计数器),也许仅是标识自个儿运营如常。
框架随后会把与二个一定key关联的具备中等进程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。客户能够通过 JobConf.setOutputKeyComparatorClass(Class)来钦点具体担负分组的 Comparator。
Mapper的输出被排序后,就被分割给各类Reducer。分块的总量据和二个学业的reduce任务的多少是一律的。顾客能够经过落到实处自定义的 Partitioner来调整哪个key被分配给哪个 Reducer。
客户可挑选通过 JobConf.setCombinerClass(Class)钦命四个combiner,它承担对中间经过的输出举行本地的集结,那会带动收缩从Mapper到 Reducer数据传输量。
这个被排好序的中等进程的输出结果保存的格式是(key-len, key, value-len, value),应用程序能够通过JobConf调节对这么些中级结果是或不是开展削减以致怎么收缩,使用哪种CompressionCodec。
内需某些个Map?
Map的数据平常是由输入数据的分寸决定的,日常就是全部输入文件的总块(block)数。
Map符合规律的并行规模大概是每种节点(node)大概10到玖拾玖个map,对于CPU 消耗极小的map任务能够设到300个左右。由于每一种职务起头化供给一定的光阴,因而,相比合理的气象是map实施的时光起码超过1秒钟。
诸如此比,要是你输入10TB的多少,每种块(block)的轻重是128MB,你将急需大概82,000个map来完成职分,除非动用 setNumMapTasks(int)(注意:这里仅仅是对框架举行了叁个晋升(hint),实际调整因素见这里)将以此数值设置得更加高。
Reducer
Reducer将与叁个key关联的一组中间数值集归约(reduce)为贰个越来越小的数值集。
客户能够通过 JobConf.setNumReduceTasks(int)设定贰个功课中reduce职分的多寡。
回顾地说,对Reducer的完成者供给重写 JobConfigurable.configure(JobConf)方法,这些方法需求传递多个JobConf参数,指标是成就Reducer的起始化职业。然后,框架为成组的输入数据中的各个<key, (list of values)>对调用叁回 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,应用程序能够通过重写Closeable.close()来施行相应的清理职业。
Reducer有3个至关心珍重要阶段:shuffle、sort和reduce。
Shuffle
Reducer的输入就是Mapper已经排好序的输出。在此个阶段,框架通过HTTP为每一个Reducer获得全数Mapper输出中与之相关的分块。
Sort
这么些等第,框架将规行矩步key的值对Reducer的输入进行分组 (因为区别mapper的输出中也许会有同等的key)。
Shuffle和Sort多少个级次是同一时候张开的;map的输出也是单方面被取回一边被联合的。
Secondary Sort
要是须求中间经过对key的分组法则和reduce前对key的分组法规不一,那么能够透过 JobConf.setOutputValueGroupingComparator(Class)来钦命一个Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用以调节中间进度的key怎么着被分组,所以结合双方能够兑现按值的二遍排序。
Reduce
在此个阶段,框架为已分组的输入数据中的每一种 <key, (list of values)>对调用二次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
Reduce职责的出口平时是经过调用 OutputCollector.collect(WritableComparable, Writable)写入 文件系统的。
应用程序能够动用Reporter报告进程,设定应用程序级其他气象消息,更新Counters(计数器),可能仅是标识本身运转如常。
Reducer的出口是从未排序的。
内需多少个Reduce?
Reduce的多寡提议是0.95或1.75倍增 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。
用0.95,全体reduce能够在maps一做到时就及时运转,伊始传输map的输出结果。用1.75,速度快的节点能够在做到次轮reduce职分后,能够初始第一轮,那样能够获得相比较好的负载均衡的功效。
追加reduce的数据会追加全数框架的付出,但足以改良负载均衡,减少由于进行停业带来的负面影响。
上述比例因子比总体数量稍小一些是为了给框架中的猜测性任务(speculative-tasks) 或退步的天职留给部分reduce的财富。
无Reducer
若果未有归约要拓宽,那么设置reduce职务的多寡为零是合法的。
这种景观下,map义务的输出会直接被写入由 setOutputPath(Path)内定的输出路线。框架在把它们写入FileSystem在此之前未有对它们举行排序。
Partitioner
Partitioner用于私分键值空间(key space)。
Partitioner担当调整map输出结果key的分开。Key(恐怕四个key子集)被用来产面生区,常常选拔的是Hash函数。分区的数目与三个作业的reduce职分的数码是大同小异的。由此,它调整将中等进程的key(约等于那条记下)应该发送给m个reduce职分中的哪三个来扩充reduce操作。
HashPartitioner是暗中同意的 Partitioner。
Reporter
Reporter是用于Map/Reduce应用程序报告进度,设定应用等级的境况音信, 更新Counters(计数器)的体制。
Mapper和Reducer的贯彻能够动用Reporter 来报告进度,大概仅是申明本身运营如常。在此种应用程序供给花相当短日子管理各自键值对的光景中,这种机制是很注重的,因为框架大概会以为那个职分超时了,进而将它强行杀死。另四个制止这种情景发生的办法是,将配置参数mapred.task.timeout设置为一个十足高的值(只怕干脆设置为零,则并未有过期限制了)。
应用程序能够用Reporter来更新Counter(计数器)。
OutputCollector (outputcollector 使用旧API,供给区分新旧API)
OutputCollector是叁个Map/Reduce框架提供的用于收集Mapper或Reducer输出数据的通用机制 (富含中间输出结果和学业的输出结果)。
Hadoop Map/Reduce框架附带了一个包涵众多实用型的mapper、reducer和partitioner 的类库。
学业配置
JobConf代表贰个Map/Reduce作业的安插。
JobConf是客商向Hadoop框架描述贰个Map/Reduce作业怎么样实行的要紧接口。框架会遵照JobConf描述的音讯忠实地去尝试完毕那一个作业,但是:
• 一些参数大概会被领导标识为 final,那意味它们不可能被改换。
• 一些学业的参数能够被直截了本地开展设置(举个例子: setNumReduceTasks(int)),而另一些参数则与框架恐怕作业的其他参数之间微妙地相互影响,而且安装起来比较复杂(譬喻: setNumMapTasks(int))。
常见,JobConf会指明Mapper、Combiner(如若局地话)、 Partitioner、Reducer、InputFormat和 OutputFormat的现实性达成。JobConf还是能够钦命一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以至出口文件应当写在何方 (setOutputPath(Path))。
JobConf可选拔地对作业设置有个别高等选项,举例:设置Comparator; 放到DistributedCache上的文本;中间结果可能作业输出结果是还是不是必要减弱乃至怎么压缩; 利用顾客提供的台本(setMapDebugScript(String)/setReduceDebugScript(String)) 进行调治将养;作业是不是允许卫戍性(speculative)职务的实践(setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;各个任务最大的品尝次数 (set马克斯MapAttempts(int)/set马克斯ReduceAttempts(int)) ;三个功课能隐忍的天职失败的百分比 (set马克斯MapTaskFailuresPercent(int)/set马克斯ReduceTaskFailuresPercent(int)) ;等等。
本来,客商能利用 set(String, String)/get(String, String) 来设置恐怕获得应用程序需求的人身自由参数。不过,DistributedCache的使用是面向左近只读数据的。
任务的推行和情况
TaskTracker是在一个单独的jvm上以子进度的格局进行Mapper/Reducer职责(Task)的。
子任务会接二连三父TaskTracker的条件。顾客能够透过JobConf中的 mapred.child.java.opts配置参数来设定子jvm上的增公投项,举例: 通过-Djava.library.path=<> 将四个非标准路线设为运转时的链接用以寻找分享库,等等。要是mapred.child.java.opts富含一个标志@taskid@, 它会被替换来map/reduce的taskid的值。
上面是八个包蕴多个参数和替换的例子,当中蕴涵:记录jvm GC日志; JVM JMX代理程序以无密码的办法运行,那样它就能够连接受jconsole上,进而能够查看子进度的内部存款和储蓄器和线程,获得线程的dump;还把子jvm的最大堆尺寸设置为512MB, 并为子jvm的java.library.path增添了多个增大路线。

2.2  完成进程... 4

3.在和睦定义的mapper或reducer类中读取这一个文件的剧情

<property> 
  <name>mapred.child.java.opts</name> 
  <value> 
     -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc 
     -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 
  </value> 
</property> 

2.2.1   计划测量试验数据... 4

上面以代码表达三个步骤:

客商或领队也能够动用mapred.child.ulimit设定运维的子任务的最大虚构内部存款和储蓄器。mapred.child.ulimit的值以(KB)为单位,並且必得超越或等于-Xmx参数字传送给JavaVM的值,不然VM会不可能运维。
瞩目:mapred.child.java.opts只用于安装task tracker运转的子职责。为守护进度设置内存选项请查看 cluster_setup.html
${mapred.local.dir}/taskTracker/是task tracker的本地目录, 用于创制本地缓存和job。它能够钦赐多少个目录(超过七个磁盘),文件会半随机的保留到地点路线下的某部目录。当job运维时,task tracker依照安顿文书档案成立本地job目录,目录结构如以下所示:
• ${mapred.local.dir}/taskTracker/archive/ :分布式缓存。那些目录保存本地的分布式缓存。由此地点分布式缓存是在全体task和job间分享的。
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job内定的分享目录。各类任务能够选用那么些空间做为暂存空间,用于它们中间分享文件。这几个目录通过job.local.dir 参数揭示给顾客。那些渠道能够经过API JobConf.getJobLocalDir()来拜望。它也足以被做为系统本性拿到。由此,客商(例如运营streaming)能够调用System.getProperty("job.local.dir")得到该目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 寄存jar包的门道,用于存放作业的jar文件和展开的jar。job.jar是应用程序的jar文件,它会被电动分发到各台机器,在task运行前会被活动实行。使用api JobConf.getJar() 函数能够拿走job.jar的职位。使用JobConf.getJar().getParent()可以访谈寄放展开的jar包的目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一个job.xml文件,本地的通用的功课配置文件。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 种种义务有一个目录task-id,它里面有如下的目录结构:
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 二个job.xml文件,本地化的职责作业配置文件。职务本地化是指为该task设定一定的属性值。那一个值会在上面具体表达。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 四个寄放中间经过的出口文件的目录。它保存了由framwork发生的有时map reduce数据,比如map的出口文件等。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前专门的学业目录。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的权且目录。(客商能够设定属性mapred.child.tmp 来为map和reduce task设定有时目录。缺省值是./tmp。假诺这几个值不是相对路线, 它会把task的工作渠道加到该路径后边作为task的有时文件路线。借使那个值是相对路线则直接采纳那个值。 假若内定的目录不设有,会自动成立该目录。之后,依据选项 -Djava.io.tmpdir='一时文件的相对路径'实施java子职分。 pipes和streaming的有时文件路线是因而遭遇变量TMPDIQashqai='the absolute path of the tmp dir'设定的)。 如若mapred.child.tmp有./tmp值,这些目录会被创建。
上面的天性是为种种task实践时行使的本土参数,它们保存在本地化的职分作业配置文件里:
• ${mapred.local.dir}/taskTracker/archive/ :分布式缓存。那么些目录保存本地的布满式缓存。因而地点布满式缓存是在颇有task和job间分享的。
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job钦赐的分享目录。各种职务能够应用这一个空间做为暂存空间,用于它们中间分享文件。这几个目录通过job.local.dir 参数暴光给客商。那个路子能够通过API JobConf.getJobLocalDir()来访谈。它也得以被做为系统质量获得。因而,顾客(比如运营streaming)能够调用System.getProperty("job.local.dir")获得该目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 贮存jar包的路线,用于贮存作业的jar文件和实行的jar。job.jar是应用程序的jar文件,它会被活动分发到各台机械,在task运行前会被自动实行。使用api JobConf.getJar() 函数能够获得job.jar的地点。使用JobConf.getJar().getParent()能够访谈寄放张开的jar包的目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 二个job.xml文件,本地的通用的作业配置文件。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每一个职责有多少个目录task-id,它里面有如下的目录结构:
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 二个job.xml文件,本地化的天职作业配置文件。职分本地化是指为该task设定一定的属性值。那个值会在上面具体表达。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 三个存放中间经过的出口文件的目录。它保存了由framwork发生的有的时候map reduce数据,举个例子map的出口文件等。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前工作目录。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的暂且目录。(顾客能够设定属性mapred.child.tmp 来为map和reduce task设定临时目录。缺省值是./tmp。假如这些值不是相对路线, 它会把task的行事路径加到该路径前面作为task的一时文件路线。假设那些值是相对路线则直接行使这几个值。 假诺钦赐的目录不设有,会自行创设该目录。之后,遵照选项 -Djava.io.tmpdir='有时文件的断然路线'奉行java子职分。 pipes和streaming的有时文件路线是由此情状变量TMPDI福特Explorer='the absolute path of the tmp dir'设定的)。 假使mapred.child.tmp有./tmp值,那一个目录会被创立。
上边包车型地铁性情是为各种task实践时选取的地方参数,它们保存在本地化的职务作业配置文件里:
• ${mapred.local.dir}/taskTracker/archive/ :布满式缓存。这么些目录保存本地的布满式缓存。因而本地分布式缓存是在有着task和job间分享的。
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job钦赐的分享目录。各种义务能够使用这些空间做为暂存空间,用于它们之间分享文件。那一个目录通过job.local.dir 参数揭露给顾客。这么些门路能够由此API JobConf.getJobLocalDir()来做客。它也得以被做为系统个性获得。因而,客商(例如运营streaming)能够调用System.getProperty("job.local.dir")获得该目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 寄存jar包的门道,用于存放作业的jar文件和扩充的jar。job.jar是应用程序的jar文件,它会被电动分发到各台机器,在task运营前会被活动进行。使用api JobConf.getJar() 函数能够拿走job.jar的职位。使用JobConf.getJar().getParent()能够采访寄放展开的jar包的目录。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 三个job.xml文件,本地的通用的功课配置文件。
o ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 各种职务有四个目录task-id,它此中有如下的目录结构:
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 三个job.xml文件,本地化的职责作业配置文件。任务本地化是指为该task设定特定的属性值。这一个值会在上面具体表达。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一个存放中间经过的出口文件的目录。它保存了由framwork产生的一时map reduce数据,举个例子map的出口文件等。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前职业目录。
 ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的一时半刻目录。(用户能够设定属性mapred.child.tmp 来为map和reduce task设定临时目录。缺省值是./tmp。假使那么些值不是绝对路线, 它会把task的行事路线加到该路径前边作为task的有的时候文件路径。借使那个值是绝对路线则直接行使那一个值。 要是钦赐的目录不设有,会自行创制该目录。之后,遵照选项 -Djava.io.tmpdir='有时文件的断然路线'实践java子职务。 pipes和streaming的一时文件路线是经过碰到变量TMPDI途胜='the absolute path of the tmp dir'设定的)。 如若mapred.child.tmp有./tmp值,这几个目录会被创设。
上面包车型客车性质是为各样task实施时利用的地点参数,它们保存在本地化的天职作业配置文件里:
名称 类型 描述
mapred.job.id String job id
mapred.jar String job目录下job.jar的位置
job.local.dir String job钦点的分享存款和储蓄空间
mapred.tip.id String task id
mapred.task.id String task尝试id
mapred.task.is.map boolean 是否是map task
mapred.task.partition int task在job中的id
map.input.file String map读取的公文名
map.input.start long map输入的数据块的带头地点偏移
map.input.length long map输入的数据块的字节数
mapred.work.output.dir String task一时输出目录
task的正规化输出和不当输出流会被读到TaskTracker中,而且记录到 ${HADOOP_LOG_DIR}/userlogs
DistributedCache 可用于map或reduce task中分发jar包和本地库。子jvm总是把 当前职业目录 加到 java.library.path 和 LD_LIBRARY_PATH。 因而,可以透过 System.loadLibrary或 System.load装载缓存的库。有关使用分布式缓存加载分享库的内情请参见 native_libraries.html
学业的提交与监察和控制
JobClient是客商提交的课业与JobTracker交互的主要接口。
JobClient 提供提交作业,追踪进度,访问子职分的日记记录,获得Map/Reduce集群状态新闻等功效。
学业提交进程包涵:

2.2.2   难点1:求各种部门的总报酬... 5

1.Configuration conf = new Configuration();

  1. 反省作业输入输出样式细节
  2. 为作业总结InputSplit值。
  3. 只要需求的话,为作业的DistributedCache创设必需的总结消息。
  4. 拷贝作业的jar包和计划文件到FileSystem上的Map/Reduce系统目录下。
  5. 付出作业到JobTracker並且监察和控制它的事态。
    学业的历史文件记录到钦点目录的"_logs/history/"子目录下。那些钦赐目录由hadoop.job.history.user.location设定,默许是学业输出的目录。由此暗许情状下,文件会存放在mapred.output.dir/_logs/history目录下。客户可以设置hadoop.job.history.user.location为none来终止日志记录。
    客户选择上面包车型地铁吩咐能够看看在钦点目录下的历史日志记录的摘要。
    $ bin/hadoop job -history output-dir
    本条命令会打字与印刷出作业的内情,以至失利的和被杀死的任务细节。
    要翻看有关作业的越多细节比如成功的天职、每一种任务尝试的次数(task attempt)等,能够利用上面包车型地铁指令
    $ bin/hadoop job -history all output-dir
    客商能够行使 OutputLogFilter 从出口目录列表中筛选日志文件。
    相似情况,客商选取JobConf制造应用程序并安插作业属性, 然后用 JobClient 提交作业并监视它的进程。
    学业的主宰
    有的时候,用三个独立的Map/Reduce作业并不能一气浑成三个叶影参差的天职,顾客或然要链接五个Map/Reduce作业才行。那是便于达成的,因为作业日常输出到布满式文件系统上的,所以能够把这几个作业的出口作为下一个功课的输入达成串联。
    唯独,这也象征,确认保证每一功课成功(成功或退步)的权力和义务就一直落在了客户身上。在这里种场地下,能够用的操纵作业的选项有:
    • runJob(JobConf):提交作业,仅充作业完结时再次来到。
    • submitJob(JobConf):只交给作业,之后必要你轮询它回到的 RunningJob句柄的气象,并依附事态调治。
    • JobConf.setJobEndNotificationUCRUISERI(String):设置贰个功课成功公告,可防止轮询。
    作业的输入
    InputFormat 为Map/Reduce作业描述输入的内部原因标准。
    Map/Reduce框架依据作业的InputFormat来:
  6. 自己争辩作业输入的立见成效。
  7. 把输入文件切分成七个逻辑InputSplit实例, 并把每一实例分别分发给八个Mapper。
  8. 提供RecordReader的落到实处,那么些RecordReader从逻辑InputSplit中获取输入记录, 这一个记录将由Mapper管理。
    基于文件的InputFormat达成(平常是 FileInputFormat的子类) 暗许行为是按部就公输子入文件的字节大小,把输入数据切分成逻辑分块(logical InputSplit )。 其中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限能够安装mapred.min.split.size 的值。
    思虑到分界意况,对于众多应用程序来讲,很分明服从文件大小举办逻辑分割是不可能知足需要的。 在此种气象下,应用程序需求完结三个RecordReader来管理记录的界限并为每一个任务提供叁个逻辑分块的面向记录的视图。
    TextInputFormat 是默许的InputFormat。
    若果一个作业的Inputformat是TextInputFormat, 并且框架检查测量检验到输入文件的后缀是.gz和.lzo,就能够选取相应的CompressionCodec自动解压缩这几个文件。 但是必要当心,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给多个mapper来管理。
    InputSplit
    InputSplit 是多个独自的Mapper要拍卖的数据块。
    平日的InputSplit 是字节样式输入,然后由Record里德r管理并转化成记录样式。
    FileSplit 是暗许的InputSplit。 它把 map.input.file 设定为输入文件的不二诀窍,输入文件是逻辑分块文件。
    RecordReader
    RecordReader 从InputSlit读入<key, value>对。
    相似的,RecordReader 把由InputSplit 提供的字节样式的输入文件,转化成由Mapper管理的笔录样式的文书。 因而RecordReader担任管理记录的边界情形和把数据表示成keys/values对方式。
    作业的出口
    OutputFormat 描述Map/Reduce作业的出口样式。
    Map/Reduce框架根据作业的OutputFormat来:
  9. 检查作业的输出,举例检查输出路线是不是曾经存在。
  10. 提供一个RecordWriter的完毕,用来输出作业结果。 输出文件保留在FileSystem上。
    TextOutputFormat是暗中同意的 OutputFormat。
    任务的Side-Effect File
    在有的应用程序中,子任务急需发出部分side-file,那几个文件与学业实际出口结果的文件分歧。
    在此种意况下,同一个Mapper或许Reducer的七个实例(比方防卫性任务)同有时候张开只怕写 FileSystem上的一样文件就能够发生冲突。因而应用程序在写文件的时候供给为每一趟职责尝试(不唯有是每一遍职分,各种职务能够尝尝施行很频仍)采取三个无比的文件名(使用attemptid,举个例子task_200709221812_0001_m_000000_0)。
    为了幸免矛盾,Map/Reduce框架为每便尝试进行任务都建会谈保证一个不一样通常的 ${mapred.output.dir}/temporary/${taskid}子目录,这些目录位于此次尝试进行职分输出结果所在的FileSystem上,可以经过 ${mapred.work.output.dir}来拜望那么些子目录。 对于成功做到的天职尝试,唯有${mapred.output.dir}/temporary/${taskid}下的文件会移动到${mapred.output.dir}。当然,框架会吐弃那多少个失败的天职尝试的子目录。这种管理进程对于应用程序来讲是全然透明的。
    在职务执行时期,应用程序在写文件时方可行使这么些脾气,比方 通过 FileOutputFormat.getWorkOutputPath()获得${mapred.work.output.dir}目录, 并在其下成立大肆职责实施时所需的side-file,框架在职分尝试成功时会立刻移动这么些文件,由此无需在前后相继内为每一遍职务尝试挑选一个天下无双的名字。
    静心:在历次职务尝试施行时期,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/temporary/{$taskid},这几个值是Map/Reduce框架创制的。 所以使用那性子情的艺术是,在 FileOutputFormat.getWorkOutputPath() 路线下成立side-file就能够。
    对此只行使map不使用reduce的功课,那个结论也创建。这种状态下,map的出口结果一直扭转到HDFS上。
    RecordWriter
    RecordWriter 生成<key, value> 对到输出文件。
    RecordWriter的兑现把作业的输出结果写到 FileSystem。
    任何有效的特点
    Counters
    Counters 是七个由Map/Reduce框架或许应用程序定义的全局计数器。 每三个Counter能够是任何一种 Enum类型。同一特定Enum类型的Counter能够聚集到七个组,其项目为Counters.Group。
    应用程序可以定义自便(Enum类型)的Counters何况能够通过 map 或许reduce方法中的 Reporter.incrCounter(Enum, long)可能Reporter.incrCounter(String, String, long) 更新。之后框架汇集集这几个全局counters。
    DistributedCache
    DistributedCache 可将切实选取相关的、大尺寸的、只读的公文有效地布满放置。
    DistributedCache 是Map/Reduce框架提供的功用,能够缓存应用程序所需的公文 (包涵文件,档案文件,jar文件等)。
    应用程序在JobConf中通过url(hdfs://)钦点须求被缓存的文书。 DistributedCache假定由hdfs://格式url钦点的文书已经在 FileSystem上了。
    Map-Redcue框架在学业全数职责施行从前会把须求的公文拷贝到slave节点上。 它运转高效是因为各种作业的文件只拷贝一回况兼为那么些尚未文书档案的slave节点缓存文书档案。
    DistributedCache 依照缓存文书档案修改的岁月戳进行追踪。 在学业实行时期,当前应用程序只怕外界程序无法改改缓存文件。
    distributedCache能够分发轻便的只读数据或文本文件,也得以分发复杂类型的文书举例归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 那一个文件能够设置进行权限。
    顾客能够经过设置mapred.cache.{files|archives}来散发文件。 尽管要分发多少个公文,能够动用逗号分隔文件所在路线。也可以选择API来安装该属性: DistributedCache.addCacheFile(U奥迪Q5I,conf)/ DistributedCache.addCacheArchive(U福睿斯I,conf) and DistributedCache.setCacheFiles(U昂科威Is,conf)/ DistributedCache.setCacheArchives(U奥迪Q5Is,conf) 当中UHavalI的样式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,能够透过命令行选项 -cacheFile/-cacheArchive 分发文件。
    客户能够透过 DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下开创到缓存文件的符号链接。 也许通过安装配置文件属性mapred.create.symlink为yes。 分布式缓存会截取UOdysseyI的一对作为链接的名字。 举个例子,U汉兰达I是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前职业目录会盛名称为lib.so的链接, 它会链接分布式缓存中的lib.so.1。
    DistributedCache可在map/reduce职责中作为 一种基础软件分发机制使用。它能够被用来分发jar包和本土库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用来 缓存文件和jar包,并把它们投入子jvm的classpath。也得以透过安装配置文书档案里的品质mapred.job.classpath.{files|archives}达到平等的功力。缓存文件可用于分发和装载本地库。
    Tool
    Tool 接口辅助管理常用的Hadoop命令行选项。
    Tool 是Map/Reduce工具或利用的正经。应用程序应只管理其定制参数, 要把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。
    Hadoop命令行的常用选项有:
    -conf <configuration file>
    -D <property=value>
    -fs <local|namenode:port>
    -jt <local|jobtracker:port>
    IsolationRunner
    IsolationRunner 是匡助调度Map/Reduce程序的工具。
    利用IsolationRunner的法子是,首先设置 keep.failed.tasks.files属性为true (相同的时候参考keep.tasks.files.pattern)。
    接下来,登入到任务运转战败的节点上,步入 TaskTracker的地头路线运转IsolationRunner:
    $ cd <local path>/taskTracker/${taskid}/work
    $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
    IsolationRunner会把停业的职分放在单独的三个能够调度的jvm上运维,并且动用和事先一模二样的输入数据。
    Profiling
    Profiling是二个工具,它选拔内置的java profiler工具进行深入分析得到(2-3个)map或reduce样例运转深入分析报告。
    顾客能够因而设置属性mapred.task.profile内定系统是或不是搜聚profiler音信。 利用api JobConf.setProfileEnabled(boolean)能够修改属性值。假使设为true, 则开启profiling功用。profiler音讯保存在客商日志目录下。缺省景色,profiling成效是关闭的。
    假设顾客设定使用profiling功用,能够应用计划文书档案里的性质 mapred.task.profile.{maps|reduces} 设置要profile map/reduce task的限量。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。
    顾客能够经过设定配置文书档案里的品质mapred.task.profile.params 来钦点profiler配置参数。修改属性要使用api JobConf.setProfileParams(String)。当运营task时,假使字符串包括%s。 它会被替换到profileing的出口文件名。这几个参数会在命令行里传递到子JVM中。缺省的profiling 参数是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
    调试
    Map/Reduce框架能够运维客商提供的用于调节和测量试验的本子程序。 当map/reduce任务退步时,客商能够通过运营脚本在职责日志(举例职务的正式输出、标准错误、系统日志乃至学业配置文件)上做持续管理职业。顾客提供的调护医治脚本程序的标准输出和专门的工作错误会输出为确诊文件。假设需求的话那一个输出结果也能够打字与印刷在客商分界面上。
    在接下去的章节,大家商酌什么与学业一齐交给调节和测量检验脚本。为了提交调试脚本, 首先要把那一个本子分发出去,而且还要在布署文件里安装。
    什么样分发脚本文件:
    顾客要用 DistributedCache 机制来散发和链接脚本文件
    怎么着提交脚本:
    八个连忙提交调节和测量试验脚本的法子是各自为索要调弄整理的map职责和reduce任务设置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 属性的值。这么些属性也得以经过 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API来安装。对于streaming, 可以分级为须要调弄整理的map职责和reduce义务选拔命令行选项-mapdebug 和 -reducedegug来交付调节和测验脚本。
    本子的参数是天职的正经输出、典型错误、系统日志以至学业配置文件。在运作map/reduce退步的节点上运营调治命令是:
    $script $stdout $stderr $syslog $jobconf
    Pipes 程序依照第七个参数获得c++程序名。 因而调节和测量检验pipes程序的下令是
    $script $stdout $stderr $syslog $jobconf $program
    默许行为
    对于pipes,暗许的剧本会用gdb管理core dump, 打字与印刷 stack trace何况付诸正在运作线程的新闻。
    JobControl
    JobControl是贰个工具,它包裹了一组Map/Reduce作业以致他们中间的依附关系。
    数据压缩
    Hadoop Map/Reduce框架为应用程序的写入文件操作提供压缩工具,那么些工具得认为map输出的中游数据和课业最后输出数据(比如reduce的输出)提供支撑。它还捎带了一些 CompressionCodec的实现,比方完成了 zlib和lzo压缩算法。 Hadoop同样支撑gzip文件格式。
    惦念到质量难点(zlib)以至Java类库的缺乏(lzo)等要素,Hadoop也为上述压缩解压算法提供地点库的实现。越来越多的细节请参照他事他说加以考察这里。
    中间输出
    应用程序能够透过 JobConf.setCompressMapOutput(boolean)api调节map输出的中游结果,并且能够由此JobConf.setMapOutputCompressorClass(Class)api内定CompressionCodec。
    学业输出
    应用程序可以经过 FileOutputFormat.setCompressOutput(JobConf, boolean) api调节输出是或不是要求减小而且能够采用FileOutputFormat.setOutputCompressorClass(JobConf, Class)api钦命CompressionCodec。
    如若作业输出要封存成 SequenceFileOutputFormat格式,须求选择SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定 SequenceFile.CompressionType (i.e. RECOEscortD / BLOCK - 默许是RECORubiconD)。
    例子:WordCount v2.0
    此间是一个更完美的WordCount例子,它采纳了笔者们早已探究过的不在少数Map/Reduce框架提供的效果与利益。
    运行这些例子必要HDFS的少数职能,特别是 DistributedCache相关功效。由此这一个例子只好运营在 伪布满式 恐怕完全分布式形式的 Hadoop上。

2.2.3   难题2:求种种部门的人数和平均薪俸... 11

  DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/libJMeshCalc.so"), conf);

2.2.4   主题材料3:求每一个机关最先进入店肆的职员和工人姓名... 17

  DistributedCache.addCacheArchive(new URI("/user/tinfo/zhangguochen/libJMeshCalc.zip"),conf);
  DistributedCache.addFileToClassPath(new URI("/user/tinfo/zhangguochen/libMeshCal.jar"), conf);

package org.myorg; 

import java.io.*; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class WordCount extends Configured implements Tool { 

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 

     static enum Counters { INPUT_WORDS } 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     private boolean caseSensitive = true; 
     private Set<String> patternsToSkip = new HashSet<String>(); 

     private long numRecords = 0; 
     private String inputFile; 

     public void configure(JobConf job) { 
       caseSensitive = job.getBoolean("wordcount.case.sensitive", true); 
       inputFile = job.get("map.input.file"); 

       if (job.getBoolean("wordcount.skip.patterns", false)) { 
         Path[] patternsFiles = new Path[0]; 
         try { 
           patternsFiles = DistributedCache.getLocalCacheFiles(job); 
         } catch (IOException ioe) { 
           System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe)); 
         } 
         for (Path patternsFile : patternsFiles) { 
           parseSkipFile(patternsFile); 
         } 
       } 
     } 

     private void parseSkipFile(Path patternsFile) { 
       try { 
         BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString())); 
         String pattern = null; 
         while ((pattern = fis.readLine()) != null) { 
           patternsToSkip.add(pattern); 
         } 
       } catch (IOException ioe) { 
         System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe)); 
       } 
     } 

     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
       String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); 

       for (String pattern : patternsToSkip) { 
         line = line.replaceAll(pattern, ""); 
       } 

       StringTokenizer tokenizer = new StringTokenizer(line); 
       while (tokenizer.hasMoreTokens()) { 
         word.set(tokenizer.nextToken()); 
         output.collect(word, one); 
         reporter.incrCounter(Counters.INPUT_WORDS, 1); 
       } 

       if ((++numRecords % 100) == 0) { 
         reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile); 
       } 
     } 
   } 

   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
       int sum = 0; 
       while (values.hasNext()) { 
         sum += values.next().get(); 
       } 
       output.collect(key, new IntWritable(sum)); 
     } 
   } 

   public int run(String[] args) throws Exception { 
     JobConf conf = new JobConf(getConf(), WordCount.class); 
     conf.setJobName("wordcount"); 

     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(IntWritable.class); 

     conf.setMapperClass(Map.class); 
     conf.setCombinerClass(Reduce.class); 
     conf.setReducerClass(Reduce.class); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     List<String> other_args = new ArrayList<String>(); 
     for (int i=0; i < args.length; ++i) { 
       if ("-skip".equals(args[i])) { 
         DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf); 
         conf.setBoolean("wordcount.skip.patterns", true); 
       } else { 
         other_args.add(args[i]); 
       } 
     } 

     FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); 
     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); 

     JobClient.runJob(conf); 
     return 0; 
   } 

   public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new WordCount(), args); 
     System.exit(res); 
   } 
} 

2.2.5   主题素材4:求各类城市的职工的总薪水... 23

    或者

运作样例
输入样例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

2.2.6   主题材料5:列出报酬比上边高的职工姓名及其薪水... 29

    conf.set("mapred.cache.files", "/myapp/file");

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!

2.2.7   主题材料6:列出报酬比公司平均薪酬要高的职员和工人姓名及其工资... 34

   conf.set("mapred.cache. archives", "/mayapp/file.zip");

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
运转程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
介怀此时的输入与第多少个本子的不如,输出的结果也会有例外。
近日因而DistributedCache插入三个情势文件,文件中保留了要被忽略的单词格局。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
.
,
!
to
再运转三遍,此番运用愈来愈多的选项:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
应当获得如此的输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再运转壹遍,那一回关闭大小写敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
前后相继要点
经过行使一些Map/Reduce框架提供的魔法,WordCount的第2个版本在原来版本基础上有了之类的精雕细琢:
• 浮现了应用程序如何在Mapper (和Reducer)中经过configure方法 修改配置参数(28-43行)。
• 彰显了学业如何利用DistributedCache 来散发只读数据。 这里允许客商钦点单词的形式,在计数时大意那多少个符合形式的单词(104行)。
• 显示Tool接口和GenericOptionsParser管理Hadoop命令行选项的机能 (87-116, 119行)。
• 体现了应用程序怎样运用Counters(68行),如何通过传递给map(和reduce) 方法的Reporter实例来设置应用程序的情事消息(72行)。
Java和JNI是Sun ASL翔升, Inc.在U.S.和另海外家的注册商标。

2.2.8   难点7:列有名字以J开首的职工姓名及其所属单位名称... 40

   以上是布局须求分发的hdfs上的文书,不过前提是那个文件必需在hdfs上存在,看源代码可领悟DistributedCache的静态方法事实上就是包装了conf.set的动作。

2.2.9   题目8:列出工资最高的头三名职工姓名及其报酬... 44

2.在融洽的mapper类中,使用DistributedCache获取下载到本地的文书,当先四分之二意况下那么些操作都是重写configure接口,然后把地点文件路线保存在mapper类的成员变量中,供map方法应用。代码举个例子以下:

2.2.10**问题9:将全方位职工依据营收(薪酬+提成)从高到低排列... 49

   private Path[] localFiles;

2.2.11**难题10:求任何两名职工消息传送所急需经过的中档节点数... 53

   public void setup(Context context) {

3  书面作业2:MapReduce落成推荐系统... 60

       localFiles = DistributeCache.getLocalCacheFiles(context.getConfiguration;

3.1 书面作业2内容... 60

       for(Path temp:localFiles) {

3.2     程序代码... 60

            String path = temp.toString();//path正是此文件在本土的门道

3.2.1   CountThread.java. 60

            if(path.contains("myfileName")) {//获取到温馨必需的文件

3.2.2   Recommendation.java. 62

本文由胜博发-运维发布,转载请注明来源:1.2 机器网络环境... 3,• Hadoop快速入门对初次使