From fbb282a801355af3fbc1ccceb87031141316e69a Mon Sep 17 00:00:00 2001 From: Josip Milovac Date: Tue, 2 May 2023 12:45:53 +1000 Subject: [PATCH] improved rdf store, fixed reconnect crash, made rdf store accessable easily from the blockchain --- blockchain/blockchain.js | 121 +++++----- blockchain/rds-store.js | 474 +++++++++++++++++++++++++++++++++++++ network/blockchain-prop.js | 23 +- wallet/wallet-app.js | 11 +- 4 files changed, 560 insertions(+), 69 deletions(-) create mode 100644 blockchain/rds-store.js diff --git a/blockchain/blockchain.js b/blockchain/blockchain.js index f36997c..4fb45aa 100644 --- a/blockchain/blockchain.js +++ b/blockchain/blockchain.js @@ -1,5 +1,4 @@ const Block = require('./block'); -const N3 = require('n3'); const DataFactory = require('n3').DataFactory; const Payment = require('./payment'); const SensorRegistration = require('./sensor-registration'); @@ -8,6 +7,7 @@ const Integration = require('./integration'); const Compensation = require('./compensation'); const fs = require('fs'); const ChainUtil = require('../chain-util'); +const RdsStore = require('./rds-store'); const { MINING_REWARD} = require('../constants'); @@ -121,18 +121,22 @@ class Updater { this.sensors = {}; this.brokers = {}; this.integrations = {}; - this.store = new N3.Store(); + this.store = new RdsStore; + + this.store.startPush(); if (block !== null) { - this.store.addQuad( - DataFactory.namedNode(this.block.hash), - DataFactory.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), - DataFactory.namedNode("http://SSM/Block")); + this.store.push( + DataFactory.quad( + DataFactory.namedNode(this.block.hash), + DataFactory.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + DataFactory.namedNode("http://SSM/Block"))); - this.store.addQuad( - DataFactory.namedNode(this.block.hash), - DataFactory.namedNode("http://SSM/lastBlock"), - DataFactory.namedNode(this.parent.getBlockFromTop(0).hash)); + this.store.push( + DataFactory.quad( + DataFactory.namedNode(this.block.hash), + DataFactory.namedNode("http://SSM/lastBlock"), + DataFactory.namedNode(this.parent.getBlockFromTop(0).hash))); } } @@ -207,7 +211,7 @@ class Updater { this.parent.sensors.add(this.sensors); this.parent.brokers.add(this.brokers); this.parent.integrations.add(this.integrations); - this.parent.stores.push(this.store); + this.store.pushInto(this.parent.store); this.parent = null; } @@ -230,7 +234,7 @@ class Chain { this.brokers = new PropertyHistory(parent.brokers); this.integrations = new PropertyHistory(parent.integrations); } - this.stores = []; + this.store = new RdsStore(); } getBlockFromTop(i) { @@ -316,7 +320,7 @@ class Chain { this.sensors.undo(); this.brokers.undo(); this.integrations.undo(); - this.stores.pop(); + this.store.pop(); } clone() { @@ -326,7 +330,7 @@ class Chain { cloned.sensors = this.sensors.clone(); cloned.brokers = this.brokers.clone(); cloned.integrations = this.integrations.clone(); - cloned.stores = [...this.stores]; + cloned.store = this.store.clone(); return cloned; } @@ -340,18 +344,18 @@ class Chain { this.sensors.finish(); this.brokers.finish(); this.integrations.finish(); - this.parent.stores.push(...this.stores); - + this.store.pushInto(this.parent.store); this.parent = null; } } function addRDF(store, metadata) { for (const triple of metadata) { - store.addQuad(DataFactory.quad( - DataFactory.namedNode(triple.s), - DataFactory.namedNode(triple.p), - DataFactory.namedNode(triple.o))); + store.push( + DataFactory.quad( + DataFactory.namedNode(triple.s), + DataFactory.namedNode(triple.p), + DataFactory.namedNode(triple.o))); } } @@ -608,22 +612,26 @@ function stepSensorRegistration(updater, reward, sensorRegistration) { addRDF(updater.store, sensorRegistration.metadata); const newSensor = extInfo.metadata; - updater.store.addQuad( - DataFactory.namedNode(newSensor.sensorName), - DataFactory.namedNode("http://SSM/transactionCounter"), - DataFactory.literal(sensorRegistration.counter)); - updater.store.addQuad( - DataFactory.namedNode(newSensor.sensorName), - DataFactory.namedNode("http://SSM/OwnedBy"), - DataFactory.namedNode("http://SSM/Wallet/" + sensorRegistration.input)); - updater.store.addQuad( - DataFactory.namedNode(updater.block.hash), - DataFactory.namedNode("http://SSM/Transaction"), - DataFactory.namedNode(newSensor.sensorName)); - updater.store.addQuad( - DataFactory.namedNode(updater.block.hash), - DataFactory.namedNode("http://SSM/SensorRegistration"), - DataFactory.namedNode(newSensor.sensorName)); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(newSensor.sensorName), + DataFactory.namedNode("http://SSM/transactionCounter"), + DataFactory.literal(sensorRegistration.counter))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(newSensor.sensorName), + DataFactory.namedNode("http://SSM/OwnedBy"), + DataFactory.namedNode("http://SSM/Wallet/" + sensorRegistration.input))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(updater.block.hash), + DataFactory.namedNode("http://SSM/Transaction"), + DataFactory.namedNode(newSensor.sensorName))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(updater.block.hash), + DataFactory.namedNode("http://SSM/SensorRegistration"), + DataFactory.namedNode(newSensor.sensorName))); newSensor.counter = sensorRegistration.counter; updater.setSensor(newSensor.sensorName, newSensor); @@ -679,22 +687,26 @@ function stepBrokerRegistration(updater, reward, brokerRegistration) { const newBroker = extInfo.metadata; newBroker.input = brokerRegistration.input; - updater.store.addQuad( - DataFactory.namedNode(newBroker.brokerName), - DataFactory.namedNode("http://SSM/transactionCounter"), - DataFactory.literal(brokerRegistration.counter)); - updater.store.addQuad( - DataFactory.namedNode(newBroker.brokerName), - DataFactory.namedNode("http://SSM/OwnedBy"), - DataFactory.namedNode("http://SSM/Wallet/" + brokerRegistration.input)); - updater.store.addQuad( - DataFactory.namedNode(updater.block.hash), - DataFactory.namedNode("http://SSM/Transaction"), - DataFactory.namedNode(newBroker.brokerName)); - updater.store.addQuad( - DataFactory.namedNode(updater.block.hash), - DataFactory.namedNode("http://SSM/BrokerRegistration"), - DataFactory.namedNode(newBroker.brokerName)); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(newBroker.brokerName), + DataFactory.namedNode("http://SSM/transactionCounter"), + DataFactory.literal(brokerRegistration.counter))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(newBroker.brokerName), + DataFactory.namedNode("http://SSM/OwnedBy"), + DataFactory.namedNode("http://SSM/Wallet/" + brokerRegistration.input))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(updater.block.hash), + DataFactory.namedNode("http://SSM/Transaction"), + DataFactory.namedNode(newBroker.brokerName))); + updater.store.push( + DataFactory.quad( + DataFactory.namedNode(updater.block.hash), + DataFactory.namedNode("http://SSM/BrokerRegistration"), + DataFactory.namedNode(newBroker.brokerName))); newBroker.counter = brokerRegistration.counter; updater.setBroker(newBroker.brokerName, newBroker); @@ -857,7 +869,6 @@ function findBlocksDifference(oldBlocks, newBlocks) { }; } - function saveToDisk(blockchain, location) { try { fs.writeFileSync( @@ -1025,6 +1036,10 @@ class Blockchain { addListener(listener) { this.listeners.push(listener); } + + rdfSource() { + return this.chain.store; + } } module.exports = Blockchain; \ No newline at end of file diff --git a/blockchain/rds-store.js b/blockchain/rds-store.js new file mode 100644 index 0000000..611d044 --- /dev/null +++ b/blockchain/rds-store.js @@ -0,0 +1,474 @@ +const Stream = require("stream"); +const DataFactory = require('n3').DataFactory; + +//class NamedNode { +// constructor(value) { +// this.termType = "NamedNode"; +// this.value = value; +// } +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// if (term.termType !== this.termType) { +// return false; +// } + +// return term.value === this.value; +// } +//}; + +//class BlankNode { +// constructor(value) { +// this.termType = "BlankNode"; +// this.value = value; +// } +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// if (term.termType !== this.termType) { +// return false; +// } + +// return term.value === this.value; +// } +//}; + +//class Literal { +// constructor(value, language, dataType) { +// this.termType = "Literal"; +// this.value = value; +// this.language = language; +// this.dataType = dataType; +// } + +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// if (term.termType !== this.termType) { +// return false; +// } + +// if (term.value !== this.value) { +// return false; +// } + +// if (term.language !== this.language) { +// return false; +// } + +// return term.dataType.equals(this.dataType); +// } +//}; + +//class Variable { +// constructor(value) { +// this.termType = "Variable"; +// this.value = value; +// } + +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// if (term.termType !== this.termType) { +// return false; +// } + +// return term.value === this.value; +// } +//}; + +//class DefaultGraph { +// constructor() { +// this.termType = "DefaultGraph"; +// this.value = ""; +// } +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// return term.termType === this.termType; +// } +//}; + +//function nodeUnlink(prev, next) { +// //if next is the same as prev, we are the last link in the list +// if (this.prev === this.next) { +// this.prev.cleanup(); +// } + +// prev.next = next; +// next.prev = prev; +// prev = null; +// next = null; +//} + +//class ListNode { +// constructor() { +// this.next = null; +// this.prev = null; +// this.counter = null; +// } + +// addAfter(node, counter) { +// this.next = node.next; +// this.prev = node; +// node.next = this; +// this.next.prev = this; +// this.counter = counter; +// } +//} + +//class ListIterator { +// constructor(parent) { +// this.parent = parent; +// this.on = this.parent.next; +// } + +// next() { +// if (this.on === this.parent) { +// return { +// done: true +// }; +// } + +// const returning = this.on; +// this.on = this.on.next; +// } +//} + +//class ListHead { +// constructor(key, parentMap) { +// this.next = this; +// this.prev = this; +// this.parent = parentMap; +// this.key = key; +// } + +// cleanup() { +// this.parent.delete(this.key); +// } + +// *[Symbol.iterator]() { +// yield 1; +// yield 2; +// yield 3; +// } +//} + +//class Quad { +// constructor(subject, predicate, _object, graph) { +// this.termType = "Quad"; +// this.value = ""; +// this.subject = subject; +// this.predicate = predicate; +// this._object = _object; +// this.graph = graph; + +// this.subjectNext = null; +// this.subjectPrev = null; + +// this.predicateNext = null; +// this.predicatePrev = null; + +// this.objectNext = null; +// this.objectPrev = null; + +// this.graphNext = null; +// this.graphPrev = null; + +// this.globalNext = null; +// this.globalPrev = null; +// } +// equals(term) { +// if (typeof term === "undefined" || term === null) { +// return false; +// } + +// if (term.termType !== this.termType) { +// return false; +// } + +// if (!term.subject.equals(this.subject)) { +// return false; +// } + +// if (!term.predicate.equals(this.predicate)) { +// return false; +// } + +// if (!term._object.equals(this._object)) { +// return false; +// } + +// if (!term.graph.equals(this.graph)) { +// return false; +// } + +// return true; +// } +// unlink() { +// nodeUnlink(this.subjectPrev, this.subjectNext); +// nodeUnlink(this.predicatePrev, this.predicateNext); +// nodeUnlink(this.objectPrev, this.objectNext); +// nodeUnlink(this.graphPrev, this.graphNext); +// } +//}; + +//class DataFactory { +// constructor() { +// this.blankCounter = 0; +// } +// namedNode(value) { +// return NamedNode(value); +// } +// blankNode(value) { +// if (typeof value === "undefined") { +// value = "blank" + this.blankCounter.toString(); +// this.blankCounter++; +// } + +// return new BlankNode(value); +// } +// literal(value, languageOrDataType) { +// if (languageOrDataType instanceof NamedNode) { +// return new Literal(value, "", languageOrDataType); +// } else { +// return new Literal(value, languageOrDataType, +// this.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#langString")); +// } +// } +// variable(value) { +// return new Variable(value); +// } +// defaultGraph() { +// return new DefaultGraph(); +// } +// quad(subject, predicate, _object, graph) { +// if (typeof graph === "undefined" || graph === null) { +// return new Quad(subject, predicate, _object, new DefaultGraph()); +// } else { +// return new Quad(subject, predicate, _object, graph); +// } +// } +// fromTerm(term) { +// switch (term.termType) { +// case "NamedNode": +// return this.namedNode(term.value); +// case "BlankNode": +// return this.blankNode(term.value); +// case "Literal": +// return new Literal(term.value, term.language, this.fromTerm(term.dataType)); +// case "Variable": +// return this.variable(term.value); +// case "DefaultGraph": +// return this.defaultGraph(); +// case "Quad": +// return this.fromQuad(term); +// default: +// throw new Error("Unknown term"); +// } + +// fromQuad(quad) { +// return this.quad( +// this.fromTerm(term.subject), +// this.fromTerm(term.predicate), +// this.fromTerm(term._object), +// this.fromTerm(term.graph)); +// } +// } +//}; + +function addQuadToMap(counter, map, key, quad, toPop) { + let quadMap = null; + let popper = null; + + if (toPop.has(key)) { + popper = toPop.get(key); + } else { + popper = { + delete: false, + removing: [] + }; + toPop.set(key, popper); + } + + if (map.has(key)) { + quadMap = map.get(key); + popper.removing.push(counter); + } else { + quadMap = new Map(); + map.set(key, quadMap); + popper.delete = true; + } + quadMap.set(counter, quad); +} + +function popFromSource(list, map) { + for (const [key, popper] of list) { + if (popper.delete) { + map.delete(key) + } else { + const keyMap = map.get(key); + for (const counter of popper.removing) { + keyMap.delete(counter); + } + } + } +} + +function cloneTermMap(from, to) { + for (const [key, map] of from) { + const adding = new Map(); + for (const [counter, quad] of map) { + adding.set(counter, quad); + } + to.set(key, adding); + } +} + +class Source { + constructor() { + this.subjects = new Map(); + this.predicates = new Map(); + this.objects = new Map(); + this.graphs = new Map(); + this.all = []; + this.pop = []; + this.counter = 0; + } + + startPush() { + this.pop.push({ + subjects: new Map(), + predicates: new Map(), + objects: new Map(), + graphs: new Map(), + count: 0 + }); + } + + push(quad) { + const toPop = this.pop[this.pop.length - 1]; + + addQuadToMap(this.counter, this.subjects, quad.subject.value, quad, toPop.subjects); + addQuadToMap(this.counter, this.predicates, quad.predicate.value, quad, toPop.predicates); + addQuadToMap(this.counter, this.objects, quad._object.value, quad, toPop.objects); + addQuadToMap(this.counter, this.graphs, quad.graph.value, quad, toPop.graphs); + this.all.push(quad); + toPop.count++; + this.counter++; + } + + pop() { + if (this.pop.length === 0) { + throw new Error("Nothing to pop"); + } + + const toPop = this.pop.pop(); + + this.all.slice(0, -toPop.count); + + popFromSource(toPop.subjects, this.subjects); + popFromSource(toPop.predicates, this.predicates); + popFromSource(toPop.objects, this.objects); + popFromSource(toPop.graphs, this.graphs); + } + + //as we always insert at the front of the list, elements are sorted by descending insertion time, + //which means by descending counter + //we can then walk through each list of found nodes, only stopping on equal counters + match(subject, predicate, _object, graph) { + const maps = []; + if (typeof subject !== "undefined" && subject !== null) { + if (this.subjects.has(subject.value)) { + maps.push(this.subjects.get(subject.value)); + } else { + return Stream.Readable.from([]); + } + } + if (typeof predicate !== "undefined" && predicate !== null) { + if (this.predicates.has(predicate.value)) { + maps.push(this.predicates.get(predicate.value)); + } else { + return Stream.Readable.from([]); + } + } + if (typeof _object !== "undefined" && _object !== null) { + if (this.objects.has(_object.value)) { + maps.push(this.objects.get(_object.value)); + } else { + return Stream.Readable.from([]); + } + } + if (typeof graph !== "undefined" && graph !== null) { + if (this.graphs.has(graph.value)) { + maps.push(this.graphs.get(graph.value)); + } else { + return Stream.Readable.from([]); + } + } + + if (maps.length === 0) { + return Stream.Readable.from(this.all); + } + + const working = []; + + for (const [counter, quad] of maps[0]) { + working.push({ + counter: counter, + quad: quad + }); + } + + for (let i = 1; i < maps.length; i++) { + for (let j = 0; j < working.length;) { + if (!maps[i].has(working[j].counter)) { + working[j] = working[working.length - 1]; + working.pop(); + } else { + j++ + } + } + } + + return Stream.Readable.from(working.map(work => work.quad)); + } + + clone() { + const returning = new Source(); + + cloneTermMap(this.subjects, returning.subjects); + cloneTermMap(this.predicates, returning.predicates); + cloneTermMap(this.objects, returning.objects); + cloneTermMap(this.graphs, returning.graphs); + + this.all.forEach(item => returning.all.push(item)); + this.pop.forEach(item => returning.pop.push(item)); + returning.counter = this.counter; + + return returning; + } + + pushInto(parent) { + let on = 0; + for (const toPop of this.pop) { + parent.startPush(); + for (const quad of this.all.slice(on, on + toPop.count)) { + parent.push(quad); + } + on += toPop.count; + } + } +}; + +module.exports = Source; \ No newline at end of file diff --git a/network/blockchain-prop.js b/network/blockchain-prop.js index 3a5765b..b6f1b1f 100644 --- a/network/blockchain-prop.js +++ b/network/blockchain-prop.js @@ -44,15 +44,15 @@ class Connection { this.socket = socket; this.state = STATE_RUNNING; - this.socket.on("error", () => { + this.socket.addEventListener("error", () => { this.onError(); }); - this.socket.on("open", () => { + this.socket.addEventListener("open", () => { this.onConnection(); }); - this.socket.on("message", (data) => { + this.socket.addEventListener("message", (data) => { this.onMessage(data); }); @@ -60,22 +60,25 @@ class Connection { } connect(address) { - console.log(`${this.logName} connecting`); this.address = address; this.state = STATE_CONNECTING; this.reconnectWait = 1; - this.socket = new Websocket(this.address); + this.reconnect(); + } - this.socket.on("error", () => { + reconnect() { + console.log(`${this.logName} connecting`); + this.socket = new Websocket(this.address); + this.socket.addEventListener("error", () => { this.onError(); }); - this.socket.on("open", () => { + this.socket.addEventListener("open", () => { this.onConnection(); }); - this.socket.on("message", (data) => { + this.socket.addEventListener("message", (data) => { this.onMessage(data); }); } @@ -88,7 +91,7 @@ class Connection { switch (this.state) { case STATE_CONNECTING: //this.reconnectWait seconds + random [0,1000] ms - setTimeout(() => this.socket = new Websocket(this.address), + setTimeout(() => this.reconnect(), 1000 * this.reconnectWait + Math.floor(Math.random() * 1000)); this.reconnectWait *= 2; if (this.reconnectWait > 64) { @@ -104,7 +107,7 @@ class Connection { if (this.address !== null) { this.state = STATE_CONNECTING; this.reconnectWait = 1; - this.socket = new Websocket(this.address); + this.reconnect(); } else { //do nothing? } diff --git a/wallet/wallet-app.js b/wallet/wallet-app.js index fbb267a..6edce30 100644 --- a/wallet/wallet-app.js +++ b/wallet/wallet-app.js @@ -293,12 +293,12 @@ const myEngine = new QueryEngine(); app.post('/sparql', (req, res) => { const start = async function () { try { - let result = []; + const result = []; const bindingsStream = await myEngine.queryBindings( req.body.query, { readOnly: true, - sources: blockchain.chain.stores + sources: [blockchain.rdfSource()] }); bindingsStream.on('data', (binding) => { result.push(binding.entries); @@ -307,14 +307,13 @@ app.post('/sparql', (req, res) => { res.json(result); }); bindingsStream.on('error', (err) => { - console.error(err); + res.json(err); }); } catch (err) { console.error(err); - res.json("Error occured while querying"); + res.json(err); } }; - start() - + start(); }); \ No newline at end of file