mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-11 22:09:32 +00:00
Update address announcements update
This commit is contained in:
parent
62af779266
commit
05882b4d41
@ -22,6 +22,7 @@ import pkg/codexdht/discv5/[routing_table, protocol as discv5]
|
||||
from pkg/nimcrypto import keccak256
|
||||
|
||||
import ./rng
|
||||
import ./utils/addrutils
|
||||
import ./errors
|
||||
import ./logutils
|
||||
|
||||
@ -175,29 +176,45 @@ method removeProvider*(
|
||||
warn "Error removing provider", peerId = peerId, exc = exc.msg
|
||||
raiseAssert("Unexpected Exception in removeProvider")
|
||||
|
||||
proc updateRecords*(
|
||||
d: Discovery, announceAddrs: openArray[MultiAddress], discoveryPort: Port
|
||||
) =
|
||||
## Update both provider and DHT records from TCP announce addresses.
|
||||
## Discovery (UDP) addresses are derived by remapping announceAddrs to UDP with discoveryPort.
|
||||
## Updates the discv5 SPR once with the full set of addresses.
|
||||
let tcpAddrs = @announceAddrs
|
||||
let udpAddrs =
|
||||
tcpAddrs.mapIt(it.remapAddr(protocol = some("udp"), port = some(discoveryPort)))
|
||||
|
||||
debug "Updating addresses", tcpAddrs, udpAddrs
|
||||
|
||||
d.announceAddrs = tcpAddrs
|
||||
d.providerRecord = SignedPeerRecord
|
||||
.init(d.key, PeerRecord.init(d.peerId, tcpAddrs))
|
||||
.expect("Should construct signed record").some
|
||||
d.dhtRecord = SignedPeerRecord
|
||||
.init(d.key, PeerRecord.init(d.peerId, tcpAddrs & udpAddrs))
|
||||
.expect("Should construct signed record").some
|
||||
|
||||
if not d.protocol.isNil:
|
||||
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
|
||||
|
||||
proc updateAnnounceRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
|
||||
## Update providers record
|
||||
##
|
||||
|
||||
d.announceAddrs = @addrs
|
||||
|
||||
info "Updating announce record", addrs = d.announceAddrs
|
||||
d.providerRecord = SignedPeerRecord
|
||||
.init(d.key, PeerRecord.init(d.peerId, d.announceAddrs))
|
||||
.expect("Should construct signed record").some
|
||||
|
||||
if not d.protocol.isNil:
|
||||
d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")
|
||||
|
||||
proc updateDhtRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
|
||||
## Update providers record
|
||||
##
|
||||
|
||||
proc updateDhtRecord*(
|
||||
d: Discovery, addrs: openArray[MultiAddress]
|
||||
) {.deprecated: "use updateRecords instead".} =
|
||||
info "Updating Dht record", addrs = addrs
|
||||
d.dhtRecord = SignedPeerRecord
|
||||
.init(d.key, PeerRecord.init(d.peerId, @addrs))
|
||||
.expect("Should construct signed record").some
|
||||
|
||||
if not d.protocol.isNil:
|
||||
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
|
||||
|
||||
@ -248,7 +265,7 @@ proc new*(
|
||||
key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store
|
||||
)
|
||||
|
||||
self.updateAnnounceRecord(announceAddrs)
|
||||
self.updateRecords(@[], Port(0))
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# FIXME disable IP limits temporarily so we can run our workshop. Re-enable
|
||||
|
||||
@ -16,7 +16,6 @@ import
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/protocols/connectivity/autonatv2/service
|
||||
import pkg/libp2p/services/autorelayservice
|
||||
|
||||
import ./utils
|
||||
@ -332,10 +331,7 @@ proc handleNatStatus*(
|
||||
if not await autoRelayService.stop(switch):
|
||||
debug "AutoRelayService stop method returned false"
|
||||
|
||||
let discAddr =
|
||||
dialBackAddr.get.remapAddr(protocol = some("udp"), port = some(discoveryPort))
|
||||
discovery.updateAnnounceRecord(@[dialBackAddr.get])
|
||||
discovery.updateDhtRecord(@[discAddr])
|
||||
discovery.updateRecords(@[dialBackAddr.get], discoveryPort)
|
||||
# TODO: switch DHT to server mode
|
||||
of NotReachable:
|
||||
var hasPortMapping = false
|
||||
@ -348,8 +344,6 @@ proc handleNatStatus*(
|
||||
if maybePorts.isSome:
|
||||
let (tcpPort, udpPort) = maybePorts.get()
|
||||
let announceAddress = dialBackAddr.get.remapAddr(port = some(tcpPort))
|
||||
let discoveryAddrs =
|
||||
dialBackAddr.get.remapAddr(protocol = some("udp"), port = some(udpPort))
|
||||
|
||||
# TODO: Try a dial me to make sure we are reachable
|
||||
|
||||
@ -357,8 +351,7 @@ proc handleNatStatus*(
|
||||
if not await autoRelayService.stop(switch):
|
||||
debug "AutoRelayService stop method returned false"
|
||||
|
||||
discovery.updateAnnounceRecord(@[announceAddress])
|
||||
discovery.updateDhtRecord(@[discoveryAddrs])
|
||||
discovery.updateRecords(@[announceAddress], udpPort)
|
||||
|
||||
hasPortMapping = true
|
||||
|
||||
|
||||
@ -36,7 +36,6 @@ import ./blockexchange
|
||||
import ./utils/fileutils
|
||||
import ./discovery
|
||||
import ./utils/addrutils
|
||||
import ./utils/natutils
|
||||
import ./namespaces
|
||||
import ./storagetypes
|
||||
import ./logutils
|
||||
@ -81,22 +80,26 @@ proc start*(s: StorageServer) {.async.} =
|
||||
|
||||
await s.storageNode.switch.start()
|
||||
|
||||
let announceIp =
|
||||
let announceAddrs =
|
||||
if s.config.nat.hasExtIp:
|
||||
some(s.config.nat.extIp)
|
||||
# extip means that we assume the IP is reachable
|
||||
# So we just take the first peer addr and remap it with extip to keep the port only
|
||||
@[
|
||||
s.storageNode.switch.peerInfo.addrs[0].remapAddr(
|
||||
ip = some(s.config.nat.extIp), port = none(Port)
|
||||
)
|
||||
]
|
||||
else:
|
||||
getBestLocalAddress(s.config.listenIp)
|
||||
# If extip is not set, we have 2 choices:
|
||||
# 1- Announce the peer addrs contains detected addresses on the machine.
|
||||
# 2- Wait for AutoNat
|
||||
# The probleme with 1 is that you will certainly announce private addresses
|
||||
# and if you advertise a CID, you will advertise these private addresses.
|
||||
# TODO: DHT client mode
|
||||
#s.storageNode.switch.peerInfo.addrs
|
||||
@[]
|
||||
|
||||
if announceIp.isSome:
|
||||
let ip = announceIp.get
|
||||
let announceAddrs = s.storageNode.switch.peerInfo.addrs
|
||||
.mapIt(it.remapAddr(ip = some(ip), port = none(Port)))
|
||||
.deduplicate()
|
||||
let discAddr = getMultiAddrWithIPAndUDPPort(ip, s.config.discoveryPort)
|
||||
s.storageNode.discovery.updateAnnounceRecord(announceAddrs)
|
||||
s.storageNode.discovery.updateDhtRecord(announceAddrs & @[discAddr])
|
||||
else:
|
||||
warn "Unable to determine a local IP address to announce"
|
||||
s.storageNode.discovery.updateRecords(announceAddrs, s.config.discoveryPort)
|
||||
|
||||
await s.storageNode.start()
|
||||
|
||||
@ -324,8 +327,8 @@ proc new*(
|
||||
client = relayClient,
|
||||
onReservation = proc(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
|
||||
debug "Relay reservation updated", addresses
|
||||
discovery.updateAnnounceRecord(addresses)
|
||||
discovery.updateDhtRecord(addresses),
|
||||
# relay addresses are for download traffic only, not DHT routing
|
||||
discovery.updateAnnounceRecord(addresses),
|
||||
rng = random.Rng.instance(),
|
||||
)
|
||||
|
||||
|
||||
@ -14,7 +14,6 @@ import std/strutils
|
||||
import std/options
|
||||
|
||||
import pkg/libp2p
|
||||
import pkg/stew/endians2
|
||||
|
||||
func remapAddr*(
|
||||
address: MultiAddress,
|
||||
|
||||
@ -1,59 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[net, options], pkg/results, chronos, chronicles
|
||||
|
||||
import pkg/libp2p
|
||||
|
||||
type NatStrategy* = enum
|
||||
NatAuto
|
||||
NatUpnp
|
||||
NatPmp
|
||||
|
||||
proc getRouteIpv4*(): Result[IpAddress, cstring] =
|
||||
let
|
||||
publicAddress = TransportAddress(
|
||||
family: AddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1], port: Port(0)
|
||||
)
|
||||
route = getBestRoute(publicAddress)
|
||||
|
||||
if route.source.isUnspecified():
|
||||
err("No best ipv4 route found")
|
||||
else:
|
||||
let ip =
|
||||
try:
|
||||
route.source.address()
|
||||
except ValueError as e:
|
||||
error "Address conversion error", exception = e.name, msg = e.msg
|
||||
return err("Invalid IP address")
|
||||
ok(ip)
|
||||
|
||||
proc getRouteIpv6*(): Result[IpAddress, cstring] =
|
||||
const googleDnsIpv6 = TransportAddress(
|
||||
family: AddressFamily.IPv6,
|
||||
address_v6: [32'u8, 1, 72, 96, 72, 96, 0, 0, 0, 0, 0, 0, 0, 0, 136, 136],
|
||||
port: Port(0),
|
||||
)
|
||||
let route = getBestRoute(googleDnsIpv6)
|
||||
if route.source.isUnspecified():
|
||||
return err("No best ipv6 route found")
|
||||
try:
|
||||
ok(route.source.address())
|
||||
except ValueError as e:
|
||||
error "Address conversion error", exception = e.name, msg = e.msg
|
||||
err("Invalid IP address")
|
||||
|
||||
proc getBestLocalAddress*(bindIp: IpAddress): Option[IpAddress] =
|
||||
## If bindIp is anyLocal (0.0.0.0 or ::), finds the best local IP via routing table.
|
||||
## Otherwise returns bindIp as-is.
|
||||
let bindAddress = initTAddress(bindIp, Port(0))
|
||||
if bindAddress.isAnyLocal():
|
||||
let ip =
|
||||
if bindIp.family == IpAddressFamily.IPv6:
|
||||
getRouteIpv6()
|
||||
else:
|
||||
getRouteIpv4()
|
||||
if ip.isOk():
|
||||
return some(ip.get())
|
||||
return none(IpAddress)
|
||||
else:
|
||||
return some(bindIp)
|
||||
|
||||
@ -213,6 +213,7 @@ template multinodesuite*(suiteName: string, body: untyped) =
|
||||
trace "Setting up test", suite = suiteName, test = currentTestName, nodeConfigs
|
||||
if var clients =? nodeConfigs.clients:
|
||||
failAndTeardownOnError "failed to start client nodes":
|
||||
clients = clients.withExtIp()
|
||||
for config in clients.configs:
|
||||
let node = await startClientNode(config)
|
||||
running.add RunningNode(role: Role.Client, node: node)
|
||||
|
||||
@ -322,6 +322,15 @@ proc withNatScheduleInterval*(
|
||||
config.addCliOption("--nat-schedule-interval", $scheduleInterval)
|
||||
return startConfig
|
||||
|
||||
proc withExtIp*(
|
||||
self: StorageConfigs, idx: int, ip = "127.0.0.1"
|
||||
): StorageConfigs {.raises: [StorageConfigError].} =
|
||||
self.checkBounds idx
|
||||
|
||||
var startConfig = self
|
||||
startConfig.configs[idx].addCliOption("--nat", "extip:" & ip)
|
||||
return startConfig
|
||||
|
||||
proc withExtIp*(
|
||||
self: StorageConfigs, ip = "127.0.0.1"
|
||||
): StorageConfigs {.raises: [StorageConfigError].} =
|
||||
|
||||
@ -10,8 +10,6 @@ import pkg/storage/stores
|
||||
import pkg/storage/blocktype as bt
|
||||
import pkg/storage/blockexchange
|
||||
import pkg/storage/systemclock
|
||||
import pkg/storage/nat
|
||||
import pkg/storage/utils/natutils
|
||||
import pkg/storage/merkletree
|
||||
import pkg/storage/manifest
|
||||
|
||||
@ -218,8 +216,7 @@ proc generateNodes*(
|
||||
|
||||
if config.enableBootstrap:
|
||||
waitFor switch.peerInfo.update()
|
||||
blockDiscovery.updateAnnounceRecord(switch.peerInfo.addrs)
|
||||
blockDiscovery.updateDhtRecord(switch.peerInfo.addrs)
|
||||
blockDiscovery.updateRecords(switch.peerInfo.addrs, bindPort.Port)
|
||||
if blockDiscovery.dhtRecord.isSome:
|
||||
bootstrapNodes.add !blockDiscovery.dhtRecord
|
||||
|
||||
|
||||
@ -14,7 +14,6 @@ import ../../storage/nat
|
||||
import ../../storage/discovery
|
||||
import ../../storage/rng
|
||||
import ../../storage/utils
|
||||
import ../../storage/utils/natutils
|
||||
import ../../storage/utils/addrutils
|
||||
|
||||
type MockNatMapper = ref object of NatMapper
|
||||
@ -44,18 +43,6 @@ suite "nattedPorts":
|
||||
let natConfig = NatConfig(hasExtIp: true, extIp: parseIpAddress("8.8.8.8"))
|
||||
check nattedPorts(natConfig, Port(5000), Port(1234)).isNone
|
||||
|
||||
suite "hasPublicIp":
|
||||
test "hasPublicIp returns true when the address is public":
|
||||
let ma = MultiAddress.init("/ip4/8.8.8.8/tcp/8080").expect("valid")
|
||||
check hasPublicIp(@[ma])
|
||||
|
||||
test "hasPublicIp returns false when the address is private":
|
||||
let ma = MultiAddress.init("/ip4/192.168.1.1/tcp/8080").expect("valid")
|
||||
check not hasPublicIp(@[ma])
|
||||
|
||||
test "hasPublicIp returns false when the address is empty":
|
||||
check not hasPublicIp(@[])
|
||||
|
||||
asyncchecksuite "handleNatStatus":
|
||||
var sw: Switch
|
||||
var key: PrivateKey
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user