fix(#1315): retry observation_flat row-count to avoid timing flake #1378

Open: wants to merge 6 commits into base: master
83 changes: 57 additions & 26 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -20,6 +20,11 @@

set -e

# -------------------------------------------------------------------
# Shared helper for robust Parquet row-count with retry/back-off
# -------------------------------------------------------------------
source "$(dirname "$0")/../lib/parquet_utils.sh"

#################################################
# Prints the usage
#################################################
@@ -190,7 +195,7 @@ function wait_for_completion() {
local runtime="15 minute"
local end_time=$(date -ud "$runtime" +%s)

while [[ $(date -u +%s) -le $end_time ]]
while [[ $(date -u +%s) -le ${end_time} ]]
do
local pipeline_status=$(curl --location --request GET "${PIPELINE_CONTROLLER_URL}/status?" \
--connect-timeout 5 \
@@ -238,25 +243,51 @@ function check_parquet() {
fi

# check whether output directory has received parquet files.
if [[ "$(ls -A $output)" ]]
if [[ "$(ls -A "${output}")" ]]
then
local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Patient/" | awk '{print $3}')
local total_encounters=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Encounter/" | awk '{print $3}')
local total_observations=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Observation/" | awk '{print $3}')

local total_patient_flat=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/patient_flat/" | awk '{print $3}')
local total_encounter_flat=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/encounter_flat/" | awk '{print $3}')
local total_obs_flat=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/observation_flat/" | awk '{print $3}')

print_message "Total patients: $total_patients"
print_message "Total encounters: $total_encounters"
print_message "Total observations: $total_observations"
# ------------------------------------------------------------------
# Row-counts with retry (shared helper)
# ------------------------------------------------------------------
local total_patients
total_patients=$(retry_rowcount \
"${output}/*/Patient/" \
"${TOTAL_TEST_PATIENTS}" \
"patients") || true

local total_encounters
total_encounters=$(retry_rowcount \
"${output}/*/Encounter/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounters") || true

local total_observations
total_observations=$(retry_rowcount \
"${output}/*/Observation/" \
"${TOTAL_TEST_OBS}" \
"observations") || true

local total_patient_flat
total_patient_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/patient_flat/" \
"${TOTAL_VIEW_PATIENTS}" \
"patient_flat") || true

local total_encounter_flat
total_encounter_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/encounter_flat/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounter_flat") || true

local total_obs_flat
total_obs_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/observation_flat/":"${output}/*/observation_flat/" \
Review comment (Collaborator):

Why do we need to check ${output}/*/observation_flat/ too? All flat views (including observations) should be in a VIEWS_TIMESTAMP_* subdir.

"${TOTAL_TEST_OBS}" \
"observation_flat") || true
# ------------------------------------------------------------------

print_message "Total patients: ${total_patients}"
print_message "Total encounters: ${total_encounters}"
print_message "Total observations: ${total_observations}"

print_message "Total patient flat rows: ${total_patient_flat}"
print_message "Total encounter flat rows: ${total_encounter_flat}"
@@ -271,12 +302,12 @@ function check_parquet() {
print_message "Pipeline transformation successfully completed."
else
print_message "Mismatch in count of records"
print_message "Actual total patients: $total_patients, expected total: $TOTAL_TEST_PATIENTS"
print_message "Actual total encounters: $total_encounters, expected total: $TOTAL_TEST_ENCOUNTERS"
print_message "Total observations: $total_observations, expected total: $TOTAL_TEST_OBS"
print_message "Actual total materialized view patients: $total_patient_flat, expected total: $TOTAL_VIEW_PATIENTS"
print_message "Actual total materialized view encounters: $total_encounter_flat, expected total: $TOTAL_TEST_ENCOUNTERS"
print_message "Actual total materialized view observations: $total_obs_flat, expected total: $TOTAL_TEST_OBS"
print_message "Actual total patients: ${total_patients}, expected total: ${TOTAL_TEST_PATIENTS}"
print_message "Actual total encounters: ${total_encounters}, expected total: ${TOTAL_TEST_ENCOUNTERS}"
print_message "Total observations: ${total_observations}, expected total: ${TOTAL_TEST_OBS}"
print_message "Actual total materialized view patients: ${total_patient_flat}, expected total: ${TOTAL_VIEW_PATIENTS}"
print_message "Actual total materialized view encounters: ${total_encounter_flat}, expected total: ${TOTAL_TEST_ENCOUNTERS}"
print_message "Actual total materialized view observations: ${total_obs_flat}, expected total: ${TOTAL_TEST_OBS}"
exit 2
fi
else
@@ -292,7 +323,7 @@ function check_parquet() {
# PARQUET_SUBDIR
#######################################################################
function clear() {
rm -rf $HOME_PATH/$PARQUET_SUBDIR/*.json
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}"/*.json
}

#######################################################################
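In the hunk above, each total is declared with `local` on its own line and assigned on the next, with `|| true` appended. The following is a minimal sketch of why that split matters in bash; `fake_rowcount` is a hypothetical stand-in for `retry_rowcount`, not code from this PR. With `local n=$(...)` the reported status is that of `local` itself, so a failing helper goes unnoticed, while a plain assignment preserves the helper's exit status.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for retry_rowcount: prints a count, then fails.
fake_rowcount() {
  echo 42
  return 1
}

combined() {
  # The exit status seen here belongs to `local`, which succeeds,
  # so the failure of fake_rowcount is hidden.
  local n=$(fake_rowcount)
  echo "status=$? n=${n}"
}

separate() {
  local n
  local status=0
  # A plain assignment keeps the exit status of the command substitution.
  n=$(fake_rowcount) || status=$?
  echo "status=${status} n=${n}"
}

combined   # prints: status=0 n=42
separate   # prints: status=1 n=42
```

Because the script runs under `set -e`, the `|| true` in the PR appears intended to keep a non-zero return from aborting the script, so that the mismatch is reported by the count comparison that follows.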
73 changes: 73 additions & 0 deletions e2e-tests/lib/parquet_utils.sh
@@ -0,0 +1,73 @@
#!/usr/bin/env bash
# Utility: robust Parquet row-count with retry/back-off.
# Source from validation scripts.

set -euo pipefail

# retry_rowcount <globs> <expected> <label>
# globs – colon-separated shell globs to Parquet folders
# expected – integer row count we expect to see
# label – metric name for log messages
#
# Prints the final count on stdout.
# Returns 0 if expected count is reached; 1 otherwise.

retry_rowcount() {
local globs="$1"
local expected="$2"
local label="$3"

# Allow CI to override retry cadence without touching code
local max_retries=${ROWCOUNT_MAX_RETRIES:-5}
local sleep_secs=${ROWCOUNT_SLEEP_SECS:-5}

local retries=0
local raw_count=0
local final_count=0

IFS=':' read -r -a paths <<<"${globs}"
Review comment (Collaborator):

I commented in the other file on whether this multiple-paths option is really needed. I think it is not, and if that is the case, I suggest that we drop this `:` separator option and simplify this function.


while true; do
raw_count=0

# ── 1. Find a path that actually contains files
for p in "${paths[@]}"; do
shopt -s nullglob
local files=( "${p}" )
shopt -u nullglob

if [[ ${#files[@]} -gt 0 ]]; then
raw_count=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${p}" 2>/dev/null | awk '{print $3}')
break
fi
done

# ── 2. Normalise raw_count
if [[ -z "${raw_count}" || ! "${raw_count}" =~ ^[0-9]+$ ]]; then
final_count=0
Review comment (Collaborator):

Please log an error message in this case.

else
final_count="${raw_count}"
fi

# ── 3. Success?
if [[ "${final_count}" -eq "${expected}" ]]; then
echo "${final_count}"
return 0
fi

# ── 4.Optional Fast-fail if no files ever matched on the *first* pass -- this can be implemented in future


# ── 5. Give up?
if [[ "${retries}" -ge "${max_retries}" ]]; then
echo "${final_count}"
return 1
fi

# ── 6. Sleep & retry
retries=$((retries + 1))
echo "E2E TEST: [${label}] raw=${raw_count}, expected=${expected} — retry ${retries}/${max_retries} in ${sleep_secs}s" >&2
Review comment (Collaborator):

nit: please break long lines, here and everywhere else, unless there is a good reason not to do so (style guide rule).

sleep "${sleep_secs}"
done
}
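One way the review suggestions on this file could be folded together is sketched below: a single glob instead of the `:` separated list, an error message when the count is not numeric, and long log lines broken across lines. This is an illustration under stated assumptions, not the code that was merged. The names `retry_rowcount_simple`, `default_rowcount`, and the `ROWCOUNT_CMD` hook are hypothetical and exist only so the counter can be stubbed in a test. The sketch also expands the pattern unquoted when checking for matches, since in bash a quoted `"${p}"` inside an array literal is not glob-expanded and would always yield one element; this assumes paths without whitespace.

```shell
#!/usr/bin/env bash
# Sketch only: single-glob variant of retry_rowcount.
# ROWCOUNT_CMD is a hypothetical hook (not in the PR) for stubbing the counter.

# Default counter: the same parquet-tools invocation the PR uses.
default_rowcount() {
  java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount "$1" \
    2>/dev/null | awk '{print $3}'
}

# retry_rowcount_simple <glob> <expected> <label>
# Prints the final count on stdout; returns 0 on match, 1 otherwise.
retry_rowcount_simple() {
  local pattern="$1"
  local expected="$2"
  local label="$3"
  local max_retries="${ROWCOUNT_MAX_RETRIES:-5}"
  local sleep_secs="${ROWCOUNT_SLEEP_SECS:-5}"
  local counter="${ROWCOUNT_CMD:-default_rowcount}"
  local retries=0
  local raw_count
  local final_count

  while true; do
    raw_count=""

    # Unquoted on purpose so the shell expands the glob; with nullglob,
    # a pattern that matches nothing expands to an empty array.
    shopt -s nullglob
    local matches=( ${pattern} )
    shopt -u nullglob

    if [[ ${#matches[@]} -gt 0 ]]; then
      raw_count=$("${counter}" "${pattern}") || true
    fi

    if [[ "${raw_count}" =~ ^[0-9]+$ ]]; then
      final_count="${raw_count}"
    else
      # Log the problem instead of silently falling back to 0.
      echo "E2E TEST: [${label}] no numeric row count" \
        "(got '${raw_count}') for ${pattern}; using 0" >&2
      final_count=0
    fi

    if [[ "${final_count}" -eq "${expected}" ]]; then
      echo "${final_count}"
      return 0
    fi

    if [[ "${retries}" -ge "${max_retries}" ]]; then
      echo "${final_count}"
      return 1
    fi

    retries=$((retries + 1))
    echo "E2E TEST: [${label}] raw=${raw_count}, expected=${expected};" \
      "retry ${retries}/${max_retries} in ${sleep_secs}s" >&2
    sleep "${sleep_secs}"
  done
}
```

A caller would use it the same way the validation script calls the PR's helper, for example `total_patients=$(retry_rowcount_simple "${output}/*/Patient/" "${TOTAL_TEST_PATIENTS}" "patients") || true`.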