Autonat service (#814)

Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
diegomrsantos 2022-12-16 12:32:00 +01:00 committed by GitHub
parent fe7a69e389
commit 67ef25fae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 437 additions and 24 deletions

View File

@ -61,6 +61,7 @@ type
autonat: bool
circuitRelay: Relay
rdv: RendezVous
services: seq[Service]
proc new*(T: type[SwitchBuilder]): T {.public.} =
## Creates a SwitchBuilder
@ -199,6 +200,10 @@ proc withRendezVous*(b: SwitchBuilder, rdv: RendezVous = RendezVous.new()): Swit
b.rdv = rdv
b
proc withServices*(b: SwitchBuilder, services: seq[Service]): SwitchBuilder =
b.services = services
b
proc build*(b: SwitchBuilder): Switch
{.raises: [Defect, LPError], public.} =
@ -254,7 +259,8 @@ proc build*(b: SwitchBuilder): Switch
connManager = connManager,
ms = ms,
nameResolver = b.nameResolver,
peerStore = peerStore)
peerStore = peerStore,
services = b.services)
if b.autonat:
let autonat = Autonat.new(switch)

View File

@ -110,6 +110,13 @@ proc new*(C: type ConnManager,
proc connCount*(c: ConnManager, peerId: PeerId): int =
c.conns.getOrDefault(peerId).len
proc connectedPeers*(c: ConnManager, dir: Direction): seq[PeerId] =
var peers = newSeq[PeerId]()
for peerId, conns in c.conns:
if conns.anyIt(it.dir == dir):
peers.add(peerId)
return peers
proc addConnEventHandler*(c: ConnManager,
handler: ConnEventHandler,
kind: ConnEventKind) =
@ -537,3 +544,4 @@ proc close*(c: ConnManager) {.async.} =
await conn.close()
trace "Closed ConnManager"

View File

@ -32,6 +32,7 @@ const
type
AutonatError* = object of LPError
AutonatUnreachableError* = object of LPError
MsgType* = enum
Dial = 0
@ -203,25 +204,37 @@ type
sem: AsyncSemaphore
switch*: Switch
proc dialMe*(a: Autonat, pid: PeerId, ma: MultiAddress|seq[MultiAddress]):
Future[MultiAddress] {.async.} =
let addrs = when ma is MultiAddress: @[ma] else: ma
let conn = await a.switch.dial(pid, addrs, AutonatCodec)
method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.base, async.} =
proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} =
if autonatMsg.isNone() or
autonatMsg.get().msgType != DialResponse or
autonatMsg.get().response.isNone() or
autonatMsg.get().response.get().ma.isNone():
raise newException(AutonatError, "Unexpected response")
else:
autonatMsg.get().response.get()
let conn =
try:
if addrs.len == 0:
await a.switch.dial(pid, @[AutonatCodec])
else:
await a.switch.dial(pid, addrs, AutonatCodec)
except CatchableError as err:
raise newException(AutonatError, "Unexpected error when dialling", err)
defer: await conn.close()
await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs)
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or
msgOpt.get().msgType != DialResponse or
msgOpt.get().response.isNone():
raise newException(AutonatError, "Unexpected response")
let response = msgOpt.get().response.get()
if response.status != ResponseStatus.Ok:
raise newException(AutonatError, "Bad status " &
$response.status & " " &
response.text.get(""))
if response.ma.isNone():
raise newException(AutonatError, "Missing address")
return response.ma.get()
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
return case response.status:
of ResponseStatus.Ok:
response.ma.get()
of ResponseStatus.DialError:
raise newException(AutonatUnreachableError, "Peer could not dial us back")
else:
raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get(""))
proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try:

View File

@ -25,6 +25,7 @@ import ./relay,
../../../multiaddress,
../../../stream/connection
export options
logScope:
topics = "libp2p relay relay-client"

View File

@ -0,0 +1,157 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[options, deques, sequtils]
import chronos, metrics
import ../switch
import ../protocols/[connectivity/autonat]
import ../utils/heartbeat
import ../crypto/crypto
declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"])
type
AutonatService* = ref object of Service
newConnectedPeerHandler: PeerEventHandler
scheduleHandle: Future[void]
networkReachability: NetworkReachability
confidence: Option[float]
answers: Deque[NetworkReachability]
autonat: Autonat
statusAndConfidenceHandler: StatusAndConfidenceHandler
rng: ref HmacDrbgContext
scheduleInterval: Option[Duration]
askNewConnectedPeers: bool
numPeersToAsk: int
maxQueueSize: int
minConfidence: float
dialTimeout: Duration
NetworkReachability* {.pure.} = enum
NotReachable, Reachable, Unknown
StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].}
proc new*(
T: typedesc[AutonatService],
autonat: Autonat,
rng: ref HmacDrbgContext,
scheduleInterval: Option[Duration] = none(Duration),
askNewConnectedPeers = true,
numPeersToAsk: int = 5,
maxQueueSize: int = 10,
minConfidence: float = 0.3,
dialTimeout = 5.seconds): T =
return T(
scheduleInterval: scheduleInterval,
networkReachability: Unknown,
confidence: none(float),
answers: initDeque[NetworkReachability](),
autonat: autonat,
rng: rng,
askNewConnectedPeers: askNewConnectedPeers,
numPeersToAsk: numPeersToAsk,
maxQueueSize: maxQueueSize,
minConfidence: minConfidence,
dialTimeout: dialTimeout)
proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} =
return self.networkReachability
proc callHandler(self: AutonatService) {.async.} =
if not isNil(self.statusAndConfidenceHandler):
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
if self.answers.len == self.maxQueueSize:
self.answers.popFirst()
self.answers.addLast(ans)
self.networkReachability = Unknown
self.confidence = none(float)
const reachabilityPriority = [Reachable, NotReachable]
for reachability in reachabilityPriority:
let confidence = self.answers.countIt(it == reachability) / self.maxQueueSize
libp2p_autonat_reachability_confidence.set(value = confidence, labelValues = [$reachability])
if self.confidence.isNone and confidence >= self.minConfidence:
self.networkReachability = reachability
self.confidence = some(confidence)
trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence
proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} =
trace "Asking for reachability", peerId = $peerId
let ans =
try:
discard await self.autonat.dialMe(peerId).wait(self.dialTimeout)
Reachable
except AutonatUnreachableError:
trace "dialMe answer is not reachable", peerId = $peerId
NotReachable
except AsyncTimeoutError:
trace "dialMe timed out", peerId = $peerId
Unknown
except CatchableError as err:
trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg
Unknown
await self.handleAnswer(ans)
if not isNil(self.statusAndConfidenceHandler):
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
return ans
proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
var peers = switch.connectedPeers(Direction.Out)
self.rng.shuffle(peers)
var answersFromPeers = 0
for peer in peers:
if answersFromPeers >= self.numPeersToAsk:
break
elif (await askPeer(self, switch, peer)) != Unknown:
answersFromPeers.inc()
proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} =
heartbeat "Schedule AutonatService run", interval:
await service.run(switch)
method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
let hasBeenSetup = await procCall Service(self).setup(switch)
if hasBeenSetup:
if self.askNewConnectedPeers:
self.newConnectedPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] {.async.} =
discard askPeer(self, switch, peerId)
await self.callHandler()
switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
if self.scheduleInterval.isSome():
self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get())
return hasBeenSetup
method run*(self: AutonatService, switch: Switch) {.async, public.} =
await askConnectedPeers(self, switch)
await self.callHandler()
method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} =
let hasBeenStopped = await procCall Service(self).stop(switch)
if hasBeenStopped:
if not isNil(self.scheduleHandle):
self.scheduleHandle.cancel()
self.scheduleHandle = nil
if not isNil(self.newConnectedPeerHandler):
switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
return hasBeenStopped
proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) =
self.statusAndConfidenceHandler = statusAndConfidenceHandler

View File

@ -74,6 +74,28 @@ type
peerStore*: PeerStore
nameResolver*: NameResolver
started: bool
services*: seq[Service]
Service* = ref object of RootObj
inUse: bool
method setup*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} =
if self.inUse:
warn "service setup has already been called"
return false
self.inUse = true
return true
method run*(self: Service, switch: Switch) {.base, async, gcsafe.} =
doAssert(false, "Not implemented!")
method stop*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} =
if not self.inUse:
warn "service is already stopped"
return false
self.inUse = false
return true
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
@ -108,6 +130,9 @@ method addTransport*(s: Switch, t: Transport) =
s.transports &= t
s.dialer.addTransport(t)
proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] =
s.connManager.connectedPeers(dir)
proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} =
## returns true if the peer has one or more
## associated connections
@ -294,6 +319,9 @@ proc stop*(s: Switch) {.async, public.} =
if not a.finished:
a.cancel()
for service in s.services:
discard await service.stop(s)
await s.ms.stop()
trace "Switch stopped"
@ -335,6 +363,9 @@ proc start*(s: Switch) {.async, gcsafe, public.} =
await s.ms.start()
for service in s.services:
discard await service.setup(s)
s.started = true
debug "Started libp2p node", peer = s.peerInfo
@ -346,7 +377,8 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: ConnManager,
ms: MultistreamSelect,
nameResolver: NameResolver = nil,
peerStore = PeerStore.new()): Switch
peerStore = PeerStore.new(),
services = newSeq[Service]()): Switch
{.raises: [Defect, LPError], public.} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")
@ -358,8 +390,10 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: connManager,
peerStore: peerStore,
dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver),
nameResolver: nameResolver)
nameResolver: nameResolver,
services: services)
switch.connManager.peerStore = peerStore
switch.mount(identity)
return switch

View File

@ -1,3 +1,5 @@
import ../config.nims
--threads:on
--d:metrics
--d:withoutPCRE

View File

@ -0,0 +1,36 @@
{.used.}
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import chronos
import ../../libp2p/protocols/connectivity/autonat
import ../../libp2p/peerid
import ../../libp2p/multiaddress
type
AutonatStub* = ref object of Autonat
returnSuccess*: bool
dials: int
expectedDials: int
finished*: Future[void]
proc new*(T: typedesc[AutonatStub], expectedDials: int): T =
return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]())
method dialMe*(
self: AutonatStub,
pid: PeerId,
addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.async.} =
self.dials += 1
if self.dials == self.expectedDials:
self.finished.complete()
if self.returnSuccess:
return MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
else:
raise newException(AutonatUnreachableError, "")

View File

@ -7,7 +7,7 @@ else:
import tables
import chronos, stew/[byteutils, endians2, shims/net]
import ../libp2p/[stream/connection,
import ../../libp2p/[stream/connection,
protocols/connectivity/relay/utils,
transports/tcptransport,
transports/tortransport,

View File

@ -0,0 +1,155 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/options
import chronos, metrics
import unittest2
import ../libp2p/[builders,
switch,
services/autonatservice]
import ./helpers
import stubs/autonatstub
proc createSwitch(autonatSvc: Service = nil): Switch =
var builder = SwitchBuilder.new()
.withRng(newRng())
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
.withTcpTransport()
.withMplex()
.withAutonat()
.withNoise()
if autonatSvc != nil:
builder = builder.withServices(@[autonatSvc])
return builder.build()
suite "Autonat Service":
teardown:
checkTrackers()
asyncTest "Autonat Service Private Reachability test":
let autonatStub = AutonatStub.new(expectedDials = 3)
autonatStub.returnSuccess = false
let autonatService = AutonatService.new(autonatStub, newRng())
let switch1 = createSwitch(autonatService)
let switch2 = createSwitch()
let switch3 = createSwitch()
let switch4 = createSwitch()
check autonatService.networkReachability() == NetworkReachability.Unknown
await switch1.start()
await switch2.start()
await switch3.start()
await switch4.start()
await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs)
await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs)
await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs)
await autonatStub.finished
check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3
await allFuturesThrowing(
switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())
asyncTest "Autonat Service Public Reachability test":
let autonatStub = AutonatStub.new(expectedDials = 3)
autonatStub.returnSuccess = true
let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds))
let switch1 = createSwitch(autonatService)
let switch2 = createSwitch()
let switch3 = createSwitch()
let switch4 = createSwitch()
check autonatService.networkReachability() == NetworkReachability.Unknown
await switch1.start()
await switch2.start()
await switch3.start()
await switch4.start()
await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs)
await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs)
await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs)
await autonatStub.finished
check autonatService.networkReachability() == NetworkReachability.Reachable
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3
await allFuturesThrowing(
switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())
asyncTest "Autonat Service Full Reachability test":
let autonatStub = AutonatStub.new(expectedDials = 6)
autonatStub.returnSuccess = false
let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds))
let switch1 = createSwitch(autonatService)
let switch2 = createSwitch()
let switch3 = createSwitch()
let switch4 = createSwitch()
let awaiter = newFuture[void]()
proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} =
if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3:
if not awaiter.finished:
autonatStub.returnSuccess = true
awaiter.complete()
check autonatService.networkReachability() == NetworkReachability.Unknown
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
await switch1.start()
await switch2.start()
await switch3.start()
await switch4.start()
await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs)
await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs)
await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs)
await awaiter
check autonatService.networkReachability() == NetworkReachability.NotReachable
check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3
await autonatStub.finished
check autonatService.networkReachability() == NetworkReachability.Reachable
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3
await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())
asyncTest "Autonat Service setup and stop twice":
let switch = createSwitch()
let autonatService = AutonatService.new(AutonatStub.new(expectedDials = 0), newRng(), some(1.seconds))
check (await autonatService.setup(switch)) == true
check (await autonatService.setup(switch)) == false
check (await autonatService.stop(switch)) == true
check (await autonatService.stop(switch)) == false
await allFuturesThrowing(switch.stop())

View File

@ -41,4 +41,5 @@ import testtcptransport,
testrendezvous,
testdiscovery,
testyamux,
testautonat
testautonat,
testautonatservice

View File

@ -14,7 +14,7 @@ import ../libp2p/[stream/connection,
multiaddress,
builders]
import ./helpers, ./stubs, ./commontransport
import ./helpers, ./stubs/torstub, ./commontransport
const torServer = initTAddress("127.0.0.1", 9050.Port)
var stub: TorServerStub