/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hadoop.cql3;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
/**
* The <code>CqlBulkRecordWriter</code> maps the output &lt;key, value&gt;
* pairs to a Cassandra column family. In particular, it applies the bound variables
* in the value to the prepared statement, which it associates with the key and,
* in turn, with the responsible endpoint.
*
* <p>
* Furthermore, this writer groups the CQL queries by the endpoint responsible for
* the rows being affected. This allows the CQL queries to be executed in parallel,
* directly to a responsible endpoint.
* </p>
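*
* <p>
* A sketch of the kind of job configuration this writer expects; the keyspace, table,
* schema and insert statement below are illustrative only:
* </p>
* <pre>{@code
* Configuration conf = job.getConfiguration(); // org.apache.hadoop.mapreduce.Job
* ConfigHelper.setOutputKeyspace(conf, "ks");
* ConfigHelper.setOutputColumnFamily(conf, "users");
* ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
* ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");
* CqlBulkOutputFormat.setTableSchema(conf, "users",
*     "CREATE TABLE ks.users (id int PRIMARY KEY, name text)");
* CqlBulkOutputFormat.setTableInsertStatement(conf, "users",
*     "INSERT INTO ks.users (id, name) VALUES (?, ?)");
* job.setOutputFormatClass(CqlBulkOutputFormat.class);
* }</pre>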
*
* @see CqlBulkOutputFormat
*/
public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
{
public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
public final static String IGNORE_HOSTS = "mapreduce.output.bulkoutputformat.ignorehosts";
private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
protected final Configuration conf;
protected final int maxFailures;
protected final int bufferSize;
protected Closeable writer;
protected SSTableLoader loader;
protected Progressable progress;
protected TaskAttemptContext context;
protected final Set<InetAddress> ignores = new HashSet<>();
private String keyspace;
private String table;
private String schema;
private String insertStatement;
private File outputDir;
private boolean deleteSrc;
private IPartitioner partitioner;
CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
{
this(HadoopCompat.getConfiguration(context));
this.context = context;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
{
this(conf);
this.progress = progress;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf) throws IOException
{
this.conf = conf;
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
setConfigs();
}
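/**
* Reads the keyspace, table (resolving any configured alias), schema, insert statement,
* output directory, partitioner and ignored hosts from the Hadoop configuration, so that a
* missing setting fails fast here rather than later in write().
*/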
private void setConfigs() throws IOException
{
// if anything is missing, exceptions will be thrown here, instead of on write()
keyspace = ConfigHelper.getOutputKeyspace(conf);
table = ConfigHelper.getOutputColumnFamily(conf);
// check if table is aliased
String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
if (aliasedCf != null)
table = aliasedCf;
schema = CqlBulkOutputFormat.getTableSchema(conf, table);
insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
outputDir = getTableDirectory();
deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
try
{
partitioner = ConfigHelper.getInputPartitioner(conf);
}
catch (Exception e)
{
partitioner = Murmur3Partitioner.instance;
}
try
{
for (String hostToIgnore : CqlBulkOutputFormat.getIgnoreHosts(conf))
ignores.add(InetAddress.getByName(hostToIgnore));
}
catch (UnknownHostException e)
{
throw new RuntimeException("Unknown host: " + e.getMessage());
}
}
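/**
* Returns the local directory used for generated SSTables: the directory named by the
* {@value #OUTPUT_LOCATION} property if set, otherwise the JVM's java.io.tmpdir.
*/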
protected String getOutputLocation() throws IOException
{
String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
if (dir == null)
throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
return dir;
}
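/**
* Lazily creates the {@link CQLSSTableWriter} that writes rows into SSTables under the local
* output directory, and the {@link SSTableLoader} that later streams those SSTables to the
* cluster, optionally deleting the local files on success.
*/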
private void prepareWriter() throws IOException
{
if (writer == null)
{
writer = CQLSSTableWriter.builder()
                         .forTable(schema)
                         .using(insertStatement)
                         .inDirectory(outputDir)
                         .withBufferSizeInMB(bufferSize)
                         .withPartitioner(partitioner)
                         .build();
}
if (loader == null)
{
ExternalClient externalClient = new ExternalClient(conf);
externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
{
@Override
public void onSuccess(StreamState finalState)
{
if (deleteSrc)
FileUtils.deleteRecursive(outputDir);
}
};
}
}
/**
* <p>
* The column values must correspond to the order in which
* they appear in the insert statement used by this writer.
*
* Key is not used, so it can be null or any object.
* </p>
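*
* <p>
* For example, given a hypothetical insert statement
* {@code INSERT INTO ks.users (id, name) VALUES (?, ?)}, the values passed to a
* {@code CqlBulkRecordWriter} (here {@code recordWriter}) must be serialized in that same
* bind order:
* </p>
* <pre>{@code
* List<ByteBuffer> row = Arrays.asList(ByteBufferUtil.bytes(1),        // id
*                                      ByteBufferUtil.bytes("alice")); // name
* recordWriter.write(null, row); // the key is ignored
* }</pre>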
*
* @param key
* any object or null.
* @param values
* the values to write.
* @throws IOException
*/
@Override
public void write(Object key, List<ByteBuffer> values) throws IOException
{
prepareWriter();
try
{
((CQLSSTableWriter) writer).rawAddRow(values);
if (null != progress)
progress.progress();
if (null != context)
HadoopCompat.progress(context);
}
catch (InvalidRequestException e)
{
throw new IOException("Error adding row with key: " + key, e);
}
}
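/**
* Creates a unique local directory of the form
* {@code <outputLocation>/<keyspace>/<table>-<uuid>} to hold this writer's SSTables.
*/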
private File getTableDirectory() throws IOException
{
File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString()));
if (!dir.exists() && !dir.mkdirs())
{
throw new IOException("Failed to created output directory: " + dir);
}
return dir;
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
close();
}
/** Fills the deprecated RecordWriter interface for streaming. */
@Deprecated
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
{
close();
}
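/**
* Closes the SSTable writer, streams the generated SSTables to the cluster (skipping any
* ignored hosts) and reports progress while the stream is in flight. Throws an IOException
* if more hosts fail than the configured maximum.
*/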
private void close() throws IOException
{
if (writer != null)
{
writer.close();
Future<StreamState> future = loader.stream(ignores);
while (true)
{
try
{
future.get(1000, TimeUnit.MILLISECONDS);
break;
}
catch (ExecutionException | TimeoutException te)
{
if (null != progress)
progress.progress();
if (null != context)
HadoopCompat.progress(context);
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
if (loader.getFailedHosts().size() > 0)
{
if (loader.getFailedHosts().size() > maxFailures)
throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
else
logger.warn("Some hosts failed: {}", loader.getFailedHosts());
}
}
}
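/**
* {@link SSTableLoader} client that connects to the cluster over the native protocol, using
* the output initial addresses, port and credentials from the Hadoop configuration, to
* discover the ring and table metadata needed for streaming.
*/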
public static class ExternalClient extends NativeSSTableLoaderClient
{
public ExternalClient(Configuration conf)
{
super(resolveHostAddresses(conf),
CqlConfigHelper.getOutputNativePort(conf),
ConfigHelper.getOutputKeyspaceUserName(conf),
ConfigHelper.getOutputKeyspacePassword(conf),
CqlConfigHelper.getSSLOptions(conf).orNull());
}
private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
{
Set<InetAddress> addresses = new HashSet<>();
for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
{
try
{
addresses.add(InetAddress.getByName(host));
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}
return addresses;
}
}
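/** An {@link OutputHandler} that silently discards all loader output. */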
public static class NullOutputHandler implements OutputHandler
{
public void output(String msg) {}
public void debug(String msg) {}
public void warn(String msg) {}
public void warn(String msg, Throwable th) {}
}
}