Processing Millions of Lines of JSON Data with Hadoop MapReduce
Requirement:
Each line of input looks like "{\"movie\":\"2599\",\"rate\":\"5\",\"timeStamp\":\"957716949\",\"uid\":\"6040\"}"
For every user, find the ten movies with the highest ratings and output their movie and rate values.
The output format is uid:...movie...rate...
Approach:
On the map side, each JSON line is parsed into a POJO, so we first create a bean to hold one record. The user's uid is then written to the context as the key, with the bean as the value.
On the reduce side, all beans sharing the same key are collected into a list and sorted by rate in descending order with Collections.sort(). The first ten entries of the sorted list are written to the context.
Code:
The bean class:
package com.season.mapper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class MovieBean implements Writable {

    private Long movie;
    private Long rate;
    private Long timeStamp;
    private Long uid;

    public MovieBean() {
    }

    public MovieBean(Long movie, Long rate, Long timeStamp, Long uid) {
        super();
        this.movie = movie;
        this.rate = rate;
        this.timeStamp = timeStamp;
        this.uid = uid;
    }

    public Long getMovie() {
        return movie;
    }
    public void setMovie(Long movie) {
        this.movie = movie;
    }
    public Long getRate() {
        return rate;
    }
    public void setRate(Long rate) {
        this.rate = rate;
    }
    public Long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }
    public Long getUid() {
        return uid;
    }
    public void setUid(Long uid) {
        this.uid = uid;
    }

    // Deserialization must read the fields in exactly the same order they are written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.movie = in.readLong();
        this.rate = in.readLong();
        this.timeStamp = in.readLong();
        this.uid = in.readLong();
    }

    // Hadoop calls this to serialize the bean when it is shuffled from map to reduce
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(movie);
        out.writeLong(rate);
        out.writeLong(timeStamp);
        out.writeLong(uid);
    }
}
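Because write() and readFields() must agree on field order, a quick local round-trip through the standard java.io data streams can catch a mismatch before the job ever runs on the cluster. This is a minimal sketch; the class name is made up for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MovieBeanRoundTripCheck {
    public static void main(String[] args) throws IOException {
        MovieBean original = new MovieBean(2599L, 5L, 957716949L, 6040L);

        // Serialize with write(), exactly as Hadoop would between map and reduce
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize with readFields() and print the recovered fields
        MovieBean copy = new MovieBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy.getUid() + " rate: " + copy.getRate() + " movie: " + copy.getMovie());
    }
}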
The mapper:
package com.season.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MovieMapper extends Mapper<LongWritable, Text, LongWritable, MovieBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Parse one JSON line into a MovieBean
        MovieBean movie = JsonUtils.jsonToPojo(value.toString(), MovieBean.class);
        // Emit the user's uid as the key so all of one user's ratings meet in a single reduce call
        context.write(new LongWritable(movie.getUid()), movie);
    }
}
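The JsonUtils helper is not shown in the post. A minimal sketch, assuming the Jackson library (jackson-databind) is on the classpath, could look like the following; the class name and the choice to return null on malformed lines (which the mapper would then need to skip) are assumptions:

package com.season.mapper;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtils {
    // Reuse one ObjectMapper; it is thread-safe once configured
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Parse one JSON line into the given bean class, returning null on bad input
    public static <T> T jsonToPojo(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            return null;
        }
    }
}

Jackson will coerce the quoted numbers in the input ("5", "6040") into the bean's Long fields.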
The reducer:
package com.season.mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MovieReducer extends Reducer<LongWritable, MovieBean, LongWritable, Text> {

    @Override
    protected void reduce(LongWritable key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
        List<MovieBean> mList = new ArrayList<MovieBean>();
        for (MovieBean movie : values) {
            // Copy each bean: Hadoop reuses the same value object on every iteration,
            // so storing the reference directly would fill the list with identical entries
            MovieBean bean = new MovieBean();
            bean.setMovie(movie.getMovie());
            bean.setRate(movie.getRate());
            bean.setTimeStamp(movie.getTimeStamp());
            bean.setUid(movie.getUid());
            mList.add(bean);
        }
        // Sort by rate in descending order
        Collections.sort(mList, new Comparator<MovieBean>() {
            @Override
            public int compare(MovieBean o1, MovieBean o2) {
                return Long.compare(o2.getRate().longValue(), o1.getRate().longValue());
            }
        });
        // Write out at most the top ten ratings for this user
        // (guard against users with fewer than ten ratings)
        for (int i = 0; i < Math.min(10, mList.size()); i++) {
            context.write(key, new Text("rate: " + mList.get(i).getRate() + " movie: " + mList.get(i).getMovie()));
        }
    }
}
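Collecting every rating for a user into a list and sorting it works fine at this scale, but memory grows with the most active user. As an alternative sketch (not from the original post, and assuming Java 8+ for the lambda syntax), the same top-10 selection can be done with a fixed-size min-heap so that at most ten beans per user are held at a time:

package com.season.mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MovieTopNReducer extends Reducer<LongWritable, MovieBean, LongWritable, Text> {

    private static final int TOP_N = 10;

    @Override
    protected void reduce(LongWritable key, Iterable<MovieBean> values, Context context)
            throws IOException, InterruptedException {
        // Min-heap ordered by rate: the lowest-rated of the current top ten sits at the head
        PriorityQueue<MovieBean> heap =
                new PriorityQueue<MovieBean>(TOP_N, (a, b) -> Long.compare(a.getRate(), b.getRate()));
        for (MovieBean movie : values) {
            // Copy the bean because Hadoop reuses the same object across iterations
            heap.offer(new MovieBean(movie.getMovie(), movie.getRate(), movie.getTimeStamp(), movie.getUid()));
            if (heap.size() > TOP_N) {
                heap.poll();  // drop the lowest-rated entry, keeping at most ten
            }
        }
        // Drain the heap and emit from highest to lowest rate
        List<MovieBean> top = new ArrayList<MovieBean>(heap);
        top.sort((a, b) -> Long.compare(b.getRate(), a.getRate()));
        for (MovieBean bean : top) {
            context.write(key, new Text("rate: " + bean.getRate() + " movie: " + bean.getMovie()));
        }
    }
}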
MovieJobSubmitter:
package com.season.mapper;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MovieJobSubmitter {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Point the job at the jar copied to the client machine
        job.setJar("/root/movie.jar");
        //job.setJarByClass(MovieJobSubmitter.class);

        job.setMapperClass(MovieMapper.class);
        job.setReducerClass(MovieReducer.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(MovieBean.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with 0 on success and 1 on failure, as shell conventions expect
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Finally, upload the data file to the HDFS cluster, package the program as a jar, and copy it to a Hadoop client machine.
Run it with: hadoop jar XX.jar <fully-qualified main class> <input dir> <output dir>
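For example, with the jar name taken from the submitter above and the HDFS paths used purely as placeholders:

hadoop jar movie.jar com.season.mapper.MovieJobSubmitter /movie/input /movie/output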
The job produces results like the following:
6036 rate: 5 movie: 593
6036 rate: 5 movie: 594
6036 rate: 5 movie: 3022
6036 rate: 5 movie: 3028
6036 rate: 5 movie: 3030
6036 rate: 5 movie: 3034
6036 rate: 5 movie: 903
6037 rate: 5 movie: 593
6037 rate: 5 movie: 903
6037 rate: 5 movie: 904
6037 rate: 5 movie: 910
6037 rate: 5 movie: 924
6037 rate: 5 movie: 926
6037 rate: 5 movie: 928
6037 rate: 5 movie: 3095
6037 rate: 5 movie: 3471
6037 rate: 5 movie: 260
6038 rate: 5 movie: 3088
6038 rate: 5 movie: 1148
6038 rate: 5 movie: 1183
6038 rate: 5 movie: 1223
6038 rate: 5 movie: 1296
6038 rate: 5 movie: 1079
6038 rate: 4 movie: 1419
6038 rate: 4 movie: 232
6038 rate: 4 movie: 1136
6038 rate: 4 movie: 2146
6039 rate: 5 movie: 3037
6039 rate: 5 movie: 903
6039 rate: 5 movie: 904
6039 rate: 5 movie: 913
6039 rate: 5 movie: 916
6039 rate: 5 movie: 918
6039 rate: 5 movie: 922
6039 rate: 5 movie: 926
6039 rate: 5 movie: 3088
6039 rate: 5 movie: 260
6040 rate: 5 movie: 593
6040 rate: 5 movie: 2076
6040 rate: 5 movie: 912
6040 rate: 5 movie: 913
6040 rate: 5 movie: 916
6040 rate: 5 movie: 919
6040 rate: 5 movie: 923
6040 rate: 5 movie: 924
6040 rate: 5 movie: 3089
6040 rate: 5 movie: 953