Hadoop reduce side join using Datajoin
NickName: LGG | Ask DateTime: 2012-04-18T09:17:51


I am using the following code to do a reduce-side join:

/*
 * HadoopMapper.java
 *
 * Created on Apr 8, 2012, 5:39:51 PM
 */


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// import org.apache.commons.logging.Log;
// import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.*; 

/**
 *
 * @author 
 */
public class DataJoin extends Configured implements Tool 
    {
        public static class MapClass extends DataJoinMapperBase 
            {
                protected Text generateInputTag(String inputFile) 
                    {
                        String datasource = inputFile.split("-")[0];
                        return new Text(datasource);
                    }
                protected Text generateGroupKey(TaggedMapOutput aRecord)
                    {
                        String line = ((Text) aRecord.getData()).toString();
                        String[] tokens = line.split(",");
                        String groupKey = tokens[0];
                        return new Text(groupKey);
                    }
                protected TaggedMapOutput generateTaggedMapOutput(Object value)
                    {
                        TaggedWritable retv = new TaggedWritable((Text) value);
                        retv.setTag(this.inputTag);
                        return retv;
                    }
            }
        public static class Reduce extends DataJoinReducerBase 
            {
                protected TaggedMapOutput combine(Object[] tags, Object[] values) 
                    {
                        if (tags.length < 2) return null;
                        String joinedStr = "";
                        for (int i=0; i<values.length; i++) 
                        {
                            if (i > 0) joinedStr += ",";
                            TaggedWritable tw = (TaggedWritable) values[i];
                            String line = ((Text) tw.getData()).toString();
                            String[] tokens = line.split(",", 2);
                            joinedStr += tokens[1];
                        }
                        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
                        retv.setTag((Text) tags[0]);
                        return retv;
                    }
            }
        public static class TaggedWritable extends TaggedMapOutput 
            {
                private Writable data;
                public TaggedWritable(Writable data) 
                    {
                        this.tag = new Text("");
                        this.data = data;
                    }

                public Writable getData() 
                    {
                        return data;
                    }
                public void write(DataOutput out) throws IOException
                    {
                        this.tag.write(out);
                        this.data.write(out);
                    }
                public void readFields(DataInput in) throws IOException 
                    {
                        this.tag.readFields(in);
                        this.data.readFields(in);
                    }
            }
        public int run(String[] args) throws Exception 
            {


                Configuration conf = getConf();
                JobConf job = new JobConf(conf, DataJoin.class);
                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                if (otherArgs.length != 2)
                    {
                        System.err.println("Usage: DataJoin <in> <out>");
                        System.exit(2);
                    }

                Path in = new Path(otherArgs[0]);
                Path out = new Path(otherArgs[1]);
                FileInputFormat.setInputPaths(job, in);
                FileOutputFormat.setOutputPath(job, out);
                job.setJobName("DataJoin");
                job.setMapperClass(MapClass.class);
                job.setReducerClass(Reduce.class);
                job.setInputFormat(TextInputFormat.class);
                job.setOutputFormat(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(TaggedWritable.class);
                job.set("mapred.textoutputformat.separator", ",");
                JobClient.runJob(job);
                return 0;
            }
        public static void main(String[] args) throws Exception 
            {
                int res = ToolRunner.run(new Configuration(),
                new DataJoin(),
                args);
                System.exit(res);
            }
    }
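
For reference, generateInputTag tags each record with the part of its file name before the first "-", and generateGroupKey joins records on their first comma-separated field. So the job expects input files shaped roughly like this (file names and rows are hypothetical, for illustration only):

    customers-part0
        1,Stephanie Leung,555-555-5555
        2,Edward Kim,123-456-7890

    orders-part0
        1,B,88.25,20-May-2008
        2,C,32.00,30-Nov-2007

Here the records from both files with group key 1 would be combined into one output line.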

I am able to compile my code, but when I run it in Hadoop the reduce tasks fail with the following error in the combine step:

12/04/17 19:59:29 INFO mapred.JobClient:  map 100% reduce 27%
12/04/17 19:59:38 INFO mapred.JobClient:  map 100% reduce 30%
12/04/17 19:59:47 INFO mapred.JobClient:  map 100% reduce 33%
12/04/17 20:00:23 INFO mapred.JobClient: Task Id : attempt_201204061316_0018_r_000000_2, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException: DataJoin$TaggedWritable.<init>()
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:62)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
        at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1136)
        at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1076)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:246)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:242)
        at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)

The command I use to run the job is:

    /hadoop/core/bin/hadoop jar /export/scratch/lopez/Join/DataJoin.jar DataJoin /export/scratch/user/lopez/Join /export/scratch/user/lopez/Join_Output

The DataJoin.jar file has DataJoin$TaggedWritable packaged in it.

I checked some forums and found that this error can occur when a nested class is not static. My program has no non-static nested classes!
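
As far as I understand it, the real requirement is a no-argument constructor: Hadoop's WritableSerialization deserializes each reduce-side value by instantiating the value class reflectively, and ReflectionUtils.newInstance looks up the no-arg constructor. A non-static inner class is just one way to end up without one. A minimal sketch of the failing lookup (my own illustration, not Hadoop source):

    import java.lang.reflect.Constructor;

    public class CtorCheck {
        public static void main(String[] args) throws Exception {
            // getDeclaredConstructor() with no arguments throws
            // NoSuchMethodException when the class declares only
            // TaggedWritable(Writable data), matching the trace above.
            Constructor<?> ctor =
                    Class.forName("DataJoin$TaggedWritable").getDeclaredConstructor();
            System.out.println("Found no-arg constructor: " + ctor);
        }
    }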

Could someone please help me?


Thank you Chris, I edited as you said and updated my code to take in two files, but the job still fails.

The job logs INFO mapred.FileInputFormat: Total input paths to process : 2, and then the error is:

    Status : FAILED
    java.lang.ArrayIndexOutOfBoundsException: 1
    at DataJoin$Reduce.combine(DataJoin.java:69)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:205)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:181)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:135)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:468)
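
That trace points at the line joinedStr += tokens[1] in combine: when a record's line contains no comma (for example an empty or trailing line in one of the input files), split(",", 2) returns a single-element array and tokens[1] is out of bounds. A hedged defensive sketch of combine, identical to the original except that the access is guarded:

    protected TaggedMapOutput combine(Object[] tags, Object[] values)
        {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++)
            {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                if (tokens.length > 1)   // guard: the line may contain no comma
                    {
                        joinedStr += tokens[1];
                    }
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }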



My updated run method is:

    {


    Configuration conf = getConf();
    JobConf job = new JobConf(conf, DataJoin.class);
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3)
    {
      System.err.println("Usage: DataJoin <in> <in1> <out>");
      System.exit(2);
    }

    Path in = new Path(otherArgs[0]);
    Path in1 = new Path(otherArgs[1]);
    Path out = new Path(otherArgs[2]);
    FileInputFormat.setInputPaths(job,in,in1);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("DataJoin");
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TaggedWritable.class);
    job.set("mapred.textoutputformat.separator", ",");
    JobClient.runJob(job);
    return 0;

    }


Answers
Chris White 2012-04-18T03:03:26

You need a default constructor for TaggedWritable (Hadoop uses reflection to create this object, and reflection requires a default, no-argument constructor).

You also have a problem in your readFields method: you call data.readFields(in) on the Writable interface, but the deserializer has no knowledge of the actual runtime class of data.

I suggest you either write out the data class name before outputting the data object itself, or look into the GenericWritable class (you'll need to extend it to define the set of allowable Writable classes that can be used).

So you could amend as follows:

    public static class TaggedWritable extends TaggedMapOutput {
        private Writable data;

        public TaggedWritable() {
            // no-arg constructor so Hadoop can instantiate this class reflectively
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            // record the concrete class so readFields knows what to instantiate
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            try {
                if (this.data == null
                        || !this.data.getClass().getName().equals(dataClz)) {
                    // needs import org.apache.hadoop.util.ReflectionUtils
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                }
                this.data.readFields(in);
            } catch (ClassNotFoundException e) {
                // Class.forName declares a checked exception; rethrow as IOException
                throw new IOException(e);
            }
        }
    }
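
For the GenericWritable route mentioned above, a minimal sketch (the class name JoinData is hypothetical, and it assumes Text is the only wrapped type):

    import org.apache.hadoop.io.GenericWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class JoinData extends GenericWritable {
        // The allowed wrapped types; GenericWritable serializes an index
        // into this array instead of a full class name.
        @SuppressWarnings("unchecked")
        private static final Class<? extends Writable>[] TYPES =
                new Class[] { Text.class };

        public JoinData() {
            // no-arg constructor, required for reflective instantiation
        }

        public JoinData(Writable instance) {
            set(instance);   // wrap the concrete Writable
        }

        @Override
        protected Class<? extends Writable>[] getTypes() {
            return TYPES;
        }
    }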

