Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 9 additions & 36 deletions src/hapibd/PbwtIbd.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public final class PbwtIbd implements Runnable {

private PrintWriter hbdOut = printWriter(hbdBaos);
private PrintWriter ibdOut = printWriter(ibdBaos);
private final Map<Integer, PrintWriter> ibdWriters;
private final Map<Integer, PrintWriter> hbdWriters;
private final String outputPrefix;
private final String splitFilename;
private final boolean split;
private SplitFileWriterManager splitFileWriterManager;

private boolean useSeedQ = false;
private final int nWindows;
private final IntList seedList;
Expand Down Expand Up @@ -125,7 +125,8 @@ private static PrintWriter printWriter(ByteArrayOutputStream out) {
public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map,
int windowStart, int windowEnd, int nWindows,
BlockingQueue<int[]> seedQ,
SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) {
SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS,
SplitFileWriterManager splitFileWriterManager) {
if (gt.isPhased()==false) {
throw new IllegalArgumentException("unphased data");
}
Expand Down Expand Up @@ -162,10 +163,9 @@ public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map,
this.ibdOS = ibdOS;
this.split = par.split();

this.splitFileWriterManager = splitFileWriterManager;
this.outputPrefix = par.out();
this.splitFilename = par.splitFilename();
this.ibdWriters = new ConcurrentHashMap<>();
this.hbdWriters = new ConcurrentHashMap<>();

this.pbwt = new PbwtUpdater(nHaps);
this.a = IntStream.range(0, nHaps).toArray();
Expand Down Expand Up @@ -205,14 +205,6 @@ public void run() {
catch (Throwable t) {
Utilities.exit(t);
}
finally {
for (PrintWriter writer : ibdWriters.values()) {
writer.close();
}
for (PrintWriter writer : hbdWriters.values()) {
writer.close();
}
}
}

private int advancePbwtToFirstIbsEnd() {
Expand Down Expand Up @@ -337,11 +329,11 @@ private void processSeed(int hap1, int hap2, int start, int inclEnd) {
inclEnd = extendInclEnd(hap1, hap2, inclEnd);
if ((genPos[inclEnd] - genPos[start])>=minOutput) {
if ((hap1>>1)==(hap2>>1)) {
writeSegment(hap1, hap2, start, inclEnd, hbdOut, "hbd");
writeSegment(hap1, hap2, start, inclEnd, hbdOut, SplitFileWriterManager.MatchType.HBD);
N_HBD_SEGS.incrementAndGet();
}
else {
writeSegment(hap1, hap2, start, inclEnd, ibdOut, "ibd");
writeSegment(hap1, hap2, start, inclEnd, ibdOut, SplitFileWriterManager.MatchType.IBD);
N_IBD_SEGS.incrementAndGet();
}
}
Expand Down Expand Up @@ -478,27 +470,8 @@ private void flushIbdBuffer(int byteThreshold) {
}
}

private PrintWriter getWriterForProxyKey(int proxyKey, String type) {
Map<Integer, PrintWriter> writers = type.equals("ibd") ? ibdWriters : hbdWriters;
return writers.computeIfAbsent(proxyKey, key -> {
try {
String dirPath = outputPrefix + "/" + key;
File dir = new File(dirPath);
if (!dir.mkdirs() && !dir.isDirectory()) {
Utilities.exit("ERROR: Failed to create directory " + dirPath);
}
String filename = dirPath + "/" + splitFilename + "." + type;
return new PrintWriter(new File(filename));
}
catch (IOException e) {
Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e);
return null; // This will never be reached due to Utilities.exit
}
});
}

private void writeSegment(int hap1, int hap2, int start, int inclEnd,
PrintWriter out, String type) {
PrintWriter out, SplitFileWriterManager.MatchType type) {
// At Embark, the new dog, ie the higher proxy key, comes first
if (Integer.parseInt(ids[hap1>>1]) < Integer.parseInt(ids[hap2>>1])) {
int tmp = hap1;
Expand All @@ -513,7 +486,7 @@ private void writeSegment(int hap1, int hap2, int start, int inclEnd,
}

if (split) {
PrintWriter splitWriter = getWriterForProxyKey(hap1ProxyKey, type);
PrintWriter splitWriter = splitFileWriterManager.getWriterForProxyKey(hap1ProxyKey, type);
synchronized (splitWriter) {
printSegment(splitWriter, hap1ProxyKey, hap2ProxyKey, hap1, hap2, start, inclEnd);
splitWriter.println(); // flushes line to file
Expand Down
10 changes: 6 additions & 4 deletions src/hapibd/PbwtIbdDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public static long[] detectIbd(HapIbdPar par) {
File ibdFile = new File(par.out() + ".ibd");
try (SampleFileIt<RefGTRec> it = refIt(par);
SynchFileOutputStream hbdOS = new SynchFileOutputStream(hbdFile);
SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile)) {
SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile);
SplitFileWriterManager splitFileWriterManager = par.split() ? new SplitFileWriterManager(par) : null ){
try {
nSamplesAndMarkers[0] = it.samples().nSamples();
List<RefGTRec> recList = new ArrayList<>(1<<14);
Expand All @@ -80,7 +81,7 @@ public static long[] detectIbd(HapIbdPar par) {
if (recList.isEmpty()==false) {
RefGT gt = new RefGT(recList.toArray(new RefGTRec[0]));
MarkerMap map = MarkerMap.create(genMap, gt.markers());
PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS);
PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS, splitFileWriterManager);
nSamplesAndMarkers[1] += gt.nMarkers();
}
}
Expand All @@ -94,7 +95,8 @@ public static long[] detectIbd(HapIbdPar par) {
}

private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map,
SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) {
SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS,
SplitFileWriterManager splitFileWriterManager) {
float minSeed = par.min_seed();
int minMarkers = par.min_markers();
double[] genPos = map.genPos().toArray();
Expand All @@ -106,7 +108,7 @@ private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map,
ExecutorService execService = Executors.newFixedThreadPool(starts.length);
for (int j=0; j<starts.length; ++j) {
PbwtIbd pbwtIbs = new PbwtIbd(par, gt, map, starts[j], ends[j],
starts.length, seedQ, hbdOS, ibdOS);
starts.length, seedQ, hbdOS, ibdOS, splitFileWriterManager);
execService.submit(pbwtIbs);
}
MultiThreadUtils.shutdownExecService(execService);
Expand Down
56 changes: 56 additions & 0 deletions src/hapibd/SplitFileWriterManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package hapibd;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import blbutil.Utilities;


public class SplitFileWriterManager implements AutoCloseable {

public static enum MatchType {HBD, IBD};

private final String outputPrefix;
private final String splitFilename;

private final Map<Integer, PrintWriter> ibdWriters;
private final Map<Integer, PrintWriter> hbdWriters;

public SplitFileWriterManager(HapIbdPar par) {
this.outputPrefix = par.out();
this.splitFilename = par.splitFilename();
this.ibdWriters = new ConcurrentHashMap<>();
this.hbdWriters = new ConcurrentHashMap<>();
}

public PrintWriter getWriterForProxyKey(int proxyKey, MatchType type) {
Map<Integer, PrintWriter> writers = type.equals(MatchType.IBD) ? ibdWriters : hbdWriters;
return writers.computeIfAbsent(proxyKey, key -> {
try {
String dirPath = outputPrefix + "/" + key;
File dir = new File(dirPath);
if (!dir.mkdirs() && !dir.isDirectory()) {
Utilities.exit("ERROR: Failed to create directory " + dirPath);
}
String filename = dirPath + "/" + splitFilename + "." + type.toString().toLowerCase();
return new PrintWriter(new File(filename));
}
catch (IOException e) {
Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e);
return null; // This will never be reached due to Utilities.exit
}
});
}

public synchronized void close() {
for (PrintWriter writer : ibdWriters.values()) {
writer.close();
}
for (PrintWriter writer : hbdWriters.values()) {
writer.close();
}
}
}