I have been reading Hadoop-related books recently, and to get some hands-on practice I wrote a product-correlation statistics program modeled on the WordCount example.
Requirement description:
Based on a supermarket's sales lists, calculate the degree of correlation between products, that is, count how many times product A and product B are bought together.
Data format:
The supermarket sales list is simplified to the following format: each line represents one sales list, and the products in it are separated by commas (","), as shown in the figure below:
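The figure is not reproduced here; as a purely hypothetical illustration (the product names are made up), a few input lines in this format could look like this:

    milk,bread,eggs
    milk,bread
    bread,eggs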
Requirement analysis:
This requirement is implemented with MapReduce on Hadoop.
The map function splits each list into pairs of associated products and outputs records whose key is product A and whose value is product B. The split results for the first and third records are shown in the figure below:
So that the association can be counted from both sides, each pair of products A and B produces two output records, namely (A, B) and (B, A).
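As a hypothetical example, a single input line milk,bread,eggs would cause the map function to emit the following key/value pairs (shown tab-separated for readability):

    milk	bread
    bread	milk
    milk	eggs
    eggs	milk
    bread	eggs
    eggs	bread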
The reduce function groups the records by product A and counts how many times each product appears among the values; the output key is product A|product B and the value is the number of times this combination occurs. Taking the 5 records mentioned above as an example, consider the map output whose key is product R:
After the map function has processed the data, the records shown in the figure below are obtained:
In reduce, the values are grouped and counted, and the result is shown in the figure below.
Finally, product A|product B is used as the key and the combination count as the value; the output is shown in the figure below:
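Continuing the hypothetical input lines above (and assuming the minimum-count parameter is low enough that nothing is filtered out), milk and bread appear together in two lists and bread and eggs in two, while milk and eggs appear together only once, so the final output would contain records such as:

    bread|eggs	2
    bread|milk	2
    eggs|bread	2
    eggs|milk	1
    milk|bread	2
    milk|eggs	1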
That concludes the analysis of the requirement; let's look at the concrete code implementation below.
Code implementation:
I won't walk through the code in detail here; please refer to the comments in the code.
package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test extends Configured implements Tool {

    /**
     * Map class: preprocesses the data.
     * The output key is product A and the value is an associated product B.
     * @author lulei
     */
    public static class MapT extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (!(line == null || "".equals(line))) {
                // Split the line into products
                String[] vs = line.split(",");
                // Combine every two products into a record
                for (int i = 0; i < (vs.length - 1); i++) {
                    if ("".equals(vs[i])) { // Skip empty entries
                        continue;
                    }
                    for (int j = i + 1; j < vs.length; j++) {
                        if ("".equals(vs[j])) {
                            continue;
                        }
                        // Output both directions: (A, B) and (B, A)
                        context.write(new Text(vs[i]), new Text(vs[j]));
                        context.write(new Text(vs[j]), new Text(vs[i]));
                    }
                }
            }
        }
    }

    /**
     * Reduce class: counts the data.
     * The output key is product A|product B and the value is the association count.
     * @author lulei
     */
    public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {
        private int count;

        /**
         * Initialization: read the minimum count of records from the job configuration.
         */
        public void setup(Context context) {
            String countStr = context.getConfiguration().get("count");
            try {
                this.count = Integer.parseInt(countStr);
            } catch (Exception e) {
                this.count = 0;
            }
        }

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String keyStr = key.toString();
            HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
            // Use a hash map to count the occurrences of each product B
            for (Text value : values) {
                String valueStr = value.toString();
                if (hashMap.containsKey(valueStr)) {
                    hashMap.put(valueStr, hashMap.get(valueStr) + 1);
                } else {
                    hashMap.put(valueStr, 1);
                }
            }
            // Output only the combinations that reach the minimum count
            for (Entry<String, Integer> entry : hashMap.entrySet()) {
                if (entry.getValue() >= this.count) {
                    context.write(new Text(keyStr + "|" + entry.getKey()),
                            new IntWritable(entry.getValue()));
                }
            }
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        conf.set("count", arg0[2]);

        Job job = new Job(conf);
        job.setJobName("jobtest");

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(MapT.class);
        job.setReducerClass(ReduceT.class);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * @param args input path, output path, minimum count
     */
    public static void main(String[] args) {
        if (args.length != 3) {
            System.exit(-1);
        }
        try {
            int res = ToolRunner.run(new Configuration(), new Test(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Upload and run:
Package the program into a jar file and upload it to the cluster; also upload the test data to the HDFS distributed file system.
A screenshot of the command being run is shown in the figure below:
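The exact command line depends on the environment; assuming the program was packaged as test.jar, the input data sits under /input, the output should go to /output, and the minimum combination count is 1 (all of these names are hypothetical), the invocation would look roughly like:

    hadoop jar test.jar com.Test /input /output 1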
After the run completes, check the corresponding output in the HDFS file system, as shown in the figure below:
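Without the screenshot, the result can be inspected with commands along these lines (the paths are the same hypothetical ones as above; part-r-00000 is the usual name of a single reducer's output file):

    hadoop fs -ls /output
    hadoop fs -cat /output/part-r-00000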
This is a complete MapReduce program, and I will continue learning Hadoop. Thanks for reading, and I hope it helps you.