From d1a7c08a0a585273e601f551e9b49d76274a3572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=A9=B7=E5=A9=B7?= Date: Thu, 9 Apr 2020 01:21:06 +0800 Subject: [PATCH] Add the tutorial folder to store the sample code and modify directchat to make it more beginner friendly (#126) * update readme and organize the example folder Co-authored-by: Dmitriy Ryajov --- {doc => docs}/API.md | 0 {doc => docs}/GETTING_STARTED.md | 0 {doc => docs}/GO_DAEMON.md | 0 docs/tutorial/second.nim | 151 ++++++++++++++++++++++++++ docs/tutorial/start.nim | 39 +++++++ examples/directchat.nim | 177 ++++++++++++++----------------- 6 files changed, 271 insertions(+), 96 deletions(-) rename {doc => docs}/API.md (100%) rename {doc => docs}/GETTING_STARTED.md (100%) rename {doc => docs}/GO_DAEMON.md (100%) create mode 100644 docs/tutorial/second.nim create mode 100644 docs/tutorial/start.nim diff --git a/doc/API.md b/docs/API.md similarity index 100% rename from doc/API.md rename to docs/API.md diff --git a/doc/GETTING_STARTED.md b/docs/GETTING_STARTED.md similarity index 100% rename from doc/GETTING_STARTED.md rename to docs/GETTING_STARTED.md diff --git a/doc/GO_DAEMON.md b/docs/GO_DAEMON.md similarity index 100% rename from doc/GO_DAEMON.md rename to docs/GO_DAEMON.md diff --git a/docs/tutorial/second.nim b/docs/tutorial/second.nim new file mode 100644 index 000000000..3addd81a3 --- /dev/null +++ b/docs/tutorial/second.nim @@ -0,0 +1,151 @@ +when not(compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +import tables, strformat, strutils +import chronos +import ../libp2p/[switch, + multistream, + crypto/crypto, + protocols/identify, + connection, + transports/transport, + transports/tcptransport, + multiaddress, + peerinfo, + peer, + protocols/protocol, + protocols/secure/secure, + protocols/secure/secio, + muxers/muxer, + muxers/mplex/mplex, + muxers/mplex/types] + +const ChatCodec = "/nim-libp2p/chat/1.0.0" +const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" + +const Help = """ + Commands: /[?|hep|connect|disconnect|exit] + help: Prints this help + connect: dials a remote peer + disconnect: ends current session + exit: closes the chat +""" + +type ChatProto = ref object of LPProtocol + switch: Switch # a single entry point for dialing and listening to peer + transp: StreamTransport # transport streams between read & write file descriptor + conn: Connection # create and close read & write stream + connected: bool # if the node is connected to another peer + started: bool # if the node has started + +# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115 +proc initAddress(T: type MultiAddress, str: string): T = + let address = MultiAddress.init(str) + if IPFS.match(address) and matchPartial(multiaddress.TCP, address): + result = address + else: + raise newException(MultiAddressError, + "Invalid bootstrap node multi-address") + +proc dialPeer(p: ChatProto, address: string) {.async.} = + let multiAddr = MultiAddress.initAddress(address); + let parts = address.split("/") + let remotePeer = PeerInfo.init(parts[^1], + [multiAddr]) + + echo &"dialing peer: {multiAddr}" + p.conn = await p.switch.dial(remotePeer, ChatCodec) + p.connected = true + +proc readAndPrint(p: ChatProto) {.async.} = + while true: + while p.connected: + echo cast[string](await p.conn.readLp()) + await sleepAsync(100.millis) + +proc writeAndPrint(p: ChatProto) {.async.} = + while true: + if not p.connected: + echo "type an address or wait for a connection:" + echo "type /[help|?] for help" + + let line = await p.transp.readLine() + if line.startsWith("/help") or line.startsWith("/?") or not p.started: + echo Help + continue + + if line.startsWith("/disconnect"): + echo "Ending current session" + if p.connected and p.conn.closed.not: + await p.conn.close() + p.connected = false + elif line.startsWith("/connect"): + if p.connected: + var yesno = "N" + echo "a session is already in progress, do you want end it [y/N]?" + yesno = await p.transp.readLine() + if yesno.cmpIgnoreCase("y") == 0: + await p.conn.close() + p.connected = false + elif yesno.cmpIgnoreCase("n") == 0: + continue + else: + echo "unrecognized response" + continue + + echo "enter address of remote peer" + let address = await p.transp.readLine() + if address.len > 0: + await p.dialPeer(address) + + elif line.startsWith("/exit"): + if p.connected and p.conn.closed.not: + await p.conn.close() + p.connected = false + + await p.switch.stop() + echo "quitting..." + quit(0) + else: + if p.connected: + await p.conn.writeLp(line) + else: + try: + if line.startsWith("/") and "ipfs" in line: + await p.dialPeer(line) + except: + echo &"unable to dial remote peer {line}" + echo getCurrentExceptionMsg() + +proc readWriteLoop(p: ChatProto) {.async.} = + asyncCheck p.writeAndPrint() # execute the async function but does not block + asyncCheck p.readAndPrint() + +proc processInput(rfd: AsyncFD) {.async.} = + let transp = fromPipe(rfd) + while true: + let a = await transp.readLine() + echo "You just entered: " & a + +proc readInput(wfd: AsyncFD) {.thread.} = + ## This procedure performs reading from `stdin` and sends data over + ## pipe to main thread. + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +proc main() {.async.} = + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + + await processInput(rfd) + +when isMainModule: # isMainModule = true when the module is compiled as the main file + waitFor(main()) + \ No newline at end of file diff --git a/docs/tutorial/start.nim b/docs/tutorial/start.nim new file mode 100644 index 000000000..7e7b8326f --- /dev/null +++ b/docs/tutorial/start.nim @@ -0,0 +1,39 @@ +when not(compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +import chronos # an efficient library for async + +proc processInput(rfd: AsyncFD) {.async.} = + echo "Type something below to see if the multithread IO works:\nType 'exit' to exit." + + let transp = fromPipe(rfd) + while true: + let a = await transp.readLine() + + if a == "exit": + quit(0); + + echo "You just entered: " & a + +proc readInput(wfd: AsyncFD) {.thread.} = + ## This procedure performs reading from `stdin` and sends data over + ## pipe to main thread. + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +proc main() {.async.} = + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + + await processInput(rfd) + +when isMainModule: # isMainModule = true when the module is compiled as the main file + waitFor(main()) + \ No newline at end of file diff --git a/examples/directchat.nim b/examples/directchat.nim index 48379f9c7..4ba596d9e 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -1,26 +1,24 @@ when not(compileOption("threads")): {.fatal: "Please, compile this program with the --threads:on option!".} -import tables, options, sequtils, algorithm, strformat, os, strutils -import chronos -import ../libp2p/[switch, - multistream, - crypto/crypto, - protocols/identify, - connection, - transports/transport, - transports/tcptransport, - multiaddress, - peerinfo, - peer, - protocols/protocol, - protocols/secure/secure, - protocols/secure/secio, - protocols/pubsub/pubsub, - protocols/pubsub/floodsub, - muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types] +import tables, strformat, strutils +import chronos # an efficient library for async +import ../libp2p/[switch, # manage transports, a single entry point for dialing and listening + multistream, # tag stream with short header to identify it + crypto/crypto, # cryptographic functions + protocols/identify, # identify the peer info of a peer + connection, # create and close stream read / write connections + transports/transport, # listen and dial to other peers using p2p protocol + transports/tcptransport, # listen and dial to other peers using client-server protocol + multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + peerinfo, # manage the information of a peer, such as peer ID and public / private key + peer, # Implement how peers interact + protocols/protocol, # define the protocol base type + protocols/secure/secure, # define the protocol of secure connection + protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS + muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection + muxers/mplex/mplex, # implement stream multiplexing + muxers/mplex/types] # define some contants and message types for stream multiplexing const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" @@ -33,56 +31,47 @@ const Help = """ exit: closes the chat """ -type - CustomData = ref object - consoleFd: AsyncFD - serveFut: Future[void] +type ChatProto = ref object of LPProtocol + switch: Switch # a single entry point for dialing and listening to peer + transp: StreamTransport # transport streams between read & write file descriptor + conn: Connection # create and close read & write stream + connected: bool # if the node is connected to another peer + started: bool # if the node has started - ChatProto = ref object of LPProtocol - customData*: CustomData - switch: Switch - transp: StreamTransport - conn: Connection - client: bool - connected: bool - started: bool -proc id (p: ChatProto): string = - if not isNil(p.conn.peerInfo): - $p.conn.peerInfo.peerId +proc initAddress(T: type MultiAddress, str: string): T = + let address = MultiAddress.init(str) + if IPFS.match(address) and matchPartial(multiaddress.TCP, address): + result = address else: - "unknown" + raise newException(MultiAddressError, + "Invalid bootstrap node multi-address") -# forward declaration -proc readWriteLoop(p: ChatProto) {.async, gcsafe.} -proc readAndPrint(p: ChatProto) {.async, gcsafe.} = - while true: - while p.connected: - # echo &"{p.id} -> " - echo cast[string](await p.conn.readLp()) - await sleepAsync(100.millis) +proc dialPeer(p: ChatProto, address: string) {.async.} = + let multiAddr = MultiAddress.initAddress(address); + let parts = address.split("/") + let remotePeer = PeerInfo.init(parts[^1], + [multiAddr]) -proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} = - var parts = address.split("/") - if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]: - quit("invalid or incompelete peerId") - - var remotePeer = PeerInfo.init(parts[^1], - [MultiAddress.init(address)]) - - echo &"dialing peer: {address}" + echo &"dialing peer: {multiAddr}" p.conn = await p.switch.dial(remotePeer, ChatCodec) p.connected = true -proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = +proc readAndPrint(p: ChatProto) {.async.} = + while true: + while p.connected: + # TODO: echo &"{p.id} -> " + + echo cast[string](await p.conn.readLp()) + await sleepAsync(100.millis) + +proc writeAndPrint(p: ChatProto) {.async.} = while true: if not p.connected: - # echo &"{p.id} ->" - # else: echo "type an address or wait for a connection:" echo "type /[help|?] for help" - var line = await p.transp.readLine() + let line = await p.transp.readLine() if line.startsWith("/help") or line.startsWith("/?") or not p.started: echo Help continue @@ -128,44 +117,42 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = await p.dialPeer(line) except: echo &"unable to dial remote peer {line}" - # echo getCurrentExceptionMsg() + echo getCurrentExceptionMsg() -proc readWriteLoop(p: ChatProto) {.async, gcsafe.} = - asyncCheck p.writeAndPrint() +proc readWriteLoop(p: ChatProto) {.async.} = + asyncCheck p.writeAndPrint() # execute the async function but does not block asyncCheck p.readAndPrint() -method init(p: ChatProto) {.gcsafe.} = - proc handle(stream: Connection, proto: string) {.async, gcsafe.} = - if p.connected and not p.conn.closed: +proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = + var chatproto = ChatProto(switch: switch, transp: transp, codec: ChatCodec) + + # create handler for incoming connection + proc handle(stream: Connection, proto: string) {.async.} = + if chatproto.connected and not chatproto.conn.closed: echo "a chat session is already in progress - disconnecting!" await stream.close() else: - p.conn = stream - p.connected = true + chatproto.conn = stream + chatproto.connected = true - p.codec = ChatCodec - p.handler = handle + # assign the new handler + chatproto.handler = handle + return chatproto -proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = - new result - result.switch = switch - result.transp = transp - result.init() - -proc threadMain(wfd: AsyncFD) {.thread.} = +proc readInput(wfd: AsyncFD) {.thread.} = ## This procedure performs reading from `stdin` and sends data over ## pipe to main thread. - var transp = fromPipe(wfd) + let transp = fromPipe(wfd) while true: - var line = stdin.readLine() + let line = stdin.readLine() discard waitFor transp.write(line & "\r\n") -proc serveThread(customData: CustomData) {.async.} = - var transp = fromPipe(customData.consoleFd) +proc processInput(rfd: AsyncFD) {.async.} = + let transp = fromPipe(rfd) let seckey = PrivateKey.random(RSA) - var peerInfo = PeerInfo.init(seckey) + let peerInfo = PeerInfo.init(seckey) var localAddress = DefaultAddr while true: echo &"Type an address to bind to or Enter to use the default {DefaultAddr}" @@ -182,23 +169,24 @@ proc serveThread(customData: CustomData) {.async.} = localAddress = DefaultAddr continue + # a constructor for building different multiplexers under various connections proc createMplex(conn: Connection): Muxer = result = newMplex(conn) - var mplexProvider = newMuxerProvider(createMplex, MplexCodec) - var transports = @[Transport(newTransport(TcpTransport))] - var muxers = [(MplexCodec, mplexProvider)].toTable() - var identify = newIdentify(peerInfo) - var secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() - var switch = newSwitch(peerInfo, + let mplexProvider = newMuxerProvider(createMplex, MplexCodec) + let transports = @[Transport(newTransport(TcpTransport))] + let muxers = [(MplexCodec, mplexProvider)].toTable() + let identify = newIdentify(peerInfo) + let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() + let switch = newSwitch(peerInfo, transports, identify, muxers, - secureManagers = secureManagers) + secureManagers) - var chatProto = newChatProto(switch, transp) + let chatProto = newChatProto(switch, transp) switch.mount(chatProto) - var libp2pFuts = await switch.start() + let libp2pFuts = await switch.start() chatProto.started = true let id = peerInfo.peerId.pretty @@ -211,17 +199,14 @@ proc serveThread(customData: CustomData) {.async.} = await allFutures(libp2pFuts) proc main() {.async.} = - var data = new CustomData - - var (rfd, wfd) = createAsyncPipe() + let (rfd, wfd) = createAsyncPipe() if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: raise newException(ValueError, "Could not initialize pipe!") - data.consoleFd = rfd - data.serveFut = serveThread(data) var thread: Thread[AsyncFD] - thread.createThread(threadMain, wfd) - await data.serveFut + thread.createThread(readInput, wfd) -when isMainModule: + await processInput(rfd) + +when isMainModule: # isMainModule = true when the module is compiled as the main file waitFor(main())