Tuesday, 31 December 2013

HBase Examples

Start the HBase shell
          $hbase shell

List all the tables
          hbase>list

Create an HBase table named 'cars' with one column family, 'vi'
          hbase>create 'cars', 'vi'

Let’s insert 3 column qualifiers (make, model, year) and the associated values into the first row (row1).
          hbase>put 'cars', 'row1', 'vi:make', 'BMW'
          hbase>put 'cars', 'row1', 'vi:model', '5 series'
          hbase>put 'cars', 'row1', 'vi:year', '2012'

Now let’s add a second row
          hbase>put 'cars', 'row2', 'vi:make', 'Ferrari'
          hbase>put 'cars', 'row2', 'vi:model', 'e series'
          hbase>put 'cars', 'row2', 'vi:year', '2012'

Now let’s add a third row
          hbase>put 'cars', 'row3', 'vi:make', 'Honda'
          hbase>put 'cars', 'row3', 'vi:model', 'f series'
          hbase>put 'cars', 'row3', 'vi:year', '2013'

Scan the table
          hbase>scan 'cars'

The next scan we’ll run will limit our results to the make column qualifier.
          hbase>scan 'cars', {COLUMNS=>['vi:make']}

Limit the scan to 1 row to demonstrate how LIMIT works.
          hbase>scan 'cars', {COLUMNS =>['vi:make'], LIMIT => 1}

We’ll start by getting all columns in row1.
          hbase>get 'cars', 'row1'

You should see output similar to:
COLUMN                   CELL
vi:make                 timestamp=1344817012999, value=BMW
vi:model                timestamp=1344817020843, value=5 series
vi:year                 timestamp=1344817033611, value=2012

To get one specific column, include the COLUMNS option.
         hbase>get 'cars', 'row1', {COLUMNS => 'vi:make'}

You can also get two or more columns by passing an array of columns.
         hbase>get 'cars', 'row1', {COLUMNS => ['vi:make', 'vi:year']}

Delete a cell (value)
         hbase>delete 'cars', 'row2', 'vi:year'

Let’s check that our delete worked:
         hbase>get 'cars', 'row2'

You should see output that shows 2 columns.
vi:make   timestamp=1344817104923, value=Ferrari
vi:model   timestamp=1344817115463, value=e series
2 row(s) in 0.0080 seconds

Disable and drop the table (a table must be disabled before it can be dropped)
          hbase>disable 'cars'
          hbase>drop 'cars'

Exit the HBase shell
          hbase>exit

HBase Shell Commands

Show the current hbase user.
         hbase> whoami

Alter column family schema: pass the table name and a dictionary specifying the new column family schema. Dictionaries are written as {KEY => value} pairs, as in the examples below. The dictionary must include the name of the column family to alter.
For example,

 To change or add the 'f1' column family in table 't1' from defaults to instead keep a maximum of 5 cell VERSIONS, do:
           hbase> alter 't1', {NAME => 'f1', VERSIONS => 5}

To delete the 'f1' column family in table 't1', do:
           hbase> alter 't1', {NAME => 'f1', METHOD => 'delete'}

 You can also change table-scope attributes like MAX_FILESIZE (the maximum store file size before a region is split).

 For example, to change the maximum file size to 128MB (128 × 1024 × 1024 = 134217728 bytes), do:
           hbase> alter 't1', {METHOD => 'table_att', MAX_FILESIZE => '134217728'}

Count the number of rows in a table. This operation may take a long time (run '$HADOOP_HOME/bin/hadoop jar hbase.jar rowcount' to count with a MapReduce job instead). The current count is shown every 1000 rows by default; the count interval may optionally be specified.

           hbase> count 't1'
           hbase> count 't1', 100000
           hbase> # With a table reference t (for example, t = get_table 't1'):
           hbase> t.count INTERVAL => 100000
           hbase> t.count CACHE => 1000
           hbase> t.count INTERVAL => 10, CACHE => 1000

Create table: pass the table name, a dictionary of specifications per column family, and optionally a dictionary of table configuration. TTL is given in seconds (2592000 seconds = 30 days).

           hbase> create 't1', {NAME => 'f1', VERSIONS => 5}
           hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
           hbase> # The above in shorthand would be the following:
           hbase> create 't1', 'f1', 'f2', 'f3'
           hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, \
             BLOCKCACHE => true}

Describe the named table
        e.g. "hbase> describe 't1'"

Delete a cell: put a delete marker at the specified table/row/column and optionally timestamp coordinates. Deletes must match the deleted cell's coordinates exactly. When scanning, a delete marker suppresses older versions. Takes arguments like the 'put' command described below.
          hbase> delete 't1', 'r1', 'c1', ts1

Delete all cells in a given row; pass a table name, row, and optionally a column and timestamp.
          hbase> deleteall 't1', 'r1'
          hbase> deleteall 't1', 'r1', 'c1'
          hbase> deleteall 't1', 'r1', 'c1', ts1
The same commands can also be run on a table reference. If you had a reference t to table 't1', the corresponding commands would be:
          hbase> t.deleteall 'r1'
          hbase> t.deleteall 'r1', 'c1'
          hbase> t.deleteall 'r1', 'c1', ts1

Disable the named table:
            e.g. "hbase> disable 't1'"

Disable all tables matching the given regex
            hbase> disable_all 't.*'

Drop the named table. The table must first be disabled.
               hbase> drop 't1'

Drop all of the tables matching the given regex
              hbase> drop_all 't.*'

Enable the named table
              hbase> enable 't1'

Enable all of the tables matching the given regex
             hbase> enable_all 't.*'

Verify whether the named table is enabled
            hbase> is_enabled 't1'

Does the named table exist?
             e.g. "hbase> exists 't1'"

Type "hbase> exit" to leave the HBase Shell

Get row or cell contents; pass table name, row, and optionally a dictionary of column(s), timestamp and versions.  

           hbase> get 't1', 'r1'
           hbase> get 't1', 'r1', {COLUMN => 'c1'}
           hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
           hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
           hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, \
             VERSIONS => 4}

List all tables in HBase, optionally filtered by a regex
           hbase> list
           hbase> list 'abc.*'

Put a cell 'value' at specified table/row/column and optionally timestamp coordinates. To put a cell value into table 't1' at row 'r1' under column 'c1' marked with the time 'ts1', do:
           hbase> put 't1', 'r1', 'c1', 'value', ts1

Listing of hbase surgery tools

Scan a table; pass table name and optionally a dictionary of scanner specifications.  Scanner specifications may include one or more of the following: LIMIT, STARTROW, STOPROW, TIMESTAMP, or COLUMNS.  If no columns are specified, all columns will be scanned.  To scan all members of a column family, leave the qualifier empty as in 'col_family:'.  

           hbase> scan '.META.'
           hbase> scan '.META.', {COLUMNS => 'info:regioninfo'}
           hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, \
             STARTROW => 'xyz'}

 For experts, there is an additional option, CACHE_BLOCKS, which switches block caching for the scanner on (true) or off (false). By default it is enabled.

           hbase> scan 't1', {COLUMNS => ['c1', 'c2'], CACHE_BLOCKS => false}

Show cluster status. Can be 'summary', 'simple', or 'detailed'. The default is 'summary'. 

           hbase> status
           hbase> status 'simple'
           hbase> status 'summary'
           hbase> status 'detailed'

Shut down the cluster.

Disables, drops and recreates the specified table.
          hbase> truncate 't1'

Output this HBase version
          hbase> version

Show all the filters in hbase.
          hbase> show_filters

Get the status of the alter command. Indicates the number of regions of the table that have received the updated schema. Pass the table name.
         hbase> alter_status 't1'

Flush all regions in the passed table, or pass a region row to flush an individual region.
         hbase> flush 'TABLENAME'
         hbase> flush 'REGIONNAME'

Run major compaction on the passed table, or pass a region row to major compact an individual region. To compact a single column family within a region, specify the region name followed by the column family name.
Compact all regions in a table:
         hbase> major_compact 't1'
Compact an entire region:
         hbase> major_compact 'r1'
Compact a single column family within a region:
         hbase> major_compact 'r1', 'c1'
Compact a single column family within a table:
         hbase> major_compact 't1', 'c1'

Split an entire table, or pass a region to split an individual region. With the second parameter, you can specify an explicit split key for the region.
         hbase> split 'tableName'
         hbase> split 'regionName' # format: 'tableName,startKey,id'
         hbase> split 'tableName', 'splitKey'
         hbase> split 'regionName', 'splitKey'

Dump the status of the HBase cluster as seen by ZooKeeper.
         hbase> zk_dump

Restarts all the replication features. The state in which each stream starts is undetermined.
WARNING: start/stop replication is only meant to be used in critical load situations.
       hbase> start_replication

Stops all the replication features. The state in which each stream stops is undetermined.
WARNING: start/stop replication is only meant to be used in critical load situations.
       hbase> stop_replication

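Grant users specific rights.
Syntax : grant <user> <permissions> [<table> [<column family> [<column qualifier>]]]
permissions is either zero or more letters from the set "RWXCA": READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A').
         hbase> grant 'bobsmith', 'RWXCA'
         hbase> grant 'bobsmith', 'RW', 't1', 'f1', 'col1'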

Revoke a user’s access rights.
Syntax : revoke <user> [<table> [<column family> [<column qualifier>]]]
        hbase> revoke 'bobsmith', 't1', 'f1', 'col1'

Show all permissions for the particular user.
Syntax : user_permission [<table>]
      hbase> user_permission
      hbase> user_permission 'table1'

HBase Data Model

These six concepts form the foundation of the HBase data model.

Table: HBase organizes data into tables. Table names are Strings and composed of characters that are safe for use in a file system path.

Row: Within a table, data is stored according to its row. Rows are identified uniquely by their rowkey. Rowkeys don’t have a data type and are always treated as a byte[].

Column family: Data within a row is grouped by column family. Column families also impact the physical arrangement of data stored in HBase. For this reason, they must be defined up front and aren’t easily modified. Every row in a table has the same column families, although a row need not store data in all its families. Column family names are Strings and composed of characters that are safe for use in a file system path.

Column qualifier: Data within a column family is addressed via its column qualifier, or column. Column qualifiers need not be specified in advance. Column qualifiers need not be consistent between rows. Like rowkeys, column qualifiers don’t have a data type and are always treated as a byte[].

Cell: A combination of rowkey, column family, and column qualifier uniquely identifies a cell. The data stored in a cell is referred to as that cell’s value. Values also don’t have a data type and are always treated as a byte[].

Version: Values within a cell are versioned. Versions are identified by their timestamp, a long. When a version isn’t specified, the current timestamp is used as the basis for the operation. The number of cell value versions retained by HBase is configured via the column family. The default number of cell versions is three. Versions are stored in decreasing order of timestamp, so the newest version is read first.
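To make the data model concrete, here is a minimal in-memory sketch in Java, assuming Strings in place of byte[] for readability. The class CellModel and its methods are hypothetical and not part of the HBase API: it nests sorted maps as row -> "family:qualifier" -> timestamp -> value, keeps each cell's versions newest first, and discards the oldest versions beyond a limit that plays the role of the column family's VERSIONS setting.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model: row -> "family:qualifier" -> timestamp -> value (Strings stand in for byte[]).
class CellModel {
  // Rows and columns sort ascending; each cell's versions sort by timestamp descending.
  private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
      new TreeMap<>();
  private final int maxVersions;  // plays the role of the column family's VERSIONS setting

  CellModel(int maxVersions) {
    this.maxVersions = maxVersions;
  }

  void put(String row, String column, long ts, String value) {
    NavigableMap<Long, String> cell = rows
        .computeIfAbsent(row, r -> new TreeMap<>())
        .computeIfAbsent(column, c -> new TreeMap<Long, String>(Comparator.<Long>reverseOrder()));
    cell.put(ts, value);
    while (cell.size() > maxVersions) {
      cell.pollLastEntry();  // with descending order, the last entry is the oldest version
    }
  }

  // Newest value of the cell, or null if the cell was never written.
  String get(String row, String column) {
    NavigableMap<Long, String> cell = versions(row, column);
    return cell == null ? null : cell.firstEntry().getValue();
  }

  // All retained versions of the cell, newest first (null if the cell does not exist).
  NavigableMap<Long, String> versions(String row, String column) {
    Map<String, NavigableMap<Long, String>> columns = rows.get(row);
    return columns == null ? null : columns.get(column);
  }

  public static void main(String[] args) {
    CellModel cars = new CellModel(3);  // keep three versions per cell
    cars.put("row1", "vi:year", 1L, "2010");
    cars.put("row1", "vi:year", 2L, "2011");
    cars.put("row1", "vi:year", 3L, "2012");
    cars.put("row1", "vi:year", 4L, "2013");  // pushes out the oldest version (ts=1)
    System.out.println(cars.get("row1", "vi:year"));       // 2013
    System.out.println(cars.versions("row1", "vi:year"));  // {4=2013, 3=2012, 2=2011}
  }
}
```

Real HBase keeps every key as byte[] and removes excess versions lazily (reads filter them out, compactions delete them) rather than on every write, so treat this only as a picture of the logical layout.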

HBase Architecture

      The HBase architecture consists of servers in a master-slave relationship. Typically, an HBase cluster has one Master node, called HMaster, and multiple Region Servers, called HRegionServers. Each Region Server contains multiple Regions (HRegions).

Just like in a Relational Database, data in HBase is stored in Tables and these Tables are stored in Regions. When a Table becomes too big, the Table is partitioned into multiple Regions. These Regions are assigned to Region Servers across the cluster. Each Region Server hosts roughly the same number of Regions.

The HMaster in the HBase is responsible for
  • Performing Administration
  • Managing and Monitoring the Cluster
  • Assigning Regions to the Region Servers
  • Controlling the Load Balancing and Failover
On the other hand, the HRegionServer performs the following work:
  • Hosting and managing Regions
  • Splitting the Regions automatically
  • Handling the read/write requests
  • Communicating with the Clients directly

Each Region Server contains a Write-Ahead Log (called HLog) and multiple Regions. Each Region in turn is made up of one Store per column family, and each Store has a MemStore and multiple StoreFiles (HFiles). The data lives in these StoreFiles, organized by column family. The MemStore holds in-memory modifications to the Store (data).

The mapping of Regions to Region Servers is kept in a system table called .META. When reading or writing data, clients read the required Region information from the .META. table and communicate directly with the appropriate Region Server. Each Region is identified by its start key (inclusive) and end key (exclusive).

HBase Tables and Regions

A table is made up of any number of regions.
A region is specified by its startKey and endKey.
  • Empty table: (Table, NULL, NULL)
  • Two-region table: (Table, NULL, “com.ABC.www”) and (Table, “com.ABC.www”, NULL)
Each region may live on a different node and is made up of several HDFS files and blocks, each of which is replicated by Hadoop. HBase uses HDFS as its reliable storage layer; HDFS handles checksums, replication, and failover.
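Because regions split the sorted row-key space into contiguous [startKey, endKey) ranges, finding the region for a row key amounts to finding the greatest start key that does not exceed the row key. Here is a minimal Java sketch, assuming String keys (which compare like byte[] for plain ASCII) and hypothetical names ToyRegionMap, region-1 and region-2; a real client gets this information from .META. rather than from a local map.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy locator: region start key -> region name.
class ToyRegionMap {
  // Regions tile the key space without gaps, so each region's end key is simply the next
  // region's start key; "" stands in for the empty (NULL) start key of the first region.
  private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

  void addRegion(String startKey, String regionName) {
    regionsByStartKey.put(startKey, regionName);
  }

  // The owning region has the greatest start key <= rowKey (start inclusive, end exclusive).
  String locate(String rowKey) {
    Map.Entry<String, String> entry = regionsByStartKey.floorEntry(rowKey);
    return entry == null ? null : entry.getValue();
  }

  public static void main(String[] args) {
    // The two-region table from the text: (Table, NULL, "com.ABC.www") and (Table, "com.ABC.www", NULL)
    ToyRegionMap table = new ToyRegionMap();
    table.addRegion("", "region-1");
    table.addRegion("com.ABC.www", "region-2");
    System.out.println(table.locate("com.ABC.aaa"));  // region-1
    System.out.println(table.locate("com.ABC.www"));  // region-2: the start key is inclusive
    System.out.println(table.locate("org.apache"));   // region-2
  }
}
```

Storing only start keys is enough because the end key of one region is always the start key of the next.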

HBase Tables:
  • Tables are sorted by Row in lexicographical order
  • Table schema only defines its column families
  • Each family consists of any number of columns
  • Each column consists of any number of versions
  • Columns only exist when inserted, NULLs are free
  • Columns within a family are sorted and stored together
  • Everything except table names is byte[]
  • HBase table format: (Row, Family:Column, Timestamp) -> Value
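Since row keys are byte[] sorted lexicographically, numbers stored as text do not sort numerically. Here is a minimal sketch of an unsigned lexicographic byte[] comparator in Java; the class name RowKeyOrder is hypothetical (HBase ships its own comparator for this).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper showing how byte[] row keys are ordered.
class RowKeyOrder {
  // Compares byte arrays lexicographically, treating each byte as unsigned (0..255).
  static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int diff = (a[i] & 0xff) - (b[i] & 0xff);
      if (diff != 0) return diff;
    }
    return a.length - b.length;  // a key that is a prefix of another sorts first
  };

  static List<String> sortRowKeys(String... keys) {
    List<byte[]> bytes = new ArrayList<>();
    for (String k : keys) bytes.add(k.getBytes(StandardCharsets.UTF_8));
    bytes.sort(LEXICOGRAPHIC);
    List<String> out = new ArrayList<>();
    for (byte[] b : bytes) out.add(new String(b, StandardCharsets.UTF_8));
    return out;
  }

  public static void main(String[] args) {
    // "row10" sorts before "row2" because '1' (0x31) < '2' (0x32).
    System.out.println(sortRowKeys("row2", "row10", "row1"));  // [row1, row10, row2]
  }
}
```

This is why row keys that embed numbers are often zero-padded (row02 instead of row2).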
HBase consists of:
  • Java API, gateways for REST, Thrift, Avro
  • Master manages the cluster
  • RegionServers manage data
  • ZooKeeper is used as the “neural network” and coordinates the cluster
Data is stored in memory and flushed to disk at regular intervals or based on size
  • Small flushes are merged in the background to keep number of files small
  • Reads check the memory stores first and the disk-based files second
  • Deletes are handled with “tombstone” markers

After data is written to the WAL, the RegionServer saves KeyValues in the memory store (MemStore)
  • Flushes to disk are triggered by size, set by hbase.hregion.memstore.flush.size
  • Default size is 64MB
  • Uses snapshot mechanism to write flush to disk while still serving from it and accepting new data at the same time
Two types: Minor and Major Compactions
Minor Compactions
  • Combine last “few” flushes
  • Triggered by number of storage files
Major Compactions
  • Rewrite all storage files
  • Drop deleted data and those values exceeding TTL and/or number of versions
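The write path, tombstone deletes and major compaction described above can be pictured with a toy log-structured store. This is a minimal Java sketch for a single column, assuming an entry count as the flush trigger instead of the byte size in hbase.hregion.memstore.flush.size; ToyStore and its marker value are hypothetical. It does not model the WAL, the snapshot used during flushes, minor compactions, versions or TTLs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy store: a MemStore plus immutable "store files" for one column.
class ToyStore {
  // Hypothetical delete marker; assumes no real value equals this string.
  private static final String TOMBSTONE = "<deleted>";
  private final int flushThreshold;  // number of entries that triggers a flush
  private TreeMap<String, String> memstore = new TreeMap<>();
  // Oldest file first, newest file last; each maps rowKey -> value (or TOMBSTONE).
  private final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

  ToyStore(int flushThreshold) { this.flushThreshold = flushThreshold; }

  void put(String row, String value) {
    memstore.put(row, value);
    if (memstore.size() >= flushThreshold) flush();
  }

  // A delete removes nothing; it writes a tombstone that hides older values.
  void delete(String row) { put(row, TOMBSTONE); }

  void flush() {
    if (memstore.isEmpty()) return;
    storeFiles.add(memstore);     // the full memstore becomes a new store file
    memstore = new TreeMap<>();   // new writes go to a fresh memstore
  }

  // Reads check the memstore first, then store files from newest to oldest.
  String get(String row) {
    String v = memstore.get(row);
    for (int i = storeFiles.size() - 1; v == null && i >= 0; i--) {
      v = storeFiles.get(i).get(row);
    }
    return TOMBSTONE.equals(v) ? null : v;
  }

  // Major compaction: merge all files into one; the newest value wins and tombstoned rows are dropped.
  void majorCompact() {
    TreeMap<String, String> merged = new TreeMap<>();
    for (TreeMap<String, String> file : storeFiles) merged.putAll(file);  // oldest to newest
    merged.values().removeIf(TOMBSTONE::equals);
    storeFiles.clear();
    if (!merged.isEmpty()) storeFiles.add(merged);
  }

  int storeFileCount() { return storeFiles.size(); }

  public static void main(String[] args) {
    ToyStore store = new ToyStore(2);
    store.put("row1", "BMW");
    store.put("row2", "Ferrari");  // memstore reaches 2 entries -> flushed to file 1
    store.put("row3", "Honda");
    store.delete("row2");          // memstore reaches 2 entries -> flushed to file 2 (with the tombstone)
    System.out.println(store.get("row2"));       // null: hidden by the tombstone
    System.out.println(store.storeFileCount());  // 2
    store.majorCompact();
    System.out.println(store.storeFileCount());  // 1
    System.out.println(store.get("row1") + " " + store.get("row3"));  // BMW Honda
  }
}
```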
Key Cardinality:
The best performance is gained from using row keys
  • Time range bound reads can skip store files
  • So can Bloom Filters
  • Selecting column families reduces the amount of data to be scanned   

Fold, Store, and Shift:
All values are stored with the full coordinates, including: Row Key, Column Family, Column Qualifier, and Timestamp
  • Folds columns into “row per column”
  • NULLs are cost free as nothing is stored
  • Versions are multiple “rows” in folded table
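Folding can be shown by listing every cell with its full coordinates and sorting the list the way a store file is ordered: row, family and qualifier ascending, then timestamp descending. Here is a minimal Java sketch; FoldedRow and its Cell class are hypothetical stand-ins for HBase's KeyValue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of "fold, store, and shift".
class FoldedRow {
  // Stand-in for a KeyValue: full coordinates plus the value.
  static final class Cell {
    final String row, family, qualifier, value;
    final long timestamp;

    Cell(String row, String family, String qualifier, long timestamp, String value) {
      this.row = row; this.family = family; this.qualifier = qualifier;
      this.timestamp = timestamp; this.value = value;
    }

    @Override public String toString() {
      return row + "/" + family + ":" + qualifier + "/" + timestamp + " -> " + value;
    }
  }

  // Row, family, qualifier ascending; then timestamp descending (newest version first).
  static final Comparator<Cell> ORDER = Comparator
      .comparing((Cell c) -> c.row)
      .thenComparing(c -> c.family)
      .thenComparing(c -> c.qualifier)
      .thenComparing(Comparator.comparingLong((Cell c) -> c.timestamp).reversed());

  public static void main(String[] args) {
    // One logical row with two columns; vi:year has two versions. Columns the row does not
    // have produce no Cell at all, which is why NULLs cost nothing.
    List<Cell> cells = new ArrayList<>();
    cells.add(new Cell("row1", "vi", "year", 1L, "2011"));
    cells.add(new Cell("row1", "vi", "make", 1L, "BMW"));
    cells.add(new Cell("row1", "vi", "year", 2L, "2012"));
    cells.sort(ORDER);
    cells.forEach(System.out::println);
    // row1/vi:make/1 -> BMW
    // row1/vi:year/2 -> 2012
    // row1/vi:year/1 -> 2011
  }
}
```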

Apache HBase

        HBase is an open source, non-relational, distributed database modeled after Google's BigTable and is written in Java. It is developed as part of Apache Software Foundation's Apache Hadoop project and runs on top of HDFS (Hadoop Distributed Filesystem), providing BigTable-like capabilities for Hadoop.

HBase features compression, in-memory operation, and Bloom filters on a per-column basis as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run in Hadoop, and may be accessed through the Java API but also through REST, Avro or Thrift gateway APIs.

What is HBase?

        HBase is a column-oriented database management system that runs on top of HDFS. It is well suited for sparse data sets, which are common in many big data use cases. Unlike relational database systems, HBase does not support a structured query language like SQL; in fact, HBase isn’t a relational data store at all. HBase applications are written in Java much like a typical MapReduce application. HBase does support writing applications in Avro, REST, and Thrift.

       An HBase system comprises a set of tables. Each table contains rows and columns, much like a traditional database. Each table must have an element defined as a Primary Key (in HBase, the row key), and all access attempts to HBase tables must use this Primary Key. An HBase column represents an attribute of an object; for example, if the table is storing diagnostic logs from servers in your environment, where each row might be a log record, a typical column in such a table would be the timestamp of when the log record was written, or perhaps the server name where the record originated. In fact, HBase allows for many attributes to be grouped together into what are known as column families, such that the elements of a column family are all stored together. This is different from a row-oriented relational database, where all the columns of a given row are stored together. With HBase you must predefine the table schema and specify the column families. However, it’s very flexible in that new columns can be added to families at any time, making the schema flexible and therefore able to adapt to changing application requirements.

         Just as HDFS has a NameNode and slave nodes, and MapReduce has JobTracker and TaskTracker slaves, HBase is built on similar concepts. In HBase a master node manages the cluster and region servers store portions of the tables and perform the work on the data. In the same way HDFS has some enterprise concerns due to the availability of the NameNode, HBase is also sensitive to the loss of its master node.

What is NoSQL DataBase?

           A NoSQL database provides a mechanism for storage and retrieval of data that is modeled in means other than the tabular relations used in relational databases. Motivations for this approach include simplicity of design, horizontal scaling and finer control over availability. NoSQL databases are often highly optimized key–value stores intended primarily for simple retrieval and appending operations, whereas an RDBMS is intended as a general purpose data store. There will thus be some operations where NoSQL is faster and some where an RDBMS is faster. NoSQL databases are finding significant and growing industry use in big data and real-time web applications.[1] NoSQL systems are also referred to as "Not only SQL" to emphasize that they may in fact allow SQL-like query languages to be used.

Monday, 25 November 2013

MapReduce Overview

What is MapReduce ?

MapReduce is a programming model for processing huge amounts of data in parallel. As its name suggests, it has two parts: Map and Reduce.

A MapReduce job usually splits the input data set into independent chunks.

Map task: processes these chunks in a completely parallel manner (one node can process one or more chunks). The framework sorts the outputs of the maps.

Reduce task: takes the sorted map output as its input and produces the final result.

Your business logic is written in the map and reduce tasks. Typically both the input and the output of the job are stored in a file system (not a database). The framework takes care of scheduling tasks, monitoring them and re-executing failed tasks.

MapReduce Overview

'MapReduce' is a framework for processing parallelizable problems across huge datasets using a large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Computational processing can occur on data stored either in a filesystem (unstructured) or in a database (structured). MapReduce can take advantage of locality of data, processing data on or near the storage assets to decrease transmission of data.

"Map" step: The master node takes the input, divides it into smaller sub-problems, and distributes them to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes the smaller problem, and passes the answer back to its master node.

"Reduce" step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.
MapReduce allows for distributed processing of the map and reduction operations. Provided that each mapping operation is independent of the others, all maps can be performed in parallel – though in practice this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function is associative. While this process can often appear inefficient compared to algorithms that are more sequential, MapReduce can be applied to significantly larger datasets than "commodity" servers can handle – a large server farm can use MapReduce to sort a petabyte of data in only a few hours. The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled – assuming the input data is still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

Prepare the Map() input – the "MapReduce system" designates Map processors, assigns the K1 input key value each processor would work on, and provides that processor with all the input data associated with that key value.

Run the user-provided Map() code – Map() is run exactly once for each K1 key value, generating output organized by key values K2.

"Shuffle" the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key value each processor would work on, and provides that processor with all the Map-generated data associated with that key value.

Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key value produced by the Map step.

Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.

Logically these 5 steps can be thought of as running in sequence – each step starts only after the previous step is completed – though in practice, of course, they can be intertwined, as long as the final result is not affected.
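The five steps can be simulated in a single process to see what each one does. This is a minimal Java sketch using word count as the job, assuming the whole input fits in memory; FiveSteps is a hypothetical name, and nothing here runs in parallel or on Hadoop.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process simulation of the five steps, with word count as the job.
class FiveSteps {
  // Step 2, user-provided Map(): (K1 = line number, V1 = line) -> list of (K2 = word, 1).
  // K1 is unused here, just as the byte offset is ignored in a typical word-count mapper.
  static List<Map.Entry<String, Integer>> map(int lineNo, String line) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    for (String word : line.split("\\W+")) {
      if (!word.isEmpty()) out.add(new SimpleEntry<String, Integer>(word, 1));
    }
    return out;
  }

  // Step 4, user-provided Reduce(): called once per K2 with every value for that key.
  static int reduce(String word, List<Integer> counts) {
    int sum = 0;
    for (int c : counts) sum += c;
    return sum;
  }

  static Map<String, Integer> run(List<String> lines) {
    // Steps 1 and 2: give each K1 (line) to Map(); a cluster would do this in parallel.
    List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
    for (int i = 0; i < lines.size(); i++) mapped.addAll(map(i, lines.get(i)));

    // Step 3, shuffle: group all map output values by their K2 key.
    Map<String, List<Integer>> groups = new TreeMap<>();
    for (Map.Entry<String, Integer> kv : mapped) {
      groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }

    // Steps 4 and 5: run Reduce() once per K2; the TreeMap keeps the output sorted by K2.
    Map<String, Integer> output = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
      output.put(group.getKey(), reduce(group.getKey(), group.getValue()));
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("Hello World Hello World", "Hello World Hello World");
    System.out.println(run(input));  // {Hello=4, World=4}
  }
}
```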

In many situations the input data might already be distributed ("sharded") among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as much as possible local to the Map-generated data they need to process.

MapReduce Example: WordCount

Driver Code:

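import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // Expect exactly two arguments: the input path and the output path.
    if (args.length != 2) {
      System.out.println("Usage: WordCount <input dir> <output dir>");
      return -1;
    }
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("WordCount");
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Wire in the mapper and reducer; SumReducer also serves as the combiner.
    conf.setMapperClass(WordMapper.class);
    conf.setCombinerClass(SumReducer.class);
    conf.setReducerClass(SumReducer.class);

    // Output key/value types (the map output uses the same types).
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf);  // submits the job and waits; throws IOException if it fails
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}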

Mapper Code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, IntWritable> {

  // key: byte offset of the line in the file (unused); value: the line of text
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String s = value.toString();
    // Split on runs of non-word characters and emit (word, 1) for each word.
    for (String word : s.split("\\W+")) {
      if (word.length() > 0) {
        output.collect(new Text(word), new IntWritable(1));
      }
    }
  }
}

Reducer Code:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase implements
    Reducer<Text, IntWritable, Text, IntWritable> {

  // Called once per word with all of the counts emitted for that word.
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    int wordCount = 0;
    while (values.hasNext()) {
      IntWritable value = values.next();
      wordCount += value.get();
    }
    // Emit the total only after all values for this word have been summed.
    output.collect(key, new IntWritable(wordCount));
  }
}

Compile and Submit the Job:

1. Create an input file
$cat >balu.txt
Hello World Hello World
Hello World Hello World
Press Ctrl+D to save the file.

Copy the file into HDFS with the following command:

$hadoop fs -put balu.txt balu.txt

2. Compile all the Java files

Go to the directory that contains the .java files, then run:

$javac -classpath /usr/lib/hadoop/hadoop-core.jar *.java

List the generated .class files with the following command:

$ls *.class

3. Package the .class files into a JAR file

$jar cvf word.jar *.class


4. Run the Job

$hadoop jar word.jar WordCount balu.txt baluoutput 

5. Check the output from the command line

$hadoop fs -cat /user/training/baluoutput/part-00000


Alternatively, open a browser, go to the following link and check the output:



WordCount implementation via Hadoop framework 

We will count the words in all the input files. The flow is as follows.
Assume there are two files, each containing one sentence:
Hello World Hello World (In file 1) 
Hello World Hello World (In file 2) 

Mapper: there is one mapper for each file.
For the given sample input, the first map output:
< Hello, 1>
< World, 1> 
< Hello, 1> 
< World, 1> 
The second map output:
< Hello, 1> 
< World, 1> 
< Hello, 1> 
< World, 1>

Combiner/Sorting (this is done for each individual map)
So the output looks like this:
The output of the first map: 
< Hello, 2> 
< World, 2> 
The output of the second map: 
< Hello, 2> 
< World, 2>

Reducer :

It sums up the above output and generates the output as 

< Hello, 4> 
< World, 4>


The final output would look like:
Hello 4 times 
World 4 times
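The combiner walkthrough above can be checked with a small in-memory simulation. This is a minimal Java sketch (CombinerDemo is a hypothetical name, not Hadoop code): each map pre-sums its own counts, and the reducer adds up the partial sums. Combining is safe here because addition is associative and commutative, which is also why a summing reducer can double as a combiner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of per-map combining followed by a summing reducer.
class CombinerDemo {
  // Map + combine for one input file: count words locally before anything reaches a reducer.
  static Map<String, Integer> mapAndCombine(String fileContents) {
    Map<String, Integer> partial = new TreeMap<>();
    for (String word : fileContents.split("\\W+")) {
      if (!word.isEmpty()) partial.merge(word, 1, Integer::sum);
    }
    return partial;
  }

  // Reduce: add up the partial counts coming from every map.
  static Map<String, Integer> reduce(List<Map<String, Integer>> mapOutputs) {
    Map<String, Integer> total = new TreeMap<>();
    for (Map<String, Integer> m : mapOutputs) {
      m.forEach((word, count) -> total.merge(word, count, Integer::sum));
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Integer> file1 = mapAndCombine("Hello World Hello World");
    Map<String, Integer> file2 = mapAndCombine("Hello World Hello World");
    System.out.println(file1);                                // {Hello=2, World=2}
    System.out.println(reduce(Arrays.asList(file1, file2)));  // {Hello=4, World=4}
  }
}
```

Each map now sends 2 records instead of 4, yet the final counts are unchanged.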

Thursday, 21 November 2013

Dancing with Sqoop

What is Sqoop?

Sqoop is a command-line interface application for transferring data between relational databases and Hadoop. In other words, Sqoop imports data from an RDBMS into Hadoop (HDFS) and exports it back.

You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

Step 1: Create a MySQL database for the example tables (emp and empcity):

mysql>create database <database name>;
mysql>use <database>;

Step 2: Grant privileges so that the database can be accessed from other machines (the '%' host matches any host):

mysql>grant all privileges on *.* to '<database username>'@'%' identified by '<database password>';
mysql> flush privileges;

Step 3: Create the tables and insert values into them:

mysql> create table emp(id int,name varchar(20),sal float);
mysql>insert into emp values(101,'xxxx',1234.5);
mysql>insert into emp values(201,'yyyy',567.7);
mysql>select * from emp;
mysql> create table empcity(city varchar(20),venue varchar(20),cityid int);
mysql> insert into empcity values('xxx','rrr',101);
mysql> insert into empcity values('xxxx','rrrx',106);

Step 4: Now load these table values into HDFS on another system (PC).

Let us assume Hadoop, Sqoop and the rest of the Hadoop ecosystem (CDH3/CDH4) are already installed.
First start the CDH3/CDH4 Hadoop cluster environment, then open a terminal and execute the following commands:

To display the list of databases in MySQL by using the Hadoop system:

training@localhost$ sqoop list-databases --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)> --username root -P

Ex: training@localhost$ sqoop list-databases --connect jdbc:mysql:// --username root -P

To display all the tables in a particular database by using the Hadoop system:

training@localhost$ sqoop list-tables --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --username <db username> -P

Ex: training@localhost$ sqoop list-tables --connect jdbc:mysql:// --username root -P

Import the data from MySQL to HDFS:

training@localhost$ sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --table emp --username root -P --split-by id --target-dir /user/training/emp -m 1

Ex: training@localhost$ sqoop import --connect jdbc:mysql:// --table emp --username root -P --split-by id --target-dir /user/training/emp

Import data into Hive table from MySQL:

training@localhost$ sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --table <table name> --username <db username> --password <db password> --target-dir /user/training/hive/warehouse/<hive database>/<table name> -m 1

Ex: training@localhost$sudo sqoop import --connect jdbc:mysql:// --table emp --username root --password training --target-dir /user/training/hive/warehouse/myown.db/emp -m 1

Import a specific result set based on your requirement:

training@localhost$sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --query "select * from emp where id=101 AND \$CONDITIONS" --username <username> --password <password> --target-dir /user/training/data -m 1

Ex: training@localhost$ sudo sqoop import --connect jdbc:mysql:// --query "select * from emp where id=101 AND \$CONDITIONS" --username root --password training --target-dir /user/training/datadir -m 1
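Sqoop replaces the $CONDITIONS token with a per-mapper condition, which is why every free-form --query must contain it (the backslash in \$CONDITIONS only stops the shell from expanding it as a variable). With -m 1 there is a single mapper, so the token in effect becomes a condition that matches every row. The Java sketch below illustrates the idea for several mappers, assuming an even split of the --split-by column's [min, max] range; SplitSketch is a hypothetical name, and the exact boundaries Sqoop generates may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of dividing a --split-by range among mappers.
class SplitSketch {
  // Split the inclusive range [min, max] of a numeric column into at most `mappers` conditions.
  static List<String> conditions(String column, long min, long max, int mappers) {
    List<String> out = new ArrayList<>();
    long span = max - min + 1;                   // number of values to cover
    long step = (span + mappers - 1) / mappers;  // ceiling division so every value is covered
    for (long lo = min; lo <= max; lo += step) {
      long hi = Math.min(lo + step - 1, max);
      out.add(column + " >= " + lo + " AND " + column + " <= " + hi);
    }
    return out;
  }

  public static void main(String[] args) {
    String query = "select * from emp where $CONDITIONS";
    // ids 101..201 from the emp table above, split across 2 mappers
    for (String cond : conditions("id", 101, 201, 2)) {
      System.out.println(query.replace("$CONDITIONS", cond));
    }
    // select * from emp where id >= 101 AND id <= 151
    // select * from emp where id >= 152 AND id <= 201
  }
}
```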

Import data into HDFS after performing a join:

training@localhost$ sudo sqoop import --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --query "select emp.*,empcity.* from emp join empcity on (emp.id=empcity.cityid) WHERE \$CONDITIONS" --username <dbusername> --password <password> --target-dir /user/training/datadir -m 1

Export data from HDFS to MySQL table:

First, create a target table in MySQL:

mysql> create table emp1(id int,name varchar(20),sal float);

Assuming the data is already in HDFS, run:

training@localhost$ sudo sqoop export --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database> --table <table name in mysql> --username <username> --password <password> --export-dir <hdfs directory> -m 1

Ex: training@localhost$ sudo sqoop export --connect jdbc:mysql:// --table emp1 --username root --password training --export-dir /user/training/qqqq -m 1

Export data with a specific field delimiter:

training@localhost$ sudo sqoop export --connect jdbc:mysql://<ip address of mysql installed system>:<portnumber(3306 default)>/<database name> --table <table name> --username <username> --password <password> --export-dir <hdfs directory> --input-fields-terminated-by '@' -m 1

Ex: training@localhost$ sudo sqoop export --connect jdbc:mysql:// --table exp1 --username root --password training --export-dir /user/training/exp --input-fields-terminated-by '@'  -m 1
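--input-fields-terminated-by '@' tells Sqoop export that each line in the HDFS files is one record whose fields are separated by '@', mapped in order onto the table's columns. As a rough illustration (not Sqoop's own parser), the sketch below parses such lines into rows for the emp1 table created earlier (id int, name varchar(20), sal float). The sample lines are made up.

```python
from typing import Iterable, List, Tuple

Row = Tuple[int, str, float]  # (id, name, sal), matching emp1's columns


def parse_export_lines(lines: Iterable[str], delimiter: str = "@") -> List[Row]:
    """Split each non-empty line on the delimiter and convert fields to column types."""
    rows: List[Row] = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        fields = line.split(delimiter)
        if len(fields) != 3:
            raise ValueError(f"expected 3 fields, got {len(fields)}: {line!r}")
        emp_id, name, sal = fields
        if len(name) > 20:
            raise ValueError(f"name longer than varchar(20): {name!r}")
        rows.append((int(emp_id), name, float(sal)))
    return rows


sample = ["101@alice@5000.5\n", "102@bob@4200\n"]
print(parse_export_lines(sample))
```

This also shows why a wrong delimiter typically makes an export fail: the line no longer splits into the expected number of correctly typed fields.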

Friday, 8 November 2013

The Hadoop Distributed File System


HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes) and to provide high-throughput access to this information. Files are stored redundantly across multiple machines to ensure their durability in the face of failures and their high availability to highly parallel applications. This module introduces the design of this distributed file system and explains how to operate it.

A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. 

How Hadoop solves the problems of traditional systems

Traditional System Problem : Data is too big to store on one computer

Today's big data is 'too big' to store on ONE single computer, no matter how powerful it is and how much storage it has. This rules out a lot of storage systems and databases that were built for single machines. So we are going to build the system to run on multiple networked computers. To the 'outside' world, the file system will look like a single, unified file system.

Hadoop solution : Data is stored on multiple computers

Traditional System Problem : Very high end machines are expensive

Now that we have decided that we need a cluster of computers, what kind of machines should they be? Traditional storage machines are expensive, with top-end components and sometimes 'exotic' ones (e.g. Fibre Channel for disk arrays). Obviously these computers cost a pretty penny.
We want our system to be cost-effective, so we are not going to use these 'expensive' machines. Instead, we will use commodity hardware. By that we don't mean cheap desktop-class machines. We will use performant server-class machines, but they will be commodity servers that you can order from any vendor (Dell, HP, etc.).

Hadoop solution : Run on commodity hardware

Traditional System Problem : Commodity hardware will fail

In the old days of distributed computing, failure was an exception, and hardware errors were not tolerated well. So companies providing gear for distributed computing made sure their hardware seldom failed. They achieved this by using high-quality components and backup systems (in some cases, backups to the backup systems!). The machines were engineered to withstand component failures and keep functioning. This line of thinking produced hardware that is impressive, but EXPENSIVE!

Commodity hardware, on the other hand, doesn't have the high-end, whiz-bang components of the machines described above. So it is going to fail, and fail often. We need to be prepared for this. How?

Our approach is to build the 'intelligence' into the software. The cluster software will be smart enough to handle hardware failure: it detects failures and takes corrective action automatically, without human intervention. Our software will be smarter!

Hadoop solution : Software is intelligent enough to deal with hardware failure

Traditional System Problem : hardware failure may lead to data loss

So now we have a network of machines serving as a storage layer, with data spread out across all the nodes. What happens when a node fails? (Remember, we EXPECT nodes to fail.) All the data on that node becomes unavailable or is lost. So how do we prevent this?

One approach is to make multiple copies of the data and store them on different machines, so that even if one node goes down, other nodes still have the data. This is called 'replication'. By default, HDFS keeps 3 copies of the data (a replication factor of 3).

Hadoop Solution : replicate (duplicate) data
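A minimal sketch of the replication idea, assuming a flat list of nodes: each block is copied to 3 distinct nodes, so losing any single node still leaves at least 2 copies of every block. Real HDFS replica placement is rack-aware and more involved; the place_replicas and surviving_copies helpers and the node names here are hypothetical.

```python
import random
from typing import Dict, List, Optional


def place_replicas(block_ids: List[int], nodes: List[str], replication: int = 3,
                   seed: Optional[int] = None) -> Dict[int, List[str]]:
    """Assign each block to `replication` distinct nodes chosen at random."""
    if replication > len(nodes):
        raise ValueError("replication factor exceeds the number of nodes")
    rng = random.Random(seed)
    # random.sample picks without replacement, so the copies land on distinct nodes.
    return {block: rng.sample(nodes, replication) for block in block_ids}


def surviving_copies(placement: Dict[int, List[str]], failed_node: str) -> Dict[int, int]:
    """Count how many copies of each block remain after one node fails."""
    return {block: sum(1 for n in replicas if n != failed_node)
            for block, replicas in placement.items()}


placement = place_replicas([1, 2, 3], ["A", "B", "C", "D", "E"], seed=7)
print(placement)
print(surviving_copies(placement, "A"))
```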

Traditional System Problem : how will the distributed nodes co-ordinate among themselves

Since each machine is part of the 'storage', we will have a 'daemon' running on each machine to manage storage for that machine. These daemons will talk to each other to exchange data.

OK, now that we have all these nodes storing data, how do we coordinate among them? One approach is to have a MASTER act as the coordinator. Building a distributed system around a centralized coordinator may seem like an odd idea, but it is not a bad choice: it simplifies the architecture, design and implementation of the system.

So our architecture now has a single master node and multiple worker nodes.

Hadoop solution : There is a master node that co-ordinates all the worker nodes

Overview of HDFS

HDFS has many similarities with other distributed file systems, but is different in several respects. One noticeable difference is HDFS's write-once-read-many model that relaxes concurrency control requirements, simplifies data coherency, and enables high-throughput access.

Another unique attribute of HDFS is the viewpoint that it is usually better to locate processing logic near the data rather than moving the data to the application space.

HDFS rigorously restricts data writing to one writer at a time. Bytes are always appended to the end of a stream, and byte streams are guaranteed to be stored in the order written.
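The single-writer, append-only model can be sketched in a few lines. This is a hypothetical in-memory model, not HDFS client code: one client at a time holds the right to write (HDFS tracks this with a lease on the file), bytes can only be appended, and existing bytes are never rewritten.

```python
from typing import Optional


class AppendOnlyFile:
    """Hypothetical model: one writer at a time, bytes only ever appended."""

    def __init__(self) -> None:
        self._data = bytearray()
        self._writer: Optional[str] = None  # client currently holding the write lease

    def open_for_write(self, client: str) -> None:
        if self._writer is not None and self._writer != client:
            raise PermissionError(f"file is already open for writing by {self._writer}")
        self._writer = client

    def append(self, client: str, data: bytes) -> None:
        if self._writer != client:
            raise PermissionError(f"{client} does not hold the write lease")
        self._data.extend(data)  # new bytes always go to the end, in order

    def close(self, client: str) -> None:
        if self._writer == client:
            self._writer = None

    def read(self) -> bytes:
        return bytes(self._data)


f = AppendOnlyFile()
f.open_for_write("client-1")
f.append("client-1", b"ab")
f.append("client-1", b"cd")
try:
    f.open_for_write("client-2")    # rejected: client-1 is still writing
except PermissionError as err:
    print(err)
f.close("client-1")
f.open_for_write("client-2")        # allowed now, but it can only append
f.append("client-2", b"e")
print(f.read())                     # b'abcde'
```

There is deliberately no method for overwriting bytes in place, mirroring the write-once model described below.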

HDFS has many goals. Here are some of the most notable:

  • Fault tolerance by detecting faults and applying quick, automatic recovery
  • Data access via MapReduce streaming
  • Simple and robust coherency model
  • Processing logic close to the data, rather than the data close to the processing logic
  • Portability across heterogeneous commodity hardware and operating systems
  • Scalability to reliably store and process large amounts of data
  • Economy by distributing data and processing across clusters of commodity personal computers
  • Efficiency by distributing data and logic to process it in parallel on nodes where data is located
  • Reliability by automatically maintaining multiple copies of data and automatically redeploying processing logic in the event of failures

HDFS Architecture

Master / worker design

In an HDFS cluster, there is ONE master node and many worker nodes. The master node is called the Name Node (NN) and the workers are called Data Nodes (DN). Data nodes actually store the data. They are the workhorses.

The Name Node is in charge of file system operations (like creating files, managing user permissions, etc.). Without it, the cluster is inoperable: no one can read or write data.
This is called a Single Point of Failure. We will look more into this later.

Runs on commodity hardware

As we saw, Hadoop doesn't need fancy, high-end hardware. It is designed to run on commodity hardware. The Hadoop stack is built to deal with hardware failure, and the file system will continue to function even if nodes fail.

HDFS is resilient (even in case of node failure)

The file system will continue to function even if a node fails. Hadoop accomplishes this by duplicating data across nodes.

Data is replicated

So how does Hadoop keep data safe and resilient in case of node failure? Simple, it keeps multiple copies of data around the cluster.

To understand how replication works, let's look at the following scenario. Data segment #2 is replicated 3 times, on data nodes A, B and D. Let's say data node A fails. The data is still accessible from nodes B and D.
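The scenario above can be modelled with a block map from block ID to the nodes holding a replica: a reader simply skips replicas on failed nodes. This is a hypothetical sketch; the read_block helper and node names are illustrative, and in real HDFS the Namenode also sorts replica locations by network distance to the client.

```python
from typing import Dict, List, Set


def read_block(block_id: int, block_map: Dict[int, List[str]], live_nodes: Set[str]) -> str:
    """Return the first live node holding a replica of block_id."""
    for node in block_map.get(block_id, []):
        if node in live_nodes:
            return node
    raise OSError(f"no live replica of block {block_id}")


# Data segment #2 is replicated on data nodes A, B and D.
block_map = {2: ["A", "B", "D"]}
live_nodes = {"A", "B", "C", "D"}
print(read_block(2, block_map, live_nodes))  # A

live_nodes.discard("A")  # data node A fails
print(read_block(2, block_map, live_nodes))  # B
```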

HDFS is better suited for large files

Generic file systems, such as the Linux ext file systems, store files of varying sizes, from a few bytes to a few gigabytes. HDFS, however, is designed to store large files: a few hundred megabytes to a few gigabytes each.

Why is this?

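HDFS was built to work with mechanical disk drives, whose capacity has gone up greatly in recent years while seek times haven't improved all that much. So Hadoop tries to minimize disk seeks: it stores files in large blocks (64 MB by default in Hadoop 1.x, 128 MB in Hadoop 2.x) and reads them sequentially, so that the time spent seeking is small compared with the time spent transferring data.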
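A quick back-of-the-envelope calculation shows why large blocks help. Assuming, purely for illustration, a 10 ms average seek time and a 100 MB/s transfer rate, the fraction of time spent seeking when reading one block sequentially is seek / (seek + block_size / rate). The functions below are hypothetical helpers for this arithmetic.

```python
def seek_overhead(block_mb: float, seek_ms: float = 10.0, rate_mb_s: float = 100.0) -> float:
    """Fraction of read time spent seeking for one sequentially read block."""
    seek_s = seek_ms / 1000.0
    transfer_s = block_mb / rate_mb_s
    return seek_s / (seek_s + transfer_s)


def min_block_mb(target_overhead: float, seek_ms: float = 10.0, rate_mb_s: float = 100.0) -> float:
    """Smallest block size (MB) that keeps the seek overhead at or below target_overhead.

    Solving s / (s + b / r) = p for b gives b = s * r * (1 - p) / p.
    """
    seek_s = seek_ms / 1000.0
    return seek_s * rate_mb_s * (1 - target_overhead) / target_overhead


for size_mb in (4 / 1024, 1, 64, 128):  # 4 KB, 1 MB, 64 MB, 128 MB
    print(f"{size_mb:>10.4f} MB block: {seek_overhead(size_mb):.2%} of the time spent seeking")
print(f"block size for 1% seek overhead: {min_block_mb(0.01):.0f} MB")
```

With these assumed numbers, a 4 KB block spends over 99% of its read time seeking, while blocks of roughly 100 MB bring seeking down to about 1%.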

Files are write-once only (not updateable)

HDFS supports writing a file only once (it cannot be updated in place). This is a stark difference between HDFS and a generic file system (like a Linux file system), which allows files to be modified.

However, appending to a file is supported. Appending is supported to enable applications like HBase.


The Namenode acts as the master in HDFS. It stores the file system metadata and a transaction log of changes made to the file system. The Namenode does not store actual file data.

The Namenode also maintains the block map, built from the block reports sent by individual Datanodes. Whenever a client wants to perform an operation on a file, it contacts the Namenode, which responds with the block map and Datanode information.
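The division of labour described above (metadata on the Namenode, file data on the Datanodes) can be sketched as follows. The NameNode class, read_file function, block IDs and paths are hypothetical and greatly simplified; they only show that the client asks the Namenode where the blocks are and then fetches the bytes from Datanodes directly.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple


class NameNode:
    """Holds metadata only: file -> ordered block IDs, block ID -> Datanodes."""

    def __init__(self) -> None:
        self.files: Dict[str, List[str]] = {}
        self.block_map: DefaultDict[str, List[str]] = defaultdict(list)

    def create_file(self, path: str, block_ids: List[str]) -> None:
        self.files[path] = list(block_ids)

    def receive_block_report(self, datanode: str, block_ids: List[str]) -> None:
        # Each Datanode reports the blocks it holds; this builds the block map.
        for block_id in block_ids:
            if datanode not in self.block_map[block_id]:
                self.block_map[block_id].append(datanode)

    def get_block_locations(self, path: str) -> List[Tuple[str, List[str]]]:
        return [(b, list(self.block_map[b])) for b in self.files[path]]


def read_file(path: str, namenode: NameNode,
              datanode_storage: Dict[str, Dict[str, bytes]]) -> bytes:
    """Ask the Namenode where the blocks are, then fetch bytes from Datanodes."""
    data = b""
    for block_id, locations in namenode.get_block_locations(path):
        if not locations:
            raise OSError(f"no Datanode reports block {block_id}")
        data += datanode_storage[locations[0]][block_id]
    return data


nn = NameNode()
nn.create_file("/user/training/emp", ["blk_1", "blk_2"])
nn.receive_block_report("dn1", ["blk_1"])
nn.receive_block_report("dn2", ["blk_1", "blk_2"])
storage = {"dn1": {"blk_1": b"101,alice\n"},
           "dn2": {"blk_1": b"101,alice\n", "blk_2": b"102,bob\n"}}
print(read_file("/user/training/emp", nn, storage))
```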


The Datanode is the actual storage component in HDFS: Datanodes store the file data. A typical production HDFS cluster has one Namenode and multiple Datanodes.

Datanodes talk to the Namenode through heartbeats, which let the Namenode know that a particular Datanode is alive, and block reports, which list the data blocks held by that Datanode. Datanodes also talk to other Datanodes directly to replicate data.
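A minimal sketch of heartbeat-based liveness tracking, assuming the Namenode simply records when it last heard from each Datanode and treats a Datanode as dead once that is older than a timeout. The HeartbeatMonitor class is hypothetical. The 630-second timeout in the example mirrors the commonly cited 10.5-minute default, but treat it as an assumption; in Hadoop the actual value is derived from configuration settings such as dfs.heartbeat.interval.

```python
from typing import Dict, List


class HeartbeatMonitor:
    """Tracks the last heartbeat time of each Datanode (times in seconds)."""

    def __init__(self, timeout_s: float) -> None:
        self.timeout_s = timeout_s
        self.last_seen: Dict[str, float] = {}

    def heartbeat(self, datanode: str, now: float) -> None:
        # Each heartbeat just refreshes the Datanode's last-seen timestamp.
        self.last_seen[datanode] = now

    def dead_nodes(self, now: float) -> List[str]:
        # A Datanode silent for longer than the timeout is considered dead; its
        # blocks would then need to be re-replicated from the surviving copies.
        return sorted(dn for dn, seen in self.last_seen.items()
                      if now - seen > self.timeout_s)


monitor = HeartbeatMonitor(timeout_s=630)
monitor.heartbeat("dn1", now=0)
monitor.heartbeat("dn2", now=0)
monitor.heartbeat("dn1", now=600)   # dn1 keeps reporting, dn2 goes silent
print(monitor.dead_nodes(now=631))  # ['dn2']
```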

Checkpoint Node

HDFS stores its namespace and file system transaction log in the FsImage and EditLog files on the Namenode's local disk. When the Namenode starts up, the changes recorded in the EditLog are merged into the FsImage, so that HDFS always has up-to-date file system metadata. After the merge, HDFS replaces the old FsImage with the new one, which represents the current state of HDFS, and then opens a new EditLog.
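The merge of the EditLog into the FsImage is essentially a replay: start from the saved namespace and apply each logged change in order. In this hypothetical sketch the namespace is a plain dict from path to entry type and the log holds simple (operation, arguments...) tuples; the real FsImage and EditLog are binary files with many more operation types.

```python
from typing import Dict, List, Tuple

Namespace = Dict[str, str]  # path -> "file" or "dir"
Edit = Tuple[str, ...]      # (operation, *arguments)


def apply_edits(fsimage: Namespace, edit_log: List[Edit]) -> Namespace:
    """Replay the edit log, in order, on top of a copy of the saved image."""
    namespace = dict(fsimage)
    for op, *args in edit_log:
        if op == "mkdir":
            namespace[args[0]] = "dir"
        elif op == "create":
            namespace[args[0]] = "file"
        elif op == "delete":
            namespace.pop(args[0], None)
        elif op == "rename":
            src, dst = args
            namespace[dst] = namespace.pop(src)
        else:
            raise ValueError(f"unknown edit operation: {op}")
    return namespace


def checkpoint(fsimage: Namespace, edit_log: List[Edit]) -> Tuple[Namespace, List[Edit]]:
    """Produce a new FsImage and start a fresh, empty EditLog."""
    return apply_edits(fsimage, edit_log), []


image = {"/user": "dir"}
edits = [
    ("mkdir", "/user/training"),
    ("create", "/user/training/a"),
    ("rename", "/user/training/a", "/user/training/b"),
    ("create", "/user/training/tmp"),
    ("delete", "/user/training/tmp"),
]
new_image, new_log = checkpoint(image, edits)
print(new_image)
```

The Checkpoint node performs the same kind of replay on its downloaded copies, just on a separate machine.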

In any HDFS instance, the Namenode is a single point of failure: it maintains the namespace and the EditLog, and if these files are corrupted or lost, the whole cluster goes down. To reduce this risk, copies of the FsImage and EditLog can be maintained on a different machine using a Checkpoint node.

The Checkpoint node creates periodic checkpoints of the namespace and edit log. It downloads the latest FsImage and EditLog from the active Namenode, stores them locally, merges them, and uploads the new FsImage back to the active Namenode.

In a true production Hadoop cluster, the Checkpoint node should run on a separate machine with the same memory configuration as the active Namenode.

The Checkpoint node stores the latest checkpoint in a directory that has the same structure as the NameNode's directory. This allows the checkpointed image to always be available for the NameNode to read if necessary. A cluster can have multiple Checkpoint nodes; they are specified in the HDFS configuration file.

Backup Node

The Backup node works much like the Checkpoint node: it provides checkpoint functionality, and in addition it maintains an up-to-date copy of the file system namespace in memory, synchronized with the Namenode. The Backup node applies EditLog changes to its in-memory copy of the namespace and stores it on disk. This way the Backup node always has up-to-date copies of the EditLog and FsImage, both on disk and in memory.

Unlike the Checkpoint node, which needs to download copies of the FsImage and EditLog, the Backup node does not need to download these copies, because it always has an up-to-date copy of the namespace in memory. It only needs to apply the latest EditLog changes to the in-memory namespace and store copies of the FsImage and EditLog on its local disk. This makes the Backup node's checkpoint process more efficient than the Checkpoint node's.

The Backup node's memory requirement is the same as the Namenode's, since it maintains the namespace in memory just like the Namenode. You can have only one Backup node (multiple Backup nodes are not supported at this point in time), and no Checkpoint node can run while a Backup node is running. In other words, you can have either a Backup node or Checkpoint nodes, but not both at the same time.

Since the Backup node maintains an up-to-date copy of the namespace, the Namenode can be started with no persistent storage of its own, delegating all responsibility for persisting the namespace to the Backup node. This is done by starting the Namenode with the -importCheckpoint option and configuring no persistent storage directories of type edits (dfs.namenode.edits.dir) for it.

Saturday, 2 November 2013

What is Unstructured Data

The phrase "unstructured data" usually refers to information that doesn't reside in a traditional row-column database. As you might expect, it's the opposite of structured data -- the data stored in fields in a database.

Unstructured data files often include text and multimedia content. Examples include e-mail messages, word processing documents, videos, photos, audio files, presentations, webpages and many other kinds of business documents. Note that while these sorts of files may have an internal structure, they are still considered "unstructured" because the data they contain doesn't fit neatly in a database.

Experts estimate that 80 to 90 percent of the data in any organization is unstructured. And the amount of unstructured data in enterprises is growing significantly -- often many times faster than structured databases are growing.

Mining Unstructured Data

Many organizations believe that their unstructured data stores include information that could help them make better business decisions. Unfortunately, it's often very difficult to analyze unstructured data. To help with the problem, organizations have turned to a number of different software solutions designed to search unstructured data and extract important information. The primary benefit of these tools is the ability to glean actionable information that can help a business succeed in a competitive environment.

Because the volume of unstructured data is growing so rapidly, many enterprises also turn to technological solutions to help them better manage and store their unstructured data. These can include hardware or software solutions that enable them to make the most efficient use of their available storage space.

Unstructured Data and 'Big Data'

As mentioned above, unstructured data is the opposite of structured data. Structured data generally resides in a relational database, and as a result, it is sometimes called "relational data." This type of data can be easily mapped into pre-designed fields. For example, a database designer may set up fields for phone numbers, zip codes and credit card numbers that accept a certain number of digits. Structured data has been or can be placed in fields like these. By contrast, unstructured data is not relational and doesn't fit into these sorts of pre-defined data models.

In addition to structured and unstructured data, there's also a third category: semi-structured data. Semi-structured data is information that doesn't reside in a relational database but that does have some organizational properties that make it easier to analyze. Examples of semi-structured data might include XML documents and NoSQL databases.

The term "big data" is closely associated with unstructured data. "Big data" refers to extremely large datasets that are difficult to analyze with traditional tools. Big data can include both structured and unstructured data, but IDC estimates that 90 percent of big data is unstructured data. Many of the tools designed to analyze big data can handle unstructured data.

Unstructured Data Vendors

Numerous vendors offer products designed to help companies analyze and manage their unstructured data. They include the following:
The open source community has been particularly active in developing software that can manage unstructured data, and many vendors offer paid products and services related to these open source projects. Open source projects and vendors related to the storage, management and analysis of unstructured data include the following: