- Narayana Murthy Pola, Sr. Project Manager, DST India
Rapid digitization has increased the generation and availability of data. To harness this (Big) data, we need to store it and process it. Available storage capacity and performance have grown, and costs have fallen, almost at the same pace as the data itself. But network bandwidth and data transfer rates have not kept pace, hindering data processing capabilities.
Distributed computing addressed large-scale data processing (for example, processing weather data, seismic information, etc.) even before Hadoop came into existence. The grid computing community and SETI@home are among the popular distributed computing initiatives, built on common distributed computing frameworks such as PVM (Parallel Virtual Machine) and MPI (Message Passing Interface).
The approach in this programming model is to distribute the work across a cluster of machines (nodes) that access a shared file system hosted on a SAN (Storage Area Network). This works well for compute-intensive jobs but quickly becomes a problem when each node has to access large datasets, owing to the limitations of network bandwidth.
Also, the programmer has to explicitly (programmatically) handle the mechanics of data flow, cluster load balancing, and the extremely complicated task of coordinating the processes in a distributed computation. SETI@home sends chunks of data and computing algorithms to participating volunteer machines over the internet; these machines are neither verified nor trusted, risking exposure of confidential data.
Thus the existing distributed computing frameworks and models battle complexities such as:
1. Network bandwidth limitations
2. Ease of programming
3. Data consistency
4. Fault tolerance
5. Cost
6. Scalability
Hadoop comes as an answer to the above distributed programming complexities. It was developed by Doug Cutting, building on Google's paper on the Google File System (2003) and the MapReduce programming paradigm that Google introduced in 2004.
The kernel of Hadoop consists of:
1. Data storage: the Hadoop Distributed File System (HDFS)
2. Data processing engine: the MapReduce programming paradigm
HDFS Architecture:
The Hadoop Distributed File System has a master-slave architecture with a single name node and many data nodes. The name node (master) maintains the file system metadata. In general, a server-class machine is used as the name node.
The slave nodes contain the actual contents of the files, stored as blocks. In general, commodity hardware is used for data nodes, bringing down the cost of data storage. Whenever data is ingested into Hadoop, it is split into fixed-size blocks that are stored on the data nodes. These blocks are replicated to nodes across the cluster for redundancy, with a default replication factor of 3; this ensures fault tolerance, performance, and reliability. Each slave node runs a data node daemon (process) that controls access to its data blocks and communicates with the name node. Data nodes periodically send their status (heartbeat) to the name node. Data nodes can be added or removed with ease, ensuring scalability, and the programmer is freed from allocating and distributing data and from managing cluster load.
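As a concrete illustration of how a client interacts with this storage layer, the following Java sketch uses Hadoop's FileSystem API to write a small file into HDFS. The file path is a placeholder, and the replication factor is set explicitly only to make the concept visible; in practice it comes from the cluster configuration (hdfs-site.xml).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    // Cluster settings (name node address, block size, replication factor)
    // are normally picked up from core-site.xml / hdfs-site.xml on the classpath.
    Configuration conf = new Configuration();
    conf.set("dfs.replication", "3");   // illustrative; 3 is already the default

    FileSystem fs = FileSystem.get(conf);

    // The client asks the name node (which holds only metadata) where to place
    // the blocks, then streams the data directly to the chosen data nodes.
    Path file = new Path("/data/sample.txt");   // placeholder path
    try (FSDataOutputStream out = fs.create(file)) {
      out.writeUTF("Hello HDFS");
    }

    // On read, the name node returns the block locations and the client
    // reads the blocks directly from the data nodes.
    System.out.println("Replication factor: " + fs.getFileStatus(file).getReplication());
  }
}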
Execution engine: The execution engine follows a master-slave architecture like the HDFS storage layer. Two types of daemons (background processes) control the job execution process. The job tracker (master) coordinates all the jobs run on the system, and there is only one job tracker per cluster. In a typical scenario, the client submits the job and the input data is divided into splits based on the configured block size. The job tracker then assigns a job id and a map task for each of the input splits. The job tracker tracks the progress of each job (through the heartbeats/status sent by the task trackers) and schedules or re-schedules tasks if any of them fail.
Task trackers (slaves) create a new JVM for each task they run, and they periodically send progress/status information (heartbeats) about their tasks to the job tracker.
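To make this flow concrete, the sketch below submits a job using the classic org.apache.hadoop.mapred API of the job tracker era. The class name and input/output paths are placeholders, and the identity mapper and reducer are used so that the example stays focused on job submission rather than on processing logic.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SubmitJobExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SubmitJobExample.class);
    conf.setJobName("pass-through");

    // The client computes input splits from the files under the input path,
    // and the job tracker assigns one map task per split.
    FileInputFormat.setInputPaths(conf, new Path("/input"));    // placeholder
    FileOutputFormat.setOutputPath(conf, new Path("/output"));  // placeholder

    // Identity mapper/reducer copy records through unchanged.
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // runJob submits the job to the job tracker, which assigns a job id,
    // schedules the tasks on task trackers, and re-runs any task that fails.
    JobClient.runJob(conf);
  }
}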
Programming model: The programming model that runs on the execution engine is called MapReduce. MapReduce is a linearly scalable programming model consisting of two main phases/tasks programmed by the developer: map tasks and reduce tasks. Between the two, a subsidiary shuffle-and-sort phase runs before the output of the map tasks is passed to the reduce tasks. A mapper runs on one record at a time, and a reducer aggregates the results from the mappers. The same functions can be applied to data of any size, which helps scalability.
MapReduce tries to co-locate computation with the data so that data access is fast because it is local. This feature, data locality, improves performance by reducing (almost eliminating) the network transfer bottleneck.
The model facilitates task distribution across the nodes; the output of each mapper is shuffled and sorted by key before it becomes input to a reduce task. MapReduce breaks everything down into key-value pairs, which may sound like a very restrictive programming paradigm. However, many algorithms, ranging from image analysis to graph-based problems to complex machine learning algorithms, have been written using the MapReduce paradigm.
A typical flow of MapReduce's "Hello world" program, i.e. word count, can be depicted as below.
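In code, the same word count flow can be sketched roughly as follows, using the org.apache.hadoop.mapreduce Java API; the class names are illustrative, and the input/output paths are taken from the command line.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map phase: runs on one input record (line) at a time and
  // emits a (word, 1) key-value pair for every token in the line.
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce phase: after the shuffle and sort, receives all counts
  // for a given word and aggregates them into a total.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // local pre-aggregation on the map side
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));     // input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));   // output directory
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The program would typically be packaged into a jar and launched with the hadoop jar command, pointing at an HDFS input directory and an as-yet non-existent output directory.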
Some examples that readers can try:
1. Term counting: given a collection of documents, write a simple MapReduce job to count the occurrences of a particular term.
2. Log analysis: calculate the total up time of a server from log data emitted every minute over a seven-day period (a sketch follows this list).
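For the second exercise, here is a minimal sketch assuming a hypothetical log format of one line per minute in the form "serverId timestamp status": the mapper emits (serverId, 1 minute) for each UP record, and the reducer sums the minutes per server. A driver along the lines of the word count program above would wire these two classes into a job.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: parses one log line per call. The log format (serverId, timestamp,
// status separated by whitespace) is an assumption made for this sketch.
public class UptimeMapper extends Mapper<Object, Text, Text, LongWritable> {
  private static final LongWritable ONE_MINUTE = new LongWritable(1);
  private final Text serverId = new Text();

  @Override
  protected void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split("\\s+");
    if (fields.length >= 3 && "UP".equals(fields[2])) {
      serverId.set(fields[0]);
      context.write(serverId, ONE_MINUTE);   // one "UP" record = one minute of up time
    }
  }
}

// Reducer: sums the minutes reported for each server over the seven days.
class UptimeReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long totalMinutes = 0;
    for (LongWritable v : values) {
      totalMinutes += v.get();
    }
    context.write(key, new LongWritable(totalMinutes));
  }
}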
MapReduce programs can be written in Java. Hadoop also provides an interface (Hadoop Streaming) to write MapReduce programs in Python, Ruby, and other scripting languages; Hadoop Streaming supports any programming language that can read from standard input and write to standard output. Hadoop Pipes is the API for interfacing with C++; it uses sockets as the channel over which the task tracker communicates with the C++ map and reduce functions. Thus, any programmer can easily slip into the MapReduce programming model and get Hadoop working for them.