nim-libp2p/examples/directchat.nim

214 lines
7.9 KiB
Nim
Raw Normal View History

2019-09-12 17:20:30 -06:00
when not(compileOption("threads")):
{.fatal: "Please, compile this program with the --threads:on option!".}
import tables, strformat, strutils, bearssl
import chronos # an efficient library for async
import ../libp2p/[switch, # manage transports, a single entry point for dialing and listening
multistream, # tag stream with short header to identify it
crypto/crypto, # cryptographic functions
errors, # error handling utilities
protocols/identify, # identify the peer info of a peer
stream/connection, # create and close stream read / write connections
transports/transport, # listen and dial to other peers using p2p protocol
transports/tcptransport, # listen and dial to other peers using client-server protocol
multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
2020-05-08 22:58:23 +02:00
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protocols/protocol, # define the protocol base type
protocols/secure/secure, # define the protocol of secure connection
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
2019-09-12 15:54:12 -06:00
const ChatCodec = "/nim-libp2p/chat/1.0.0"
2019-09-12 18:05:20 -06:00
const DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
2019-09-12 17:20:30 -06:00
const Help = """
Commands: /[?|hep|connect|disconnect|exit]
help: Prints this help
connect: dials a remote peer
disconnect: ends current session
exit: closes the chat
"""
type ChatProto = ref object of LPProtocol
switch: Switch # a single entry point for dialing and listening to peer
2020-05-08 22:58:23 +02:00
transp: StreamTransport # transport streams between read & write file descriptor
conn: Connection # create and close read & write stream
connected: bool # if the node is connected to another peer
started: bool # if the node has started
2019-09-12 18:05:20 -06:00
2019-09-12 15:54:12 -06:00
proc initAddress(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str).tryGet()
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
result = address
else:
raise newException(ValueError,
"Invalid bootstrap node multi-address")
2019-09-12 15:54:12 -06:00
proc dialPeer(p: ChatProto, address: string) {.async.} =
let multiAddr = MultiAddress.initAddress(address);
let parts = address.split("/")
let remotePeer = PeerInfo.init(parts[^1],
[multiAddr])
2019-09-12 15:54:12 -06:00
echo &"dialing peer: {multiAddr}"
2019-09-12 15:54:12 -06:00
p.conn = await p.switch.dial(remotePeer, ChatCodec)
p.connected = true
proc readAndPrint(p: ChatProto) {.async.} =
while true:
while p.connected:
# TODO: echo &"{p.id} -> "
2020-05-08 22:58:23 +02:00
echo cast[string](await p.conn.readLp(1024))
await sleepAsync(100.millis)
proc writeAndPrint(p: ChatProto) {.async.} =
2019-09-12 15:54:12 -06:00
while true:
2019-09-12 18:05:20 -06:00
if not p.connected:
2019-09-12 15:54:12 -06:00
echo "type an address or wait for a connection:"
2019-09-12 17:20:30 -06:00
echo "type /[help|?] for help"
2019-09-12 15:54:12 -06:00
let line = await p.transp.readLine()
2019-09-12 17:20:30 -06:00
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
echo Help
continue
2019-09-12 17:20:30 -06:00
if line.startsWith("/disconnect"):
echo "Ending current session"
2019-09-12 18:05:20 -06:00
if p.connected and p.conn.closed.not:
await p.conn.close()
2019-09-12 17:20:30 -06:00
p.connected = false
elif line.startsWith("/connect"):
if p.connected:
2019-09-14 07:54:09 -06:00
var yesno = "N"
2019-09-12 17:20:30 -06:00
echo "a session is already in progress, do you want end it [y/N]?"
2019-09-14 07:54:09 -06:00
yesno = await p.transp.readLine()
2019-09-12 17:20:30 -06:00
if yesno.cmpIgnoreCase("y") == 0:
await p.conn.close()
p.connected = false
elif yesno.cmpIgnoreCase("n") == 0:
2019-09-12 17:20:30 -06:00
continue
else:
echo "unrecognized response"
continue
echo "enter address of remote peer"
let address = await p.transp.readLine()
if address.len > 0:
await p.dialPeer(address)
elif line.startsWith("/exit"):
2019-09-12 18:05:20 -06:00
if p.connected and p.conn.closed.not:
await p.conn.close()
p.connected = false
await p.switch.stop()
echo "quitting..."
2019-09-12 17:20:30 -06:00
quit(0)
2019-09-12 15:54:12 -06:00
else:
2019-09-12 17:20:30 -06:00
if p.connected:
await p.conn.writeLp(line)
else:
try:
if line.startsWith("/") and "ipfs" in line:
await p.dialPeer(line)
except:
echo &"unable to dial remote peer {line}"
echo getCurrentExceptionMsg()
2019-09-12 15:54:12 -06:00
proc readWriteLoop(p: ChatProto) {.async.} =
asyncCheck p.writeAndPrint() # execute the async function but does not block
asyncCheck p.readAndPrint()
2019-09-12 15:54:12 -06:00
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
var chatproto = ChatProto(switch: switch, transp: transp, codecs: @[ChatCodec])
# create handler for incoming connection
proc handle(stream: Connection, proto: string) {.async.} =
if chatproto.connected and not chatproto.conn.closed:
2019-09-12 17:20:30 -06:00
echo "a chat session is already in progress - disconnecting!"
await stream.close()
else:
chatproto.conn = stream
chatproto.connected = true
2019-09-12 15:54:12 -06:00
# assign the new handler
chatproto.handler = handle
return chatproto
2019-09-12 15:54:12 -06:00
proc readInput(wfd: AsyncFD) {.thread.} =
2019-09-12 15:54:12 -06:00
## This procedure performs reading from `stdin` and sends data over
## pipe to main thread.
let transp = fromPipe(wfd)
2019-09-12 15:54:12 -06:00
while true:
let line = stdin.readLine()
2019-09-25 13:33:50 -06:00
discard waitFor transp.write(line & "\r\n")
2019-09-12 15:54:12 -06:00
proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let transp = fromPipe(rfd)
2019-09-12 15:54:12 -06:00
let seckey = PrivateKey.random(RSA, rng[]).get()
let peerInfo = PeerInfo.init(seckey)
2019-09-12 18:05:20 -06:00
var localAddress = DefaultAddr
while true:
echo &"Type an address to bind to or Enter to use the default {DefaultAddr}"
let a = await transp.readLine()
try:
if a.len > 0:
peerInfo.addrs.add(Multiaddress.init(a).tryGet())
2019-09-12 18:05:20 -06:00
break
peerInfo.addrs.add(Multiaddress.init(localAddress).tryGet())
2019-09-12 18:05:20 -06:00
break
except:
echo "invalid address"
localAddress = DefaultAddr
continue
# a constructor for building different multiplexers under various connections
2019-09-12 15:54:12 -06:00
proc createMplex(conn: Connection): Muxer =
result = Mplex.init(conn)
2019-09-12 15:54:12 -06:00
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init())]
let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo)
let secureManagers = [Secure(newSecio(rng, seckey))]
let switch = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers)
2019-09-12 15:54:12 -06:00
let chatProto = newChatProto(switch, transp)
2019-09-12 15:54:12 -06:00
switch.mount(chatProto)
let libp2pFuts = await switch.start()
2019-09-12 17:20:30 -06:00
chatProto.started = true
let id = $peerInfo.peerId
2019-09-12 17:20:30 -06:00
echo "PeerID: " & id
2019-09-12 15:54:12 -06:00
echo "listening on: "
2019-09-12 17:20:30 -06:00
for a in peerInfo.addrs:
echo &"{a}/ipfs/{id}"
2019-09-12 15:54:12 -06:00
await chatProto.readWriteLoop()
await allFuturesThrowing(libp2pFuts)
2019-09-12 15:54:12 -06:00
proc main() {.async.} =
let rng = newRng() # Singe random number source for the whole application
let (rfd, wfd) = createAsyncPipe()
2019-09-12 15:54:12 -06:00
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
raise newException(ValueError, "Could not initialize pipe!")
2019-09-12 15:54:12 -06:00
var thread: Thread[AsyncFD]
thread.createThread(readInput, wfd)
await processInput(rfd, rng)
2019-09-12 15:54:12 -06:00
when isMainModule: # isMainModule = true when the module is compiled as the main file
2019-09-12 15:54:12 -06:00
waitFor(main())