Autonat refactoring (#834)

This commit is contained in:
diegomrsantos 2023-01-06 11:14:38 +01:00 committed by GitHub
parent e304ad0f7e
commit 9f658c151e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 416 additions and 368 deletions

View File

@ -27,7 +27,7 @@ import
crypto/crypto, transports/[transport, tcptransport], crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, yamux/yamux], muxers/[muxer, mplex/mplex, yamux/yamux],
protocols/[identify, secure/secure, secure/noise, rendezvous], protocols/[identify, secure/secure, secure/noise, rendezvous],
protocols/connectivity/[autonat, relay/relay, relay/client, relay/rtransport], protocols/connectivity/[autonat/server, relay/relay, relay/client, relay/rtransport],
connmanager, upgrademngrs/muxedupgrade, connmanager, upgrademngrs/muxedupgrade,
nameresolving/nameresolver, nameresolving/nameresolver,
errors, utility errors, utility

View File

@ -1,324 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[options, sets, sequtils]
import stew/results
import chronos, chronicles, stew/objects
import ../protocol,
../../switch,
../../multiaddress,
../../multicodec,
../../peerid,
../../utils/semaphore,
../../errors
logScope:
topics = "libp2p autonat"
const
AutonatCodec* = "/libp2p/autonat/1.0.0"
AddressLimit = 8
type
AutonatError* = object of LPError
AutonatUnreachableError* = object of LPError
MsgType* = enum
Dial = 0
DialResponse = 1
ResponseStatus* = enum
Ok = 0
DialError = 100
DialRefused = 101
BadRequest = 200
InternalError = 300
AutonatPeerInfo* = object
id*: Option[PeerId]
addrs*: seq[MultiAddress]
AutonatDial* = object
peerInfo*: Option[AutonatPeerInfo]
AutonatDialResponse* = object
status*: ResponseStatus
text*: Option[string]
ma*: Option[MultiAddress]
AutonatMsg* = object
msgType*: MsgType
dial*: Option[AutonatDial]
response*: Option[AutonatDialResponse]
proc encode*(msg: AutonatMsg): ProtoBuffer =
result = initProtoBuffer()
result.write(1, msg.msgType.uint)
if msg.dial.isSome():
var dial = initProtoBuffer()
if msg.dial.get().peerInfo.isSome():
var bufferPeerInfo = initProtoBuffer()
let peerInfo = msg.dial.get().peerInfo.get()
if peerInfo.id.isSome():
bufferPeerInfo.write(1, peerInfo.id.get())
for ma in peerInfo.addrs:
bufferPeerInfo.write(2, ma.data.buffer)
bufferPeerInfo.finish()
dial.write(1, bufferPeerInfo.buffer)
dial.finish()
result.write(2, dial.buffer)
if msg.response.isSome():
var bufferResponse = initProtoBuffer()
let response = msg.response.get()
bufferResponse.write(1, response.status.uint)
if response.text.isSome():
bufferResponse.write(2, response.text.get())
if response.ma.isSome():
bufferResponse.write(3, response.ma.get())
bufferResponse.finish()
result.write(3, bufferResponse.buffer)
result.finish()
proc encode*(d: AutonatDial): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.Dial.uint)
var dial = initProtoBuffer()
if d.peerInfo.isSome():
var bufferPeerInfo = initProtoBuffer()
let peerInfo = d.peerInfo.get()
if peerInfo.id.isSome():
bufferPeerInfo.write(1, peerInfo.id.get())
for ma in peerInfo.addrs:
bufferPeerInfo.write(2, ma.data.buffer)
bufferPeerInfo.finish()
dial.write(1, bufferPeerInfo.buffer)
dial.finish()
result.write(2, dial.buffer)
result.finish()
proc encode*(r: AutonatDialResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.DialResponse.uint)
var bufferResponse = initProtoBuffer()
bufferResponse.write(1, r.status.uint)
if r.text.isSome():
bufferResponse.write(2, r.text.get())
if r.ma.isSome():
bufferResponse.write(3, r.ma.get())
bufferResponse.finish()
result.write(3, bufferResponse.buffer)
result.finish()
proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] =
var
msgTypeOrd: uint32
pbDial: ProtoBuffer
pbResponse: ProtoBuffer
msg: AutonatMsg
let
pb = initProtoBuffer(buf)
r1 = pb.getField(1, msgTypeOrd)
r2 = pb.getField(2, pbDial)
r3 = pb.getField(3, pbResponse)
if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg)
if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd):
return none(AutonatMsg)
if r2.get():
var
pbPeerInfo: ProtoBuffer
dial: AutonatDial
let
r4 = pbDial.getField(1, pbPeerInfo)
if r4.isErr(): return none(AutonatMsg)
var peerInfo: AutonatPeerInfo
if r4.get():
var pid: PeerId
let
r5 = pbPeerInfo.getField(1, pid)
r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs)
if r5.isErr() or r6.isErr(): return none(AutonatMsg)
if r5.get(): peerInfo.id = some(pid)
dial.peerInfo = some(peerInfo)
msg.dial = some(dial)
if r3.get():
var
statusOrd: uint
text: string
ma: MultiAddress
response: AutonatDialResponse
let
r4 = pbResponse.getField(1, statusOrd)
r5 = pbResponse.getField(2, text)
r6 = pbResponse.getField(3, ma)
if r4.isErr() or r5.isErr() or r6.isErr() or
(r4.get() and not checkedEnumAssign(response.status, statusOrd)):
return none(AutonatMsg)
if r5.get(): response.text = some(text)
if r6.get(): response.ma = some(ma)
msg.response = some(response)
return some(msg)
proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
let pb = AutonatDial(peerInfo: some(AutonatPeerInfo(
id: some(pid),
addrs: addrs
))).encode()
await conn.writeLp(pb.buffer)
proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} =
let pb = AutonatDialResponse(
status: status,
text: if text == "": none(string) else: some(text),
ma: none(MultiAddress)
).encode()
await conn.writeLp(pb.buffer)
proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
let pb = AutonatDialResponse(
status: ResponseStatus.Ok,
text: some("Ok"),
ma: some(ma)
).encode()
await conn.writeLp(pb.buffer)
type
Autonat* = ref object of LPProtocol
sem: AsyncSemaphore
switch*: Switch
dialTimeout: Duration
method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.base, async.} =
proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} =
if autonatMsg.isNone() or
autonatMsg.get().msgType != DialResponse or
autonatMsg.get().response.isNone() or
(autonatMsg.get().response.get().status == Ok and
autonatMsg.get().response.get().ma.isNone()):
raise newException(AutonatError, "Unexpected response")
else:
autonatMsg.get().response.get()
let conn =
try:
if addrs.len == 0:
await a.switch.dial(pid, @[AutonatCodec])
else:
await a.switch.dial(pid, addrs, AutonatCodec)
except CatchableError as err:
raise newException(AutonatError, "Unexpected error when dialling", err)
defer: await conn.close()
await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs)
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
return case response.status:
of ResponseStatus.Ok:
response.ma.get()
of ResponseStatus.DialError:
raise newException(AutonatUnreachableError, "Peer could not dial us back")
else:
raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get(""))
proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try:
await a.sem.acquire()
let ma = await a.switch.dialer.tryDial(conn.peerId, addrs).wait(a.dialTimeout)
if ma.isSome:
await conn.sendResponseOk(ma.get())
else:
await conn.sendResponseError(DialError, "Missing observed address")
except CancelledError as exc:
raise exc
except CatchableError as exc:
await conn.sendResponseError(DialError, exc.msg)
finally:
a.sem.release()
proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] =
if msg.dial.isNone() or msg.dial.get().peerInfo.isNone():
return conn.sendResponseError(BadRequest, "Missing Peer Info")
let peerInfo = msg.dial.get().peerInfo.get()
if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId:
return conn.sendResponseError(BadRequest, "PeerId mismatch")
if conn.observedAddr.isNone:
return conn.sendResponseError(BadRequest, "Missing observed address")
let observedAddr = conn.observedAddr.get()
var isRelayed = observedAddr.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get():
return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address")
let hostIp = observedAddr[0]
if hostIp.isErr() or not IP.match(hostIp.get()):
trace "wrong observed address", address=observedAddr
return conn.sendResponseError(InternalError, "Expected an IP address")
var addrs = initHashSet[MultiAddress]()
addrs.incl(observedAddr)
for ma in peerInfo.addrs:
isRelayed = ma.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get():
continue
let maFirst = ma[0]
if maFirst.isErr() or not IP.match(maFirst.get()):
continue
try:
addrs.incl(
if maFirst.get() == hostIp.get():
ma
else:
let maEnd = ma[1..^1]
if maEnd.isErr(): continue
hostIp.get() & maEnd.get()
)
except LPError as exc:
continue
if len(addrs) >= AddressLimit:
break
if len(addrs) == 0:
return conn.sendResponseError(DialRefused, "No dialable address")
return a.tryDial(conn, toSeq(addrs))
proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T =
let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout)
autonat.init()
autonat
method init*(a: Autonat) =
proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial:
raise newException(AutonatError, "Received malformed message")
let msg = msgOpt.get()
await a.handleDial(conn, msg)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in autonat handler", exc = exc.msg, conn
finally:
trace "exiting autonat handler", conn
await conn.close()
a.handler = handleStream
a.codec = AutonatCodec

View File

@ -0,0 +1,69 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[options, sets, sequtils]
import stew/[results, objects]
import chronos, chronicles
import ../../../switch,
../../../multiaddress,
../../../peerid
import core
export core
logScope:
topics = "libp2p autonat"
type
AutonatClient* = ref object of RootObj
proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
let pb = AutonatDial(peerInfo: some(AutonatPeerInfo(
id: some(pid),
addrs: addrs
))).encode()
await conn.writeLp(pb.buffer)
method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.base, async.} =
proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} =
if autonatMsg.isNone() or
autonatMsg.get().msgType != DialResponse or
autonatMsg.get().response.isNone() or
(autonatMsg.get().response.get().status == Ok and
autonatMsg.get().response.get().ma.isNone()):
raise newException(AutonatError, "Unexpected response")
else:
autonatMsg.get().response.get()
let conn =
try:
if addrs.len == 0:
await switch.dial(pid, @[AutonatCodec])
else:
await switch.dial(pid, addrs, AutonatCodec)
except CatchableError as err:
raise newException(AutonatError, "Unexpected error when dialling", err)
defer: await conn.close()
await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs)
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
return case response.status:
of ResponseStatus.Ok:
response.ma.get()
of ResponseStatus.DialError:
raise newException(AutonatUnreachableError, "Peer could not dial us back")
else:
raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get(""))

View File

@ -0,0 +1,152 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[options, sets, sequtils]
import stew/[results, objects]
import chronos, chronicles
import ../../../multiaddress,
../../../peerid,
../../../errors
logScope:
topics = "libp2p autonat"
const
AutonatCodec* = "/libp2p/autonat/1.0.0"
AddressLimit* = 8
type
AutonatError* = object of LPError
AutonatUnreachableError* = object of LPError
MsgType* = enum
Dial = 0
DialResponse = 1
ResponseStatus* = enum
Ok = 0
DialError = 100
DialRefused = 101
BadRequest = 200
InternalError = 300
AutonatPeerInfo* = object
id*: Option[PeerId]
addrs*: seq[MultiAddress]
AutonatDial* = object
peerInfo*: Option[AutonatPeerInfo]
AutonatDialResponse* = object
status*: ResponseStatus
text*: Option[string]
ma*: Option[MultiAddress]
AutonatMsg* = object
msgType*: MsgType
dial*: Option[AutonatDial]
response*: Option[AutonatDialResponse]
proc encode(p: AutonatPeerInfo): ProtoBuffer =
result = initProtoBuffer()
if p.id.isSome():
result.write(1, p.id.get())
for ma in p.addrs:
result.write(2, ma.data.buffer)
result.finish()
proc encode*(d: AutonatDial): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.Dial.uint)
var dial = initProtoBuffer()
if d.peerInfo.isSome():
dial.write(1, encode(d.peerInfo.get()))
dial.finish()
result.write(2, dial.buffer)
result.finish()
proc encode*(r: AutonatDialResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.DialResponse.uint)
var bufferResponse = initProtoBuffer()
bufferResponse.write(1, r.status.uint)
if r.text.isSome():
bufferResponse.write(2, r.text.get())
if r.ma.isSome():
bufferResponse.write(3, r.ma.get())
bufferResponse.finish()
result.write(3, bufferResponse.buffer)
result.finish()
proc encode*(msg: AutonatMsg): ProtoBuffer =
if msg.dial.isSome():
return encode(msg.dial.get())
if msg.response.isSome():
return encode(msg.response.get())
proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] =
var
msgTypeOrd: uint32
pbDial: ProtoBuffer
pbResponse: ProtoBuffer
msg: AutonatMsg
let
pb = initProtoBuffer(buf)
r1 = pb.getField(1, msgTypeOrd)
r2 = pb.getField(2, pbDial)
r3 = pb.getField(3, pbResponse)
if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg)
if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd):
return none(AutonatMsg)
if r2.get():
var
pbPeerInfo: ProtoBuffer
dial: AutonatDial
let
r4 = pbDial.getField(1, pbPeerInfo)
if r4.isErr(): return none(AutonatMsg)
var peerInfo: AutonatPeerInfo
if r4.get():
var pid: PeerId
let
r5 = pbPeerInfo.getField(1, pid)
r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs)
if r5.isErr() or r6.isErr(): return none(AutonatMsg)
if r5.get(): peerInfo.id = some(pid)
dial.peerInfo = some(peerInfo)
msg.dial = some(dial)
if r3.get():
var
statusOrd: uint
text: string
ma: MultiAddress
response: AutonatDialResponse
let
r4 = pbResponse.getField(1, statusOrd)
r5 = pbResponse.getField(2, text)
r6 = pbResponse.getField(3, ma)
if r4.isErr() or r5.isErr() or r6.isErr() or
(r4.get() and not checkedEnumAssign(response.status, statusOrd)):
return none(AutonatMsg)
if r5.get(): response.text = some(text)
if r6.get(): response.ma = some(ma)
msg.response = some(response)
return some(msg)

View File

@ -0,0 +1,141 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[options, sets, sequtils]
import stew/results
import chronos, chronicles, stew/objects
import ../../protocol,
../../../switch,
../../../multiaddress,
../../../multicodec,
../../../peerid,
../../../utils/semaphore,
../../../errors
import core
export core
logScope:
topics = "libp2p autonat"
type
Autonat* = ref object of LPProtocol
sem: AsyncSemaphore
switch*: Switch
dialTimeout: Duration
proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
let pb = AutonatDial(peerInfo: some(AutonatPeerInfo(
id: some(pid),
addrs: addrs
))).encode()
await conn.writeLp(pb.buffer)
proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} =
let pb = AutonatDialResponse(
status: status,
text: if text == "": none(string) else: some(text),
ma: none(MultiAddress)
).encode()
await conn.writeLp(pb.buffer)
proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
let pb = AutonatDialResponse(
status: ResponseStatus.Ok,
text: some("Ok"),
ma: some(ma)
).encode()
await conn.writeLp(pb.buffer)
proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try:
await autonat.sem.acquire()
let ma = await autonat.switch.dialer.tryDial(conn.peerId, addrs).wait(autonat.dialTimeout)
if ma.isSome:
await conn.sendResponseOk(ma.get())
else:
await conn.sendResponseError(DialError, "Missing observed address")
except CancelledError as exc:
raise exc
except CatchableError as exc:
await conn.sendResponseError(DialError, exc.msg)
finally:
autonat.sem.release()
proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[void] =
if msg.dial.isNone() or msg.dial.get().peerInfo.isNone():
return conn.sendResponseError(BadRequest, "Missing Peer Info")
let peerInfo = msg.dial.get().peerInfo.get()
if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId:
return conn.sendResponseError(BadRequest, "PeerId mismatch")
if conn.observedAddr.isNone:
return conn.sendResponseError(BadRequest, "Missing observed address")
let observedAddr = conn.observedAddr.get()
var isRelayed = observedAddr.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get():
return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address")
let hostIp = observedAddr[0]
if hostIp.isErr() or not IP.match(hostIp.get()):
trace "wrong observed address", address=observedAddr
return conn.sendResponseError(InternalError, "Expected an IP address")
var addrs = initHashSet[MultiAddress]()
addrs.incl(observedAddr)
for ma in peerInfo.addrs:
isRelayed = ma.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get():
continue
let maFirst = ma[0]
if maFirst.isErr() or not IP.match(maFirst.get()):
continue
try:
addrs.incl(
if maFirst.get() == hostIp.get():
ma
else:
let maEnd = ma[1..^1]
if maEnd.isErr(): continue
hostIp.get() & maEnd.get()
)
except LPError as exc:
continue
if len(addrs) >= AddressLimit:
break
if len(addrs) == 0:
return conn.sendResponseError(DialRefused, "No dialable address")
return autonat.tryDial(conn, toSeq(addrs))
proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T =
let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout)
proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial:
raise newException(AutonatError, "Received malformed message")
let msg = msgOpt.get()
await autonat.handleDial(conn, msg)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in autonat handler", exc = exc.msg, conn
finally:
trace "exiting autonat handler", conn
await conn.close()
autonat.handler = handleStream
autonat.codec = AutonatCodec
autonat

View File

@ -14,10 +14,13 @@ else:
import std/[options, deques, sequtils] import std/[options, deques, sequtils]
import chronos, metrics import chronos, metrics
import ../switch import ../../../switch
import ../protocols/[connectivity/autonat] import client
import ../utils/heartbeat import ../../../utils/heartbeat
import ../crypto/crypto import ../../../crypto/crypto
logScope:
topics = "libp2p autonatservice"
declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"]) declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"])
@ -28,7 +31,7 @@ type
networkReachability: NetworkReachability networkReachability: NetworkReachability
confidence: Option[float] confidence: Option[float]
answers: Deque[NetworkReachability] answers: Deque[NetworkReachability]
autonat: Autonat autonatClient: AutonatClient
statusAndConfidenceHandler: StatusAndConfidenceHandler statusAndConfidenceHandler: StatusAndConfidenceHandler
rng: ref HmacDrbgContext rng: ref HmacDrbgContext
scheduleInterval: Option[Duration] scheduleInterval: Option[Duration]
@ -45,7 +48,7 @@ type
proc new*( proc new*(
T: typedesc[AutonatService], T: typedesc[AutonatService],
autonat: Autonat, autonatClient: AutonatClient,
rng: ref HmacDrbgContext, rng: ref HmacDrbgContext,
scheduleInterval: Option[Duration] = none(Duration), scheduleInterval: Option[Duration] = none(Duration),
askNewConnectedPeers = true, askNewConnectedPeers = true,
@ -58,7 +61,7 @@ proc new*(
networkReachability: Unknown, networkReachability: Unknown,
confidence: none(float), confidence: none(float),
answers: initDeque[NetworkReachability](), answers: initDeque[NetworkReachability](),
autonat: autonat, autonatClient: autonatClient,
rng: rng, rng: rng,
askNewConnectedPeers: askNewConnectedPeers, askNewConnectedPeers: askNewConnectedPeers,
numPeersToAsk: numPeersToAsk, numPeersToAsk: numPeersToAsk,
@ -94,11 +97,11 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence
proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} =
trace "Asking for reachability", peerId = $peerId trace "Asking for reachability", peerId = $peerId
let ans = let ans =
try: try:
discard await self.autonat.dialMe(peerId).wait(self.dialTimeout) discard await self.autonatClient.dialMe(switch, peerId).wait(self.dialTimeout)
Reachable Reachable
except AutonatUnreachableError: except AutonatUnreachableError:
trace "dialMe answer is not reachable", peerId = $peerId trace "dialMe answer is not reachable", peerId = $peerId

View File

@ -1,3 +1,12 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.used.} {.used.}
when (NimMajor, NimMinor) < (1, 4): when (NimMajor, NimMinor) < (1, 4):
@ -6,12 +15,13 @@ else:
{.push raises: [].} {.push raises: [].}
import chronos import chronos
import ../../libp2p/protocols/connectivity/autonat import ../../libp2p/[protocols/connectivity/autonat/client,
import ../../libp2p/peerid peerid,
import ../../libp2p/multiaddress multiaddress,
switch]
type type
AutonatStub* = ref object of Autonat AutonatClientStub* = ref object of AutonatClient
answer*: Answer answer*: Answer
dials: int dials: int
expectedDials: int expectedDials: int
@ -22,11 +32,12 @@ type
NotReachable, NotReachable,
Unknown Unknown
proc new*(T: typedesc[AutonatStub], expectedDials: int): T = proc new*(T: typedesc[AutonatClientStub], expectedDials: int): T =
return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]()) return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]())
method dialMe*( method dialMe*(
self: AutonatStub, self: AutonatClientStub,
switch: Switch,
pid: PeerId, pid: PeerId,
addrs: seq[MultiAddress] = newSeq[MultiAddress]()): addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.async.} = Future[MultiAddress] {.async.} =

View File

@ -5,7 +5,8 @@ import
transports/tcptransport, transports/tcptransport,
upgrademngrs/upgrade, upgrademngrs/upgrade,
builders, builders,
protocols/connectivity/autonat protocols/connectivity/autonat/client,
protocols/connectivity/autonat/server,
], ],
./helpers ./helpers
@ -44,7 +45,7 @@ suite "Autonat":
await dst.start() await dst.start()
await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
let ma = await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) let ma = await AutonatClient.new().dialMe(src, dst.peerInfo.peerId, dst.peerInfo.addrs)
check ma in src.peerInfo.addrs check ma in src.peerInfo.addrs
await allFutures(src.stop(), dst.stop()) await allFutures(src.stop(), dst.stop())
@ -58,7 +59,7 @@ suite "Autonat":
await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
expect AutonatUnreachableError: expect AutonatUnreachableError:
discard await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) discard await AutonatClient.new().dialMe(src, dst.peerInfo.peerId, dst.peerInfo.addrs)
await allFutures(src.stop(), dst.stop()) await allFutures(src.stop(), dst.stop())
asyncTest "Timeout is triggered in autonat handle": asyncTest "Timeout is triggered in autonat handle":

View File

@ -12,10 +12,10 @@ import chronos, metrics
import unittest2 import unittest2
import ../libp2p/[builders, import ../libp2p/[builders,
switch, switch,
services/autonatservice, protocols/connectivity/autonat/client,
protocols/connectivity/autonat] protocols/connectivity/autonat/service]
import ./helpers import ./helpers
import stubs/autonatstub import stubs/autonatclientstub
proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch = proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch =
var builder = SwitchBuilder.new() var builder = SwitchBuilder.new()
@ -39,10 +39,10 @@ suite "Autonat Service":
asyncTest "Peer must be not reachable": asyncTest "Peer must be not reachable":
let autonatStub = AutonatStub.new(expectedDials = 3) let autonatClientStub = AutonatClientStub.new(expectedDials = 3)
autonatStub.answer = NotReachable autonatClientStub.answer = NotReachable
let autonatService = AutonatService.new(autonatStub, newRng()) let autonatService = AutonatService.new(autonatClientStub, newRng())
let switch1 = createSwitch(autonatService) let switch1 = createSwitch(autonatService)
let switch2 = createSwitch() let switch2 = createSwitch()
@ -60,7 +60,7 @@ suite "Autonat Service":
await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs)
await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs)
await autonatStub.finished await autonatClientStub.finished
check autonatService.networkReachability() == NetworkReachability.NotReachable check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3
@ -70,9 +70,7 @@ suite "Autonat Service":
asyncTest "Peer must be reachable": asyncTest "Peer must be reachable":
let autonat = Autonat.new(switch = nil) let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds))
let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds))
let switch1 = createSwitch(autonatService) let switch1 = createSwitch(autonatService)
let switch2 = createSwitch() let switch2 = createSwitch()
@ -86,7 +84,6 @@ suite "Autonat Service":
if not awaiter.finished: if not awaiter.finished:
awaiter.complete() awaiter.complete()
autonat.switch = switch1
check autonatService.networkReachability() == NetworkReachability.Unknown check autonatService.networkReachability() == NetworkReachability.Unknown
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
@ -110,10 +107,10 @@ suite "Autonat Service":
asyncTest "Peer must be not reachable and then reachable": asyncTest "Peer must be not reachable and then reachable":
let autonatStub = AutonatStub.new(expectedDials = 6) let autonatClientStub = AutonatClientStub.new(expectedDials = 6)
autonatStub.answer = NotReachable autonatClientStub.answer = NotReachable
let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds)) let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds))
let switch1 = createSwitch(autonatService) let switch1 = createSwitch(autonatService)
let switch2 = createSwitch() let switch2 = createSwitch()
@ -125,7 +122,7 @@ suite "Autonat Service":
proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} =
if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3:
if not awaiter.finished: if not awaiter.finished:
autonatStub.answer = Reachable autonatClientStub.answer = Reachable
awaiter.complete() awaiter.complete()
check autonatService.networkReachability() == NetworkReachability.Unknown check autonatService.networkReachability() == NetworkReachability.Unknown
@ -146,7 +143,7 @@ suite "Autonat Service":
check autonatService.networkReachability() == NetworkReachability.NotReachable check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3
await autonatStub.finished await autonatClientStub.finished
check autonatService.networkReachability() == NetworkReachability.Reachable check autonatService.networkReachability() == NetworkReachability.Reachable
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3
@ -154,9 +151,8 @@ suite "Autonat Service":
await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())
asyncTest "Peer must be reachable when one connected peer has autonat disabled": asyncTest "Peer must be reachable when one connected peer has autonat disabled":
let autonat = Autonat.new(switch = nil)
let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds), maxQueueSize = 2) let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 2)
let switch1 = createSwitch(autonatService) let switch1 = createSwitch(autonatService)
let switch2 = createSwitch(withAutonat = false) let switch2 = createSwitch(withAutonat = false)
@ -170,7 +166,6 @@ suite "Autonat Service":
if not awaiter.finished: if not awaiter.finished:
awaiter.complete() awaiter.complete()
autonat.switch = switch1
check autonatService.networkReachability() == NetworkReachability.Unknown check autonatService.networkReachability() == NetworkReachability.Unknown
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
@ -194,10 +189,10 @@ suite "Autonat Service":
asyncTest "Unknown answers must be ignored": asyncTest "Unknown answers must be ignored":
let autonatStub = AutonatStub.new(expectedDials = 6) let autonatClientStub = AutonatClientStub.new(expectedDials = 6)
autonatStub.answer = NotReachable autonatClientStub.answer = NotReachable
let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds), maxQueueSize = 3) let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds), maxQueueSize = 3)
let switch1 = createSwitch(autonatService) let switch1 = createSwitch(autonatService)
let switch2 = createSwitch() let switch2 = createSwitch()
@ -209,7 +204,7 @@ suite "Autonat Service":
proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} =
if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3:
if not awaiter.finished: if not awaiter.finished:
autonatStub.answer = Unknown autonatClientStub.answer = Unknown
awaiter.complete() awaiter.complete()
check autonatService.networkReachability() == NetworkReachability.Unknown check autonatService.networkReachability() == NetworkReachability.Unknown
@ -230,7 +225,7 @@ suite "Autonat Service":
check autonatService.networkReachability() == NetworkReachability.NotReachable check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3
await autonatStub.finished await autonatClientStub.finished
check autonatService.networkReachability() == NetworkReachability.NotReachable check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3
@ -240,7 +235,7 @@ suite "Autonat Service":
asyncTest "Calling setup and stop twice must work": asyncTest "Calling setup and stop twice must work":
let switch = createSwitch() let switch = createSwitch()
let autonatService = AutonatService.new(AutonatStub.new(expectedDials = 0), newRng(), some(1.seconds)) let autonatService = AutonatService.new(AutonatClientStub.new(expectedDials = 0), newRng(), some(1.seconds))
check (await autonatService.setup(switch)) == true check (await autonatService.setup(switch)) == true
check (await autonatService.setup(switch)) == false check (await autonatService.setup(switch)) == false