Fix unix path multiaddress serialization/deserialization.

Add tests for unix path multiaddress.
Add wire.nim.
Fix latest breaking changes from go-libp2p-daemon.
This commit is contained in:
cheatfate 2019-01-09 19:12:15 +02:00
parent 7e2d3e213f
commit 7593d4d970
4 changed files with 192 additions and 53 deletions

View File

@ -10,15 +10,18 @@
## This module implementes API for `go-libp2p-daemon`. ## This module implementes API for `go-libp2p-daemon`.
import os, osproc, strutils, tables, streams, strtabs import os, osproc, strutils, tables, streams, strtabs
import asyncdispatch2 import asyncdispatch2
import ../varint, ../multiaddress, ../base58, ../cid import ../varint, ../multiaddress, ../multicodec, ../base58, ../cid
import ../protobuf/minprotobuf import ../wire, ../protobuf/minprotobuf
when not defined(windows): when not defined(windows):
import posix import posix
const const
DefaultSocketPath* = "/tmp/p2pd.sock" DefaultSocketPath* = "/unix/tmp/p2pd.sock"
DefaultSocketPattern* = "/tmp/p2pd-$1.sock" DefaultUnixSocketPattern* = "/unix/tmp/nim-p2pd-$1-$2.sock"
DefaultIpSocketPattern* = "/ip4/127.0.0.1/tcp/$2"
DefaultUnixChildPattern* = "/unix/tmp/nim-p2pd-handle-$1-$2.sock"
DefaultIpChildPattern* = "/ip4/127.0.0.1/tcp/$2"
DefaultDaemonFile* = "p2pd" DefaultDaemonFile* = "p2pd"
type type
@ -104,13 +107,12 @@ type
P2PServer = object P2PServer = object
server*: StreamServer server*: StreamServer
address*: TransportAddress address*: MultiAddress
DaemonAPI* = ref object DaemonAPI* = ref object
# pool*: TransportPool # pool*: TransportPool
flags*: set[P2PDaemonFlags] flags*: set[P2PDaemonFlags]
address*: TransportAddress address*: MultiAddress
sockname*: string
pattern*: string pattern*: string
ucounter*: int ucounter*: int
process*: Process process*: Process
@ -197,13 +199,13 @@ proc requestStreamOpen(peerid: PeerID,
result.write(initProtoField(3, msg)) result.write(initProtoField(3, msg))
result.finish() result.finish()
proc requestStreamHandler(path: string, proc requestStreamHandler(address: MultiAddress,
protocols: openarray[MultiProtocol]): ProtoBuffer = protocols: openarray[MultiProtocol]): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go
## Processing function `doStreamHandler(req *pb.Request)`. ## Processing function `doStreamHandler(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength}) result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer() var msg = initProtoBuffer()
msg.write(initProtoField(1, path)) msg.write(initProtoField(1, address.data.buffer))
for item in protocols: for item in protocols:
msg.write(initProtoField(2, item)) msg.write(initProtoField(2, item))
result.write(initProtoField(1, cast[uint](RequestType.STREAM_HANDLER))) result.write(initProtoField(1, cast[uint](RequestType.STREAM_HANDLER)))
@ -483,19 +485,22 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] = proc newConnection*(api: DaemonAPI): Future[StreamTransport] =
# echo "Establish new connection to daemon [", $api.address, "]"
result = connect(api.address) result = connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} =
# echo "Close connection with daemon [", $api.address, "]"
transp.close() transp.close()
await transp.join() await transp.join()
when not defined(windows): proc socketExists(address: MultiAddress): Future[bool] {.async.} =
proc socketExists(filename: string): bool = try:
var res: Stat var transp = await connect(address)
result = stat(filename, res) >= 0'i32 transp.close()
await transp.join()
result = true
except:
result = false
when not defined(windows):
proc loggingHandler(api: DaemonAPI): Future[void] = proc loggingHandler(api: DaemonAPI): Future[void] =
var retFuture = newFuture[void]("logging.handler") var retFuture = newFuture[void]("logging.handler")
var loop = getGlobalDispatcher() var loop = getGlobalDispatcher()
@ -521,9 +526,8 @@ when not defined(windows):
proc getProcessId(): int = proc getProcessId(): int =
result = posix.getpid() result = posix.getpid()
else: else:
proc socketExists(filename: string): bool = proc getCurrentProcessId*(): uint32 {.stdcall, dynlib: "kernel32",
# Not ready yet importc: "GetCurrentProcessId".}
false
proc loggingHandler(api: DaemonAPI): Future[void] = proc loggingHandler(api: DaemonAPI): Future[void] =
# Not ready yet. # Not ready yet.
@ -531,20 +535,18 @@ else:
proc getProcessId(): int = proc getProcessId(): int =
# Not ready yet # Not ready yet
discard result = cast[int](getCurrentProcessId())
proc getSocketName(pattern: string): string = proc getSocket(pattern: string): Future[MultiAddress] {.async.} =
var sockname = "" var sockname = ""
var pid = $getProcessId() var pid = $getProcessId()
while true: while true:
inc(daemonsCount) inc(daemonsCount)
sockname = pattern % [pid, $daemonsCount] sockname = pattern % [pid, $daemonsCount]
if socketExists(sockname): var ma = MultiAddress.init(sockname)
if tryRemoveFile(sockname): let res = await socketExists(ma)
result = sockname if not res:
break result = ma
else:
result = sockname
break break
# This is forward declaration needed for newDaemonApi() # This is forward declaration needed for newDaemonApi()
@ -554,14 +556,46 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
bootstrapNodes: seq[string] = @[], bootstrapNodes: seq[string] = @[],
id: string = "", id: string = "",
daemon = DefaultDaemonFile, daemon = DefaultDaemonFile,
sockpath = DefaultSocketPath, sockpath = "",
patternSock = "/tmp/nim-p2pd-$1-$2.sock", patternSock = DefaultUnixSocketPattern,
patternHandler = "/tmp/nim-p2pd-handle-$1-$2.sock", patternHandler = DefaultUnixChildPattern,
poolSize = 10, poolSize = 10,
gossipsubHeartbeatInterval = 0, gossipsubHeartbeatInterval = 0,
gossipsubHeartbeatDelay = 0, gossipsubHeartbeatDelay = 0,
peersRequired = 2): Future[DaemonAPI] {.async.} = peersRequired = 2): Future[DaemonAPI] {.async.} =
## Initialize connection to `go-libp2p-daemon` control socket. ## Initialize connection to `go-libp2p-daemon` control socket.
##
## ``flags`` - set of P2PDaemonFlags.
##
## ``bootstrapNodes`` - list of bootnode's addresses in MultiAddress format.
## (default: @[], which means usage of default nodes inside of
## `go-libp2p-daemon`).
##
## ``id`` - path to file with identification information (default: "" which
## means - generate new random identity).
##
## ``daemon`` - name of ``go-libp2p-daemon`` executable (default: "p2pd").
##
## ``sockpath`` - default control socket MultiAddress
## (default: "/unix/tmp/p2pd.sock").
##
## ``patternSock`` - MultiAddress pattern string, used to start multiple
## daemons (default: "/unix/tmp/nim-p2pd-$1-$2.sock").
##
## ``patternHandler`` - MultiAddress pattern string, used to establish
## incoming channels (default: "/unix/tmp/nim-p2pd-handle-$1-$2.sock").
##
## ``poolSize`` - size of connections pool (default: 10).
##
## ``gossipsubHeartbeatInterval`` - GossipSub protocol heartbeat interval in
## milliseconds (default: 0, use default `go-libp2p-daemon` values).
##
## ``gossipsubHeartbeatDelay`` - GossipSub protocol heartbeat delay in
## millseconds (default: 0, use default `go-libp2p-daemon` values).
##
## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least
## ``peersRequired`` peers before return from `newDaemonApi()` procedure
## (default: 2).
var api = new DaemonAPI var api = new DaemonAPI
var args = newSeq[string]() var args = newSeq[string]()
var env: StringTableRef var env: StringTableRef
@ -571,15 +605,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
api.pattern = patternHandler api.pattern = patternHandler
api.ucounter = 1 api.ucounter = 1
api.handlers = initTable[string, P2PStreamCallback]() api.handlers = initTable[string, P2PStreamCallback]()
api.sockname = sockpath
if api.sockname == DefaultSocketPath: if len(sockpath) == 0:
api.sockname = getSocketName(patternSock) api.address = await getSocket(patternSock)
else: else:
if not socketExists(api.sockname): api.address = MultiAddress.init(sockpath)
raise newException(DaemonLocalError, "Unix socket is not found!") let res = await socketExists(api.address)
if not res:
raise newException(DaemonLocalError, "Could not connect to remote daemon")
api.address = initTAddress(api.sockname)
inc(daemonsCount) inc(daemonsCount)
# DHTFull and DHTClient could not be present at the same time # DHTFull and DHTClient could not be present at the same time
@ -619,8 +653,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) args.add("-bootstrapPeers=" & bootstrapNodes.join(","))
if len(id) != 0: if len(id) != 0:
args.add("-id=" & id) args.add("-id=" & id)
if api.sockname != DefaultSocketPath: args.add("-listen=" & $api.address)
args.add("-sock=" & api.sockname)
# We are trying to get absolute daemon path. # We are trying to get absolute daemon path.
let cmd = findExe(daemon) let cmd = findExe(daemon)
@ -628,6 +661,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
raise newException(DaemonLocalError, "Could not find daemon executable!") raise newException(DaemonLocalError, "Could not find daemon executable!")
# Starting daemon process # Starting daemon process
# echo "Starting ", cmd, " ", args.join(" ")
api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket. # Waiting until daemon will not be bound to control socket.
while true: while true:
@ -635,10 +669,11 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
echo api.process.errorStream.readAll() echo api.process.errorStream.readAll()
raise newException(DaemonLocalError, raise newException(DaemonLocalError,
"Daemon executable could not be started!") "Daemon executable could not be started!")
if socketExists(api.sockname): let res = await socketExists(api.address)
if res:
break break
await sleepAsync(100) await sleepAsync(500)
# api.pool = await newPool(api.address, poolsize = poolSize)
if Logging in api.flags: if Logging in api.flags:
api.loggerFut = loggingHandler(api) api.loggerFut = loggingHandler(api)
@ -673,7 +708,8 @@ proc close*(api: DaemonAPI) {.async.} =
pending.add(server.server.join()) pending.add(server.server.join())
await all(pending) await all(pending)
for server in api.servers: for server in api.servers:
discard tryRemoveFile($(server.address)) let address = initTAddress(server.address)
discard tryRemoveFile($address)
api.servers.setLen(0) api.servers.setLen(0)
# Closing daemon's process. # Closing daemon's process.
api.process.kill() api.process.kill()
@ -681,9 +717,10 @@ proc close*(api: DaemonAPI) {.async.} =
# Waiting for logger loop to exit # Waiting for logger loop to exit
if not isNil(api.loggerFut): if not isNil(api.loggerFut):
await api.loggerFut await api.loggerFut
# Attempt to delete control socket endpoint. # Attempt to delete unix socket endpoint.
if socketExists(api.sockname): let address = initTAddress(api.address)
discard tryRemoveFile(api.sockname) if address.family == AddressFamily.Unix:
discard tryRemoveFile($address)
template withMessage(m, body: untyped): untyped = template withMessage(m, body: untyped): untyped =
let kind = m.checkResponse() let kind = m.checkResponse()
@ -809,18 +846,18 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async.} = handler: P2PStreamCallback) {.async.} =
## Add stream handler ``handler`` for set of protocols ``protocols``. ## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection() var transp = await api.newConnection()
var sockname = api.pattern % [$getProcessId(), $api.ucounter] let addrname = api.pattern % [$getProcessId(), $api.ucounter]
var localaddr = initTAddress(sockname) let maddress = MultiAddress.init(addrname)
inc(api.ucounter) inc(api.ucounter)
var server = createStreamServer(localaddr, streamHandler, udata = api) var server = createStreamServer(maddress, streamHandler, udata = api)
try: try:
for item in protocols: for item in protocols:
api.handlers[item] = handler api.handlers[item] = handler
server.start() server.start()
var pb = await transp.transactMessage(requestStreamHandler(sockname, var pb = await transp.transactMessage(requestStreamHandler(maddress,
protocols)) protocols))
pb.withMessage() do: pb.withMessage() do:
api.servers.add(P2PServer(server: server, address: localaddr)) api.servers.add(P2PServer(server: server, address: maddress))
except: except:
for item in protocols: for item in protocols:
api.handlers.del(item) api.handlers.del(item)

View File

@ -421,9 +421,9 @@ proc protoArgument*(ma: MultiAddress, value: var openarray[byte]): int =
elif proto.kind in {Length, Path}: elif proto.kind in {Length, Path}:
if vb.data.readSeq(buffer) == -1: if vb.data.readSeq(buffer) == -1:
raise newException(MultiAddressError, "Decoding protocol error") raise newException(MultiAddressError, "Decoding protocol error")
result = len(vb.data.buffer) result = len(buffer)
if len(value) >= result: if len(value) >= result:
copyMem(addr value[0], addr vb.data.buffer[0], result) copyMem(addr value[0], addr buffer[0], result)
proc getPart(ma: MultiAddress, index: int): MultiAddress = proc getPart(ma: MultiAddress, index: int): MultiAddress =
var header: uint64 var header: uint64
@ -517,7 +517,10 @@ proc `$`*(value: MultiAddress): string =
if not proto.coder.bufferToString(vb.data, part): if not proto.coder.bufferToString(vb.data, part):
raise newException(MultiAddressError, "Decoding protocol error") raise newException(MultiAddressError, "Decoding protocol error")
parts.add($(proto.mcodec)) parts.add($(proto.mcodec))
parts.add(part) if proto.kind == Path and part[0] == '/':
parts.add(part[1..^1])
else:
parts.add(part)
elif proto.kind == Marker: elif proto.kind == Marker:
parts.add($(proto.mcodec)) parts.add($(proto.mcodec))
if len(parts) > 0: if len(parts) > 0:

82
libp2p/wire.nim Normal file
View File

@ -0,0 +1,82 @@
## Nim-Libp2p
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
## This module implements wire network connection procedures.
import asyncdispatch2
import multiaddress, multicodec
proc initTAddress*(ma: MultiAddress): TransportAddress =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
##
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
var state = 0
var pbuf: array[2, byte]
for part in ma.items():
let code = part.protoCode()
if state == 0:
if code == multiCodec("ip4"):
result = TransportAddress(family: AddressFamily.IPv4)
if ma.protoArgument(result.address_v4) == 0:
raise newException(TransportAddressError, "Incorrect IPv4 address")
inc(state)
elif code == multiCodec("ip6"):
result = TransportAddress(family: AddressFamily.IPv6)
if ma.protoArgument(result.address_v6) == 0:
raise newException(TransportAddressError, "Incorrect IPv6 address")
inc(state)
elif code == multiCodec("unix"):
result = TransportAddress(family: AddressFamily.Unix)
if ma.protoArgument(result.address_un) == 0:
raise newException(TransportAddressError, "Incorrect Unix address")
result.port = Port(1)
break
else:
raise newException(TransportAddressError,
"Could not initialize address!")
elif state == 1:
if code == multiCodec("tcp") or code == multiCodec("udp"):
if ma.protoArgument(pbuf) == 0:
raise newException(TransportAddressError, "Incorrect port")
result.port = Port((cast[uint16](pbuf[0]) shl 8) or
cast[uint16](pbuf[1]))
break
else:
raise newException(TransportAddressError,
"Could not initialize address!")
proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil): Future[StreamTransport] =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
let address = initTAddress(ma)
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if ma[1].protoCode() != multiCodec("tcp"):
var retFuture = newFuture[StreamTransport]()
retFuture.fail(newException(TransportAddressError,
"Incorrect address type!"))
return retFuture
result = connect(address, bufferSize, child)
proc createStreamServer*[T](ma: MultiAddress,
cbproc: StreamCallback,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer =
## Create new TCP stream server which bounds to ``ma`` address.
var address = initTAddress(ma)
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if ma[1].protoCode() != multiCodec("tcp"):
raise newException(TransportAddressError, "Incorrect address type!")
result = createStreamServer(address, cbproc, flags, udata, sock, backlog,
bufferSize, child, init)

View File

@ -170,6 +170,16 @@ const
"/p2p-circuit/50" "/p2p-circuit/50"
] ]
PathVectors = [
"/unix/tmp/p2pd.sock",
"/unix/a/b/c/d/e/f/g/h/i.sock"
]
PathExpects = [
"90030E2F746D702F703270642E736F636B",
"9003172F612F622F632F642F652F662F672F682F692E736F636B"
]
UtilitySuccessVectors = [ UtilitySuccessVectors = [
"/ip4/127.0.0.1/tcp/1024", "/ip4/127.0.0.1/tcp/1024",
"/ip4/127.0.0.1/udp/1024", "/ip4/127.0.0.1/udp/1024",
@ -247,3 +257,10 @@ suite "MultiAddress test suite":
for item in UtilityFailVectors: for item in UtilityFailVectors:
var a = MultiAddress.init(item) var a = MultiAddress.init(item)
check a.isWire() == false check a.isWire() == false
test "Path addresses serialization/deserialization":
for i in 0..<len(PathVectors):
var a = MultiAddress.init(PathVectors[i])
check:
hex(a) == PathExpects[i]
$a == PathVectors[i]