diff --git a/.appveyor.yml b/.appveyor.yml index a16d8ad..6a32d77 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -23,6 +23,21 @@ install: - IF "%PLATFORM%" == "x86" SET PATH=C:\mingw-w64\i686-6.3.0-posix-dwarf-rt_v5-rev1\mingw32\bin;%PATH% - IF "%PLATFORM%" == "x64" SET PATH=C:\mingw-w64\x86_64-8.1.0-posix-seh-rt_v6-rev0\mingw64\bin;%PATH% + # set path for produced Go binaries + - MKDIR goblin + - CD goblin + - SET GOPATH=%CD% + - SET PATH=%GOPATH%\bin;%PATH% + - CD .. + + # install and build go-libp2p-daemon + - go version + - git clone https://github.com/libp2p/go-libp2p-daemon + - CD go-libp2p-daemon + # - git checkout v0.0.1 + - go install ./... + - CD .. + # build nim from our own branch - this to avoid the day-to-day churn and # regressions of the fast-paced Nim development while maintaining the # flexibility to apply patches diff --git a/.travis.yml b/.travis.yml index 277aa91..07c2295 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ language: c cache: directories: - NimBinaries - - $HOME/go/src/gx git: # when multiple CI builds are queued, the tested commit needs to be in the last X commits cloned with "--depth X" @@ -13,8 +12,9 @@ git: matrix: include: - os: linux - dist: xenial - go: 1.11.x + dist: trusty + before_install: + - export GOPATH=$HOME/go - os: osx addons: homebrew: @@ -28,13 +28,14 @@ install: # build nim from our own branch - this to avoid the day-to-day churn and # regressions of the fast-paced Nim development while maintaining the # flexibility to apply patches + - if [ "$TRAVIS_OS_NAME" = "linux" ]; then eval "$(gimme --force 1.12.7)"; fi - curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus/devel/build_nim.sh - env MAKE="make -j2" bash build_nim.sh Nim csources dist/nimble NimBinaries - export PATH="$PWD/Nim/bin:$GOPATH/bin:$PATH" - go version - git clone https://github.com/libp2p/go-libp2p-daemon - cd go-libp2p-daemon - - git checkout v0.0.1 + # - git checkout v0.0.1 - go install ./... - cd $HOME/build/status-im/nim-libp2p diff --git a/libp2p.nimble b/libp2p.nimble index c6f3be1..defe3d1 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -17,6 +17,5 @@ proc runTest(filename: string) = task test, "Runs the test suite": runTest "testnative" - when not defined(windows): - runTest "testdaemon" + runTest "testdaemon" diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index d5ebb58..82a4050 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -495,8 +495,7 @@ proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = proc socketExists(address: MultiAddress): Future[bool] {.async.} = try: var transp = await connect(address) - transp.close() - await transp.join() + await transp.closeWait() result = true except: result = false @@ -538,21 +537,44 @@ else: # Not ready yet result = cast[int](getCurrentProcessId()) -proc getSocket(pattern: string): Future[MultiAddress] {.async.} = +proc getSocket(pattern: string, + count: ptr int): Future[MultiAddress] {.async.} = var sockname = "" var pid = $getProcessId() - while true: - inc(daemonsCount) - sockname = pattern % [pid, $daemonsCount] + sockname = pattern % [pid, $(count[])] + let tmpma = MultiAddress.init(sockname) + + if UNIX.match(tmpma): + while true: + count[] = count[] + 1 + sockname = pattern % [pid, $(count[])] + var ma = MultiAddress.init(sockname) + let res = await socketExists(ma) + if not res: + result = ma + break + elif TCP.match(tmpma): + sockname = pattern % [pid, "0"] var ma = MultiAddress.init(sockname) - let res = await socketExists(ma) - if not res: - result = ma - break + var sock = createAsyncSocket(ma) + if sock.bindAsyncSocket(ma): + # Socket was successfully bound, then its free to use + count[] = count[] + 1 + var ta = sock.getLocalAddress() + sockname = pattern % [pid, $ta.port] + result = MultiAddress.init(sockname) + closeSocket(sock) # This is forward declaration needed for newDaemonApi() proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} +when not defined(windows): + proc copyEnv(): StringTableRef = + ## This procedure copy all environment variables into StringTable. + result = newStringTable(modeStyleInsensitive) + for key, val in envPairs(): + result[key] = val + proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, bootstrapNodes: seq[string] = @[], id: string = "", @@ -560,8 +582,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, announcedAddresses: seq[MultiAddress] = @[], daemon = DefaultDaemonFile, sockpath = "", - patternSock = DefaultUnixSocketPattern, - patternHandler = DefaultUnixChildPattern, + patternSock = "", + patternHandler = "", poolSize = 10, gossipsubHeartbeatInterval = 0, gossipsubHeartbeatDelay = 0, @@ -590,10 +612,12 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, ## (default: "/unix/tmp/p2pd.sock"). ## ## ``patternSock`` - MultiAddress pattern string, used to start multiple - ## daemons (default: "/unix/tmp/nim-p2pd-$1-$2.sock"). + ## daemons (default on Unix: "/unix/tmp/nim-p2pd-$1-$2.sock", on Windows: + ## "/ip4/127.0.0.1/tcp/$2"). ## ## ``patternHandler`` - MultiAddress pattern string, used to establish - ## incoming channels (default: "/unix/tmp/nim-p2pd-handle-$1-$2.sock"). + ## incoming channels (default on Unix: "/unix/tmp/nim-p2pd-handle-$1-$2.sock", + ## on Windows: "/ip4/127.0.0.1/tcp/$2"). ## ## ``poolSize`` - size of connections pool (default: 10). ## @@ -610,22 +634,39 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, var args = newSeq[string]() var env: StringTableRef + when defined(windows): + var patternForSocket = if len(patternSock) > 0: + patternSock + else: + DefaultIpSocketPattern + var patternForChild = if len(patternHandler) > 0: + patternHandler + else: + DefaultIpChildPattern + else: + var patternForSocket = if len(patternSock) > 0: + patternSock + else: + DefaultUnixSocketPattern + var patternForChild = if len(patternHandler) > 0: + patternHandler + else: + DefaultUnixChildPattern + api.flags = flags api.servers = newSeq[P2PServer]() - api.pattern = patternHandler + api.pattern = patternForChild api.ucounter = 1 api.handlers = initTable[string, P2PStreamCallback]() if len(sockpath) == 0: - api.address = await getSocket(patternSock) + api.address = await getSocket(patternForSocket, addr daemonsCount) else: api.address = MultiAddress.init(sockpath) let res = await socketExists(api.address) if not res: raise newException(DaemonLocalError, "Could not connect to remote daemon") - inc(daemonsCount) - # DHTFull and DHTClient could not be present at the same time if DHTFull in flags and DHTClient in flags: api.flags.excl(DHTClient) @@ -639,7 +680,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, if {Bootstrap, WaitBootstrap} * api.flags != {}: args.add("-b") if Verbose in api.flags: - env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) + when defined(windows): + # Currently enabling logging output is not a good idea, because we can't + # properly read process' stdout/stderr it can stuck on Windows. + env = nil + else: + env = copyEnv() + env["IPFS_LOGGING"] = "debug" + else: + env = nil if PSGossipSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=gossipsub") @@ -728,7 +777,7 @@ proc close*(api: DaemonAPI) {.async.} = server.server.stop() server.server.close() pending.add(server.server.join()) - await all(pending) + await allFutures(pending) for server in api.servers: let address = initTAddress(server.address) discard tryRemoveFile($address) @@ -866,9 +915,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback) {.async.} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() - let addrname = api.pattern % [$getProcessId(), $api.ucounter] - let maddress = MultiAddress.init(addrname) - inc(api.ucounter) + let maddress = await getSocket(api.pattern, addr api.ucounter) var server = createStreamServer(maddress, streamHandler, udata = api) try: for item in protocols: diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 72c7314..c915f15 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -386,6 +386,7 @@ const UDP* = pOr(pAnd(DNS, pEq("udp")), pAnd(IP, pEq("udp"))) UTP* = pAnd(UDP, pEq("utp")) QUIC* = pAnd(UDP, pEq("quic")) + UNIX* = pEq("unix") Unreliable* = pOr(UDP) @@ -729,13 +730,24 @@ proc init*(mtype: typedesc[MultiAddress]): MultiAddress = proc init*(mtype: typedesc[MultiAddress], address: IpAddress, protocol: Protocol, port: Port): MultiAddress = - # TODO: this can be more efficient - let protocol = case protocol - of IPPROTO_TCP: "/tcp/" - of IPPROTO_UDP: "/udp/" - else: raise newException(AssertionError, - "protocol should be either TCP or UDP") - MultiAddress.init("/ip4/" & $address & protocol & $port) + ## Initialize MultiAddress using stdlib's net.IpAddress (IPv4/IPv6) and + ## net.Protocol (UDP/TCP) information. + result.data = initVBuffer() + let familyProto = case address.family + of IpAddressFamily.IPv4: getProtocol("ip4") + of IpAddressFamily.IPv6: getProtocol("ip6") + let protoProto = case protocol + of IPPROTO_TCP: getProtocol("tcp") + of IPPROTO_UDP: getProtocol("udp") + else: raise newException(AssertionError, + "protocol should be either TCP or UDP") + result.data.write(familyProto.mcodec) + if not familyProto.coder.stringToBuffer($address, result.data): + raise newException(MultiAddressError, "Error encoding IPv4/IPv6 address") + result.data.write(protoProto.mcodec) + if not protoProto.coder.stringToBuffer($port, result.data): + raise newException(MultiAddressError, "Error encoding port number") + result.data.finish() proc isEmpty*(ma: MultiAddress): bool = ## Returns ``true``, if MultiAddress ``ma`` is empty or non initialized. diff --git a/libp2p/wire.nim b/libp2p/wire.nim index c2c4c05..1b7dc72 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -11,6 +11,11 @@ import chronos import multiaddress, multicodec +when defined(windows): + import winlean +else: + import posix + proc initTAddress*(ma: MultiAddress): TransportAddress = ## Initialize ``TransportAddress`` with MultiAddress ``ma``. ## @@ -22,17 +27,17 @@ proc initTAddress*(ma: MultiAddress): TransportAddress = if state == 0: if code == multiCodec("ip4"): result = TransportAddress(family: AddressFamily.IPv4) - if ma.protoArgument(result.address_v4) == 0: + if part.protoArgument(result.address_v4) == 0: raise newException(TransportAddressError, "Incorrect IPv4 address") inc(state) elif code == multiCodec("ip6"): result = TransportAddress(family: AddressFamily.IPv6) - if ma.protoArgument(result.address_v6) == 0: + if part.protoArgument(result.address_v6) == 0: raise newException(TransportAddressError, "Incorrect IPv6 address") inc(state) elif code == multiCodec("unix"): result = TransportAddress(family: AddressFamily.Unix) - if ma.protoArgument(result.address_un) == 0: + if part.protoArgument(result.address_un) == 0: raise newException(TransportAddressError, "Incorrect Unix address") result.port = Port(1) break @@ -41,7 +46,7 @@ proc initTAddress*(ma: MultiAddress): TransportAddress = "Could not initialize address!") elif state == 1: if code == multiCodec("tcp") or code == multiCodec("udp"): - if ma.protoArgument(pbuf) == 0: + if part.protoArgument(pbuf) == 0: raise newException(TransportAddressError, "Incorrect port") result.port = Port((cast[uint16](pbuf[0]) shl 8) or cast[uint16](pbuf[1])) @@ -55,6 +60,7 @@ proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize, ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. + let address = initTAddress(ma) if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: if ma[1].protoCode() != multiCodec("tcp"): @@ -80,3 +86,57 @@ proc createStreamServer*[T](ma: MultiAddress, raise newException(TransportAddressError, "Incorrect address type!") result = createStreamServer(address, cbproc, flags, udata, sock, backlog, bufferSize, child, init) + +proc createAsyncSocket*(ma: MultiAddress): AsyncFD = + ## Create new asynchronous socket using MultiAddress' ``ma`` socket type and + ## protocol information. + ## + ## Returns ``asyncInvalidSocket`` on error. + var + socktype: SockType = SockType.SOCK_STREAM + protocol: Protocol = Protocol.IPPROTO_TCP + address: TransportAddress + + try: + address = initTAddress(ma) + except: + return asyncInvalidSocket + + if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: + if ma[1].protoCode() == multiCodec("udp"): + socktype = SockType.SOCK_DGRAM + protocol = Protocol.IPPROTO_UDP + elif ma[1].protoCode() == multiCodec("tcp"): + socktype = SockType.SOCK_STREAM + protocol = Protocol.IPPROTO_TCP + elif address.family in {AddressFamily.Unix}: + socktype = SockType.SOCK_STREAM + protocol = cast[Protocol](0) + else: + return asyncInvalidSocket + result = createAsyncSocket(address.getDomain(), socktype, protocol) + +proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool = + ## Bind socket ``sock`` to MultiAddress ``ma``. + var + saddr: Sockaddr_storage + slen: SockLen + address: TransportAddress + try: + address = initTAddress(ma) + except: + return false + toSAddr(address, saddr, slen) + if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0: + result = true + else: + result = false + +proc getLocalAddress*(sock: AsyncFD): TransportAddress = + ## Retrieve local socket ``sock`` address. + var saddr: Sockaddr_storage + var slen = SockLen(sizeof(Sockaddr_storage)) + + if getsockname(SocketHandle(sock), cast[ptr SockAddr](addr saddr), + addr slen) == 0: + fromSAddr(addr saddr, slen, result) diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 3e24377..a341364 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -131,8 +131,8 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = # Publish test data via api1. await sleepAsync(500.milliseconds) await api1.pubsubPublish("test-topic", msgData) - var andfut = handlerFuture1 and handlerFuture2 - await andfut or sleepAsync(10.seconds) + var res = await one(allFutures(handlerFuture1, handlerFuture2), + sleepAsync(10.seconds)) await api1.close() await api2.close()