-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.ts
51 lines (45 loc) · 1.67 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import { Stream, Writer } from "@rdfc/js-runner";
import { rdfParser } from "rdf-parse";
import { RdfStore } from "rdf-stores";
import { QueryEngine } from "@comunica/query-sparql-rdfjs";
import str from 'string-to-stream';
/**
 * rdf-connect processor that parses every incoming message as RDF, loads the
 * quads into a fresh in-memory store, runs a SPARQL query over that store with
 * Comunica and writes a progress line to the outgoing channel.
 *
 * Messages are handled strictly one after another, in arrival order, and the
 * outgoing channel is only closed once every message received so far has been
 * fully handled.
 *
 * NOTE(review): this processor is meant to map entities with blank node
 * identifiers to equivalents with named node identifiers, but that rewriting
 * is not implemented here yet; the function currently only logs the first
 * matched subject and emits a progress message per member.
 *
 * @param incoming The data stream which must be transformed.
 * @param outgoing The data stream into which the resulting stream is written.
 * @param mime The MIME type used to parse each incoming message.
 */
export function processor(
  incoming: Stream<string>,
  outgoing: Writer<string>,
  mime = "text/turtle",
): void {
  let count = 0;

  // One engine is enough for the lifetime of the processor; every query gets
  // its own store through the `sources` context entry.
  const engine = new QueryEngine();

  // Tail of the work queue. It never rejects, so one failing member cannot
  // block the members behind it or the final `end`.
  let pending: Promise<void> = Promise.resolve();

  /** Parses one message, queries it and reports it as processed. */
  const handleMember = async (data: string, member: number): Promise<void> => {
    const store = RdfStore.createDefault();
    await new Promise<void>((resolve, reject) => {
      store
        .import(rdfParser.parse(str(data), { contentType: mime }))
        .on("end", resolve)
        .on("error", reject);
    });

    // The store can now be queried using Comunica.
    const bindingsStream = await engine.queryBindings(`
SELECT ?s ?p ?o WHERE {
?s ?p ?o
} LIMIT 100`, {
      sources: [store],
    });
    const bindings = await bindingsStream.toArray();
    console.log(bindings[0]?.get('s')?.value);

    // TODO: serialize the quads with named node identifiers.
    await outgoing.push(`member ${member} processed\n`);
  };

  incoming.on("data", (data) => {
    // Capture the index now: `count` keeps changing while earlier members are
    // still being awaited.
    const member = ++count;
    const task = pending.then(() => handleMember(data, member));
    pending = task.catch((error: unknown) => {
      console.error(`Failed to process member ${member}:`, error);
    });
    // Hand the task back so a runner that awaits listeners still gets
    // backpressure and sees the failure.
    return task;
  });

  // If a processor upstream terminates the channel, we propagate this change
  // onto the processors downstream, but only after all queued members are
  // done, so no push can hit an already closed writer.
  incoming.on("end", () => pending.then(() => outgoing.end()));
}