Integrate 1KG and TCGA into new DB + Fixes + Support for ethnicity
tomalf2 committed Apr 22, 2020
1 parent 0559579 commit f507b1e
Showing 10 changed files with 200 additions and 64 deletions.
34 changes: 34 additions & 0 deletions assets/dw.genomes_metadata.sql
@@ -0,0 +1,34 @@
CREATE MATERIALIZED VIEW dw.genomes_metadata
TABLESPACE default_ts
AS SELECT x.donor_id,
x.donor_source_id,
x.item_id,
x.item_source_id,
x.file_name,
x.local_url,
lower(x.assembly::text) AS assembly,
x.gender,
x.health_status,
x.population,
p1.value AS super_population,
p2.value AS dna_source
FROM ( SELECT biosample.donor_id,
donor.donor_source_id,
item.item_id,
item.item_source_id,
item.file_name,
item.local_url,
dataset.assembly,
donor.gender,
biosample.is_healthy AS health_status,
donor.ethnicity AS population
FROM dw.item
JOIN dataset USING (dataset_id)
JOIN replicate2item USING (item_id)
JOIN dw.replicate USING (replicate_id)
JOIN biosample USING (biosample_id)
JOIN donor USING (donor_id)
WHERE dataset.dataset_name::text ~~* '%1000GENOMES%'::text OR dataset.dataset_name::text ~~* '%TCGA_somatic_mutation%'::text OR dataset.dataset_name::text ~~* '%TCGA_dnaseq'::text) x
LEFT JOIN pair p1 ON x.item_id = p1.item_id AND p1.key::text = 'super_population'::text
LEFT JOIN pair p2 ON x.item_id = p2.item_id AND p2.key::text = 'dna_source_from_coriell'::text
WITH DATA;
42 changes: 42 additions & 0 deletions assets/dw.genomes_metadata_2.sql
@@ -0,0 +1,42 @@
CREATE MATERIALIZED VIEW dw.genomes_metadata_2
TABLESPACE default_ts
AS SELECT x.donor_id,
x.donor_source_id,
x.item_id,
x.item_source_id,
x.file_name,
x.local_url,
lower(x.assembly::text) AS assembly,
COALESCE(x.gender, 'not reported'::character varying) AS gender,
x.health_status,
CASE
WHEN x.dataset_name::text ~~* '%TCGA%'::text THEN NULL::character varying
ELSE x.population
END AS population,
CASE
WHEN x.dataset_name::text ~~* '%1000GENOMES%'::text THEN dw.kgenomes_ethnicity(p1.value)
ELSE COALESCE(x.population, 'not reported'::character varying)
END AS ethnicity,
p1.value AS super_population,
p2.value AS dna_source
FROM ( SELECT biosample.donor_id,
donor.donor_source_id,
item.item_id,
item.item_source_id,
item.file_name,
item.local_url,
dataset.assembly,
donor.gender,
biosample.is_healthy AS health_status,
donor.ethnicity AS population,
dataset.dataset_name
FROM dw.item
JOIN dataset USING (dataset_id)
JOIN replicate2item USING (item_id)
JOIN dw.replicate USING (replicate_id)
JOIN biosample USING (biosample_id)
JOIN donor USING (donor_id)
WHERE dataset.dataset_name::text ~~* '%1000GENOMES%'::text OR dataset.dataset_name::text ~~* '%TCGA_somatic_mutation%'::text OR dataset.dataset_name::text ~~* '%TCGA_dnaseq'::text) x
LEFT JOIN pair p1 ON x.item_id = p1.item_id AND p1.key::text = 'super_population'::text
LEFT JOIN pair p2 ON x.item_id = p2.item_id AND p2.key::text = 'dna_source_from_coriell'::text
WITH DATA;
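The second view adds the derived ethnicity column next to population and super_population. As a quick way to inspect what it exposes, one could run a grouped count over it; the snippet below is a sketch only, written with SQLAlchemy as in the rest of the codebase, and the connection URL is a placeholder rather than a value taken from this repository.
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost:5432/my_db')  # placeholder URL, adapt to the real warehouse
with engine.connect() as conn:
    result = conn.execute(text(
        'SELECT assembly, ethnicity, count(*) AS n_items '
        'FROM dw.genomes_metadata_2 '
        'GROUP BY assembly, ethnicity '
        'ORDER BY assembly, n_items DESC'))
    for assembly, ethnicity, n_items in result:
        print(assembly, ethnicity, n_items)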
21 changes: 21 additions & 0 deletions assets/dw.kgenomes_ethnicity.sql
@@ -0,0 +1,21 @@
CREATE OR REPLACE FUNCTION dw.kgenomes_ethnicity(super_population varchar)
RETURNS varchar AS $$
declare res varchar;
begin
if super_population = 'AMR' then
res := 'latin american';
elsif super_population = 'EUR' then
res := 'white';
elsif super_population = 'AFR' then
res := 'black or african american';
elsif super_population = 'SAS' or super_population = 'EAS' then
res := 'asian';
else
res := 'not reported';
end if;
return res;
-- This is a workaround to assign ethnicities to 1000 Genomes individuals. The correct way would be
-- to re-run the metadata manager and take the values from there, but the need for ethnicity
-- in 1000 Genomes arose too late to justify a re-run of the metadata manager (it takes 2 weeks).
end;
$$ language plpgsql;
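A quick sanity check of the mapping above, again as a sketch: it assumes SQLAlchemy and a reachable database in which dw.kgenomes_ethnicity has been created; the connection URL is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost:5432/my_db')  # placeholder URL
with engine.connect() as conn:
    for code in ('AMR', 'EUR', 'AFR', 'SAS', 'EAS', 'XXX'):
        label = conn.execute(text('SELECT dw.kgenomes_ethnicity(:sp)'), {'sp': code}).scalar()
        print(code, '->', label)
# expected: AMR -> latin american, EUR -> white, AFR -> black or african american,
# SAS and EAS -> asian, any other value -> not reported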
7 changes: 5 additions & 2 deletions data_sources/coordinator.py
@@ -14,7 +14,7 @@


_sources: List[Type[Source]] = [
TCGA
KGenomes, TCGA
]
_annotation_sources: List[Type[AnnotInterface]] = [
Gencode
@@ -504,14 +504,17 @@ def try_catch_source_errors(self, fun, alternative_return_value, container_of_no
except sqlalchemy.exc.DBAPIError:
self.logger.exception('Wrong usage of the underlying database')
return alternative_return_value
except EmptyResult as empty_res:
self.logger.debug(empty_res)
return None
except Notice as notice:
            # notices are eventually added to the response if it is still a valid response, or attached
            # to a more severe exception otherwise; either way they end up in the result.
self.logger.info(notice.msg)
container_of_notices.append(notice)
return alternative_return_value
except Exception:
self.logger.exception('unknown exception caught in coordinator')
self.logger.exception('unknown exception caught from a source')
return alternative_return_value

def get_as_dictionary(self, stmt_to_execute, log_with_intro: Optional[str], add_notices: List[Notice]):
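With the new branch above, a source that finds no matching individuals raises EmptyResult: the coordinator logs it at debug level and uses None for that source's contribution instead of treating it as an error. A minimal sketch of the pattern, assuming the data_sources package is importable; the caller below is hypothetical, since the real dispatch goes through try_catch_source_errors.
from data_sources.io_parameters import EmptyResult

def query_some_source():
    # a source raises EmptyResult when no individual satisfies the request parameters
    raise EmptyResult('KGenomes has no individuals matching the request parameters.')

try:
    partial_result = query_some_source()
except EmptyResult as empty_res:
    partial_result = None  # logged at debug level by the coordinator; this source simply contributes nothing
print(partial_result)  # None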
31 changes: 23 additions & 8 deletions data_sources/io_parameters.py
@@ -118,7 +118,8 @@ def __init__(self,
dna_source: Optional[list] = None,
assembly: str = None,
population: Optional[list] = None,
super_population: Optional[list] = None):
super_population: Optional[list] = None,
ethnicity: Optional[list] = None):
self.free_dimensions = []
self.constrained_dimensions = []

@@ -153,16 +154,24 @@ def __init__(self,
if population:
self.population = population
self.constrained_dimensions.append(Vocabulary.POPULATION)
else:
self.population = None
self.free_dimensions.append(Vocabulary.POPULATION)

if super_population:
# consistency rule: super_population and ethnicity are free
self.super_population, self.ethnicity = None, None
self.free_dimensions.extend([Vocabulary.SUPER_POPULATION, Vocabulary.ETHNICITY])
elif super_population:
self.super_population = super_population
self.constrained_dimensions.append(Vocabulary.SUPER_POPULATION)
# consistency rule: population and ethnicity are free
self.population, self.ethnicity = None, None
self.free_dimensions.extend([Vocabulary.POPULATION, Vocabulary.ETHNICITY])
elif ethnicity:
self.ethnicity = ethnicity
self.constrained_dimensions.append(Vocabulary.ETHNICITY)
# consistency rule: population and super_population are free
self.population, self.super_population = None, None
self.free_dimensions.extend([Vocabulary.POPULATION, Vocabulary.SUPER_POPULATION])
else:
self.super_population = None
self.free_dimensions.append(Vocabulary.SUPER_POPULATION)
self.population, self.super_population, self.ethnicity = None, None, None
self.free_dimensions.extend([Vocabulary.POPULATION, Vocabulary.SUPER_POPULATION, Vocabulary.ETHNICITY])


class Vocabulary(Enum):
@@ -175,6 +184,7 @@ class Vocabulary(Enum):
SUPER_POPULATION = 6
DOWNLOAD_URL = 7
DONOR_ID = 8
ETHNICITY = 9

# dimensions of region kind
WITH_VARIANT = 101
@@ -228,6 +238,11 @@ def __init__(self, msg_explaining_cause_of_error: str):
self.msg = msg_explaining_cause_of_error


class EmptyResult(Exception):
def __init__(self, *args):
super().__init__(*args)


class SourceWarning(UserWarning):
"""
This class can be used by any source willing to communicate a problem affecting the result directly to the user. If
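The constructor above enforces a precedence among the three related filters: population wins over super_population, which wins over ethnicity, and the attributes that lose are reset to None and listed among the free dimensions. A small illustration of the resulting state, assuming data_sources.io_parameters is importable and that the remaining constructor arguments keep their defaults; the attribute values are made up.
from data_sources.io_parameters import MetadataAttrs, Vocabulary

# only ethnicity is given: it becomes the constrained dimension, the other two stay free
attrs = MetadataAttrs(assembly='hg19', ethnicity=['asian'])
assert attrs.ethnicity == ['asian'] and Vocabulary.ETHNICITY in attrs.constrained_dimensions
assert Vocabulary.POPULATION in attrs.free_dimensions and Vocabulary.SUPER_POPULATION in attrs.free_dimensions

# population given together with ethnicity: population takes precedence and ethnicity is dropped
attrs = MetadataAttrs(assembly='hg19', population=['GBR'], ethnicity=['asian'])
assert attrs.population == ['GBR'] and attrs.ethnicity is None and attrs.super_population is None
assert Vocabulary.ETHNICITY in attrs.free_dimensions and Vocabulary.SUPER_POPULATION in attrs.free_dimensions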
48 changes: 25 additions & 23 deletions data_sources/kgenomes/kgenomes.py
@@ -1,6 +1,6 @@
from ..source_interface import *
from ..io_parameters import *
from sqlalchemy import MetaData, Table, cast, select, union_all, union, tuple_, func, exists, asc, desc, intersect, literal, column, types
from sqlalchemy import MetaData, Table, cast, select, union_all, union, tuple_, func, exists, asc, desc, intersect, literal, column, types, text
from sqlalchemy.sql.expression import Selectable
from sqlalchemy.engine import Connection
from functools import reduce
@@ -10,11 +10,11 @@
from loguru import logger

# SOURCE TABLE PARAMETERS
default_metadata_table_name = 'genomes_metadata_new'
default_metadata_table_name = 'genomes_metadata_2'
default_metadata_schema_name = 'dw'
default_region_table_name = 'genomes_full_data_red'
default_region_table_name = 'kgenomes_red'
default_region_schema_name = 'rr'
default_schema_to_use_name = 'dw'
default_schema_to_use_name = 'temp'
db_meta: Optional[MetaData] = None
# SOURCE TABLES
initializing_lock = RLock()
@@ -33,6 +33,7 @@ class KGenomes(Source):
Vocabulary.SUPER_POPULATION: 'super_population',
Vocabulary.HEALTH_STATUS: 'health_status',
Vocabulary.ASSEMBLY: 'assembly',
Vocabulary.ETHNICITY: 'ethnicity',
Vocabulary.DONOR_ID: 'donor_source_id'
}
# REGION CONSTRAINTS THAT CAN BE EXPRESSED WITH THIS SOURCE (REQUIRED BY SOURCE)
@@ -144,29 +145,22 @@ def variant_occurrence(self, connection: Connection, by_attributes: list, meta_a

def rank_variants_by_frequency(self, connection, meta_attrs: MetadataAttrs, region_attrs: RegionAttrs, ascending: bool,
freq_threshold: float, limit_result: int) -> FromClause:
# temporary fix for duplicated variants and wrong ones # TODO delete this
if ascending:
freq_threshold = max(0.00001, freq_threshold or 0.0) # avoid frequency 0
else:
freq_threshold = min(1.0, freq_threshold or 1.0) # avoid frequency > 1
# init state
self.connection = connection
self._set_meta_attributes(meta_attrs)
self.create_table_of_meta(['item_id', 'gender'])
self._set_region_attributes(region_attrs)
self.create_table_of_regions(['item_id'])
if self.my_region_t is None:
raise ValueError(
                'Before using this method, you need to assign a valid state at least to the region attributes. '
                'Please specify some region constraint.')

females_and_males_stmt = \
select([self.my_meta_t.c.gender, func.count(self.my_meta_t.c.item_id)]) \
.where(self.my_meta_t.c.item_id.in_(select([self.my_region_t.c.item_id]))) \
.group_by(self.my_meta_t.c.gender)
females_and_males = [row.values() for row in connection.execute(females_and_males_stmt).fetchall()]
females = next((el[1] for el in females_and_males if el[0] == 'female'), 0)
males = next((el[1] for el in females_and_males if el[0] == 'male'), 0)
gender_of_individuals = [row.values() for row in connection.execute(females_and_males_stmt).fetchall()]
if len(gender_of_individuals) == 0:
raise EmptyResult('KGenomes has no individuals matching the request parameters.')
females = next((el[1] for el in gender_of_individuals if el[0] == 'female'), 0)
males = next((el[1] for el in gender_of_individuals if el[0] == 'male'), 0)
population_size = males + females

# reduce size of the join with genomes table
@@ -206,10 +200,6 @@ def rank_variants_by_frequency(self, connection, meta_attrs: MetadataAttrs, regi
sample_set_with_limit,
genomes_red.c.item_id == sample_set_with_limit.c.item_id)) \
.group_by(genomes_red.c.chrom, genomes_red.c.start, genomes_red.c.ref, genomes_red.c.alt)
# temporary fix for duplicated variants # TODO delete this
stmt = stmt.having(
func_occurrence <= females*2 + males
)
if ascending:
if freq_threshold:
stmt = stmt.having(func_frequency_new >= freq_threshold)
@@ -304,6 +294,12 @@ def values_of_attribute(self, connection, attribute: Vocabulary):
'AMR',
'SAS'
],
self.meta_col_map[Vocabulary.ETHNICITY]: [
'latin american',
'black or african american',
'white',
'asian'
],
self.meta_col_map[Vocabulary.HEALTH_STATUS]: [
'true'
],
@@ -398,14 +394,18 @@ def _stmt_where_region_is_any_of_mutations(*mutations: Mutation, from_table, sel
# GENERATE DB ENTITIES
def create_table_of_meta(self, select_columns: Optional[list]):
"""Assigns my_meta_t as the table containing only the individuals with the required metadata characteristics"""
if self.meta_attrs.population is not None:
self.meta_attrs.super_population = None
columns_in_select = [metadata] # take all columns by default
if select_columns is not None: # otherwise take the ones in select_columns but make sure item_id is present
temp_set = set(select_columns)
temp_set.add('item_id')
columns_in_select = [metadata.c[col_name] for col_name in temp_set]
query = select(columns_in_select)
# noinspection SpellCheckingInspection
query = select(columns_in_select).where(metadata.c.item_id.in_(
text("select item_id from public.item where dataset_id in ( "
"select dataset_id from public.dataset "
"where dataset_name ilike '%1000GENOMES%' "
")")))

if self.meta_attrs.gender:
query = query.where(metadata.c.gender == self.meta_attrs.gender)
if self.meta_attrs.health_status:
@@ -418,6 +418,8 @@ def create_table_of_meta(self, select_columns: Optional[list]):
query = query.where(metadata.c.population.in_(self.meta_attrs.population))
elif self.meta_attrs.super_population:
query = query.where(metadata.c.super_population.in_(self.meta_attrs.super_population))
elif self.meta_attrs.ethnicity:
query = query.where(metadata.c.ethnicity.in_(self.meta_attrs.ethnicity))
new_meta_table_name = utils.random_t_name_w_prefix('meta')
utils.create_table_as(new_meta_table_name, query, default_schema_to_use_name, self.connection, self.log_sql_commands, self.logger.debug)
# t_stmt = utils.stmt_create_table_as(new_meta_table_name, query, default_schema_to_use_name)
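Summing up the changes to create_table_of_meta: the metadata selection is now restricted to items belonging to the 1000GENOMES datasets and can additionally be narrowed by ethnicity. The sketch below builds roughly the same SELECT offline, using the legacy select([...]) call style of this codebase; the Table declares only the columns needed for the illustration, whereas the real code reflects dw.genomes_metadata_2 from the database.
from sqlalchemy import Column, Integer, MetaData, String, Table, select, text

db_meta = MetaData(schema='dw')
metadata = Table('genomes_metadata_2', db_meta,
                 Column('item_id', Integer),
                 Column('gender', String),
                 Column('ethnicity', String))

query = select([metadata.c.item_id, metadata.c.gender]) \
    .where(metadata.c.item_id.in_(text(
        "select item_id from public.item where dataset_id in ( "
        "select dataset_id from public.dataset "
        "where dataset_name ilike '%1000GENOMES%' )"))) \
    .where(metadata.c.ethnicity.in_(['asian']))
print(query)  # roughly the SELECT that create_table_of_meta materialises into a temporary table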