diff --git a/src/hapibd/PbwtIbd.java b/src/hapibd/PbwtIbd.java index 070c17d..c4d26ee 100644 --- a/src/hapibd/PbwtIbd.java +++ b/src/hapibd/PbwtIbd.java @@ -89,11 +89,11 @@ public final class PbwtIbd implements Runnable { private PrintWriter hbdOut = printWriter(hbdBaos); private PrintWriter ibdOut = printWriter(ibdBaos); - private final Map ibdWriters; - private final Map hbdWriters; private final String outputPrefix; private final String splitFilename; private final boolean split; + private SplitFileWriterManager splitFileWriterManager; + private boolean useSeedQ = false; private final int nWindows; private final IntList seedList; @@ -125,7 +125,8 @@ private static PrintWriter printWriter(ByteArrayOutputStream out) { public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map, int windowStart, int windowEnd, int nWindows, BlockingQueue seedQ, - SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) { + SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS, + SplitFileWriterManager splitFileWriterManager) { if (gt.isPhased()==false) { throw new IllegalArgumentException("unphased data"); } @@ -162,10 +163,9 @@ public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map, this.ibdOS = ibdOS; this.split = par.split(); + this.splitFileWriterManager = splitFileWriterManager; this.outputPrefix = par.out(); this.splitFilename = par.splitFilename(); - this.ibdWriters = new ConcurrentHashMap<>(); - this.hbdWriters = new ConcurrentHashMap<>(); this.pbwt = new PbwtUpdater(nHaps); this.a = IntStream.range(0, nHaps).toArray(); @@ -205,14 +205,6 @@ public void run() { catch (Throwable t) { Utilities.exit(t); } - finally { - for (PrintWriter writer : ibdWriters.values()) { - writer.close(); - } - for (PrintWriter writer : hbdWriters.values()) { - writer.close(); - } - } } private int advancePbwtToFirstIbsEnd() { @@ -337,11 +329,11 @@ private void processSeed(int hap1, int hap2, int start, int inclEnd) { inclEnd = extendInclEnd(hap1, hap2, inclEnd); if ((genPos[inclEnd] - genPos[start])>=minOutput) { if ((hap1>>1)==(hap2>>1)) { - writeSegment(hap1, hap2, start, inclEnd, hbdOut, "hbd"); + writeSegment(hap1, hap2, start, inclEnd, hbdOut, SplitFileWriterManager.MatchType.HBD); N_HBD_SEGS.incrementAndGet(); } else { - writeSegment(hap1, hap2, start, inclEnd, ibdOut, "ibd"); + writeSegment(hap1, hap2, start, inclEnd, ibdOut, SplitFileWriterManager.MatchType.IBD); N_IBD_SEGS.incrementAndGet(); } } @@ -478,27 +470,8 @@ private void flushIbdBuffer(int byteThreshold) { } } - private PrintWriter getWriterForProxyKey(int proxyKey, String type) { - Map writers = type.equals("ibd") ? ibdWriters : hbdWriters; - return writers.computeIfAbsent(proxyKey, key -> { - try { - String dirPath = outputPrefix + "/" + key; - File dir = new File(dirPath); - if (!dir.mkdirs() && !dir.isDirectory()) { - Utilities.exit("ERROR: Failed to create directory " + dirPath); - } - String filename = dirPath + "/" + splitFilename + "." + type; - return new PrintWriter(new File(filename)); - } - catch (IOException e) { - Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e); - return null; // This will never be reached due to Utilities.exit - } - }); - } - private void writeSegment(int hap1, int hap2, int start, int inclEnd, - PrintWriter out, String type) { + PrintWriter out, SplitFileWriterManager.MatchType type) { // At Embark, the new dog, ie the higher proxy key, comes first if (Integer.parseInt(ids[hap1>>1]) < Integer.parseInt(ids[hap2>>1])) { int tmp = hap1; @@ -513,7 +486,7 @@ private void writeSegment(int hap1, int hap2, int start, int inclEnd, } if (split) { - PrintWriter splitWriter = getWriterForProxyKey(hap1ProxyKey, type); + PrintWriter splitWriter = splitFileWriterManager.getWriterForProxyKey(hap1ProxyKey, type); synchronized (splitWriter) { printSegment(splitWriter, hap1ProxyKey, hap2ProxyKey, hap1, hap2, start, inclEnd); splitWriter.println(); // flushes line to file diff --git a/src/hapibd/PbwtIbdDriver.java b/src/hapibd/PbwtIbdDriver.java index 7993827..7801c96 100644 --- a/src/hapibd/PbwtIbdDriver.java +++ b/src/hapibd/PbwtIbdDriver.java @@ -69,7 +69,8 @@ public static long[] detectIbd(HapIbdPar par) { File ibdFile = new File(par.out() + ".ibd"); try (SampleFileIt it = refIt(par); SynchFileOutputStream hbdOS = new SynchFileOutputStream(hbdFile); - SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile)) { + SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile); + SplitFileWriterManager splitFileWriterManager = par.split() ? new SplitFileWriterManager(par) : null ){ try { nSamplesAndMarkers[0] = it.samples().nSamples(); List recList = new ArrayList<>(1<<14); @@ -80,7 +81,7 @@ public static long[] detectIbd(HapIbdPar par) { if (recList.isEmpty()==false) { RefGT gt = new RefGT(recList.toArray(new RefGTRec[0])); MarkerMap map = MarkerMap.create(genMap, gt.markers()); - PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS); + PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS, splitFileWriterManager); nSamplesAndMarkers[1] += gt.nMarkers(); } } @@ -94,7 +95,8 @@ public static long[] detectIbd(HapIbdPar par) { } private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map, - SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) { + SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS, + SplitFileWriterManager splitFileWriterManager) { float minSeed = par.min_seed(); int minMarkers = par.min_markers(); double[] genPos = map.genPos().toArray(); @@ -106,7 +108,7 @@ private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map, ExecutorService execService = Executors.newFixedThreadPool(starts.length); for (int j=0; j ibdWriters; + private final Map hbdWriters; + + public SplitFileWriterManager(HapIbdPar par) { + this.outputPrefix = par.out(); + this.splitFilename = par.splitFilename(); + this.ibdWriters = new ConcurrentHashMap<>(); + this.hbdWriters = new ConcurrentHashMap<>(); + } + + public PrintWriter getWriterForProxyKey(int proxyKey, MatchType type) { + Map writers = type.equals(MatchType.IBD) ? ibdWriters : hbdWriters; + return writers.computeIfAbsent(proxyKey, key -> { + try { + String dirPath = outputPrefix + "/" + key; + File dir = new File(dirPath); + if (!dir.mkdirs() && !dir.isDirectory()) { + Utilities.exit("ERROR: Failed to create directory " + dirPath); + } + String filename = dirPath + "/" + splitFilename + "." + type.toString().toLowerCase(); + return new PrintWriter(new File(filename)); + } + catch (IOException e) { + Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e); + return null; // This will never be reached due to Utilities.exit + } + }); + } + + public synchronized void close() { + for (PrintWriter writer : ibdWriters.values()) { + writer.close(); + } + for (PrintWriter writer : hbdWriters.values()) { + writer.close(); + } + } +}