SenShaMart/broker/broker-app.js
2023-07-13 11:32:02 +10:00

336 lines
No EOL
10 KiB
JavaScript

//BROKER
const express = require('express');
const bodyParser = require('body-parser');
const BlockchainProp = require('../network/blockchain-prop');
const Broker = require('./broker');
const Aedes = require('aedes');
const Config = require('../util/config');
const ChainUtil = require('../util/chain-util');
const QueryEngine = require('@comunica/query-sparql-rdfjs').QueryEngine;
const Blockchain = require('../blockchain/blockchain');
const Block = require('../blockchain/block');
const Integration = require('../blockchain/integration');
const SensorRegistration = require('../blockchain/sensor-registration');
const BrokerRegistration = require('../blockchain/sensor-registration');
'use strict';
const {
DEFAULT_PORT_BROKER_API,
DEFAULT_PORT_BROKER_CHAIN,
DEFAULT_PORT_BROKER_SENSOR_HANDSHAKE,
DEFAULT_PORT_BROKER_MQTT,
DEFAULT_PORT_MINER_CHAIN
} = require('../util/constants');
const CONFIGS_STORAGE_LOCATION = "./settings.json";
const config = new Config(CONFIGS_STORAGE_LOCATION);
const broker = new Broker(config.get({
key: "broker-keypair",
default: ChainUtil.genKeyPair(),
transform: ChainUtil.deserializeKeyPair
}));
const broker_name = config.get({
key: "broker-name",
default: broker.keyPair.getPublic().encode('hex')
});
const apiPort = config.get({
key: "broker-api-port",
default: DEFAULT_PORT_BROKER_API
});
const blockchainLocation = config.get({
key: "broker-blockchain-location",
default: "./broker_blockchain.json"
});
const chainServerPort = config.get({
key: "broker-chain-server-port",
default: DEFAULT_PORT_BROKER_CHAIN
});
const chainServerPeers = config.get({
key: "broker-chain-server-peers",
default: ["ws://127.0.0.1:" + DEFAULT_PORT_MINER_CHAIN]
});
const sensorHandshakePort = config.get({
key: "broker-sensor-handshake-port",
default: DEFAULT_PORT_BROKER_SENSOR_HANDSHAKE
});
const MQTTPort = config.get({
key: "broker-MQTT-port",
default: DEFAULT_PORT_BROKER_MQTT
});
const blockchain = Blockchain.loadFromDisk(blockchainLocation);
function minutesNow() {
//divide by 1000 for ms, 60 for seconds, and floor to get whole minutes passed
return Date.now() / (1000 * 60);
}
/*
Sensor name -> {
Integration Hash -> {
sensor per kb
sensor per min
dataLastAt
coinsLeft
index
}
}
*/
const ourIntegrations = new Map();
const ourSensors = new Map();
const sensorOwnerHistory = [];
function onBlockchainChange(newBlocks, oldBlocks, difference) {
const popCount = oldBlocks.length - difference;
for (let i = 0; i < popCount; i++) {
const changing = sensorOwnerHistory.pop();
for (const sensorName of changing.removing) {
ourSensors.delete(sensorName);
console.log(`No longer brokering due to pop: ${sensorName}`);
}
for (const sensor of changing.adding) {
ourSensors.set(SensorRegistration.getSensorName(sensor), sensor);
console.log(`Now brokering due to pop: ${SensorRegistration.getSensorName(sensor)}`);
}
}
//Integration hash -> Integration
const removedIntegrations = new Map();
for (let i = difference; i < oldBlocks.length; i++) {
for (const integration of Block.getIntegrations(oldBlocks[i])) {
removedIntegrations.set(Integration.hashToSign(integration), integration);
}
}
//see what's added, then see what's removed
//if it's been removed and added, we don't change anything, else we do the respective operation
for (let i = difference; i < newBlocks.length; i++) {
//play with the new integrations
const newHistory = {
adding: [],
removing: []
};
for (const integration of Block.getIntegrations(newBlocks[i])) {
const integrationHash = Integration.hashToSign(integration);
for (let i = 0; i < integration.outputs.length; i++) { //for every output
const output = integration.outputs[i];
if (ourSensors.has(output.sensorName)) { //if it references one of our sensors
const sensor = ourSensors.get(output.sensorName);
if (!ourIntegrations.has(output.sensorName)) { //if the entry for this sensor doesn't exist
ourIntegrations.set(output.sensorName, new Map()); //make it
}
const integrationMap = ourIntegrations.get(output.sensorName);
if (integrationMap.has(integrationHash)) { //if it already exists
removedIntegrations.delete(integrationHash); //remove it from the removed map, as it's still present in the new chain
} else { //else
console.log(`Starting to integrate for integration: ${integrationHash}, sensor: ${output.sensorName}, perMin: ${SensorRegistration.getCostPerMinute(sensor)}, costPerKB: ${SensorRegistration.getCostPerKB(sensor)}`);
integrationMap.set(Integration.hashToSign(integration), //add the integration
{
perKB: SensorRegistration.getCostPerKB(sensor),
perMin: SensorRegistration.getCostPerMinute(sensor),
dataLastAt: minutesNow(),
coinsLeft: output.amount,
index: i
});
}
}
}
}
//playing with integrations done, now update which sensors we own
for (const sensorRegistration of Block.getSensorRegistrations(newBlocks[i])) {
const sensorName = SensorRegistration.getSensorName(sensorRegistration);
if (ourSensors.has(sensorName)) { //if this sensor is currently one of ours
const existingSensor = ourSensors.get(sensorName);
if (SensorRegistration.getIntegrationBroker(sensorRegistration) !== broker_name) {//if the broker is now not us
newHistory.adding.push(existingSensor);
ourSensors.delete(sensorName);
console.log(`No longer brokering due to push: ${sensorName}`);
} else {
newHistory.adding.push(existingSensor);
ourSensors.set(sensorName, sensorRegistration);
console.log(`Updated brokering of ${sensorName}`);
}
} else { //else, we don't currently own this sensor
if (SensorRegistration.getIntegrationBroker(sensorRegistration) === broker_name) {
newHistory.removing.push(sensorName);
ourSensors.set(sensorName, sensorRegistration);
console.log(`Now brokering due to push: ${sensorName}`);
}
}
}
sensorOwnerHistory.push(newHistory);
}
for (const [hash, integration] of removedIntegrations) {
for (const output of integration.outputs) {
if (ourSensors.has(output.sensorName)) {
ourSensors.get(output.sensorName).integrations.remove(hash);
}
}
}
}
blockchain.addListener(onBlockchainChange);
onBlockchainChange(blockchain.blocks(), [], 0);
const mqtt = new Aedes({
id: broker_name
});
const MQTTServer = require('net').createServer(mqtt.handle);
function onNewPacket(sensor, data) {
//check to see if sensor has been paid for
console.log(`New packet from ${sensor} with size ${data.length}`);
const foundSensor = ourIntegrations.get(sensor);
if (typeof foundSensor === "undefined") {
return;
}
const now = minutesNow();
const removing = [];
for (const [hash, info] of foundSensor) {
const timeDelta = now - info.dataLastAt;
const cost =
timeDelta * info.perMin
+ data.length / 1024 * info.perKB;
console.log(`out/${hash}/${info.index} = timeDelta: ${timeDelta}, cost: ${cost}`);
if (cost >= info.coinsLeft) {
//we're out of money, integration is over
console.log(`out of coins for ${hash}`);
removing.push(hash);
} else {
info.coinsLeft -= cost;
info.dataLastAt = now;
mqtt.publish({
topic: "out/" + hash + '/' + info.index,
payload: data
});
}
}
for (const hash of removing) {
foundSensor.delete(hash);
}
if (foundSensor.size === 0) {
ourIntegrations.delete(sensor);
}
}
//can only subscribe to out/
mqtt.authorizeSubscribe = function (client, sub, callback) {
if (!sub.topic.startsWith("out/")) {
console.log(`Failed subscribe to topic ${sub.topic} by ${client}`);
return callback(new Error("Can't sub to this topic"));
} else {
console.log(`Subscription by ${client} to ${sub.topic}`);
}
callback(null, sub)
}
//can only publish to in/
mqtt.authorizePublish = function (client, packet, callback) {
if (!packet.topic.startsWith("in/")) {
console.log(`Failed publish to topic ${packet.topic} by ${client}`);
return callback(new Error("Can't publish to this topic"))
} else {
console.log(`Publish by ${client} to ${packet.topic} of size ${packet.payload.length}`);
onNewPacket(packet.topic.substring(3), packet.payload);
}
callback(null)
}
//this will change maybe
mqtt.authenticate = function (client, username, password, callback) {
callback(null, true)
}
function onSensorHandshakeMsg(sensor, data) {
onNewPacket(sensor, data);
}
const chainServer = new BlockchainProp("Chain-server", blockchain);
chainServer.start(chainServerPort, null, chainServerPeers);
broker.start(sensorHandshakePort, onSensorHandshakeMsg);
MQTTServer.listen(MQTTPort, () => {
console.log("Sensor MQTT started");
});
const app = express();
app.use(bodyParser.json());
app.listen(apiPort, () => console.log(`Listening on port ${apiPort}`));
app.get('/ourSensors', (req, res) => {
res.json(ourSensors);
});
app.get('/ChainServer/sockets', (req, res) => {
res.json(chainServer.sockets);
});
app.post('/ChainServer/connect', (req, res) => {
chainServer.connect(req.body.url);
res.json("Connecting");
});
app.get('/public-key', (req, res) => {
res.json(wallet.publicKey);
});
app.get('/key-pair', (req, res) => {
res.json(ChainUtil.serializeKeyPair(wallet.keyPair));
});
app.get('/MyBalance', (req, res) => {
res.json(blockchain.getBalanceCopy(wallet.publicKey));
});
app.get('/Balance', (req, res) => {
const balance = blockchain.getBalanceCopy(req.body.publicKey);
res.json(balance);
});
app.get('/Balances', (req, res) => {
const balances = blockchain.balances;
res.json(balances);
});
const myEngine = new QueryEngine();
app.post('/sparql', (req, res) => {
const start = async function () {
try {
let result = [];
const bindingsStream = await myEngine.queryBindings(
req.body.query,
{
readOnly: true,
sources: getBlockchain().stores
});
bindingsStream.on('data', (binding) => {
result.push(binding);
});
bindingsStream.on('end', () => {
res.json(JSON.stringify(result));
});
bindingsStream.on('error', (err) => {
console.error(err);
});
} catch (err) {
console.error(err);
res.json("Error occured while querying");
}
};
start()
});