From e9785bb509900083e16e504cb2f4a4174e99eb60 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 18 Dec 2018 16:31:58 +0200 Subject: [PATCH] Make unix socket name generation more reliable (depending on process id). Add examples of bootstrap node and gossipsub node. --- examples/bootstrap.nim | 43 ++++++++++++ examples/hexdump.nim | 92 +++++++++++++++++++++++++ examples/node.nim | 36 ++++++++++ libp2p/daemon/daemonapi.nim | 132 ++++++++++++++++++++---------------- 4 files changed, 243 insertions(+), 60 deletions(-) create mode 100644 examples/bootstrap.nim create mode 100644 examples/hexdump.nim create mode 100644 examples/node.nim diff --git a/examples/bootstrap.nim b/examples/bootstrap.nim new file mode 100644 index 000000000..9cd92e224 --- /dev/null +++ b/examples/bootstrap.nim @@ -0,0 +1,43 @@ +import asyncdispatch2, nimcrypto, strutils +import ../libp2p/daemon/daemonapi, ../libp2p/[base58, multicodec, multiaddress] +import hexdump + +const + PubSubTopic = "test-net" + +proc dumpSubscribedPeers(api: DaemonAPI) {.async.} = + var peers = await api.pubsubListPeers(PubSubTopic) + echo "= List of connected and subscribed peers:" + for item in peers: + echo Base58.encode(item) + +proc main() {.async.} = + echo "= Starting P2P bootnode" + var api = await newDaemonApi({DHTFull, PSGossipSub}) + var id = await api.identity() + let tpeerid = Base58.encode(id.peer) + echo "= P2P bootnode ", tpeerid, " started." + let mcip4 = multiCodec("ip4") + let mcip6 = multiCodec("ip6") + echo "= You can use one of this addresses to bootstrap your nodes:" + for item in id.addresses: + if item.protoCode() == mcip4 or item.protoCode() == mcip6: + echo $item & "/ipfs/" & tpeerid + + proc pubsubLogger(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + let bpeer = Base58.encode(message.peer) + let msglen = len(message.data) + echo "= Recieved pubsub message wit length ", msglen, + " bytes from peer ", bpeer + echo dumpHex(message.data) + await api.dumpSubscribedPeers() + result = true + + var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger) + +when isMainModule: + waitFor(main()) + while true: + poll() diff --git a/examples/hexdump.nim b/examples/hexdump.nim new file mode 100644 index 000000000..38498d94b --- /dev/null +++ b/examples/hexdump.nim @@ -0,0 +1,92 @@ +# +# Copyright (c) 2016 Eugene Kabanov +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from strutils import toHex, repeat + +proc dumpHex*(pbytes: pointer, nbytes: int, items = 1, ascii = true): string = + ## Return hexadecimal memory dump representation pointed by ``p``. + ## ``nbytes`` - number of bytes to show + ## ``items`` - number of bytes in group (supported ``items`` count is + ## 1, 2, 4, 8) + ## ``ascii`` - if ``true`` show ASCII representation of memory dump. + result = "" + let hexSize = items * 2 + var i = 0 + var slider = pbytes + var asciiText = "" + while i < nbytes: + if i %% 16 == 0: + result = result & toHex(cast[BiggestInt](slider), + sizeof(BiggestInt) * 2) & ": " + var k = 0 + while k < items: + var ch = cast[ptr char](cast[uint](slider) + k.uint)[] + if ord(ch) > 31 and ord(ch) < 127: asciiText &= ch else: asciiText &= "." + inc(k) + case items: + of 1: + result = result & toHex(cast[BiggestInt](cast[ptr uint8](slider)[]), + hexSize) + of 2: + result = result & toHex(cast[BiggestInt](cast[ptr uint16](slider)[]), + hexSize) + of 4: + result = result & toHex(cast[BiggestInt](cast[ptr uint32](slider)[]), + hexSize) + of 8: + result = result & toHex(cast[BiggestInt](cast[ptr uint64](slider)[]), + hexSize) + else: + raise newException(ValueError, "Wrong items size!") + result = result & " " + slider = cast[pointer](cast[uint](slider) + items.uint) + i = i + items + if i %% 16 == 0: + result = result & " " & asciiText + asciiText.setLen(0) + result = result & "\n" + + if i %% 16 != 0: + var spacesCount = ((16 - (i %% 16)) div items) * (hexSize + 1) + 1 + result = result & repeat(' ', spacesCount) + result = result & asciiText + result = result & "\n" + +proc dumpHex*[T](v: openarray[T], items: int = 0, ascii = true): string = + ## Return hexadecimal memory dump representation of openarray[T] ``v``. + ## ``items`` - number of bytes in group (supported ``items`` count is + ## 0, 1, 2, 4, 8). If ``items`` is ``0`` group size will depend on + ## ``sizeof(T)``. + ## ``ascii`` - if ``true`` show ASCII representation of memory dump. + var i = 0 + if items == 0: + when sizeof(T) == 2: + i = 2 + elif sizeof(T) == 4: + i = 4 + elif sizeof(T) == 8: + i = 8 + else: + i = 1 + else: + i = items + result = dumpHex(unsafeAddr v[0], sizeof(T) * len(v), i, ascii) diff --git a/examples/node.nim b/examples/node.nim new file mode 100644 index 000000000..9f6d2e756 --- /dev/null +++ b/examples/node.nim @@ -0,0 +1,36 @@ +import asyncdispatch2, nimcrypto, strutils, os +import ../libp2p/daemon/daemonapi, ../libp2p/[base58, multiaddress] + +proc main(bn: string) {.async.} = + echo "= Starting P2P node" + var bootnodes = bn.split(",") + var api = await newDaemonApi({DHTFull, PSGossipSub, WaitBootstrap}, + bootstrapNodes = bootnodes, + peersRequired = 1) + var id = await api.identity() + echo "= P2P node ", Base58.encode(id.peer), " started:" + for item in id.addresses: + echo item + + proc pubsubLogger(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + let bpeer = Base58.encode(message.peer) + let msglen = len(message.data) + echo "= Recieved pubsub message wit length ", msglen, + " bytes from peer ", bpeer + result = true + + var ticket = await api.pubsubSubscribe("test-net", pubsubLogger) + + var data = "HELLO\r\n" + var msgData = cast[seq[byte]](data) + await api.pubsubPublish("test-net", msgData) + +when isMainModule: + if paramCount() != 1: + echo "Please supply bootnodes!" + else: + waitFor(main(paramStr(1))) + while true: + poll() diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 443488b09..e47f1b9ce 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -517,13 +517,36 @@ when not defined(windows): register(AsyncFD(pfd)) addReader(fd, readOutputLoop, nil) result = retFuture + + proc getProcessId(): int = + result = posix.getpid() else: - proc socketExists(filename: string): bool = false + proc socketExists(filename: string): bool = + # Not ready yet + false proc loggingHandler(api: DaemonAPI): Future[void] = # Not ready yet. discard + proc getProcessId(): int = + # Not ready yet + discard + +proc getSocketName(pattern: string): string = + var sockname = "" + var pid = $getProcessId() + while true: + inc(daemonsCount) + sockname = pattern % [pid, $daemonsCount] + if socketExists(sockname): + if tryRemoveFile(sockname): + result = sockname + break + else: + result = sockname + break + # This is forward declaration needed for newDaemonApi() proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} @@ -532,7 +555,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, id: string = "", daemon = DefaultDaemonFile, sockpath = DefaultSocketPath, - pattern = "/tmp/nim-p2pd-$1.sock", + patternSock = "/tmp/nim-p2pd-$1-$2.sock", + patternHandler = "/tmp/nim-p2pd-handle-$1-$2.sock", poolSize = 10, gossipsubHeartbeatInterval = 0, gossipsubHeartbeatDelay = 0, @@ -544,77 +568,65 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, api.flags = flags api.servers = newSeq[P2PServer]() - api.pattern = pattern + api.pattern = patternHandler api.ucounter = 1 api.handlers = initTable[string, P2PStreamCallback]() api.sockname = sockpath if api.sockname == DefaultSocketPath: - # If client not specify `sockpath` but tries to spawn many daemons, we will - # replace sockname. - if daemonsCount != 0: - api.sockname = DefaultSocketPattern % [$daemonsCount] + api.sockname = getSocketName(patternSock) + else: + if not socketExists(api.sockname): + raise newException(DaemonLocalError, "Unix socket is not found!") api.address = initTAddress(api.sockname) inc(daemonsCount) - # We will start daemon process only when control socket path is not default or - # options are specified. - if flags == {} and api.sockname == DefaultSocketPath: - discard - else: - # DHTFull and DHTClient could not be present at the same time - if DHTFull in flags and DHTClient in flags: - api.flags.excl(DHTClient) - # PSGossipSub and PSFloodSub could not be present at the same time - if PSGossipSub in flags and PSFloodSub in flags: - api.flags.excl(PSFloodSub) - if DHTFull in api.flags: - args.add("-dht") - if DHTClient in api.flags: - args.add("-dhtClient") - if {Bootstrap, WaitBootstrap} * api.flags != {}: - args.add("-b") - if Verbose in api.flags: - env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) - if PSGossipSub in api.flags: - args.add("-pubsub") - args.add("-pubsubRouter=gossipsub") - if gossipsubHeartbeatInterval != 0: - let param = $gossipsubHeartbeatInterval & "ms" - args.add("-gossipsubHeartbeatInterval=" & param) - if gossipsubHeartbeatDelay != 0: - let param = $gossipsubHeartbeatDelay & "ms" - args.add("-gossipsubHeartbeatInitialDelay=" & param) - if PSFloodSub in api.flags: - args.add("-pubsub") - args.add("-pubsubRouter=floodsub") - if api.flags * {PSFloodSub, PSGossipSub} != {}: - if PSNoSign in api.flags: - args.add("-pubsubSign=false") - if PSStrictSign in api.flags: - args.add("-pubsubSignStrict=true") - if NATPortMap in api.flags: - args.add("-natPortMap=true") - if len(bootstrapNodes) > 0: - args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) - if len(id) != 0: - args.add("-id=" & id) - if api.sockname != DefaultSocketPath: - args.add("-sock=" & api.sockname) + # DHTFull and DHTClient could not be present at the same time + if DHTFull in flags and DHTClient in flags: + api.flags.excl(DHTClient) + # PSGossipSub and PSFloodSub could not be present at the same time + if PSGossipSub in flags and PSFloodSub in flags: + api.flags.excl(PSFloodSub) + if DHTFull in api.flags: + args.add("-dht") + if DHTClient in api.flags: + args.add("-dhtClient") + if {Bootstrap, WaitBootstrap} * api.flags != {}: + args.add("-b") + if Verbose in api.flags: + env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) + if PSGossipSub in api.flags: + args.add("-pubsub") + args.add("-pubsubRouter=gossipsub") + if gossipsubHeartbeatInterval != 0: + let param = $gossipsubHeartbeatInterval & "ms" + args.add("-gossipsubHeartbeatInterval=" & param) + if gossipsubHeartbeatDelay != 0: + let param = $gossipsubHeartbeatDelay & "ms" + args.add("-gossipsubHeartbeatInitialDelay=" & param) + if PSFloodSub in api.flags: + args.add("-pubsub") + args.add("-pubsubRouter=floodsub") + if api.flags * {PSFloodSub, PSGossipSub} != {}: + if PSNoSign in api.flags: + args.add("-pubsubSign=false") + if PSStrictSign in api.flags: + args.add("-pubsubSignStrict=true") + if NATPortMap in api.flags: + args.add("-natPortMap=true") + if len(bootstrapNodes) > 0: + args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) + if len(id) != 0: + args.add("-id=" & id) + if api.sockname != DefaultSocketPath: + args.add("-sock=" & api.sockname) # We are trying to get absolute daemon path. let cmd = findExe(daemon) if len(cmd) == 0: raise newException(DaemonLocalError, "Could not find daemon executable!") - # We will try to remove control socket file, because daemon will fail - # if its not able to create new socket control file. - # We can't use `existsFile()` because it do not support unix-domain socket - # endpoints. - if socketExists(api.sockname): - if not tryRemoveFile(api.sockname): - if api.sockname != sockpath: - raise newException(DaemonLocalError, "Socket is already bound!") + # Starting daemon process api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. @@ -797,7 +809,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback) {.async.} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() - var sockname = api.pattern % [$api.ucounter] + var sockname = api.pattern % [$getProcessId(), $api.ucounter] var localaddr = initTAddress(sockname) inc(api.ucounter) var server = createStreamServer(localaddr, streamHandler, udata = api)