Skip to content

Update progress WIP #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: v1.2-histrionicus
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/include/duckdb/web/webdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class WebDB {
public:
/// A connection
class Connection {
friend WebDB;

protected:
/// The webdb
WebDB& webdb_;
Expand Down
44 changes: 44 additions & 0 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "duckdb/web/webdb.h"

#include <emscripten/val.h>

#include <chrono>
#include <cstddef>
#include <cstdio>
Expand Down Expand Up @@ -783,11 +785,53 @@ std::string WebDB::Tokenize(std::string_view text) {
/// Get the version
std::string_view WebDB::GetVersion() { return database_->LibraryVersion(); }

class ProgressBarCustom : public ProgressBarDisplay {
double value{0.0};
double times{0.0};
double to_send{1.0};

public:
ProgressBarCustom() {
value = 0.0;
times = 0.0;
to_send = 1.0;
}
~ProgressBarCustom() {}
static void SendMessage(double end, double percentage, double times) {
emscripten::val::global("DUCKDB_RUNTIME").call<void>("progressUpdate", end ? 1.0 : 0.0, percentage, times);
}

public:
void Update(double percentage) {
if (percentage >= value + 1.0) {
value = percentage;
times = 1.0;
SendMessage(false, percentage, times);
to_send = 10.0;
} else {
times += 1.0;
if (times >= to_send) {
SendMessage(false, percentage, times);
to_send *= 10.0;
}
}
}
void Finish() {
SendMessage(true, value, times);
value = 0.0;
times = 0.0;
to_send = 1.0;
}
static unique_ptr<ProgressBarDisplay> GetProgressBar() { return make_uniq<ProgressBarCustom>(); }
};

/// Create a session
WebDB::Connection* WebDB::Connect() {
auto conn = duckdb::make_uniq<WebDB::Connection>(*this);
auto conn_ptr = conn.get();
connections_.insert({conn_ptr, std::move(conn)});
ClientConfig::GetConfig(*conn_ptr->connection_.context).wait_time = 1;
ClientConfig::GetConfig(*conn_ptr->connection_.context).display_create_func = ProgressBarCustom::GetProgressBar;
return conn_ptr;
}

Expand Down
6 changes: 6 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ export interface DuckDBRuntime {
prepareFileHandles?: (path: string[], protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;
prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;

// Internal API - experimental
progressUpdate(final: number, percentage: number, iteration:number): void;

// Call a scalar UDF function
callScalarUDF(
mod: DuckDBModule,
Expand All @@ -185,6 +188,9 @@ export const DEFAULT_RUNTIME: DuckDBRuntime = {
getLastFileModificationTime: (_mod: DuckDBModule, _fileId: number): number => {
return 0;
},
progressUpdate: (_final: number, _percentage: number, _iteration: number): void => {
return;
},
truncateFile: (_mod: DuckDBModule, _fileId: number, _newSize: number): void => {},
readFile: (_mod: DuckDBModule, _fileId: number, _buffer: number, _bytes: number, _location: number): number => {
return 0;
Expand Down
8 changes: 8 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {StatusCode} from '../status';
import {
WorkerResponseType,
} from '../parallel/worker_request';
import {addS3Headers, getHTTPUrl} from '../utils';

import {
Expand Down Expand Up @@ -687,6 +690,11 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
return 0;
},
progressUpdate: (done: number, percentage: number, repeat: number): void => {
if (postMessage) {
postMessage({requestId: 0, type: WorkerResponseType.PROGRESS_UPDATE, data: {status: done?"completed":"in-progress", percentage: percentage, repetitions: repeat}});
}
},
checkDirectory: (mod: DuckDBModule, pathPtr: number, pathLen: number) => {
const path = readString(mod, pathPtr, pathLen);
console.log(`checkDirectory: ${path}`);
Expand Down
3 changes: 3 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ export const NODE_RUNTIME: DuckDBRuntime & {
}
return 0;
},
progressUpdate: (_final: number, _percentage: number, _iteration: number): void => {
return;
},
getLastFileModificationTime: (mod: DuckDBModule, fileId: number) => {
try {
const file = NODE_RUNTIME.resolveFileInfo(mod, fileId);
Expand Down
9 changes: 9 additions & 0 deletions packages/duckdb-wasm/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ export type LogEntry<O, T, E, V> = {
readonly value: V;
};

export type ProgressEntry = {
readonly status: string;
readonly percentage: string;
readonly repetitions: string;
}

/** An execution progress handler */
export type ExecutionProgressHandler = (p: ProgressEntry) => void;

export type LogEntryVariant =
| LogEntry<LogOrigin.BINDINGS, LogTopic.INSTANTIATE, LogEvent.ERROR, string>
| LogEntry<LogOrigin.BINDINGS, LogTopic.QUERY, LogEvent.START, void>
Expand Down
10 changes: 10 additions & 0 deletions packages/duckdb-wasm/src/parallel/async_bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { InstantiationProgress } from '../bindings/progress';
import { arrowToSQLField } from '../json_typedef';
import { WebFile } from '../bindings/web_file';
import { DuckDBDataProtocol } from '../bindings';
import { ProgressEntry } from '../log';

const TEXT_ENCODER = new TextEncoder();

Expand All @@ -32,6 +33,9 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
/** Instantiate the module */
protected _onInstantiationProgress: ((p: InstantiationProgress) => void)[] = [];

/** Progress callbacks */
protected _onExecutionProgress: ((p: ProgressEntry) => void)[] = [];

/** The logger */
protected readonly _logger: Logger;
/** The worker */
Expand Down Expand Up @@ -122,6 +126,12 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
const response = event.data as WorkerResponseVariant;
switch (response.type) {
// Request failed?
case WorkerResponseType.PROGRESS_UPDATE: {
for (const p of this._onExecutionProgress) {
p(response.data);
}
return;
}
case WorkerResponseType.LOG: {
this._logger.log(response.data);
return;
Expand Down
4 changes: 3 additions & 1 deletion packages/duckdb-wasm/src/parallel/worker_request.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from '../bindings/insert_options';
import { LogEntryVariant } from '../log';
import { LogEntryVariant, ProgressEntry } from '../log';
import { ScriptTokens } from '../bindings/tokens';
import { FileStatistics } from '../bindings/file_stats';
import { DuckDBConfig } from '../bindings/config';
Expand Down Expand Up @@ -57,6 +57,7 @@ export enum WorkerResponseType {
FILE_STATISTICS = 'FILE_STATISTICS',
INSTANTIATE_PROGRESS = 'INSTANTIATE_PROGRESS',
LOG = 'LOG',
PROGRESS_UPDATE = 'PROGRESS_UPDATE',
OK = 'OK',
PREPARED_STATEMENT_ID = 'PREPARED_STATEMENT_ID',
QUERY_PLAN = 'QUERY_PLAN',
Expand Down Expand Up @@ -154,6 +155,7 @@ export type WorkerResponseVariant =
| WorkerResponse<WorkerResponseType.FILE_STATISTICS, FileStatistics>
| WorkerResponse<WorkerResponseType.INSTANTIATE_PROGRESS, InstantiationProgress>
| WorkerResponse<WorkerResponseType.LOG, LogEntryVariant>
| WorkerResponse<WorkerResponseType.PROGRESS_UPDATE, ProgressEntry>
| WorkerResponse<WorkerResponseType.OK, null>
| WorkerResponse<WorkerResponseType.PREPARED_STATEMENT_ID, number>
| WorkerResponse<WorkerResponseType.QUERY_PLAN, Uint8Array>
Expand Down
Loading