需求:

每行数据格式"{\"movie\":\"2599\",\"rate\":\"5\",\"timeStamp\":\"957716949\",\"uid\":\"6040\"}"

从其中计算出每个用户评分最高的十部电影的movie值和rate值

输出为uid:...movie...rate...


思路:

map端先将读取的json数据转成pojo对象,所以要创建一个bean用来接收每行的json数据,然后将用户uid作为key,对象作为value放入context中。

reduce端将相同key值的pojo对象放入list集合,使用Collections.sort()方法对其按照rate值排序。最后遍历排序后的前十条记录写入context中。


代码:

bean对象

package com.season.mapper;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Hadoop Writable holding one movie rating record parsed from a JSON line of
 * the form {"movie":"...","rate":"...","timeStamp":"...","uid":"..."}.
 *
 * The field order used by {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} defines the wire format and must stay
 * identical in both methods: movie, rate, timeStamp, uid.
 *
 * NOTE(review): the fields are boxed Longs, so write() will throw a
 * NullPointerException (auto-unboxing) if any field is null — the input JSON
 * is assumed to always contain all four keys.
 */
public class MovieBean implements Writable {

	private Long movie;
	private Long rate;
	private Long timeStamp;
	private Long uid;

	/** No-arg constructor required by Hadoop's serialization machinery. */
	public MovieBean() {
	}

	public MovieBean(Long movie, Long rate, Long timeStamp, Long uid) {
		this.movie = movie;
		this.rate = rate;
		this.timeStamp = timeStamp;
		this.uid = uid;
	}

	public Long getMovie() {
		return movie;
	}

	public void setMovie(Long movie) {
		this.movie = movie;
	}

	public Long getRate() {
		return rate;
	}

	public void setRate(Long rate) {
		this.rate = rate;
	}

	public Long getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(Long timeStamp) {
		this.timeStamp = timeStamp;
	}

	public Long getUid() {
		return uid;
	}

	public void setUid(Long uid) {
		this.uid = uid;
	}

	/** Serializes the four fields in wire-format order. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(movie);
		out.writeLong(rate);
		out.writeLong(timeStamp);
		out.writeLong(uid);
	}

	/** Deserializes the four fields in the same order write() emitted them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		movie = in.readLong();
		rate = in.readLong();
		timeStamp = in.readLong();
		uid = in.readLong();
	}

}
map:

package com.season.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieMapper extends Mapper<LongWritable, Text, LongWritable, MovieBean> {

	// Reusable output key — standard Hadoop idiom to avoid allocating one
	// LongWritable per input record.
	private final LongWritable outKey = new LongWritable();

	/**
	 * Parses one JSON rating line into a MovieBean and emits it keyed by uid.
	 *
	 * BUG FIX: the original dereferenced the parse result unconditionally, so a
	 * single malformed line (or one missing "uid") crashed the whole job with a
	 * NullPointerException. Such records are now skipped.
	 * NOTE(review): assumes JsonUtils.jsonToPojo returns null on parse failure
	 * rather than throwing — confirm against its implementation.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		MovieBean movie = JsonUtils.jsonToPojo(value.toString(), MovieBean.class);
		if (movie == null || movie.getUid() == null) {
			return; // skip malformed / incomplete record
		}
		outKey.set(movie.getUid());
		context.write(outKey, movie);
	}
}
reduce:

package com.season.mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MovieReducer extends Reducer<LongWritable, MovieBean, LongWritable, Text> {

	/** Number of top-rated movies to emit per user. */
	private static final int TOP_N = 10;

	/**
	 * Collects all ratings of one user, sorts them by rate descending, and
	 * writes the top {@value #TOP_N} as "rate: R movie: M" lines.
	 */
	@Override
	protected void reduce(LongWritable key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {

		// Hadoop reuses the same value object while iterating, so each element
		// must be deep-copied before being kept in the list.
		List<MovieBean> mList = new ArrayList<MovieBean>();
		for (MovieBean movie : values) {
			mList.add(new MovieBean(movie.getMovie(), movie.getRate(), movie.getTimeStamp(), movie.getUid()));
		}

		// Highest rate first. Long.compare avoids the hand-rolled branch logic.
		Collections.sort(mList, new Comparator<MovieBean>() {
			@Override
			public int compare(MovieBean o1, MovieBean o2) {
				return Long.compare(o2.getRate().longValue(), o1.getRate().longValue());
			}
		});

		// BUG FIX: the original always read 10 entries and threw
		// IndexOutOfBoundsException for any user with fewer than 10 ratings.
		int limit = Math.min(TOP_N, mList.size());
		for (int i = 0; i < limit; i++) {
			MovieBean top = mList.get(i);
			context.write(key, new Text("rate: " + top.getRate() + " movie: " + top.getMovie()));
		}
	}

}
MovieJobSubmitter:

package com.season.mapper;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MovieJobSubmitter {
	/**
	 * Configures and submits the top-N-movies-per-user MapReduce job.
	 * args[0] = HDFS input path, args[1] = HDFS output path (must not exist).
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: MovieJobSubmitter <input path> <output path>");
			System.exit(2);
		}
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// Jar is referenced by an absolute path on the submitting host;
		// setJarByClass is the more portable alternative when running via 'hadoop jar'.
		job.setJar("/root/movie.jar");
		//job.setJarByClass(MovieJobSubmitter.class);

		job.setMapperClass(MovieMapper.class);
		job.setReducerClass(MovieReducer.class);

		// Map output types: uid -> full rating record.
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(MovieBean.class);

		// Final output types: uid -> "rate: R movie: M" text.
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setOutputFormatClass(TextOutputFormat.class);
		TextOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean res = job.waitForCompletion(true);

		// BUG FIX: the original exited with res?1:0, i.e. status 1 on SUCCESS.
		// Convention (and what schedulers check) is 0 on success, non-zero on failure.
		System.exit(res ? 0 : 1);
	}
}

最后将数据文件放入hdfs集群中,将程序打成jar包放入hadoop客户端中

使用命令行 hadoop jar XX.jar main方法所在类的全路径 输入目录 输出目录

最后可以得到如下处理结果:

6036    rate: 5 movie: 593
6036    rate: 5 movie: 594
6036    rate: 5 movie: 3022
6036    rate: 5 movie: 3028
6036    rate: 5 movie: 3030
6036    rate: 5 movie: 3034
6036    rate: 5 movie: 903
6037    rate: 5 movie: 593
6037    rate: 5 movie: 903
6037    rate: 5 movie: 904
6037    rate: 5 movie: 910
6037    rate: 5 movie: 924
6037    rate: 5 movie: 926
6037    rate: 5 movie: 928
6037    rate: 5 movie: 3095
6037    rate: 5 movie: 3471
6037    rate: 5 movie: 260
6038    rate: 5 movie: 3088
6038    rate: 5 movie: 1148
6038    rate: 5 movie: 1183
6038    rate: 5 movie: 1223
6038    rate: 5 movie: 1296
6038    rate: 5 movie: 1079
6038    rate: 4 movie: 1419
6038    rate: 4 movie: 232
6038    rate: 4 movie: 1136
6038    rate: 4 movie: 2146
6039    rate: 5 movie: 3037
6039    rate: 5 movie: 903
6039    rate: 5 movie: 904
6039    rate: 5 movie: 913
6039    rate: 5 movie: 916
6039    rate: 5 movie: 918
6039    rate: 5 movie: 922
6039    rate: 5 movie: 926
6039    rate: 5 movie: 3088
6039    rate: 5 movie: 260
6040    rate: 5 movie: 593
6040    rate: 5 movie: 2076
6040    rate: 5 movie: 912
6040    rate: 5 movie: 913
6040    rate: 5 movie: 916
6040    rate: 5 movie: 919
6040    rate: 5 movie: 923
6040    rate: 5 movie: 924
6040    rate: 5 movie: 3089
6040    rate: 5 movie: 953


GitHub 加速计划 / js / json
17
5
下载
适用于现代 C++ 的 JSON。
最近提交(Master分支:2 个月前 )
960b763e 5 个月前
8c391e04 8 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐