From 66973aed3ff5cc28cb85523755d2d37f69064157 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 22 Nov 2018 16:12:46 +0200 Subject: [PATCH] Simple chat. --- examples/chat.nim | 81 +++++++++++++++++++++++++++++++++ libp2p/protobuf/minprotobuf.nim | 7 +-- 2 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 examples/chat.nim diff --git a/examples/chat.nim b/examples/chat.nim new file mode 100644 index 000000000..b79a21f5b --- /dev/null +++ b/examples/chat.nim @@ -0,0 +1,81 @@ +import asyncdispatch2, nimcrypto, strutils +import ../libp2p/daemon/daemonapi + +const + ConsoleAddress = "/tmp/console-chat.sock" + ServerAddress = "/tmp/remote-chat.sock" + ServerProtocols = @["/test-chat-stream"] + +type + CustomData = ref object + api: DaemonAPI + remotes: seq[StreamTransport] + +proc threadMain(a: int) {.thread.} = + ## This procedure performs reading from `stdin` and sends data over + ## unix domain socket to main thread. + var transp = waitFor connect(initTAddress(ConsoleAddress)) + + while true: + var line = stdin.readLine() + let res = waitFor transp.write(line & "\r\n") + +proc serveThread(server: StreamServer, + transp: StreamTransport) {.async.} = + ## This procedure perform readin on local unix domain socket and + ## sends data to remote clients. + var udata = getUserData[CustomData](server) + while true: + try: + var line = await transp.readLine() + if line.startsWith("/connect"): + var parts = line.split(" ") + if len(parts) == 2: + var address = fromHex(parts[1]) + echo "= Searching for peer ", toHex(address) + var id = await udata.api.dhtFindPeer(address) + echo "= Connecting to peer ", toHex(address) + await udata.api.connect(id.peer, id.addresses) + echo "= Opening stream to peer chat ", toHex(address) + var stream = await udata.api.openStream(id.peer, ServerProtocols) + udata.remotes.add(transp) + echo "= Connected to peer chat ", toHex(address) + else: + var msg = line & "\r\n" + echo "<< ", msg + var pending = newSeq[Future[int]]() + for item in udata.remotes: + pending.add(item.write(msg)) + if len(pending) > 0: + var results = await all(pending) + except: + break + +proc main() {.async.} = + var data = new CustomData + data.remotes = newSeq[StreamTransport]() + + var lserver = createStreamServer(initTAddress(ConsoleAddress), + serveThread, udata = data) + lserver.start() + var thread: Thread[int] + thread.createThread(threadMain, 0) + + echo "= Starting P2P node" + data.api = await newDaemonApi({DHTFull}) + await sleepAsync(1000) + var id = await data.api.identity() + + proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + echo "= Peer ", toHex(stream.peer), " joined chat" + while true: + var line = await stream.transp.readLine() + echo ">> ", line + + await data.api.addHandler(ServerProtocols, streamHandler) + echo "= Your PeerID is ", toHex(id.peer) + +when isMainModule: + waitFor(main()) + while true: + poll() diff --git a/libp2p/protobuf/minprotobuf.nim b/libp2p/protobuf/minprotobuf.nim index feaf1a75a..14d008df4 100644 --- a/libp2p/protobuf/minprotobuf.nim +++ b/libp2p/protobuf/minprotobuf.nim @@ -167,9 +167,10 @@ proc write*(pb: var ProtoBuffer, field: ProtoField) = assert(res == VarintStatus.Success) pb.offset += length assert(pb.isEnough(len(field.vbuffer))) - copyMem(addr pb.buffer[pb.offset], unsafeAddr field.vbuffer[0], - len(field.vbuffer)) - pb.offset += len(field.vbuffer) + if len(field.vbuffer) > 0: + copyMem(addr pb.buffer[pb.offset], unsafeAddr field.vbuffer[0], + len(field.vbuffer)) + pb.offset += len(field.vbuffer) else: discard