Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose the peerId that created the operations #551

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/interval-discovery/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@ts-drp/blueprints": "0.9.2",
"@ts-drp/blueprints": "0.10.0",
"race-event": "^1.3.0"
}
}
2 changes: 1 addition & 1 deletion packages/object/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"@bufbuild/protobuf": "^2.0.0",
"@ts-drp/blueprints": "^0.10.0",
"@ts-drp/keychain": "^0.10.0",
"@ts-drp/test-utils": "^0.10.0",
"@ts-drp/test-utils": "^0.10.0",
"@types/benchmark": "^2.1.5",
"benchmark": "^2.1.4",
"pprof": "^4.0.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/object/src/acl/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ACLGroup,
ActionType,
type DRPPublicCredential,
type DrpRuntimeContext,
type IACL,
type PeerPermissions,
type ResolveConflictsType,
Expand All @@ -24,6 +25,7 @@ function getPeerPermissions(params?: {

export class ObjectACL implements IACL {
semanticsType = SemanticsType.pair;
context: DrpRuntimeContext = { caller: "" };

// if true, any peer can write to the object
permissionless: boolean;
Expand Down
80 changes: 59 additions & 21 deletions packages/object/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
const operationDRP = cloneDeep(initialDRP);
let result: unknown | Promise<unknown> = undefined;
try {
result = this._applyOperation(operationDRP, operation);
result = this._applyOperation(operationDRP, operation, this.hashGraph.peerId);
} catch (e) {
log.error(`::drpObject::callFn: ${e}`);
return result;
Expand All @@ -240,7 +240,10 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
}

private _hasStateChanged(a: IDRP | IACL, b: IDRP | IACL): boolean {
return Object.keys(a).some((key) => !deepEqual(a[key], b[key]));
return Object.keys(a).some((key) => {
if (key === "context") return false;
return !deepEqual(a[key], b[key]);
});
}

private _processOperationResult(
Expand Down Expand Up @@ -367,15 +370,17 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
const drp = await this._computeDRP(
vertex.dependencies,
preComputeLca,
vertex.operation.drpType === DrpType.DRP ? vertex.operation : undefined
vertex.operation.drpType === DrpType.DRP ? vertex.operation : undefined,
vertex.peerId
);
await this._setDRPState(vertex, preComputeLca, this._getDRPState(drp));
}

const acl = await this._computeObjectACL(
vertex.dependencies,
preComputeLca,
vertex.operation.drpType === DrpType.ACL ? vertex.operation : undefined
vertex.operation.drpType === DrpType.ACL ? vertex.operation : undefined,
vertex.peerId
);
await this._setObjectACLState(vertex, preComputeLca, this._getDRPState(acl));

Expand Down Expand Up @@ -417,7 +422,11 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
}

// apply the operation to the DRP
private _applyOperation(drp: IDRP, operation: Operation): unknown | Promise<unknown> {
private _applyOperation(
drp: IDRP,
operation: Operation,
caller: string
): unknown | Promise<unknown> {
const { opType, value } = operation;

const typeParts = opType.split(".");
Expand All @@ -430,6 +439,10 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
}
}

if (target.context) {
target.context.caller = caller;
}

const methodName = typeParts[typeParts.length - 1];
if (typeof target[methodName] !== "function") {
throw new Error(`${opType} is not a function`);
Expand All @@ -446,7 +459,8 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
private _computeDRP(
vertexDependencies: Hash[],
preCompute?: LowestCommonAncestorResult,
vertexOperation?: Operation
vertexOperation?: Operation,
caller?: string
): IDRP | Promise<IDRP> {
if (!this.drp || !this.originalDRP) {
throw new Error("DRP is undefined");
Expand All @@ -466,23 +480,31 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
for (const entry of state.state) {
drp[entry.key] = entry.value;
}
const operations: Operation[] = [];
const operations: [Operation, string][] = [];
for (const vertex of linearizedVertices) {
if (vertex.operation && vertex.operation.drpType === DrpType.DRP) {
operations.push(vertex.operation);
operations.push([vertex.operation, vertex.peerId]);
}
}
if (vertexOperation && vertexOperation.drpType === DrpType.DRP) {
operations.push(vertexOperation);
if (!caller) {
throw new Error("Caller is undefined");
}
operations.push([vertexOperation, caller]);
}

return processSequentially(operations, (op: Operation) => this._applyOperation(drp, op), drp);
return processSequentially(
operations,
([op, caller]: [Operation, string]) => this._applyOperation(drp, op, caller),
drp
);
}

private _computeObjectACL(
vertexDependencies: Hash[],
preCompute?: LowestCommonAncestorResult,
vertexOperation?: Operation
vertexOperation?: Operation,
caller?: string
): IACL | Promise<IACL> {
if (!this.acl || !this.originalObjectACL) {
throw new Error("ObjectACL is undefined");
Expand All @@ -503,18 +525,25 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
acl[entry.key] = entry.value;
}

const operations: Operation[] = [];
const operations: [Operation, string][] = [];
for (const v of linearizedVertices) {
if (v.operation && v.operation.drpType === DrpType.ACL) {
operations.push(v.operation);
operations.push([v.operation, v.peerId]);
}
}

if (vertexOperation && vertexOperation.drpType === DrpType.ACL) {
operations.push(vertexOperation);
if (!caller) {
throw new Error("Caller is undefined");
}
operations.push([vertexOperation, caller]);
}

return processSequentially(operations, (op: Operation) => this._applyOperation(acl, op), acl);
return processSequentially(
operations,
([op, caller]: [Operation, string]) => this._applyOperation(acl, op, caller),
acl
);
}

private computeLCA(vertexDependencies: string[]): LowestCommonAncestorResult {
Expand Down Expand Up @@ -552,18 +581,20 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
private _computeDRPState(
vertexDependencies: Hash[],
preCompute?: LowestCommonAncestorResult,
vertexOperation?: Operation
vertexOperation?: Operation,
caller?: string
): DRPState | Promise<DRPState> {
const drp = this._computeDRP(vertexDependencies, preCompute, vertexOperation);
const drp = this._computeDRP(vertexDependencies, preCompute, vertexOperation, caller);
return isPromise(drp) ? drp.then(this._getDRPState) : this._getDRPState(drp);
}

private _computeObjectACLState(
vertexDependencies: Hash[],
preCompute?: LowestCommonAncestorResult,
vertexOperation?: Operation
vertexOperation?: Operation,
caller?: string
): DRPState | Promise<DRPState> {
const acl = this._computeObjectACL(vertexDependencies, preCompute, vertexOperation);
const acl = this._computeObjectACL(vertexDependencies, preCompute, vertexOperation, caller);
return isPromise(acl) ? acl.then(this._getDRPState) : this._getDRPState(acl);
}

Expand All @@ -574,7 +605,13 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
): void | Promise<void> {
if (this.acl) {
const stateComputation =
drpState ?? this._computeObjectACLState(vertex.dependencies, preCompute, vertex.operation);
drpState ??
this._computeObjectACLState(
vertex.dependencies,
preCompute,
vertex.operation,
vertex.peerId
);

return handlePromiseOrValue(stateComputation, (state) => {
this.aclStates.set(vertex.hash, state);
Expand All @@ -588,7 +625,8 @@ export class DRPObject implements DRPObjectBase, IDRPObject {
drpState?: DRPState
): void | Promise<void> {
const stateComputation =
drpState ?? this._computeDRPState(vertex.dependencies, preCompute, vertex.operation);
drpState ??
this._computeDRPState(vertex.dependencies, preCompute, vertex.operation, vertex.peerId);

return handlePromiseOrValue(stateComputation, (state) => {
this.drpStates.set(vertex.hash, state);
Expand Down
11 changes: 8 additions & 3 deletions packages/object/tests/drpobject.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { SetDRP } from "@ts-drp/blueprints";
import { AsyncCounterDRP } from "@ts-drp/test-utils";
import {
ActionType,
type DrpRuntimeContext,
type IDRP,
type ResolveConflictsType,
SemanticsType,
Expand Down Expand Up @@ -69,7 +70,6 @@ describe("Test for duplicate call issue", () => {

class CounterDRP implements IDRP {
semanticsType = SemanticsType.pair;

private _counter: number;

constructor() {
Expand All @@ -89,7 +89,7 @@ describe("Test for duplicate call issue", () => {

test("Detect duplicate call", () => {
const obj = new DRPObject({
peerId: "",
peerId: "peer1",
drp: new CounterDRP(),
});

Expand Down Expand Up @@ -135,6 +135,7 @@ describe("Merging vertices tests", () => {

class AsyncPushToArrayDRP implements IDRP {
semanticsType = SemanticsType.pair;
context: DrpRuntimeContext = { caller: "" };

private _array: number[];

Expand Down Expand Up @@ -225,12 +226,16 @@ describe("Async push to array DRP", () => {
expect(drp2.query_array()).toEqual([1, 2, 3]);

await drp1.pushAsync(4);
expect(drp1.context.caller).toEqual("peer1");
vi.advanceTimersByTime(1000);
drp1.push(5);
expect(drp1.context.caller).toEqual("peer1");
vi.advanceTimersByTime(1000);
await drp1.pushAsync(6);
await drp2.pushAsync(6);
expect(drp2.context.caller).toEqual("peer2");
vi.advanceTimersByTime(1000);
await drpObject2.merge(drpObject1.hashGraph.getAllVertices());
await drpObject1.merge(drpObject2.hashGraph.getAllVertices());
expect(drp1.query_array()).toEqual([1, 2, 3, 4, 5, 6]);
expect(drp2.query_array()).toEqual([1, 2, 3, 4, 5, 6]);
});
Expand Down
106 changes: 106 additions & 0 deletions packages/object/tests/hashgraph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { MapConflictResolution, MapDRP, SetDRP } from "@ts-drp/blueprints";
import {
ACLGroup,
ActionType,
type DrpRuntimeContext,
DrpType,
type Hash,
type IDRP,
Expand Down Expand Up @@ -1115,3 +1116,108 @@ describe("HashGraph hook tests", () => {
}
});
});

class SetDRPWithContext<T> implements IDRP {
semanticsType = SemanticsType.pair;
context: DrpRuntimeContext = { caller: "" };
private _set: Set<T>;

constructor() {
this._set = new Set();
}

add(value: T): void {
this._set.add(value);
}

delete(value: T): void {
this._set.delete(value);
}

query_has(value: T): boolean {
return this._set.has(value);
}

query_getValues(): T[] {
return Array.from(this._set.values());
}
}

describe("DRP Context tests", () => {
let obj1: DRPObject;
let obj2: DRPObject;
let obj3: DRPObject;

beforeEach(() => {
obj1 = new DRPObject({ peerId: "peer1", acl, drp: new SetDRPWithContext<number>() });
obj2 = new DRPObject({ peerId: "peer2", acl, drp: new SetDRPWithContext<number>() });
obj3 = new DRPObject({ peerId: "peer3", acl, drp: new SetDRPWithContext<number>() });
});

test("caller should be empty if no operation is applied", () => {
const drp1 = obj1.drp as SetDRPWithContext<number>;
expect(drp1.context.caller).toBe("");
});

test("caller should be current node's peerId if operation is applied locally", () => {
const drp1 = obj1.drp as SetDRPWithContext<number>;
for (let i = 0; i < 10; i++) {
drp1.add(i);
expect(drp1.context.caller).toBe("peer1");
}

const drp2 = obj2.drp as SetDRPWithContext<number>;
for (let i = 0; i < 10; i++) {
drp2.add(i);
expect(drp2.context.caller).toBe("peer2");
}
});

test("caller should be the peerId of the node that applied the operation", async () => {
const drp1 = obj1.drp as SetDRPWithContext<number>;
const drp2 = obj2.drp as SetDRPWithContext<number>;
const drp3 = obj3.drp as SetDRPWithContext<number>;

for (let i = 1; i <= 10; ++i) {
drp1.add(i);
expect(drp1.context.caller).toBe("peer1");
await obj2.merge(obj1.hashGraph.getAllVertices());

drp2.add(10 + i);
const vertices2 = obj2.hashGraph.getAllVertices();
await obj1.merge([vertices2[vertices2.length - 1]]);
expect(drp1.context.caller).toBe("peer2");

await obj3.merge(obj2.hashGraph.getAllVertices());
drp3.add(20 + i);
const vertices3 = obj3.hashGraph.getAllVertices();
await obj2.merge([vertices3[vertices3.length - 1]]);
expect(drp2.context.caller).toBe("peer3");
await obj1.merge([vertices3[vertices3.length - 1]]);
expect(drp1.context.caller).toBe("peer3");
}
});

test("should not update the caller if the state is not changed", async () => {
const drp1 = obj1.drp as SetDRPWithContext<number>;
const drp2 = obj2.drp as SetDRPWithContext<number>;

for (let i = 0; i < 10; ++i) {
if (i % 2 === 0) {
drp1.add(i);
expect(drp1.context.caller).toBe("peer1");
await obj2.merge(obj1.hashGraph.getAllVertices());
expect(drp2.context.caller).toBe("peer1");
drp2.add(i);
expect(drp2.context.caller).toBe("peer1");
} else {
drp2.add(i);
expect(drp2.context.caller).toBe("peer2");
await obj1.merge(obj2.hashGraph.getAllVertices());
expect(drp1.context.caller).toBe("peer2");
drp1.add(i);
expect(drp1.context.caller).toBe("peer2");
}
}
});
});
Loading
Loading