diff --git a/docs/large_inputs.md b/docs/large_inputs.md index eae4770c1..ccf8adfc9 100644 --- a/docs/large_inputs.md +++ b/docs/large_inputs.md @@ -14,7 +14,6 @@ Default settings: --worker_machine_type \ --disk_size_gb \ --worker_disk_type \ - --num_bigquery_write_shards \ --partition_config_path \ ``` @@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery output rather than just being specified once as in the VCF header), you typically need 3 to 4 times the total size of the raw VCF files. -In addition, if [merging](variant_merging.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may +In addition, if [merging](variant_merging.md) is enabled, you may need more disk per worker (e.g. 500GB) as the same variants need to be aggregated together on one machine. @@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`), they can reduce cost as they can avoid idle CPU cycles due to disk IOPS limitations. -As a result, we recommend using SSDs if [merging](variant_merge.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these -operations require "shuffling" the data (i.e. redistributing the data among -workers), which require significant disk I/O. +As a result, we recommend using SSDs if [merging](variant_merging.md) is enabled: +this operation requires "shuffling" the data (i.e. redistributing the data +among workers), which requires significant disk I/O. Set `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd` to use SSDs. -### `--num_bigquery_write_shards` - -Currently, the write operation to BigQuery in Dataflow is performed as a -postprocessing step after the main transforms are done. As a workaround for -BigQuery write limitations (more details -[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)), -we have added "sharding" when writing to BigQuery. 
This makes the data load -to BigQuery significantly faster as it parallelizes the process and enables -loading large (>5TB) data to BigQuery at once. - -As a result, we recommend setting `--num_bigquery_write_shards 20` when loading -any data that has more than 1 billion rows (after merging) or 1TB of final -output. You may use a smaller number of write shards (e.g. 5) when using -[sharded output](#--sharding_config_path) as each partition also acts as a -"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to -fail as there is a maximum limit on the number of concurrent writes per table. - ### `--sharding_config_path` Sharding the output can save significant query costs once the data is in @@ -146,4 +126,3 @@ partition). As a result, we recommend setting the partition config for very large data where possible. Please see the [documentation](sharding.md) for more details. - diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 94ef2d405..086ab6904 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -185,12 +185,8 @@ def add_arguments(self, parser): parser.add_argument( '--num_bigquery_write_shards', type=int, default=1, - help=('Before writing the final result to output BigQuery, the data is ' - 'sharded to avoid a known failure for very large inputs (issue ' - '#199). Setting this flag to 1 will avoid this extra sharding.' - 'It is recommended to use 20 for loading large inputs without ' - 'merging. 
Use a smaller value (2 or 3) if both merging and ' - 'optimize_for_large_inputs are enabled.')) + help=('This flag is deprecated and will be removed in future ' + 'releases.')) parser.add_argument( '--null_numeric_value_replacement', type=int, diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 525dc047b..fcb8308c2 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -73,10 +73,15 @@ def parse_args(argv, command_line_options): known_args, pipeline_args = parser.parse_known_args(argv) for transform_options in options: transform_options.validate(known_args) - _raise_error_on_invalid_flags(pipeline_args) + _raise_error_on_invalid_flags( + pipeline_args, + known_args.output_table if hasattr(known_args, 'output_table') else None) if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'): known_args.all_patterns = _get_all_patterns( known_args.input_pattern, known_args.input_file) + + # Enable new BQ sink experiment. + pipeline_args += ['--experiment', 'use_beam_bq_sink'] return known_args, pipeline_args @@ -301,8 +306,8 @@ def write_headers(merged_header, file_path): vcf_header_io.WriteVcfHeaders(file_path)) -def _raise_error_on_invalid_flags(pipeline_args): - # type: (List[str]) -> None +def _raise_error_on_invalid_flags(pipeline_args, output_table): + # type: (List[str], Any) -> None """Raises an error if there are unrecognized flags.""" parser = argparse.ArgumentParser() for cls in pipeline_options.PipelineOptions.__subclasses__(): @@ -315,6 +320,14 @@ def _raise_error_on_invalid_flags(pipeline_args): not known_pipeline_args.setup_file): raise ValueError('The --setup_file flag is required for DataflowRunner. 
' 'Please provide a path to the setup.py file.') + if output_table: + if (not hasattr(known_pipeline_args, 'temp_location') or + not known_pipeline_args.temp_location): + raise ValueError('--temp_location is required for BigQuery imports.') + if not known_pipeline_args.temp_location.startswith('gs://'): + raise ValueError( + '--temp_location must be valid GCS location for BigQuery imports') + def is_pipeline_direct_runner(pipeline): diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index a5633b615..181c1b851 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -94,21 +94,31 @@ def test_fail_on_invalid_flags(self): 'gcp-variant-transforms-test', '--staging_location', 'gs://integration_test_runs/staging'] - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) # Add Dataflow runner (requires --setup_file). pipeline_args.extend(['--runner', 'DataflowRunner']) with self.assertRaisesRegexp(ValueError, 'setup_file'): - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) # Add setup.py (required for Variant Transforms run). This is now valid. 
pipeline_args.extend(['--setup_file', 'setup.py']) - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) + + with self.assertRaisesRegexp(ValueError, '--temp_location is required*'): + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') + + pipeline_args.extend(['--temp_location', 'wrong_gcs']) + with self.assertRaisesRegexp(ValueError, '--temp_location must be valid*'): + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') + + pipeline_args = pipeline_args[:-1] + ['gs://valid_bucket/temp'] + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') # Add an unknown flag. pipeline_args.extend(['--unknown_flag', 'somevalue']) with self.assertRaisesRegexp(ValueError, 'Unrecognized.*unknown_flag'): - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') def test_get_compression_type(self): vcf_metadata_list = [filesystem.FileMetadata(path, size) for diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json index 871f3fb01..48ce40d43 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json @@ -10,7 +10,6 @@ "worker_machine_type": "n1-standard-64", "max_num_workers": "64", "num_workers": "20", - "num_bigquery_write_shards": "2", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json index cd1b958bb..7bfd502d8 100644 --- 
a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json @@ -10,7 +10,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "2", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], @@ -69,4 +68,3 @@ ] } ] - diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json index c66767591..6b664dd11 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json @@ -8,7 +8,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "20", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json deleted file mode 100644 index cc2d7a8ab..000000000 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json +++ /dev/null @@ -1,23 +0,0 @@ -[ - { - "test_name": "test-non-splittable-gzip", - "table_name": "test_non_splittable_gzip", - "input_pattern": "gs://gcp-variant-transforms-testfiles/large_tests/non-splittable-gzip/**.bgz", - "sharding_config_path": "gcp_variant_transforms/data/sharding_configs/homo_sapiens_default.yaml", - "runner": "DataflowRunner", - "assertion_configs": [ - { - "query": 
["NUM_ROWS_QUERY"], - "expected_result": {"num_rows": 409932} - }, - { - "query": ["SUM_START_QUERY"], - "expected_result": {"sum_start": 32190612292607} - }, - { - "query": ["SUM_END_QUERY"], - "expected_result": {"sum_end": 32190612813885} - } - ] - } -] diff --git a/gcp_variant_transforms/transforms/limit_write.py b/gcp_variant_transforms/transforms/limit_write.py deleted file mode 100644 index 0f910dc71..000000000 --- a/gcp_variant_transforms/transforms/limit_write.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A PTransform to limit BQ sink from producing too many files (shards) - -This is a work around to avoid the following failure: - BigQuery execution failed., Error: - Message: Too many sources provided: xxxxx. Limit is 10000. -To limit sink we generate a random dummy key and group by input elements (which -are BigQuery rows) based on that key before writing them to output table. 
-""" - -from __future__ import absolute_import - -import random -import apache_beam as beam - - -class _RoundRobinKeyFn(beam.DoFn): - def __init__(self, count): - # type: (int) -> None - self._count = count - # This attribute will be properly initiated at each worker by start_bundle() - self._counter = 0 - - def start_bundle(self): - # type: () -> None - self._counter = random.randint(0, self._count - 1) - - def process(self, element): - self._counter += 1 - if self._counter >= self._count: - self._counter -= self._count - yield self._counter, element - - -class LimitWrite(beam.PTransform): - def __init__(self, count): - # type: (int) -> None - self._count = count - - def expand(self, pcoll): - return (pcoll - | beam.ParDo(_RoundRobinKeyFn(self._count)) - | beam.GroupByKey() - | beam.FlatMap(lambda kv: kv[1])) diff --git a/gcp_variant_transforms/transforms/limit_write_test.py b/gcp_variant_transforms/transforms/limit_write_test.py deleted file mode 100644 index 980e1cc87..000000000 --- a/gcp_variant_transforms/transforms/limit_write_test.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Tests for limit_write module.""" - -from __future__ import absolute_import - -import unittest - -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms import Create - -from gcp_variant_transforms.beam_io import vcfio -from gcp_variant_transforms.transforms import limit_write - - -class LimitWriteTest(unittest.TestCase): - """Test cases for the ``LimitWrite`` PTransform.""" - - def _get_sample_variants(self): - variant1 = vcfio.Variant( - reference_name='chr19', start=11, end=12, reference_bases='C') - variant2 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - variant3 = vcfio.Variant( - reference_name='20', start=None, end=None, reference_bases=None) - variant4 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - return [variant1, variant2, variant3, variant4] - - - def test_limit_write_default_shard_limit(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4500)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_4(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_1(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(1)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py 
b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index 7a4fb804f..6e7cc120b 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -73,13 +73,11 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False): def expand(self, pcoll): return (pcoll | 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo( - ConvertSampleInfoToRow(self._sample_name_encoding)) - | 'WriteSampleInfoToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( + ConvertSampleInfoToRow(self.sample_name_encoding)) + | 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE)))) + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 2d01dc4cf..a9697c3ec 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -16,7 +16,6 @@ from __future__ import absolute_import -import random from typing import Dict, List # pylint: disable=unused-import import apache_beam as beam @@ -29,13 +28,6 @@ from gcp_variant_transforms.libs import processed_variant from gcp_variant_transforms.libs import vcf_field_conflict_resolver from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import -from gcp_variant_transforms.transforms import limit_write - - -# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK, -# see: https://issues.apache.org/jira/browse/BEAM-2801 -# This has to be less than 10000. 
-_WRITE_SHARDS_LIMIT = 1000 @beam.typehints.with_input_types(processed_variant.ProcessedVariant) @@ -71,7 +63,6 @@ def __init__( update_schema_on_append=False, # type: bool allow_incompatible_records=False, # type: bool omit_empty_sample_calls=False, # type: bool - num_bigquery_write_shards=1, # type: int null_numeric_value_replacement=None # type: int ): # type: (...) -> None @@ -88,8 +79,6 @@ def __init__( + schema if there is a mismatch. omit_empty_sample_calls: If true, samples that don't have a given call will be omitted. - num_bigquery_write_shards: If > 1, we will limit number of sources which - are used for writing to the output BigQuery table. null_numeric_value_replacement: the value to use instead of null for numeric (float/int/long) lists. For instance, [0, None, 1] will become [0, `null_numeric_value_replacement`, 1]. If not set, the value will set @@ -109,7 +98,6 @@ def __init__( self._allow_incompatible_records = allow_incompatible_records self._omit_empty_sample_calls = omit_empty_sample_calls - self._num_bigquery_write_shards = num_bigquery_write_shards if update_schema_on_append: bigquery_util.update_bigquery_schema_on_append(self._schema.fields, self._output_table) @@ -120,35 +108,11 @@ def expand(self, pcoll): self._bigquery_row_generator, self._allow_incompatible_records, self._omit_empty_sample_calls)) - if self._num_bigquery_write_shards > 1: - # We split data into self._num_bigquery_write_shards random partitions - # and then write each part to final BQ by appending them together. - # Combined with LimitWrite transform, this will avoid the BQ failure. 
- bq_row_partitions = bq_rows | beam.Partition( - lambda _, n: random.randint(0, n - 1), - self._num_bigquery_write_shards) - bq_writes = [] - for i in range(self._num_bigquery_write_shards): - bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >> - limit_write.LimitWrite(_WRITE_SHARDS_LIMIT)) - bq_writes.append( - bq_rows | 'WriteToBigQuery' + str(i) >> - beam.io.Write(beam.io.BigQuerySink( + return (bq_rows + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND)))) - return bq_writes - else: - return (bq_rows - | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( - self._output_table, - schema=self._schema, - create_disposition=( - beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE)))) + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 056a300b7..a3853f4bf 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -389,7 +389,7 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes - known_args, # type: argparse.Namespace + known_args # type: argparse.Namespace ): # type: (...) 
-> None headers = pipeline_common.read_headers( @@ -409,7 +409,6 @@ def run(argv=None): logging.info('Command: %s', ' '.join(argv or sys.argv)) known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) - if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -483,6 +482,7 @@ def run(argv=None): file_to_write.write(schema_json) for i in range(num_shards): + table_suffix = '' table_suffix = sharding.get_output_table_suffix(i) table_name = sample_info_table_schema_generator.compose_table_name( known_args.output_table, table_suffix) @@ -494,11 +494,11 @@ def run(argv=None): update_schema_on_append=known_args.update_schema_on_append, allow_incompatible_records=known_args.allow_incompatible_records, omit_empty_sample_calls=known_args.omit_empty_sample_calls, - num_bigquery_write_shards=known_args.num_bigquery_write_shards, null_numeric_value_replacement=( known_args.null_numeric_value_replacement))) if known_args.generate_sample_info_table: - _create_sample_info_table(pipeline, pipeline_mode, known_args) + _create_sample_info_table( + pipeline, pipeline_mode, known_args) if known_args.output_avro_path: # TODO(bashir2): Add an integration test that outputs to Avro files and