From 9f658c151e3ba9c3f7becedba9379b5c0ad4d52c Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Fri, 6 Jan 2023 11:14:38 +0100 Subject: [PATCH] Autonat refactoring (#834) --- libp2p/builders.nim | 2 +- libp2p/protocols/connectivity/autonat.nim | 324 ------------------ .../protocols/connectivity/autonat/client.nim | 69 ++++ .../protocols/connectivity/autonat/core.nim | 152 ++++++++ .../protocols/connectivity/autonat/server.nim | 141 ++++++++ .../connectivity/autonat/service.nim} | 21 +- ...{autonatstub.nim => autonatclientstub.nim} | 23 +- tests/testautonat.nim | 7 +- tests/testautonatservice.nim | 45 ++- 9 files changed, 416 insertions(+), 368 deletions(-) delete mode 100644 libp2p/protocols/connectivity/autonat.nim create mode 100644 libp2p/protocols/connectivity/autonat/client.nim create mode 100644 libp2p/protocols/connectivity/autonat/core.nim create mode 100644 libp2p/protocols/connectivity/autonat/server.nim rename libp2p/{services/autonatservice.nim => protocols/connectivity/autonat/service.nim} (92%) rename tests/stubs/{autonatstub.nim => autonatclientstub.nim} (56%) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 5ff516970..6841d2da3 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -27,7 +27,7 @@ import crypto/crypto, transports/[transport, tcptransport], muxers/[muxer, mplex/mplex, yamux/yamux], protocols/[identify, secure/secure, secure/noise, rendezvous], - protocols/connectivity/[autonat, relay/relay, relay/client, relay/rtransport], + protocols/connectivity/[autonat/server, relay/relay, relay/client, relay/rtransport], connmanager, upgrademngrs/muxedupgrade, nameresolving/nameresolver, errors, utility diff --git a/libp2p/protocols/connectivity/autonat.nim b/libp2p/protocols/connectivity/autonat.nim deleted file mode 100644 index 47bd3a4d7..000000000 --- a/libp2p/protocols/connectivity/autonat.nim +++ /dev/null @@ -1,324 +0,0 @@ -# Nim-LibP2P -# Copyright (c) 2022 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import std/[options, sets, sequtils] -import stew/results -import chronos, chronicles, stew/objects -import ../protocol, - ../../switch, - ../../multiaddress, - ../../multicodec, - ../../peerid, - ../../utils/semaphore, - ../../errors - -logScope: - topics = "libp2p autonat" - -const - AutonatCodec* = "/libp2p/autonat/1.0.0" - AddressLimit = 8 - -type - AutonatError* = object of LPError - AutonatUnreachableError* = object of LPError - - MsgType* = enum - Dial = 0 - DialResponse = 1 - - ResponseStatus* = enum - Ok = 0 - DialError = 100 - DialRefused = 101 - BadRequest = 200 - InternalError = 300 - - AutonatPeerInfo* = object - id*: Option[PeerId] - addrs*: seq[MultiAddress] - - AutonatDial* = object - peerInfo*: Option[AutonatPeerInfo] - - AutonatDialResponse* = object - status*: ResponseStatus - text*: Option[string] - ma*: Option[MultiAddress] - - AutonatMsg* = object - msgType*: MsgType - dial*: Option[AutonatDial] - response*: Option[AutonatDialResponse] - -proc encode*(msg: AutonatMsg): ProtoBuffer = - result = initProtoBuffer() - result.write(1, msg.msgType.uint) - if msg.dial.isSome(): - var dial = initProtoBuffer() - if msg.dial.get().peerInfo.isSome(): - var bufferPeerInfo = initProtoBuffer() - let peerInfo = msg.dial.get().peerInfo.get() - if peerInfo.id.isSome(): - bufferPeerInfo.write(1, peerInfo.id.get()) - for ma in peerInfo.addrs: - bufferPeerInfo.write(2, ma.data.buffer) - bufferPeerInfo.finish() - dial.write(1, bufferPeerInfo.buffer) - dial.finish() - result.write(2, dial.buffer) - if msg.response.isSome(): - var bufferResponse = initProtoBuffer() - let response = msg.response.get() - bufferResponse.write(1, response.status.uint) - if response.text.isSome(): - bufferResponse.write(2, response.text.get()) - if response.ma.isSome(): - bufferResponse.write(3, response.ma.get()) - bufferResponse.finish() - result.write(3, bufferResponse.buffer) - result.finish() - -proc encode*(d: AutonatDial): ProtoBuffer = - result = initProtoBuffer() - result.write(1, MsgType.Dial.uint) - var dial = initProtoBuffer() - if d.peerInfo.isSome(): - var bufferPeerInfo = initProtoBuffer() - let peerInfo = d.peerInfo.get() - if peerInfo.id.isSome(): - bufferPeerInfo.write(1, peerInfo.id.get()) - for ma in peerInfo.addrs: - bufferPeerInfo.write(2, ma.data.buffer) - bufferPeerInfo.finish() - dial.write(1, bufferPeerInfo.buffer) - dial.finish() - result.write(2, dial.buffer) - result.finish() - -proc encode*(r: AutonatDialResponse): ProtoBuffer = - result = initProtoBuffer() - result.write(1, MsgType.DialResponse.uint) - var bufferResponse = initProtoBuffer() - bufferResponse.write(1, r.status.uint) - if r.text.isSome(): - bufferResponse.write(2, r.text.get()) - if r.ma.isSome(): - bufferResponse.write(3, r.ma.get()) - bufferResponse.finish() - result.write(3, bufferResponse.buffer) - result.finish() - -proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] = - var - msgTypeOrd: uint32 - pbDial: ProtoBuffer - pbResponse: ProtoBuffer - msg: AutonatMsg - - let - pb = initProtoBuffer(buf) - r1 = pb.getField(1, msgTypeOrd) - r2 = pb.getField(2, pbDial) - r3 = pb.getField(3, pbResponse) - if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg) - - if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd): - return none(AutonatMsg) - if r2.get(): - var - pbPeerInfo: ProtoBuffer - dial: AutonatDial - let - r4 = pbDial.getField(1, pbPeerInfo) - if r4.isErr(): return none(AutonatMsg) - - var peerInfo: AutonatPeerInfo - if r4.get(): - var pid: PeerId - let - r5 = pbPeerInfo.getField(1, pid) - r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs) - if r5.isErr() or r6.isErr(): return none(AutonatMsg) - if r5.get(): peerInfo.id = some(pid) - dial.peerInfo = some(peerInfo) - msg.dial = some(dial) - - if r3.get(): - var - statusOrd: uint - text: string - ma: MultiAddress - response: AutonatDialResponse - - let - r4 = pbResponse.getField(1, statusOrd) - r5 = pbResponse.getField(2, text) - r6 = pbResponse.getField(3, ma) - - if r4.isErr() or r5.isErr() or r6.isErr() or - (r4.get() and not checkedEnumAssign(response.status, statusOrd)): - return none(AutonatMsg) - if r5.get(): response.text = some(text) - if r6.get(): response.ma = some(ma) - msg.response = some(response) - - return some(msg) - -proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = - let pb = AutonatDial(peerInfo: some(AutonatPeerInfo( - id: some(pid), - addrs: addrs - ))).encode() - await conn.writeLp(pb.buffer) - -proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} = - let pb = AutonatDialResponse( - status: status, - text: if text == "": none(string) else: some(text), - ma: none(MultiAddress) - ).encode() - await conn.writeLp(pb.buffer) - -proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = - let pb = AutonatDialResponse( - status: ResponseStatus.Ok, - text: some("Ok"), - ma: some(ma) - ).encode() - await conn.writeLp(pb.buffer) - -type - Autonat* = ref object of LPProtocol - sem: AsyncSemaphore - switch*: Switch - dialTimeout: Duration - -method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): - Future[MultiAddress] {.base, async.} = - - proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} = - if autonatMsg.isNone() or - autonatMsg.get().msgType != DialResponse or - autonatMsg.get().response.isNone() or - (autonatMsg.get().response.get().status == Ok and - autonatMsg.get().response.get().ma.isNone()): - raise newException(AutonatError, "Unexpected response") - else: - autonatMsg.get().response.get() - - let conn = - try: - if addrs.len == 0: - await a.switch.dial(pid, @[AutonatCodec]) - else: - await a.switch.dial(pid, addrs, AutonatCodec) - except CatchableError as err: - raise newException(AutonatError, "Unexpected error when dialling", err) - - defer: await conn.close() - await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs) - let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) - return case response.status: - of ResponseStatus.Ok: - response.ma.get() - of ResponseStatus.DialError: - raise newException(AutonatUnreachableError, "Peer could not dial us back") - else: - raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get("")) - -proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = - try: - await a.sem.acquire() - let ma = await a.switch.dialer.tryDial(conn.peerId, addrs).wait(a.dialTimeout) - if ma.isSome: - await conn.sendResponseOk(ma.get()) - else: - await conn.sendResponseError(DialError, "Missing observed address") - except CancelledError as exc: - raise exc - except CatchableError as exc: - await conn.sendResponseError(DialError, exc.msg) - finally: - a.sem.release() - -proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] = - if msg.dial.isNone() or msg.dial.get().peerInfo.isNone(): - return conn.sendResponseError(BadRequest, "Missing Peer Info") - let peerInfo = msg.dial.get().peerInfo.get() - if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId: - return conn.sendResponseError(BadRequest, "PeerId mismatch") - - if conn.observedAddr.isNone: - return conn.sendResponseError(BadRequest, "Missing observed address") - let observedAddr = conn.observedAddr.get() - - var isRelayed = observedAddr.contains(multiCodec("p2p-circuit")) - if isRelayed.isErr() or isRelayed.get(): - return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address") - let hostIp = observedAddr[0] - if hostIp.isErr() or not IP.match(hostIp.get()): - trace "wrong observed address", address=observedAddr - return conn.sendResponseError(InternalError, "Expected an IP address") - var addrs = initHashSet[MultiAddress]() - addrs.incl(observedAddr) - for ma in peerInfo.addrs: - isRelayed = ma.contains(multiCodec("p2p-circuit")) - if isRelayed.isErr() or isRelayed.get(): - continue - let maFirst = ma[0] - if maFirst.isErr() or not IP.match(maFirst.get()): - continue - - try: - addrs.incl( - if maFirst.get() == hostIp.get(): - ma - else: - let maEnd = ma[1..^1] - if maEnd.isErr(): continue - hostIp.get() & maEnd.get() - ) - except LPError as exc: - continue - if len(addrs) >= AddressLimit: - break - - if len(addrs) == 0: - return conn.sendResponseError(DialRefused, "No dialable address") - return a.tryDial(conn, toSeq(addrs)) - -proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T = - let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout) - autonat.init() - autonat - -method init*(a: Autonat) = - proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} = - try: - let msgOpt = AutonatMsg.decode(await conn.readLp(1024)) - if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial: - raise newException(AutonatError, "Received malformed message") - let msg = msgOpt.get() - await a.handleDial(conn, msg) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception in autonat handler", exc = exc.msg, conn - finally: - trace "exiting autonat handler", conn - await conn.close() - - a.handler = handleStream - a.codec = AutonatCodec diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim new file mode 100644 index 000000000..efae875af --- /dev/null +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -0,0 +1,69 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, sets, sequtils] +import stew/[results, objects] +import chronos, chronicles +import ../../../switch, + ../../../multiaddress, + ../../../peerid +import core + +export core + +logScope: + topics = "libp2p autonat" + +type + AutonatClient* = ref object of RootObj + +proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = + let pb = AutonatDial(peerInfo: some(AutonatPeerInfo( + id: some(pid), + addrs: addrs + ))).encode() + await conn.writeLp(pb.buffer) + +method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): + Future[MultiAddress] {.base, async.} = + + proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} = + if autonatMsg.isNone() or + autonatMsg.get().msgType != DialResponse or + autonatMsg.get().response.isNone() or + (autonatMsg.get().response.get().status == Ok and + autonatMsg.get().response.get().ma.isNone()): + raise newException(AutonatError, "Unexpected response") + else: + autonatMsg.get().response.get() + + let conn = + try: + if addrs.len == 0: + await switch.dial(pid, @[AutonatCodec]) + else: + await switch.dial(pid, addrs, AutonatCodec) + except CatchableError as err: + raise newException(AutonatError, "Unexpected error when dialling", err) + + defer: await conn.close() + await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs) + let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) + return case response.status: + of ResponseStatus.Ok: + response.ma.get() + of ResponseStatus.DialError: + raise newException(AutonatUnreachableError, "Peer could not dial us back") + else: + raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get("")) diff --git a/libp2p/protocols/connectivity/autonat/core.nim b/libp2p/protocols/connectivity/autonat/core.nim new file mode 100644 index 000000000..991ee0eab --- /dev/null +++ b/libp2p/protocols/connectivity/autonat/core.nim @@ -0,0 +1,152 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, sets, sequtils] +import stew/[results, objects] +import chronos, chronicles +import ../../../multiaddress, + ../../../peerid, + ../../../errors + +logScope: + topics = "libp2p autonat" + +const + AutonatCodec* = "/libp2p/autonat/1.0.0" + AddressLimit* = 8 + +type + AutonatError* = object of LPError + AutonatUnreachableError* = object of LPError + + MsgType* = enum + Dial = 0 + DialResponse = 1 + + ResponseStatus* = enum + Ok = 0 + DialError = 100 + DialRefused = 101 + BadRequest = 200 + InternalError = 300 + + AutonatPeerInfo* = object + id*: Option[PeerId] + addrs*: seq[MultiAddress] + + AutonatDial* = object + peerInfo*: Option[AutonatPeerInfo] + + AutonatDialResponse* = object + status*: ResponseStatus + text*: Option[string] + ma*: Option[MultiAddress] + + AutonatMsg* = object + msgType*: MsgType + dial*: Option[AutonatDial] + response*: Option[AutonatDialResponse] + +proc encode(p: AutonatPeerInfo): ProtoBuffer = + result = initProtoBuffer() + if p.id.isSome(): + result.write(1, p.id.get()) + for ma in p.addrs: + result.write(2, ma.data.buffer) + result.finish() + +proc encode*(d: AutonatDial): ProtoBuffer = + result = initProtoBuffer() + result.write(1, MsgType.Dial.uint) + var dial = initProtoBuffer() + if d.peerInfo.isSome(): + dial.write(1, encode(d.peerInfo.get())) + dial.finish() + result.write(2, dial.buffer) + result.finish() + +proc encode*(r: AutonatDialResponse): ProtoBuffer = + result = initProtoBuffer() + result.write(1, MsgType.DialResponse.uint) + var bufferResponse = initProtoBuffer() + bufferResponse.write(1, r.status.uint) + if r.text.isSome(): + bufferResponse.write(2, r.text.get()) + if r.ma.isSome(): + bufferResponse.write(3, r.ma.get()) + bufferResponse.finish() + result.write(3, bufferResponse.buffer) + result.finish() + +proc encode*(msg: AutonatMsg): ProtoBuffer = + if msg.dial.isSome(): + return encode(msg.dial.get()) + if msg.response.isSome(): + return encode(msg.response.get()) + +proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] = + var + msgTypeOrd: uint32 + pbDial: ProtoBuffer + pbResponse: ProtoBuffer + msg: AutonatMsg + + let + pb = initProtoBuffer(buf) + r1 = pb.getField(1, msgTypeOrd) + r2 = pb.getField(2, pbDial) + r3 = pb.getField(3, pbResponse) + if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg) + + if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd): + return none(AutonatMsg) + if r2.get(): + var + pbPeerInfo: ProtoBuffer + dial: AutonatDial + let + r4 = pbDial.getField(1, pbPeerInfo) + if r4.isErr(): return none(AutonatMsg) + + var peerInfo: AutonatPeerInfo + if r4.get(): + var pid: PeerId + let + r5 = pbPeerInfo.getField(1, pid) + r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs) + if r5.isErr() or r6.isErr(): return none(AutonatMsg) + if r5.get(): peerInfo.id = some(pid) + dial.peerInfo = some(peerInfo) + msg.dial = some(dial) + + if r3.get(): + var + statusOrd: uint + text: string + ma: MultiAddress + response: AutonatDialResponse + + let + r4 = pbResponse.getField(1, statusOrd) + r5 = pbResponse.getField(2, text) + r6 = pbResponse.getField(3, ma) + + if r4.isErr() or r5.isErr() or r6.isErr() or + (r4.get() and not checkedEnumAssign(response.status, statusOrd)): + return none(AutonatMsg) + if r5.get(): response.text = some(text) + if r6.get(): response.ma = some(ma) + msg.response = some(response) + + return some(msg) \ No newline at end of file diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim new file mode 100644 index 000000000..1f17ef17f --- /dev/null +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -0,0 +1,141 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, sets, sequtils] +import stew/results +import chronos, chronicles, stew/objects +import ../../protocol, + ../../../switch, + ../../../multiaddress, + ../../../multicodec, + ../../../peerid, + ../../../utils/semaphore, + ../../../errors +import core + +export core + +logScope: + topics = "libp2p autonat" + +type + Autonat* = ref object of LPProtocol + sem: AsyncSemaphore + switch*: Switch + dialTimeout: Duration + +proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = + let pb = AutonatDial(peerInfo: some(AutonatPeerInfo( + id: some(pid), + addrs: addrs + ))).encode() + await conn.writeLp(pb.buffer) + +proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} = + let pb = AutonatDialResponse( + status: status, + text: if text == "": none(string) else: some(text), + ma: none(MultiAddress) + ).encode() + await conn.writeLp(pb.buffer) + +proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = + let pb = AutonatDialResponse( + status: ResponseStatus.Ok, + text: some("Ok"), + ma: some(ma) + ).encode() + await conn.writeLp(pb.buffer) + +proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = + try: + await autonat.sem.acquire() + let ma = await autonat.switch.dialer.tryDial(conn.peerId, addrs).wait(autonat.dialTimeout) + if ma.isSome: + await conn.sendResponseOk(ma.get()) + else: + await conn.sendResponseError(DialError, "Missing observed address") + except CancelledError as exc: + raise exc + except CatchableError as exc: + await conn.sendResponseError(DialError, exc.msg) + finally: + autonat.sem.release() + +proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[void] = + if msg.dial.isNone() or msg.dial.get().peerInfo.isNone(): + return conn.sendResponseError(BadRequest, "Missing Peer Info") + let peerInfo = msg.dial.get().peerInfo.get() + if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId: + return conn.sendResponseError(BadRequest, "PeerId mismatch") + + if conn.observedAddr.isNone: + return conn.sendResponseError(BadRequest, "Missing observed address") + let observedAddr = conn.observedAddr.get() + + var isRelayed = observedAddr.contains(multiCodec("p2p-circuit")) + if isRelayed.isErr() or isRelayed.get(): + return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address") + let hostIp = observedAddr[0] + if hostIp.isErr() or not IP.match(hostIp.get()): + trace "wrong observed address", address=observedAddr + return conn.sendResponseError(InternalError, "Expected an IP address") + var addrs = initHashSet[MultiAddress]() + addrs.incl(observedAddr) + for ma in peerInfo.addrs: + isRelayed = ma.contains(multiCodec("p2p-circuit")) + if isRelayed.isErr() or isRelayed.get(): + continue + let maFirst = ma[0] + if maFirst.isErr() or not IP.match(maFirst.get()): + continue + + try: + addrs.incl( + if maFirst.get() == hostIp.get(): + ma + else: + let maEnd = ma[1..^1] + if maEnd.isErr(): continue + hostIp.get() & maEnd.get() + ) + except LPError as exc: + continue + if len(addrs) >= AddressLimit: + break + + if len(addrs) == 0: + return conn.sendResponseError(DialRefused, "No dialable address") + return autonat.tryDial(conn, toSeq(addrs)) + +proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T = + let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout) + proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} = + try: + let msgOpt = AutonatMsg.decode(await conn.readLp(1024)) + if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial: + raise newException(AutonatError, "Received malformed message") + let msg = msgOpt.get() + await autonat.handleDial(conn, msg) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception in autonat handler", exc = exc.msg, conn + finally: + trace "exiting autonat handler", conn + await conn.close() + + autonat.handler = handleStream + autonat.codec = AutonatCodec + autonat diff --git a/libp2p/services/autonatservice.nim b/libp2p/protocols/connectivity/autonat/service.nim similarity index 92% rename from libp2p/services/autonatservice.nim rename to libp2p/protocols/connectivity/autonat/service.nim index 0b26fd081..e64963a99 100644 --- a/libp2p/services/autonatservice.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -14,10 +14,13 @@ else: import std/[options, deques, sequtils] import chronos, metrics -import ../switch -import ../protocols/[connectivity/autonat] -import ../utils/heartbeat -import ../crypto/crypto +import ../../../switch +import client +import ../../../utils/heartbeat +import ../../../crypto/crypto + +logScope: + topics = "libp2p autonatservice" declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"]) @@ -28,7 +31,7 @@ type networkReachability: NetworkReachability confidence: Option[float] answers: Deque[NetworkReachability] - autonat: Autonat + autonatClient: AutonatClient statusAndConfidenceHandler: StatusAndConfidenceHandler rng: ref HmacDrbgContext scheduleInterval: Option[Duration] @@ -45,7 +48,7 @@ type proc new*( T: typedesc[AutonatService], - autonat: Autonat, + autonatClient: AutonatClient, rng: ref HmacDrbgContext, scheduleInterval: Option[Duration] = none(Duration), askNewConnectedPeers = true, @@ -58,7 +61,7 @@ proc new*( networkReachability: Unknown, confidence: none(float), answers: initDeque[NetworkReachability](), - autonat: autonat, + autonatClient: autonatClient, rng: rng, askNewConnectedPeers: askNewConnectedPeers, numPeersToAsk: numPeersToAsk, @@ -94,11 +97,11 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence -proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = +proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = trace "Asking for reachability", peerId = $peerId let ans = try: - discard await self.autonat.dialMe(peerId).wait(self.dialTimeout) + discard await self.autonatClient.dialMe(switch, peerId).wait(self.dialTimeout) Reachable except AutonatUnreachableError: trace "dialMe answer is not reachable", peerId = $peerId diff --git a/tests/stubs/autonatstub.nim b/tests/stubs/autonatclientstub.nim similarity index 56% rename from tests/stubs/autonatstub.nim rename to tests/stubs/autonatclientstub.nim index e05da826a..1dd4c03f7 100644 --- a/tests/stubs/autonatstub.nim +++ b/tests/stubs/autonatclientstub.nim @@ -1,3 +1,12 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + {.used.} when (NimMajor, NimMinor) < (1, 4): @@ -6,12 +15,13 @@ else: {.push raises: [].} import chronos -import ../../libp2p/protocols/connectivity/autonat -import ../../libp2p/peerid -import ../../libp2p/multiaddress +import ../../libp2p/[protocols/connectivity/autonat/client, + peerid, + multiaddress, + switch] type - AutonatStub* = ref object of Autonat + AutonatClientStub* = ref object of AutonatClient answer*: Answer dials: int expectedDials: int @@ -22,11 +32,12 @@ type NotReachable, Unknown -proc new*(T: typedesc[AutonatStub], expectedDials: int): T = +proc new*(T: typedesc[AutonatClientStub], expectedDials: int): T = return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]()) method dialMe*( - self: AutonatStub, + self: AutonatClientStub, + switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): Future[MultiAddress] {.async.} = diff --git a/tests/testautonat.nim b/tests/testautonat.nim index 7542cddf5..a945afa89 100644 --- a/tests/testautonat.nim +++ b/tests/testautonat.nim @@ -5,7 +5,8 @@ import transports/tcptransport, upgrademngrs/upgrade, builders, - protocols/connectivity/autonat + protocols/connectivity/autonat/client, + protocols/connectivity/autonat/server, ], ./helpers @@ -44,7 +45,7 @@ suite "Autonat": await dst.start() await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - let ma = await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) + let ma = await AutonatClient.new().dialMe(src, dst.peerInfo.peerId, dst.peerInfo.addrs) check ma in src.peerInfo.addrs await allFutures(src.stop(), dst.stop()) @@ -58,7 +59,7 @@ suite "Autonat": await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) expect AutonatUnreachableError: - discard await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) + discard await AutonatClient.new().dialMe(src, dst.peerInfo.peerId, dst.peerInfo.addrs) await allFutures(src.stop(), dst.stop()) asyncTest "Timeout is triggered in autonat handle": diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim index 0d0c5f3bb..cafb24eb2 100644 --- a/tests/testautonatservice.nim +++ b/tests/testautonatservice.nim @@ -12,10 +12,10 @@ import chronos, metrics import unittest2 import ../libp2p/[builders, switch, - services/autonatservice, - protocols/connectivity/autonat] + protocols/connectivity/autonat/client, + protocols/connectivity/autonat/service] import ./helpers -import stubs/autonatstub +import stubs/autonatclientstub proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch = var builder = SwitchBuilder.new() @@ -39,10 +39,10 @@ suite "Autonat Service": asyncTest "Peer must be not reachable": - let autonatStub = AutonatStub.new(expectedDials = 3) - autonatStub.answer = NotReachable + let autonatClientStub = AutonatClientStub.new(expectedDials = 3) + autonatClientStub.answer = NotReachable - let autonatService = AutonatService.new(autonatStub, newRng()) + let autonatService = AutonatService.new(autonatClientStub, newRng()) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -60,7 +60,7 @@ suite "Autonat Service": await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) - await autonatStub.finished + await autonatClientStub.finished check autonatService.networkReachability() == NetworkReachability.NotReachable check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 @@ -70,9 +70,7 @@ suite "Autonat Service": asyncTest "Peer must be reachable": - let autonat = Autonat.new(switch = nil) - - let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds)) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds)) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -86,7 +84,6 @@ suite "Autonat Service": if not awaiter.finished: awaiter.complete() - autonat.switch = switch1 check autonatService.networkReachability() == NetworkReachability.Unknown autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) @@ -110,10 +107,10 @@ suite "Autonat Service": asyncTest "Peer must be not reachable and then reachable": - let autonatStub = AutonatStub.new(expectedDials = 6) - autonatStub.answer = NotReachable + let autonatClientStub = AutonatClientStub.new(expectedDials = 6) + autonatClientStub.answer = NotReachable - let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds)) + let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds)) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -125,7 +122,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: - autonatStub.answer = Reachable + autonatClientStub.answer = Reachable awaiter.complete() check autonatService.networkReachability() == NetworkReachability.Unknown @@ -146,7 +143,7 @@ suite "Autonat Service": check autonatService.networkReachability() == NetworkReachability.NotReachable check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 - await autonatStub.finished + await autonatClientStub.finished check autonatService.networkReachability() == NetworkReachability.Reachable check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 @@ -154,9 +151,8 @@ suite "Autonat Service": await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) asyncTest "Peer must be reachable when one connected peer has autonat disabled": - let autonat = Autonat.new(switch = nil) - let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds), maxQueueSize = 2) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 2) let switch1 = createSwitch(autonatService) let switch2 = createSwitch(withAutonat = false) @@ -170,7 +166,6 @@ suite "Autonat Service": if not awaiter.finished: awaiter.complete() - autonat.switch = switch1 check autonatService.networkReachability() == NetworkReachability.Unknown autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) @@ -194,10 +189,10 @@ suite "Autonat Service": asyncTest "Unknown answers must be ignored": - let autonatStub = AutonatStub.new(expectedDials = 6) - autonatStub.answer = NotReachable + let autonatClientStub = AutonatClientStub.new(expectedDials = 6) + autonatClientStub.answer = NotReachable - let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds), maxQueueSize = 3) + let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds), maxQueueSize = 3) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -209,7 +204,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: - autonatStub.answer = Unknown + autonatClientStub.answer = Unknown awaiter.complete() check autonatService.networkReachability() == NetworkReachability.Unknown @@ -230,7 +225,7 @@ suite "Autonat Service": check autonatService.networkReachability() == NetworkReachability.NotReachable check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 - await autonatStub.finished + await autonatClientStub.finished check autonatService.networkReachability() == NetworkReachability.NotReachable check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 @@ -240,7 +235,7 @@ suite "Autonat Service": asyncTest "Calling setup and stop twice must work": let switch = createSwitch() - let autonatService = AutonatService.new(AutonatStub.new(expectedDials = 0), newRng(), some(1.seconds)) + let autonatService = AutonatService.new(AutonatClientStub.new(expectedDials = 0), newRng(), some(1.seconds)) check (await autonatService.setup(switch)) == true check (await autonatService.setup(switch)) == false