mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-28 05:19:28 +00:00
Integrate nat simulation and expose auto relayse service to api
This commit is contained in:
parent
b8dc03b4aa
commit
e8335c14eb
@ -24,6 +24,7 @@ import pkg/confutils
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/routing_record
|
||||
import pkg/libp2p/protocols/connectivity/autonatv2/service
|
||||
import pkg/libp2p/services/autorelayservice
|
||||
import pkg/codexdht/discv5/spr as spr
|
||||
|
||||
import ../logutils
|
||||
@ -38,6 +39,7 @@ import ../stores/repostore
|
||||
import ../blockexchange
|
||||
import ../units
|
||||
import ../utils/options
|
||||
import ../utils/natsimulation
|
||||
|
||||
import ./coders
|
||||
import ./json
|
||||
@ -562,6 +564,8 @@ proc initDebugApi(
|
||||
node: StorageNodeRef,
|
||||
conf: StorageConf,
|
||||
autonat: Option[AutonatV2Service],
|
||||
autoRelay: Option[AutoRelayService],
|
||||
natRouter: Option[NatRouter],
|
||||
router: var RestRouter,
|
||||
) =
|
||||
let allowedOrigin = router.allowedOrigin
|
||||
@ -588,7 +592,8 @@ proc initDebugApi(
|
||||
if autonat.isSome:
|
||||
$autonat.get.networkReachability
|
||||
else:
|
||||
"unknown"
|
||||
"unknown",
|
||||
"relayRunning": autoRelay.isSome and autoRelay.get.isRunning,
|
||||
},
|
||||
}
|
||||
|
||||
@ -626,6 +631,26 @@ proc initDebugApi(
|
||||
trace "Excepting processing request", exc = exc.msg
|
||||
return RestApiResponse.error(Http500, headers = headers)
|
||||
|
||||
router.api(MethodPost, "/api/storage/v1/debug/nat/filtering") do(
|
||||
filtering: Option[string]
|
||||
) -> RestApiResponse:
|
||||
var headers = buildCorsHeaders("POST", allowedOrigin)
|
||||
|
||||
without natSimulation =? natRouter:
|
||||
return RestApiResponse.error(
|
||||
Http400, "NAT simulation not active on this node", headers = headers
|
||||
)
|
||||
|
||||
without res =? filtering and filtering =? res:
|
||||
return
|
||||
RestApiResponse.error(Http400, "Missing filtering value", headers = headers)
|
||||
|
||||
let behavior = FilteringBehavior.fromString(filtering).valueOr:
|
||||
return RestApiResponse.error(Http400, "Invalid filtering value", headers = headers)
|
||||
|
||||
natSimulation.setFiltering(behavior)
|
||||
return RestApiResponse.response("", headers = headers)
|
||||
|
||||
when storage_enable_api_debug_peers:
|
||||
router.api(MethodGet, "/api/storage/v1/debug/peer/{peerId}") do(
|
||||
peerId: PeerId
|
||||
@ -651,12 +676,14 @@ proc initRestApi*(
|
||||
conf: StorageConf,
|
||||
repoStore: RepoStore,
|
||||
autonat: Option[AutonatV2Service],
|
||||
autoRelay: Option[AutoRelayService],
|
||||
natRouter: Option[NatRouter],
|
||||
corsAllowedOrigin: ?string,
|
||||
): RestRouter =
|
||||
var router = RestRouter.init(validate, corsAllowedOrigin)
|
||||
|
||||
initDataApi(node, repoStore, router)
|
||||
initNodeApi(node, conf, router)
|
||||
initDebugApi(node, conf, autonat, router)
|
||||
initDebugApi(node, conf, autonat, autoRelay, natRouter, router)
|
||||
|
||||
return router
|
||||
|
||||
@ -19,6 +19,7 @@ import pkg/presto
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/protocols/connectivity/autonatv2/[service, client]
|
||||
import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule
|
||||
import pkg/libp2p/protocols/connectivity/relay/relay as relayModule
|
||||
import pkg/libp2p/services/autorelayservice
|
||||
import pkg/confutils
|
||||
import pkg/confutils/defs
|
||||
@ -40,6 +41,7 @@ import ./namespaces
|
||||
import ./storagetypes
|
||||
import ./logutils
|
||||
import ./nat
|
||||
import ./utils/natsimulation
|
||||
|
||||
logScope:
|
||||
topics = "storage node"
|
||||
@ -53,9 +55,11 @@ type
|
||||
repoStore: RepoStore
|
||||
maintenance: BlockMaintainer
|
||||
taskpool: Taskpool
|
||||
# Expose to make reachability accessible from rest api
|
||||
autonatService*: Option[AutonatV2Service]
|
||||
autoRelayService: AutoRelayService
|
||||
natMapper: NatMapper
|
||||
autoRelayService: Option[AutoRelayService]
|
||||
natMapper: Option[NatMapper]
|
||||
natRouter*: Option[NatRouter]
|
||||
isStarted: bool
|
||||
|
||||
StoragePrivateKey* = libp2p.PrivateKey # alias
|
||||
@ -127,8 +131,8 @@ proc stop*(s: StorageServer) {.async.} =
|
||||
|
||||
notice "Stopping Storage node"
|
||||
|
||||
if s.natMapper != nil:
|
||||
s.natMapper.close()
|
||||
if s.natMapper.isSome:
|
||||
s.natMapper.get.close()
|
||||
|
||||
var futures = @[
|
||||
s.storageNode.switch.stop(),
|
||||
@ -137,6 +141,12 @@ proc stop*(s: StorageServer) {.async.} =
|
||||
s.maintenance.stop(),
|
||||
]
|
||||
|
||||
if s.autoRelayService.isSome and s.autoRelayService.get.isRunning:
|
||||
proc stopAutoRelay(): Future[void] {.async: (raises: []).} =
|
||||
discard await noCancel s.autoRelayService.get.stop(s.storageNode.switch)
|
||||
|
||||
futures.add(stopAutoRelay())
|
||||
|
||||
if s.restServer != nil:
|
||||
futures.add(s.restServer.stop())
|
||||
|
||||
@ -194,8 +204,6 @@ proc new*(
|
||||
## create StorageServer including setting up datastore, repostore, etc
|
||||
let listenMultiAddr = getMultiAddrWithIpAndTcpPort(config.listenIp, config.listenPort)
|
||||
|
||||
let relayClient = relayClientModule.RelayClient.new(canHop = config.relay)
|
||||
|
||||
let autonatClient = AutonatV2Client.new(random.Rng.instance())
|
||||
let autonatService =
|
||||
if config.nat.hasExtIp:
|
||||
@ -215,7 +223,14 @@ proc new*(
|
||||
)
|
||||
)
|
||||
|
||||
let switch = SwitchBuilder
|
||||
let relayClient = RelayClient.new()
|
||||
let relay: Relay =
|
||||
if config.relay:
|
||||
Relay.new()
|
||||
else:
|
||||
relayClient
|
||||
|
||||
let switchBuilder = SwitchBuilder
|
||||
.new()
|
||||
.withPrivateKey(privateKey)
|
||||
.withAddresses(@[listenMultiAddr], enableWildcardResolver = true)
|
||||
@ -226,16 +241,29 @@ proc new*(
|
||||
.withMaxConnections(config.maxPeers)
|
||||
.withAgentVersion(config.agentString)
|
||||
.withSignedPeerRecord(true)
|
||||
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
|
||||
.withAutonatV2Server()
|
||||
.withCircuitRelay(relayClient)
|
||||
.withCircuitRelay(relay)
|
||||
.withServices(
|
||||
if autonatService.isSome:
|
||||
@[Service(autonatService.get)]
|
||||
else:
|
||||
@[]
|
||||
)
|
||||
.build()
|
||||
|
||||
var natRouter: Option[NatRouter]
|
||||
let switch =
|
||||
if config.natSimulation.isSome:
|
||||
let filtering = FilteringBehavior.fromString(config.natSimulation.get)
|
||||
.valueOr(AddressAndPortDependent)
|
||||
let router = NatRouter.new(filtering)
|
||||
natRouter = some(router)
|
||||
switchBuilder
|
||||
.withNatTransport(router, {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
|
||||
.build()
|
||||
else:
|
||||
switchBuilder
|
||||
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
|
||||
.build()
|
||||
|
||||
var taskPool: Taskpool
|
||||
autonatClient.setup(switch)
|
||||
@ -355,7 +383,18 @@ proc new*(
|
||||
taskPool = taskPool,
|
||||
)
|
||||
|
||||
autoRelayService = AutoRelayService.new(
|
||||
var natMapper: Option[NatMapper]
|
||||
var autoRelayService: Option[AutoRelayService]
|
||||
|
||||
if autonatService.isSome:
|
||||
natMapper = some(
|
||||
NatMapper(
|
||||
natConfig: config.nat,
|
||||
tcpPort: config.listenPort,
|
||||
discoveryPort: config.discoveryPort,
|
||||
)
|
||||
)
|
||||
let relayService = AutoRelayService.new(
|
||||
maxNumRelays = config.natMaxRelays,
|
||||
client = relayClient,
|
||||
onReservation = proc(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
|
||||
@ -365,29 +404,8 @@ proc new*(
|
||||
rng = random.Rng.instance(),
|
||||
)
|
||||
|
||||
var restServer: RestServerRef = nil
|
||||
autoRelayService = some(relayService)
|
||||
|
||||
if config.apiBindAddress.isSome:
|
||||
restServer = RestServerRef
|
||||
.new(
|
||||
storageNode.initRestApi(
|
||||
config, repoStore, autonatService, config.apiCorsAllowedOrigin
|
||||
),
|
||||
initTAddress(config.apiBindAddress.get(), config.apiPort),
|
||||
bufferSize = (1024 * 64),
|
||||
maxRequestBodySize = int.high,
|
||||
)
|
||||
.expect("Should create rest server!")
|
||||
|
||||
switch.mount(network)
|
||||
switch.mount(manifestProto)
|
||||
|
||||
let natMapper = NatMapper(
|
||||
natConfig: config.nat,
|
||||
tcpPort: config.listenPort,
|
||||
discoveryPort: config.discoveryPort,
|
||||
)
|
||||
if autonatService.isSome:
|
||||
autonatService.get.setStatusAndConfidenceHandler(
|
||||
proc(
|
||||
networkReachability: NetworkReachability,
|
||||
@ -395,12 +413,30 @@ proc new*(
|
||||
addrs: Opt[MultiAddress],
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
debug "AutoNAT status", reachability = networkReachability, confidence
|
||||
await natMapper.handleNatStatus(
|
||||
await natMapper.get.handleNatStatus(
|
||||
networkReachability, addrs, config.discoveryPort, discovery, switch,
|
||||
autoRelayService,
|
||||
relayService,
|
||||
)
|
||||
)
|
||||
|
||||
switch.mount(network)
|
||||
switch.mount(manifestProto)
|
||||
|
||||
var restServer: RestServerRef = nil
|
||||
|
||||
if config.apiBindAddress.isSome:
|
||||
restServer = RestServerRef
|
||||
.new(
|
||||
storageNode.initRestApi(
|
||||
config, repoStore, autonatService, autoRelayService, natRouter,
|
||||
config.apiCorsAllowedOrigin,
|
||||
),
|
||||
initTAddress(config.apiBindAddress.get(), config.apiPort),
|
||||
bufferSize = (1024 * 64),
|
||||
maxRequestBodySize = int.high,
|
||||
)
|
||||
.expect("Should create rest server!")
|
||||
|
||||
StorageServer(
|
||||
config: config,
|
||||
storageNode: storageNode,
|
||||
@ -412,4 +448,5 @@ proc new*(
|
||||
autonatService: autonatService,
|
||||
autoRelayService: autoRelayService,
|
||||
natMapper: natMapper,
|
||||
natRouter: natRouter,
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user