Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package org.apache.cassandra.hadoop; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /* * This is based on from hadoop-2.0.x sources. */
Utility methods to allow applications to deal with inconsistencies between MapReduce Context Objects API between Hadoop 1.x and 2.x.
/**
 * Reflection-based helpers that hide the differences between the MapReduce
 * context object APIs of Hadoop 1.x and Hadoop 2.x.
 *
 * Every constructor, field and method the helpers need is resolved once, in
 * the static initializer, and cached in a static final member. A failed lookup
 * is rethrown from the initializer as an IllegalArgumentException.
 */
public class HadoopCompat
{
    // true when org.apache.hadoop.mapreduce.task.JobContextImpl could be loaded
    private static final boolean useV21;

    // cached reflective handles, all populated by the static initializer
    private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;

    private static final Field READER_FIELD;
    private static final Field WRITER_FIELD;

    private static final Method GET_CONFIGURATION_METHOD;
    private static final Method SET_STATUS_METHOD;
    private static final Method GET_COUNTER_METHOD;
    private static final Method INCREMENT_COUNTER_METHOD;
    private static final Method GET_TASK_ATTEMPT_ID;
    private static final Method PROGRESS_METHOD;

    static
    {
        final String mapreducePkg = "org.apache.hadoop.mapreduce";

        // Probe for a class that only exists in the ".task" sub-package layout.
        boolean implClassesPresent;
        try
        {
            Class.forName(mapreducePkg + ".task.JobContextImpl");
            implClassesPresent = true;
        }
        catch (ClassNotFoundException notFound)
        {
            implClassesPresent = false;
        }
        useV21 = implClassesPresent;

        // Concrete class names are <pkg>.task.<Name>Impl when the probe succeeded
        // and <pkg>.<Name> otherwise; the counter class has an unrelated name.
        final String contextPrefix = implClassesPresent ? mapreducePkg + ".task." : mapreducePkg + ".";
        final String contextSuffix = implClassesPresent ? "Impl" : "";
        final String counterClassName = implClassesPresent
                                      ? mapreducePkg + ".counters.GenericCounter"
                                      : "org.apache.hadoop.mapred.Counters$Counter";

        Class<?> jobCtxClass;
        Class<?> taskCtxClass;
        Class<?> taskIoCtxClass;
        Class<?> mapCtxClass;
        Class<?> counterClass;
        try
        {
            jobCtxClass = Class.forName(contextPrefix + "JobContext" + contextSuffix);
            taskCtxClass = Class.forName(contextPrefix + "TaskAttemptContext" + contextSuffix);
            taskIoCtxClass = Class.forName(contextPrefix + "TaskInputOutputContext" + contextSuffix);
            mapCtxClass = Class.forName(contextPrefix + "MapContext" + contextSuffix);
            counterClass = Class.forName(counterClassName);
        }
        catch (ClassNotFoundException e)
        {
            throw new IllegalArgumentException("Can't find class", e);
        }

        try
        {
            JOB_CONTEXT_CONSTRUCTOR = jobCtxClass.getConstructor(Configuration.class, JobID.class);
            JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);

            TASK_CONTEXT_CONSTRUCTOR = taskCtxClass.getConstructor(Configuration.class, TaskAttemptID.class);
            TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);

            GENERIC_COUNTER_CONSTRUCTOR = counterClass.getDeclaredConstructor(String.class, String.class, long.class);
            GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);

            // Same parameter list in both layouts; only the lookup call differs
            // (declared constructor for the Impl class, public constructor otherwise).
            final Class<?>[] mapContextSignature = {
                Configuration.class,
                TaskAttemptID.class,
                RecordReader.class,
                RecordWriter.class,
                OutputCommitter.class,
                StatusReporter.class,
                InputSplit.class
            };
            if (implClassesPresent)
            {
                MAP_CONTEXT_CONSTRUCTOR = mapCtxClass.getDeclaredConstructor(mapContextSignature);
            }
            else
            {
                MAP_CONTEXT_CONSTRUCTOR = mapCtxClass.getConstructor(mapContextSignature);
            }

            // getCounter is first looked up on TaskAttemptContext (only when the Impl
            // classes are present); any failure there falls back to TaskInputOutputContext.
            Method counterAccessor = null;
            if (implClassesPresent)
            {
                try
                {
                    counterAccessor = Class.forName(mapreducePkg + ".TaskAttemptContext")
                                           .getMethod("getCounter", String.class, String.class);
                }
                catch (Exception ignored)
                {
                    // deliberately ignored: the fallback lookup below takes over
                }
            }
            if (counterAccessor == null)
            {
                counterAccessor = Class.forName(mapreducePkg + ".TaskInputOutputContext")
                                       .getMethod("getCounter", String.class, String.class);
            }
            GET_COUNTER_METHOD = counterAccessor;

            MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);

            READER_FIELD = mapCtxClass.getDeclaredField("reader");
            READER_FIELD.setAccessible(true);

            WRITER_FIELD = taskIoCtxClass.getDeclaredField("output");
            WRITER_FIELD.setAccessible(true);

            GET_CONFIGURATION_METHOD = Class.forName(mapreducePkg + ".JobContext").getMethod("getConfiguration");

            final Class<?> taskAttemptContextApi = Class.forName(mapreducePkg + ".TaskAttemptContext");
            SET_STATUS_METHOD = taskAttemptContextApi.getMethod("setStatus", String.class);
            GET_TASK_ATTEMPT_ID = taskAttemptContextApi.getMethod("getTaskAttemptID");

            INCREMENT_COUNTER_METHOD = Class.forName(mapreducePkg + ".Counter").getMethod("increment", long.class);

            PROGRESS_METHOD = taskAttemptContextApi.getMethod("progress");
        }
        catch (SecurityException e)
        {
            throw new IllegalArgumentException("Can't run constructor ", e);
        }
        catch (NoSuchMethodException e)
        {
            throw new IllegalArgumentException("Can't find constructor ", e);
        }
        catch (NoSuchFieldException e)
        {
            throw new IllegalArgumentException("Can't find field ", e);
        }
        catch (ClassNotFoundException e)
        {
            throw new IllegalArgumentException("Can't find class", e);
        }
    }
True if runtime Hadoop version is 2.x, false otherwise.
/**
 * Reports which Hadoop API flavour was detected when this class was loaded.
 *
 * @return true if the runtime Hadoop version is 2.x, false otherwise
 */
public static boolean isVersion2x()
{
    return useV21;
}

/**
 * Runs the given constructor with the given arguments.
 *
 * @param constructor constructor to invoke
 * @param args        arguments handed to the constructor
 * @return the newly constructed object
 * @throws IllegalArgumentException wrapping any InstantiationException,
 *         IllegalAccessException or InvocationTargetException raised by the call
 */
private static Object newInstance(Constructor<?> constructor, Object... args)
{
    try
    {
        return constructor.newInstance(args);
    }
    catch (InstantiationException | IllegalAccessException | InvocationTargetException e)
    {
        throw new IllegalArgumentException("Can't instantiate " + constructor, e);
    }
}
Creates a JobContext from a Configuration and jobId using the correct constructor for the running Hadoop version. jobId could be null.
/**
 * Creates a JobContext by reflectively calling the cached
 * (Configuration, JobID) constructor chosen for the running Hadoop version.
 *
 * @param conf  configuration handed to the constructor
 * @param jobId job id handed to the constructor; <code>jobId</code> could be null
 * @return the newly created JobContext
 * @throws IllegalArgumentException if the reflective instantiation fails
 */
public static JobContext newJobContext(Configuration conf, JobID jobId)
{
    final Object created = newInstance(JOB_CONTEXT_CONSTRUCTOR, conf, jobId);
    return (JobContext) created;
}
Creates a TaskAttemptContext from a Configuration and taskAttemptId using the correct constructor for the running Hadoop version.
/**
 * Creates a TaskAttemptContext by reflectively calling the cached
 * (Configuration, TaskAttemptID) constructor chosen for the running Hadoop
 * version.
 *
 * @param conf          configuration handed to the constructor
 * @param taskAttemptId task attempt id handed to the constructor
 * @return the newly created TaskAttemptContext
 * @throws IllegalArgumentException if the reflective instantiation fails
 */
public static TaskAttemptContext newTaskAttemptContext(Configuration conf, TaskAttemptID taskAttemptId)
{
    final Object created = newInstance(TASK_CONTEXT_CONSTRUCTOR, conf, taskAttemptId);
    return (TaskAttemptContext) created;
}
Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
/**
 * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2,
 * passing all seven arguments, in declaration order, to the cached
 * constructor.
 *
 * @return the newly created MapContext
 * @throws IllegalArgumentException if the reflective instantiation fails
 */
public static MapContext newMapContext(Configuration conf,
                                       TaskAttemptID taskAttemptID,
                                       RecordReader recordReader,
                                       RecordWriter recordWriter,
                                       OutputCommitter outputCommitter,
                                       StatusReporter statusReporter,
                                       InputSplit inputSplit)
{
    // Same order as the parameter list of the cached constructor.
    final Object[] constructorArgs = {
        conf,
        taskAttemptID,
        recordReader,
        recordWriter,
        outputCommitter,
        statusReporter,
        inputSplit
    };
    return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR, constructorArgs);
}
Returns:with Hadoop 2 : new GenericCounter(args),
with Hadoop 1 : new Counter(args)
/**
 * Creates a counter through the cached (String, String, long) constructor.
 *
 * @param name        first constructor argument
 * @param displayName second constructor argument
 * @param value       third constructor argument
 * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
 *         with Hadoop 1 : <code>new Counter(args)</code>
 * @throws IllegalArgumentException if the reflective instantiation fails
 */
public static Counter newGenericCounter(String name, String displayName, long value)
{
    final Object created;
    try
    {
        created = GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
    }
    catch (InstantiationException | IllegalAccessException | InvocationTargetException e)
    {
        throw new IllegalArgumentException("Can't instantiate Counter", e);
    }
    return (Counter) created;
}
Invokes a method and rethrows any exception as a runtime exception.
/**
 * Calls the given method reflectively and converts the checked reflection
 * exceptions into a runtime exception.
 *
 * @param method method to call
 * @param obj    object the method is called on
 * @param args   arguments handed to the method
 * @return whatever the reflective call returns
 * @throws IllegalArgumentException wrapping any IllegalAccessException or
 *         InvocationTargetException raised by the call
 */
private static Object invoke(Method method, Object obj, Object... args)
{
    final Object result;
    try
    {
        result = method.invoke(obj, args);
    }
    catch (IllegalAccessException | InvocationTargetException e)
    {
        throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
    }
    return result;
}
Invoke getConfiguration() on JobContext. Works with both Hadoop 1 and 2.
/**
 * Calls getConfiguration() on the given JobContext through the cached
 * reflective method handle. Works with both Hadoop 1 and 2.
 *
 * @param context job context to query
 * @return the result of the call, cast to Configuration
 * @throws IllegalArgumentException if the reflective call fails
 */
public static Configuration getConfiguration(JobContext context)
{
    final Object configuration = invoke(GET_CONFIGURATION_METHOD, context);
    return (Configuration) configuration;
}
Invoke setStatus() on TaskAttemptContext. Works with both Hadoop 1 and 2.
/**
 * Calls setStatus(status) on the given TaskAttemptContext through the cached
 * reflective method handle. Works with both Hadoop 1 and 2.
 *
 * @param context task attempt context to update
 * @param status  status string handed to setStatus
 * @throws IllegalArgumentException if the reflective call fails
 */
public static void setStatus(TaskAttemptContext context, String status)
{
    final Object[] statusArgument = { status };
    invoke(SET_STATUS_METHOD, context, statusArgument);
}
returns TaskAttemptContext.getTaskAttemptID(). Works with both Hadoop 1 and 2.
/**
 * Calls getTaskAttemptID() on the given TaskAttemptContext through the cached
 * reflective method handle. Works with both Hadoop 1 and 2.
 *
 * @param taskContext task attempt context to query
 * @return the result of the call, cast to TaskAttemptID
 * @throws IllegalArgumentException if the reflective call fails
 */
public static TaskAttemptID getTaskAttemptID(TaskAttemptContext taskContext)
{
    final Object attemptId = invoke(GET_TASK_ATTEMPT_ID, taskContext);
    return (TaskAttemptID) attemptId;
}
Invoke getCounter() on TaskInputOutputContext. Works with both Hadoop 1 and 2.
/**
 * Calls getCounter(groupName, counterName) on the given context through the
 * cached reflective method handle. Works with both Hadoop 1 and 2.
 *
 * @param context     context to query
 * @param groupName   first argument handed to getCounter
 * @param counterName second argument handed to getCounter
 * @return the result of the call, cast to Counter
 * @throws IllegalArgumentException if the reflective call fails
 */
public static Counter getCounter(TaskInputOutputContext context, String groupName, String counterName)
{
    final Object[] counterKey = { groupName, counterName };
    final Object counter = invoke(GET_COUNTER_METHOD, context, counterKey);
    return (Counter) counter;
}
Invoke TaskAttemptContext.progress(). Works with both Hadoop 1 and 2.
/**
 * Calls progress() on the given TaskAttemptContext through the cached
 * reflective method handle. Works with both Hadoop 1 and 2.
 *
 * @param context task attempt context whose progress() is called
 * @throws IllegalArgumentException if the reflective call fails
 */
public static void progress(TaskAttemptContext context)
{
    // progress() takes no parameters, so the argument array is empty.
    final Object[] noArguments = {};
    invoke(PROGRESS_METHOD, context, noArguments);
}
Increment the counter. Works with both Hadoop 1 and 2
/** * Increment the counter. Works with both Hadoop 1 and 2 */
public static void incrementCounter(Counter counter, long increment) { // incrementing a count might be called often. Might be affected by // cost of invoke(). might be good candidate to handle in a shim. // (TODO Raghu) figure out how achieve such a build with maven invoke(INCREMENT_COUNTER_METHOD, counter, increment); } }