Hadoop Professionals

A Community for Hadoop Users

Himanshu Gahlot

Null value for DistributedCache.getLocalFiles method in hadoop-0.20.2

Hi all,

I upgraded the WordCount v2.0 example from the Map-Reduce tutorial (http://hadoop.apache.org/common/docs/r0.20.1/mapred_tutorial.html#E...) to work with hadoop-0.20.2. That is, I removed the org.apache.hadoop.mapred.* imports and used org.apache.hadoop.mapreduce.* instead. I am using a pseudo-distributed single-node cluster, so the DistributedCache feature itself should not be a problem. I was able to run the original example on hadoop-0.18.3 (using the DistributedCache feature), but I am having problems running it on hadoop-0.20.2.

DistributedCache.getCacheFiles(job) returns null, both in the mapper and right where the file is added (i.e. just after DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);). I am also not able to set any configuration keys. The map.input.file property returns null. The wordcount.skip.patterns key is not set to true either: it prints true just after I set it, but false in the mapper.

Here is my code:

package org.myorg;
    
import java.io.*;
import java.util.*;
    
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
    
     public class WordCount{
        
        public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
          static enum Counters { INPUT_WORDS }
    
          private final static IntWritable one = new IntWritable(1);
          private Text word = new Text();
    
          private boolean caseSensitive = true;
          private Set<String> patternsToSkip = new HashSet<String>();
    
          private long numRecords = 0;
          private String inputFile;
    
          @Override
          public void setup(Context context) {
            Configuration job = ((JobContext)context).getConfiguration();
            caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
            inputFile = job.get("map.input.file");
            System.out.println("input file="+job.get("map.input.file")); // prints null
            try {
             System.out.println("in mapper cache1="+DistributedCache.getCacheFiles(job)); // prints null
            } catch (IOException e) {
              e.printStackTrace();
            }
            System.out.println("wordcount.skip.patterns value in map="+job.getBoolean("wordcount.skip.patterns", false)); // prints false when it should be true (I am using -skip argument)
          
            if (job.getBoolean("wordcount.skip.patterns", false)) {
              Path[] patternsFiles = new Path[0];
              try {
                patternsFiles = DistributedCache.getLocalCacheFiles(job);
             } catch (IOException ioe) {
                System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
              }
              for (Path patternsFile : patternsFiles) {
                parseSkipFile(patternsFile);
              }
            }
          }
    
          private void parseSkipFile(Path patternsFile) {
            try {
              BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
              String pattern = null;
              while ((pattern = fis.readLine()) != null) {
                patternsToSkip.add(pattern);
              }
            } catch (IOException ioe) {
              System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
            }
          }
        
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
    
            for (String pattern : patternsToSkip) {
              line = line.replaceAll(pattern, "");
            }
    
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
              word.set(tokenizer.nextToken());
              context.write(word, one);
              context.getCounter(Counters.INPUT_WORDS).increment(1);
            }
    
            if ((++numRecords % 100) == 0) {
              context.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
            }
          }
        }
    
        public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            while (values.iterator().hasNext()) {
              sum += values.iterator().next().get();
            }
            context.write(key, new IntWritable(sum));
          }
        }
    
       public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              Job job = new Job(conf);
              job.setJarByClass(WordCount.class);
              job.setJobName("wordcount");
       
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
       
              job.setMapperClass(WCMapper.class);
              job.setCombinerClass(WCReducer.class);
              job.setReducerClass(WCReducer.class);
        
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
        
              List<String> other_args = new ArrayList<String>();
              for (int i=0; i < args.length; ++i) {
                if ("-skip".equals(args[i])) {
                  DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
                  System.out.println("in main cache="+DistributedCache.getCacheFiles(conf)); // prints null
                  conf.setBoolean(PATTERN_KEY, true);
                } else {
                  other_args.add(args[i]);
                }
              }
             
              System.out.println("wordcount.skip.patterns value in main="+conf.getBoolean("wordcount.skip.patterns", false)); // prints true
             
             FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
             FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
       
             System.exit(job.waitForCompletion(true)?0:1);
        }
}

-----------------------

Could someone tell me how to set configuration keys and use the DistributedCache feature in hadoop-0.20.2?

Views: 193

Reply to This

Replies to This Discussion

I solved the DistributedCache problem. It is discussed here -> http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-t...

I am still stuck on setting keys. Values of keys set during job configuration cannot be retrieved in the mapper or the reducer. Does anyone have an idea why this is happening? Is it a bug in 0.20.2, or is there a workaround?
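Setting the values on job.getConfiguration() (e.g. job.getConfiguration().set(...)) instead of on the original conf object solved the issue! new Job(conf) takes its own copy of conf, so changes made to conf after the Job is created are not seen by the job.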

Reply to Discussion

RSS




Groups

© 2012   Created by Jason Venner.

Badges  |  Report an Issue  |  Terms of Service