一、背景

早在8月份的时候,我就做了一些MR的Join查询,但是发现回北京之后,2个月不用,居然有点生疏,所以今天早上又花时间好好看了一下,顺便写下这个文档,以供以后查阅。

二、环境

JDK 1.6、Linux操作系统、hadoop0.20.2

三、资料数据

在做这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txt和info.txt,字段之间以/t划分。

data.txt内容如下:

201001	1003	abc
201002	1005	def
201003	1006	ghi
201004	1003	jkl
201005	1004	mno
201006	1005	pqr


info.txt内容如下:

1003	kaka
1004	da
1005	jue
1006	zhao


期望输出结果:

1003	201001	abc	kaka
1003	201004	jkl	kaka
1004	201005	mno	da
1005	201002	def	jue
1005	201006	pqr	jue
1006	201003	ghi	zhao


四、Map代码

首先是map的代码,我贴上,然后简要说说

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			// 获取输入文件的全路径和名称
			String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

			if (pathName.contains("data.txt")) {
				String values[] = value.toString().split("/t");
				if (values.length < 3) {
					// data数据格式不规范,字段小于3,抛弃数据
					return;
				} else {
					// 数据格式规范,区分标识为1
					TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
					context.write(tp, new Text(values[0] + "/t" + values[2]));
				}
			}
			if (pathName.contains("info.txt")) {
				String values[] = value.toString().split("/t");
				if (values.length < 2) {
					// data数据格式不规范,字段小于2,抛弃数据
					return;
				} else {
					// 数据格式规范,区分标识为0
					TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
					context.write(tp, new Text(values[1]));
				}
			}
		}
	}


 

这里需要注意以下部分:

A、pathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/dajuezhao/join/data/info.txt),可以以endsWith()的方法来判断。

B、资料表,也就是这里的info.txt需要放在前面,也就是标识号是0.否则无法输出理想结果。

C、Map执行完成之后,输出的中间结果如下:

1003,0	kaka
1004,0	da
1005,0	jue
1006,0	zhao
1003,1	201001	abc
1003,1	201004	jkl
1004,1	201005	mon
1005,1	201002	def
1005,1	201006	pqr
1006,1	201003	ghi



五、分区和分组

1、map之后的输出会进行一些分区的操作,代码贴出来:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
		@Override
		public int getPartition(TextPair key, Text value, int numParititon) {
			return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
		}
	}



分区我在以前的文档中写过,这里不做描述了,就说是按照map输出的符合key的第一个字段做分区关键字。分区之后,相同key会划分到一个reduce中去处理(如果reduce设置是1,那么就是分区有多个,但是还是在一个reduce中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:
同一区:
1003,0	kaka
1003,1	201001	abc
1003,1	201004	jkl


同一区:
1004,0	da
1004,1	201005	mon


同一区:
1005,0	jue
1005,1	201002	def
1005,1	201006	pqr


同一区:
1006,0	zhao
1006,1	201003	ghi


2、分组操作,代码如下

public static class Example_Join_01_Comparator extends WritableComparator {

		public Example_Join_01_Comparator() {
			super(TextPair.class, true);
		}

		@SuppressWarnings("unchecked")
		public int compare(WritableComparable a, WritableComparable b) {
			TextPair t1 = (TextPair) a;
			TextPair t2 = (TextPair) b;
			return t1.getFirst().compareTo(t2.getFirst());
		}
	}

分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:

同一组:
1003,0	kaka
1003,0	201001	abc
1003,0	201004	jkl

同一组:
1004,0	da
1004,0	201005	mon

同一组:
1005,0	jue
1005,0	201002	def
1005,0	201006	pqr

同一组:
1006,0	zhao
1006,0	201003	ghi


六、reduce操作

贴上代码如下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
		protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
				InterruptedException {
			Text pid = key.getFirst();
			String desc = values.iterator().next().toString();
			while (values.iterator().hasNext()) {
				context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));
			}
		}
	}

1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。

2、获取公用的字段,通过排组织后可以看到,一些共有字段是在第一位,取出来即可。

3、遍历余下的结果,输出。

七、其他的支撑代码

1、首先是TextPair代码,没有什么可以细说的,贴出来:

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;

	public TextPair() {
		set(new Text(), new Text());
	}

	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}

	public TextPair(Text first, Text second) {
		set(first, second);
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if (cmp != 0) {
			return cmp;
		}
		return second.compareTo(tp.second);
	}
}


2、Job的入口函数

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
		String[] otherArgs = parser.getRemainingArgs();
		if (agrs.length < 3) {
			System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
			System.exit(2);
		}

		//conf.set("hadoop.job.ugi", "root,hadoop");

		Job job = new Job(conf, "Example_Join_01");
		// 设置运行的job
		job.setJarByClass(Example_Join_01.class);
		// 设置Map相关内容
		job.setMapperClass(Example_Join_01_Mapper.class);
		// 设置Map的输出
		job.setMapOutputKeyClass(TextPair.class);
		job.setMapOutputValueClass(Text.class);
		// 设置partition
		job.setPartitionerClass(Example_Join_01_Partitioner.class);
		// 在分区之后按照指定的条件分组
		job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
		// 设置reduce
		job.setReducerClass(Example_Join_01_Reduce.class);
		// 设置reduce的输出
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// 设置输入和输出的目录
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		// 执行,直到结束就退出
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

八、总结

1、这是个简单的join查询,可以看到,我在处理输入源的时候是在map端做来源判断。其实在0.19可以用MultipleInputs.addInputPath()的方法,但是它用了JobConf做参数。这个方法原理是多个数据源就采用多个map来处理。方法各有优劣。

2、对于资源表,如果我们采用0和1这样的模式来区分,资源表是需要放在前的。例如本例中info.txt就是资源表,所以标识位就是0.如果写为1的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。

3、关于分区,并不是所有的map都结束才开始的,一部分数据完成就会开始执行。同样,分组操作在一个分区内执行,如果分区完成,分组将会开始执行,也不是等所有分区完成才开始做分组的操作。

4、有疑问或是写的不对的地方,欢迎大家发邮件沟通交流:dajuezhao@gmail.com

GitHub 加速计划 / li / linux-dash
6
1
下载
A beautiful web dashboard for Linux
最近提交(Master分支:3 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐