Import nodejs chat

This commit is contained in:
Oskar Thoren 2020-09-21 12:24:23 +08:00
parent 597c1514f5
commit f6f5d9b01e
No known key found for this signature in database
GPG Key ID: B2ECCFD3BC2EF77E
2 changed files with 297 additions and 0 deletions

203
nodejs/src/chat.js Normal file
View File

@ -0,0 +1,203 @@
const protons = require('protons')
const { Request, Stats } = protons(`
message Request {
enum Type {
SEND_MESSAGE = 0;
UPDATE_PEER = 1;
STATS = 2;
}
required Type type = 1;
optional SendMessage sendMessage = 2;
optional UpdatePeer updatePeer = 3;
optional Stats stats = 4;
}
message SendMessage {
required bytes data = 1;
required int64 created = 2;
required bytes id = 3;
}
message UpdatePeer {
optional bytes userHandle = 1;
}
message Stats {
enum NodeType {
GO = 0;
NODEJS = 1;
BROWSER = 2;
}
repeated bytes connectedPeers = 1;
optional NodeType nodeType = 2;
}
`)
class Chat {
/**
*
* @param {Libp2p} libp2p A Libp2p node to communicate through
* @param {string} topic The topic to subscribe to
* @param {function(Message)} messageHandler Called with every `Message` received on `topic`
*/
constructor(libp2p, topic, messageHandler) {
this.libp2p = libp2p
this.topic = topic
this.messageHandler = messageHandler
this.userHandles = new Map([
[libp2p.peerId.toB58String(), 'Me']
])
this.connectedPeers = new Set()
this.libp2p.connectionManager.on('peer:connect', (connection) => {
if (this.connectedPeers.has(connection.remotePeer.toB58String())) return
this.connectedPeers.add(connection.remotePeer.toB58String())
this.sendStats(Array.from(this.connectedPeers))
})
this.libp2p.connectionManager.on('peer:disconnect', (connection) => {
if (this.connectedPeers.delete(connection.remotePeer.toB58String())) {
this.sendStats(Array.from(this.connectedPeers))
}
})
// Join if libp2p is already on
if (this.libp2p.isStarted()) this.join()
}
/**
* Handler that is run when `this.libp2p` starts
*/
onStart () {
this.join()
}
/**
* Handler that is run when `this.libp2p` stops
*/
onStop () {
this.leave()
}
/**
* Subscribes to `Chat.topic`. All messages will be
* forwarded to `messageHandler`
* @private
*/
join () {
this.libp2p.pubsub.subscribe(this.topic, (message) => {
try {
const request = Request.decode(message.data)
switch (request.type) {
case Request.Type.UPDATE_PEER:
const newHandle = request.updatePeer.userHandle.toString()
console.info(`System: ${message.from} is now ${newHandle}.`)
this.userHandles.set(message.from, newHandle)
break
case Request.Type.SEND_MESSAGE:
this.messageHandler({
from: message.from,
message: request.sendMessage
})
break
default:
// Do nothing
}
} catch (err) {
console.error(err)
}
})
}
/**
* Unsubscribes from `Chat.topic`
* @private
*/
leave () {
this.libp2p.pubsub.unsubscribe(this.topic)
}
/**
* Crudely checks the input for a command. If no command is
* found `false` is returned. If the input contains a command,
* that command will be processed and `true` will be returned.
* @param {Buffer|string} input Text submitted by the user
* @returns {boolean} Whether or not there was a command
*/
checkCommand (input) {
const str = input.toString()
if (str.startsWith('/')) {
const args = str.slice(1).split(' ')
switch (args[0]) {
case 'name':
this.updatePeer(args[1])
return true
}
}
return false
}
/**
* Sends a message over pubsub to update the user handle
* to the provided `name`.
* @param {Buffer|string} name Username to change to
*/
async updatePeer (name) {
const msg = Request.encode({
type: Request.Type.UPDATE_PEER,
updatePeer: {
userHandle: Buffer.from(name)
}
})
try {
await this.libp2p.pubsub.publish(this.topic, msg)
} catch (err) {
console.error('Could not publish name change', err)
}
}
/**
* Sends the updated stats to the pubsub network
* @param {Array<Buffer>} connectedPeers
*/
async sendStats (connectedPeers) {
const msg = Request.encode({
type: Request.Type.STATS,
stats: {
connectedPeers,
nodeType: Stats.NodeType.NODEJS
}
})
try {
await this.libp2p.pubsub.publish(this.topic, msg)
} catch (err) {
console.error('Could not publish stats update', err)
}
}
/**
* Publishes the given `message` to pubsub peers
* @throws
* @param {Buffer|string} message The chat message to send
*/
async send (message) {
const msg = Request.encode({
type: Request.Type.SEND_MESSAGE,
sendMessage: {
id: (~~(Math.random() * 1e9)).toString(36) + Date.now(),
data: Buffer.from(message),
created: Date.now()
}
})
await this.libp2p.pubsub.publish(this.topic, msg)
}
}
module.exports = Chat
module.exports.TOPIC = '/libp2p/example/chat/1.0.0'
module.exports.CLEARLINE = '\033[1A'

94
nodejs/src/index.js Normal file
View File

@ -0,0 +1,94 @@
'use strict'
// Libp2p Core
const Libp2p = require('libp2p')
// Transports
const TCP = require('libp2p-tcp')
const Websockets = require('libp2p-websockets')
const WebrtcStar = require('libp2p-webrtc-star')
const wrtc = require('wrtc')
// Stream Muxer
const Mplex = require('libp2p-mplex')
// Connection Encryption
const { NOISE } = require('libp2p-noise')
const Secio = require('libp2p-secio')
// Chat over Pubsub
const PubsubChat = require('./chat')
// Peer Discovery
const Bootstrap = require('libp2p-bootstrap')
const MDNS = require('libp2p-mdns')
const KadDHT = require('libp2p-kad-dht')
// PubSub implementation
const Gossipsub = require('libp2p-gossipsub')
;(async () => {
// Create the Node
const libp2p = await Libp2p.create({
addresses: {
listen: [
'/ip4/0.0.0.0/tcp/0',
'/ip4/0.0.0.0/tcp/0/ws',
`/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/`
]
},
modules: {
transport: [ TCP, Websockets, WebrtcStar ],
streamMuxer: [ Mplex ],
connEncryption: [ NOISE, Secio ],
peerDiscovery: [ Bootstrap, MDNS ],
dht: KadDHT,
pubsub: Gossipsub
},
config: {
transport : {
[WebrtcStar.prototype[Symbol.toStringTag]]: {
wrtc
}
},
peerDiscovery: {
bootstrap: {
list: [ '/ip4/127.0.0.1/tcp/63785/ipfs/QmWjz6xb8v9K4KnYEwP5Yk75k5mMBCehzWFLCvvQpYxF3d' ]
}
},
dht: {
enabled: true,
randomWalk: {
enabled: true
}
}
}
})
// Listen on libp2p for `peer:connect` and log the provided connection.remotePeer.toB58String() peer id string.
libp2p.connectionManager.on('peer:connect', (connection) => {
console.info(`Connected to ${connection.remotePeer.toB58String()}!`)
})
// Start libp2p
await libp2p.start()
// Create our PubsubChat client
const pubsubChat = new PubsubChat(libp2p, PubsubChat.TOPIC, ({ from, message }) => {
let fromMe = from === libp2p.peerId.toB58String()
let user = from.substring(0, 6)
if (pubsubChat.userHandles.has(from)) {
user = pubsubChat.userHandles.get(from)
}
console.info(`${fromMe ? PubsubChat.CLEARLINE : ''}${user}(${new Date(message.created).toLocaleTimeString()}): ${message.data}`)
})
// Set up our input handler
process.stdin.on('data', async (message) => {
// Remove trailing newline
message = message.slice(0, -1)
// If there was a command, exit early
if (pubsubChat.checkCommand(message)) return
try {
// Publish the message
await pubsubChat.send(message)
} catch (err) {
console.error('Could not publish chat', err)
}
})
})()