improved rdf store, fixed reconnect crash, made rdf store accessable easily from the blockchain

This commit is contained in:
Josip Milovac 2023-05-02 12:45:53 +10:00
parent 050e69e23f
commit fbb282a801
4 changed files with 560 additions and 69 deletions

View file

@ -1,5 +1,4 @@
const Block = require('./block'); const Block = require('./block');
const N3 = require('n3');
const DataFactory = require('n3').DataFactory; const DataFactory = require('n3').DataFactory;
const Payment = require('./payment'); const Payment = require('./payment');
const SensorRegistration = require('./sensor-registration'); const SensorRegistration = require('./sensor-registration');
@ -8,6 +7,7 @@ const Integration = require('./integration');
const Compensation = require('./compensation'); const Compensation = require('./compensation');
const fs = require('fs'); const fs = require('fs');
const ChainUtil = require('../chain-util'); const ChainUtil = require('../chain-util');
const RdsStore = require('./rds-store');
const { const {
MINING_REWARD} = require('../constants'); MINING_REWARD} = require('../constants');
@ -121,18 +121,22 @@ class Updater {
this.sensors = {}; this.sensors = {};
this.brokers = {}; this.brokers = {};
this.integrations = {}; this.integrations = {};
this.store = new N3.Store(); this.store = new RdsStore;
this.store.startPush();
if (block !== null) { if (block !== null) {
this.store.addQuad( this.store.push(
DataFactory.quad(
DataFactory.namedNode(this.block.hash), DataFactory.namedNode(this.block.hash),
DataFactory.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), DataFactory.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
DataFactory.namedNode("http://SSM/Block")); DataFactory.namedNode("http://SSM/Block")));
this.store.addQuad( this.store.push(
DataFactory.quad(
DataFactory.namedNode(this.block.hash), DataFactory.namedNode(this.block.hash),
DataFactory.namedNode("http://SSM/lastBlock"), DataFactory.namedNode("http://SSM/lastBlock"),
DataFactory.namedNode(this.parent.getBlockFromTop(0).hash)); DataFactory.namedNode(this.parent.getBlockFromTop(0).hash)));
} }
} }
@ -207,7 +211,7 @@ class Updater {
this.parent.sensors.add(this.sensors); this.parent.sensors.add(this.sensors);
this.parent.brokers.add(this.brokers); this.parent.brokers.add(this.brokers);
this.parent.integrations.add(this.integrations); this.parent.integrations.add(this.integrations);
this.parent.stores.push(this.store); this.store.pushInto(this.parent.store);
this.parent = null; this.parent = null;
} }
@ -230,7 +234,7 @@ class Chain {
this.brokers = new PropertyHistory(parent.brokers); this.brokers = new PropertyHistory(parent.brokers);
this.integrations = new PropertyHistory(parent.integrations); this.integrations = new PropertyHistory(parent.integrations);
} }
this.stores = []; this.store = new RdsStore();
} }
getBlockFromTop(i) { getBlockFromTop(i) {
@ -316,7 +320,7 @@ class Chain {
this.sensors.undo(); this.sensors.undo();
this.brokers.undo(); this.brokers.undo();
this.integrations.undo(); this.integrations.undo();
this.stores.pop(); this.store.pop();
} }
clone() { clone() {
@ -326,7 +330,7 @@ class Chain {
cloned.sensors = this.sensors.clone(); cloned.sensors = this.sensors.clone();
cloned.brokers = this.brokers.clone(); cloned.brokers = this.brokers.clone();
cloned.integrations = this.integrations.clone(); cloned.integrations = this.integrations.clone();
cloned.stores = [...this.stores]; cloned.store = this.store.clone();
return cloned; return cloned;
} }
@ -340,15 +344,15 @@ class Chain {
this.sensors.finish(); this.sensors.finish();
this.brokers.finish(); this.brokers.finish();
this.integrations.finish(); this.integrations.finish();
this.parent.stores.push(...this.stores); this.store.pushInto(this.parent.store);
this.parent = null; this.parent = null;
} }
} }
function addRDF(store, metadata) { function addRDF(store, metadata) {
for (const triple of metadata) { for (const triple of metadata) {
store.addQuad(DataFactory.quad( store.push(
DataFactory.quad(
DataFactory.namedNode(triple.s), DataFactory.namedNode(triple.s),
DataFactory.namedNode(triple.p), DataFactory.namedNode(triple.p),
DataFactory.namedNode(triple.o))); DataFactory.namedNode(triple.o)));
@ -608,22 +612,26 @@ function stepSensorRegistration(updater, reward, sensorRegistration) {
addRDF(updater.store, sensorRegistration.metadata); addRDF(updater.store, sensorRegistration.metadata);
const newSensor = extInfo.metadata; const newSensor = extInfo.metadata;
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(newSensor.sensorName), DataFactory.namedNode(newSensor.sensorName),
DataFactory.namedNode("http://SSM/transactionCounter"), DataFactory.namedNode("http://SSM/transactionCounter"),
DataFactory.literal(sensorRegistration.counter)); DataFactory.literal(sensorRegistration.counter)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(newSensor.sensorName), DataFactory.namedNode(newSensor.sensorName),
DataFactory.namedNode("http://SSM/OwnedBy"), DataFactory.namedNode("http://SSM/OwnedBy"),
DataFactory.namedNode("http://SSM/Wallet/" + sensorRegistration.input)); DataFactory.namedNode("http://SSM/Wallet/" + sensorRegistration.input)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(updater.block.hash), DataFactory.namedNode(updater.block.hash),
DataFactory.namedNode("http://SSM/Transaction"), DataFactory.namedNode("http://SSM/Transaction"),
DataFactory.namedNode(newSensor.sensorName)); DataFactory.namedNode(newSensor.sensorName)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(updater.block.hash), DataFactory.namedNode(updater.block.hash),
DataFactory.namedNode("http://SSM/SensorRegistration"), DataFactory.namedNode("http://SSM/SensorRegistration"),
DataFactory.namedNode(newSensor.sensorName)); DataFactory.namedNode(newSensor.sensorName)));
newSensor.counter = sensorRegistration.counter; newSensor.counter = sensorRegistration.counter;
updater.setSensor(newSensor.sensorName, newSensor); updater.setSensor(newSensor.sensorName, newSensor);
@ -679,22 +687,26 @@ function stepBrokerRegistration(updater, reward, brokerRegistration) {
const newBroker = extInfo.metadata; const newBroker = extInfo.metadata;
newBroker.input = brokerRegistration.input; newBroker.input = brokerRegistration.input;
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(newBroker.brokerName), DataFactory.namedNode(newBroker.brokerName),
DataFactory.namedNode("http://SSM/transactionCounter"), DataFactory.namedNode("http://SSM/transactionCounter"),
DataFactory.literal(brokerRegistration.counter)); DataFactory.literal(brokerRegistration.counter)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(newBroker.brokerName), DataFactory.namedNode(newBroker.brokerName),
DataFactory.namedNode("http://SSM/OwnedBy"), DataFactory.namedNode("http://SSM/OwnedBy"),
DataFactory.namedNode("http://SSM/Wallet/" + brokerRegistration.input)); DataFactory.namedNode("http://SSM/Wallet/" + brokerRegistration.input)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(updater.block.hash), DataFactory.namedNode(updater.block.hash),
DataFactory.namedNode("http://SSM/Transaction"), DataFactory.namedNode("http://SSM/Transaction"),
DataFactory.namedNode(newBroker.brokerName)); DataFactory.namedNode(newBroker.brokerName)));
updater.store.addQuad( updater.store.push(
DataFactory.quad(
DataFactory.namedNode(updater.block.hash), DataFactory.namedNode(updater.block.hash),
DataFactory.namedNode("http://SSM/BrokerRegistration"), DataFactory.namedNode("http://SSM/BrokerRegistration"),
DataFactory.namedNode(newBroker.brokerName)); DataFactory.namedNode(newBroker.brokerName)));
newBroker.counter = brokerRegistration.counter; newBroker.counter = brokerRegistration.counter;
updater.setBroker(newBroker.brokerName, newBroker); updater.setBroker(newBroker.brokerName, newBroker);
@ -857,7 +869,6 @@ function findBlocksDifference(oldBlocks, newBlocks) {
}; };
} }
function saveToDisk(blockchain, location) { function saveToDisk(blockchain, location) {
try { try {
fs.writeFileSync( fs.writeFileSync(
@ -1025,6 +1036,10 @@ class Blockchain {
addListener(listener) { addListener(listener) {
this.listeners.push(listener); this.listeners.push(listener);
} }
rdfSource() {
return this.chain.store;
}
} }
module.exports = Blockchain; module.exports = Blockchain;

474
blockchain/rds-store.js Normal file
View file

@ -0,0 +1,474 @@
const Stream = require("stream");
const DataFactory = require('n3').DataFactory;
//class NamedNode {
// constructor(value) {
// this.termType = "NamedNode";
// this.value = value;
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// if (term.termType !== this.termType) {
// return false;
// }
// return term.value === this.value;
// }
//};
//class BlankNode {
// constructor(value) {
// this.termType = "BlankNode";
// this.value = value;
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// if (term.termType !== this.termType) {
// return false;
// }
// return term.value === this.value;
// }
//};
//class Literal {
// constructor(value, language, dataType) {
// this.termType = "Literal";
// this.value = value;
// this.language = language;
// this.dataType = dataType;
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// if (term.termType !== this.termType) {
// return false;
// }
// if (term.value !== this.value) {
// return false;
// }
// if (term.language !== this.language) {
// return false;
// }
// return term.dataType.equals(this.dataType);
// }
//};
//class Variable {
// constructor(value) {
// this.termType = "Variable";
// this.value = value;
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// if (term.termType !== this.termType) {
// return false;
// }
// return term.value === this.value;
// }
//};
//class DefaultGraph {
// constructor() {
// this.termType = "DefaultGraph";
// this.value = "";
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// return term.termType === this.termType;
// }
//};
//function nodeUnlink(prev, next) {
// //if next is the same as prev, we are the last link in the list
// if (this.prev === this.next) {
// this.prev.cleanup();
// }
// prev.next = next;
// next.prev = prev;
// prev = null;
// next = null;
//}
//class ListNode {
// constructor() {
// this.next = null;
// this.prev = null;
// this.counter = null;
// }
// addAfter(node, counter) {
// this.next = node.next;
// this.prev = node;
// node.next = this;
// this.next.prev = this;
// this.counter = counter;
// }
//}
//class ListIterator {
// constructor(parent) {
// this.parent = parent;
// this.on = this.parent.next;
// }
// next() {
// if (this.on === this.parent) {
// return {
// done: true
// };
// }
// const returning = this.on;
// this.on = this.on.next;
// }
//}
//class ListHead {
// constructor(key, parentMap) {
// this.next = this;
// this.prev = this;
// this.parent = parentMap;
// this.key = key;
// }
// cleanup() {
// this.parent.delete(this.key);
// }
// *[Symbol.iterator]() {
// yield 1;
// yield 2;
// yield 3;
// }
//}
//class Quad {
// constructor(subject, predicate, _object, graph) {
// this.termType = "Quad";
// this.value = "";
// this.subject = subject;
// this.predicate = predicate;
// this._object = _object;
// this.graph = graph;
// this.subjectNext = null;
// this.subjectPrev = null;
// this.predicateNext = null;
// this.predicatePrev = null;
// this.objectNext = null;
// this.objectPrev = null;
// this.graphNext = null;
// this.graphPrev = null;
// this.globalNext = null;
// this.globalPrev = null;
// }
// equals(term) {
// if (typeof term === "undefined" || term === null) {
// return false;
// }
// if (term.termType !== this.termType) {
// return false;
// }
// if (!term.subject.equals(this.subject)) {
// return false;
// }
// if (!term.predicate.equals(this.predicate)) {
// return false;
// }
// if (!term._object.equals(this._object)) {
// return false;
// }
// if (!term.graph.equals(this.graph)) {
// return false;
// }
// return true;
// }
// unlink() {
// nodeUnlink(this.subjectPrev, this.subjectNext);
// nodeUnlink(this.predicatePrev, this.predicateNext);
// nodeUnlink(this.objectPrev, this.objectNext);
// nodeUnlink(this.graphPrev, this.graphNext);
// }
//};
//class DataFactory {
// constructor() {
// this.blankCounter = 0;
// }
// namedNode(value) {
// return NamedNode(value);
// }
// blankNode(value) {
// if (typeof value === "undefined") {
// value = "blank" + this.blankCounter.toString();
// this.blankCounter++;
// }
// return new BlankNode(value);
// }
// literal(value, languageOrDataType) {
// if (languageOrDataType instanceof NamedNode) {
// return new Literal(value, "", languageOrDataType);
// } else {
// return new Literal(value, languageOrDataType,
// this.namedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#langString"));
// }
// }
// variable(value) {
// return new Variable(value);
// }
// defaultGraph() {
// return new DefaultGraph();
// }
// quad(subject, predicate, _object, graph) {
// if (typeof graph === "undefined" || graph === null) {
// return new Quad(subject, predicate, _object, new DefaultGraph());
// } else {
// return new Quad(subject, predicate, _object, graph);
// }
// }
// fromTerm(term) {
// switch (term.termType) {
// case "NamedNode":
// return this.namedNode(term.value);
// case "BlankNode":
// return this.blankNode(term.value);
// case "Literal":
// return new Literal(term.value, term.language, this.fromTerm(term.dataType));
// case "Variable":
// return this.variable(term.value);
// case "DefaultGraph":
// return this.defaultGraph();
// case "Quad":
// return this.fromQuad(term);
// default:
// throw new Error("Unknown term");
// }
// fromQuad(quad) {
// return this.quad(
// this.fromTerm(term.subject),
// this.fromTerm(term.predicate),
// this.fromTerm(term._object),
// this.fromTerm(term.graph));
// }
// }
//};
function addQuadToMap(counter, map, key, quad, toPop) {
let quadMap = null;
let popper = null;
if (toPop.has(key)) {
popper = toPop.get(key);
} else {
popper = {
delete: false,
removing: []
};
toPop.set(key, popper);
}
if (map.has(key)) {
quadMap = map.get(key);
popper.removing.push(counter);
} else {
quadMap = new Map();
map.set(key, quadMap);
popper.delete = true;
}
quadMap.set(counter, quad);
}
function popFromSource(list, map) {
for (const [key, popper] of list) {
if (popper.delete) {
map.delete(key)
} else {
const keyMap = map.get(key);
for (const counter of popper.removing) {
keyMap.delete(counter);
}
}
}
}
function cloneTermMap(from, to) {
for (const [key, map] of from) {
const adding = new Map();
for (const [counter, quad] of map) {
adding.set(counter, quad);
}
to.set(key, adding);
}
}
class Source {
constructor() {
this.subjects = new Map();
this.predicates = new Map();
this.objects = new Map();
this.graphs = new Map();
this.all = [];
this.pop = [];
this.counter = 0;
}
startPush() {
this.pop.push({
subjects: new Map(),
predicates: new Map(),
objects: new Map(),
graphs: new Map(),
count: 0
});
}
push(quad) {
const toPop = this.pop[this.pop.length - 1];
addQuadToMap(this.counter, this.subjects, quad.subject.value, quad, toPop.subjects);
addQuadToMap(this.counter, this.predicates, quad.predicate.value, quad, toPop.predicates);
addQuadToMap(this.counter, this.objects, quad._object.value, quad, toPop.objects);
addQuadToMap(this.counter, this.graphs, quad.graph.value, quad, toPop.graphs);
this.all.push(quad);
toPop.count++;
this.counter++;
}
pop() {
if (this.pop.length === 0) {
throw new Error("Nothing to pop");
}
const toPop = this.pop.pop();
this.all.slice(0, -toPop.count);
popFromSource(toPop.subjects, this.subjects);
popFromSource(toPop.predicates, this.predicates);
popFromSource(toPop.objects, this.objects);
popFromSource(toPop.graphs, this.graphs);
}
//as we always insert at the front of the list, elements are sorted by descending insertion time,
//which means by descending counter
//we can then walk through each list of found nodes, only stopping on equal counters
match(subject, predicate, _object, graph) {
const maps = [];
if (typeof subject !== "undefined" && subject !== null) {
if (this.subjects.has(subject.value)) {
maps.push(this.subjects.get(subject.value));
} else {
return Stream.Readable.from([]);
}
}
if (typeof predicate !== "undefined" && predicate !== null) {
if (this.predicates.has(predicate.value)) {
maps.push(this.predicates.get(predicate.value));
} else {
return Stream.Readable.from([]);
}
}
if (typeof _object !== "undefined" && _object !== null) {
if (this.objects.has(_object.value)) {
maps.push(this.objects.get(_object.value));
} else {
return Stream.Readable.from([]);
}
}
if (typeof graph !== "undefined" && graph !== null) {
if (this.graphs.has(graph.value)) {
maps.push(this.graphs.get(graph.value));
} else {
return Stream.Readable.from([]);
}
}
if (maps.length === 0) {
return Stream.Readable.from(this.all);
}
const working = [];
for (const [counter, quad] of maps[0]) {
working.push({
counter: counter,
quad: quad
});
}
for (let i = 1; i < maps.length; i++) {
for (let j = 0; j < working.length;) {
if (!maps[i].has(working[j].counter)) {
working[j] = working[working.length - 1];
working.pop();
} else {
j++
}
}
}
return Stream.Readable.from(working.map(work => work.quad));
}
clone() {
const returning = new Source();
cloneTermMap(this.subjects, returning.subjects);
cloneTermMap(this.predicates, returning.predicates);
cloneTermMap(this.objects, returning.objects);
cloneTermMap(this.graphs, returning.graphs);
this.all.forEach(item => returning.all.push(item));
this.pop.forEach(item => returning.pop.push(item));
returning.counter = this.counter;
return returning;
}
pushInto(parent) {
let on = 0;
for (const toPop of this.pop) {
parent.startPush();
for (const quad of this.all.slice(on, on + toPop.count)) {
parent.push(quad);
}
on += toPop.count;
}
}
};
module.exports = Source;

View file

@ -44,15 +44,15 @@ class Connection {
this.socket = socket; this.socket = socket;
this.state = STATE_RUNNING; this.state = STATE_RUNNING;
this.socket.on("error", () => { this.socket.addEventListener("error", () => {
this.onError(); this.onError();
}); });
this.socket.on("open", () => { this.socket.addEventListener("open", () => {
this.onConnection(); this.onConnection();
}); });
this.socket.on("message", (data) => { this.socket.addEventListener("message", (data) => {
this.onMessage(data); this.onMessage(data);
}); });
@ -60,22 +60,25 @@ class Connection {
} }
connect(address) { connect(address) {
console.log(`${this.logName} connecting`);
this.address = address; this.address = address;
this.state = STATE_CONNECTING; this.state = STATE_CONNECTING;
this.reconnectWait = 1; this.reconnectWait = 1;
this.socket = new Websocket(this.address); this.reconnect();
}
this.socket.on("error", () => { reconnect() {
console.log(`${this.logName} connecting`);
this.socket = new Websocket(this.address);
this.socket.addEventListener("error", () => {
this.onError(); this.onError();
}); });
this.socket.on("open", () => { this.socket.addEventListener("open", () => {
this.onConnection(); this.onConnection();
}); });
this.socket.on("message", (data) => { this.socket.addEventListener("message", (data) => {
this.onMessage(data); this.onMessage(data);
}); });
} }
@ -88,7 +91,7 @@ class Connection {
switch (this.state) { switch (this.state) {
case STATE_CONNECTING: case STATE_CONNECTING:
//this.reconnectWait seconds + random [0,1000] ms //this.reconnectWait seconds + random [0,1000] ms
setTimeout(() => this.socket = new Websocket(this.address), setTimeout(() => this.reconnect(),
1000 * this.reconnectWait + Math.floor(Math.random() * 1000)); 1000 * this.reconnectWait + Math.floor(Math.random() * 1000));
this.reconnectWait *= 2; this.reconnectWait *= 2;
if (this.reconnectWait > 64) { if (this.reconnectWait > 64) {
@ -104,7 +107,7 @@ class Connection {
if (this.address !== null) { if (this.address !== null) {
this.state = STATE_CONNECTING; this.state = STATE_CONNECTING;
this.reconnectWait = 1; this.reconnectWait = 1;
this.socket = new Websocket(this.address); this.reconnect();
} else { } else {
//do nothing? //do nothing?
} }

View file

@ -293,12 +293,12 @@ const myEngine = new QueryEngine();
app.post('/sparql', (req, res) => { app.post('/sparql', (req, res) => {
const start = async function () { const start = async function () {
try { try {
let result = []; const result = [];
const bindingsStream = await myEngine.queryBindings( const bindingsStream = await myEngine.queryBindings(
req.body.query, req.body.query,
{ {
readOnly: true, readOnly: true,
sources: blockchain.chain.stores sources: [blockchain.rdfSource()]
}); });
bindingsStream.on('data', (binding) => { bindingsStream.on('data', (binding) => {
result.push(binding.entries); result.push(binding.entries);
@ -307,14 +307,13 @@ app.post('/sparql', (req, res) => {
res.json(result); res.json(result);
}); });
bindingsStream.on('error', (err) => { bindingsStream.on('error', (err) => {
console.error(err); res.json(err);
}); });
} catch (err) { } catch (err) {
console.error(err); console.error(err);
res.json("Error occured while querying"); res.json(err);
} }
}; };
start() start();
}); });