Monday 25 November 2013

MapReduce Overview

What is MapReduce ?

Map reduce is an algorithm or concept to process Huge amount of data in a faster way. As per its name you can divide it Map and Reduce.

The main MapReduce job usually splits the input data-set into independent chunks.

MapTask: will process these chunks in a completely parallel manner (One node can process one or more chunks).The framework sorts the outputs of the maps.

Reduce Task : And the above output will be the input for the reducetasks, produces the final result.

Your business logic would be written in the MappedTask and ReducedTask. Typically both the input and the output of the job are stored in a file-system (Not database). The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.


MapReduce Overview



'MapReduce' is a framework for processing parallelizable problems across huge datasets using a large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogenous hardware). Computational processing can occur on data stored either in a filesystem (unstructured) or in a database (structured). MapReduce can take advantage of locality of data, processing data on or near the storage assets to decrease transmission of data.

"Map" step: The master node takes the input, divides it into smaller sub-problems, and distributes them to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes the smaller problem, and passes the answer back to its master node.

"Reduce" step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.
MapReduce allows for distributed processing of the map and reduction operations. Provided that each mapping operation is independent of the others, all maps can be performed in parallel – though in practice this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function is associative. While this process can often appear inefficient compared to algorithms that are more sequential, MapReduce can be applied to significantly larger datasets than "commodity" servers can handle – a large server farm can use MapReduce to sort a petabyte of data in only a few hours.[citation needed] The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled – assuming the input data is still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

Prepare the Map() input – the "MapReduce system" designates Map processors, assigns the K1 input key value each processor would work on, and provides that processor with all the input data associated with that key value.

Run the user-provided Map() code – Map() is run exactly once for each K1 key value, generating output organized by key values K2.

"Shuffle" the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key value each processor would work on, and provides that processor with all the Map-generated data associated with that key value.

Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key value produced by the Map step.

Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.

Note:
Logically these 5 steps can be thought of as running in sequence – each step starts only after the previous step is completed – though in practice, of course, they can be intertwined, as long as the final result is not affected.

In many situations the input data might already be distributed ("sharded") among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as much as possible local to the Map-generated data they need to process.


MapReduce Example: WordCount



Driver Code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool
{
public int run(String args[]) throws Exception
{
if(args.length!=2)
{
System.out.println("check your i/p and o/ps");
return -1;
}
JobConf conf=new JobConf(getConf(),WordCount.class);
conf.setJobName(this.getClass().getName());
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
conf.setMapperClass(WordMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf); 
return 0;
}
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}

Mapper Code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String s = value.toString();
    for (String word : s.split("\\W+")) {
      if (word.length() > 0) {
        output.collect(new Text(word), new IntWritable(1));
      }
    }
  }
}


Reducer Code:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase implements
    Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    int wordCount = 0;
    while (values.hasNext()) {
      IntWritable value = values.next();
      wordCount += value.get();
    }
    output.collect(key, new IntWritable(wordCount));
  }
}

Compile and Submit the Job:

1. Create the file
$cat >balu.txt
Hello World Hello World
Hello World Hello World
ctrl+d(save the file)

Put the file into HDFS by using bellow command

$hadoop fs -put balu.txt balu.txt

2.Compile the all the Java files

goto the .java files located directory
Ex:/home/training/wordcount/
my files are located in above directory

$javac -classpath /usr/lib/hadoop/hadoop-core.jar *.java

see the all .class file by using bellow command

$ls

3. Create the Jar file on .class files

$jar cvf word.jar *.class

$ls

4. Run the Job

$hadoop jar word.jar WordCount balu.txt baluoutput 

5. Check the output through command prompt

$hadoop fs -cat /user/training/baluoutput

                         or

goto browser and type the following link and check the output

http://localhost:50070


Explanation:

WordCount implementation via Hadoop framework 

We will count the words in all the input file flow as below 
Input: 
Assume there are two files each having a sentence 
Hello World Hello World (In file 1) 
Hello World Hello World (In file 2) 

Mapper : There would be each mapper for the a file 
For the given sample input the first map output:
< Hello, 1>
< World, 1> 
< Hello, 1> 
< World, 1> 
The second map output:
< Hello, 1> 
< World, 1> 
< Hello, 1> 
< World, 1>

Combiner/Sorting (This is done for each individual map) 
So output looks like this 
The output of the first map: 
< Hello, 2> 
< World, 2> 
The output of the second map: 
< Hello, 2> 
< World, 2>

Reducer :

It sums up the above output and generates the output as 

below 
< Hello, 4> 
< World, 4>

Output: 

Final output would look like 
Hello 4 times 
World 4 times

No comments:

Post a Comment