diff --git a/vep/src/main/resources/common.py b/vep/src/main/resources/commonStdin.py
similarity index 65%
rename from vep/src/main/resources/common.py
rename to vep/src/main/resources/commonStdin.py
index 76602a1b..6c7705d4 100644
--- a/vep/src/main/resources/common.py
+++ b/vep/src/main/resources/commonStdin.py
@@ -1,35 +1,5 @@
-#!/usr/bin/python3
-
-import argparse
+import sys
 import json
-import os.path
-import re
-import subprocess
-import tempfile
-
-S3DIR = 's3://dig-analysis-data/out/varianteffect'
-
-
-def colocated_variant(row, ref, alt):
-    """
-    Find the first colocated variant with a matching allele string or None.
-    """
-    co = row.get('colocated_variants', [])
-
-    # if there is only a single co-located variant, return it
-    if len(co) == 1:
-        return co[0]
-
-    # only keep colocated variants where the minor allele is ref or alt
-    variants = [v for v in co if v.get('minor_allele') in [ref, alt]]
-
-    # fail if no colocated variants
-    if not variants:
-        return None
-
-    # find the first with an rsID or default to the first variant
-    return next((v for v in variants if v.get('id', '').startswith('rs')), variants[0])
-
 
 def dbSNP(v):
     """
@@ -40,46 +10,6 @@ def dbSNP(v):
     # only return if an actual rsID
     return rsid if rsid and rsid.startswith('rs') else None
 
-
-def allele_frequencies(v, ref, alt):
-    """
-    Extract allele frequencies for a colocated variant.
-    """
-    af = v.get('frequencies', {})
-
-    # find the matching allele in the frequency map (prefer alt)
-    allele = next((a for a in [alt, ref] if a in af), None)
-
-    # no allele frequency data?
-    if not allele:
-        return {
-            'EU': None,
-            'HS': None,
-            'AA': None,
-            'EA': None,
-            'SA': None,
-        }
-
-    # lookup the frequency data
-    def get_freq(*cols):
-        freqs = [af[allele].get(c) for c in cols]
-
-        # find the first non-null/zero frequency from the columns
-        freq = next((f for f in freqs if f), None)
-
-        # flip the frequency if this is the reference allele
-        return 1.0 - freq if allele == ref else freq
-
-    # try gnomad, if not there use 1kg
-    return {
-        'EU': get_freq('gnomad_nfe', 'eur'),
-        'HS': get_freq('gnomad_amr', 'amr'),
-        'AA': get_freq('gnomad_afr', 'afr'),
-        'EA': get_freq('gnomad_eas', 'eas'),
-        'SA': get_freq('gnomad_sas', 'sas'),
-    }
-
-
 def common_fields(row):
     """
     Extracts data from the colocated variant fields.
@@ -117,64 +47,73 @@ def common_fields(row):
         'af': allele_frequencies(variant, ref, alt),
     }
 
-
-def process_part(srcdir, outdir, part):
+def allele_frequencies(v, ref, alt):
     """
-    Download and process a part file.
+    Extract allele frequencies for a colocated variant.
     """
-    _, tmp = tempfile.mkstemp()
-
-    # copy the file into a temp file
-    subprocess.check_call(['aws', 's3', 'cp', f'{srcdir}/{part}', tmp])
-
-    # loop over every line, parse, and create common row
-    with open(tmp) as fp:
-        with open(part, mode='w', encoding='utf-8') as out:
-            for line in fp:
-                row = json.loads(line)
-                common = common_fields(row)
+    af = v.get('frequencies', {})
 
-                # write the common record to the output
-                if common:
-                    json.dump(
-                        common,
-                        out,
-                        separators=(',', ':'),
-                        check_circular=False,
-                    )
+    # find the matching allele in the frequency map (prefer alt)
+    allele = next((a for a in [alt, ref] if a in af), None)
 
-                    # output a newline after the json
-                    print(file=out)
+    # no allele frequency data?
+    if not allele:
+        return {
+            'EU': None,
+            'HS': None,
+            'AA': None,
+            'EA': None,
+            'SA': None,
+        }
 
-    # copy the output file to S3
-    subprocess.check_call(['aws', 's3', 'cp', part, f'{outdir}/{part}'])
+    # lookup the frequency data
+    def get_freq(*cols):
+        freqs = [af[allele].get(c) for c in cols]
 
-    # cleanup
-    os.remove(tmp)
-    os.remove(part)
+        # find the first non-null/zero frequency from the columns
+        freq = next((f for f in freqs if f), None)
 
-    # debug output
-    print(f'Processed {part} successfully')
+        # flip the frequency if this is the reference allele
+        return 1.0 - freq if allele == ref else freq
 
+    # try gnomad, if not there use 1kg
+    return {
+        'EU': get_freq('gnomad_nfe', 'eur'),
+        'HS': get_freq('gnomad_amr', 'amr'),
+        'AA': get_freq('gnomad_afr', 'afr'),
+        'EA': get_freq('gnomad_eas', 'eas'),
+        'SA': get_freq('gnomad_sas', 'sas'),
+    }
 
-def main():
+def colocated_variant(row, ref, alt):
     """
-    Arguments: part-file
+    Find the first colocated variant with a matching allele string or None.
     """
-    opts = argparse.ArgumentParser()
-    opts.add_argument('part')
+    co = row.get('colocated_variants', [])
 
-    # parse cli
-    args = opts.parse_args()
+    # if there is only a single co-located variant, return it
+    if len(co) == 1:
+        return co[0]
+
+    # only keep colocated variants where the minor allele is ref or alt
+    variants = [v for v in co if v.get('minor_allele') in [ref, alt]]
 
-    # s3 locations
-    srcdir = f'{S3DIR}/effects'
-    outdir = f'{S3DIR}/common'
+    # fail if no colocated variants
+    if not variants:
+        return None
+
+    # find the first with an rsID or default to the first variant
+    return next((v for v in variants if v.get('id', '').startswith('rs')), variants[0])
 
-    # run
-    process_part(srcdir, outdir, args.part)
+for line in sys.stdin:
+    row = json.loads(line)
+    common = common_fields(row)
+
+    # write the common record to the output
+    if common:
+        json.dump(
+            common,
+            sys.stdout,
+            separators=(',', ':'),
+            check_circular=False,
+        )
+        sys.stdout.write('\n')
 
-# entry point
-if __name__ == '__main__':
-    main()
diff --git a/vep/src/main/resources/cqs.py b/vep/src/main/resources/cqs.py
deleted file mode 100644
index 9fa77d73..00000000
--- a/vep/src/main/resources/cqs.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/python3
-
-import argparse
-import json
-import os.path
-import re
-import subprocess
-import tempfile
-
-S3DIR = 's3://dig-analysis-data/out/varianteffect'
-
-
-def rename_cqs_field(s):
-    """
-    Lifted from BioIndex code to camelCase the names of consequences.
-    """
-    s=re.sub(r'(?:[^a-z0-9]+)(.)', lambda m: m.group(1).upper(), s, flags=re.IGNORECASE)
-    s=re.sub(r'^[A-Z][a-z]+', lambda m: m.group(0).lower(), s)
-
-    return s
-
-
-def exploded_consequences(row):
-    """
-    Extracts transcript consequences from the row.
-    """
-    for cqs in row.get('transcript_consequences', []):
-        record = dict()
-
-        # include the variant ID and locus with each consequence
-        record['varId'] = row['id']
-        record['chromosome'] = row['seq_region_name']
-        record['position'] = int(row['start'])
-
-        # apend all the consequence fields; ignore some
-        for k, v in cqs.items():
-            if k not in ['domains']:
-                record[rename_cqs_field(k)] = v
-
-        # add the record to the returned values
-        yield record
-
-
-def process_part(srcdir, outdir, part):
-    """
-    Download and process a part file.
-    """
-    _, tmp = tempfile.mkstemp()
-
-    # copy the file into a temp file
-    subprocess.check_call(['aws', 's3', 'cp', f'{srcdir}/{part}', tmp])
-
-    # loop over every line, parse, and create common row
-    with open(tmp) as fp:
-        with open(part, mode='w', encoding='utf-8') as out:
-            for line in fp:
-                row = json.loads(line)
-
-                # write the common record to the output
-                for cqs in exploded_consequences(row):
-                    json.dump(
-                        cqs,
-                        out,
-                        separators=(',', ':'),
-                        check_circular=False,
-                    )
-
-                    # output a newline after the json
-                    print(file=out)
-
-    # copy the output file to S3
-    subprocess.check_call(['aws', 's3', 'cp', part, f'{outdir}/{part}'])
-
-    # cleanup
-    os.remove(tmp)
-    os.remove(part)
-
-    # debug output
-    print(f'Processed {part} successfully')
-
-
-def main():
-    """
-    Arguments: part-file
-    """
-    opts = argparse.ArgumentParser()
-    opts.add_argument('part')
-
-    # parse cli
-    args = opts.parse_args()
-
-    # s3 locations
-    srcdir = f'{S3DIR}/effects'
-    outdir = f'{S3DIR}/cqs'
-
-    # run
-    process_part(srcdir, outdir, args.part)
-
-
-# entry point
-if __name__ == '__main__':
-    main()
diff --git a/vep/src/main/resources/cqsStdin.py b/vep/src/main/resources/cqsStdin.py
new file mode 100644
index 00000000..e1813e69
--- /dev/null
+++ b/vep/src/main/resources/cqsStdin.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python3
+
+import json
+import re
+import sys
+
+S3DIR = 's3://dig-analysis-data/out/varianteffect'
+
+
+def rename_cqs_field(s):
+    """
+    Lifted from BioIndex code to camelCase the names of consequences.
+    """
+    s=re.sub(r'(?:[^a-z0-9]+)(.)', lambda m: m.group(1).upper(), s, flags=re.IGNORECASE)
+    s=re.sub(r'^[A-Z][a-z]+', lambda m: m.group(0).lower(), s)
+
+    return s
+
+
+def exploded_consequences(row):
+    """
+    Extracts transcript consequences from the row.
+    """
+    for cqs in row.get('transcript_consequences', []):
+        record = dict()
+
+        # include the variant ID and locus with each consequence
+        record['varId'] = row['id']
+        record['chromosome'] = row['seq_region_name']
+        record['position'] = int(row['start'])
+
+        # append all the consequence fields; ignore some
+        for k, v in cqs.items():
+            if k not in ['domains']:
+                record[rename_cqs_field(k)] = v
+
+        # add the record to the returned values
+        yield record
+
+
+for line in sys.stdin:
+    row = json.loads(line)
+
+    # write each consequence record to the output
+    for cqs in exploded_consequences(row):
+        json.dump(
+            cqs,
+            sys.stdout,
+            separators=(',', ':'),
+            check_circular=False,
+        )
+
+        # output a newline after the json
+        sys.stdout.write('\n')
diff --git a/vep/src/main/resources/listVariants.py b/vep/src/main/resources/listVariants.py
index b01b8b1f..1e511373 100644
--- a/vep/src/main/resources/listVariants.py
+++ b/vep/src/main/resources/listVariants.py
@@ -2,7 +2,7 @@
 import platform
 
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import concat_ws, length, lit, when # pylint: disable=E0611
+from pyspark.sql.functions import concat_ws, length, lit, when, broadcast # pylint: disable=E0611
 
 S3DIR = 's3://dig-analysis-data'
 
@@ -24,21 +24,26 @@ def main():
     """
     print('Python version: %s' % platform.python_version())
 
-    # get the source and output directories
-    dataset_srcdir = f'{S3DIR}/variants/*/*/*'
-    ld_server_srcdir = f'{S3DIR}/ld_server/variants/*'
-    outdir = f'{S3DIR}/out/varianteffect/variants'
+    # specify only the new datasets to process (a single glob or a list of them)
+    dataset_srcdir = f'{S3DIR}/variants/ExSeq/*/*'
+    existing_variants = f'{S3DIR}/out/varianteffect/variants'
+    new_variants = f'{S3DIR}/out/varianteffect/new-variants'
 
     # create a spark session
     spark = SparkSession.builder.appName('vep').getOrCreate()
 
-    # slurp all the variants across ALL datasets, but only locus information
-    # combine with variants in LD Server to make sure all LD Server variants go through VEP for burden binning
+    # reduce new datasets just to their variants
     dataset_df = get_df(spark, dataset_srcdir)
-    ld_server_df = get_df(spark, ld_server_srcdir)
+    df = dataset_df.dropDuplicates(['varId'])
+
+    # read in all the variants we already know about and put them in a varId column
+    existing_variants_df = spark.read.csv(existing_variants, sep='\t', header=False)
+    existing_variants_df = existing_variants_df.select('_c5').withColumnRenamed('_c5', 'varId')
 
-    df = dataset_df.union(ld_server_df)\
-        .dropDuplicates(['varId'])
+    # find variants in new datasets that aren't already in existing datasets; this code
+    # assumes that the data in df is significantly smaller than existing_variants_df
+    df = broadcast(df).join(existing_variants_df, df.varId == existing_variants_df.varId,
+                            "leftanti")
 
     # get the length of the reference and alternate alleles
     ref_len = length(df.reference)
@@ -72,10 +77,11 @@ def main():
         df.varId,
     )
 
-    # output the variants as CSV part files
-    df.write \
-        .mode('overwrite') \
-        .csv(outdir, sep='\t')
+    # if we find new variants, put them in the new_variants path for additional processing
+    # and append them to the existing deduped variants path
+    if not df.rdd.isEmpty():
+        df.write.mode('overwrite').csv(new_variants, sep='\t')
+        df.write.mode('append').csv(existing_variants, sep='\t')
 
     # done
     spark.stop()
diff --git a/vep/src/main/resources/runVEP.sh b/vep/src/main/resources/runVEP.sh
index 871efbea..146ee058 100644
--- a/vep/src/main/resources/runVEP.sh
+++ b/vep/src/main/resources/runVEP.sh
@@ -1,19 +1,21 @@
 #!/bin/bash -xe
 
 # set where the source and destination is in S3 and where VEP is
-S3DIR="s3://dig-analysis-data/out/varianteffect"
+S3DIR="s3://dig-analysis-data/out"
 VEPDIR="/mnt/var/vep"
 
 # get the name of the part file from the command line; set the output filename
 PART=$(basename -- "$1")
 OUTFILE="${PART%.*}.json"
+COMMON_OUTFILE="${PART%.*}_common.json"
+CQS_OUTFILE="${PART%.*}_cqs.json"
 WARNINGS="${OUTFILE}_warnings.txt"
 
 # update the path to include samtools and tabix
 PATH="$PATH:$VEPDIR/samtools-1.9/:$VEPDIR/ensembl-vep/htslib"
 
 # copy the part file from S3 to local
-aws s3 cp "$S3DIR/variants/$PART" .
+aws s3 cp "$S3DIR/varianteffect/new-variants/$PART" .
 
 # ensure the file is sorted
 sort -k1,1 -k2,2n "$PART" > "$PART.sorted"
@@ -51,16 +53,19 @@ perl -I "$VEPDIR/loftee-0.3-beta" "$VEPDIR/ensembl-vep/vep" \
     --plugin dbNSFP,$VEPDIR/dbNSFP/dbNSFP4.1a_grch37.gz,M-CAP_score,CADD_raw_rankscore,DANN_rankscore,Eigen-PC-raw_coding_rankscore,Polyphen2_HDIV_pred,Polyphen2_HVAR_pred,SIFT_pred,LRT_pred,MutationTaster_pred,FATHMM_pred,fathmm-MKL_coding_pred,PROVEAN_pred,MetaSVM_pred,MetaLR_pred,VEST4_score,VEST4_rankscore,gnomAD_genomes_POPMAX_AF \
     --fields SYMBOL,NEAREST,IMPACT,MAX_AF,AFR_AF,AMR_AF,EAS_AF,EUR_AF,SAS_AF,HGVSc,HGVSp,HGVS_OFFSET,PICK,CCDS,TSL,APPRIS,BIOTYPE,CANONICAL,HGNC,ENSP,DOMAINS,MOTIF_NAME,MOTIF_POS,HIGH_INF_POS,MOTIF_SCORE_CHANGE,SIFT,cDNA_position,CDS_position,Amino_acids,Codons,Protein_position,Protein_change,LoF,LoF_filter,LoF_flags,M-CAP_score,CADD_raw_rankscore,DANN_rankscore,Eigen-PC-raw_coding_rankscore,Polyphen2_HDIV_pred,Polyphen2_HVAR_pred,SIFT_pred,LRT_pred,MutationTaster_pred,FATHMM_pred,fathmm-MKL_coding_pred,PROVEAN_pred,MetaSVM_pred,MetaLR_pred,VEST4_score,VEST4_rankscore,gnomAD_genomes_POPMAX_AF \
     -i "$PART.sorted" \
-    -o "$OUTFILE" \
-    --force_overwrite
+    -o /dev/stdout \
+    --warning_file "$WARNINGS" \
+    --force_overwrite | tee >(python3 commonStdin.py > "$COMMON_OUTFILE") | python3 cqsStdin.py > "$CQS_OUTFILE"
 
-# copy the output of VEP back to S3
-aws s3 cp "$OUTFILE" "$S3DIR/effects/$OUTFILE"
+# transfer common and cqs output to s3
+aws s3 cp "$COMMON_OUTFILE" "$S3DIR/varianteffect/common/$OUTFILE"
+aws s3 cp "$CQS_OUTFILE" "$S3DIR/varianteffect/cqs/$OUTFILE"
 
 # delete the input and output files; keep the cluster clean
 rm "$PART"
 rm "$PART.sorted"
-rm "$OUTFILE"
+rm "$COMMON_OUTFILE"
+rm "$CQS_OUTFILE"
 
 # check for a warnings file, upload that, too and then delete it
 if [ -e "$WARNINGS" ]; then
diff --git a/vep/src/main/scala/CommonStage.scala b/vep/src/main/scala/CommonStage.scala
deleted file mode 100644
index 4262b7c0..00000000
--- a/vep/src/main/scala/CommonStage.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.broadinstitute.dig.aggregator.methods.vep
-
-import org.broadinstitute.dig.aggregator.core._
-import org.broadinstitute.dig.aws.Ec2.Strategy
-import org.broadinstitute.dig.aws.MemorySize
-import org.broadinstitute.dig.aws.emr._
-import org.broadinstitute.dig.aws.emr.configurations.{MapReduce, Spark}
-
-/** After all the variants across all datasets have had VEP run on them in the
-  * previous step the rsID for each variant is extracted into its own file.
-  *
-  * The input location:
-  *
-  *   s3://dig-analysis-data/out/varianteffect/effects/part-*.json
-  *
-  * The output location:
-  *
-  *   s3://dig-analysis-data/out/varianteffect/common/part-*.json
-  *
-  * The inputs and outputs for this processor are expected to be phenotypes.
-  */
-class CommonStage(implicit context: Context) extends Stage {
-  import MemorySize.Implicits._
-
-  val effects: Input.Source = Input.Source.Success("out/varianteffect/effects/")
-
-  /** Input sources. */
-  override val sources: Seq[Input.Source] = Seq(effects)
-
-  // EMR cluster to run the job steps on
-  override def cluster: ClusterDef = super.cluster.copy(
-    masterVolumeSizeInGB = 400,
-    instances = 1,
-    applications = Seq.empty,
-    stepConcurrency = 3
-  )
-
-  /** Make inputs to the outputs. */
-  override val rules: PartialFunction[Input, Outputs] = {
-    case _ => Outputs.Named("common")
-  }
-
-  /** All effect results are combined together, so the results list is ignored. */
-  override def make(output: String): Job = {
-    val runScript = resourceUri("common.py")
-
-    // get all the variant part files to process, use only the part filename
-    val objects = context.s3.ls(s"out/varianteffect/effects/")
-    val parts = objects.map(_.key.split('/').last).filter(_.startsWith("part-"))
-
-    // add a step for each part file
-    new Job(parts.map(Job.Script(runScript, _)), parallelSteps = true)
-  }
-
-  /** Before the jobs actually run, perform this operation.
-    */
-  override def prepareJob(output: String): Unit = {
-    context.s3.rm("out/varianteffect/common/")
-  }
-
-  /** Update the success flag of the merged regions.
-    */
-  override def success(output: String): Unit = {
-    context.s3.touch(s"out/varianteffect/common/_SUCCESS")
-    ()
-  }
-}
diff --git a/vep/src/main/scala/CqsStage.scala b/vep/src/main/scala/CqsStage.scala
deleted file mode 100644
index 9f35cb7f..00000000
--- a/vep/src/main/scala/CqsStage.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.broadinstitute.dig.aggregator.methods.vep
-
-import org.broadinstitute.dig.aggregator.core._
-import org.broadinstitute.dig.aws.Ec2.Strategy
-import org.broadinstitute.dig.aws.MemorySize
-import org.broadinstitute.dig.aws.emr._
-import org.broadinstitute.dig.aws.emr.configurations.{MapReduce, Spark}
-
-/** After all the variants across all datasets have had VEP run on them in the
-  * previous step the rsID for each variant is extracted into its own file.
-  *
-  * The input location:
-  *
-  *   s3://dig-analysis-data/out/varianteffect/effects/part-*.json
-  *
-  * The output location:
-  *
-  *   s3://dig-analysis-data/out/varianteffect/cqs/part-*.json
-  *
-  * The inputs and outputs for this processor are expected to be phenotypes.
-  */
-class CqsStage(implicit context: Context) extends Stage {
-  import MemorySize.Implicits._
-
-  val effects: Input.Source = Input.Source.Success("out/varianteffect/effects/")
-
-  /** Input sources. */
-  override val sources: Seq[Input.Source] = Seq(effects)
-
-  // EMR cluster to run the job steps on
-  override def cluster: ClusterDef = super.cluster.copy(
-    masterVolumeSizeInGB = 400,
-    instances = 1,
-    applications = Seq.empty,
-    stepConcurrency = 3
-  )
-
-  /** Make inputs to the outputs. */
-  override val rules: PartialFunction[Input, Outputs] = {
-    case _ => Outputs.Named("cqs")
-  }
-
-  /** All effect results are combined together, so the results list is ignored. */
-  override def make(output: String): Job = {
-    val runScript = resourceUri("cqs.py")
-
-    // get all the variant part files to process, use only the part filename
-    val objects = context.s3.ls(s"out/varianteffect/effects/")
-    val parts = objects.map(_.key.split('/').last).filter(_.startsWith("part-"))
-
-    // add a step for each part file
-    new Job(parts.map(Job.Script(runScript, _)), parallelSteps = true)
-  }
-
-  /** Before the jobs actually run, perform this operation.
-    */
-  override def prepareJob(output: String): Unit = {
-    context.s3.rm("out/varianteffect/cqs/")
-  }
-
-  /** Update the success flag of the merged regions.
-    */
-  override def success(output: String): Unit = {
-    context.s3.touch(s"out/varianteffect/cqs/_SUCCESS")
-    ()
-  }
-}
diff --git a/vep/src/main/scala/Vep.scala b/vep/src/main/scala/Vep.scala
index aa0be36d..964974f7 100644
--- a/vep/src/main/scala/Vep.scala
+++ b/vep/src/main/scala/Vep.scala
@@ -25,7 +25,5 @@ object Vep extends Method {
     addStage(new ListVariantsStage)
     addStage(new VepStage)
     addStage(new SnpStage)
-    addStage(new CommonStage)
-    addStage(new CqsStage)
   }
 }
diff --git a/vep/src/main/scala/VepStage.scala b/vep/src/main/scala/VepStage.scala
index 75de4dae..fabf1519 100644
--- a/vep/src/main/scala/VepStage.scala
+++ b/vep/src/main/scala/VepStage.scala
@@ -21,11 +21,11 @@ import org.broadinstitute.dig.aws.emr._
 class VepStage(implicit context: Context) extends Stage {
   import MemorySize.Implicits._
 
-  val variants: Input.Source = Input.Source.Success("out/varianteffect/variants/")
+  val variants: Input.Source = Input.Source.Success("out/varianteffect/new-variants/")
 
   /** Additional resources that need uploaded to S3. */
   override def additionalResources: Seq[String] = Seq(
-    "runVEP.sh"
+    "runVEP.sh", "commonStdin.py", "cqsStdin.py"
   )
 
   /** Input sources. */
@@ -59,24 +59,12 @@ class VepStage(implicit context: Context) extends Stage {
     val runScript = resourceUri("runVEP.sh")
 
     // get all the variant part files to process, use only the part filename
-    val objects = context.s3.ls(s"out/varianteffect/variants/")
+    val objects = context.s3.ls(s"out/varianteffect/new-variants/")
     val parts = objects.map(_.key.split('/').last).filter(_.startsWith("part-"))
 
     // add a step for each part file
     new Job(parts.map(Job.Script(runScript, _)), parallelSteps = true)
   }
 
-  /** Before the jobs actually run, perform this operation.
-    */
-  override def prepareJob(output: String): Unit = {
-    context.s3.rm("out/varianteffect/effects/")
-    context.s3.rm("out/varianteffect/warnings/")
-  }
-
-  /** On success, write the _SUCCESS file in the output directory.
-    */
-  override def success(output: String): Unit = {
-    context.s3.touch("out/varianteffect/effects/_SUCCESS")
-    ()
-  }
 }