加入收藏 | 设为首页 | 会员中心 | 我要投稿 | RSS
您当前的位置:首页 > 教程文章 > NOSQL数据库

一个简单的hadoop开发例程

时间:2012-05-02 01:38:37  来源:  作者:

Hadoop 是Google MapReduce的一个Java实现。MapReduce是一种简化的分布式编程模式,让程序自动分布到一个由普通机器组成的超大集群上并发执行。就如同java程序员可以不考虑内存泄露一样, MapReduce的run-time系统会解决输入数据的分布细节,跨越机器集群的程序执行调度,处理机器的失效,并且管理机器之间的通讯请求。这样的模式允许程序员可以不需要有什么并发处理或者分布式系统的经验,就可以处理超大的分布式系统得资源。

hadoop的程序开发,主要就是做以下几件事情。

1. 定义mapper,处理输入的key-value对,输出中间结果。

2。定义reducer,可选,对中间结果进行规约,输出最终结果。

3。定义inputformat 和 outputformat, 可选,inputformat将每行输入的文件内容转换为java类提供mapper函数使用,不定义时默认为string.

4.定义main函数,并创建一个job并运行它.

之后就有系统来实现和管理所有的事情了。主要有以下几个方面:

1.基本概念:Hadoop的HDFS实现了google的GFS文件系统,NameNode作为文件系统的负责调度运行在master,DataNode运行在每个机器上。同时Hadoop实现了Google的MapReduce,JobTracker作为MapReduce的总调度运行在master,TaskTracker则运行在每个机器上执行Task。

2.main()函数,创建JobConf,定义Mapper,Reducer,Input/OutputFormat 和输入输出文件目录,最后把Job提交給JobTracker,等待Job结束。

3.JobTracker,创建一个InputFormat的实例,调用它的getSplits()方法,把输入目录的文件拆分成FileSplist作为Mapper task 的输入,生成Mapper task加入Queue。

4.TaskTracker 向 JobTracker索求下一个Map/Reduce。

Mapper Task先从InputFormat创建RecordReader,循环读入FileSplits的内容生成Key与Value,传给Mapper函数,处理完后中间结果写成SequenceFile.
Reducer Task 从运行Mapper的TaskTracker的Jetty上使用http协议获取所需的中间内容(33%),Sort/Merge后(66%),执行Reducer函数,最后按照OutputFormat写入结果目录。

TaskTracker 每10秒向JobTracker报告一次运行情况,每完成一个Task10秒后,就会向JobTracker索求下一个Task。

一个简单的分布式grep函数。

我们做一个简单的分布式的Grep,简单对输入文件进行逐行的正则匹配,如果符合就将该行打印到输出文件。因为是简单的全部输出,所以我们只要写Mapper函数,不用写Reducer函数,也不用定义Input/Output Format。

二、程序员编写的代码
我们做一个简单的分布式的Grep,简单对输入文件进行逐行的正则匹配,如果符合就将该行打印到输出文件。因为是简单的全部输出,所以我们只要写Mapper函数,不用写Reducer函数,也不用定义Input/Output Format。

package demo.hadoop

public class HadoopGrep {

public static class RegMapper extends MapReduceBase implements Mapper {

private Pattern pattern;

public void configure(JobConf job) {
pattern = Pattern.compile(job.get( " mapred.mapper.regex " ));
}

public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
throws IOException {
String text = ((Text) value).toString();
Matcher matcher = pattern.matcher(text);
if (matcher.find()) {
output.collect(key, value);
}
}
}

private HadoopGrep () {
} // singleton

public static void main(String[] args) throws Exception {

JobConf grepJob = new JobConf(HadoopGrep. class );
grepJob.setJobName( " grep-search " );
grepJob.set( " mapred.mapper.regex " , args[ 2 ]);

grepJob.setInputPath( new Path(args[ 0 ]));
grepJob.setOutputPath( new Path(args[ 1 ]));
grepJob.setMapperClass(RegMapper. class );
grepJob.setReducerClass(IdentityReducer. class );

JobClient.runJob(grepJob);
}
}

RegMapper类的configure()函数接受由main函数传入的查找字符串,map() 函数进行正则匹配,key是行数,value是文件行的内容,符合的文件行放入中间结果。
main()函数定义由命令行参数传入的输入输出目录和匹配字符串,Mapper函数为RegMapper类,Reduce函数是什么都不做,直接把中间结果输出到最终结果的的IdentityReducer类,运行Job。


整个代码非常简单,丝毫没有分布式编程的任何细节。


三.运行Hadoop程序
Hadoop这方面的文档写得不全面,综合参考GettingStartedWithHadoop 与Nutch Hadoop Tutorial 两篇后,再碰了很多钉子才终于完整的跑起来了,记录如下:

3.1 local运行模式

完全不进行任何分布式计算,不动用任何namenode,datanode的做法,适合一开始做调试代码。
解压hadoop,其中conf目录是配置目录,hadoop的配置文件在hadoop-default.xml,如果要修改配置,不是直接修改该文件,而是修改hadoop-site.xml,将该属性在hadoop-site.xml里重新赋值。
hadoop-default.xml的默认配置已经是local运行,不用任何修改,配置目录里唯一必须修改的是hadoop-env.sh 里JAVA_HOME的位置。


将编译好的HadoopGrep与RegMapper.class 放入hadoop/build/classes/demo/hadoop/目录 找一个比较大的log文件放入一个目录,然后运行


hadoop / bin / hadoop demo.hadoop.HadoopGrep log文件所在目录 任意的输出目录 grep的字符串

查看输出目录的结果,查看hadoop/logs/里的运行日志。
在重新运行前,先删掉输出目录。
 

3.2 单机集群运行模式

现在来搞一下只有单机的集群.假设以完成3.1中的设置,本机名为hadoopserver
第1步. 然后修改hadoop-site.xml ,加入如下内容:

< property >
< name > fs.default.name </ name >
< value > hadoopserver:9000 </ value >
</ property >
< property >
< name > mapred.job.tracker </ name >
< value > hadoopserver:9001 </ value >
</ property >
< property >
< name > dfs.replication </ name >
< value > 1 </ value >
</ property >

从此就将运行从local文件系统转向了hadoop的hdfs系统,mapreduce的jobtracker也从local的进程内操作变成了分布式的任务系统,9000,9001两个端口号是随便选择的两个空余端口号。

另外,如果你的/tmp目录不够大,可能还要修改hadoop.tmp.dir属性。


第2步. 增加ssh不输入密码即可登陆。

因为Hadoop需要不用输入密码的ssh来进行调度,在不su的状态下,在自己的home目录运行ssh-keygen -t rsa ,然后一路回车生成密钥,再进入.ssh目录,cp id_rsa.pub authorized_keys
详细可以man 一下ssh, 此时执行ssh hadoopserver,不需要输入任何密码就能进入了。

3.格式化namenode,执行
bin/hadoop namenode -format

4.启动Hadoop
执行hadoop/bin/start-all.sh, 在本机启动namenode,datanode,jobtracker,tasktracker

5.现在将待查找的log文件放入hdfs,。
执行hadoop/bin/hadoop dfs 可以看到它所支持的文件操作指令。
执行hadoop/bin/hadoop dfs put log文件所在目录 in ,则log文件目录已放入hdfs的/user/user-name/in 目录中

6.现在来执行Grep操作
hadoop/bin/hadoop demo.hadoop.HadoopGrep in out
查看hadoop/logs/里的运行日志,重新执行前。运行hadoop/bin/hadoop dfs rmr out 删除out目录。

7.运行hadoop/bin/stop-all.sh 结束

3.3 集群运行模式
假设已执行完3.2的配置,假设第2台机器名是hadoopserver2
1.创建与hadoopserver同样的执行用户,将hadoop解压到相同的目录。

2.同样的修改haoop-env.sh中的JAVA_HOME 及修改与3.2同样的hadoop-site.xml

3. 将hadoopserver中的/home/username/.ssh/authorized_keys 复制到hadoopserver2,保证hadoopserver可以无需密码登陆hadoopserver2
scp /home/username/.ssh/authorized_keys username@hadoopserver2:/home/username/.ssh/authorized_keys

4.修改hadoop-server的hadoop/conf/slaves文件, 增加集群的节点,将localhost改为
hadoop-server
hadoop-server2

5.在hadoop-server执行hadoop/bin/start-all.sh
将会在hadoop-server启动namenode,datanode,jobtracker,tasktracker
在hadoop-server2启动datanode 和tasktracker

6.现在来执行Grep操作
hadoop/bin/hadoop demo.hadoop.HadoopGrep in out
重新执行前,运行hadoop/bin/hadoop dfs rmr out 删除out目录

7.运行hadoop/bin/stop-all.sh 结束。
 

四、效率
经测试,Hadoop并不是万用灵丹,很取决于文件的大小和数量,处理的复杂度以及群集机器的数量,相连的带宽,当以上四者并不大时,hadoop优势并不明显。
比如,不用hadoop用java写的简单grep函数处理100M的log文件只要4秒,用了hadoop local的方式运行是14秒,用了hadoop单机集群的方式是30秒,用双机集群10M网口的话更慢,慢到不好意思说出来的地步。


Hadoop 的文件系统,最重要是 FileSystem 类,以及它的两个子类 LocalFileSystem 和 DistributedFileSystem。 这里先分析 FileSystem。
抽象类 FileSystem,提高了一系列对文件/目录操作的接口,还有一些辅助方法。分别说明一下:
1. open,create,delete,rename等,非abstract,部分返回 FSDataOutputStream,作为流进行处理。
2. openRaw,createRaw,renameRaw,deleteRaw等,abstract,部分返回 FSInputStream,可以随机访问。
3. lock,release,copyFromLocalFile,moveFromLocalFile,copyToLocalFile 等abstract method,提供便利作用,从方法命名可以看出作用。
特别说明,Hadoop的文件系统,每个文件都有一个checksum,一个crc文件。因此FileSystem里面的部分代码对此进行了特别的处理,比如 rename。
LocalFileSystem 和 DistributedFileSystem,理应对用户透明,这里不多做分析,和 FSDataInputStream,FSInputStream 结合一起说明一下。
查看两个子类的 getFileCacheHints 方法,可以看到 LocalFileSystem 是使用'localhost'来命名,这里暂且估计两个FileSystem都是通过网络进行数据通讯,一个是Internet,一个是Intranet。
LocalFileSystem 里面有两个内部类 LocalFSFileInputStream和LocalFSFileOutputStream,查看代码可以看到它是使用 FileChannel进行操作的。另外 lock和release 两个方法使用了TreeMap来保存文件和对应的锁。
DistributedFileSystem 代码量少于 LocalFileSystem,但是更加复杂,它里面使用了 DFSClient 来进行分布式文件系统的操作:
public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException
{
super(conf);
this.dfs = new DFSClient(namenode, conf);
this.name = namenode.getHostName() + ":" + namenode.getPort();
}
DFSClient 类接收一个InetSocketAddress 和Configuration 作为输入,对网络传输细节进行了封装。DistributedFileSystem中绝大多数方法都是调用DFSClient进行处理,它只是一个 Warpper。下面着重分析DFSClient。
DFSClient中,主要使用RPC来进行网络的通讯,而不是直接在内部使用Socket。如果要详细了解传输细节,可以查看 org.apache.hadoop.ipc 这个包里面的3个Class。
DFSClient 中的路径,基本上都是UTF8类型,而非String,在DistributedFileSystem中,通过getPath和getDFSPath来转换,这样做可以保证路径格式的标准和数据传输的一致性。
DFSClient 中的大多数方法,也是直接委托ClientProtocol类型的namenode来执行,这里主要分析其它方法。
LeaseChecker 内部类。一个守护线程,定期对namenode进行renewLease操作,注释说明:
Client programs can cause stateful changes in the NameNode that affect other clients. A client may obtain a file and neither abandon nor complete it. A client might hold a series of locks that prevent other clients from proceeding. Clearly, it would be bad if a client held a bunch of locks that it never gave up. This can happen easily if the client dies unexpectedly. So, the NameNode will revoke the locks and live file-creates for clients that it thinks have died. A client tells the NameNode that it is still alive by periodically calling renewLease(). If a certain amount of time passes since the last call to renewLease(), the NameNode assumes the client has died.
作用是对client进行心跳监测,若client挂掉了,执行解锁操作。
DFSInputStream 和 DFSOutputStream,比LocalFileSystem里面的更为复杂,也是通过 ClientProtocol 进行操作,里面使用到了 org.apache.hadoop.dfs 包中的数据结构,如DataNode,Block等,这里不对这些细节进行分析。

对FileSystem的分析(1)到此结束,个人感觉它的封装还是做的不错的,从Nutch项目分离出来后,比原先更为清晰。 下面就接着进行MapReduce的第二部分分析,从MapReduce如何进行分布式
 

###############################################################################

之前的MapReduce Demo只能在一台机器上运行,现在是时候让它分布式运行了。在对MapReduce的运行流程和FileSystem进行了简单研究之后,现在尝试从配置着手,看看怎样让Hadoop在两台机器上面同时运行MapReduce。
首先看回这里
String tracker = conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = (JobSubmissionProtocol)
RPC.getProxy(JobSubmissionProtocol.class,
JobTracker.getAddress(conf), conf);
}
当tracker地址不为local,则tracker为Remote Client的 JobTracker 类,这里重点分析。
JobTracker有一个main函数,注释显示它仅仅用于调试,正常情况是作为DFS Namenode进程的一部分来运行。不过这里我们可以先从它着手开始分析。
tracker = new JobTracker(conf); //构造
构造函数先获取一堆常量的值,然后清空'systemDir',接着启动RPC服务器。
InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
this.port = addr.getPort();
this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
this.interTrackerServer.start();
启动TrackInfoServer:
this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
this.infoServer = new JobTrackerInfoServer(this, infoPort);
this.infoServer.start();
TrackInfoServer 提供了通过HTTP方式获取JobTracker信息的方式,可以方便用于监测工作任务的进度。
启动三个守护线程:
new Thread(this.expireTrackers).start(); //Used to expire TaskTrackers that have gone down
new Thread(this.retireJobs).start(); //Used to remove old finished Jobs that have been around for too long
new Thread(this.initJobs).start(); //Used to init new jobs that have just been created
三个线程的用处已经注释,这里不作分析。下面开始分析 JobTracker.submitJob()
之前已经分析过 LocalJobRunner.submitJob(),它实例化内部类Job,在里面实现MapReduce流程。JobTracker就复杂一些,它实例化 JobInProgress,然后将这个Job提交到队列:
JobInProgress job = new JobInProgress(jobFile, this, this.conf);
synchronized (jobs) {
synchronized (jobsByArrival) {
synchronized (jobInitQueue) {
jobs.put(job.getProfile().getJobId(), job);
jobsByArrival.add(job);
jobInitQueue.add(job);
jobInitQueue.notifyAll();
}
}
}
此时RetireJobs线程开始处理超时和出错的Job,JobInitThread线程初始化工作任务: job.initTasks();
开始分析 JobInProgress
在构造函数中,Tracker从发起端的DFS获取任务文件(xml和jar),然后保存到本地目录下面
JobConf default_job_conf = new JobConf(default_conf);
this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
jobid + ".xml");
this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
jobid + ".jar");
FileSystem fs = FileSystem.get(default_conf);
fs.copyToLocalFile(new File(jobFile), localJobFile);

conf = new JobConf(localJobFile);
this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,
conf.getJobName());
String jarFile = conf.getJar();
if (jarFile != null) {
fs.copyToLocalFile(new File(jarFile), localJarFile);
conf.setJar(localJarFile.getCanonicalPath());
}

这里要注意jarFile,JobConf的构造函数:
public JobConf(Configuration conf, Class aClass) {
this(conf);
String jar = findContainingJar(aClass);
if (jar != null) {
setJar(jar);
}
}
如果 aClass 是在一个jar里面,那么setJar(jar);就会被执行,这个jar会被copy到 LocalJobRunner 或是 JobTracker 的工作目录下面。所以这里有一个原则: 将要执行的MapReduce操作的所有class打包到一个jar中,这样才能执行分布式的MapReduce计算。
再看 JobInProgress.initTasks()
先从Jar中加载InputFormat
String ifClassName = jd.get("mapred.input.format.class");
InputFormat inputFormat;
if (ifClassName != null && localJarFile != null) {
try {
ClassLoader loader =
new URLClassLoader(new URL[]{ localJarFile.toURL() });
Class inputFormatClass = loader.loadClass(ifClassName);
inputFormat = (InputFormat)inputFormatClass.newInstance();
} catch (Exception e) {
throw new IOException(e.toString());
}
} else {
inputFormat = jd.getInputFormat();
}
接下来对文件块的大小进行排序
创建对应的Map任务
this.numMapTasks = splits.length;
// create a map task for each split
this.maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
maps = new TaskInProgress(jobFile, splits, jobtracker, conf, this);
}
创建Reduce任务
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);
}
最后对于每Split的信息进行缓存,并且创建状态类
for (int i = 0; i < maps.length; i++) {
String hints[][] = fs.getFileCacheHints(splits.getFile(), splits.getStart(), splits.getLength());
cachedHints.put(maps.getTIPId(), hints);
}

this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
现在轮到 TaskInProgress,它将Job里面的Map和Reduce操作进行了封装,但是JobInProgress.initTasks()仅仅对task进行了初始化,并没有执行Task,经过一番跟踪,发现Task的执行,是由 TaskTracker 来处理。
TaskTracker,实现了TaskUmbilicalProtocol接口。在之前的文章中,LocalJobRunner的内部类Job也实现了这个接口,这里对比一下:
接口 JobSubmissionProtocol: LocalJobRunner <---> JobTracker
接口 TaskUmbilicalProtocol: LocalJobRunner.Job <---> TaskTracker
下面对TaskTracker进行分析,首先也是从main入口开始。
TaskTracker实现了Runnable,main实例化TaskTracker对象,然后执行run()方法。
在构造函数中,主要进行初始化
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
initialize();
initialize()里面,初始化一些变量值 ,然后初始化RPC服务器:
while (true) {
try {
this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
this.taskReportServer.start();
break;
} catch (BindException e) {
LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
this.taskReportPort++;
}

}
while (true) {
try {
this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
this.mapOutputServer.start();
break;
} catch (BindException e) {
LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");
this.mapOutputPort++;
}
}
mapOutputServer使用一个循环来尝试各个端口绑定。
最后一句
this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
这里有一个新的接口InterTrackerProtocol,是TaskTracker和中央JobTracker通讯用的协议。通过这个接口, TaskTracker可以用来执行JobTracker中的Task了。接下来分析TaskServer的主流程,run()函数。
run()中, 有两个while循环。在内部while循环里面,执行 offerService() 方法。它里面也是一个while循环,开始几段代码用于JobTracker的心跳监测。接下来,它通过协议接口调用JobTracker,获取Task并执行:
if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
Task t = jobClient.pollForNewTask(taskTrackerName);
if (t != null) {
TaskInProgress tip = new TaskInProgress(t, this.fConf);
synchronized (this) {
tasks.put(t.getTaskId(), tip);
if (t.isMapTask()) {
mapTotal++;
} else {
reduceTotal++;
}
runningTasks.put(t.getTaskId(), tip);
}
tip.launchTask();
}
}
tip.launchTask(); 开始执行这个Task,在方法内部:
this.runner = task.createRunner(TaskTracker.this);
this.runner.start();
Task 有两个子类 MapTask和ReduceTask,它们的createRunner()方法都会创建一个TaskRunner的子类,TaskRunner继承Thread,run()方法中:
String sep = System.getProperty("path.separator");
File workDir = new File(new File(t.getJobFile()).getParent(), "work");
workDir.mkdirs();

StringBuffer classPath = new StringBuffer();
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
JobConf job = new JobConf(t.getJobFile());
String jar = job.getJar();
if (jar != null) { // if jar exists, it into workDir
unJar(new File(jar), workDir);
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.append(sep); // add libs from jar to classpath
classPath.append(libs);
}
}
classPath.append(sep);
classPath.append(new File(workDir, "classes"));
classPath.append(sep);
classPath.append(workDir);
}

获取工作目录,获取classpath。然后解压工作任务的jar包。
// Build exec child jmv args.
Vector vargs = new Vector(8);
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");

vargs.add(jvm.toString());
String javaOpts = handleDeprecatedHeapSize(
job.get("mapred.child.java.opts", "-Xmx200m"),
job.get("mapred.child.heap.size"));
javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;
javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit);
}

// Add classpath.
vargs.add("-classpath");
vargs.add(classPath.toString());
// Add main class and its arguments
vargs.add(TaskTracker.Child.class.getName()); // main of Child
vargs.add(tracker.taskReportPort + ""); // pass umbilical port
vargs.add(t.getTaskId()); // pass task identifier
// Run java
runChild((String[])vargs.toArray(new String[0]), workDir);
这里是构造启动Java进程的classpath和其它vm参数,最后在 runChild 中开一个子进程来执行这个Task。感觉够复杂的。
最后分析TaskTracker的内部类Child。它就是上面子进程执行的类。在main函数中
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
new InetSocketAddress(port), conf);

Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());

conf.addFinalResource(new File(task.getJobFile()));
可见该子进程也是通过RPC跟TaskTracker进行通讯。
startPinging(umbilical, taskid); // start pinging parent
开一个进程,对TaskTracker进行心跳监测。
String workDir = job.getWorkingDirectory();
if (workDir != null) {
FileSystem file_sys = FileSystem.get(job);
file_sys.setWorkingDirectory(new File(workDir));
}
task.run(job, umbilical); // run the task
这里才真正开始执行Task。

来顶一下
返回首页
返回首页
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表
推荐资讯
在CentOS下搭建Android 开发环境
在CentOS下搭建Androi
轻松搭建属于自己的Ubuntu发行版
轻松搭建属于自己的Ub
利用SUSE Studio 打造自己的个性化Linux发行版
利用SUSE Studio 打造
那些采用PHP技术的IT大企业
那些采用PHP技术的IT大
相关文章
    无相关信息
栏目更新
栏目热门