mirror of https://github.com/vacp2p/nim-libp2p.git
Add the tutorial folder to store the sample code and modify directchat to make it more beginner friendly (#126)
* update readme and organize the example folder Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
parent
f4740c8b8e
commit
d1a7c08a0a
|
@ -0,0 +1,151 @@
|
||||||
|
when not(compileOption("threads")):
|
||||||
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||||
|
|
||||||
|
import tables, strformat, strutils
|
||||||
|
import chronos
|
||||||
|
import ../libp2p/[switch,
|
||||||
|
multistream,
|
||||||
|
crypto/crypto,
|
||||||
|
protocols/identify,
|
||||||
|
connection,
|
||||||
|
transports/transport,
|
||||||
|
transports/tcptransport,
|
||||||
|
multiaddress,
|
||||||
|
peerinfo,
|
||||||
|
peer,
|
||||||
|
protocols/protocol,
|
||||||
|
protocols/secure/secure,
|
||||||
|
protocols/secure/secio,
|
||||||
|
muxers/muxer,
|
||||||
|
muxers/mplex/mplex,
|
||||||
|
muxers/mplex/types]
|
||||||
|
|
||||||
|
const ChatCodec = "/nim-libp2p/chat/1.0.0"
|
||||||
|
const DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
|
||||||
|
|
||||||
|
const Help = """
|
||||||
|
Commands: /[?|hep|connect|disconnect|exit]
|
||||||
|
help: Prints this help
|
||||||
|
connect: dials a remote peer
|
||||||
|
disconnect: ends current session
|
||||||
|
exit: closes the chat
|
||||||
|
"""
|
||||||
|
|
||||||
|
type ChatProto = ref object of LPProtocol
|
||||||
|
switch: Switch # a single entry point for dialing and listening to peer
|
||||||
|
transp: StreamTransport # transport streams between read & write file descriptor
|
||||||
|
conn: Connection # create and close read & write stream
|
||||||
|
connected: bool # if the node is connected to another peer
|
||||||
|
started: bool # if the node has started
|
||||||
|
|
||||||
|
# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115
|
||||||
|
proc initAddress(T: type MultiAddress, str: string): T =
|
||||||
|
let address = MultiAddress.init(str)
|
||||||
|
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
||||||
|
result = address
|
||||||
|
else:
|
||||||
|
raise newException(MultiAddressError,
|
||||||
|
"Invalid bootstrap node multi-address")
|
||||||
|
|
||||||
|
proc dialPeer(p: ChatProto, address: string) {.async.} =
|
||||||
|
let multiAddr = MultiAddress.initAddress(address);
|
||||||
|
let parts = address.split("/")
|
||||||
|
let remotePeer = PeerInfo.init(parts[^1],
|
||||||
|
[multiAddr])
|
||||||
|
|
||||||
|
echo &"dialing peer: {multiAddr}"
|
||||||
|
p.conn = await p.switch.dial(remotePeer, ChatCodec)
|
||||||
|
p.connected = true
|
||||||
|
|
||||||
|
proc readAndPrint(p: ChatProto) {.async.} =
|
||||||
|
while true:
|
||||||
|
while p.connected:
|
||||||
|
echo cast[string](await p.conn.readLp())
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
proc writeAndPrint(p: ChatProto) {.async.} =
|
||||||
|
while true:
|
||||||
|
if not p.connected:
|
||||||
|
echo "type an address or wait for a connection:"
|
||||||
|
echo "type /[help|?] for help"
|
||||||
|
|
||||||
|
let line = await p.transp.readLine()
|
||||||
|
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
|
||||||
|
echo Help
|
||||||
|
continue
|
||||||
|
|
||||||
|
if line.startsWith("/disconnect"):
|
||||||
|
echo "Ending current session"
|
||||||
|
if p.connected and p.conn.closed.not:
|
||||||
|
await p.conn.close()
|
||||||
|
p.connected = false
|
||||||
|
elif line.startsWith("/connect"):
|
||||||
|
if p.connected:
|
||||||
|
var yesno = "N"
|
||||||
|
echo "a session is already in progress, do you want end it [y/N]?"
|
||||||
|
yesno = await p.transp.readLine()
|
||||||
|
if yesno.cmpIgnoreCase("y") == 0:
|
||||||
|
await p.conn.close()
|
||||||
|
p.connected = false
|
||||||
|
elif yesno.cmpIgnoreCase("n") == 0:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
echo "unrecognized response"
|
||||||
|
continue
|
||||||
|
|
||||||
|
echo "enter address of remote peer"
|
||||||
|
let address = await p.transp.readLine()
|
||||||
|
if address.len > 0:
|
||||||
|
await p.dialPeer(address)
|
||||||
|
|
||||||
|
elif line.startsWith("/exit"):
|
||||||
|
if p.connected and p.conn.closed.not:
|
||||||
|
await p.conn.close()
|
||||||
|
p.connected = false
|
||||||
|
|
||||||
|
await p.switch.stop()
|
||||||
|
echo "quitting..."
|
||||||
|
quit(0)
|
||||||
|
else:
|
||||||
|
if p.connected:
|
||||||
|
await p.conn.writeLp(line)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
if line.startsWith("/") and "ipfs" in line:
|
||||||
|
await p.dialPeer(line)
|
||||||
|
except:
|
||||||
|
echo &"unable to dial remote peer {line}"
|
||||||
|
echo getCurrentExceptionMsg()
|
||||||
|
|
||||||
|
proc readWriteLoop(p: ChatProto) {.async.} =
|
||||||
|
asyncCheck p.writeAndPrint() # execute the async function but does not block
|
||||||
|
asyncCheck p.readAndPrint()
|
||||||
|
|
||||||
|
proc processInput(rfd: AsyncFD) {.async.} =
|
||||||
|
let transp = fromPipe(rfd)
|
||||||
|
while true:
|
||||||
|
let a = await transp.readLine()
|
||||||
|
echo "You just entered: " & a
|
||||||
|
|
||||||
|
proc readInput(wfd: AsyncFD) {.thread.} =
|
||||||
|
## This procedure performs reading from `stdin` and sends data over
|
||||||
|
## pipe to main thread.
|
||||||
|
let transp = fromPipe(wfd)
|
||||||
|
|
||||||
|
while true:
|
||||||
|
let line = stdin.readLine()
|
||||||
|
discard waitFor transp.write(line & "\r\n")
|
||||||
|
|
||||||
|
proc main() {.async.} =
|
||||||
|
let (rfd, wfd) = createAsyncPipe()
|
||||||
|
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||||
|
raise newException(ValueError, "Could not initialize pipe!")
|
||||||
|
|
||||||
|
var thread: Thread[AsyncFD]
|
||||||
|
thread.createThread(readInput, wfd)
|
||||||
|
|
||||||
|
await processInput(rfd)
|
||||||
|
|
||||||
|
when isMainModule: # isMainModule = true when the module is compiled as the main file
|
||||||
|
waitFor(main())
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
when not(compileOption("threads")):
|
||||||
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||||
|
|
||||||
|
import chronos # an efficient library for async
|
||||||
|
|
||||||
|
proc processInput(rfd: AsyncFD) {.async.} =
|
||||||
|
echo "Type something below to see if the multithread IO works:\nType 'exit' to exit."
|
||||||
|
|
||||||
|
let transp = fromPipe(rfd)
|
||||||
|
while true:
|
||||||
|
let a = await transp.readLine()
|
||||||
|
|
||||||
|
if a == "exit":
|
||||||
|
quit(0);
|
||||||
|
|
||||||
|
echo "You just entered: " & a
|
||||||
|
|
||||||
|
proc readInput(wfd: AsyncFD) {.thread.} =
|
||||||
|
## This procedure performs reading from `stdin` and sends data over
|
||||||
|
## pipe to main thread.
|
||||||
|
let transp = fromPipe(wfd)
|
||||||
|
|
||||||
|
while true:
|
||||||
|
let line = stdin.readLine()
|
||||||
|
discard waitFor transp.write(line & "\r\n")
|
||||||
|
|
||||||
|
proc main() {.async.} =
|
||||||
|
let (rfd, wfd) = createAsyncPipe()
|
||||||
|
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||||
|
raise newException(ValueError, "Could not initialize pipe!")
|
||||||
|
|
||||||
|
var thread: Thread[AsyncFD]
|
||||||
|
thread.createThread(readInput, wfd)
|
||||||
|
|
||||||
|
await processInput(rfd)
|
||||||
|
|
||||||
|
when isMainModule: # isMainModule = true when the module is compiled as the main file
|
||||||
|
waitFor(main())
|
||||||
|
|
|
@ -1,26 +1,24 @@
|
||||||
when not(compileOption("threads")):
|
when not(compileOption("threads")):
|
||||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||||
|
|
||||||
import tables, options, sequtils, algorithm, strformat, os, strutils
|
import tables, strformat, strutils
|
||||||
import chronos
|
import chronos # an efficient library for async
|
||||||
import ../libp2p/[switch,
|
import ../libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||||
multistream,
|
multistream, # tag stream with short header to identify it
|
||||||
crypto/crypto,
|
crypto/crypto, # cryptographic functions
|
||||||
protocols/identify,
|
protocols/identify, # identify the peer info of a peer
|
||||||
connection,
|
connection, # create and close stream read / write connections
|
||||||
transports/transport,
|
transports/transport, # listen and dial to other peers using p2p protocol
|
||||||
transports/tcptransport,
|
transports/tcptransport, # listen and dial to other peers using client-server protocol
|
||||||
multiaddress,
|
multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
|
||||||
peerinfo,
|
peerinfo, # manage the information of a peer, such as peer ID and public / private key
|
||||||
peer,
|
peer, # Implement how peers interact
|
||||||
protocols/protocol,
|
protocols/protocol, # define the protocol base type
|
||||||
protocols/secure/secure,
|
protocols/secure/secure, # define the protocol of secure connection
|
||||||
protocols/secure/secio,
|
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
|
||||||
protocols/pubsub/pubsub,
|
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
||||||
protocols/pubsub/floodsub,
|
muxers/mplex/mplex, # implement stream multiplexing
|
||||||
muxers/muxer,
|
muxers/mplex/types] # define some contants and message types for stream multiplexing
|
||||||
muxers/mplex/mplex,
|
|
||||||
muxers/mplex/types]
|
|
||||||
|
|
||||||
const ChatCodec = "/nim-libp2p/chat/1.0.0"
|
const ChatCodec = "/nim-libp2p/chat/1.0.0"
|
||||||
const DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
|
const DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
|
||||||
|
@ -33,56 +31,47 @@ const Help = """
|
||||||
exit: closes the chat
|
exit: closes the chat
|
||||||
"""
|
"""
|
||||||
|
|
||||||
type
|
type ChatProto = ref object of LPProtocol
|
||||||
CustomData = ref object
|
switch: Switch # a single entry point for dialing and listening to peer
|
||||||
consoleFd: AsyncFD
|
transp: StreamTransport # transport streams between read & write file descriptor
|
||||||
serveFut: Future[void]
|
conn: Connection # create and close read & write stream
|
||||||
|
connected: bool # if the node is connected to another peer
|
||||||
|
started: bool # if the node has started
|
||||||
|
|
||||||
ChatProto = ref object of LPProtocol
|
|
||||||
customData*: CustomData
|
|
||||||
switch: Switch
|
|
||||||
transp: StreamTransport
|
|
||||||
conn: Connection
|
|
||||||
client: bool
|
|
||||||
connected: bool
|
|
||||||
started: bool
|
|
||||||
|
|
||||||
proc id (p: ChatProto): string =
|
proc initAddress(T: type MultiAddress, str: string): T =
|
||||||
if not isNil(p.conn.peerInfo):
|
let address = MultiAddress.init(str)
|
||||||
$p.conn.peerInfo.peerId
|
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
||||||
|
result = address
|
||||||
else:
|
else:
|
||||||
"unknown"
|
raise newException(MultiAddressError,
|
||||||
|
"Invalid bootstrap node multi-address")
|
||||||
|
|
||||||
# forward declaration
|
proc dialPeer(p: ChatProto, address: string) {.async.} =
|
||||||
proc readWriteLoop(p: ChatProto) {.async, gcsafe.}
|
let multiAddr = MultiAddress.initAddress(address);
|
||||||
proc readAndPrint(p: ChatProto) {.async, gcsafe.} =
|
let parts = address.split("/")
|
||||||
while true:
|
let remotePeer = PeerInfo.init(parts[^1],
|
||||||
while p.connected:
|
[multiAddr])
|
||||||
# echo &"{p.id} -> "
|
|
||||||
echo cast[string](await p.conn.readLp())
|
|
||||||
await sleepAsync(100.millis)
|
|
||||||
|
|
||||||
proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} =
|
echo &"dialing peer: {multiAddr}"
|
||||||
var parts = address.split("/")
|
|
||||||
if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]:
|
|
||||||
quit("invalid or incompelete peerId")
|
|
||||||
|
|
||||||
var remotePeer = PeerInfo.init(parts[^1],
|
|
||||||
[MultiAddress.init(address)])
|
|
||||||
|
|
||||||
echo &"dialing peer: {address}"
|
|
||||||
p.conn = await p.switch.dial(remotePeer, ChatCodec)
|
p.conn = await p.switch.dial(remotePeer, ChatCodec)
|
||||||
p.connected = true
|
p.connected = true
|
||||||
|
|
||||||
proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
|
proc readAndPrint(p: ChatProto) {.async.} =
|
||||||
|
while true:
|
||||||
|
while p.connected:
|
||||||
|
# TODO: echo &"{p.id} -> "
|
||||||
|
|
||||||
|
echo cast[string](await p.conn.readLp())
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
proc writeAndPrint(p: ChatProto) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
if not p.connected:
|
if not p.connected:
|
||||||
# echo &"{p.id} ->"
|
|
||||||
# else:
|
|
||||||
echo "type an address or wait for a connection:"
|
echo "type an address or wait for a connection:"
|
||||||
echo "type /[help|?] for help"
|
echo "type /[help|?] for help"
|
||||||
|
|
||||||
var line = await p.transp.readLine()
|
let line = await p.transp.readLine()
|
||||||
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
|
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
|
||||||
echo Help
|
echo Help
|
||||||
continue
|
continue
|
||||||
|
@ -128,44 +117,42 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
|
||||||
await p.dialPeer(line)
|
await p.dialPeer(line)
|
||||||
except:
|
except:
|
||||||
echo &"unable to dial remote peer {line}"
|
echo &"unable to dial remote peer {line}"
|
||||||
# echo getCurrentExceptionMsg()
|
echo getCurrentExceptionMsg()
|
||||||
|
|
||||||
proc readWriteLoop(p: ChatProto) {.async, gcsafe.} =
|
proc readWriteLoop(p: ChatProto) {.async.} =
|
||||||
asyncCheck p.writeAndPrint()
|
asyncCheck p.writeAndPrint() # execute the async function but does not block
|
||||||
asyncCheck p.readAndPrint()
|
asyncCheck p.readAndPrint()
|
||||||
|
|
||||||
method init(p: ChatProto) {.gcsafe.} =
|
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
|
||||||
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
|
var chatproto = ChatProto(switch: switch, transp: transp, codec: ChatCodec)
|
||||||
if p.connected and not p.conn.closed:
|
|
||||||
|
# create handler for incoming connection
|
||||||
|
proc handle(stream: Connection, proto: string) {.async.} =
|
||||||
|
if chatproto.connected and not chatproto.conn.closed:
|
||||||
echo "a chat session is already in progress - disconnecting!"
|
echo "a chat session is already in progress - disconnecting!"
|
||||||
await stream.close()
|
await stream.close()
|
||||||
else:
|
else:
|
||||||
p.conn = stream
|
chatproto.conn = stream
|
||||||
p.connected = true
|
chatproto.connected = true
|
||||||
|
|
||||||
p.codec = ChatCodec
|
# assign the new handler
|
||||||
p.handler = handle
|
chatproto.handler = handle
|
||||||
|
return chatproto
|
||||||
|
|
||||||
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
|
proc readInput(wfd: AsyncFD) {.thread.} =
|
||||||
new result
|
|
||||||
result.switch = switch
|
|
||||||
result.transp = transp
|
|
||||||
result.init()
|
|
||||||
|
|
||||||
proc threadMain(wfd: AsyncFD) {.thread.} =
|
|
||||||
## This procedure performs reading from `stdin` and sends data over
|
## This procedure performs reading from `stdin` and sends data over
|
||||||
## pipe to main thread.
|
## pipe to main thread.
|
||||||
var transp = fromPipe(wfd)
|
let transp = fromPipe(wfd)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
var line = stdin.readLine()
|
let line = stdin.readLine()
|
||||||
discard waitFor transp.write(line & "\r\n")
|
discard waitFor transp.write(line & "\r\n")
|
||||||
|
|
||||||
proc serveThread(customData: CustomData) {.async.} =
|
proc processInput(rfd: AsyncFD) {.async.} =
|
||||||
var transp = fromPipe(customData.consoleFd)
|
let transp = fromPipe(rfd)
|
||||||
|
|
||||||
let seckey = PrivateKey.random(RSA)
|
let seckey = PrivateKey.random(RSA)
|
||||||
var peerInfo = PeerInfo.init(seckey)
|
let peerInfo = PeerInfo.init(seckey)
|
||||||
var localAddress = DefaultAddr
|
var localAddress = DefaultAddr
|
||||||
while true:
|
while true:
|
||||||
echo &"Type an address to bind to or Enter to use the default {DefaultAddr}"
|
echo &"Type an address to bind to or Enter to use the default {DefaultAddr}"
|
||||||
|
@ -182,23 +169,24 @@ proc serveThread(customData: CustomData) {.async.} =
|
||||||
localAddress = DefaultAddr
|
localAddress = DefaultAddr
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# a constructor for building different multiplexers under various connections
|
||||||
proc createMplex(conn: Connection): Muxer =
|
proc createMplex(conn: Connection): Muxer =
|
||||||
result = newMplex(conn)
|
result = newMplex(conn)
|
||||||
|
|
||||||
var mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||||
var transports = @[Transport(newTransport(TcpTransport))]
|
let transports = @[Transport(newTransport(TcpTransport))]
|
||||||
var muxers = [(MplexCodec, mplexProvider)].toTable()
|
let muxers = [(MplexCodec, mplexProvider)].toTable()
|
||||||
var identify = newIdentify(peerInfo)
|
let identify = newIdentify(peerInfo)
|
||||||
var secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable()
|
let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable()
|
||||||
var switch = newSwitch(peerInfo,
|
let switch = newSwitch(peerInfo,
|
||||||
transports,
|
transports,
|
||||||
identify,
|
identify,
|
||||||
muxers,
|
muxers,
|
||||||
secureManagers = secureManagers)
|
secureManagers)
|
||||||
|
|
||||||
var chatProto = newChatProto(switch, transp)
|
let chatProto = newChatProto(switch, transp)
|
||||||
switch.mount(chatProto)
|
switch.mount(chatProto)
|
||||||
var libp2pFuts = await switch.start()
|
let libp2pFuts = await switch.start()
|
||||||
chatProto.started = true
|
chatProto.started = true
|
||||||
|
|
||||||
let id = peerInfo.peerId.pretty
|
let id = peerInfo.peerId.pretty
|
||||||
|
@ -211,17 +199,14 @@ proc serveThread(customData: CustomData) {.async.} =
|
||||||
await allFutures(libp2pFuts)
|
await allFutures(libp2pFuts)
|
||||||
|
|
||||||
proc main() {.async.} =
|
proc main() {.async.} =
|
||||||
var data = new CustomData
|
let (rfd, wfd) = createAsyncPipe()
|
||||||
|
|
||||||
var (rfd, wfd) = createAsyncPipe()
|
|
||||||
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||||
raise newException(ValueError, "Could not initialize pipe!")
|
raise newException(ValueError, "Could not initialize pipe!")
|
||||||
|
|
||||||
data.consoleFd = rfd
|
|
||||||
data.serveFut = serveThread(data)
|
|
||||||
var thread: Thread[AsyncFD]
|
var thread: Thread[AsyncFD]
|
||||||
thread.createThread(threadMain, wfd)
|
thread.createThread(readInput, wfd)
|
||||||
await data.serveFut
|
|
||||||
|
|
||||||
when isMainModule:
|
await processInput(rfd)
|
||||||
|
|
||||||
|
when isMainModule: # isMainModule = true when the module is compiled as the main file
|
||||||
waitFor(main())
|
waitFor(main())
|
||||||
|
|
Loading…
Reference in New Issue