Monday 6 January 2014

Map Reduce Counters

Counters
One of the feature provided by Map Reduce Framework is Counters

Counters helps in gathering statistics about the job. These statistics are useful for quality control and problem diagnosis

Built-in Counters:
For every job, hadoop maintains some built-in counters which report various metrics
e.g There are counters for the number of bytes and records processed, which allows you to confirm that the expected amount of input was consumed and the expected amount of output was produced.

There are different built in counters related to Job , FileSystem 

Counters are global:
Counters are maintained by the task with which they are associated, and periodically sent to the tasktracker and then to the jobtracker, so they can be globally aggregated.The built-in Job Counters are actually maintained by the jobtracker, so they don’t need to be sent across the network, unlike all other counters, including user-defined ones.
Counter values are definitive only once a job has successfully completed.

Custom Counters:
MapReduce allows users to define a set of counters, which are incremented as desired in the mapper or reducer. Counters are defined by a Java enum. A job may define an arbitrary number of enums, each with an arbitrary number of fields. The name of the enum is the group name, and the enum’s fields are the counter names.  
Counters are global: the MapReduce framework aggregates them across all maps and reduces to produce a grand total at the end of the job.

Declare Counter

enum CustomCounters {
        VALID,
        INVALID ,
        SUM
}

This Counter hold three fields :
Valid gives total count of valid records
Invalid gives total count of valid records
Sum gives the sum of 2nd column

Increment the value of Counter:
context.getCounter(CustomCounters.VALID).increment(1);

Retrieve the value of Counter :
long sum = context.getCounter(CustomCounters.SUM).getValue();

Complete Code: 

public class MR_CounterDemo {
// Declaring a Counter
   enum CustomCounters {
         VALID,
         INVALID ,
         SUM
    }

public static class FilterMapper extends Mapper<Object, Text, IntWritable,LongWritable>{


public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
   if(value!=null){
        String[] line = value.toString().split(",");
        if(line.length==3){
 // Increment the counter
            context.getCounter(CustomCounters.VALID).increment(1);
            int field = Integer.parseInt(line[1]);
 // Retreive the counter
            context.getCounter(CustomCounters.SUM).increment(field);
            long sum = context.getCounter(CustomCounters.SUM).getValue();
            int keyOut = Integer.parseInt(line[0]);
            context.write(new IntWritable(keyOut),new LongWritable(sum));
       }
     }
     else{
         context.getCounter(CustomCounters.INVALID).increment(1);
       }
   }
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Map Reduce Counter Usage");
job.setJarByClass(MR_CounterDemo.class);
job.setMapperClass(FilterMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/path/to/inputfile"));
FileOutputFormat.setOutputPath(job, new Path("/path/to/outputfile"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


Counter are accessible in map and reduce method 
In its output ,you see below lines:
13/12/20 14:32:54 INFO mapred.JobClient: Job complete: job_local_0001
13/12/20 14:32:54 INFO mapred.JobClient: Counters: 23
13/12/20 14:32:54 INFO mapred.JobClient: wordcount.newapi.MR_CounterDemo$CustomCounters
13/12/20 14:32:54 INFO mapred.JobClient: SUM=87
13/12/20 14:32:54 INFO mapred.JobClient: INVALID=4
13/12/20 14:32:54 INFO mapred.JobClient: VALID=7
Above lines shows the counter values for custom counter fields
Buit in counters:
3/12/20 14:32:54 INFO mapred.JobClient: File Output Format Counters
13/12/20 14:32:54 INFO mapred.JobClient: Bytes Written=47
13/12/20 14:32:54 INFO mapred.JobClient: FileSystemCounters
13/12/20 14:32:54 INFO mapred.JobClient: FILE_BYTES_READ=576
13/12/20 14:32:54 INFO mapred.JobClient: FILE_BYTES_WRITTEN=65031
..........................................................
Above lines shows the counter values for built -in counter fields

To access in-built counters associated with job, you can try below code:
Counters counters = job.getCounters();
long counter = counters.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue();

counters variable will hold following values:

FileSystemCounters
        FILE_BYTES_READ=198258
        FILE_BYTES_WRITTEN=219848
    Map-Reduce Framework
        Combine input records=0
        Combine output records=0
        Total committed heap usage (bytes)=519438336
        CPU time spent (ms)=0
        Map input records=258
        Map output bytes=141
        Map output materialized bytes=150
        Map output records=1
        Physical memory (bytes) snapshot=0
        Reduce input groups=1
        Reduce input records=1
        Reduce output records=0
        Reduce shuffle bytes=0
        Spilled Records=2
        SPLIT_RAW_BYTES=141
        Virtual memory (bytes) snapshot=0
    File Input Format Counters
        Bytes Read=8937
    File Output Format Counters
        Bytes Written=0

These counters can be accessed by using its property name.