Monday, 25 November 2013

MapReduce Overview

What is MapReduce?

MapReduce is a programming model for processing huge amounts of data quickly by working on it in parallel. As its name suggests, it divides the work into two phases: Map and Reduce.

The main MapReduce job usually splits the input data-set into independent chunks.

Map task: processes these chunks in a completely parallel manner (one node can process one or more chunks). The framework then sorts the outputs of the maps.

Reduce task: takes the sorted map output as its input and produces the final result.

Your business logic is written in the map task and the reduce task. Typically both the input and the output of the job are stored in a file system (not a database). The framework takes care of scheduling tasks, monitoring them, and re-executing the failed ones.

MapReduce Overview

'MapReduce' is a framework for processing parallelizable problems across huge datasets using a large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Computational processing can occur on data stored either in a filesystem (unstructured) or in a database (structured). MapReduce can take advantage of locality of data, processing data on or near the storage assets to decrease transmission of data.

"Map" step: The master node takes the input, divides it into smaller sub-problems, and distributes them to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes the smaller problem, and passes the answer back to its master node.

"Reduce" step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.

MapReduce allows for distributed processing of the map and reduction operations. Provided that each mapping operation is independent of the others, all maps can be performed in parallel – though in practice this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function is associative. While this process can often appear inefficient compared to algorithms that are more sequential, MapReduce can be applied to significantly larger datasets than "commodity" servers can handle – a large server farm can use MapReduce to sort a petabyte of data in only a few hours. The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled – assuming the input data is still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

Prepare the Map() input – the "MapReduce system" designates Map processors, assigns the K1 input key value each processor would work on, and provides that processor with all the input data associated with that key value.

Run the user-provided Map() code – Map() is run exactly once for each K1 key value, generating output organized by key values K2.

"Shuffle" the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key value each processor would work on, and provides that processor with all the Map-generated data associated with that key value.

Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key value produced by the Map step.

Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.

Logically these 5 steps can be thought of as running in sequence – each step starts only after the previous step is completed – though in practice, of course, they can be intertwined, as long as the final result is not affected.
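
The five steps above can be sketched in memory with plain Java collections. This is only an illustration under stated assumptions: it runs on one machine, uses no Hadoop classes, and the names FiveStepSketch, map, reduce and run are made up for this example. K1 is taken to be a line number and K2 a word.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process sketch of the five logical steps; not a distributed implementation.
class FiveStepSketch {

    // Step 2: user-provided Map(), called once per (K1, V1) input record.
    // K1 is the line number, V1 the line text; the output is keyed by K2 (a word).
    static List<Map.Entry<String, Integer>> map(int lineNo, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\W+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Step 4: user-provided Reduce(), called once per K2 with all of its values.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Step 1: prepare the Map() input; each line is keyed by its index (K1).
        // Step 3: shuffle, grouping every map output value under its K2 key.
        Map<String, List<Integer>> shuffled = new TreeMap<>();
        for (int k1 = 0; k1 < lines.size(); k1++) {
            for (Map.Entry<String, Integer> kv : map(k1, lines.get(k1))) {
                shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                        .add(kv.getValue());
            }
        }
        // Step 5: collect the Reduce() output; the TreeMap keeps it sorted by K2.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffled.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
                "Hello World Hello World", "Hello World Hello World")));
    }
}
```

In a real cluster the shuffle moves data between machines; here it is just a grouping step inside one map, which is enough to show why Reduce() sees every value for a key together.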

In many situations the input data might already be distributed ("sharded") among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as much as possible local to the Map-generated data they need to process.

MapReduce Example: WordCount

Driver Code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

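public class WordCount extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // Expect two arguments: the input path and the output path.
    if (args.length != 2) {
      System.out.println("check your i/p and o/ps");
      return -1;
    }

    JobConf conf = new JobConf(getConf(), WordCount.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Register the mapper and reducer and declare their output types.
    conf.setMapperClass(WordMapper.class);
    conf.setReducerClass(SumReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Submit the job and wait for it to finish.
    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}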

Mapper Code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, IntWritable> {

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String s = value.toString();
    // Split the line on non-word characters and emit <word, 1> for each word.
    for (String word : s.split("\\W+")) {
      if (word.length() > 0) {
        output.collect(new Text(word), new IntWritable(1));
      }
    }
  }
}

Reducer Code:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase implements
    Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    int wordCount = 0;
    // Add up all the counts received for this word.
    while (values.hasNext()) {
      IntWritable value = values.next();
      wordCount += value.get();
    }
    output.collect(key, new IntWritable(wordCount));
  }
}

Compile and Submit the Job:

1. Create the file
$cat >balu.txt
Hello World Hello World
Hello World Hello World
Ctrl+D (saves the file)

Put the file into HDFS by using the command below

$hadoop fs -put balu.txt balu.txt

2. Compile all the Java files

Go to the directory that contains the .java files and run:

$javac -classpath /usr/lib/hadoop/hadoop-core.jar *.java

Then list the directory contents to check that the .class files were generated.


3. Create the JAR file from the .class files

$jar cvf word.jar *.class


4. Run the Job

$hadoop jar word.jar WordCount balu.txt baluoutput 

5. Check the output from the command prompt

$hadoop fs -cat /user/training/baluoutput/part-00000


You can also open the following link in a browser and check the output



WordCount implementation via Hadoop framework 

We will count the words in all the input files. The flow is as below.
Assume there are two files, each containing one sentence:
Hello World Hello World (in file 1)
Hello World Hello World (in file 2)

Mapper : There is one mapper for each file
For the given sample input the first map output:
< Hello, 1>
< World, 1> 
< Hello, 1> 
< World, 1> 
The second map output:
< Hello, 1> 
< World, 1> 
< Hello, 1> 
< World, 1>

Combiner/Sorting (This is done for each individual map, and only when the job configures a combiner class)
After this step the output looks like this
The output of the first map:
< Hello, 2> 
< World, 2> 
The output of the second map: 
< Hello, 2> 
< World, 2>

Reducer :

It sums up the above output and generates the output as 

< Hello, 4> 
< World, 4>


Final output would look like 
Hello 4 times 
World 4 times
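
The mapper, combiner and reducer flow above can be traced with a small stand-alone Java sketch. It does not use Hadoop; the class and method names (CombinerSketch, mapAndCombine, reduce) are hypothetical and only mirror the walkthrough, with each input string standing in for one file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-alone sketch of the walkthrough: one map per file, local combining, then one reduce.
class CombinerSketch {

    // Mapper plus combiner for a single file: instead of keeping one <word, 1>
    // pair per occurrence, the counts are summed locally, one partial sum per word.
    static Map<String, Integer> mapAndCombine(String fileContents) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : fileContents.split("\\W+")) {
            if (!word.isEmpty()) {
                partial.merge(word, 1, Integer::sum);
            }
        }
        return partial;
    }

    // Reducer: sums the partial counts arriving from every map.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, n) -> totals.merge(word, n, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (String file : Arrays.asList(
                "Hello World Hello World", "Hello World Hello World")) {
            Map<String, Integer> partial = mapAndCombine(file);
            System.out.println("combined map output: " + partial);
            partials.add(partial);
        }
        System.out.println("reduce output: " + reduce(partials));
    }
}
```

Combining locally means each map sends two pairs to the reducer instead of four, which is the point of the combiner step: less data crosses the network during the shuffle.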

Thursday, 21 November 2013

Dancing with Sqoop

What is Sqoop?

Sqoop is a command-line interface application for transferring data between relational databases and Hadoop. In other words, you use Sqoop to import data from an RDBMS into Hadoop (HDFS) and to export it back.

You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

Step 1: Consider a table (emp) in MySQL. First create a database to hold it and switch to it as follows

mysql>create database <database name>;
mysql>use <database>;

Step 2: Now grant permissions on the created database as follows

mysql>grant all privileges on *.* to '<database username>'@'%' identified by '<database password>';
mysql>flush privileges;

Step 3: Now create the tables and insert values into them

mysql> create table emp(id int,name varchar(20),sal float);
mysql>insert into emp values(101,'xxxx',1234.5);
mysql>insert into emp values(201,'yyyy',567.7);
mysql>select * from emp;
mysql> create table empcity(city varchar(20),venue varchar(20),cityid int);
mysql> insert into empcity values('xxx','rrr',101);
mysql> insert into empcity values('xxxx','rrrx',106);

Step 4: Now load these table values into HDFS on another system (PC).

Let us assume that Hadoop, Sqoop and the other Hadoop ecosystem components (cdh3/cdh4) are already installed.
First open the cdh3/cdh4 Hadoop cluster environment, then open a terminal and execute the following commands.

To display the list of databases in MySQL from the Hadoop system:

training@localhost$ sqoop list-databases --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)> --username root -P

Ex: training@localhost$ sqoop list-databases --connect jdbc:mysql:// --username root -P

To display all the tables in a particular database from the Hadoop system:

training@localhost$ sqoop list-tables --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --username <db username> -P

Ex: training@localhost$ sqoop list-tables --connect jdbc:mysql:// --username root -P

Import the data from MySQL to HDFS:

training@localhost$ sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --table emp --username root -P --split-by id --target-dir /user/training/emp -m 1

Ex: training@localhost$ sqoop import --connect jdbc:mysql:// --table emp --username root -P --split-by id --target-dir /user/training/emp

Import data into Hive table from MySQL:

training@localhost$ sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --table <table name> --username <db username> --password <db password> --target-dir /user/training/hive/warehouse/<hive database>/<table name> -m 1

Ex: training@localhost$sudo sqoop import --connect jdbc:mysql:// --table emp --username root --password training --target-dir /user/training/hive/warehouse/myown.db/emp -m 1

Importing a specific result set based on our requirement:

training@localhost$sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --query "select * from emp where id=101 AND \$CONDITIONS" --username <username> --password <password> --target-dir /user/training/data -m 1

Ex: training@localhost$ sudo sqoop import --connect jdbc:mysql:// --query "select * from exp where id=101 AND \$CONDITIONS" --username root --password training --target-dir /user/training/datadir -m 1
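
The $CONDITIONS token in a free-form query is a placeholder that Sqoop replaces with a condition for each map task, so that parallel tasks read different slices of the result (the backslash in the commands above only stops the shell from expanding it). The sketch below is an assumption-laden illustration of that idea, not Sqoop's actual code: the class ConditionsSketch and the method splitQueries are hypothetical, and the real boundary computation may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows how a placeholder could be replaced by per-task range predicates.
class ConditionsSketch {

    // Divides the range [min, max] of the split column evenly among the map
    // tasks and substitutes one range predicate per task for $CONDITIONS.
    static List<String> splitQueries(String query, String splitCol,
                                     long min, long max, int mappers) {
        if (mappers < 1 || max < min) {
            throw new IllegalArgumentException("need mappers >= 1 and max >= min");
        }
        long span = max - min + 1;
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < mappers; i++) {
            long lo = min + span * i / mappers;
            String cond;
            if (i == mappers - 1) {
                // The last task takes everything up to and including max.
                cond = splitCol + " >= " + lo + " AND " + splitCol + " <= " + max;
            } else {
                long hi = min + span * (i + 1) / mappers;
                cond = splitCol + " >= " + lo + " AND " + splitCol + " < " + hi;
            }
            queries.add(query.replace("$CONDITIONS", "(" + cond + ")"));
        }
        return queries;
    }

    public static void main(String[] args) {
        // Two hypothetical map tasks over the ids used in the emp table (101 and 201).
        for (String q : splitQueries(
                "select * from emp where $CONDITIONS", "id", 101, 201, 2)) {
            System.out.println(q);
        }
    }
}
```

With -m 1, as in the commands above, there is only one task, so a single condition covering the whole result is enough.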

Importing data after performing join into HDFS:

training@localhost$ sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --query "select emp.*,empcity.* from emp join empcity on(emp.id=empcity.cityid) where \$CONDITIONS" --username <dbusername> --password <password> --target-dir /user/training/datadir -m 1

Export data from HDFS to MySQL table:

First of all we need to create one table in MySQL:

mysql> create table emp1(id int,name varchar(20),sal float);

Assuming the data is already in HDFS, export it as follows:

training@localhost$ sudo sqoop export --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --table <table name in mysql> --username <username> --password <password> --export-dir <hdfs directory> -m 1

Ex: training@localhost$ sudo sqoop export --connect jdbc:mysql:// --table exp --username root --password training --export-dir /user/training/qqqq -m 1

Export data with specific delimiter:

training@localhost$ sudo sqoop export --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --table <table name> --username <username> --password <password> --export-dir <hdfs directory> --input-fields-terminated-by '@' -m 1

Ex: training@localhost$ sudo sqoop export --connect jdbc:mysql:// --table exp1 --username root --password training --export-dir /user/training/exp --input-fields-terminated-by '@'  -m 1
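
To make the delimiter option concrete, here is a rough Java sketch of how one '@'-delimited line in the export directory maps onto the three columns of the emp1 table (id, name, sal). The class DelimitedRecordSketch and its parse method are hypothetical helpers written for this illustration; they are not part of Sqoop.

```java
import java.util.regex.Pattern;

// Hypothetical helper: parses one delimited text line into emp1-style columns.
class DelimitedRecordSketch {

    static final class Emp {
        final int id;
        final String name;
        final float sal;

        Emp(int id, String name, float sal) {
            this.id = id;
            this.name = name;
            this.sal = sal;
        }
    }

    // Splits the line on the given delimiter and converts each field to the
    // column type used in "create table emp1(id int,name varchar(20),sal float)".
    static Emp parse(String line, char delimiter) {
        // Pattern.quote keeps delimiters such as '|' or '.' from acting as regex syntax.
        String[] fields = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "expected 3 fields but found " + fields.length + ": " + line);
        }
        return new Emp(Integer.parseInt(fields[0]), fields[1], Float.parseFloat(fields[2]));
    }

    public static void main(String[] args) {
        Emp e = parse("101@xxxx@1234.5", '@');
        System.out.println(e.id + " " + e.name + " " + e.sal);
    }
}
```

If the files in HDFS use a different separator than the one passed on the command line, each line is seen as a single field and the export fails in much the same way as the length check here.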

Friday, 8 November 2013

The Hadoop Distributed File System


HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes), and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability to failure and high availability to very parallel applications. This module introduces the design of this distributed file system and instructions on how to operate it.

A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. 

How to solve the Traditional System problems by using Big Data

Traditional System Problem : Data is too big to store in one computer

Today's big data is 'too big' to store in ONE single computer -- no matter how powerful it is and how much storage it has. This rules out a lot of storage systems and databases that were built for single machines. So we are going to build the system to run on multiple networked computers. The file system will look like a unified single file system to the 'outside' world.

Hadoop solution : Data is stored on multiple computers

Traditional System Problem : Very high end machines are expensive

Now that we have decided that we need a cluster of computers, what kind of machines are they? Traditional storage machines are expensive with top-end components, sometimes with 'exotic' components (e.g. fiber channel for disk arrays, etc). Obviously these computers cost a pretty penny.
We want our system to be cost-effective, so we are not going to use these 'expensive' machines. Instead we will opt to use commodity hardware. By that we don't mean cheap desktop-class machines. We will use performant server-class machines -- but these will be commodity servers that you can order from any of the vendors (Dell, HP, etc).

Hadoop solution : Run on commodity hardware

Traditional System Problem : Commodity hardware will fail

In the old days of distributed computing, failure was an exception, and hardware errors were not tolerated well. So companies providing gear for distributed computing made sure their hardware seldom failed. This is achieved by using high quality components, and having backup systems (in some cases backup to backup systems!). So the machines are engineered to withstand component failures, but still keep functioning. This line of thinking created hardware that is impressive, but EXPENSIVE!

On the other hand we are going with commodity hardware. These machines don't have the high-end whiz-bang components of the machines described above. So they are going to fail -- and fail often. We need to be prepared for this. How?

The approach we will take is we build the 'intelligence' into the software. So the cluster software will be smart enough to handle hardware failure. The software detects hardware failures and takes corrective actions automatically -- without human intervention. Our software will be smarter!

Hadoop solution : Software is intelligent enough to deal with hardware failure

Traditional System Problem : hardware failure may lead to data loss

So now we have a network of machines serving as a storage layer. Data is spread out all over the nodes. What happens when a node fails (and remember, we EXPECT nodes to fail)? All the data on that node will become unavailable (or lost). So how do we prevent it?

One approach is to make multiple copies of this data and store them on different machines. So even if one node goes down, other nodes will have the data. This is called 'replication'. The standard replication is 3 copies.

Hadoop Solution : replicate (duplicate) data

Traditional System Problem : how will the distributed nodes co-ordinate among themselves

Since each machine is part of the 'storage', we will have a 'daemon' running on each machine to manage storage for that machine. These daemons will talk to each other to exchange data.

OK, now we have all these nodes storing data, how do we coordinate among them? One approach is to have a MASTER to be the coordinator. While building distributed systems with a centralized coordinator may seem like an odd idea, it is not a bad choice. It simplifies architecture, design and implementation of the system.

So now our architecture looks like this. We have a single master node and multiple worker nodes.

Hadoop solution : There is a master node that co-ordinates all the worker nodes

Overview of HDFS

HDFS has many similarities with other distributed file systems, but is different in several respects. One noticeable difference is HDFS's write-once-read-many model that relaxes concurrency control requirements, simplifies data coherency, and enables high-throughput access.

Another unique attribute of HDFS is the viewpoint that it is usually better to locate processing logic near the data rather than moving the data to the application space.

HDFS rigorously restricts data writing to one writer at a time. Bytes are always appended to the end of a stream, and byte streams are guaranteed to be stored in the order written.

HDFS has many goals. Here are some of the most notable:

  • Fault tolerance by detecting faults and applying quick, automatic recovery
  • Data access via MapReduce streaming
  • Simple and robust coherency model
  • Processing logic close to the data, rather than the data close to the processing logic
  • Portability across heterogeneous commodity hardware and operating systems
  • Scalability to reliably store and process large amounts of data
  • Economy by distributing data and processing across clusters of commodity personal computers
  • Efficiency by distributing data and logic to process it in parallel on nodes where data is located
  • Reliability by automatically maintaining multiple copies of data and automatically redeploying processing logic in the event of failures

HDFS Architecture

Master / worker design

In an HDFS cluster, there is ONE master node and many worker nodes. The master node is called the Name Node (NN) and the workers are called Data Nodes (DN). Data nodes actually store the data. They are the workhorses.

Name Node is in charge of file system operations (like creating files, user permissions, etc.). Without it, the cluster will be inoperable. No one can write data or read data. 
This is called a Single Point of Failure. We will look more into this later.

Runs on commodity hardware

As we saw, Hadoop doesn't need fancy, high end hardware. It is designed to run on commodity hardware. The Hadoop stack is built to deal with hardware failure and the file system will continue to function even if nodes fail.

HDFS is resilient (even in case of node failure)

The file system will continue to function even if a node fails. Hadoop accomplishes this by duplicating data across nodes.

Data is replicated

So how does Hadoop keep data safe and resilient in case of node failure? Simple, it keeps multiple copies of data around the cluster.

To understand how replication works, let's look at the following scenario. Data segment #2 is replicated 3 times, on data nodes A, B and D. Let's say data node A fails. The data is still accessible from nodes B and D.
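
The scenario can be replayed with a few lines of Java. This is a toy model under obvious assumptions: ReplicationSketch and its methods are invented for this example, and real HDFS chooses replica locations itself rather than being told them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of replica placement and node failure; not the HDFS implementation.
class ReplicationSketch {

    // Data segment id -> names of the data nodes that hold a replica of it.
    private final Map<Integer, Set<String>> replicas = new HashMap<>();
    private final Set<String> liveNodes = new TreeSet<>();

    void addNode(String node) {
        liveNodes.add(node);
    }

    // Records that the given segment is stored on each of the listed nodes.
    void store(int segmentId, String... nodes) {
        replicas.computeIfAbsent(segmentId, k -> new TreeSet<>())
                .addAll(Arrays.asList(nodes));
    }

    void fail(String node) {
        liveNodes.remove(node);
    }

    // Returns the live nodes that can still serve the segment, in name order.
    List<String> readableFrom(int segmentId) {
        List<String> result = new ArrayList<>();
        for (String node : replicas.getOrDefault(segmentId, new TreeSet<>())) {
            if (liveNodes.contains(node)) {
                result.add(node);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ReplicationSketch cluster = new ReplicationSketch();
        for (String node : Arrays.asList("A", "B", "C", "D")) {
            cluster.addNode(node);
        }
        cluster.store(2, "A", "B", "D"); // segment #2 has three replicas
        cluster.fail("A");
        System.out.println(cluster.readableFrom(2)); // prints [B, D]
    }
}
```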

HDFS is better suited for large files

Generic file systems, such as the Linux EXT file systems, will store files of varying size, from a few bytes to a few gigabytes. HDFS, however, is designed to store large files. Large as in a few hundred megabytes to a few gigabytes.

Why is this?

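HDFS was built to work with mechanical disk drives, whose capacity has gone up in recent years. However, seek times haven't improved all that much. So Hadoop tries to minimize disk seeks: it stores files as large blocks that can be read sequentially, so the cost of each seek is spread over a lot of data.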
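
A back-of-the-envelope calculation shows why seeks matter. The figures below are assumptions chosen for illustration (a 10 ms average seek and a 100 MB/s sequential transfer rate), not measurements, and SeekOverheadSketch is a hypothetical name.

```java
// Rough arithmetic only: the share of a read that is spent seeking, per chunk size.
class SeekOverheadSketch {

    static final double SEEK_MS = 10.0;            // assumed average seek time
    static final double TRANSFER_MB_PER_S = 100.0; // assumed sequential transfer rate

    // Percentage of the total read time spent on the single seek that
    // precedes reading one contiguous chunk of the given size.
    static double seekOverheadPercent(double chunkSizeMb) {
        double transferMs = chunkSizeMb / TRANSFER_MB_PER_S * 1000.0;
        return 100.0 * SEEK_MS / (SEEK_MS + transferMs);
    }

    public static void main(String[] args) {
        double[] sizesMb = {4.0 / 1024, 1.0, 64.0, 128.0};
        for (double size : sizesMb) {
            System.out.printf("%10.4f MB chunk: %.1f%% of the time is seeking%n",
                    size, seekOverheadPercent(size));
        }
    }
}
```

Under these assumptions a 4 KB read is almost entirely seek time, while for a 64 MB chunk the seek is under 2 percent of the total, which is why large sequential reads suit HDFS.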

Files are write-once only (not updateable)

HDFS supports writing files once (they cannot be updated). This is a stark difference between HDFS and a generic file system (like a Linux file system). Generic file systems allow files to be modified.

However appending to a file is supported. Appending is supported to enable applications like HBase.


The Namenode acts as the master in HDFS. It stores the file system metadata and a transaction log of the changes happening in the file system. The Namenode does not store actual file data.

The Namenode also maintains the block map built from the block reports sent by the individual data nodes. Whenever a client wants to perform an operation on a file, it contacts the Namenode, which responds to the request by providing the block map and Datanode information.


The Datanode is the actual storage component in HDFS. Datanodes store the data of the HDFS file system. A typical production HDFS cluster has one Namenode and multiple Datanodes.

Datanodes talk to the Namenode in two ways: heartbeats, which let the Namenode know that a particular Datanode is alive, and block reports, which list the data blocks held by that particular Datanode. Datanodes also talk to other Datanodes directly for data replication.
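
The heartbeat idea can be sketched as a table of last-seen timestamps on the master side. This is a simplified assumption of how such tracking might look, not the Namenode's real logic: HeartbeatSketch, its methods and the 30-second timeout in the example are all hypothetical, and block reports are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified liveness tracking based on heartbeat timestamps.
class HeartbeatSketch {

    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat from the given data node arrives.
    void heartbeat(String dataNode, long nowMillis) {
        lastHeartbeat.put(dataNode, nowMillis);
    }

    // A node counts as alive if its last heartbeat is no older than the timeout.
    boolean isAlive(String dataNode, long nowMillis) {
        Long last = lastHeartbeat.get(dataNode);
        return last != null && nowMillis - last <= timeoutMillis;
    }

    // Known nodes whose heartbeats have stopped, in name order.
    List<String> deadNodes(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (String node : lastHeartbeat.keySet()) {
            if (!isAlive(node, nowMillis)) {
                dead.add(node);
            }
        }
        Collections.sort(dead);
        return dead;
    }

    public static void main(String[] args) {
        HeartbeatSketch master = new HeartbeatSketch(30_000L);
        master.heartbeat("dn1", 0L);
        master.heartbeat("dn2", 0L);
        master.heartbeat("dn1", 25_000L); // dn2 has gone silent
        System.out.println(master.deadNodes(40_000L)); // prints [dn2]
    }
}
```

Once a node is considered dead, the master knows which replicas are gone and can ask other nodes to copy the affected blocks again.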

Checkpoint Node

HDFS stores its namespace and file system transaction log in the FsImage and EditLog files on the Namenode's local disk. When the Namenode starts up, the changes recorded in the EditLog are merged into the FsImage, so that HDFS always has up-to-date file system metadata. After merging the changes, HDFS replaces the old FsImage with the new one, which represents the current state of HDFS, and then opens a new EditLog.

In any HDFS instance, the Namenode is a single point of failure: it maintains the namespace and the EditLog, and if these files are corrupted or lost, the whole cluster goes down. To avoid this, multiple copies of FsImage and EditLog can be maintained on a different machine using a checkpoint node.

The checkpoint node creates periodic checkpoints of the namespace and edit log. It downloads the latest copies of FsImage and EditLog from the active Namenode, stores them locally, merges them and uploads the result back to the active Namenode.
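
The merge step itself can be pictured as replaying a list of edits over a snapshot. The sketch below is a deliberately simplified model: the namespace is reduced to a set of paths, the edit log to CREATE and DELETE records, and the names CheckpointSketch, Edit and checkpoint are hypothetical. The real FsImage and EditLog formats are far richer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified model of a checkpoint: new image = old image + replayed edit log.
class CheckpointSketch {

    static final class Edit {
        final String op;   // "CREATE" or "DELETE" in this simplified model
        final String path;

        Edit(String op, String path) {
            this.op = op;
            this.path = path;
        }
    }

    // Applies every logged edit, in order, to a copy of the old image.
    static SortedSet<String> checkpoint(Set<String> fsImage, List<Edit> editLog) {
        SortedSet<String> merged = new TreeSet<>(fsImage);
        for (Edit edit : editLog) {
            switch (edit.op) {
                case "CREATE":
                    merged.add(edit.path);
                    break;
                case "DELETE":
                    merged.remove(edit.path);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + edit.op);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Set<String> image = new TreeSet<>(Arrays.asList("/a", "/b"));
        List<Edit> log = Arrays.asList(new Edit("CREATE", "/c"), new Edit("DELETE", "/a"));
        System.out.println(checkpoint(image, log)); // prints [/b, /c]
    }
}
```

After a checkpoint the edit log can start empty again, which keeps the next startup merge short.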

A true production Hadoop cluster should have the checkpoint node running on a different machine that has the same memory configuration as the active Namenode.

The Checkpoint node stores the latest checkpoint in a directory which has the same structure as the NameNode’s directory. This allows the checkpointed image to be always available for reading by the NameNode if necessary. It is possible to have multiple checkpoint nodes in a cluster. This can be specified in the HDFS configuration file.

Backup Node

The backup node works the same way as the checkpoint node. It provides checkpoint functionality and, in addition, maintains an up-to-date copy of the file system namespace in its memory. This in-memory copy is kept synchronized with the Namenode. The backup node applies the EditLog changes to its in-memory copy of the namespace and stores it on disk. This way the backup node always has up-to-date copies of EditLog and FsImage on disk and in memory.

In contrast with the checkpoint node, which needs to download copies of FsImage and EditLog, the backup node does not need to download them, as it always has an updated copy of the namespace in its memory. It only needs to apply the latest EditLog changes to the in-memory namespace and store the copies of FsImage and EditLog on its local disk. Because of this, the backup node's checkpoint process is more efficient than the checkpoint node's.

The backup node's memory requirement is the same as the Namenode's, as it needs to maintain the namespace in memory like the Namenode. You can have only one backup node (multiple backup nodes are not supported at this point in time), and no checkpoint node can run while a backup node is running. In other words, you can have either a backup node or a checkpoint node, not both at the same time.

Since the backup node maintains a copy of the namespace in memory, you can run the Namenode with no persistent storage of its own and delegate the task of persisting the namespace state to the backup node. This can be done by starting the Namenode with the -importCheckpoint option.

Saturday, 2 November 2013

What is Unstructured Data

The phrase "unstructured data" usually refers to information that doesn't reside in a traditional row-column database. As you might expect, it's the opposite of structured data -- the data stored in fields in a database.

Unstructured data files often include text and multimedia content. Examples include e-mail messages, word processing documents, videos, photos, audio files, presentations, webpages and many other kinds of business documents. Note that while these sorts of files may have an internal structure, they are still considered "unstructured" because the data they contain doesn't fit neatly in a database.

Experts estimate that 80 to 90 percent of the data in any organization is unstructured. And the amount of unstructured data in enterprises is growing significantly -- often many times faster than structured databases are growing.

Mining Unstructured Data

Many organizations believe that their unstructured data stores include information that could help them make better business decisions. Unfortunately, it's often very difficult to analyze unstructured data. To help with the problem, organizations have turned to a number of different software solutions designed to search unstructured data and extract important information. The primary benefit of these tools is the ability to glean actionable information that can help a business succeed in a competitive environment.

Because the volume of unstructured data is growing so rapidly, many enterprises also turn to technological solutions to help them better manage and store their unstructured data. These can include hardware or software solutions that enable them to make the most efficient use of their available storage space.

Unstructured Data and 'Big Data'

As mentioned above, unstructured data is the opposite of structured data. Structured data generally resides in a relational database, and as a result, it is sometimes called "relational data." This type of data can be easily mapped into pre-designed fields. For example, a database designer may set up fields for phone numbers, zip codes and credit card numbers that accept a certain number of digits. Structured data has been or can be placed in fields like these. By contrast, unstructured data is not relational and doesn't fit into these sorts of pre-defined data models.

In addition to structured and unstructured data, there's also a third category: semi-structured data. Semi-structured data is information that doesn't reside in a relational database but that does have some organizational properties that make it easier to analyze. Examples of semi-structured data might include XML documents and NoSQL databases.

The term "big data" is closely associated with unstructured data. "Big data" refers to extremely large datasets that are difficult to analyze with traditional tools. Big data can include both structured and unstructured data, but IDC estimates that 90 percent of big data is unstructured data. Many of the tools designed to analyze big data can handle unstructured data.

Unstructured Data Vendors

Numerous vendors offer products designed to help companies analyze and manage their unstructured data.

The open source community has been particularly active in developing software that can manage unstructured data, and many vendors offer paid products and services related to these open source projects.