From 7593d4d970be176fe72b31377aedc79a120e0a6a Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 9 Jan 2019 19:12:15 +0200 Subject: [PATCH] Fix unix path multiaddress serialization/deserialization. Add tests for unix path multiaddress. Add wire.nim. Fix latest breaking changes from go-libp2p-daemon. --- libp2p/daemon/daemonapi.nim | 137 +++++++++++++++++++++++------------- libp2p/multiaddress.nim | 9 ++- libp2p/wire.nim | 82 +++++++++++++++++++++ tests/testmultiaddress.nim | 17 +++++ 4 files changed, 192 insertions(+), 53 deletions(-) create mode 100644 libp2p/wire.nim diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index e47f1b9..8fdd259 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -10,15 +10,18 @@ ## This module implementes API for `go-libp2p-daemon`. import os, osproc, strutils, tables, streams, strtabs import asyncdispatch2 -import ../varint, ../multiaddress, ../base58, ../cid -import ../protobuf/minprotobuf +import ../varint, ../multiaddress, ../multicodec, ../base58, ../cid +import ../wire, ../protobuf/minprotobuf when not defined(windows): import posix const - DefaultSocketPath* = "/tmp/p2pd.sock" - DefaultSocketPattern* = "/tmp/p2pd-$1.sock" + DefaultSocketPath* = "/unix/tmp/p2pd.sock" + DefaultUnixSocketPattern* = "/unix/tmp/nim-p2pd-$1-$2.sock" + DefaultIpSocketPattern* = "/ip4/127.0.0.1/tcp/$2" + DefaultUnixChildPattern* = "/unix/tmp/nim-p2pd-handle-$1-$2.sock" + DefaultIpChildPattern* = "/ip4/127.0.0.1/tcp/$2" DefaultDaemonFile* = "p2pd" type @@ -104,13 +107,12 @@ type P2PServer = object server*: StreamServer - address*: TransportAddress + address*: MultiAddress DaemonAPI* = ref object # pool*: TransportPool flags*: set[P2PDaemonFlags] - address*: TransportAddress - sockname*: string + address*: MultiAddress pattern*: string ucounter*: int process*: Process @@ -197,13 +199,13 @@ proc requestStreamOpen(peerid: PeerID, result.write(initProtoField(3, msg)) result.finish() -proc requestStreamHandler(path: string, +proc requestStreamHandler(address: MultiAddress, protocols: openarray[MultiProtocol]): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doStreamHandler(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, path)) + msg.write(initProtoField(1, address.data.buffer)) for item in protocols: msg.write(initProtoField(2, item)) result.write(initProtoField(1, cast[uint](RequestType.STREAM_HANDLER))) @@ -483,19 +485,22 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer proc newConnection*(api: DaemonAPI): Future[StreamTransport] = - # echo "Establish new connection to daemon [", $api.address, "]" result = connect(api.address) proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = - # echo "Close connection with daemon [", $api.address, "]" transp.close() await transp.join() -when not defined(windows): - proc socketExists(filename: string): bool = - var res: Stat - result = stat(filename, res) >= 0'i32 +proc socketExists(address: MultiAddress): Future[bool] {.async.} = + try: + var transp = await connect(address) + transp.close() + await transp.join() + result = true + except: + result = false +when not defined(windows): proc loggingHandler(api: DaemonAPI): Future[void] = var retFuture = newFuture[void]("logging.handler") var loop = getGlobalDispatcher() @@ -521,9 +526,8 @@ when not defined(windows): proc getProcessId(): int = result = posix.getpid() else: - proc socketExists(filename: string): bool = - # Not ready yet - false + proc getCurrentProcessId*(): uint32 {.stdcall, dynlib: "kernel32", + importc: "GetCurrentProcessId".} proc loggingHandler(api: DaemonAPI): Future[void] = # Not ready yet. @@ -531,20 +535,18 @@ else: proc getProcessId(): int = # Not ready yet - discard + result = cast[int](getCurrentProcessId()) -proc getSocketName(pattern: string): string = +proc getSocket(pattern: string): Future[MultiAddress] {.async.} = var sockname = "" var pid = $getProcessId() while true: inc(daemonsCount) sockname = pattern % [pid, $daemonsCount] - if socketExists(sockname): - if tryRemoveFile(sockname): - result = sockname - break - else: - result = sockname + var ma = MultiAddress.init(sockname) + let res = await socketExists(ma) + if not res: + result = ma break # This is forward declaration needed for newDaemonApi() @@ -554,14 +556,46 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, bootstrapNodes: seq[string] = @[], id: string = "", daemon = DefaultDaemonFile, - sockpath = DefaultSocketPath, - patternSock = "/tmp/nim-p2pd-$1-$2.sock", - patternHandler = "/tmp/nim-p2pd-handle-$1-$2.sock", + sockpath = "", + patternSock = DefaultUnixSocketPattern, + patternHandler = DefaultUnixChildPattern, poolSize = 10, gossipsubHeartbeatInterval = 0, gossipsubHeartbeatDelay = 0, peersRequired = 2): Future[DaemonAPI] {.async.} = ## Initialize connection to `go-libp2p-daemon` control socket. + ## + ## ``flags`` - set of P2PDaemonFlags. + ## + ## ``bootstrapNodes`` - list of bootnode's addresses in MultiAddress format. + ## (default: @[], which means usage of default nodes inside of + ## `go-libp2p-daemon`). + ## + ## ``id`` - path to file with identification information (default: "" which + ## means - generate new random identity). + ## + ## ``daemon`` - name of ``go-libp2p-daemon`` executable (default: "p2pd"). + ## + ## ``sockpath`` - default control socket MultiAddress + ## (default: "/unix/tmp/p2pd.sock"). + ## + ## ``patternSock`` - MultiAddress pattern string, used to start multiple + ## daemons (default: "/unix/tmp/nim-p2pd-$1-$2.sock"). + ## + ## ``patternHandler`` - MultiAddress pattern string, used to establish + ## incoming channels (default: "/unix/tmp/nim-p2pd-handle-$1-$2.sock"). + ## + ## ``poolSize`` - size of connections pool (default: 10). + ## + ## ``gossipsubHeartbeatInterval`` - GossipSub protocol heartbeat interval in + ## milliseconds (default: 0, use default `go-libp2p-daemon` values). + ## + ## ``gossipsubHeartbeatDelay`` - GossipSub protocol heartbeat delay in + ## millseconds (default: 0, use default `go-libp2p-daemon` values). + ## + ## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least + ## ``peersRequired`` peers before return from `newDaemonApi()` procedure + ## (default: 2). var api = new DaemonAPI var args = newSeq[string]() var env: StringTableRef @@ -571,15 +605,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, api.pattern = patternHandler api.ucounter = 1 api.handlers = initTable[string, P2PStreamCallback]() - api.sockname = sockpath - if api.sockname == DefaultSocketPath: - api.sockname = getSocketName(patternSock) + if len(sockpath) == 0: + api.address = await getSocket(patternSock) else: - if not socketExists(api.sockname): - raise newException(DaemonLocalError, "Unix socket is not found!") + api.address = MultiAddress.init(sockpath) + let res = await socketExists(api.address) + if not res: + raise newException(DaemonLocalError, "Could not connect to remote daemon") - api.address = initTAddress(api.sockname) inc(daemonsCount) # DHTFull and DHTClient could not be present at the same time @@ -619,15 +653,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) if len(id) != 0: args.add("-id=" & id) - if api.sockname != DefaultSocketPath: - args.add("-sock=" & api.sockname) + args.add("-listen=" & $api.address) # We are trying to get absolute daemon path. let cmd = findExe(daemon) if len(cmd) == 0: raise newException(DaemonLocalError, "Could not find daemon executable!") - + # Starting daemon process + # echo "Starting ", cmd, " ", args.join(" ") api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. while true: @@ -635,10 +669,11 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, echo api.process.errorStream.readAll() raise newException(DaemonLocalError, "Daemon executable could not be started!") - if socketExists(api.sockname): + let res = await socketExists(api.address) + if res: break - await sleepAsync(100) - # api.pool = await newPool(api.address, poolsize = poolSize) + await sleepAsync(500) + if Logging in api.flags: api.loggerFut = loggingHandler(api) @@ -673,7 +708,8 @@ proc close*(api: DaemonAPI) {.async.} = pending.add(server.server.join()) await all(pending) for server in api.servers: - discard tryRemoveFile($(server.address)) + let address = initTAddress(server.address) + discard tryRemoveFile($address) api.servers.setLen(0) # Closing daemon's process. api.process.kill() @@ -681,9 +717,10 @@ proc close*(api: DaemonAPI) {.async.} = # Waiting for logger loop to exit if not isNil(api.loggerFut): await api.loggerFut - # Attempt to delete control socket endpoint. - if socketExists(api.sockname): - discard tryRemoveFile(api.sockname) + # Attempt to delete unix socket endpoint. + let address = initTAddress(api.address) + if address.family == AddressFamily.Unix: + discard tryRemoveFile($address) template withMessage(m, body: untyped): untyped = let kind = m.checkResponse() @@ -809,18 +846,18 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback) {.async.} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() - var sockname = api.pattern % [$getProcessId(), $api.ucounter] - var localaddr = initTAddress(sockname) + let addrname = api.pattern % [$getProcessId(), $api.ucounter] + let maddress = MultiAddress.init(addrname) inc(api.ucounter) - var server = createStreamServer(localaddr, streamHandler, udata = api) + var server = createStreamServer(maddress, streamHandler, udata = api) try: for item in protocols: api.handlers[item] = handler server.start() - var pb = await transp.transactMessage(requestStreamHandler(sockname, + var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols)) pb.withMessage() do: - api.servers.add(P2PServer(server: server, address: localaddr)) + api.servers.add(P2PServer(server: server, address: maddress)) except: for item in protocols: api.handlers.del(item) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 9b85929..5e952eb 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -421,9 +421,9 @@ proc protoArgument*(ma: MultiAddress, value: var openarray[byte]): int = elif proto.kind in {Length, Path}: if vb.data.readSeq(buffer) == -1: raise newException(MultiAddressError, "Decoding protocol error") - result = len(vb.data.buffer) + result = len(buffer) if len(value) >= result: - copyMem(addr value[0], addr vb.data.buffer[0], result) + copyMem(addr value[0], addr buffer[0], result) proc getPart(ma: MultiAddress, index: int): MultiAddress = var header: uint64 @@ -517,7 +517,10 @@ proc `$`*(value: MultiAddress): string = if not proto.coder.bufferToString(vb.data, part): raise newException(MultiAddressError, "Decoding protocol error") parts.add($(proto.mcodec)) - parts.add(part) + if proto.kind == Path and part[0] == '/': + parts.add(part[1..^1]) + else: + parts.add(part) elif proto.kind == Marker: parts.add($(proto.mcodec)) if len(parts) > 0: diff --git a/libp2p/wire.nim b/libp2p/wire.nim new file mode 100644 index 0000000..305b27f --- /dev/null +++ b/libp2p/wire.nim @@ -0,0 +1,82 @@ +## Nim-Libp2p +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +## This module implements wire network connection procedures. +import asyncdispatch2 +import multiaddress, multicodec + +proc initTAddress*(ma: MultiAddress): TransportAddress = + ## Initialize ``TransportAddress`` with MultiAddress ``ma``. + ## + ## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``. + var state = 0 + var pbuf: array[2, byte] + for part in ma.items(): + let code = part.protoCode() + if state == 0: + if code == multiCodec("ip4"): + result = TransportAddress(family: AddressFamily.IPv4) + if ma.protoArgument(result.address_v4) == 0: + raise newException(TransportAddressError, "Incorrect IPv4 address") + inc(state) + elif code == multiCodec("ip6"): + result = TransportAddress(family: AddressFamily.IPv6) + if ma.protoArgument(result.address_v6) == 0: + raise newException(TransportAddressError, "Incorrect IPv6 address") + inc(state) + elif code == multiCodec("unix"): + result = TransportAddress(family: AddressFamily.Unix) + if ma.protoArgument(result.address_un) == 0: + raise newException(TransportAddressError, "Incorrect Unix address") + result.port = Port(1) + break + else: + raise newException(TransportAddressError, + "Could not initialize address!") + elif state == 1: + if code == multiCodec("tcp") or code == multiCodec("udp"): + if ma.protoArgument(pbuf) == 0: + raise newException(TransportAddressError, "Incorrect port") + result.port = Port((cast[uint16](pbuf[0]) shl 8) or + cast[uint16](pbuf[1])) + break + else: + raise newException(TransportAddressError, + "Could not initialize address!") + +proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize, + child: StreamTransport = nil): Future[StreamTransport] = + ## Open new connection to remote peer with address ``ma`` and create + ## new transport object ``StreamTransport`` for established connection. + ## ``bufferSize`` is size of internal buffer for transport. + let address = initTAddress(ma) + if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: + if ma[1].protoCode() != multiCodec("tcp"): + var retFuture = newFuture[StreamTransport]() + retFuture.fail(newException(TransportAddressError, + "Incorrect address type!")) + return retFuture + result = connect(address, bufferSize, child) + +proc createStreamServer*[T](ma: MultiAddress, + cbproc: StreamCallback, + flags: set[ServerFlags] = {}, + udata: ref T, + sock: AsyncFD = asyncInvalidSocket, + backlog: int = 100, + bufferSize: int = DefaultStreamBufferSize, + child: StreamServer = nil, + init: TransportInitCallback = nil): StreamServer = + ## Create new TCP stream server which bounds to ``ma`` address. + var address = initTAddress(ma) + if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: + if ma[1].protoCode() != multiCodec("tcp"): + raise newException(TransportAddressError, "Incorrect address type!") + result = createStreamServer(address, cbproc, flags, udata, sock, backlog, + bufferSize, child, init) diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index d2cd168..c26a781 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -170,6 +170,16 @@ const "/p2p-circuit/50" ] + PathVectors = [ + "/unix/tmp/p2pd.sock", + "/unix/a/b/c/d/e/f/g/h/i.sock" + ] + + PathExpects = [ + "90030E2F746D702F703270642E736F636B", + "9003172F612F622F632F642F652F662F672F682F692E736F636B" + ] + UtilitySuccessVectors = [ "/ip4/127.0.0.1/tcp/1024", "/ip4/127.0.0.1/udp/1024", @@ -247,3 +257,10 @@ suite "MultiAddress test suite": for item in UtilityFailVectors: var a = MultiAddress.init(item) check a.isWire() == false + + test "Path addresses serialization/deserialization": + for i in 0..