package io.github.vmzakharov.ecdataframe.dataset;
import io.github.vmzakharov.ecdataframe.dataframe.*;
import io.github.vmzakharov.ecdataframe.dsl.value.ValueType;
import org.eclipse.collections.api.block.procedure.Procedure;
import org.eclipse.collections.api.list.ListIterable;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.impl.factory.Lists;
import java.io.*;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;
import java.time.temporal.TemporalAccessor;
public class CsvDataSet
extends DataSetAbstract
{
public static int BUFFER_SIZE = 65_536;
private final String dataFileName;
private boolean emptyElementsConvertedToNulls = false;
private CsvSchema schema;
private DateTimeFormatter[] formatters;
public CsvDataSet(String dataFileName, String newName)
{
this(dataFileName, newName, null);
}
public CsvDataSet(String dataFileName, String newName, CsvSchema newSchema)
{
super(newName);
this.dataFileName = dataFileName;
this.schema = newSchema;
}
@Override
public void openFileForReading()
{
File file = new File(this.dataFileName);
}
public void convertEmptyElementsToNulls()
{
this.emptyElementsConvertedToNulls = true;
}
@Override
public Object next()
{
return null;
}
@Override
public boolean hasNext()
{
return false;
}
@Override
public void close()
{
}
public void write(DataFrame dataFrame)
{
if (this.schemaIsNotDefined())
{
this.schema = this.schemaFromDataFrame(dataFrame);
}
try (BufferedWriter writer = new BufferedWriter(this.createWriter(), BUFFER_SIZE))
{
int columnCount = dataFrame.columnCount();
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
writer.write(schema.columnAt(columnIndex).getName());
if (columnIndex < columnCount - 1)
{
writer.write(this.getSchema().getSeparator());
}
}
writer.write('\n');
int rowCount = dataFrame.rowCount();
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
this.writeValue(writer, dataFrame, rowIndex, columnIndex);
if (columnIndex < columnCount - 1)
{
writer.write(this.getSchema().getSeparator());
}
}
writer.write('\n');
}
}
catch (IOException e)
{
throw new RuntimeException("Failed write data frame to '" + this.dataFileName + "'", e);
}
}
private CsvSchema schemaFromDataFrame(DataFrame dataFrame)
{
CsvSchema schema = new CsvSchema();
dataFrame.getColumns().forEach(e -> schema.addColumn(e.getName(), e.getType()));
return schema;
}
private void writeValue(Writer writer, DataFrame dataFrame, int rowIndex, int columnIndex)
throws IOException
{
DfColumn column = dataFrame.getColumnAt(columnIndex);
String valueAsLiteral;
switch (column.getType())
{
case LONG:
case DOUBLE:
valueAsLiteral = column.getValueAsStringLiteral(rowIndex);
break;
case STRING:
String stringValue = column.getValueAsString(rowIndex);
valueAsLiteral = stringValue == null ? "" : schema.getQuoteCharacter() + stringValue + schema.getQuoteCharacter();
break;
case DATE:
LocalDate dateValue = ((DfDateColumn) column).getDate(rowIndex);
valueAsLiteral = dateValue == null ? "" : this.formatterForColumn(columnIndex).format(dateValue);
break;
default:
ErrorReporter.reportAndThrow("Do not know how to convert value of type " + column.getType() + " to a string");
valueAsLiteral = null;
}
writer.write(valueAsLiteral);
}
private DateTimeFormatter formatterForColumn(int columnIndex)
{
if (this.formatters == null)
{
this.formatters = new DateTimeFormatter[this.schema.columnCount()];
}
if (this.formatters[columnIndex] == null)
{
String pattern = schema.columnAt(columnIndex).getPattern();
this.formatters[columnIndex] = DateTimeFormatter.ofPattern(pattern);
}
return this.formatters[columnIndex];
}
protected Writer createWriter()
throws IOException
{
return new FileWriter(this.dataFileName);
}
protected Reader createReader()
throws IOException
{
return new FileReader(this.dataFileName);
}
public DataFrame loadAsDataFrame()
{
DataFrame df = new DataFrame(this.getName());
df.enablePooling();
if (this.schemaIsNotDefined())
{
this.schema = new CsvSchema();
}
try (BufferedReader reader = new BufferedReader(this.createReader(), BUFFER_SIZE))
{
MutableList<String> headers = this.splitMindingQs(reader.readLine());
String dataRow = reader.readLine();
MutableList<String> firstRowElements = this.splitMindingQs(dataRow);
ErrorReporter.reportAndThrow(headers.size() != firstRowElements.size(),
"The number of elements in the header does not match the number of elements in the first data row ("
+ headers.size() + ", " + firstRowElements.size() + ")");
if (this.getSchema().columnCount() == 0)
{
this.inferSchema(headers, firstRowElements);
}
else
{
ErrorReporter.reportAndThrow(this.getSchema().columnCount() != firstRowElements.size(),
"The number of columns in the schema does not match the number of elements in the first data row ("
+ this.getSchema().columnCount() + ", " + firstRowElements.size() + ")");
}
MutableList<Procedure<String>> columnPopulators = Lists.mutable.of();
this.getSchema().getColumns().forEach(col -> addDataFrameColumn(df, col, columnPopulators));
int columnCount = this.getSchema().columnCount();
MutableList<String> lineElements = Lists.mutable.withInitialCapacity(columnCount);
do
{
this.parseAndAddLineToDataFrame(dataRow, lineElements, columnCount, columnPopulators);
}
while ((dataRow = reader.readLine()) != null);
df.seal();
}
catch (IOException e)
{
throw new RuntimeException("Failed to load file as a data frame. File '" + this.dataFileName + "'", e);
}
return df;
}
private void addDataFrameColumn(DataFrame df, CsvSchemaColumn col, MutableList<Procedure<String>> columnPopulators)
{
ValueType columnType = col.getType();
df.addColumn(col.getName(), col.getType());
DfColumn lastColumn = df.getColumnAt(df.columnCount() - 1);
switch (columnType)
{
case LONG:
columnPopulators.add(s -> ((DfLongColumnStored) lastColumn).addLong(col.parseAsLong(s)));
break;
case DOUBLE:
columnPopulators.add(s -> ((DfDoubleColumnStored) lastColumn).addDouble(col.parseAsDouble(s)));
break;
case STRING:
columnPopulators.add(s -> ((DfStringColumnStored) lastColumn).addString(col.parseAsString(s)));
break;
case DATE:
columnPopulators.add(s -> ((DfDateColumnStored) lastColumn).addDate(col.parseAsLocalDate(s)));
break;
default:
throw new RuntimeException("Don't know what to do with the column type: " + columnType);
}
}
private void inferSchema(MutableList<String> headers, MutableList<String> elements)
{
int columnCount = elements.size();
for (int i = 0; i < columnCount; i++)
{
String element = elements.get(i);
ValueType guessedType;
String matchingFormat = null;
if (this.getSchema().surroundedByQuotes(element))
{
guessedType = ValueType.STRING;
}
else if ((matchingFormat = this.findMatchingDateFormat(element)) != null)
{
guessedType = ValueType.DATE;
}
else if (this.canParseAsLong(element))
{
guessedType = ValueType.LONG;
}
else if (this.canParseAsDouble(element))
{
guessedType = ValueType.DOUBLE;
}
else
{
guessedType = ValueType.STRING;
}
this.schema.addColumn(headers.get(i), guessedType, matchingFormat);
}
}
private boolean schemaIsNotDefined()
{
return this.schema == null;
}
public CsvSchema getSchema()
{
return this.schema;
}
private void parseAndAddLineToDataFrame(String line, MutableList<String> elements, int columnCount, MutableList<Procedure<String>> columnPopulators)
{
this.splitMindingQsInto(line, elements);
for (int i = 0; i < columnCount; i++)
{
String element = elements.get(i);
if (this.getSchema().hasNullMarker())
{
if (this.getSchema().getNullMarker().equals(element))
element = null;
}
columnPopulators.get(i).accept(element);
}
}
private boolean canParseAsLong(String aString)
{
try
{
Long.parseLong(aString);
return true;
}
catch (NumberFormatException e)
{
return false;
}
}
private boolean canParseAsDouble(String aString)
{
try
{
Double.parseDouble(aString);
return true;
}
catch (NumberFormatException e)
{
return false;
}
}
private String findMatchingDateFormat(String aString)
{
ListIterable<String> dateFormats = Lists.immutable.of("uuuu/M/d", "uuuu-M-d", "M/d/uuuu");
String trimmed = aString.trim();
for (int i = 0; i < dateFormats.size(); i++)
{
String pattern = dateFormats.get(i);
try
{
DateTimeFormatter candidateFormatter = DateTimeFormatter.ofPattern(pattern).withResolverStyle(ResolverStyle.STRICT);
LocalDate.parse(trimmed, candidateFormatter);
return pattern;
}
catch (DateTimeParseException e)
{
}
}
return null;
}
private MutableList<String> splitMindingQs(String aString)
{
MutableList<String> elements = Lists.mutable.of();
this.splitMindingQsInto(aString, elements);
return elements;
}
public void splitMindingQsInto(String aString, MutableList<String> elements)
{
elements.clear();
int currentTokenStart = 0;
boolean insideQuotes = false;
boolean initialBlanks = true;
boolean closedQuote = false;
int charCount = aString.length();
for (int index = 0; index < charCount; index++)
{
char curChar = aString.charAt(index);
boolean endOfLine = index == charCount - 1;
if (endOfLine)
{
if (insideQuotes && !this.isQuote(curChar))
{
this.throwBadFormat("Unbalanced quotes at index " + index + " in " + aString);
}
if (this.isTokenSeparator(curChar))
{
if (!closedQuote)
{
elements.add(substringOrNull(aString, currentTokenStart, index));
}
elements.add(substringOrNull(aString, index + 1, index + 1));
}
else
{
elements.add(this.substringOrNull(aString, currentTokenStart, index + 1));
}
}
else if (insideQuotes)
{
if (this.isQuote(curChar))
{
insideQuotes = false;
closedQuote = true;
elements.add(this.substringOrNull(aString, currentTokenStart, index + 1));
currentTokenStart = index + 1;
}
}
else if (this.isTokenSeparator(curChar))
{
if (!closedQuote)
{
elements.add(this.substringOrNull(aString, currentTokenStart, index));
}
closedQuote = false;
initialBlanks = true;
currentTokenStart = index + 1;
}
else if (initialBlanks)
{
if (!Character.isSpaceChar(curChar))
{
initialBlanks = false;
if (this.isQuote(curChar))
{
insideQuotes = true;
currentTokenStart = index;
}
}
}
}
}
private void throwBadFormat(String message)
{
throw new RuntimeException(message);
}
private boolean isTokenSeparator(char aChar)
{
return aChar == this.getSchema().getSeparator();
}
private boolean isQuote(char aChar)
{
return aChar == this.getSchema().getQuoteCharacter();
}
private String substringOrNull(String aString, int beginIndex, int endIndex)
{
if(beginIndex < endIndex)
{
return aString.substring(beginIndex, endIndex);
}
return this.emptyElementsConvertedToNulls ? null : "";
}
}