Create an ObservedAddrManager and add an AddressMapper in AutonatService and AutoRelayService (#871)

Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
diegomrsantos 2023-03-24 16:42:49 +01:00 committed by GitHub
parent bac754e2ad
commit af5299f26c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 261 additions and 28 deletions

View File

@ -1,6 +1,6 @@
bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#5d3da66e563d21277b57a9b601744273c083a01b
chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823
faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a
httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f

View File

@ -0,0 +1,98 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[sequtils, tables],
chronos, chronicles,
multiaddress, multicodec
type
## Manages observed MultiAddresses by reomte peers. It keeps track of the most observed IP and IP/Port.
ObservedAddrManager* = ref object of RootObj
observedIPsAndPorts: seq[MultiAddress]
maxSize: int
minCount: int
proc addObservation*(self:ObservedAddrManager, observedAddr: MultiAddress): bool =
## Adds a new observed MultiAddress. If the number of observations exceeds maxSize, the oldest one is removed.
if self.observedIPsAndPorts.len >= self.maxSize:
self.observedIPsAndPorts.del(0)
self.observedIPsAndPorts.add(observedAddr)
return true
proc getProtocol(self: ObservedAddrManager, observations: seq[MultiAddress], multiCodec: MultiCodec): Opt[MultiAddress] =
var countTable = toCountTable(observations)
countTable.sort()
var orderedPairs = toSeq(countTable.pairs)
for (ma, count) in orderedPairs:
let maFirst = ma[0].get()
if maFirst.protoCode.get() == multiCodec and count >= self.minCount:
return Opt.some(ma)
return Opt.none(MultiAddress)
proc getMostObservedProtocol(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] =
## Returns the most observed IP address or none if the number of observations are less than minCount.
let observedIPs = self.observedIPsAndPorts.mapIt(it[0].get())
return self.getProtocol(observedIPs, multiCodec)
proc getMostObservedProtoAndPort(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] =
## Returns the most observed IP/Port address or none if the number of observations are less than minCount.
return self.getProtocol(self.observedIPsAndPorts, multiCodec)
proc getMostObservedProtosAndPorts*(self: ObservedAddrManager): seq[MultiAddress] =
## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations
## are less than minCount.
var res: seq[MultiAddress]
let ip4 = self.getMostObservedProtoAndPort(multiCodec("ip4"))
if ip4.isSome():
res.add(ip4.get())
let ip6 = self.getMostObservedProtoAndPort(multiCodec("ip6"))
if ip6.isSome():
res.add(ip6.get())
return res
proc guessDialableAddr*(
self: ObservedAddrManager,
ma: MultiAddress): MultiAddress =
## Replaces the first proto valeu of each listen address by the corresponding (matching the proto code) most observed value.
## If the most observed value is not available, the original MultiAddress is returned.
try:
let maFirst = ma[0]
let maRest = ma[1..^1]
if maRest.isErr():
return ma
let observedIP = self.getMostObservedProtocol(maFirst.get().protoCode().get())
return
if observedIP.isNone() or maFirst.get() == observedIP.get():
ma
else:
observedIP.get() & maRest.get()
except CatchableError as error:
debug "Error while handling manual port forwarding", msg = error.msg
return ma
proc `$`*(self: ObservedAddrManager): string =
## Returns a string representation of the ObservedAddrManager.
return "IPs and Ports: " & $self.observedIPsAndPorts
proc new*(
T: typedesc[ObservedAddrManager],
maxSize = 10,
minCount = 3): T =
## Creates a new ObservedAddrManager.
return T(
observedIPsAndPorts: newSeq[MultiAddress](),
maxSize: maxSize,
minCount: minCount)

View File

@ -28,7 +28,7 @@ else:
import
std/[tables, sets, options, macros],
chronos,
chronos, chronicles,
./crypto/crypto,
./protocols/identify,
./protocols/protocol,
@ -220,3 +220,6 @@ proc identify*(
peerStore.updatePeerInfo(info)
finally:
await stream.closeWithEOF()
proc guessDialableAddr*(self: PeerStore, ma: MultiAddress): MultiAddress =
return self.identify.observedAddrManager.guessDialableAddr(ma)

View File

@ -15,6 +15,7 @@ else:
import std/[options, deques, sequtils]
import chronos, metrics
import ../../../switch
import ../../../wire
import client
import ../../../utils/heartbeat
import ../../../crypto/crypto
@ -27,6 +28,7 @@ declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability
type
AutonatService* = ref object of Service
newConnectedPeerHandler: PeerEventHandler
addressMapper: AddressMapper
scheduleHandle: Future[void]
networkReachability: NetworkReachability
confidence: Option[float]
@ -40,6 +42,7 @@ type
maxQueueSize: int
minConfidence: float
dialTimeout: Duration
enableAddressMapper: bool
NetworkReachability* {.pure.} = enum
NotReachable, Reachable, Unknown
@ -55,7 +58,8 @@ proc new*(
numPeersToAsk: int = 5,
maxQueueSize: int = 10,
minConfidence: float = 0.3,
dialTimeout = 30.seconds): T =
dialTimeout = 30.seconds,
enableAddressMapper = true): T =
return T(
scheduleInterval: scheduleInterval,
networkReachability: Unknown,
@ -67,7 +71,8 @@ proc new*(
numPeersToAsk: numPeersToAsk,
maxQueueSize: maxQueueSize,
minConfidence: minConfidence,
dialTimeout: dialTimeout)
dialTimeout: dialTimeout,
enableAddressMapper: enableAddressMapper)
proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} =
return self.networkReachability
@ -133,6 +138,7 @@ proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[Netwo
await self.handleAnswer(ans)
if not isNil(self.statusAndConfidenceHandler):
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
await switch.peerInfo.update()
return ans
proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
@ -153,7 +159,30 @@ proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.asy
heartbeat "Scheduling AutonatService run", interval:
await service.run(switch)
proc addressMapper(
self: AutonatService,
peerStore: PeerStore,
listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
if self.networkReachability != NetworkReachability.Reachable:
return listenAddrs
var addrs = newSeq[MultiAddress]()
for listenAddr in listenAddrs:
var processedMA = listenAddr
try:
let hostIP = initTAddress(listenAddr).get()
if not hostIP.isGlobal() and self.networkReachability == NetworkReachability.Reachable:
processedMA = peerStore.guessDialableAddr(listenAddr) # handle manual port forwarding
except CatchableError as exc:
debug "Error while handling address mapper", msg = exc.msg
addrs.add(processedMA)
return addrs
method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return await addressMapper(self, switch.peerStore, listenAddrs)
info "Setting up AutonatService"
let hasBeenSetup = await procCall Service(self).setup(switch)
if hasBeenSetup:
@ -163,6 +192,8 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
if self.scheduleInterval.isSome():
self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get())
if self.enableAddressMapper:
switch.peerInfo.addressMappers.add(self.addressMapper)
return hasBeenSetup
method run*(self: AutonatService, switch: Switch) {.async, public.} =
@ -170,7 +201,6 @@ method run*(self: AutonatService, switch: Switch) {.async, public.} =
await askConnectedPeers(self, switch)
await self.callHandler()
method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} =
info "Stopping AutonatService"
let hasBeenStopped = await procCall Service(self).stop(switch)
@ -180,6 +210,9 @@ method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public
self.scheduleHandle = nil
if not isNil(self.newConnectedPeerHandler):
switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
if self.enableAddressMapper:
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
await switch.peerInfo.update()
return hasBeenStopped
proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) =

View File

@ -26,7 +26,10 @@ import ../protobuf/minprotobuf,
../multiaddress,
../protocols/protocol,
../utility,
../errors
../errors,
../observedaddrmanager
export observedaddrmanager
logScope:
topics = "libp2p identify"
@ -56,6 +59,7 @@ type
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
sendSignedPeerRecord*: bool
observedAddrManager*: ObservedAddrManager
IdentifyPushHandler* = proc (
peer: PeerId,
@ -160,7 +164,8 @@ proc new*(
): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord
sendSignedPeerRecord: sendSignedPeerRecord,
observedAddrManager: ObservedAddrManager.new(),
)
identify.init()
identify
@ -182,7 +187,7 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec
proc identify*(p: Identify,
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async, gcsafe.} =
trace "initiating identify", conn
@ -194,23 +199,25 @@ proc identify*(p: Identify,
let infoOpt = decodeMsg(message)
if infoOpt.isNone():
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
result = infoOpt.get()
if result.pubkey.isSome:
let peer = PeerId.init(result.pubkey.get())
if peer.isErr:
raise newException(IdentityInvalidMsgError, $peer.error)
else:
result.peerId = peer.get()
if peer.get() != remotePeerId:
trace "Peer ids don't match",
remote = peer,
local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
else:
var info = infoOpt.get()
if info.pubkey.isNone():
raise newException(IdentityInvalidMsgError, "No pubkey in identify")
let peer = PeerId.init(info.pubkey.get())
if peer.isErr:
raise newException(IdentityInvalidMsgError, $peer.error)
if peer.get() != remotePeerId:
trace "Peer ids don't match", remote = peer, local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
info.peerId = peer.get()
if info.observedAddr.isSome:
if not self.observedAddrManager.addObservation(info.observedAddr.get()):
debug "Observed address is not valid", observedAddr = info.observedAddr.get()
return info
proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
## Create a IdentifyPush protocol. `handler` will be called every time
## a peer sends us new `PeerInfo`

View File

@ -32,9 +32,15 @@ type
backingOff: seq[PeerId]
peerAvailable: AsyncEvent
onReservation: OnReservationHandler
addressMapper: AddressMapper
rng: ref HmacDrbgContext
proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} =
proc addressMapper(
self: AutoRelayService,
listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return concat(toSeq(self.relayAddresses.values))
proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, switch: Switch) {.async.} =
while self.running:
let
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5))
@ -46,11 +52,15 @@ proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId)
break
if relayPid notin self.relayAddresses or self.relayAddresses[relayPid] != relayedAddr:
self.relayAddresses[relayPid] = relayedAddr
await switch.peerInfo.update()
if not self.onReservation.isNil():
self.onReservation(concat(toSeq(self.relayAddresses.values)))
await sleepAsync chronos.seconds(ttl - 30)
method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} =
self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return await addressMapper(self, listenAddrs)
let hasBeenSetUp = await procCall Service(self).setup(switch)
if hasBeenSetUp:
proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} =
@ -63,6 +73,7 @@ method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcs
future[].cancel()
switch.addPeerEventHandler(handlePeerJoined, Joined)
switch.addPeerEventHandler(handlePeerLeft, Left)
switch.peerInfo.addressMappers.add(self.addressMapper)
await self.run(switch)
return hasBeenSetUp
@ -96,7 +107,7 @@ proc innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} =
for relayPid in connectedPeers:
if self.relayPeers.len() >= self.numRelays:
break
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId)
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch)
if self.relayPeers.len() > 0:
await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait()
@ -116,6 +127,8 @@ method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsa
if hasBeenStopped:
self.running = false
self.runner.cancel()
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
await switch.peerInfo.update()
return hasBeenStopped
proc getAddresses*(self: AutoRelayService): seq[MultiAddress] =

View File

@ -1,6 +1,6 @@
import unittest2
import unittest2, chronos
export unittest2
export unittest2, chronos
template asyncTeardown*(body: untyped): untyped =
teardown:

View File

@ -7,7 +7,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/options
import std/[options, sequtils]
import chronos, metrics
import unittest2
import ../libp2p/[builders,
@ -104,9 +104,13 @@ suite "Autonat Service":
check autonatService.networkReachability() == NetworkReachability.Reachable
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3
check switch1.peerInfo.addrs == switch1.peerInfo.listenAddrs.mapIt(switch1.peerStore.guessDialableAddr(it))
await allFuturesThrowing(
switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())
check switch1.peerInfo.addrs == switch1.peerInfo.listenAddrs
asyncTest "Peer must be not reachable and then reachable":
let autonatClientStub = AutonatClientStub.new(expectedDials = 6)

View File

@ -72,10 +72,13 @@ suite "Autorelay":
await fut.wait(1.seconds)
let addresses = autorelay.getAddresses()
check:
addresses[0] == buildRelayMA(switchRelay, switchClient)
addresses == @[buildRelayMA(switchRelay, switchClient)]
addresses.len() == 1
addresses == switchClient.peerInfo.addrs
await allFutures(switchClient.stop(), switchRelay.stop())
check addresses != switchClient.peerInfo.addrs
asyncTest "Three relays connections":
var state = 0
let

View File

@ -405,5 +405,12 @@ suite "MultiAddress test suite":
let maWithTcp = MultiAddress.init(onionMAWithTcpStr).get()
check $(maWithTcp[multiCodec("onion3")].tryGet()) == onionMAStr
test "matchPartial":
const
tcp = mapEq("tcp")
let ma = MultiAddress.init("/ip4/0.0.0.0/tcp/0").get()
check not tcp.matchPartial(ma)
check IP4.matchPartial(ma)

View File

@ -28,6 +28,7 @@ import testtcptransport,
testmultistream,
testbufferstream,
testidentify,
testobservedaddrmanager,
testconnmngr,
testswitch,
testnoise,

View File

@ -0,0 +1,64 @@
import unittest2,
../libp2p/multiaddress,
../libp2p/observedaddrmanager,
./helpers
suite "ObservedAddrManager":
teardown:
checkTrackers()
asyncTest "Calculate the most oberserved IP correctly":
let observedAddrManager = ObservedAddrManager.new(minCount = 3)
# Calculate the most oberserved IP4 correctly
let mostObservedIP4AndPort = MultiAddress.init("/ip4/1.2.3.0/tcp/1").get()
let maIP4 = MultiAddress.init("/ip4/0.0.0.0/tcp/80").get()
check:
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.guessDialableAddr(maIP4) == maIP4
observedAddrManager.addObservation(MultiAddress.init("/ip4/1.2.3.0/tcp/2").get())
observedAddrManager.addObservation(MultiAddress.init("/ip4/1.2.3.1/tcp/1").get())
observedAddrManager.guessDialableAddr(maIP4) == MultiAddress.init("/ip4/1.2.3.0/tcp/80").get()
observedAddrManager.getMostObservedProtosAndPorts().len == 0
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.getMostObservedProtosAndPorts() == @[mostObservedIP4AndPort]
# Calculate the most oberserved IP6 correctly
let mostObservedIP6AndPort = MultiAddress.init("/ip6/::2/tcp/1").get()
let maIP6 = MultiAddress.init("/ip6/::1/tcp/80").get()
check:
observedAddrManager.addObservation(mostObservedIP6AndPort)
observedAddrManager.addObservation(mostObservedIP6AndPort)
observedAddrManager.guessDialableAddr(maIP6) == maIP6
observedAddrManager.addObservation(MultiAddress.init("/ip6/::2/tcp/2").get())
observedAddrManager.addObservation(MultiAddress.init("/ip6/::3/tcp/1").get())
observedAddrManager.guessDialableAddr(maIP6) == MultiAddress.init("/ip6/::2/tcp/80").get()
observedAddrManager.getMostObservedProtosAndPorts().len == 1
observedAddrManager.addObservation(mostObservedIP6AndPort)
observedAddrManager.getMostObservedProtosAndPorts() == @[mostObservedIP4AndPort, mostObservedIP6AndPort]
asyncTest "replace first proto value by most observed when there is only one protocol":
let observedAddrManager = ObservedAddrManager.new(minCount = 3)
let mostObservedIP4AndPort = MultiAddress.init("/ip4/1.2.3.4/tcp/1").get()
check:
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.addObservation(mostObservedIP4AndPort)
observedAddrManager.guessDialableAddr(
MultiAddress.init("/ip4/0.0.0.0").get()) == MultiAddress.init("/ip4/1.2.3.4").get()