diff --git a/nodejs/src/chat.js b/nodejs/src/chat.js new file mode 100644 index 0000000..c107fb9 --- /dev/null +++ b/nodejs/src/chat.js @@ -0,0 +1,203 @@ +const protons = require('protons') + +const { Request, Stats } = protons(` +message Request { + enum Type { + SEND_MESSAGE = 0; + UPDATE_PEER = 1; + STATS = 2; + } + + required Type type = 1; + optional SendMessage sendMessage = 2; + optional UpdatePeer updatePeer = 3; + optional Stats stats = 4; +} + +message SendMessage { + required bytes data = 1; + required int64 created = 2; + required bytes id = 3; +} + +message UpdatePeer { + optional bytes userHandle = 1; +} + +message Stats { + enum NodeType { + GO = 0; + NODEJS = 1; + BROWSER = 2; + } + + repeated bytes connectedPeers = 1; + optional NodeType nodeType = 2; +} +`) + +class Chat { + /** + * + * @param {Libp2p} libp2p A Libp2p node to communicate through + * @param {string} topic The topic to subscribe to + * @param {function(Message)} messageHandler Called with every `Message` received on `topic` + */ + constructor(libp2p, topic, messageHandler) { + this.libp2p = libp2p + this.topic = topic + this.messageHandler = messageHandler + this.userHandles = new Map([ + [libp2p.peerId.toB58String(), 'Me'] + ]) + + this.connectedPeers = new Set() + this.libp2p.connectionManager.on('peer:connect', (connection) => { + if (this.connectedPeers.has(connection.remotePeer.toB58String())) return + this.connectedPeers.add(connection.remotePeer.toB58String()) + this.sendStats(Array.from(this.connectedPeers)) + }) + this.libp2p.connectionManager.on('peer:disconnect', (connection) => { + if (this.connectedPeers.delete(connection.remotePeer.toB58String())) { + this.sendStats(Array.from(this.connectedPeers)) + } + }) + + // Join if libp2p is already on + if (this.libp2p.isStarted()) this.join() + } + + /** + * Handler that is run when `this.libp2p` starts + */ + onStart () { + this.join() + } + + /** + * Handler that is run when `this.libp2p` stops + */ + onStop () { + this.leave() + } + + /** + * Subscribes to `Chat.topic`. All messages will be + * forwarded to `messageHandler` + * @private + */ + join () { + this.libp2p.pubsub.subscribe(this.topic, (message) => { + try { + const request = Request.decode(message.data) + switch (request.type) { + case Request.Type.UPDATE_PEER: + const newHandle = request.updatePeer.userHandle.toString() + console.info(`System: ${message.from} is now ${newHandle}.`) + this.userHandles.set(message.from, newHandle) + break + case Request.Type.SEND_MESSAGE: + this.messageHandler({ + from: message.from, + message: request.sendMessage + }) + break + default: + // Do nothing + } + } catch (err) { + console.error(err) + } + }) + } + + /** + * Unsubscribes from `Chat.topic` + * @private + */ + leave () { + this.libp2p.pubsub.unsubscribe(this.topic) + } + + /** + * Crudely checks the input for a command. If no command is + * found `false` is returned. If the input contains a command, + * that command will be processed and `true` will be returned. + * @param {Buffer|string} input Text submitted by the user + * @returns {boolean} Whether or not there was a command + */ + checkCommand (input) { + const str = input.toString() + if (str.startsWith('/')) { + const args = str.slice(1).split(' ') + switch (args[0]) { + case 'name': + this.updatePeer(args[1]) + return true + } + } + return false + } + + /** + * Sends a message over pubsub to update the user handle + * to the provided `name`. + * @param {Buffer|string} name Username to change to + */ + async updatePeer (name) { + const msg = Request.encode({ + type: Request.Type.UPDATE_PEER, + updatePeer: { + userHandle: Buffer.from(name) + } + }) + + try { + await this.libp2p.pubsub.publish(this.topic, msg) + } catch (err) { + console.error('Could not publish name change', err) + } + } + + /** + * Sends the updated stats to the pubsub network + * @param {Array} connectedPeers + */ + async sendStats (connectedPeers) { + const msg = Request.encode({ + type: Request.Type.STATS, + stats: { + connectedPeers, + nodeType: Stats.NodeType.NODEJS + } + }) + + try { + await this.libp2p.pubsub.publish(this.topic, msg) + } catch (err) { + console.error('Could not publish stats update', err) + } + } + + /** + * Publishes the given `message` to pubsub peers + * @throws + * @param {Buffer|string} message The chat message to send + */ + async send (message) { + const msg = Request.encode({ + type: Request.Type.SEND_MESSAGE, + sendMessage: { + id: (~~(Math.random() * 1e9)).toString(36) + Date.now(), + data: Buffer.from(message), + created: Date.now() + } + }) + + await this.libp2p.pubsub.publish(this.topic, msg) + } +} + +module.exports = Chat +module.exports.TOPIC = '/libp2p/example/chat/1.0.0' +module.exports.CLEARLINE = '\033[1A' diff --git a/nodejs/src/index.js b/nodejs/src/index.js new file mode 100644 index 0000000..47ab703 --- /dev/null +++ b/nodejs/src/index.js @@ -0,0 +1,94 @@ +'use strict' + +// Libp2p Core +const Libp2p = require('libp2p') +// Transports +const TCP = require('libp2p-tcp') +const Websockets = require('libp2p-websockets') +const WebrtcStar = require('libp2p-webrtc-star') +const wrtc = require('wrtc') +// Stream Muxer +const Mplex = require('libp2p-mplex') +// Connection Encryption +const { NOISE } = require('libp2p-noise') +const Secio = require('libp2p-secio') +// Chat over Pubsub +const PubsubChat = require('./chat') +// Peer Discovery +const Bootstrap = require('libp2p-bootstrap') +const MDNS = require('libp2p-mdns') +const KadDHT = require('libp2p-kad-dht') +// PubSub implementation +const Gossipsub = require('libp2p-gossipsub') + +;(async () => { + // Create the Node + const libp2p = await Libp2p.create({ + addresses: { + listen: [ + '/ip4/0.0.0.0/tcp/0', + '/ip4/0.0.0.0/tcp/0/ws', + `/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/` + ] + }, + modules: { + transport: [ TCP, Websockets, WebrtcStar ], + streamMuxer: [ Mplex ], + connEncryption: [ NOISE, Secio ], + peerDiscovery: [ Bootstrap, MDNS ], + dht: KadDHT, + pubsub: Gossipsub + }, + config: { + transport : { + [WebrtcStar.prototype[Symbol.toStringTag]]: { + wrtc + } + }, + peerDiscovery: { + bootstrap: { + list: [ '/ip4/127.0.0.1/tcp/63785/ipfs/QmWjz6xb8v9K4KnYEwP5Yk75k5mMBCehzWFLCvvQpYxF3d' ] + } + }, + dht: { + enabled: true, + randomWalk: { + enabled: true + } + } + } + }) + + // Listen on libp2p for `peer:connect` and log the provided connection.remotePeer.toB58String() peer id string. + libp2p.connectionManager.on('peer:connect', (connection) => { + console.info(`Connected to ${connection.remotePeer.toB58String()}!`) + }) + + // Start libp2p + await libp2p.start() + + // Create our PubsubChat client + const pubsubChat = new PubsubChat(libp2p, PubsubChat.TOPIC, ({ from, message }) => { + let fromMe = from === libp2p.peerId.toB58String() + let user = from.substring(0, 6) + if (pubsubChat.userHandles.has(from)) { + user = pubsubChat.userHandles.get(from) + } + console.info(`${fromMe ? PubsubChat.CLEARLINE : ''}${user}(${new Date(message.created).toLocaleTimeString()}): ${message.data}`) + }) + + // Set up our input handler + process.stdin.on('data', async (message) => { + // Remove trailing newline + message = message.slice(0, -1) + // If there was a command, exit early + if (pubsubChat.checkCommand(message)) return + + try { + // Publish the message + await pubsubChat.send(message) + } catch (err) { + console.error('Could not publish chat', err) + } + }) +})()