diff --git a/app/index.js b/app/index.js index b4f421e..b000015 100644 --- a/app/index.js +++ b/app/index.js @@ -38,7 +38,6 @@ const QueryEngine = require('@comunica/query-sparql').QueryEngine; const N3 = require('n3'); const jsonld = require('jsonld'); -const DataFactory = require('n3').DataFactory; var mqtt = require('mqtt'); var aedes = require('aedes')(); /* aedes is a stream-based MQTT broker */ var MQTTserver = require('net').createServer(aedes.handle); @@ -54,11 +53,10 @@ const app = express(); const bc = new Blockchain(); const wallet = new Wallet(); const tp = new TransactionPool(); -const p2pServer = new P2pServer(bc, tp); +const p2pServer = new P2pServer(bc, tp,'./persist_block_chain.json'); const miner = new Miner(bc, tp, wallet, p2pServer); const parser = new N3.Parser(); //({format: 'application/n-quads'}); -const store = new N3.Store(); const myEngine = new QueryEngine(); app.use(bodyParser.json()); @@ -167,31 +165,15 @@ app.get('/IoTdeviceRegistration', (req, res)=> { } //let SenShaMartOnt = SSNmetadata; //SenShaMartOnt.push(SenSHaMArtExt); */ -console.log(SenShaMartDesc) -jsonld.toRDF(SenShaMartDesc, {format: 'application/n-quads'}, - (err, nquads) => { - console.log(nquads) - var metaDataTransaction = wallet.createMetadata( - nquads, tp); - - parser.parse( - nquads, - (error, quadN, prefixes) => { - if (quadN){ - store.addQuad(DataFactory.quad( - DataFactory.namedNode(quadN.subject.id), - DataFactory.namedNode(quadN.predicate.id), - DataFactory.namedNode(quadN.object.id))); - } - else { - console.log("# That's all, folks!", prefixes); - } - }); - //console.log(metaDataTransaction.SSNmetadata) - p2pServer.broadcastMetadata(metaDataTransaction); -}); -}); - res.json("MetadataTransactionCreated"); + console.log(SenShaMartDesc) + jsonld.toRDF(SenShaMartDesc, {format: 'application/n-quads'}, + (err, nquads) => { + console.log(nquads) + var metaDataTransaction = wallet.createMetadata( + nquads, tp); + }); + }); + res.json("MetadataTransactionCreated"); }); ////////////////////////////////////////////////// @@ -200,7 +182,7 @@ app.post('/mine', (req, res) => { const block = bc.addBlock(req.body.data); console.log(`New block added: ${block.toString()}`); - p2pServer.syncChains(); + p2pServer.newBlock(block); res.redirect('/blocks'); }); @@ -208,7 +190,7 @@ app.post('/mine', (req, res) => { app.post('/PaymentTransaction', (req, res) => { const { recipient, amount } = req.body; const transaction = wallet.createTransaction(recipient, amount, bc, tp); - p2pServer.broadcastTransaction(transaction); + //p2pServer.broadcastTransaction(transaction); res.redirect('/transactions'); }); @@ -249,13 +231,14 @@ app.post("/UploadMetafile", upload.single('file'), (req, res) => { ///////////////////// //Start of comunica sparql query code - app.post('/sparql', (req, res) => { +app.post('/sparql', (req, res) => { + console.log(req.body); const {Select,subject,predicate,object,Limit}= req.body; const start = async function (a,b){ const bindingsStream = await myEngine.queryBindings(`SELECT ${Select} WHERE {${subject} ${predicate} ${object}} LIMIT ${Limit}`, { sources: [{ type: 'rdfjsSource', - value: store}] + value: p2pServer.store}] }); bindingsStream.on('data', (binding) => { console.log(binding.toString()); diff --git a/app/miner.js b/app/miner.js index 24fd7b3..b5a530e 100644 --- a/app/miner.js +++ b/app/miner.js @@ -25,9 +25,8 @@ class Miner { console.log(validTransactions); // const validMetadataS = this.transactionPool.metadataS; const block = this.blockchain.addBlock([validTransactions, validMetadataS]); - this.p2pServer.syncChains(); + this.p2pServer.newBlock(block); this.transactionPool.clear(); - this.p2pServer.broadcastClearTransactions(); return block; } diff --git a/app/p2p-server.js b/app/p2p-server.js index fd72ab9..2b9bcf4 100644 --- a/app/p2p-server.js +++ b/app/p2p-server.js @@ -1,4 +1,9 @@ const Websocket = require('ws'); +const N3 = require('n3'); +const parser = new N3.Parser(); //({format: 'application/n-quads'}); +const DataFactory = require('n3').DataFactory; +const fs = require('fs'); +const process = require('process'); const P2P_PORT = process.env.P2P_PORT || 5000; const peers = process.env.PEERS ? process.env.PEERS.split(',') : []; @@ -10,10 +15,21 @@ const MESSAGE_TYPES = { }; class P2pServer { - constructor(blockchain, transactionPool) { + constructor(blockchain, transactionPool,chainStorageLocation) { this.blockchain = blockchain; this.transactionPool = transactionPool; this.sockets = []; + this.store = new N3.Store(); + this.chainStorageLocation = chainStorageLocation; + + //possible race if deleted after check, but we live with it I guess + if (fs.existsSync(this.chainStorageLocation)) { + const rawPersistedChain = fs.readFileSync(this.chainStorageLocation, 'utf8'); + const chain = JSON.parse(rawPersistedChain); + this.newChain(chain, false); + } else { + console.log("Didn't find a persisted chain, starting from genesis"); + } } listen() { @@ -47,7 +63,7 @@ class P2pServer { const data = JSON.parse(message); switch(data.type) { case MESSAGE_TYPES.chain: - this.blockchain.replaceChain(data.chain); + newChain(data.chain); break; case MESSAGE_TYPES.transaction: this.transactionPool.updateOrAddTransaction(data.transaction); @@ -62,6 +78,67 @@ class P2pServer { }); } + newBlock(block) { + this.onNewBlock(block.data); + this.syncChains(); + this.persistChain(this.blockchain.chain); + } + + newChain(chain,persist) { + if (!this.blockchain.replaceChain(chain)) { + //failed to replace + return; + } + if (typeof persist === "undefined" || persist) { + this.persistChain(chain); + } + //dirty clear + this.store = new N3.Store(); + for (var block in this.blockchain.chain) { + this.onNewBlock(block); + } + } + + persistChain(chain) { + try { + fs.writeFileSync( + this.chainStorageLocation, + JSON.stringify(chain)); + } catch (err) { + console.error("Couldn't persist chain, aborting"); + process.exit(-1); + } + } + + onNewBlock(block) { + //block data is of form [transactions,metadatas] + if (block.length != 2) { + //assert? + return; + } + const metadatas = block[1]; + + for (var metadata in metadatas) { + if (!(SSNmetadata in metadata)) { + //assert? + return; + } + + var ssn = metadata.SSNmetadata; + + parser.parse( + ssn, + (error, quadN, prefixes) => { + if (quadN) { + store.addQuad(DataFactory.quad( + DataFactory.namedNode(quadN.subject.id), + DataFactory.namedNode(quadN.predicate.id), + DataFactory.namedNode(quadN.object.id))); + } + }); + } + } + sendChain(socket) { socket.send(JSON.stringify({ type: MESSAGE_TYPES.chain, @@ -69,36 +146,36 @@ class P2pServer { })); } - sendTransaction(socket, transaction) { - socket.send(JSON.stringify({ - type: MESSAGE_TYPES.transaction, - transaction - })); - } + //sendTransaction(socket, transaction) { + // socket.send(JSON.stringify({ + // type: MESSAGE_TYPES.transaction, + // transaction + // })); + //} - sendMetadata(socket, metadata) { - socket.send(JSON.stringify({ - type: MESSAGE_TYPES.metadata, - metadata - })); - } + //sendMetadata(socket, metadata) { + // socket.send(JSON.stringify({ + // type: MESSAGE_TYPES.metadata, + // metadata + // })); + //} syncChains() { this.sockets.forEach(socket => this.sendChain(socket)); } - broadcastTransaction(transaction) { - this.sockets.forEach(socket => this.sendTransaction(socket, transaction)); - } + //broadcastTransaction(transaction) { + // this.sockets.forEach(socket => this.sendTransaction(socket, transaction)); + //} - broadcastMetadata(metadata) { - this.sockets.forEach(socket => this.sendMetadata(socket, metadata)); - } + //broadcastMetadata(metadata) { + // this.sockets.forEach(socket => this.sendMetadata(socket, metadata)); + //} - broadcastClearTransactions() { - this.sockets.forEach(socket => socket.send(JSON.stringify({ - type: MESSAGE_TYPES.clear_transactions - }))); - } + //broadcastClearTransactions() { + // this.sockets.forEach(socket => socket.send(JSON.stringify({ + // type: MESSAGE_TYPES.clear_transactions + // }))); + //} } module.exports = P2pServer; \ No newline at end of file diff --git a/blockchain/block.js b/blockchain/block.js index f030903..0f44ac8 100644 --- a/blockchain/block.js +++ b/blockchain/block.js @@ -24,7 +24,12 @@ class Block { static genesis() { return new this('Genesis time', '-----', 'f1r57-h45h', [], 0, DIFFICULTY); } + //we want this to eventually be continously running where there are things in the pool, + //however as node is single threaded, this almost has to be a fiber, and yield after every + //other iteration to allow for meaningful forward progress + //we can either add all new transactions into the block as we see them, or stay with the starting list, idk which + //to be done later static mineBlock(lastBlock, data) { let hash, timestamp; const lastHash = lastBlock.hash; diff --git a/blockchain/index.js b/blockchain/index.js index 5f42d7d..84bffb3 100644 --- a/blockchain/index.js +++ b/blockchain/index.js @@ -28,17 +28,19 @@ class Blockchain { return true; } + //return false on failure, true on success replaceChain(newChain) { if (newChain.length <= this.chain.length) { console.log('Received chain is not longer than the current chain.'); - return; + return false; } else if (!this.isValidChain(newChain)) { console.log('The received chain is not valid.'); - return; + return false; } console.log('Replacing blockchain with the new chain.'); this.chain = newChain; + return true; } }