Skip to content

Commit de000e1

Browse files
authored
Merge pull request #94 from ssb-ngi-pointer/just-live
support and(live()) operations
2 parents c143df2 + e531d1f commit de000e1

File tree

2 files changed

+121
-80
lines changed

2 files changed

+121
-80
lines changed

operators.js

+96-80
Original file line numberDiff line numberDiff line change
@@ -7,42 +7,28 @@ const pullAwaitable = require('pull-awaitable')
77
const cat = require('pull-cat')
88
const { safeFilename } = require('./files')
99

10-
function query(...cbs) {
11-
let res = cbs[0]
12-
for (let i = 1, n = cbs.length; i < n; i++) if (cbs[i]) res = cbs[i](res)
13-
return res
14-
}
10+
//#region Helper functions and util operators
1511

16-
function fromDB(db) {
17-
return {
18-
meta: { db },
12+
function copyMeta(orig, dest) {
13+
if (orig.meta) {
14+
dest.meta = orig.meta
1915
}
2016
}
2117

22-
function toBufferOrFalsy(value) {
23-
if (!value) return value
24-
return Buffer.isBuffer(value) ? value : Buffer.from(value)
25-
}
26-
27-
function seqs(values) {
28-
return {
29-
type: 'SEQS',
30-
seqs: values,
31-
}
18+
function updateMeta(orig, key, value) {
19+
const res = Object.assign({}, orig)
20+
res.meta[key] = value
21+
return res
3222
}
3323

34-
function liveSeqs(pullStream) {
35-
return {
36-
type: 'LIVESEQS',
37-
stream: pullStream,
38-
}
24+
function extractMeta(orig) {
25+
const meta = orig.meta
26+
return meta
3927
}
4028

41-
function offsets(values) {
42-
return {
43-
type: 'OFFSETS',
44-
offsets: values,
45-
}
29+
function toBufferOrFalsy(value) {
30+
if (!value) return value
31+
return Buffer.isBuffer(value) ? value : Buffer.from(value)
4632
}
4733

4834
function seekFromDesc(desc) {
@@ -58,6 +44,40 @@ function seekFromDesc(desc) {
5844
}
5945
}
6046

47+
function query(...cbs) {
48+
let res = cbs[0]
49+
for (let i = 1, n = cbs.length; i < n; i++) if (cbs[i]) res = cbs[i](res)
50+
return res
51+
}
52+
53+
function debug() {
54+
return (ops) => {
55+
const meta = JSON.stringify(ops.meta, (key, val) =>
56+
key === 'db' ? void 0 : val
57+
)
58+
console.log(
59+
'debug',
60+
JSON.stringify(
61+
ops,
62+
(key, val) => {
63+
if (key === 'meta') return void 0
64+
else if (key === 'task' && typeof val === 'function')
65+
return '[Function]'
66+
else if (key === 'value' && val.type === 'Buffer')
67+
return Buffer.from(val.data).toString()
68+
else return val
69+
},
70+
2
71+
),
72+
meta === '{}' ? '' : 'meta: ' + meta
73+
)
74+
return ops
75+
}
76+
}
77+
78+
//#endregion
79+
//#region "Unit operators": they create objects that JITDB interprets
80+
6181
function slowEqual(seekDesc, target, opts) {
6282
opts = opts || {}
6383
const seek = seekFromDesc(seekDesc)
@@ -150,13 +170,6 @@ function includes(seek, target, opts) {
150170
}
151171
}
152172

153-
function not(ops) {
154-
return {
155-
type: 'NOT',
156-
data: [ops],
157-
}
158-
}
159-
160173
function gt(value, indexName) {
161174
if (typeof value !== 'number') throw new Error('gt() needs a number arg')
162175
return {
@@ -201,62 +214,51 @@ function lte(value, indexName) {
201214
}
202215
}
203216

204-
function deferred(task) {
217+
function seqs(values) {
205218
return {
206-
type: 'DEFERRED',
207-
task,
219+
type: 'SEQS',
220+
seqs: values,
208221
}
209222
}
210223

211-
function debug() {
212-
return (ops) => {
213-
const meta = JSON.stringify(ops.meta, (key, val) =>
214-
key === 'db' ? void 0 : val
215-
)
216-
console.log(
217-
'debug',
218-
JSON.stringify(
219-
ops,
220-
(key, val) => {
221-
if (key === 'meta') return void 0
222-
else if (key === 'task' && typeof val === 'function')
223-
return '[Function]'
224-
else if (key === 'value' && val.type === 'Buffer')
225-
return Buffer.from(val.data).toString()
226-
else return val
227-
},
228-
2
229-
),
230-
meta === '{}' ? '' : 'meta: ' + meta
231-
)
232-
return ops
224+
function liveSeqs(pullStream) {
225+
return {
226+
type: 'LIVESEQS',
227+
stream: pullStream,
233228
}
234229
}
235230

236-
function copyMeta(orig, dest) {
237-
if (orig.meta) {
238-
dest.meta = orig.meta
231+
function offsets(values) {
232+
return {
233+
type: 'OFFSETS',
234+
offsets: values,
239235
}
240236
}
241237

242-
function updateMeta(orig, key, value) {
243-
const res = Object.assign({}, orig)
244-
res.meta[key] = value
245-
return res
238+
function deferred(task) {
239+
return {
240+
type: 'DEFERRED',
241+
task,
242+
}
246243
}
247244

248-
function extractMeta(orig) {
249-
const meta = orig.meta
250-
return meta
245+
//#endregion
246+
//#region "Combinator operators": they build composite operations
247+
248+
function not(ops) {
249+
return {
250+
type: 'NOT',
251+
data: [ops],
252+
}
251253
}
252254

253255
function and(...args) {
254-
const rhs = args
255-
.map((arg) => (typeof arg === 'function' ? arg() : arg))
256-
.filter((arg) => !!arg)
257-
return (ops) => {
256+
return (ops, isSpecialOps) => {
257+
const rhs = args
258+
.map((arg) => (typeof arg === 'function' ? arg(ops, true) : arg))
259+
.filter((arg) => !!arg)
258260
const res =
259-
ops && ops.type
261+
ops && ops.type && !isSpecialOps
260262
? {
261263
type: 'AND',
262264
data: [ops, ...rhs],
@@ -273,12 +275,12 @@ function and(...args) {
273275
}
274276

275277
function or(...args) {
276-
const rhs = args
277-
.map((arg) => (typeof arg === 'function' ? arg() : arg))
278-
.filter((arg) => !!arg)
279-
return (ops) => {
278+
return (ops, isSpecialOps) => {
279+
const rhs = args
280+
.map((arg) => (typeof arg === 'function' ? arg(ops, true) : arg))
281+
.filter((arg) => !!arg)
280282
const res =
281-
ops && ops.type
283+
ops && ops.type && !isSpecialOps
282284
? {
283285
type: 'OR',
284286
data: [ops, ...rhs],
@@ -294,6 +296,15 @@ function or(...args) {
294296
}
295297
}
296298

299+
//#endregion
300+
//#region "Special operators": they only update meta
301+
302+
function fromDB(db) {
303+
return {
304+
meta: { db },
305+
}
306+
}
307+
297308
function live(opts) {
298309
if (opts && opts.old) return (ops) => updateMeta(ops, 'live', 'liveAndOld')
299310
else return (ops) => updateMeta(ops, 'live', 'liveOnly')
@@ -311,6 +322,9 @@ function paginate(pageSize) {
311322
return (ops) => updateMeta(ops, 'pageSize', pageSize)
312323
}
313324

325+
//#endregion
326+
//#region "Consumer operators": they execute the query tree
327+
314328
async function executeDeferredOps(ops, meta) {
315329
// Collect all deferred tasks and their object-traversal paths
316330
const allDeferred = []
@@ -420,6 +434,8 @@ function toAsyncIter() {
420434
}
421435
}
422436

437+
//#endregion
438+
423439
module.exports = {
424440
fromDB,
425441
query,

test/operators.js

+25
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,31 @@ prepareAndRunTest('support live operations', dir, (t, db, raf) => {
10331033
})
10341034
})
10351035

1036+
prepareAndRunTest('support live inside and', dir, (t, db, raf) => {
1037+
const msg = { type: 'post', text: 'Testing!' }
1038+
let state = validate.initial()
1039+
state = validate.appendNew(state, null, alice, msg, Date.now())
1040+
state = validate.appendNew(state, null, bob, msg, Date.now() + 1)
1041+
1042+
addMsg(state.queue[0].value, raf, (e1, msg1) => {
1043+
let i = 0
1044+
query(
1045+
fromDB(db),
1046+
and(live({ old: true }), slowEqual('value.content.type', 'post')),
1047+
toPullStream(),
1048+
pull.drain((msg) => {
1049+
if (i++ == 0) {
1050+
t.equal(msg.value.author, alice.id)
1051+
addMsg(state.queue[1].value, raf, (e2, msg2) => {})
1052+
} else {
1053+
t.equal(msg.value.author, bob.id)
1054+
t.end()
1055+
}
1056+
})
1057+
)
1058+
})
1059+
})
1060+
10361061
prepareAndRunTest('support live with not', dir, (t, db, raf) => {
10371062
const msg = { type: 'post', text: 'Testing!' }
10381063
let state = validate.initial()

0 commit comments

Comments
 (0)