Skip to content

Commit 9bf3c7b

Browse files
committed
Initial migration of task backend to typed_sql
1 parent f384748 commit 9bf3c7b

File tree

11 files changed

+636
-513
lines changed

11 files changed

+636
-513
lines changed

app/build.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ targets:
66
- '$package$'
77
- 'lib/$lib$'
88
- 'lib/account/models.dart'
9+
- 'lib/database/**'
910
- 'lib/admin/models.dart'
1011
- 'lib/dartdoc/models.dart'
1112
- 'lib/frontend/handlers/pubapi.dart'

app/lib/admin/tools/delete_all_staging.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ Future<String> executeDeleteAllStaging(List<String> args) async {
5555
dbService.query<ModeratedPackage>(): 500,
5656
dbService.query<NeatTaskStatus>(): 500,
5757
dbService.query<Secret>(): 500,
58+
// ignore: deprecated_member_use_from_same_package
5859
dbService.query<PackageState>(): 100,
5960
};
6061

app/lib/search/backend.dart

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import 'package:meta/meta.dart';
1919
import 'package:pana/src/dartdoc/pub_dartdoc_data.dart';
2020
import 'package:path/path.dart' as p;
2121
import 'package:pool/pool.dart';
22+
import 'package:pub_dev/database/model.dart';
2223
import 'package:pub_dev/shared/monitoring.dart';
2324
import 'package:retry/retry.dart';
2425

@@ -40,7 +41,6 @@ import '../shared/storage.dart';
4041
import '../shared/versions.dart';
4142
import '../task/backend.dart';
4243
import '../task/global_lock.dart';
43-
import '../task/models.dart';
4444

4545
import 'dart_sdk_mem_index.dart';
4646
import 'flutter_sdk_mem_index.dart';
@@ -85,10 +85,11 @@ void registerSearchIndex(SearchIndex index) =>
8585

8686
/// Datastore-related access methods for the search service
8787
class SearchBackend {
88+
final Database<PrimaryDatabase> db;
8889
final DatastoreDB _db;
8990
final VersionedJsonStorage _snapshotStorage;
9091

91-
SearchBackend(this._db, Bucket snapshotBucket)
92+
SearchBackend(this.db, this._db, Bucket snapshotBucket)
9293
: _snapshotStorage = VersionedJsonStorage(snapshotBucket, 'snapshot/');
9394

9495
/// Runs a forever loop and tries to get a global lock.
@@ -255,11 +256,14 @@ class SearchBackend {
255256
addResult(p.name!, p.updated!);
256257
}
257258

258-
final q3 = _db.query<PackageState>()
259-
..filter('finished >=', updatedThreshold)
260-
..order('-finished');
261-
await for (final s in q3.run()) {
262-
addResult(s.package, s.finished);
259+
final q2 = db.tasks
260+
.where((task) =>
261+
task.runtimeVersion.equalsValue(runtimeVersion) &
262+
task.finished.isAfterValue(updatedThreshold))
263+
.orderBy((task) => [(task.finished, Order.descending)])
264+
.select((task) => (task.package, task.finished));
265+
await for (final (package, finished) in q2.stream()) {
266+
addResult(package, finished);
263267
}
264268

265269
return results;

app/lib/service/services.dart

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// BSD-style license that can be found in the LICENSE file.
44

55
import 'dart:async' show FutureOr, Zone;
6+
import 'dart:io' show Platform;
67

78
import 'package:_pub_shared/utils/http.dart';
89
import 'package:appengine/appengine.dart';
@@ -14,6 +15,10 @@ import 'package:gcloud/service_scope.dart';
1415
import 'package:gcloud/storage.dart';
1516
import 'package:googleapis_auth/auth_io.dart' as auth;
1617
import 'package:logging/logging.dart';
18+
import 'package:postgres/postgres.dart' show Pool, Endpoint, PoolSettings;
19+
import 'package:pub_dev/database/model.dart';
20+
import 'package:pub_dev/fake/backend/fake_database.dart'
21+
show createFakeDatabaseAdaptor;
1722
import 'package:pub_dev/package/api_export/api_exporter.dart';
1823
import 'package:pub_dev/search/handlers.dart';
1924
import 'package:pub_dev/service/async_queue/async_queue.dart';
@@ -87,6 +92,29 @@ Future<void> withServices(FutureOr<void> Function() fn) async {
8792
final retryingAuthClient = httpRetryClient(innerClient: authClient);
8893
registerScopeExitCallback(() async => retryingAuthClient.close());
8994

95+
final databaseAdapter = DatabaseAdapter.postgres(
96+
Pool.withEndpoints(
97+
[
98+
Endpoint(
99+
host: Platform.environment['PGHOST'] ?? '127.0.0.1',
100+
port: int.tryParse(Platform.environment['PGPORT'] ?? '') ?? 5432,
101+
database: Platform.environment['PGDATABASE'] ?? 'postgres',
102+
username: Platform.environment['PGUSER'] ?? 'postgres',
103+
password: Platform.environment['PGPASSWORD'] ?? 'postgres',
104+
),
105+
],
106+
settings: PoolSettings(
107+
applicationName: 'pub-dev',
108+
maxConnectionCount: 10,
109+
),
110+
),
111+
);
112+
registerDatabase(Database<PrimaryDatabase>(
113+
databaseAdapter,
114+
SqlDialect.postgres(),
115+
));
116+
registerScopeExitCallback(() => databaseAdapter.close());
117+
90118
// override storageService with retrying http client
91119
registerStorageService(
92120
Storage(retryingAuthClient, activeConfiguration.projectId));
@@ -179,6 +207,14 @@ Future<R> withFakeServices<R>({
179207
}
180208

181209
// register fake services that would have external dependencies
210+
final databaseAdapter = createFakeDatabaseAdaptor();
211+
registerDatabase(Database<PrimaryDatabase>(
212+
databaseAdapter,
213+
SqlDialect.postgres(),
214+
));
215+
await database.createTables();
216+
registerScopeExitCallback(() => databaseAdapter.close());
217+
182218
registerSecretBackend(FakeSecretBackend({}));
183219
registerAuthProvider(FakeAuthProvider());
184220
registerScopeExitCallback(authProvider.close);
@@ -261,7 +297,7 @@ Future<R> _withPubServices<R>(FutureOr<R> Function() fn) async {
261297
registerIndexUpdater(IndexUpdater(dbService));
262298
registerPublisherBackend(PublisherBackend(dbService));
263299
registerScoreCardBackend(ScoreCardBackend(dbService));
264-
registerSearchBackend(SearchBackend(dbService,
300+
registerSearchBackend(SearchBackend(database, dbService,
265301
storageService.bucket(activeConfiguration.searchSnapshotBucketName!)));
266302
registerSearchClient(SearchClient());
267303
registerSearchAdapter(SearchAdapter());
@@ -280,6 +316,7 @@ Future<R> _withPubServices<R>(FutureOr<R> Function() fn) async {
280316
storageService.bucket(activeConfiguration.publicPackagesBucketName!),
281317
));
282318
registerTaskBackend(TaskBackend(
319+
database,
283320
dbService,
284321
storageService.bucket(activeConfiguration.taskResultBucketName!),
285322
));

0 commit comments

Comments
 (0)