Skip to content

Commit

Permalink
MLPDataFile: Python (Pandas, Numpy); Java, Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
Roffild committed May 4, 2020
1 parent 9259e9c commit 6285c60
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import roffild.mqlport.Pointer;

import java.io.Closeable;
import java.nio.file.Paths;

import static roffild.mqlport.MqlLibrary.*;

public class MLPDataFile implements Closeable
Expand Down Expand Up @@ -250,6 +252,13 @@ public static Double[] doublesToDoubles(final double[] doubles)
return result;
}

public static String getAbsolutePath(final int file, final boolean validation)
{
return Paths.get(getPathFilesCommon(),
"MLPData/mlp_" + Integer.toString(file) + (validation ? "_validation" : "") + ".bin")
.toAbsolutePath().toString();
}

@Override
protected void finalize() throws Throwable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class MqlLibrary
Expand Down Expand Up @@ -118,22 +120,21 @@ public static String DoubleToString(double value, int digits)
return String.format(Locale.ENGLISH, "%." + digits + "f", value);
}

protected static ArrayList<FileHandle> Handles = new ArrayList<>();
protected static List<FileHandle> Handles = Collections.synchronizedList(new ArrayList<FileHandle>());

public static FileHandle getFileHandle(int file_handle) throws IOException
{
if (file_handle <= INVALID_HANDLE || file_handle >= Handles.size()) {
throw new IOException("INVALID_HANDLE");
synchronized (Handles) {
if (file_handle <= INVALID_HANDLE || file_handle >= Handles.size()) {
throw new IOException("INVALID_HANDLE");
}
return Handles.get(file_handle);
}
return Handles.get(file_handle);
}

public static RandomAccessFileLE getRas(int file_handle) throws IOException
{
if (file_handle <= INVALID_HANDLE || file_handle >= Handles.size()) {
throw new IOException("INVALID_HANDLE");
}
return Handles.get(file_handle).ras;
return getFileHandle(file_handle).ras;
}

protected static class FileHandle
Expand Down Expand Up @@ -232,8 +233,10 @@ public static int FileOpen(String file_name, int open_flags, String delimiter, l
} else {
ras = new RandomAccessFileLE(full_file_name.toFile(), "r");
}
Handles.add(new FileHandle(ras, open_flags, delimiter, codepage));
return Handles.size() - 1;
synchronized (Handles) {
Handles.add(new FileHandle(ras, open_flags, delimiter, codepage));
return Handles.size() - 1;
}
} catch (IOException e) {
return INVALID_HANDLE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MLPDataFileRecordReader extends RecordReader<String[], Row>
protected MLPDataFile mlpfile;
public Pointer<Integer> nin = new Pointer<Integer>(0);
public Pointer<Integer> nout = new Pointer<Integer>(0);
public String[] header;
public String[] header = new String[0];
protected MqlArray<Double> data = new MqlArray<Double>();
protected Pointer<Integer> size = new Pointer<Integer>(0);
private long start, tell, finish;
Expand All @@ -56,16 +56,6 @@ public void initialize(InputSplit split, TaskAttemptContext context)
FileSeek(mlpfile.handleFile, start, 0);
}
setBufferSize(mlpfile.handleFile, 1024 * 1024);
if (mlpfile.header.size() != (nin.value + nout.value)) {
mlpfile.header.clear();
for (int x = 0; x < nin.value; x++) {
mlpfile.header.add("fd" + x);
}
for (int x = 0; x < nout.value; x++) {
mlpfile.header.add("result" + x);
}
}
header = mlpfile.header.toArray(new String[1]);
}

@Override
Expand All @@ -81,6 +71,14 @@ public boolean nextKeyValue() throws IOException, InterruptedException
@Override
public String[] getCurrentKey() throws IOException, InterruptedException
{
if (header.length == 0) {
if (mlpfile.header.size() == 0) {
for (int x = 0; x < size.value; x++) {
mlpfile.header.add("" + x);
}
}
header = mlpfile.header.toArray(new String[0]);
}
if (header.length != size.value) {
throw new IOException("header (" + header.length + ") != size (" + size.value + ")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import roffild.MLPDataFile;

import java.util.Iterator;

public class MLPDataFileSpark
{
Expand Down Expand Up @@ -74,4 +77,31 @@ public static Dataset<Row> getDatasetVector(String path, SparkSession spark,
});
return spark.createDataFrame(getPairRDDVector(path, jsc, conf).values(), st);
}

public static void writeDatasetToFile(Dataset<Row> dataset, final int file, final int nin, final int nout,
final boolean validation)
{
writeDatasetToFile(dataset, file, nin, nout, new String[0], validation);
}
public static void writeDatasetToFile(Dataset<Row> dataset, final int file, final int nin, final int nout,
final String _header[], final boolean validation)
{
try (MLPDataFile mlpfile = new MLPDataFile()) {
if (validation) {
mlpfile.initWriteValidation(file, nin, nout, _header);
} else {
mlpfile.initWrite(file, nin, nout, _header);
}
writeDatasetToFile(dataset, mlpfile);
}
}
public static void writeDatasetToFile(Dataset<Row> dataset, MLPDataFile mlpfile)
{
Iterator<Row> it = dataset.toLocalIterator();
while (it.hasNext()) {
mlpfile.write(MLPDataFile.DoublesTodoubles(
scala.collection.JavaConverters.seqAsJavaListConverter(it.next().toSeq())
.asJava().toArray(new Double[0])));
}
}
}
2 changes: 2 additions & 0 deletions Include/Roffild/RoffildPython/roffild/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
#
# https://github.com/Roffild/RoffildLibrary
# ==============================================================================
from .mlpdatafile import MLPDataFile
from .utils import openUnicode, WindowsConfigParser, parseMTReport
60 changes: 60 additions & 0 deletions Include/Roffild/RoffildPython/roffild/mlpdatafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,66 @@ def convertToCsv(file: int, validation: bool = False, delimiter: str = ";") -> b
return True;
return False;

@staticmethod
def convertToNumpy(file: int, validation: bool = False, nin: list = None, nout: list = None):
import numpy
with MLPDataFile() as mlpfile:
if nin is None: nin = [0]
if nout is None: nout = [0]
if validation:
mlpfile.initReadValidation(file, nin, nout)
else:
mlpfile.initRead(file, nin, nout)
if mlpfile.handleFile == INVALID_HANDLE:
return None
data = []
dt = []
sz = [0]
while mlpfile.read(dt, sz) == sz[0] and sz[0] > 0:
data.append(numpy.array(dt, numpy.float64))
dt = []
return numpy.vstack(data)

@staticmethod
def convertToPandas(file: int, validation: bool = False, nin: list = None, nout: list = None):
import pandas
with MLPDataFile() as mlpfile:
if nin is None: nin = [0]
if nout is None: nout = [0]
if validation:
mlpfile.initReadValidation(file, nin, nout)
else:
mlpfile.initRead(file, nin, nout)
if mlpfile.handleFile == INVALID_HANDLE:
return None
_header = mlpfile.header
return pandas.DataFrame(MLPDataFile.convertToNumpy(file=file, validation=validation),
columns=_header)

@staticmethod
def convertFromNumpy(ndarray, file: int, nin: int, nout: int, _header: list = None,
validation: bool = False):
with MLPDataFile() as mlpfile:
if _header is None: _header = []
if validation:
mlpfile.initWriteValidation(file, nin, nout, _header)
else:
mlpfile.initWrite(file, nin, nout, _header)
if mlpfile.handleFile == INVALID_HANDLE:
return None
mlpfile.writeAll(ndarray.astype(dtype="float64"))

@staticmethod
def convertFromPandas(dataframe, file: int, nin: int, nout: int, validation: bool = False):
header = dataframe.columns.to_list() if dataframe.columns.is_object() else []
MLPDataFile.convertFromNumpy(ndarray=dataframe.to_numpy(dtype="float64"), file=file, nin=nin,
nout=nout, _header=header, validation=validation)

@staticmethod
def getAbsolutePath(file: int, validation: bool = False) -> str:
p = "_validation.bin" if validation else ".bin"
return str(pathlib.Path(getPathFilesCommon(), f"MLPData/mlp_{file}{p}").absolute())

@staticmethod
def getPathFiles() -> str:
return getPathFiles()
Expand Down

0 comments on commit 6285c60

Please sign in to comment.