mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-27 21:09:28 +00:00
Announce address when peer info is updated
This commit is contained in:
parent
564398bc62
commit
48d81dcc0e
@ -8,7 +8,7 @@
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, net, os]
|
||||
import std/[options, net, os, sequtils]
|
||||
import results
|
||||
|
||||
import pkg/chronos
|
||||
@ -129,6 +129,32 @@ proc close*(m: NatPortMapper) =
|
||||
proc isPortMapped*(m: NatPortMapper, port: Port): bool =
|
||||
m.activeTcpPort.isSome and m.activeTcpPort.get == port
|
||||
|
||||
proc announcePeerInfoAddrs*(discovery: Discovery, peerInfo: PeerInfo, udpPort: Port) =
|
||||
## Announces peerInfo.addrs to the DHT, excluding relay circuit addresses:
|
||||
## they are announced via onReservation and must not enter the DHT routing
|
||||
## record. No-op when the addresses are already announced, so peerInfo
|
||||
## updates that only touch filtered-out addresses do not re-announce.
|
||||
let addrs = peerInfo.addrs.filterIt(not it.isCircuitRelayMA())
|
||||
if addrs.len == 0 or addrs == discovery.announceAddrs:
|
||||
return
|
||||
discovery.updateRecordsAndSpr(addrs, udpPort = udpPort)
|
||||
|
||||
proc setupPeerInfoObserver*(
|
||||
switch: Switch, autonat: AutonatV2Service, discovery: Discovery, udpPort: Port
|
||||
): PeerInfoObserver =
|
||||
## AutoNAT's address mapper resolves peerInfo.addrs into public addresses
|
||||
## once the node is Reachable; peerInfo.update() then notifies observers.
|
||||
## Keep the DHT records in sync with what libp2p announces.
|
||||
let observer: PeerInfoObserver = proc(peerInfo: PeerInfo) {.gcsafe, raises: [].} =
|
||||
info "PeerInfo updated",
|
||||
addrs = peerInfo.addrs, reachability = autonat.networkReachability
|
||||
if autonat.networkReachability != NetworkReachability.Reachable:
|
||||
return
|
||||
announcePeerInfoAddrs(discovery, peerInfo, udpPort)
|
||||
|
||||
switch.peerInfo.addObserver(observer)
|
||||
observer
|
||||
|
||||
method handleNatStatus*(
|
||||
m: NatPortMapper,
|
||||
networkReachability: NetworkReachability,
|
||||
@ -142,19 +168,13 @@ method handleNatStatus*(
|
||||
of Unknown:
|
||||
discard
|
||||
of Reachable:
|
||||
if dialBackAddr.isNone:
|
||||
warn "Got empty dialback address in AutoNat when node is Reachable"
|
||||
# Reachable but no address to announce: incomplete information, do nothing
|
||||
# and wait for the next AutoNAT cycle.
|
||||
return
|
||||
|
||||
if autoRelayService.isRunning:
|
||||
await autoRelayService.stop(switch)
|
||||
debug "AutoRelayService stopped"
|
||||
|
||||
# Update the record first, then flip to server mode: otherwise the node
|
||||
# briefly serves DHT queries with the previous (possibly empty) record.
|
||||
discovery.updateRecordsAndSpr(@[dialBackAddr.get], udpPort = discoveryPort)
|
||||
# No announce here: AutoNAT refreshes peerInfo right after this handler,
|
||||
# its address mapper (active now that the node is Reachable) resolves the
|
||||
# public addresses and the peerInfo observer announces them.
|
||||
discovery.protocol.clientMode = false
|
||||
of NotReachable:
|
||||
var hasPortMapping = false
|
||||
|
||||
@ -63,7 +63,7 @@ type
|
||||
natMapper*: Option[NatPortMapper]
|
||||
natRouter*: Option[NatRouter]
|
||||
holePunchHandler: Option[connmanager.PeerEventHandler]
|
||||
observedAddrMapper: Option[AddressMapper]
|
||||
peerInfoObserver: Option[PeerInfoObserver]
|
||||
isStarted: bool
|
||||
|
||||
StoragePrivateKey* = libp2p.PrivateKey # alias
|
||||
@ -139,11 +139,9 @@ proc start*(s: StorageServer) {.async.} =
|
||||
except CatchableError as e:
|
||||
warn "Cannot connect to bootstrap node", error = e.msg
|
||||
|
||||
# Refresh peerInfo.addrs so the observed address collected during the bootstrap
|
||||
# Identify exchange is applied to peerInfo via the address mapper, then start
|
||||
# AutoNAT here (we own it, it is not in switch.services) so its first probe targets
|
||||
# the now-connected bootstrap peers instead of firing at switch.start on no peers.
|
||||
await s.storageNode.switch.peerInfo.update()
|
||||
# Start AutoNAT here (we own it, it is not in switch.services) so its first
|
||||
# probe targets the now-connected bootstrap peers instead of firing at
|
||||
# switch.start on no peers.
|
||||
if s.autonatService.isSome:
|
||||
await s.autonatService.get.start(s.storageNode.switch)
|
||||
|
||||
@ -167,10 +165,8 @@ proc stop*(s: StorageServer) {.async.} =
|
||||
s.holePunchHandler.get, PeerEventKind.Joined
|
||||
)
|
||||
|
||||
if s.observedAddrMapper.isSome:
|
||||
s.storageNode.switch.peerInfo.addressMappers.keepItIf(
|
||||
it != s.observedAddrMapper.get
|
||||
)
|
||||
if s.peerInfoObserver.isSome:
|
||||
s.storageNode.switch.peerInfo.removeObserver(s.peerInfoObserver.get)
|
||||
|
||||
var futures = @[
|
||||
s.storageNode.switch.stop(),
|
||||
@ -313,11 +309,6 @@ proc new*(
|
||||
numPeersToAsk = config.natNumPeersToAsk,
|
||||
maxQueueSize = config.natMaxQueueSize,
|
||||
minConfidence = config.natMinConfidence,
|
||||
# The AddressMapper in libp2p injects the observed address
|
||||
# only when the node is detected Reachable.
|
||||
# We need it before, so we define our custom mapper below,
|
||||
# and disable this one to avoid having 2 mappers.
|
||||
enableAddressMapper = false,
|
||||
)
|
||||
)
|
||||
# At the first AutoNAT probe, the only identify observations available come
|
||||
@ -365,20 +356,6 @@ proc new*(
|
||||
else:
|
||||
none(AutonatV2Service)
|
||||
|
||||
# Inject observed addresses into peerInfo.addrs so AutoNAT advertises a
|
||||
# dialable (public) address. nim-libp2p collects observations via Identify
|
||||
# but does not wire them into peerInfo automatically; without this, the
|
||||
# AutoNAT DialRequest carries only private listen addresses and the server
|
||||
# responds EDialRefused.
|
||||
var observedAddrMapper: Option[AddressMapper]
|
||||
if not config.autonatServer and not config.nat.hasExtIp:
|
||||
let mapper: AddressMapper = proc(
|
||||
addrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
addrs.mapIt(switch.peerStore.guessDialableAddr(it))
|
||||
switch.peerInfo.addressMappers.add(mapper)
|
||||
observedAddrMapper = some(mapper)
|
||||
|
||||
# Storage infrastructure
|
||||
|
||||
try:
|
||||
@ -482,6 +459,7 @@ proc new*(
|
||||
var natMapper: Option[NatPortMapper]
|
||||
var autoRelayService: Option[AutoRelayService]
|
||||
var holePunchHandler: Option[connmanager.PeerEventHandler]
|
||||
var peerInfoObserver: Option[PeerInfoObserver]
|
||||
|
||||
if autonatService.isSome:
|
||||
let relayService = AutoRelayService.new(
|
||||
@ -512,6 +490,10 @@ proc new*(
|
||||
if natRouter.isSome:
|
||||
natRouter.get.natMapper = natMapper
|
||||
|
||||
peerInfoObserver = some(
|
||||
setupPeerInfoObserver(switch, autonatService.get, discovery, config.discoveryPort)
|
||||
)
|
||||
|
||||
autonatService.get.setStatusAndConfidenceHandler(
|
||||
proc(
|
||||
networkReachability: NetworkReachability,
|
||||
@ -556,5 +538,5 @@ proc new*(
|
||||
natMapper: natMapper,
|
||||
natRouter: natRouter,
|
||||
holePunchHandler: holePunchHandler,
|
||||
observedAddrMapper: observedAddrMapper,
|
||||
peerInfoObserver: peerInfoObserver,
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user