2026-05-07 10:36:51 +04:00

362 lines
10 KiB
Nim

## Logos Storage
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/os
import std/tables
import std/cpuinfo
import std/net
import std/sequtils
import pkg/chronos
import pkg/taskpools
import pkg/presto
import pkg/libp2p
import pkg/libp2p/protocols/connectivity/autonat/[service, client]
import pkg/confutils
import pkg/confutils/defs
import pkg/stew/io2
import pkg/datastore
import pkg/stew/io2
import ./node
import ./manifest/protocol
import ./conf
import ./rng as random
import ./rest/api
import ./stores
import ./blockexchange
import ./utils/fileutils
import ./discovery
import ./utils/addrutils
import ./utils/natutils
import ./namespaces
import ./storagetypes
import ./logutils
import ./nat
import ./utils/natutils
logScope:
topics = "storage node"
type
StorageServer* = ref object
config: StorageConf
logFile*: Option[IoHandle]
restServer: RestServerRef
storageNode: StorageNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
autonatService*: AutonatService
isStarted: bool
StoragePrivateKey* = libp2p.PrivateKey # alias
func config*(self: StorageServer): StorageConf =
return self.config
func node*(self: StorageServer): StorageNodeRef =
return self.storageNode
func repoStore*(self: StorageServer): RepoStore =
return self.repoStore
proc start*(s: StorageServer) {.async.} =
if s.isStarted:
warn "Storage server already started, skipping"
return
trace "Starting Storage node", config = $s.config
await s.repoStore.start()
s.maintenance.start()
await s.storageNode.switch.start()
let announceIp =
if s.config.nat.hasExtIp:
some(s.config.nat.extIp)
else:
getBestLocalAddress(s.config.listenIp)
if announceIp.isNone:
# We should have an IP, even at private IP
raise newException(StorageError, "Unable to determine an IP address to announce")
# Remap switch addresses to the resolved IP (replaces 0.0.0.0 or :: with the actual address),
# keeping unique entries only.
let announceAddrs = s.storageNode.switch.peerInfo.addrs
.mapIt(it.remapAddr(ip = announceIp, port = none(Port)))
.deduplicate()
let discoveryAddrs =
@[getMultiAddrWithIPAndUDPPort(announceIp.get, s.config.discoveryPort)]
s.storageNode.discovery.updateDhtRecord(announceAddrs & discoveryAddrs)
s.storageNode.discovery.updateAnnounceRecord(announceAddrs)
var hasPublicAddr = false
for announceAddr in announceAddrs:
let (maybeIp, _) = getAddressAndPort(announceAddr)
if maybeIp.isSome and maybeIp.get.isGlobalUnicast():
hasPublicAddr = true
break
if not hasPublicAddr:
warn "Unable to determine a public IP address. This node will only be reachable on a private network."
await s.storageNode.start()
for spr in s.config.bootstrapNodes:
try:
let addrs = spr.data.addresses.mapIt(it.address)
await s.storageNode.switch.connect(spr.data.peerId, addrs)
except CatchableError as e:
warn "Cannot connect to bootstrap node", error = e.msg
discard
if s.restServer != nil:
s.restServer.start()
s.isStarted = true
proc stop*(s: StorageServer) {.async.} =
if not s.isStarted:
warn "Storage is not started"
return
notice "Stopping Storage node"
var futures = @[
s.storageNode.switch.stop(),
s.storageNode.stop(),
s.repoStore.stop(),
s.maintenance.stop(),
]
if s.restServer != nil:
futures.add(s.restServer.stop())
let res = await noCancel allFinishedFailed[void](futures)
s.isStarted = false
if res.failure.len > 0:
error "Failed to stop Storage node", failures = res.failure.len
raise newException(
StorageError,
"Failed to stop Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
)
proc close*(s: StorageServer) {.async.} =
var futures =
@[s.storageNode.close(), s.repoStore.close(), s.storageNode.discovery.close()]
let res = await noCancel allFinishedFailed[void](futures)
if not s.taskpool.isNil:
try:
s.taskpool.shutdown()
except Exception as exc:
error "Failed to stop the taskpool", failures = res.failure.len
raise newException(StorageError, "Failure in taskpool shutdown: " & exc.msg)
when defaultChroniclesStream.outputs.type.arity >= 3:
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) =
discard
defaultChroniclesStream.outputs[2].writer = noOutput
if s.logFile.isSome:
if error =? closeFile(s.logFile.get()).errorOption:
error "Failed to close log file", errorCode = $error
if res.failure.len > 0:
error "Failed to close Storage node", failures = res.failure.len
raise newException(
StorageError,
"Failed to close Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
)
proc shutdown*(server: StorageServer) {.async.} =
await server.stop()
await server.close()
proc new*(
T: type StorageServer,
config: StorageConf,
privateKey: StoragePrivateKey,
logFile: Option[IoHandle] = IoHandle.none,
): StorageServer =
## create StorageServer including setting up datastore, repostore, etc
let listenMultiAddr = getMultiAddrWithIpAndTcpPort(config.listenIp, config.listenPort)
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = random.Rng.instance(),
scheduleInterval = Opt.some(config.natScheduleInterval),
askNewConnectedPeers = true,
numPeersToAsk = config.natNumPeersToAsk,
maxQueueSize = config.natMaxQueueSize,
minConfidence = config.natMinConfidence,
)
let switch = SwitchBuilder
.new()
.withPrivateKey(privateKey)
.withAddresses(@[listenMultiAddr])
.withRng(random.Rng.instance())
.withNoise()
.withYamux()
.withMaxConnections(config.maxPeers)
.withAgentVersion(config.agentString)
.withSignedPeerRecord(true)
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
.withAutonat()
.withServices(@[Service(autonatService)])
.build()
var
cache: CacheStore = nil
taskPool: Taskpool
try:
if config.numThreads == ThreadCount(0):
taskPool = Taskpool.new(numThreads = min(countProcessors(), 16))
else:
taskPool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskPool.numThreads
except CatchableError as exc:
raiseAssert("Failure in taskPool initialization:" & exc.msg)
if config.cacheSize > 0'nb:
cache = CacheStore.new(cacheSize = config.cacheSize)
## Is unused?
let discoveryDir = config.dataDir / StorageDhtNamespace
if io2.createPath(discoveryDir).isErr:
trace "Unable to create discovery directory for block store",
discoveryDir = discoveryDir
raise (ref Defect)(
msg: "Unable to create discovery directory for block store: " & discoveryDir
)
let providersPath = config.dataDir / StorageDhtProvidersNamespace
let discoveryStoreRes = LevelDbDatastore.new(providersPath)
if discoveryStoreRes.isErr:
error "Failed to initialize discovery datastore",
path = providersPath, err = discoveryStoreRes.error.msg
let
discoveryStore =
Datastore(discoveryStoreRes.expect("Should create discovery datastore!"))
discovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[listenMultiAddr],
bindPort = config.discoveryPort,
bootstrapNodes = config.bootstrapNodes,
store = discoveryStore,
)
network = BlockExcNetwork.new(switch)
repoData =
case config.repoKind
of repoFS:
Datastore(
FSDatastore.new($config.dataDir, depth = 5).expect(
"Should create repo file data store!"
)
)
of repoSQLite:
Datastore(
SQLiteDatastore.new($config.dataDir).expect(
"Should create repo SQLite data store!"
)
)
of repoLevelDb:
Datastore(
LevelDbDatastore.new($config.dataDir).expect(
"Should create repo LevelDB data store!"
)
)
repoStore = RepoStore.new(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / StorageMetaNamespace).expect(
"Should create metadata store!"
),
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl,
)
maintenance = BlockMaintainer.new(
repoStore,
interval = config.blockMaintenanceInterval,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks,
)
peerStore = PeerContextStore.new()
downloadManager = DownloadManager.new(retries = config.blockRetries)
advertiser = Advertiser.new(repoStore, discovery)
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery)
engine = BlockExcEngine.new(
repoStore, network, blockDiscovery, advertiser, peerStore, downloadManager
)
store = NetworkStore.new(engine, repoStore)
manifestProto = ManifestProtocol.new(switch, repoStore, discovery)
storageNode = StorageNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
discovery = discovery,
manifestProto = manifestProto,
taskPool = taskPool,
)
var restServer: RestServerRef = nil
if config.apiBindAddress.isSome:
restServer = RestServerRef
.new(
storageNode.initRestApi(
config, repoStore, autonatService, config.apiCorsAllowedOrigin
),
initTAddress(config.apiBindAddress.get(), config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high,
)
.expect("Should create rest server!")
switch.mount(network)
switch.mount(manifestProto)
autonatService.statusAndConfidenceHandler(
proc(
networkReachability: NetworkReachability, confidence: Opt[float]
) {.async: (raises: [CancelledError]).} =
if networkReachability == NotReachable:
let (announceAddrs, discoveryAddrs) =
nattedAddress(config.nat, switch.peerInfo.addrs, config.discoveryPort)
discovery.updateAnnounceRecord(announceAddrs)
discovery.updateDhtRecord(announceAddrs & discoveryAddrs)
)
StorageServer(
config: config,
storageNode: storageNode,
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance,
taskPool: taskPool,
logFile: logFile,
autonatService: autonatService,
)