reorg code, integration publish/subscribe working, still some TODOs

This commit is contained in:
Josip Milovac 2023-02-01 13:01:51 +11:00
parent 043a95d9ef
commit 1af6d56e2d
35 changed files with 3052 additions and 1401 deletions

219
broker/broker-app.js Normal file
View file

@ -0,0 +1,219 @@
//BROKER
const express = require('express');
const bodyParser = require('body-parser');
const P2pServer = require('../p2p-server');
const Broker = require('./broker');
const Aedes = require('aedes');
const Config = require('../config');
const ChainUtil = require('../chain-util');
const QueryEngine = require('@comunica/query-sparql-rdfjs').QueryEngine;
const Blockchain = require('../blockchain/blockchain');
'use strict';
const {
DEFAULT_PORT_BROKER_API,
DEFAULT_PORT_BROKER_CHAIN,
DEFAULT_PORT_BROKER_SENSOR_HANDSHAKE,
DEFAULT_PORT_BROKER_SENSOR_MQTT,
DEFAULT_PORT_BROKER_CLIENT_MQTT,
DEFAULT_PORT_MINER_CHAIN
} = require('../constants');
const CONFIGS_STORAGE_LOCATION = "./settings.json";
const config = new Config(CONFIGS_STORAGE_LOCATION);
const broker = new Broker(config.get({
key: "broker-keypair",
default: ChainUtil.genKeyPair(),
transform: ChainUtil.deserializeKeyPair
}));
const broker_name = config.get({
key: "broker-name",
default: broker.keyPair.getPublic().encode('hex')
});
const apiPort = config.get({
key: "broker-api-port",
default: DEFAULT_PORT_BROKER_API
});
const blockchainLocation = config.get({
key: "broker-blockchain-location",
default: "./broker_blockchain.json"
});
const chainServerPort = config.get({
key: "broker-chain-server-port",
default: DEFAULT_PORT_BROKER_CHAIN
});
const chainServerPeers = config.get({
key: "broker-chain-server-peers",
default: ["ws://127.0.0.1:" + DEFAULT_PORT_MINER_CHAIN]
});
const sensorHandshakePort = config.get({
key: "broker-sensor-handshake-port",
default: DEFAULT_PORT_BROKER_SENSOR_HANDSHAKE
});
const sensorMQTTPort = config.get({
key: "broker-sensor-MQTT-port",
default: DEFAULT_PORT_BROKER_SENSOR_MQTT
});
const clientMQTTPort = config.get({
key: "broker-client-MQTT-port",
default: DEFAULT_PORT_BROKER_CLIENT_MQTT
});
const blockchain = Blockchain.loadFromDisk(blockchainLocation);
let sensorsServing = {};
const sensorMQTT = new Aedes({
id: broker_name
});
const sensorMQTTServer = require('net').createServer(sensorMQTT.handle);
const sensorMQTTSubscriptions = {};
const clientMQTT = new Aedes({
id: broker_name
});
const clientMQTTServer = require('net').createServer(clientMQTT.handle);
function onNewPacket(sensor, data) {
//check to see if sensor has been paid for
clientMQTT.publish({
topic: sensor,
payload: data
});
}
function onChainServerRecv(data) {
const replaceResult = blockchain.replaceChain(Blockchain.deserialize(data));
if (!replaceResult.result) {
console.log(`Failed to replace chain: ${replaceResult.reason}`);
//failed to replace
return;
}
blockchain.saveToDisk(blockchainLocation);
sensorsServing = {};
for (const sensorName in blockchain.sensors) {
const sensorData = blockchain.sensors[sensorName];
if (sensorData.integrationBroker === broker_name) {
sensorsServing[sensorName] = sensorData;
}
}
//UNSUBSCRIBE
for (const sensorName in sensorMQTTSubscriptions) {
if (!(sensorName in sensorsServing)) {
const deliverFunction = sensorMQTTSubscriptions[sensorName];
sensorMQTT.unsubscribe(sensorName, deliverFunction, () => { });
delete sensorMQTTSubscriptions[sensorName];
}
}
//SUBSCRIBE
for (const sensorName in sensorsServing) {
if (!(sensorName in sensorMQTTSubscriptions)) {
const deliverFunction = (packet, cb) => {
onNewPacket(packet.topic, packet.payload);
cb();
};
sensorMQTTSubscriptions[sensorName] = deliverFunction;
sensorMQTT.subscribe(sensorName, deliverFunction, () => { });
}
}
}
function onSensorHandshakeMsg(sensor, data) {
onNewPacket(sensor, data);
}
const chainServer = new P2pServer("Chain-server");
chainServer.start(chainServerPort, chainServerPeers, (_) => { }, onChainServerRecv);
broker.start(sensorHandshakePort, onSensorHandshakeMsg);
sensorMQTTServer.listen(sensorMQTTPort, () => {
console.log("Sensor MQTT started");
});
clientMQTTServer.listen(clientMQTTPort, () => {
console.log("Client MQTT started");
});
const app = express();
app.use(bodyParser.json());
app.listen(apiPort, () => console.log(`Listening on port ${apiPort}`));
app.get('/sensors', (req, res) => {
res.json(sensorsServing);
});
app.get('/ChainServer/sockets', (req, res) => {
res.json(chainServer.sockets);
});
app.post('/ChainServer/connect', (req, res) => {
chainServer.connect(req.body.url);
res.json("Connecting");
});
app.get('/public-key', (req, res) => {
res.json(wallet.publicKey);
});
app.get('/key-pair', (req, res) => {
res.json(ChainUtil.serializeKeyPair(wallet.keyPair));
});
app.get('/MyBalance', (req, res) => {
res.json(blockchain.getBalanceCopy(wallet.publicKey));
});
app.get('/Balance', (req, res) => {
const balance = blockchain.getBalanceCopy(req.body.publicKey);
res.json(balance);
});
app.get('/Balances', (req, res) => {
const balances = blockchain.balances;
res.json(balances);
});
const myEngine = new QueryEngine();
app.post('/sparql', (req, res) => {
const start = async function () {
try {
let result = [];
const bindingsStream = await myEngine.queryBindings(
req.body.query,
{
readOnly: true,
sources: getBlockchain().stores
});
bindingsStream.on('data', (binding) => {
result.push(binding);
});
bindingsStream.on('end', () => {
res.json(JSON.stringify(result));
});
bindingsStream.on('error', (err) => {
console.error(err);
});
} catch (err) {
console.error(err);
res.json("Error occured while querying");
}
};
start()
});

102
broker/broker.js Normal file
View file

@ -0,0 +1,102 @@
const Websocket = require('ws');
//const Mqtt = require('mqtt');
//const Aedes = require('aedes')(); /* aedes is a stream-based MQTT broker */
//const MQTTserver = require('net').createServer(aedes.handle);
const ChainUtil = require('../chain-util');
const crypto = require('crypto');
const STATE_CLIENT_HELLOING = 0;
const STATE_OPERATIONAL = 1;
function onClientHelloing(parent, socket, data) {
const asJson = JSON.parse(data);
const owner = asJson.owner;
const sensor = asJson.sensor;
const signature = asJson.signature;
const clientNonce = asJson.clientNonce;
if (typeof owner !== 'string' || typeof sensor !== 'string' || typeof signature !== 'object' || typeof clientNonce !== 'string') {
console.log("Bad client hello");
socket.close();
return;
}
socket.owner = owner;
socket.sensor = sensor;
socket.clientNonce = clientNonce;
const verifyRes = ChainUtil.verifySignature(owner, signature, socket.serverNonce + clientNonce);
if (!verifyRes.result) {
console.log("Bad client hello signature: " + verifyRes.reason);
socket.close();
return;
}
const ourSig = parent.keyPair.sign(clientNonce + socket.serverNonce);
socket.send(JSON.stringify(ourSig));
socket.state = STATE_OPERATIONAL;
console.log(`Sensor ${socket.owner}:${socket.sensor} is operational`);
}
function onOperational(parent, socket, data) {
parent.onMessage(socket.sensor, data);
}
function onSocketMessage(parent, socket, data) {
switch (socket.state) {
case STATE_CLIENT_HELLOING: onClientHelloing(parent, socket, data); break;
case STATE_OPERATIONAL: onOperational(parent, socket, data); break;
default: throw Error("Invalid internal state");
}
}
class Socket {
constructor(parent, socket, serverNonce) {
this.parent = parent;
this.socket = socket;
this.serverNonce = serverNonce;
this.state = STATE_CLIENT_HELLOING;
this.socket.on('message', (data) => {
onSocketMessage(parent, this, data);
});
this.send(serverNonce);
}
send(data) {
this.socket.send(data);
}
close() {
this.socket.close();
}
}
function onConnection(broker, rawSocket) {
console.log("Sensor connected");
crypto.randomBytes(2048, (err, buf) => {
if (err) {
console.log(`Couldn't generate server nonce: ${err}`);
rawSocket.close();
return;
}
new Socket(broker, rawSocket, buf.toString('hex'));
});
}
class Broker {
constructor(keyPair) {
//owner:sensor->mqtt channel
this.brokering = {};
this.keyPair = keyPair;
}
start(sensorPort, onMessage) {
this.onMessage = onMessage;
this.sensorPort = sensorPort;
this.server = new Websocket.Server({ port: this.sensorPort });
this.server.on('connection', socket => onConnection(this, socket));
console.log(`Broker listening for sensors on: ${this.sensorPort}`);
}
}
module.exports = Broker;