
Commit 78c79bd

[Deployment Revisited][Staging] Fix runtime errors on the job exporter cron (#4837)
### Motivation

The following error scenarios were not caught in unit tests:

* During export, a blob for which there is a key but no corresponding object in the blobs bucket. This throws an exception due to a 404 from the GCS API call. To address this, the blob is not uploaded during export, and the key is set to None during import.
* During exports (and imports), data bundles that reference a non-existing bucket would result in a 404 from the GCS API, throwing a fatal exception. To address this, the contents folder export is skipped, and the rsync is skipped during import.

Also, there was no error handling for rsync, so we could not tell whether the operation succeeded. The rsync methods were implemented to return True on successful completion and False on failure. If the operation fails, an exception is raised to force the cron job to be retried.

### Carried over keys and project id

When serialized from a previous database, the entity's Key carries over the GCP project name, which results in the following exception during the put call:

```
detail: "mismatched databases within request: <unknown!>~clusterfuzz-development vs. <unknown!>~cluster-fuzz"
, detail: "/Datastore.Commit to [2002:a05:6600:760b:b0:3ba:dcf:6bcb]:4001 : APP_ERROR(1) mismatched databases within request: <unknown!>~clusterfuzz-development vs. <unknown!>~cluster-fuzz"
]
```

For this reason, we cannot simply deserialize and put. The fields have to be manually retrieved and assigned to a new entity, so the key is auto-generated on every put. This was implemented by adding the `_persist_entity` method.

### Other changes

The public method `get_bucket` was added to the storage module, so we can tell whether a GCS bucket already exists. It was already used internally; it is now exposed in the storage interface.

### Testing strategy

Reused the test suite already in place, plus a manual export/import from internal to chrome-development. The script finished successfully in both scenarios, and the output looks sane in dev.
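To make the key-regeneration approach from the "Carried over keys and project id" section concrete, here is a minimal sketch of copying a deserialized entity's fields onto a fresh model so that `put()` mints a key in the current project. The `Job` model and its properties are hypothetical stand-ins, and the deserialization step is omitted; this is not the exporter's exact code.

```python
# Sketch only: `Job` is a hypothetical ndb model standing in for the real
# exported entity types. Requires google-cloud-ndb and an active ndb context.
from google.cloud import ndb


class Job(ndb.Model):
  name = ndb.StringProperty()
  environment_string = ndb.TextProperty()


def persist_with_fresh_key(source_entity: Job) -> Job:
  """Copies fields onto a new entity so put() generates a key in the
  current project instead of reusing the carried-over one."""
  target_entity = Job()
  # to_dict() excludes the key, so only plain properties are copied over.
  for field, value in source_entity.to_dict().items():
    setattr(target_entity, field, value)
  target_entity.put()  # Key is auto-generated against the local project.
  return target_entity
```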
1 parent 0d36dce commit 78c79bd

2 files changed (+65, -12 lines)

src/clusterfuzz/_internal/cron/job_exporter.py

Lines changed: 60 additions & 12 deletions
```diff
@@ -55,7 +55,12 @@ def __init__(self):
     self._runner = gsutil.GSUtilRunner()
 
   def rsync(self, source: str, target: str):
-    self._runner.rsync(f'gs://{source}', target)
+    """Rsyncs a source to a target destination. Returns True if
+    successful, False if there was any failure. Considers successful
+    any gsutil execution with a 0 return code."""
+    rsync_process_output = self._runner.rsync(source, target)
+    return_code = rsync_process_output.return_code
+    return return_code == 0
 
 
 class StorageRSync(RSyncClient):
```
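For reference, the boolean contract above amounts to "True iff gsutil exited with code 0". A standalone sketch of that semantics, using `subprocess` directly rather than the project's `GSUtilRunner` (the exact command invocation here is an assumption):

```python
# Sketch only: illustrates the "return True iff gsutil exited with 0" contract.
import subprocess


def rsync_succeeded(source: str, target: str) -> bool:
  """Runs a gsutil rsync and reports success as a boolean."""
  result = subprocess.run(
      ['gsutil', '-m', 'rsync', '-r', source, target], check=False)
  return result.returncode == 0


# Callers can then turn a failure into a retryable error:
#   if not rsync_succeeded('gs://src-bucket', 'gs://dst-bucket'):
#     raise ValueError('rsync failed, retry the cron job.')
```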
```diff
@@ -69,7 +74,8 @@ def rsync(self, source: str, target: str):
     """Lists all files under the source path, and uploads
     them to the target path. Since list_blobs returns
     fully qualified names, the source prefix is trimmed
-    to recover file names."""
+    to recover file names. Returns True on success, False
+    on failure."""
     pattern = r"^gs://([^/]+)(?:/.*)?$"
     for blob in storage.list_blobs(source):
       bucket_name_match = re.match(pattern, source)
```
```diff
@@ -92,7 +98,9 @@ def rsync(self, source: str, target: str):
         blob_file_name = blob
 
       blob_target_path = f'{target}/{blob_file_name}'
-      storage.copy_blob(f'{source}/{blob_file_name}', blob_target_path)
+      if not storage.copy_blob(f'{source}/{blob_file_name}', blob_target_path):
+        return False
+    return True
 
 
 class EntityMigrator:
```
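To illustrate the prefix-trimming idea from the docstring above, a small example of recovering a bucket-relative file name from a fully qualified blob path. The values and the exact trimming logic are illustrative assumptions, not the module's behavior:

```python
# Sketch only: strip the source prefix from a fully qualified blob path
# so only the relative file name remains. All values are made up.
import re

pattern = r"^gs://([^/]+)(?:/.*)?$"
source = 'gs://source-bucket/exported/contents'

bucket_name = re.match(pattern, source).group(1)  # 'source-bucket'
fully_qualified_blob = f'{source}/dir/file.txt'

# Drop the source prefix (plus the separating slash) to get 'dir/file.txt'.
blob_file_name = fully_qualified_blob[len(source) + 1:]
print(bucket_name, blob_file_name)
```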
```diff
@@ -135,6 +143,10 @@ def _export_blobs(self, entity: ndb.Model, bucket_prefix: str):
       blob_id = getattr(entity, blobstore_key, None)
       if blob_id:
         blob_gcs_path = blobs.get_gcs_path(blob_id)
+        if not storage.get(blob_gcs_path):
+          logs.warning(f'{blobstore_key} with id {blob_id} not present '
+                       f'for {entity.name}, skipping.')
+          continue
         blob_destination_path = f'{bucket_prefix}/{blobstore_key}'
         storage.copy_blob(blob_gcs_path, blob_destination_path)
 
```
```diff
@@ -149,8 +161,16 @@ def _export_data_bundle_contents_if_applicable(self, entity: ndb.Model,
       logs.info(
           f'DataBundle {entity.name} has no related gcs bucket, skipping.')
       return
+    if not storage.get_bucket(entity.bucket_name):
+      logs.warning(f'Bucket {entity.bucket_name} missing for '
+                   f'data bundle {entity.name}, skipping.')
+      return
+    source_location = f'gs://{entity.bucket_name}'
     target_location = f'{bucket_prefix}/contents'
-    self._rsync_client.rsync(f'gs://{entity.bucket_name}', target_location)
+    rsync_succeeded = self._rsync_client.rsync(source_location, target_location)
+    if not rsync_succeeded:
+      raise ValueError(
+          f'Failed to rsync {source_location} to {target_location}.')
 
   def _export_entity(self, entity: ndb.Model, entity_bucket_prefix: str,
                      entity_name: str):
```
```diff
@@ -193,9 +213,10 @@ def _import_blobs(self, entity: ndb.Model, entity_name: str,
             f'{blobstore_key} missing for {entity_name}, skipping blob import.')
         continue
       if not storage.get(source_blob_location):
-        raise ValueError(
-            f'Absent blob for {blobstore_key} in {entity_name}, it '
-            'should be present.')
+        logs.warning(f'Absent blob for {blobstore_key} in {entity_name}, it '
+                     'was expected be present. Marked as None and skipping.')
+        new_blob_ids[blobstore_key] = None
+        continue
       new_blob_id = blobs.generate_new_blob_name()
       target_blob_location = f'gs://{storage.blobs_bucket()}/{new_blob_id}'
       if not storage.copy_blob(source_blob_location, target_blob_location):
```
```diff
@@ -217,11 +238,36 @@ def _substitute_environment_string(self, env_string: str | None):
 
   def _import_data_bundle_contents(self, source_location: str,
                                    bundle_name: str):
+    """Imports data bundle contents from the export bucket to the new
+    data bundle bucket in the target project. Skips if the contents
+    are absent during export, and throws an exception if the rsync
+    call failed."""
     new_bundle_bucket = data_handler.get_data_bundle_bucket_name(bundle_name)
     storage.create_bucket_if_needed(new_bundle_bucket)
-    self._rsync_client.rsync(source_location, f'gs://{new_bundle_bucket}')
+    # There is no helper method to figure out if a folder exists, resort to
+    # checking if there are blobs under the path.
+    if not list(storage.get_blobs(source_location)):
+      logs.warning(f'No source content for data bundle {bundle_name},'
+                   ' skipping content import.')
+      return new_bundle_bucket
+    target_location = f'gs://{new_bundle_bucket}'
+    rsync_result = self._rsync_client.rsync(source_location, target_location)
+    if not rsync_result:
+      raise ValueError(
+          f'Failed to rsync data bundle contents from {source_location} '
+          f'to {target_location}.')
     return new_bundle_bucket
 
+  def _persist_entity(self, entity: ndb.Model):
+    """A raw deserialization and put() call will cause an exception, since the
+    project from which the entity was serialized will mistmatch the project to
+    which we are writing it to datastore. This forces creation of a new
+    database key, circumventing the issue."""
+    entity_to_persist = self._target_cls()
+    for key, value in entity.to_dict().items():
+      setattr(entity_to_persist, key, value)
+    entity_to_persist.put()
+
   def _import_entity(self, entity_name: str, entity_location: str):
     """Imports entity into datastore, blobs, databundle contents
     and substitutes environment strings, if applicable."""
```
```diff
@@ -251,12 +297,14 @@ def _import_entity(self, entity_name: str, entity_location: str):
 
     # Do not assume that name is a primary key, avoid having two
     # different keys with the same name.
-    preexisting_entity = self._target_cls.query(
-        self._target_cls.name == entity_name).get()
-    if preexisting_entity:
+    preexisting_entities = list(
+        self._target_cls.query(self._target_cls.name == entity_name))
+    logs.info(f'Found {len(preexisting_entities)} of type {self._entity_type}'
+              f' and name {entity_name}, deleting.')
+    for preexisting_entity in preexisting_entities:
       preexisting_entity.key.delete()
 
-    entity_to_import.put()
+    self._persist_entity(entity_to_import)
 
   def import_entities(self):
     """Iterates over all entitiy names declared in the last export, and imports
```

src/clusterfuzz/_internal/google_cloud_utils/storage.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -940,6 +940,11 @@ def set_bucket_iam_policy(client, bucket_name, iam_policy):
   return None
 
 
+def get_bucket(bucket_name: str):
+  provider = _provider()
+  return provider.get_bucket(bucket_name)
+
+
 def create_bucket_if_needed(bucket_name,
                             object_lifecycle=None,
                             cors=None,
```
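A small usage sketch of the newly exposed helper, mirroring how the cron guards exports on bucket existence (the bucket handling below is an assumption: `get_bucket` is expected to return a falsy value when the bucket does not exist, which is how the job exporter uses it):

```python
# Sketch only: skip work when the source bucket is missing.
from clusterfuzz._internal.google_cloud_utils import storage


def export_bucket_if_present(bucket_name: str) -> bool:
  """Returns False without exporting when the bucket does not exist."""
  if not storage.get_bucket(bucket_name):
    return False
  # ... proceed with the rsync/export against f'gs://{bucket_name}' ...
  return True
```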
