diff --git a/app/lib/admin/actions/merge_moderated_package_into_existing.dart b/app/lib/admin/actions/merge_moderated_package_into_existing.dart index 06ff228937..bc996dc8fd 100644 --- a/app/lib/admin/actions/merge_moderated_package_into_existing.dart +++ b/app/lib/admin/actions/merge_moderated_package_into_existing.dart @@ -62,7 +62,7 @@ Fails if that package has no existing Package entity. tx.insert(p!); tx.delete(mpKey); }); - await purgePackageCache(packageName); + await triggerPackagePostUpdates(packageName).future; return { 'package': packageName, diff --git a/app/lib/admin/actions/moderate_package.dart b/app/lib/admin/actions/moderate_package.dart index 48650a88d3..a083645d15 100644 --- a/app/lib/admin/actions/moderate_package.dart +++ b/app/lib/admin/actions/moderate_package.dart @@ -4,11 +4,10 @@ import '../../admin/backend.dart'; import '../../admin/models.dart'; -import '../../package/api_export/api_exporter.dart'; import '../../package/backend.dart'; import '../../package/models.dart'; +import '../../scorecard/backend.dart'; import '../../shared/datastore.dart'; -import '../../task/backend.dart'; import 'actions.dart'; final moderatePackage = AdminAction( @@ -108,11 +107,7 @@ Future> adminMarkPackageVisibility( return pkg; }); - // make sure visibility cache is updated immediately - await purgePackageCache(package); - - // sync exported API(s) - await apiExporter.synchronizePackage(package, forceDelete: true); + await triggerPackagePostUpdates(package, exportForceDelete: true).future; // retract or re-populate public archive files await packageBackend.tarballStorage.updatePublicArchiveBucket( @@ -121,8 +116,7 @@ Future> adminMarkPackageVisibility( deleteIfOlder: Duration.zero, ); - await taskBackend.trackPackage(package); - await purgePackageCache(package); + await purgeScorecardData(package, p2!.latestVersion!, isLatest: true); } return { diff --git a/app/lib/admin/actions/moderate_package_versions.dart b/app/lib/admin/actions/moderate_package_versions.dart index 21524c606e..dc3c558411 100644 --- a/app/lib/admin/actions/moderate_package_versions.dart +++ b/app/lib/admin/actions/moderate_package_versions.dart @@ -5,13 +5,11 @@ import 'package:_pub_shared/utils/sdk_version_cache.dart'; import 'package:clock/clock.dart'; -import '../../package/api_export/api_exporter.dart'; import '../../package/backend.dart'; import '../../package/models.dart'; import '../../scorecard/backend.dart'; import '../../shared/datastore.dart'; import '../../shared/versions.dart'; -import '../../task/backend.dart'; import '../backend.dart'; import '../models.dart'; @@ -144,11 +142,7 @@ Future> adminMarkPackageVersionVisibility( return v; }); - // make sure visibility cache is updated immediately - await purgePackageCache(package); - - // sync exported API(s) - await apiExporter.synchronizePackage(package, forceDelete: true); + await triggerPackagePostUpdates(package, exportForceDelete: true).future; // retract or re-populate public archive files await packageBackend.tarballStorage.updatePublicArchiveBucket( @@ -157,8 +151,6 @@ Future> adminMarkPackageVersionVisibility( deleteIfOlder: Duration.zero, ); - await taskBackend.trackPackage(package); - await purgePackageCache(package); await purgeScorecardData(package, version, isLatest: true); } diff --git a/app/lib/admin/actions/package_version_retraction.dart b/app/lib/admin/actions/package_version_retraction.dart index 283f2876d4..6c7795f6ff 100644 --- a/app/lib/admin/actions/package_version_retraction.dart +++ b/app/lib/admin/actions/package_version_retraction.dart @@ -88,7 +88,7 @@ value of `set-retracted`, which should either be `true` or `false`. 'isRetracted': pv.isRetracted, }; }); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName); return { 'before': before, diff --git a/app/lib/admin/actions/publisher_package_remove.dart b/app/lib/admin/actions/publisher_package_remove.dart index 6b6335f041..90a9668dfe 100644 --- a/app/lib/admin/actions/publisher_package_remove.dart +++ b/app/lib/admin/actions/publisher_package_remove.dart @@ -59,7 +59,8 @@ If the publisher has no members, the package will end up without uploaders. ), ); }); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName, + skipTask: true, skipVersionsExport: true); await purgePublisherCache(publisherId: currentPublisherId); return { 'previousPublisher': currentPublisherId, diff --git a/app/lib/admin/backend.dart b/app/lib/admin/backend.dart index 6563a9ab7a..73df0a5ff2 100644 --- a/app/lib/admin/backend.dart +++ b/app/lib/admin/backend.dart @@ -23,7 +23,7 @@ import '../account/models.dart'; import '../admin/models.dart'; import '../audit/models.dart'; import '../package/backend.dart' - show checkPackageVersionParams, packageBackend, purgePackageCache; + show checkPackageVersionParams, packageBackend, triggerPackagePostUpdates; import '../package/models.dart'; import '../publisher/models.dart'; import '../scorecard/backend.dart'; @@ -32,7 +32,6 @@ import '../shared/configuration.dart'; import '../shared/datastore.dart'; import '../shared/exceptions.dart'; import '../shared/versions.dart'; -import '../task/backend.dart'; import 'actions/actions.dart' show AdminAction; import 'tools/delete_all_staging.dart'; import 'tools/list_tools.dart'; @@ -400,7 +399,7 @@ class AdminBackend { await _db .deleteWithQuery(_db.query(ancestorKey: packageKey)); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName); _logger.info('Package "$packageName" got successfully removed.'); return ( @@ -448,7 +447,7 @@ class AdminBackend { caller, tx, p, pv, isRetracted); } }); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName); } } @@ -517,10 +516,8 @@ class AdminBackend { tx.insert(package); }); - await purgePackageCache(packageName); await purgeScorecardData(packageName, version, isLatest: true); - // trigger (eventual) re-analysis - await taskBackend.trackPackage(packageName); + triggerPackagePostUpdates(packageName); return ( deletedPackageVersions: deletedPackageVersions, deletedPackageVersionInfos: deletedPackageVersionInfos.deleted, diff --git a/app/lib/admin/tools/package_publisher.dart b/app/lib/admin/tools/package_publisher.dart index b88ce47792..0128fd864d 100644 --- a/app/lib/admin/tools/package_publisher.dart +++ b/app/lib/admin/tools/package_publisher.dart @@ -42,7 +42,8 @@ Future executeSetPackagePublisher(List args) async { tx.insert(pkg); }); await purgePublisherCache(publisherId: publisherId); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName, + skipTask: true, skipVersionsExport: true); if (currentPublisherId != null) { await purgePublisherCache(publisherId: currentPublisherId); } diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index f95cc7ea1c..ca9e8a4b5a 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -381,7 +381,11 @@ class PackageBackend { return true; }); if (updated) { - await purgePackageCache(package); + triggerPackagePostUpdates( + package, + skipTask: true, + skipExport: true, + ); } return updated; } @@ -480,9 +484,7 @@ class PackageBackend { options: optionsChanges, )); }); - await purgePackageCache(package); - await taskBackend.trackPackage(package); - await apiExporter.synchronizePackage(package); + triggerPackagePostUpdates(package, skipVersionsExport: true); } /// Updates [options] on [package]/[version], assuming the current user @@ -520,10 +522,9 @@ class PackageBackend { authenticatedUser, tx, p, pv, options.isRetracted!); } }); - await purgePackageCache(package); await purgeScorecardData(package, version, isLatest: pkg.latestVersion == version); - await apiExporter.synchronizePackage(package); + triggerPackagePostUpdates(package); } /// Verifies an update to the credential-less publishing settings and @@ -780,7 +781,6 @@ class PackageBackend { return _asPackagePublisherInfo(package); }); await purgePublisherCache(publisherId: request.publisherId); - await purgePackageCache(packageName); if (email != null) { await emailBackend.trySendOutgoingEmail(email!); @@ -788,7 +788,11 @@ class PackageBackend { if (currentPublisherId != null) { await purgePublisherCache(publisherId: currentPublisherId); } - await apiExporter.synchronizePackage(packageName); + triggerPackagePostUpdates( + packageName, + skipTask: true, + skipVersionsExport: true, + ); return rs; } @@ -1299,7 +1303,7 @@ class PackageBackend { sw.reset(); _logger.info('Invalidating cache for package ${newVersion.package}.'); - await purgePackageCache(newVersion.package); + triggerPackagePostUpdates(newVersion.package, taskUpdateDependents: true); // Let's not block the upload response on these post-upload tasks. // The operations should either be non-critical, or should be retried @@ -1324,12 +1328,10 @@ class PackageBackend { await Future.wait([ if (activeConfiguration.isPublishedEmailNotificationEnabled) emailBackend.trySendOutgoingEmail(outgoingEmail), - taskBackend.trackPackage(newVersion.package, updateDependents: true), - apiExporter.synchronizePackage(newVersion.package), apiExporter.synchronizeAllPackagesAtomFeed(), + tarballStorage.updateContentDispositionOnPublicBucket( + newVersion.package, newVersion.version!), ]); - await tarballStorage.updateContentDispositionOnPublicBucket( - newVersion.package, newVersion.version!); } catch (e, st) { final v = newVersion.qualifiedVersionKey; _logger.severe('Error post-processing package upload $v', e, st); @@ -1581,7 +1583,7 @@ class PackageBackend { package: packageName, )); }); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName, skipTask: true, skipExport: true); } Future _validatePackageUploader( @@ -1657,7 +1659,7 @@ class PackageBackend { uploaderUser: uploader, )); }); - await purgePackageCache(packageName); + triggerPackagePostUpdates(packageName, skipTask: true, skipExport: true); return api.SuccessMessage( success: api.Message( message: @@ -2096,3 +2098,49 @@ class _VersionTransactionDataAcccess { return await _tx.query(pkgKey).run().toList(); } } + +/// Triggers post-update event processing after a [Package] object is part of +/// a transaction. +/// +/// Returns a record with an optionally awaitable [Future] in case the caller needs +/// wait the updates before yielding its response. +({Future future}) triggerPackagePostUpdates( + String package, { + /// Skip trigger a new analysis on the package. + bool skipTask = false, + + /// Skip triggering a new export to the CDN bucket. + bool skipExport = false, + + /// Skip only the version-related exports to the CDN bucket, keeps the + /// package-related operations. + /// TODO: implement this in API exporter. + bool skipVersionsExport = false, + + /// Pass the force-deletion flag to the package export operation. + bool exportForceDelete = false, + + /// Pass the update-dependents flag to the task update operation. + bool taskUpdateDependents = false, +}) { + Future add(Future Function() fn) { + return asyncQueue.addAsyncFn(fn).future; + } + + final futures = [ + add(() => purgePackageCache(package)), + if (!skipTask) + add(() => taskBackend.trackPackage( + package, + updateDependents: taskUpdateDependents, + )), + if (!skipExport) + add(() => apiExporter.synchronizePackage( + package, + forceDelete: exportForceDelete, + // TODO: implement and use [skipVersionsExport] + )), + ]; + + return (future: Future.wait(futures)); +} diff --git a/app/lib/service/async_queue/async_queue.dart b/app/lib/service/async_queue/async_queue.dart index 979c5db896..e66a68fc7c 100644 --- a/app/lib/service/async_queue/async_queue.dart +++ b/app/lib/service/async_queue/async_queue.dart @@ -44,12 +44,14 @@ class AsyncQueue { AsyncQueue() : _zone = Zone.current; - void addAsyncFn(AsyncFn fn) { + ({Future future}) addAsyncFn(AsyncFn fn) { if (_closed) { throw StateError('AsyncQueue is closed, task was not accepted.'); } - _queue.add(_Task(fn, StackTrace.current)); + final task = _Task(fn, StackTrace.current); + _queue.add(task); _triggerProcessing(); + return (future: task.completer.future); } void _triggerProcessing() { @@ -61,10 +63,12 @@ class AsyncQueue { final first = _queue.removeFirst(); try { await first.fn(); + first.completer.complete(); } catch (e, st) { final trace = Chain([Trace.from(first.origin), Trace.from(st)]).terse; stderr.writeln('Error executing off-request function: $e\n$trace'); _logger.severe('Error executing off-request function.', e, trace); + first.completer.completeError(e, st); } } @@ -80,6 +84,7 @@ class AsyncQueue { class _Task { final AsyncFn fn; final StackTrace origin; + final completer = Completer(); _Task(this.fn, this.origin); }