Skip to content

Commit

Permalink
add libp2p module for realtime messages
Browse files Browse the repository at this point in the history
Signed-off-by: vol4tim <vol4tim@gmail.com>
  • Loading branch information
vol4tim committed Sep 27, 2023
1 parent 76624cf commit 3b1f96d
Show file tree
Hide file tree
Showing 9 changed files with 1,635 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup Node
uses: actions/setup-node@v1
with:
node-version: "16.3.x"
node-version: "18.17.x"

- name: Get yarn cache
id: yarn-cache
Expand Down
11 changes: 10 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sensors.robonomics.network",
"version": "0.1.0",
"version": "0.5.0",
"private": true,
"scripts": {
"dev": "vite",
Expand All @@ -9,13 +9,20 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix --ignore-path .gitignore"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^13.0.1",
"@fortawesome/fontawesome-svg-core": "^6.2.0",
"@fortawesome/free-solid-svg-icons": "^6.2.0",
"@fortawesome/vue-fontawesome": "^3.0.1",
"@kyvg/vue3-notification": "^2.9.1",
"@libp2p/bootstrap": "^9.0.6",
"@libp2p/floodsub": "^8.0.7",
"@libp2p/mplex": "^9.0.6",
"@libp2p/webrtc": "^3.2.0",
"@libp2p/websockets": "^7.0.7",
"axios": "^1.1.2",
"buffer": "^6.0.3",
"chroma-js": "^2.4.2",
"events": "^3.3.0",
"highcharts": "^10.2.1",
"js-file-download": "^0.4.12",
"js-queue": "^2.0.2",
Expand All @@ -26,7 +33,9 @@
"leaflet.locatecontrol": "^0.76.1",
"leaflet.markercluster": "^1.5.3",
"leaflet.tilelayer.colorfilter": "^1.2.5",
"libp2p": "^0.46.11",
"moment": "^2.29.4",
"multiaddr": "^10.0.1",
"pinia": "^2.0.23",
"socket.io-client": "^2.3.0",
"vue": "^3.2.38",
Expand Down
7 changes: 7 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
export default {
LIBP2P: {
bootstrappers: [
"/dns4/1.pubsub.aira.life/tcp/443/wss/ipfs/QmdfQmbmXt6sqjZyowxPUsmvBsgSGQjm4VXrV7WGy62dv8",
"/dns4/2.pubsub.aira.life/tcp/443/wss/ipfs/QmPTFt7GJ2MfDuVYwJJTULr6EnsQtGVp8ahYn9NSyoxmd9",
"/dns4/3.pubsub.aira.life/tcp/443/wss/ipfs/QmWZSKTEQQ985mnNzMqhGCrwQ1aTA6sxVsorsycQz9cQrw",
],
},
IPFS: {
fallback: {
repo: "ipfs/robonomics/5",
Expand Down
1 change: 1 addition & 0 deletions src/providers/index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { default as Ipfs } from "./ipfs";
export { default as Libp2p } from "./libp2p";
export { default as Remote } from "./remote";
118 changes: 118 additions & 0 deletions src/providers/libp2p.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { createNode } from "../utils/libp2p";
import { measurements as converter } from "../utils/measurement";
import { getAgents } from "../utils/utils";

const topic = "airalab.lighthouse.5.robonomics.eth";

class Provider {
constructor(config) {
this.node = null;
this.isReady = false;
this.whiteListAccounts = [];
this.history = {};
this.init(config).then(() => {
this.isReady = true;
window.pubsubPeers = () => {
console.log(
"peers",
this.node.services.pubsub.getPeers().map((peer) => peer.toString())
);
console.log(
"pubsub",
this.node.services.pubsub
.getSubscribers(topic)
.map((peer) => peer.toString())
);
};
// this.node.addEventListener("peer:connect", (evt) => {
// const peerId = evt.detail;
// console.log("Connection established to:", peerId.toString());
// });
// this.node.addEventListener("peer:discovery", (evt) => {
// const peerInfo = evt.detail;
// console.log("Discovered:", peerInfo.id.toString());
// });
});
}

async init(config) {
this.node = await createNode(config);
this.whiteListAccounts = getAgents();
}

ready() {
return new Promise((res) => {
const t = setInterval(() => {
if (this.isReady) {
res();
clearInterval(t);
}
}, 100);
});
}

getHistoryBySensor(sensor) {
return Promise.resolve(this.history[sensor] ? this.history[sensor] : []);
}

watch(cb) {
this.node.services.pubsub.subscribe(topic);
this.node.services.pubsub.addEventListener("message", (evt) => {
const sender = evt.detail.from.toString();
if (!this.whiteListAccounts.includes(sender)) {
// console.log(`skip from ${sender}`);
return;
}

let json;
try {
json = JSON.parse(Buffer.from(evt.detail.data).toString("utf8"));
} catch (e) {
// console.log(sender, Buffer.from(r.data).toString("utf8"));
console.error(e.message);
return;
}

for (const sensor_id in json) {
const data = json[sensor_id];
if (
Object.prototype.hasOwnProperty.call(data, "model") &&
(!Object.prototype.hasOwnProperty.call(this.history, sensor_id) ||
this.history[sensor_id].find((item) => {
return item.timestamp === data.measurement.timestamp;
}) === undefined)
) {
const { timestamp, ...measurement } = data.measurement;
const measurementLowerCase = {};
for (var key in measurement) {
const name = key.toLowerCase();
measurementLowerCase[name] = converter[name]?.calc
? converter[name].calc(measurement[key])
: measurement[key];
}
const [lat, lng] = data.geo.split(",");
const donated_by = data.donated_by || undefined;
const point = {
sensor_id,
sender,
model: data.model,
geo: { lat, lng },
data: measurementLowerCase,
donated_by,
timestamp,
};
if (!Object.prototype.hasOwnProperty.call(this.history, sensor_id)) {
this.history[sensor_id] = [];
}
this.history[sensor_id].push(point);

cb(point);
} else {
// console.log(sensor_id, data);
}
}
});
}
}

export default Provider;
52 changes: 52 additions & 0 deletions src/utils/libp2p.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { noise } from "@chainsafe/libp2p-noise";
import { bootstrap } from "@libp2p/bootstrap";
import { floodsub } from "@libp2p/floodsub";
import { mplex } from "@libp2p/mplex";
import { webRTC } from "@libp2p/webrtc";
import { webSockets } from "@libp2p/websockets";
import * as filters from "@libp2p/websockets/filters";
import { createLibp2p } from "libp2p";
import { circuitRelayTransport } from "libp2p/circuit-relay";
import { identifyService } from "libp2p/identify";

export const createNode = async (config) => {
const node = await createLibp2p({
addresses: {
listen: ["/webrtc"],
},

transports: [
webSockets({
filter: filters.all,
}),
webRTC(),
circuitRelayTransport({
discoverRelays: 1,
}),
],

streamMuxers: [mplex()],
connectionEncryption: [noise()],
services: {
pubsub: floodsub(),
identify: identifyService(),
},
peerDiscovery: [
bootstrap({
list: config.bootstrappers,
}),
],

connectionGater: {
denyDialMultiaddr: () => {
// by default we refuse to dial local addresses from the browser since they
// are usually sent by remote peers broadcasting undialable multiaddrs but
// here we are explicitly connecting to a local node so do not deny dialing
// any discovered address
return false;
},
},
});

return node;
};
3 changes: 2 additions & 1 deletion src/views/Main.vue
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ export default {
if (this.provider === "remote") {
this.providerObj = new providers.Remote(config.REMOTE_PROVIDER);
} else {
this.providerObj = new providers.Ipfs(config.IPFS);
this.providerObj = new providers.Libp2p(config.LIBP2P);
// this.providerObj = new providers.Ipfs(config.IPFS);
}
this.providerObj.ready().then(() => {
this.providerReady = true;
Expand Down
10 changes: 9 additions & 1 deletion vite.config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { fileURLToPath, URL } from "node:url";

import { defineConfig } from "vite";
import vue from "@vitejs/plugin-vue";
import { defineConfig } from "vite";

// https://vitejs.dev/config/
export default defineConfig({
Expand All @@ -11,4 +11,12 @@ export default defineConfig({
"@": fileURLToPath(new URL("./src", import.meta.url)),
},
},
build: {
target: ["es2020"],
},
optimizeDeps: {
esbuildOptions: {
target: ["es2020"],
},
},
});
Loading

0 comments on commit 3b1f96d

Please sign in to comment.