persistence for the blockchain + everything through the chain instead of broadcasting

Josip Milovac 2022-12-05 16:00:26 +11:00
parent 1cb1070d45
commit ea81105df6
5 changed files with 127 additions and 61 deletions
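In short: the P2P server now owns an on-disk copy of the chain (the app passes './persist_block_chain.json') plus an N3 store rebuilt from it, and a mined block flows through p2pServer.newBlock() instead of separate transaction/metadata broadcasts. A minimal sketch of the persistence half, using the same fs calls the diff uses (the helper names here are illustrative, not from the commit):

// Sketch only: load a persisted chain at startup, rewrite it after each accepted block.
const fs = require('fs');

function loadChain(path) {
  // No file yet means we start from the genesis block.
  if (!fs.existsSync(path)) return null;
  return JSON.parse(fs.readFileSync(path, 'utf8'));
}

function saveChain(path, chain) {
  // The chain is rewritten wholesale after every block; fine at this scale.
  fs.writeFileSync(path, JSON.stringify(chain));
}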

View file

@@ -38,7 +38,6 @@ const QueryEngine = require('@comunica/query-sparql').QueryEngine;
 const N3 = require('n3');
 const jsonld = require('jsonld');
-const DataFactory = require('n3').DataFactory;
 var mqtt = require('mqtt');
 var aedes = require('aedes')(); /* aedes is a stream-based MQTT broker */
 var MQTTserver = require('net').createServer(aedes.handle);
@@ -54,11 +53,10 @@ const app = express();
 const bc = new Blockchain();
 const wallet = new Wallet();
 const tp = new TransactionPool();
-const p2pServer = new P2pServer(bc, tp);
+const p2pServer = new P2pServer(bc, tp,'./persist_block_chain.json');
 const miner = new Miner(bc, tp, wallet, p2pServer);
 const parser = new N3.Parser(); //({format: 'application/n-quads'});
-const store = new N3.Store();
 const myEngine = new QueryEngine();
 app.use(bodyParser.json());
@@ -167,31 +165,15 @@ app.get('/IoTdeviceRegistration', (req, res)=> {
   }
   //let SenShaMartOnt = SSNmetadata;
   //SenShaMartOnt.push(SenSHaMArtExt); */
   console.log(SenShaMartDesc)
   jsonld.toRDF(SenShaMartDesc, {format: 'application/n-quads'},
    (err, nquads) => {
     console.log(nquads)
     var metaDataTransaction = wallet.createMetadata(
       nquads, tp);
-     parser.parse(
-       nquads,
-       (error, quadN, prefixes) => {
-         if (quadN){
-           store.addQuad(DataFactory.quad(
-             DataFactory.namedNode(quadN.subject.id),
-             DataFactory.namedNode(quadN.predicate.id),
-             DataFactory.namedNode(quadN.object.id)));
-         }
-         else {
-           console.log("# That's all, folks!", prefixes);
-         }
-       });
-     //console.log(metaDataTransaction.SSNmetadata)
-     p2pServer.broadcastMetadata(metaDataTransaction);
-   });
-  });
-  res.json("MetadataTransactionCreated");
+   });
+  });
+  res.json("MetadataTransactionCreated");
 });
 //////////////////////////////////////////////////
@@ -200,7 +182,7 @@ app.post('/mine', (req, res) => {
   const block = bc.addBlock(req.body.data);
   console.log(`New block added: ${block.toString()}`);
-  p2pServer.syncChains();
+  p2pServer.newBlock(block);
   res.redirect('/blocks');
 });
@@ -208,7 +190,7 @@ app.post('/mine', (req, res) => {
 app.post('/PaymentTransaction', (req, res) => {
   const { recipient, amount } = req.body;
   const transaction = wallet.createTransaction(recipient, amount, bc, tp);
-  p2pServer.broadcastTransaction(transaction);
+  //p2pServer.broadcastTransaction(transaction);
   res.redirect('/transactions');
 });
@@ -249,13 +231,14 @@ app.post("/UploadMetafile", upload.single('file'), (req, res) => {
 /////////////////////
 //Start of comunica sparql query code
 app.post('/sparql', (req, res) => {
+  console.log(req.body);
   const {Select,subject,predicate,object,Limit}= req.body;
   const start = async function (a,b){
     const bindingsStream = await myEngine.queryBindings(`SELECT ${Select} WHERE
       {${subject} ${predicate} ${object}} LIMIT
       ${Limit}`, { sources: [{ type: 'rdfjsSource',
-      value: store}]
+      value: p2pServer.store}]
     });
     bindingsStream.on('data', (binding) => {
       console.log(binding.toString());

View file

@@ -25,9 +25,8 @@ class Miner {
     console.log(validTransactions);
     // const validMetadataS = this.transactionPool.metadataS;
     const block = this.blockchain.addBlock([validTransactions, validMetadataS]);
-    this.p2pServer.syncChains();
+    this.p2pServer.newBlock(block);
     this.transactionPool.clear();
-    this.p2pServer.broadcastClearTransactions();
     return block;
   }

View file

@@ -1,4 +1,9 @@
 const Websocket = require('ws');
+const N3 = require('n3');
+const parser = new N3.Parser(); //({format: 'application/n-quads'});
+const DataFactory = require('n3').DataFactory;
+const fs = require('fs');
+const process = require('process');
 const P2P_PORT = process.env.P2P_PORT || 5000;
 const peers = process.env.PEERS ? process.env.PEERS.split(',') : [];
@@ -10,10 +15,21 @@ const MESSAGE_TYPES = {
 };
 class P2pServer {
-  constructor(blockchain, transactionPool) {
+  constructor(blockchain, transactionPool,chainStorageLocation) {
     this.blockchain = blockchain;
     this.transactionPool = transactionPool;
     this.sockets = [];
+    this.store = new N3.Store();
+    this.chainStorageLocation = chainStorageLocation;
+    //possible race if deleted after check, but we live with it I guess
+    if (fs.existsSync(this.chainStorageLocation)) {
+      const rawPersistedChain = fs.readFileSync(this.chainStorageLocation, 'utf8');
+      const chain = JSON.parse(rawPersistedChain);
+      this.newChain(chain, false);
+    } else {
+      console.log("Didn't find a persisted chain, starting from genesis");
+    }
   }
   listen() {
@@ -47,7 +63,7 @@ class P2pServer {
       const data = JSON.parse(message);
       switch(data.type) {
         case MESSAGE_TYPES.chain:
-          this.blockchain.replaceChain(data.chain);
+          newChain(data.chain);
          break;
        case MESSAGE_TYPES.transaction:
          this.transactionPool.updateOrAddTransaction(data.transaction);
@@ -62,6 +78,67 @@ class P2pServer {
     });
   }
+  newBlock(block) {
+    this.onNewBlock(block.data);
+    this.syncChains();
+    this.persistChain(this.blockchain.chain);
+  }
+
+  newChain(chain,persist) {
+    if (!this.blockchain.replaceChain(chain)) {
+      //failed to replace
+      return;
+    }
+    if (typeof persist === "undefined" || persist) {
+      this.persistChain(chain);
+    }
+    //dirty clear
+    this.store = new N3.Store();
+    for (var block in this.blockchain.chain) {
+      this.onNewBlock(block);
+    }
+  }
+
+  persistChain(chain) {
+    try {
+      fs.writeFileSync(
+        this.chainStorageLocation,
+        JSON.stringify(chain));
+    } catch (err) {
+      console.error("Couldn't persist chain, aborting");
+      process.exit(-1);
+    }
+  }
+
+  onNewBlock(block) {
+    //block data is of form [transactions,metadatas]
+    if (block.length != 2) {
+      //assert?
+      return;
+    }
+    const metadatas = block[1];
+    for (var metadata in metadatas) {
+      if (!(SSNmetadata in metadata)) {
+        //assert?
+        return;
+      }
+      var ssn = metadata.SSNmetadata;
+      parser.parse(
+        ssn,
+        (error, quadN, prefixes) => {
+          if (quadN) {
+            store.addQuad(DataFactory.quad(
+              DataFactory.namedNode(quadN.subject.id),
+              DataFactory.namedNode(quadN.predicate.id),
+              DataFactory.namedNode(quadN.object.id)));
+          }
+        });
+    }
+  }
   sendChain(socket) {
     socket.send(JSON.stringify({
       type: MESSAGE_TYPES.chain,
@@ -69,36 +146,36 @@ class P2pServer {
     }));
   }
-  sendTransaction(socket, transaction) {
-    socket.send(JSON.stringify({
-      type: MESSAGE_TYPES.transaction,
-      transaction
-    }));
-  }
+  //sendTransaction(socket, transaction) {
+  // socket.send(JSON.stringify({
+  // type: MESSAGE_TYPES.transaction,
+  // transaction
+  // }));
+  //}
-  sendMetadata(socket, metadata) {
-    socket.send(JSON.stringify({
-      type: MESSAGE_TYPES.metadata,
-      metadata
-    }));
-  }
+  //sendMetadata(socket, metadata) {
+  // socket.send(JSON.stringify({
+  // type: MESSAGE_TYPES.metadata,
+  // metadata
+  // }));
+  //}
   syncChains() {
     this.sockets.forEach(socket => this.sendChain(socket));
   }
-  broadcastTransaction(transaction) {
-    this.sockets.forEach(socket => this.sendTransaction(socket, transaction));
-  }
+  //broadcastTransaction(transaction) {
+  // this.sockets.forEach(socket => this.sendTransaction(socket, transaction));
+  //}
-  broadcastMetadata(metadata) {
-    this.sockets.forEach(socket => this.sendMetadata(socket, metadata));
-  }
+  //broadcastMetadata(metadata) {
+  // this.sockets.forEach(socket => this.sendMetadata(socket, metadata));
+  //}
-  broadcastClearTransactions() {
-    this.sockets.forEach(socket => socket.send(JSON.stringify({
-      type: MESSAGE_TYPES.clear_transactions
-    })));
-  }
+  //broadcastClearTransactions() {
+  // this.sockets.forEach(socket => socket.send(JSON.stringify({
+  // type: MESSAGE_TYPES.clear_transactions
+  // })));
+  //}
 }
 module.exports = P2pServer;
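For orientation: newChain() rebuilds this.store by re-parsing every block's metadata after a chain replacement, while newBlock() indexes only the incoming block before persisting. A conventional way to write that full re-index, assuming the shapes implied by the diff (block.data is [transactions, metadatas], each metadata carries an SSNmetadata RDF string); this is a sketch, not the code in this commit:

const N3 = require('n3');

// Hypothetical helper: rebuild an N3 store from a chain of blocks.
function rebuildStore(chain) {
  const store = new N3.Store();
  for (const block of chain) {                    // iterate the block objects themselves
    const [, metadatas] = block.data || [[], []]; // the genesis block carries no [transactions, metadatas] pair
    for (const metadata of metadatas || []) {
      if (!metadata.SSNmetadata) continue;        // nothing to index for this entry
      // N3's synchronous parse returns an array of RDF/JS quads the store accepts directly.
      for (const quad of new N3.Parser().parse(metadata.SSNmetadata)) {
        store.addQuad(quad);
      }
    }
  }
  return store;
}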

View file

@@ -24,7 +24,12 @@ class Block {
   static genesis() {
     return new this('Genesis time', '-----', 'f1r57-h45h', [], 0, DIFFICULTY);
   }
+  //we want this to eventually be continously running where there are things in the pool,
+  //however as node is single threaded, this almost has to be a fiber, and yield after every
+  //other iteration to allow for meaningful forward progress
+  //we can either add all new transactions into the block as we see them, or stay with the starting list, idk which
+  //to be done later
   static mineBlock(lastBlock, data) {
     let hash, timestamp;
     const lastHash = lastBlock.hash;
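The new comment above sketches a miner that runs continuously while the pool is non-empty and yields to Node's single-threaded event loop. One way that could look (a hypothetical sketch, not part of this commit): batch nonce attempts and reschedule with setImmediate so websocket and HTTP handlers keep running between batches.

// Hypothetical yielding proof-of-work loop; hashFn stands in for whatever hash the Block class uses.
function mineAsync(lastHash, data, difficulty, hashFn, onFound, batchSize = 1000) {
  let nonce = 0;
  const tryBatch = () => {
    for (let i = 0; i < batchSize; i++, nonce++) {
      const timestamp = Date.now();
      const hash = hashFn(lastHash, data, timestamp, nonce, difficulty);
      if (hash.substring(0, difficulty) === '0'.repeat(difficulty)) {
        return onFound({ timestamp, lastHash, hash, data, nonce, difficulty });
      }
    }
    setImmediate(tryBatch); // yield so the event loop can service peers before the next batch
  };
  tryBatch();
}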

View file

@@ -28,17 +28,19 @@ class Blockchain {
     return true;
   }
+  //return false on failure, true on success
   replaceChain(newChain) {
     if (newChain.length <= this.chain.length) {
       console.log('Received chain is not longer than the current chain.');
-      return;
+      return false;
     } else if (!this.isValidChain(newChain)) {
       console.log('The received chain is not valid.');
-      return;
+      return false;
     }
     console.log('Replacing blockchain with the new chain.');
     this.chain = newChain;
+    return true;
   }
 }