A simple example illustrates what MapReduce is:
Suppose we want to count how many times each word appears in a very large file. Because the file is too large to process in one piece, we split it into smaller files and assign several people to count the words in each one. This step is "Map". We then merge the counts produced by each person; this step is "Reduce".
To run the example above in MapReduce, you create a job. The job splits the file into several independent data blocks and distributes them across different machine nodes, where Map tasks process them completely in parallel. MapReduce then collects the Map output and sends it to the Reduce tasks for the next stage of processing.
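The flow described above can be sketched in plain Java, without any Hadoop classes. This is only a single-process illustration of the idea: the class name LocalWordCountSketch and its methods are hypothetical, and a real job runs these phases on different nodes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCountSketch {

    // "Map": turn one input split into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(split);
        while (itr.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(itr.nextToken(), 1));
        }
        return pairs;
    }

    // "Shuffle": group the values emitted by all map calls by their key.
    static Map<String, List<Integer>> shuffle(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // "Reduce": sum the grouped values for one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three phases one after another over the given splits.
    static Map<String, Integer> wordCount(List<String> splits) {
        List<List<Map.Entry<String, Integer>>> mapOutputs = new ArrayList<>();
        for (String split : splits) {
            mapOutputs.add(map(split));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(mapOutputs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two strings stand in for two blocks of the large file.
        List<String> splits = Arrays.asList("hello world", "hello hadoop");
        System.out.println(wordCount(splits)); // prints {hadoop=1, hello=2, world=1}
    }
}
```

Each call to map only sees its own split, which is why the Map phase can run in parallel; only the grouping step needs to see the output of every split.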
During execution, a process called "JobTracker" coordinates all the tasks of a MapReduce job. Several TaskTracker processes run the individual Map tasks and regularly report their progress to the JobTracker. If a TaskTracker reports that a task has failed, or does not report on its task for a long time, the JobTracker assigns the task to another TaskTracker, which re-executes it. (JobTracker and TaskTracker belong to the classic Hadoop 1.x architecture; in Hadoop 2.x, YARN takes over these roles.)
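The re-execution rule can be sketched as a small bookkeeping class. This is not Hadoop's implementation: the class HeartbeatSketch, its method names, the worker names, and the 10-second timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSketch {
    private final long timeoutMillis;
    // worker name -> time (ms) of its last report
    private final Map<String, Long> lastReport = new HashMap<>();
    // task id -> worker currently responsible for it
    private final Map<String, String> assignment = new HashMap<>();

    HeartbeatSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Hands a task to a worker; the assignment counts as the worker's first report.
    void assign(String taskId, String worker, long now) {
        assignment.put(taskId, worker);
        lastReport.putIfAbsent(worker, now);
    }

    // Called whenever a worker reports in.
    void heartbeat(String worker, long now) {
        lastReport.put(worker, now);
    }

    // Moves every task whose worker has been silent for longer than the
    // timeout to the backup worker and returns the ids of the moved tasks.
    List<String> reassignStalled(String backupWorker, long now) {
        List<String> moved = new ArrayList<>();
        for (Map.Entry<String, String> entry : assignment.entrySet()) {
            String worker = entry.getValue();
            long last = lastReport.getOrDefault(worker, 0L);
            if (!worker.equals(backupWorker) && now - last > timeoutMillis) {
                entry.setValue(backupWorker);
                moved.add(entry.getKey());
            }
        }
        if (!moved.isEmpty()) {
            lastReport.put(backupWorker, now);
        }
        Collections.sort(moved);
        return moved;
    }

    String workerFor(String taskId) {
        return assignment.get(taskId);
    }

    public static void main(String[] args) {
        HeartbeatSketch tracker = new HeartbeatSketch(10_000);
        tracker.assign("map-0", "worker-a", 0);
        tracker.assign("map-1", "worker-b", 0);
        tracker.heartbeat("worker-a", 9_000);
        // At t = 15 s, worker-b has been silent for 15 s and worker-a for 6 s.
        System.out.println(tracker.reassignStalled("worker-c", 15_000)); // prints [map-1]
    }
}
```

Because a Map task only reads its input split and writes its own output, running it again on another node is safe, which is what makes this simple retry strategy workable.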
(1) Create a Maven project in Eclipse with the jar dependencies shown below (you can also refer to the pom configuration of the hadoop-mapreduce-examples project in the Hadoop source package).
Note: configure the maven-jar-plugin and specify the mainClass.
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.5.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.xxx.demo.hadoop.wordcount.WordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
(2) Because of the way MapReduce operates, a job needs code for at least three things, typically one class each: the Map logic, the Reduce logic, and the job scheduling.
The Map code can extend the org.apache.hadoop.mapreduce.Mapper class:
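// Imports needed by the mapper (placed at the top of WordCount.java)
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Since this example does not use the key parameter, the type of the key is simply specified as Object
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input line on whitespace and emit (word, 1) for every token
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
The Reduce code can extend the org.apache.hadoop.mapreduce.Reducer class: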
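// Import needed for the Reducer base class
import org.apache.hadoop.mapreduce.Reducer;

// Declared static, like TokenizerMapper, so that Hadoop can instantiate it when it is nested inside WordCount
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up all counts received for this word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Write the main method for job scheduling: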
// Imports needed for the job setup
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer is also used as the combiner
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path; must not exist yet
    job.waitForCompletion(true);
    // Alternative: report success or failure through the process exit code
    // System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Run mvn install to package the project into a jar file and upload it to the Linux cluster environment. Use the hdfs dfs -mkdir command to create the corresponding directory in the HDFS file system, then use hdfs dfs -put to upload the data files to be processed to HDFS. Example:
hdfs dfs -put ${linux_path/data file} ${hdfs_path}
Run the job in the cluster environment:
hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path} ${hdfs_output_path}
View the result:
hdfs dfs -cat ${hdfs_output_path}/output file name
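The main method registers IntSumReducer as both the combiner and the reducer. A combiner pre-aggregates the output of each Map task before it is sent to the Reduce tasks, and Hadoop may apply it zero, one, or several times. Reusing the reducer is safe here because addition gives the same total no matter how the values are grouped. The plain-Java sketch below illustrates this property; the class CombinerSketch and the sample counts are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSketch {

    // Same logic as the reducer: add up all counts for one word.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Counts for the word "hello" emitted by two hypothetical Map tasks.
        List<Integer> fromMapA = Arrays.asList(1, 1, 1);
        List<Integer> fromMapB = Arrays.asList(1, 1);

        // Without a combiner, the reducer receives all five values.
        int direct = sum(Arrays.asList(1, 1, 1, 1, 1));

        // With a combiner, each Map task pre-sums its values,
        // so the reducer receives only two partial sums.
        int combined = sum(Arrays.asList(sum(fromMapA), sum(fromMapB)));

        System.out.println(direct + " " + combined); // prints 5 5
    }
}
```

An operation such as computing an average does not have this property, so a reducer of that kind could not be reused as a combiner without changes.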
The method above runs in local mode, in which the Hadoop cluster environment is not started and neither HDFS nor YARN is running. The following describes what you need to do to execute a MapReduce job in pseudo-distributed mode. The steps are excerpted from the official website:
Configure the host name
# vi /etc/sysconfig/network
For example:
NETWORKING=yes
HOSTNAME=master
# vi /etc/hosts
Fill in the following content
127.0.0.1 localhost
Configure passwordless SSH login
# ssh-keygen -t rsa
# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Configure the core-site.xml file (located at ${HADOOP_HOME}/etc/hadoop/)
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
Configure the hdfs-site.xml file
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
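Both files share the same structure: a configuration element containing property elements, each with a name and a value. In a job, Hadoop's own Configuration class loads these files. The sketch below deliberately does not use Hadoop's API; it relies only on the JDK's XML parser to show how the name/value pairs are laid out. The class SiteXmlSketch and its method are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class SiteXmlSketch {

    // Extracts every <property> of a Hadoop-style configuration document
    // as a name -> value map, in document order.
    static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a valid configuration document", e);
        }
    }

    public static void main(String[] args) {
        String coreSite = "<configuration><property><name>fs.defaultFS</name>"
                + "<value>hdfs://localhost:9000</value></property></configuration>";
        System.out.println(readProperties(coreSite)); // prints {fs.defaultFS=hdfs://localhost:9000}
    }
}
```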
The following commands run a MapReduce job in single-node pseudo-distributed mode:
1.Format the filesystem:
$ bin/hdfs namenode -format
2.Start NameNode daemon and DataNode daemon:
$ sbin/start-dfs.sh
The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).
3.Browse the web interface for the NameNode; by default it is available at:
NameNode - http://localhost:50070/
4.Make the HDFS directories required to execute MapReduce jobs:
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
5.Copy the input files into the distributed filesystem:
$ bin/hdfs dfs -put etc/hadoop input
6.Run some of the examples provided:
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
7.Examine the output files:
Copy the output files from the distributed filesystem to the local filesystem and examine them:
$ bin/hdfs dfs -get output output
$ cat output/*
or
View the output files on the distributed filesystem:
$ bin/hdfs dfs -cat output/*
8.When you're done, stop the daemons with:
$ sbin/stop-dfs.sh
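The grep example in step 6 counts how often each string matching a regular expression occurs in the input; the official single-node guide uses the pattern dfs[a-z.]+ for this. The sketch below reproduces that counting in plain Java on a small sample string. It is not the code of the Hadoop example itself, and the class GrepSketch and the sample text are assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrepSketch {

    // Counts how many times each distinct match of the regex occurs in the text.
    static Map<String, Integer> countMatches(String regex, String text) {
        Map<String, Integer> counts = new TreeMap<>();
        Matcher matcher = Pattern.compile(regex).matcher(text);
        while (matcher.find()) {
            counts.merge(matcher.group(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // A few lines in the style of the files under etc/hadoop.
        String text = "<name>dfs.replication</name>\n"
                + "<name>fs.defaultFS</name>\n"
                + "<name>dfs.replication</name>";
        System.out.println(countMatches("dfs[a-z.]+", text)); // prints {dfs.replication=2}
    }
}
```

The pattern requires the literal prefix dfs followed by one or more lowercase letters or dots, so fs.defaultFS is not counted.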