From e8335c14ebdfc550e2575dd91a50b46d8029e640 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 12 May 2026 10:05:58 +0400 Subject: [PATCH] Integrate nat simulation and expose auto relayse service to api --- storage/rest/api.nim | 31 ++++++++++++- storage/storage.nim | 107 +++++++++++++++++++++++++++++-------------- 2 files changed, 101 insertions(+), 37 deletions(-) diff --git a/storage/rest/api.nim b/storage/rest/api.nim index 5047686d..2079d3bc 100644 --- a/storage/rest/api.nim +++ b/storage/rest/api.nim @@ -24,6 +24,7 @@ import pkg/confutils import pkg/libp2p import pkg/libp2p/routing_record import pkg/libp2p/protocols/connectivity/autonatv2/service +import pkg/libp2p/services/autorelayservice import pkg/codexdht/discv5/spr as spr import ../logutils @@ -38,6 +39,7 @@ import ../stores/repostore import ../blockexchange import ../units import ../utils/options +import ../utils/natsimulation import ./coders import ./json @@ -562,6 +564,8 @@ proc initDebugApi( node: StorageNodeRef, conf: StorageConf, autonat: Option[AutonatV2Service], + autoRelay: Option[AutoRelayService], + natRouter: Option[NatRouter], router: var RestRouter, ) = let allowedOrigin = router.allowedOrigin @@ -588,7 +592,8 @@ proc initDebugApi( if autonat.isSome: $autonat.get.networkReachability else: - "unknown" + "unknown", + "relayRunning": autoRelay.isSome and autoRelay.get.isRunning, }, } @@ -626,6 +631,26 @@ proc initDebugApi( trace "Excepting processing request", exc = exc.msg return RestApiResponse.error(Http500, headers = headers) + router.api(MethodPost, "/api/storage/v1/debug/nat/filtering") do( + filtering: Option[string] + ) -> RestApiResponse: + var headers = buildCorsHeaders("POST", allowedOrigin) + + without natSimulation =? natRouter: + return RestApiResponse.error( + Http400, "NAT simulation not active on this node", headers = headers + ) + + without res =? filtering and filtering =? res: + return + RestApiResponse.error(Http400, "Missing filtering value", headers = headers) + + let behavior = FilteringBehavior.fromString(filtering).valueOr: + return RestApiResponse.error(Http400, "Invalid filtering value", headers = headers) + + natSimulation.setFiltering(behavior) + return RestApiResponse.response("", headers = headers) + when storage_enable_api_debug_peers: router.api(MethodGet, "/api/storage/v1/debug/peer/{peerId}") do( peerId: PeerId @@ -651,12 +676,14 @@ proc initRestApi*( conf: StorageConf, repoStore: RepoStore, autonat: Option[AutonatV2Service], + autoRelay: Option[AutoRelayService], + natRouter: Option[NatRouter], corsAllowedOrigin: ?string, ): RestRouter = var router = RestRouter.init(validate, corsAllowedOrigin) initDataApi(node, repoStore, router) initNodeApi(node, conf, router) - initDebugApi(node, conf, autonat, router) + initDebugApi(node, conf, autonat, autoRelay, natRouter, router) return router diff --git a/storage/storage.nim b/storage/storage.nim index 302056b3..d9ca82c4 100644 --- a/storage/storage.nim +++ b/storage/storage.nim @@ -19,6 +19,7 @@ import pkg/presto import pkg/libp2p import pkg/libp2p/protocols/connectivity/autonatv2/[service, client] import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule +import pkg/libp2p/protocols/connectivity/relay/relay as relayModule import pkg/libp2p/services/autorelayservice import pkg/confutils import pkg/confutils/defs @@ -40,6 +41,7 @@ import ./namespaces import ./storagetypes import ./logutils import ./nat +import ./utils/natsimulation logScope: topics = "storage node" @@ -53,9 +55,11 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + # Expose to make reachability accessible from rest api autonatService*: Option[AutonatV2Service] - autoRelayService: AutoRelayService - natMapper: NatMapper + autoRelayService: Option[AutoRelayService] + natMapper: Option[NatMapper] + natRouter*: Option[NatRouter] isStarted: bool StoragePrivateKey* = libp2p.PrivateKey # alias @@ -127,8 +131,8 @@ proc stop*(s: StorageServer) {.async.} = notice "Stopping Storage node" - if s.natMapper != nil: - s.natMapper.close() + if s.natMapper.isSome: + s.natMapper.get.close() var futures = @[ s.storageNode.switch.stop(), @@ -137,6 +141,12 @@ proc stop*(s: StorageServer) {.async.} = s.maintenance.stop(), ] + if s.autoRelayService.isSome and s.autoRelayService.get.isRunning: + proc stopAutoRelay(): Future[void] {.async: (raises: []).} = + discard await noCancel s.autoRelayService.get.stop(s.storageNode.switch) + + futures.add(stopAutoRelay()) + if s.restServer != nil: futures.add(s.restServer.stop()) @@ -194,8 +204,6 @@ proc new*( ## create StorageServer including setting up datastore, repostore, etc let listenMultiAddr = getMultiAddrWithIpAndTcpPort(config.listenIp, config.listenPort) - let relayClient = relayClientModule.RelayClient.new(canHop = config.relay) - let autonatClient = AutonatV2Client.new(random.Rng.instance()) let autonatService = if config.nat.hasExtIp: @@ -215,7 +223,14 @@ proc new*( ) ) - let switch = SwitchBuilder + let relayClient = RelayClient.new() + let relay: Relay = + if config.relay: + Relay.new() + else: + relayClient + + let switchBuilder = SwitchBuilder .new() .withPrivateKey(privateKey) .withAddresses(@[listenMultiAddr], enableWildcardResolver = true) @@ -226,16 +241,29 @@ proc new*( .withMaxConnections(config.maxPeers) .withAgentVersion(config.agentString) .withSignedPeerRecord(true) - .withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}) .withAutonatV2Server() - .withCircuitRelay(relayClient) + .withCircuitRelay(relay) .withServices( if autonatService.isSome: @[Service(autonatService.get)] else: @[] ) - .build() + + var natRouter: Option[NatRouter] + let switch = + if config.natSimulation.isSome: + let filtering = FilteringBehavior.fromString(config.natSimulation.get) + .valueOr(AddressAndPortDependent) + let router = NatRouter.new(filtering) + natRouter = some(router) + switchBuilder + .withNatTransport(router, {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}) + .build() + else: + switchBuilder + .withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}) + .build() var taskPool: Taskpool autonatClient.setup(switch) @@ -355,7 +383,18 @@ proc new*( taskPool = taskPool, ) - autoRelayService = AutoRelayService.new( + var natMapper: Option[NatMapper] + var autoRelayService: Option[AutoRelayService] + + if autonatService.isSome: + natMapper = some( + NatMapper( + natConfig: config.nat, + tcpPort: config.listenPort, + discoveryPort: config.discoveryPort, + ) + ) + let relayService = AutoRelayService.new( maxNumRelays = config.natMaxRelays, client = relayClient, onReservation = proc(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} = @@ -365,29 +404,8 @@ proc new*( rng = random.Rng.instance(), ) - var restServer: RestServerRef = nil + autoRelayService = some(relayService) - if config.apiBindAddress.isSome: - restServer = RestServerRef - .new( - storageNode.initRestApi( - config, repoStore, autonatService, config.apiCorsAllowedOrigin - ), - initTAddress(config.apiBindAddress.get(), config.apiPort), - bufferSize = (1024 * 64), - maxRequestBodySize = int.high, - ) - .expect("Should create rest server!") - - switch.mount(network) - switch.mount(manifestProto) - - let natMapper = NatMapper( - natConfig: config.nat, - tcpPort: config.listenPort, - discoveryPort: config.discoveryPort, - ) - if autonatService.isSome: autonatService.get.setStatusAndConfidenceHandler( proc( networkReachability: NetworkReachability, @@ -395,12 +413,30 @@ proc new*( addrs: Opt[MultiAddress], ) {.async: (raises: [CancelledError]).} = debug "AutoNAT status", reachability = networkReachability, confidence - await natMapper.handleNatStatus( + await natMapper.get.handleNatStatus( networkReachability, addrs, config.discoveryPort, discovery, switch, - autoRelayService, + relayService, ) ) + switch.mount(network) + switch.mount(manifestProto) + + var restServer: RestServerRef = nil + + if config.apiBindAddress.isSome: + restServer = RestServerRef + .new( + storageNode.initRestApi( + config, repoStore, autonatService, autoRelayService, natRouter, + config.apiCorsAllowedOrigin, + ), + initTAddress(config.apiBindAddress.get(), config.apiPort), + bufferSize = (1024 * 64), + maxRequestBodySize = int.high, + ) + .expect("Should create rest server!") + StorageServer( config: config, storageNode: storageNode, @@ -412,4 +448,5 @@ proc new*( autonatService: autonatService, autoRelayService: autoRelayService, natMapper: natMapper, + natRouter: natRouter, )