I'm trying to run a Hadoop Streaming job with a Java mapper/reducer over some Wikipedia dumps (in compressed bz2 form), using WikiHadoop, an interface recently released by Wikimedia.
WikiReader_Mapper.java
package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article it sees.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}
WikiReader_Reducer.java
package courseproj.example;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts for a key.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Reuse the output value object across calls.
    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}
The command I'm running is
hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
-libjars lib2/wikihadoop-0.2.jar \
-D mapreduce.input.fileinputformat.split.minsize=300000000 \
-D mapreduce.task.timeout=6000000 \
-D org.wikimedia.wikihadoop.previousRevision=false \
-input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
-output out \
-inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
-mapper WikiReader_Mapper \
-reducer WikiReader_Reducer
and the error message I'm getting is
Error: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)
Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)
I'm more familiar with the new Hadoop API than the old one. Since my mapper and reducer code lives in two different files, where do I define the JobConf configuration parameters for the job while still following the command structure of Hadoop Streaming (explicitly setting the mapper and reducer classes)? Is there a way to wrap the mapper and reducer code up into one class (one that extends Configured and implements Tool, which is what's done in the new API) and pass that class name to the Hadoop Streaming command line, instead of setting the map and reduce classes separately?
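For reference, this is the kind of single-class driver I have in mind, translated to the old API. The class name, job name, and argument handling here are just my sketch, and I'm not sure how (or whether) something like this plugs into the streaming jar:

```java
package courseproj.example;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver: wires the mapper and reducer together behind one
// entry point, configuring the job through the old-API JobConf.
public class WikiReader extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), WikiReader.class);
        conf.setJobName("wiki article count");

        // Assumes StreamWikiDumpInputFormat is an old-API InputFormat,
        // since it was written for Hadoop Streaming.
        conf.setInputFormat(org.wikimedia.wikihadoop.StreamWikiDumpInputFormat.class);
        conf.setMapperClass(WikiReader_Mapper.class);
        conf.setReducerClass(WikiReader_Reducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner handles generic options such as -D key=value.
        System.exit(ToolRunner.run(new WikiReader(), args));
    }
}
```

With something like this I'd run `hadoop jar myjob.jar courseproj.example.WikiReader -libjars lib2/wikihadoop-0.2.jar <input> <output>` rather than going through the streaming jar at all, but I don't know whether that defeats the purpose of using the streaming interface.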
Copyright notice: content author 「Josh Bradley」, reproduced under the CC BY-SA 4.0 license with a link to the original source and this disclaimer.
Link to original article: https://stackoverflow.com/questions/16043572/hadoop-streaming-with-java-mapper-reducer