From 288ab308e7803abdac6812b3069f12aae81173ff Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 29 Jan 2025 18:39:18 +0400 Subject: [PATCH 1/2] Update mobile gaming groovy scripts --- .../groovy/mobilegaming-java-dataflow.groovy | 21 ++++++++++++------- .../groovy/mobilegaming-java-direct.groovy | 17 +++++++++------ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/release/src/main/groovy/mobilegaming-java-dataflow.groovy b/release/src/main/groovy/mobilegaming-java-dataflow.groovy index bb0b76bd6757..60853d5542f6 100644 --- a/release/src/main/groovy/mobilegaming-java-dataflow.groovy +++ b/release/src/main/groovy/mobilegaming-java-dataflow.groovy @@ -98,15 +98,20 @@ class LeaderBoardRunner { def isSuccess = false String query_result = "" while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { - tables = t.run "bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__" - if (tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) { - query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${ - t.bqDataset() - }.leaderboard_${runner}_user] LIMIT 10\"""" - if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) { - isSuccess = true - break + try { + tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES" + if (tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) { + query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${ + t.bqDataset() + }.leaderboard_${runner}_user] LIMIT 10\"""" + if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) { + isSuccess = true + break + } } + } catch (Exception e) { + println "Warning: Exception while checking tables: ${e.message}" + println "Retrying..." } println "Waiting for pipeline to produce more results..." sleep(60000) // wait for 1 min diff --git a/release/src/main/groovy/mobilegaming-java-direct.groovy b/release/src/main/groovy/mobilegaming-java-direct.groovy index 3c6f4ca01a6c..8622a8a4a6cc 100644 --- a/release/src/main/groovy/mobilegaming-java-direct.groovy +++ b/release/src/main/groovy/mobilegaming-java-direct.groovy @@ -87,13 +87,18 @@ def startTime = System.currentTimeMillis() def isSuccess = false String query_result = "" while((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { - tables = t.run "bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__" - if(tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")){ - query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\"""" - if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){ - isSuccess = true - break + try { + tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES" + if(tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) { + query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\"""" + if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){ + isSuccess = true + break + } } + } catch (Exception e) { + println "Warning: Exception while checking tables: ${e.message}" + println "Retrying..." } println "Waiting for pipeline to produce more results..." sleep(60000) // wait for 1 min From ceeffa676ae9c96e4437475560ca73f32b6b579b Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 29 Jan 2025 23:29:27 +0400 Subject: [PATCH 2/2] Add retry --- .../beam/examples/complete/game/utils/WriteToBigQuery.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index dadc974e62c3..eef4bc932682 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -129,7 +130,8 @@ public PDone expand(PCollection teamAndScore) { .to(getTable(projectId, datasetId, tableName)) .withSchema(getSchema()) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + .withWriteDisposition(WriteDisposition.WRITE_APPEND) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())); return PDone.in(teamAndScore.getPipeline()); }