225 lines
6.4 KiB
Nim
225 lines
6.4 KiB
Nim
when not(compileOption("threads")):
|
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
|
|
|
import tables, options, sequtils, algorithm, strformat, os, strutils
|
|
import chronos
|
|
import ../libp2p/switch,
|
|
../libp2p/multistream,
|
|
../libp2p/crypto/crypto,
|
|
../libp2p/protocols/identify,
|
|
../libp2p/connection,
|
|
../libp2p/transports/[transport, tcptransport],
|
|
../libp2p/multiaddress,
|
|
../libp2p/peerinfo,
|
|
../libp2p/peer,
|
|
../libp2p/protocols/protocol,
|
|
../libp2p/protocols/secure/secure,
|
|
../libp2p/protocols/secure/secio,
|
|
../libp2p/protocols/pubsub/pubsub,
|
|
../libp2p/protocols/pubsub/floodsub,
|
|
../libp2p/muxers/muxer,
|
|
../libp2p/muxers/mplex/mplex,
|
|
../libp2p/muxers/mplex/types
|
|
|
|
const ChatCodec = "/nim-libp2p/chat/1.0.0"
|
|
const DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
|
|
|
|
|
|
const Help = """
|
|
Commands: /[?|hep|connect|disconnect|exit]
|
|
help: Prints this help
|
|
connect: dials a remote peer
|
|
disconnect: ends current session
|
|
exit: closes the chat
|
|
"""
|
|
|
|
type
|
|
CustomData = ref object
|
|
consoleFd: AsyncFD
|
|
serveFut: Future[void]
|
|
|
|
ChatProto = ref object of LPProtocol
|
|
customData*: CustomData
|
|
switch: Switch
|
|
transp: StreamTransport
|
|
conn: Connection
|
|
client: bool
|
|
connected: bool
|
|
started: bool
|
|
|
|
proc id (p: ChatProto): string =
|
|
result = "unknown"
|
|
if p.conn.peerInfo.peerId.isSome:
|
|
result = $p.conn.peerInfo.peerId.get()
|
|
|
|
# forward declaration
|
|
proc readWriteLoop(p: ChatProto) {.async, gcsafe.}
|
|
proc readAndPrint(p: ChatProto) {.async, gcsafe.} =
|
|
while true:
|
|
while p.connected:
|
|
# echo &"{p.id} -> "
|
|
echo cast[string](await p.conn.readLp())
|
|
await sleepAsync(100.millis)
|
|
|
|
proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} =
|
|
var parts = address.split("/")
|
|
if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]:
|
|
quit("invalid or incompelete peerId")
|
|
|
|
var remotePeer: PeerInfo
|
|
remotePeer.peerId = some(PeerID.init(parts[^1]))
|
|
remotePeer.addrs.add(MultiAddress.init(address))
|
|
|
|
echo &"dialing peer: {address}"
|
|
p.conn = await p.switch.dial(remotePeer, ChatCodec)
|
|
p.connected = true
|
|
|
|
proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
|
|
while true:
|
|
if not p.connected:
|
|
# echo &"{p.id} ->"
|
|
# else:
|
|
echo "type an address or wait for a connection:"
|
|
echo "type /[help|?] for help"
|
|
|
|
var line = await p.transp.readLine()
|
|
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
|
|
echo Help
|
|
continue
|
|
|
|
if line.startsWith("/disconnect"):
|
|
echo "Ending current session"
|
|
if p.connected and p.conn.closed.not:
|
|
await p.conn.close()
|
|
p.connected = false
|
|
elif line.startsWith("/connect"):
|
|
if p.connected:
|
|
var yesno = "N"
|
|
echo "a session is already in progress, do you want end it [y/N]?"
|
|
yesno = await p.transp.readLine()
|
|
if yesno.cmpIgnoreCase("y") == 0:
|
|
await p.conn.close()
|
|
p.connected = false
|
|
elif yesno.cmpIgnoreCase("n") == 0:
|
|
continue
|
|
else:
|
|
echo "unrecognized response"
|
|
continue
|
|
|
|
echo "enter address of remote peer"
|
|
let address = await p.transp.readLine()
|
|
if address.len > 0:
|
|
await p.dialPeer(address)
|
|
|
|
elif line.startsWith("/exit"):
|
|
if p.connected and p.conn.closed.not:
|
|
await p.conn.close()
|
|
p.connected = false
|
|
|
|
await p.switch.stop()
|
|
echo "quitting..."
|
|
quit(0)
|
|
else:
|
|
if p.connected:
|
|
await p.conn.writeLp(line)
|
|
else:
|
|
try:
|
|
if line.startsWith("/") and "ipfs" in line:
|
|
await p.dialPeer(line)
|
|
except:
|
|
echo &"unable to dial {line}"
|
|
# echo getCurrentExceptionMsg()
|
|
|
|
proc readWriteLoop(p: ChatProto) {.async, gcsafe.} =
|
|
asyncCheck p.writeAndPrint()
|
|
asyncCheck p.readAndPrint()
|
|
|
|
method init(p: ChatProto) {.gcsafe.} =
|
|
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
|
|
if p.connected and not p.conn.closed:
|
|
echo "a chat session is already in progress - disconnecting!"
|
|
await stream.close()
|
|
|
|
p.conn = stream
|
|
p.connected = true
|
|
|
|
p.codec = ChatCodec
|
|
p.handler = handle
|
|
|
|
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
|
|
new result
|
|
result.switch = switch
|
|
result.transp = transp
|
|
result.init()
|
|
|
|
proc threadMain(wfd: AsyncFD) {.thread.} =
|
|
## This procedure performs reading from `stdin` and sends data over
|
|
## pipe to main thread.
|
|
var transp = fromPipe(wfd)
|
|
|
|
while true:
|
|
var line = stdin.readLine()
|
|
discard waitFor transp.write(line & "\r\n")
|
|
|
|
proc serveThread(customData: CustomData) {.async.} =
|
|
var transp = fromPipe(customData.consoleFd)
|
|
|
|
let seckey = PrivateKey.random(RSA)
|
|
var peerInfo: PeerInfo
|
|
peerInfo.peerId = some(PeerID.init(seckey))
|
|
var localAddress = DefaultAddr
|
|
while true:
|
|
echo &"Type an address to bind to or Enter to use the default {DefaultAddr}"
|
|
let a = await transp.readLine()
|
|
try:
|
|
if a.len > 0:
|
|
peerInfo.addrs.add(Multiaddress.init(a))
|
|
break
|
|
|
|
peerInfo.addrs.add(Multiaddress.init(localAddress))
|
|
break
|
|
except:
|
|
echo "invalid address"
|
|
localAddress = DefaultAddr
|
|
continue
|
|
|
|
proc createMplex(conn: Connection): Muxer =
|
|
result = newMplex(conn)
|
|
|
|
var mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
|
var transports = @[Transport(newTransport(TcpTransport))]
|
|
var muxers = [(MplexCodec, mplexProvider)].toTable()
|
|
var identify = newIdentify(peerInfo)
|
|
var secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable()
|
|
var switch = newSwitch(peerInfo, transports, identify, muxers, secureManagers = secureManagers)
|
|
|
|
var chatProto = newChatProto(switch, transp)
|
|
switch.mount(chatProto)
|
|
var libp2pFuts = await switch.start()
|
|
chatProto.started = true
|
|
|
|
let id = peerInfo.peerId.get().pretty
|
|
echo "PeerID: " & id
|
|
echo "listening on: "
|
|
for a in peerInfo.addrs:
|
|
echo &"{a}/ipfs/{id}"
|
|
|
|
await chatProto.readWriteLoop()
|
|
await allFutures(libp2pFuts)
|
|
|
|
proc main() {.async.} =
|
|
var data = new CustomData
|
|
|
|
var (rfd, wfd) = createAsyncPipe()
|
|
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
|
raise newException(ValueError, "Could not initialize pipe!")
|
|
|
|
data.consoleFd = rfd
|
|
data.serveFut = serveThread(data)
|
|
var thread: Thread[AsyncFD]
|
|
thread.createThread(threadMain, wfd)
|
|
await data.serveFut
|
|
|
|
when isMainModule:
|
|
waitFor(main())
|