Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@
"module": "dist/mjs/index.js",
"types": "dist/mjs/index.d.ts",
"scripts": {
"lint": "npx eslint .",
"lint:fix": "npx eslint --fix .",
"format": "npx prettier --write --ignore-path .gitignore src",
"format:check": "npx prettier --check --ignore-path .gitignore src",
"test": "npx vitest run",
"coverage": "npx vitest run --coverage",
"lint": "eslint .",
"lint:fix": "eslint --fix .",
"format": "prettier --write --ignore-path .gitignore src",
"format:check": "prettier --check --ignore-path .gitignore src",
"test": "vitest run",
"test:watch": "vitest",
"coverage": "vitest run --coverage",
"build": "npm run build:cjs && npm run build:mjs",
"build:mjs": "npx tsc --project tsconfig.mjs.json && cp res/package.mjs.json dist/mjs/package.json",
"build:cjs": "npx tsc --project tsconfig.cjs.json && cp res/package.cjs.json dist/cjs/package.json",
"prepare": "npx husky install",
"docs": "npx typedoc"
"build:mjs": "tsc --project tsconfig.mjs.json && cp res/package.mjs.json dist/mjs/package.json",
"build:cjs": "tsc --project tsconfig.cjs.json && cp res/package.cjs.json dist/cjs/package.json",
"prepare": "husky install",
"docs": "typedoc"
},
"repository": {
"type": "git",
Expand Down Expand Up @@ -50,6 +51,7 @@
"eslint-plugin-jsdoc": "^46.0.0",
"eslint-plugin-security": "^1.7.1",
"husky": "^8.0.0",
"ky": "^1.2.0",
"mock-socket": "^9.1.5",
"prettier": "^3.1.0",
"typedoc": "^0.25.3",
Expand Down
2 changes: 2 additions & 0 deletions src/Model.discontinuity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ describe('Model', () => {
);
channel.detach = vi.fn<any, any>();
ably.channels.release = vi.fn<any, any>();
channel.setOptions = vi.fn();

let counter = 0;

Expand Down Expand Up @@ -133,6 +134,7 @@ describe('Model', () => {
);
channel.detach = vi.fn<any, any>();
ably.channels.release = vi.fn<any, any>();
channel.setOptions = vi.fn();

let counter = 0;

Expand Down
64 changes: 64 additions & 0 deletions src/ModelsClient.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Realtime } from 'ably/promises';
import { it, describe, expect, beforeEach, vi } from 'vitest';

import ModelsClient from './ModelsClient.js';
import { createAblyApp } from './utilities/test/createAblyApp.js';

interface ModelsTestContext {
modelsClient: ModelsClient;
channelName: string;
}

describe('ModelsClients integration', () => {
beforeEach<ModelsTestContext>(async (context) => {
const name = 'test';
const data = await createAblyApp({
keys: [{}],
namespaces: [{ id: name, persisted: true }],
channels: [
{
name,
presence: [
{ clientId: 'John', data: '[email protected]' },
{ clientId: 'Dave', data: '[email protected]' },
],
},
],
});
const ably = new Realtime.Promise({
key: data.keys[0].keyStr,
environment: 'sandbox',
});

const modelsClient = new ModelsClient({ ably, logLevel: 'debug' });

context.modelsClient = modelsClient;
context.channelName = name;
});

it<ModelsTestContext>('shows no erros when there is a racing condition between subscribe and unsubscribe', async ({
modelsClient,
channelName,
}) => {
const model = modelsClient.models.get({
name: channelName,
channelName,
sync: async (x) => await new Promise((resolve) => setTimeout(() => resolve(x), 500)),
merge: vi.fn(),
});

expect(model.state).toBe('initialized');
model.subscribe(vi.fn());
expect(model.state).toBe('syncing');
await model.sync('1');
model.unsubscribe(vi.fn());
expect(model.state).toBe('ready');
model.subscribe(vi.fn());
await model.sync('1');
setTimeout(() => model.unsubscribe(vi.fn()), 300);
model.subscribe(vi.fn());
model.dispose();

expect(model.state).toBe('disposed');
});
});
65 changes: 65 additions & 0 deletions src/stream/Stream.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Realtime, Types } from 'ably/promises';
import pino from 'pino';
import { it, describe, expect, beforeEach, afterEach } from 'vitest';

import Stream from './Stream.js';
import { defaultSyncOptions, defaultEventBufferOptions } from '../Options.js';
import type { StreamOptions } from '../types/stream.js';
import { createAblyApp } from '../utilities/test/createAblyApp.js';

interface StreamTestContext extends StreamOptions {
stream: Stream;
channel: Types.RealtimeChannelPromise;
}

describe('Stream integration', () => {
beforeEach<StreamTestContext>(async (context) => {
const name = 'test';
const data = await createAblyApp({
keys: [{}],
namespaces: [{ id: name, persisted: true }],
channels: [
{
name,
presence: [
{ clientId: 'John', data: '[email protected]' },
{ clientId: 'Dave', data: '[email protected]' },
],
},
],
});
const ably = new Realtime({
key: data.keys[0].keyStr,
environment: 'sandbox',
});
const logger = pino({ level: 'silent' });
const channel = ably.channels.get(name);
const stream = new Stream({
ably,
logger,
channelName: name,
syncOptions: defaultSyncOptions,
eventBufferOptions: defaultEventBufferOptions,
});

context.stream = stream;
context.channel = channel;
});

afterEach<StreamTestContext>(async ({ stream, channel }) => {
await stream.dispose();
await channel.detach();
});

it<StreamTestContext>('sets agent options when state is not attached', async ({ channel, stream }) => {
await stream.replay('0');
//@ts-ignore - `agent` is filtered out in `channel.params`, so that's the only way to check this
expect(channel.channelOptions.params).toEqual({ agent: 'models/0.0.2' }); // initial call from test
});

it<StreamTestContext>('does not sets agent options when state is attached', async ({ channel }) => {
await channel.attach();
//@ts-ignore - `agent` is filtered out in `channel.params`, so that's the only way to check this
expect(channel.channelOptions.params).toEqual(undefined); // initial call from test
});
});
20 changes: 14 additions & 6 deletions src/stream/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { defaultSyncOptions, defaultEventBufferOptions } from '../Options.js';
import type { StreamOptions } from '../types/stream.js';
import { statePromise } from '../utilities/promises.js';
import { createMessage } from '../utilities/test/messages.js';
import { VERSION } from '../version.js';

vi.mock('ably/promises');

Expand Down Expand Up @@ -61,6 +60,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand All @@ -77,11 +77,7 @@ describe('Stream', () => {

expect(ably.channels.get).toHaveBeenCalledTimes(2);
expect(ably.channels.get).toHaveBeenNthCalledWith(1, channelName); // initial call from test
expect(ably.channels.get).toHaveBeenNthCalledWith(2, channelName, {
params: {
agent: `models/${VERSION}`,
},
}); // internal call with agent channel param
expect(ably.channels.get).toHaveBeenNthCalledWith(2, channelName); // the state in ably is undefined, so the internal call with agent channel param is not triggered
expect(channel.subscribe).toHaveBeenCalledOnce();
expect(channel.history).toHaveBeenCalledOnce();
expect(channel.history).toHaveBeenNthCalledWith(1, {
Expand All @@ -99,6 +95,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -146,6 +143,7 @@ describe('Stream', () => {
hasNext: () => false,
};
});
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -225,6 +223,7 @@ describe('Stream', () => {
hasNext: () => false,
};
});
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -292,6 +291,7 @@ describe('Stream', () => {
hasBacklog: false,
};
});
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -337,6 +337,7 @@ describe('Stream', () => {
hasBacklog: false,
};
});
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -382,6 +383,7 @@ describe('Stream', () => {
hasNext: () => false,
};
});
channel.setOptions = vi.fn();
let messages = new Subject<Types.Message>();
channel.subscribe = vi.fn<any, any>((callback) => {
messages.subscribe((message) => callback(message));
Expand Down Expand Up @@ -439,6 +441,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -488,6 +491,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -535,6 +539,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -586,6 +591,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();

const stream = new Stream({
ably,
Expand Down Expand Up @@ -623,6 +629,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();
ably.channels.release = vi.fn();

const stream = new Stream({
Expand Down Expand Up @@ -656,6 +663,7 @@ describe('Stream', () => {
hasNext: () => false,
}),
);
channel.setOptions = vi.fn();
ably.channels.release = vi.fn();

let fail: (...args: any[]) => void = () => {
Expand Down
6 changes: 5 additions & 1 deletion src/stream/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
);
this.middleware.subscribe(this.onMiddlewareMessage.bind(this));

this.ablyChannel = this.ably.channels.get(this.options.channelName, { params: { agent: `models/${VERSION}` } });
this.ablyChannel = this.ably.channels.get(this.options.channelName);

if (this.ablyChannel.state !== 'attached' && this.ablyChannel.state !== 'attaching') {
this.ablyChannel.setOptions({ params: { agent: `models/${VERSION}` } });
}
this.ablyChannel.on('failed', (change) => {
this.dispose(change.reason);
this.subscriptions.error(new Error('Stream failed: ' + change.reason));
Expand Down
Loading