# Hadoop Architecture

## Features

- Designed to run on clusters of PCs
- Scales up linearly
- Suitable for local networks or data centers
- Design principles:
  - Data is distributed around the network
  - Computation is sent to the data: code is shipped to run on the nodes
  - Basic architecture is master / worker
- Offers the following:
  - Redundant, fault-tolerant data storage
  - Parallel computation framework
  - Job coordination

## Structure of a MapReduce job

- **Job**: a program to be executed across the **entire** dataset
  - Packaged as a jar file, with all the code needed
  - Assigned a cluster-unique ID
  - Data attached to the job is replicated across the cluster
- **Task**: an execution on a slice of data
- **Task Attempt**: an instance of a task executing locally on a node

## Job execution flow

- Split the input data into chunks
- Assign each chunk to a node manager
- Run many mappers
- [Shuffle and sort](/3-1-map-reduce.md#running)
- Run many reducers
- Results from the reducers form the job output

## The Optional Combiner

- The bottleneck in map-reduce frameworks:
  - Map and reduce tasks scale close to linearly, so they are not the bottleneck
  - Potential bottleneck at the shuffle and sort operations (between map and reduce):
    - Data needs to be copied over the network
    - Lots of keys are emitted by the mappers, and sorting them is costly
- A combiner can be executed before shuffling and sorting
- Reason:
  - Acts as a preliminary reducer
  - Executed at each mapper node, just before sending all the pairs for shuffling
  - **Reduces** the amount of data emitted by the mapper, to improve efficiency
- Restrictions and rules:
  - **Cannot** be mandatory: the job should work correctly without it
  - **Idempotent**: the number of times the combiner is applied shouldn't change the output
  - No **side effects**, or it won't be idempotent
  - **Preserves** the keys: it can't change the keys, which would disrupt the **sort** order or the **partitioning**
- Example of using the reducer code as the combiner:

```java
// Combiner: same logic as the reducer, applied locally at each mapper
// before the pairs are shuffled and sorted
public void combine(String key, List<Integer> values) {
    int sum = 0;
    for (Integer count : values) {
        sum += count;
    }
    emit(key, sum); // emits one partial sum per key instead of many single counts
}
```

- TODO review the combiner diagram

## Apache Hadoop

### Architecture of Hadoop

- Executes on nodes connected by a network
- Each node runs a set of daemons
  - Computing:
    - `ResourceManager`
    - `NodeManager`
  - Storage:
    - `NameNode`, with `SecondaryNameNode` as backup
    - `DataNode`
- Nodes follow a master / slave architecture
  - Master node: `NameNode`, `ResourceManager`
    - Aware of the slave nodes
    - Receives external requests
    - Decides how work is split among the slaves
    - Notifies the slaves
  - Slave node, also called worker node: `DataNode`, `NodeManager`
    - Executes the tasks received from the master

### What Hadoop Does

- Resource management: tracks the existence and availability of resources
- Job allocation: decides the resources needed for a job, and the split of work
- Job execution: runs the job, makes sure it completes, deals with failures

## Job execution: YARN

### Intro

- Estimates how many map and reduce tasks are needed for a job, based on the _input dataset_ and the _job definition_
- Ideally, a different node for each map / reduce task

### Deciding the number of workers

#### Mapper Parallelization

- A different input split is processed by each mapper
- The input data size is known
- Number of mappers $=$ input size $/$ split size
- If the input is small but spread over many files, the files are not split further, so more mappers are used (one per file)

#### Reducer Parallelization

- The number of reducers is **user defined**, since it's hard to figure out automatically (set explicitly in the driver, as in the sketch below)
- Keys are partitioned among the reducers; partitioning too much leads to overhead in shuffle and sort
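Since the reducer count is user defined and the reducer can be reused as the combiner, here is a minimal word-count driver sketch using the standard `org.apache.hadoop.mapreduce` API. The class names (`WordCount`, `TokenizerMapper`, `SumReducer`) and the choice of 4 reducers are illustrative, not part of the notes.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every word in its input split
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reducer: sums counts per key; safe to reuse as the combiner because
    // summation is associative and it does not change keys or partitioning
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumReducer.class); // reducer reused as combiner
        job.setReducerClass(SumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Mapper count comes from the input splits; reducer count is user defined
        job.setNumReduceTasks(4);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Packaged as a jar and submitted with `hadoop jar`, this runs one mapper per input split and produces exactly four reducer output files in the output path.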
### Execution Daemons

#### `ResourceManager`

- On the master: one per cluster
- Responsibility:
  - Receive job requests from clients
  - Create an `ApplicationMaster` per job to manage it
  - Allocate **containers** on slave nodes, with the assigned resources
  - Monitor the health of the `NodeManager` nodes

#### `NodeManager`

- Responsibility:
  - Coordinate the execution of tasks on the node
  - Send health information to the `ResourceManager`

#### `ApplicationMaster`

- Only one per job
- Responsibility: job allocation and job execution
  - Implements a specific framework, for example MapReduce
  - Negotiates with the `ResourceManager` on the resources required
  - Decides which node will run which task, in the containers given by the `ResourceManager`
  - Destroyed when the job completes

## Storage: HDFS

### Definition

- Hadoop Distributed File System
- The storage for Hadoop's input and output
- Features:
  - Tailored for MapReduce jobs
  - Large block size (64MB)
  - Not a POSIX-compliant file system

### Data distribution: key element of map reduce

- Job code (jars) is moved to where the data is stored
- Blocks are replicated across the cluster, by default **three** times, to ensure **reliability**

### Storage daemons

- `DataNode`: many per cluster
  - Stores HDFS blocks
  - Reports the blocks it stores to the `NameNode`
- `NameNode`: one per cluster
  - Keeps the index and location of every block
  - Does no computation, because the bookkeeping alone is heavy
  - Single point of failure
- `SecondaryNameNode`
  - Communicates directly with the `NameNode`
  - Stores a backup of the index table

### Data Replication

- Format: CSV; example:

| `Filename` | `numReplicas` | `block-ids` |
| ---------- | ------------- | ----------- |
| part-0     | r:2           | {1,3}       |
| part-1     | r:3           | {2,4,5}     |

- Definition: creating and maintaining multiple copies of data across different nodes in HDFS
- Significance:
  - Fault tolerance
  - Data availability
  - System reliability
  - Support for parallel processing

### Failure recovery

- Identifying failure: a missing heartbeat signal
- Replication: the `NameNode` initiates re-replication once a node failure is detected
- Maintaining integrity: uses a predetermined replication factor
- Mitigating potential disruptions: dynamic data management

### Operation

- TODO: See the graph in p59
- Input data:
  - Mappers are assigned input splits from the HDFS input path (default 64MB)
  - Data locality: the `ApplicationMaster` tries to assign each mapper to a node where its data is stored (block locations can be queried, as in the sketch below)
- Output data:
  - Copied to HDFS, one file per reducer
  - Replicated
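As a small companion to the `NameNode` / `DataNode` description and the data-locality point above, the sketch below uses the HDFS `FileSystem` API to ask for a file's block size, replication factor, and the hosts holding each block. It assumes a reachable HDFS cluster picked up from the default `Configuration`; the fallback path `/user/hadoop/part-0` is a placeholder.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Placeholder HDFS path; pass a real file on your cluster as args[0]
        Path file = new Path(args.length > 0 ? args[0] : "/user/hadoop/part-0");
        FileStatus status = fs.getFileStatus(file);

        System.out.println("Block size:  " + status.getBlockSize());
        System.out.println("Replication: " + status.getReplication());

        // Ask the NameNode which DataNodes hold each block of the file
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            System.out.printf("offset=%d length=%d hosts=%s%n",
                    block.getOffset(), block.getLength(),
                    String.join(",", block.getHosts()));
        }
        fs.close();
    }
}
```

This is the same information the `NameNode` keeps in its index table; `hdfs fsck <path> -files -blocks -locations` reports it from the command line.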