diff --git a/config.nims b/config.nims index 5b1ecb00..ed49cfb2 100644 --- a/config.nims +++ b/config.nims @@ -114,7 +114,7 @@ when (NimMajor, NimMinor, NimPatch) >= (1, 6, 11): "BareExcept:off" when (NimMajor, NimMinor) >= (2, 0): --mm: - orc + refc switch("define", "withoutPCRE") diff --git a/storage.nim b/storage.nim index f1f4372a..fb52066f 100644 --- a/storage.nim +++ b/storage.nim @@ -65,6 +65,14 @@ when isMainModule: echo "Invalid value for --log-level. " & err.msg quit QuitFailure + if config.mixEnabled: + if config.mixPoolDir.len == 0: + fatal "mix-enabled requires --mix-pool-dir" + quit QuitFailure + if config.bootstrapNodes.len > 0 and config.dhtMixProxies.len == 0: + fatal "mix-enabled requires at least one --dht-mix-proxy" + quit QuitFailure + if err =? config.setupMetrics().errorOption: fatal "Failed to start metrics server", err = err.msg quit QuitFailure diff --git a/storage/blockexchange/network/network.nim b/storage/blockexchange/network/network.nim index 1d7ebafb..2a5e326b 100644 --- a/storage/blockexchange/network/network.nim +++ b/storage/blockexchange/network/network.nim @@ -9,6 +9,7 @@ import std/tables import std/sequtils +import std/sets import pkg/chronos @@ -72,6 +73,7 @@ type BlockExcNetwork* = ref object of LPProtocol peers*: Table[PeerId, NetworkPeer] + excludedPeers: HashSet[PeerId] switch*: Switch handlers*: BlockExcHandlers request*: BlockExcRequest @@ -258,9 +260,15 @@ proc dropPeer*( except CatchableError as error: warn "Error attempting to disconnect from peer", peer = peer, error = error.msg +proc excludeRelays*(self: BlockExcNetwork, peers: openArray[PeerId]) = + for p in peers: + self.excludedPeers.incl(p) + proc handlePeerJoined*( self: BlockExcNetwork, peer: PeerId ) {.async: (raises: [CancelledError]).} = + if peer in self.excludedPeers: + return discard self.getOrCreatePeer(peer) if not self.handlers.onPeerJoined.isNil: await self.handlers.onPeerJoined(peer) @@ -271,6 +279,8 @@ proc handlePeerDeparted*( ## Cleanup disconnected peer ## + if peer in self.excludedPeers: + return trace "Cleaning up departed peer", peer self.peers.del(peer) if not self.handlers.onPeerDeparted.isNil: diff --git a/storage/conf.nim b/storage/conf.nim index 2937bad4..fa6dc7e2 100644 --- a/storage/conf.nim +++ b/storage/conf.nim @@ -200,6 +200,25 @@ type defaultValue: DefaultNetworkPreset .}: NetworkPreset + dhtMixProxies* {. + desc: "Peers used as dht-proxy destinations when Mix is enabled", + name: "dht-mix-proxy" + .}: seq[SignedPeerRecord] + + mixEnabled* {. + desc: + "Route DHT provider lookups through the Mix protocol via the " & + "dht-mix-proxy. Hides the requester's identity from the proxy.", + defaultValue: false, + name: "mix-enabled" + .}: bool + + mixPoolDir* {. + desc: "Path to the Mix relay pool (expects `pubInfo/mixNode_` files inside)", + defaultValue: "", + name: "mix-pool-dir" + .}: string + maxPeers* {. desc: "The maximum number of peers to connect to", defaultValue: 160, diff --git a/storage/dht_proxy/client.nim b/storage/dht_proxy/client.nim new file mode 100644 index 00000000..7455801e --- /dev/null +++ b/storage/dht_proxy/client.nim @@ -0,0 +1,119 @@ +## Logos Storage +## Copyright (c) 2026 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sequtils +import std/strutils + +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/cid +import pkg/libp2p/routing_record +import pkg/libp2p/protocols/mix + +import ../errors +import ../logutils +import ../utils/mixidentity +import ./protocol + +const DefaultLookupTimeout* = 30.seconds + +logScope: + topics = "storage dht-proxy client" + +type LookupResult = object + status: ResponseStatus + errorKind: ErrorKind + providers: seq[SignedPeerRecord] + +proc requestLookup( + conn: Connection, request: LookupRequest +): Future[?!LookupResult] {.async: (raises: [CancelledError]).} = + try: + let encoded = request.encode() + if encoded.len > MaxLookupRequestBytes: + return failure( + "Request exceeds " & $MaxLookupRequestBytes & " bytes (got " & $encoded.len & ")" + ) + await conn.writeLp(encoded) + + let + respBytes = await conn.readLp(MaxLookupResponseBytes) + resp = LookupResponse.decode(respBytes).valueOr: + return + failure("Failed to decode response (bytes=" & $respBytes.len & "): " & $error) + + var providers = newSeqOfCap[SignedPeerRecord](resp.providers.len) + for sprBytes in resp.providers: + let res = SignedPeerRecord.decode(sprBytes) + if res.isOk: + providers.add(res.get) + else: + warn "Failed to decode SignedPeerRecord from response", err = $res.error + + return success LookupResult( + status: resp.status, errorKind: resp.errorKind, providers: providers + ) + except LPStreamError as exc: + return failure("Stream error: " & exc.msg) + except CatchableError as exc: + return failure("Client error: " & exc.msg) + +proc lookupProviders*( + mixProto: MixProtocol, proxy: PeerRecord, cid: Cid +): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + if proxy.addresses.len == 0: + return failure("Proxy has no addresses") + + let mixAddr = pickMixCompatibleMultiAddr(proxy.addresses.mapIt(it.address)).valueOr: + let dump = proxy.addresses.mapIt($it.address).join(",") + return failure( + "No Mix-compatible address on proxy " & $proxy.peerId & " (advertised: [" & dump & + "])" + ) + + let + destination = MixDestination.init(proxy.peerId, mixAddr) + request = + LookupRequest(queryType: QueryType.FindProviders, queryBytes: cid.data.buffer) + + var conn: Connection + try: + conn = mixProto.toConnection( + destination, + DhtProxyCodec, + MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(1'u8)), + ).valueOr: + return failure("Failed to obtain Mix connection: " & error) + + let lookupFut = requestLookup(conn, request) + if not (await lookupFut.withTimeout(DefaultLookupTimeout)): + lookupFut.cancelSoon() + return failure("Mix lookup timed out after " & $DefaultLookupTimeout) + + let lookupRes = lookupFut.read() + if lookupRes.isErr: + return failure(lookupRes.error) + let lookup = lookupRes.get() + + case lookup.status + of ResponseStatus.Ok: + return success lookup.providers + of ResponseStatus.NotFound: + return success newSeq[SignedPeerRecord]() + of ResponseStatus.Error: + return failure("Remote returned error: " & $lookup.errorKind) + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure("Mix lookup failed: " & exc.msg) + finally: + if not conn.isNil: + await noCancel conn.close() diff --git a/storage/dht_proxy/handler.nim b/storage/dht_proxy/handler.nim new file mode 100644 index 00000000..cf6356a1 --- /dev/null +++ b/storage/dht_proxy/handler.nim @@ -0,0 +1,100 @@ +## Logos Storage +## Copyright (c) 2026 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/cid +import pkg/libp2p/routing_record + +import ../discovery +import ../logutils +import ./protocol + +export protocol + +logScope: + topics = "storage dht-proxy server" + +type DhtProxyProtocol* = ref object of LPProtocol + discovery*: Discovery + +proc handleFindProviders( + self: DhtProxyProtocol, queryBytes: seq[byte] +): Future[LookupResponse] {.async: (raises: [CancelledError]).} = + let + cid = Cid.init(queryBytes).valueOr: + warn "Invalid CID in lookup request" + return + LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.InvalidCid) + providers = (await self.discovery.findDirect(cid)).valueOr: + warn "Direct lookup failed", cid, err = error.msg + return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal) + + if providers.len == 0: + return LookupResponse(status: ResponseStatus.NotFound) + + var encoded = newSeqOfCap[seq[byte]](providers.len) + for spr in providers: + let bytes = spr.encode().valueOr: + warn "Failed to encode SignedPeerRecord", err = error + continue + encoded.add(bytes) + + if encoded.len == 0: + return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal) + + let packed = packProviders(encoded, MaxLookupResponseBytes).valueOr: + return LookupResponse(status: ResponseStatus.Error, errorKind: error) + + LookupResponse(status: ResponseStatus.Ok, providers: packed) + +proc handleLookupRequest( + self: DhtProxyProtocol, conn: Connection +) {.async: (raises: [CancelledError]).} = + try: + let + reqBytes = await conn.readLp(MaxLookupRequestBytes) + req = LookupRequest.decode(reqBytes).valueOr: + warn "Failed to decode lookup request" + await conn.writeLp( + LookupResponse( + status: ResponseStatus.Error, errorKind: ErrorKind.DecodeFailed + ).encode() + ) + return + + let resp = + case req.queryType + of FindProviders: + await self.handleFindProviders(req.queryBytes) + + await conn.writeLp(resp.encode()) + except CancelledError as exc: + raise exc + except LPStreamError as exc: + warn "Stream error", err = exc.msg + except CatchableError as exc: + warn "Handler error", err = exc.msg + +proc new*(T: type DhtProxyProtocol, discovery: Discovery): DhtProxyProtocol = + let self = DhtProxyProtocol(discovery: discovery) + + proc handler( + conn: Connection, proto: string + ): Future[void] {.async: (raises: [CancelledError]).} = + try: + await self.handleLookupRequest(conn) + finally: + await noCancel conn.close() + + self.handler = handler + self.codec = DhtProxyCodec + return self diff --git a/storage/dht_proxy/protocol.nim b/storage/dht_proxy/protocol.nim new file mode 100644 index 00000000..f4ea2734 --- /dev/null +++ b/storage/dht_proxy/protocol.nim @@ -0,0 +1,130 @@ +## Logos Storage +## Copyright (c) 2026 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/protobuf/minprotobuf +import pkg/libp2p/protocols/mix +import pkg/libp2p/routing_record + +import ../logutils + +const DhtProxyCodec* = "/storage/dht-proxy/1.0.0" + +let MaxLookupRequestBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 1).expect( + "DhtProxyCodec framing leaves no room for a Sphinx forward payload" + ) + +let MaxLookupResponseBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 0).expect( + "DhtProxyCodec framing leaves no room for a Sphinx reply payload" + ) + +type + QueryType* {.pure.} = enum + FindProviders = 0 + + ResponseStatus* {.pure.} = enum + Ok = 0 + NotFound = 1 + Error = 2 + + ErrorKind* {.pure.} = enum + DecodeFailed = 0 + InvalidCid = 1 + Internal = 2 + ResponseTooLarge = 3 + + LookupRequest* = object + queryType*: QueryType + queryBytes*: seq[byte] + + LookupResponse* = object + status*: ResponseStatus + errorKind*: ErrorKind + providers*: seq[seq[byte]] + +proc encode*(req: LookupRequest): seq[byte] = + var pb = initProtoBuffer() + pb.write(1, req.queryType.uint32) + pb.write(2, req.queryBytes) + pb.finish() + pb.buffer + +proc encode*(resp: LookupResponse): seq[byte] = + var pb = initProtoBuffer() + pb.write(1, resp.status.uint32) + if resp.status == ResponseStatus.Error: + pb.write(2, resp.errorKind.uint32) + for spr in resp.providers: + pb.write(3, spr) + pb.finish() + pb.buffer + +proc decode*(_: type LookupRequest, data: openArray[byte]): ProtoResult[LookupRequest] = + let pb = initProtoBuffer(data) + var + req = LookupRequest() + qt: uint32 + + if ?pb.getField(1, qt): + if qt > QueryType.high.uint32: + return err(ProtoError.IncorrectBlob) + req.queryType = QueryType(qt) + + discard ?pb.getField(2, req.queryBytes) + ok(req) + +proc decode*( + _: type LookupResponse, data: openArray[byte] +): ProtoResult[LookupResponse] = + let pb = initProtoBuffer(data) + var + resp = LookupResponse() + status: uint32 + + if ?pb.getField(1, status): + if status > ResponseStatus.high.uint32: + return err(ProtoError.IncorrectBlob) + resp.status = ResponseStatus(status) + + if resp.status == ResponseStatus.Error: + var ek: uint32 + if ?pb.getField(2, ek): + if ek > ErrorKind.high.uint32: + return err(ProtoError.IncorrectBlob) + resp.errorKind = ErrorKind(ek) + + discard ?pb.getRepeatedField(3, resp.providers) + + ok(resp) + +proc packProviders*( + providers: seq[seq[byte]], budget_bytes: int +): Result[seq[seq[byte]], ErrorKind] = + if providers.len == 0: + error "packProviders called with no providers" + return err(ErrorKind.Internal) + + let single = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< 1]) + if single.encode().len > budget_bytes: + return err(ErrorKind.ResponseTooLarge) + + var + lo = 1 + hi = providers.len + while lo < hi: + let + mid = (lo + hi + 1) div 2 + test = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< mid]) + if test.encode().len <= budget_bytes: + lo = mid + else: + hi = mid - 1 + + ok(providers[0 ..< lo]) diff --git a/storage/discovery.nim b/storage/discovery.nim index c5943d88..f50f7a51 100644 --- a/storage/discovery.nim +++ b/storage/discovery.nim @@ -11,10 +11,12 @@ import std/algorithm import std/net +import std/random import std/sequtils import pkg/chronos import pkg/libp2p/[cid, multicodec, routing_record, signed_envelope] +import pkg/libp2p/protocols/mix import pkg/questionable import pkg/questionable/results import pkg/contractabi/address as ca @@ -24,6 +26,7 @@ from pkg/nimcrypto import keccak256 import ./rng as storage_rng import ./errors import ./logutils +import ./dht_proxy/client as dht_proxy_client export discv5 @@ -45,6 +48,8 @@ type Discovery* = ref object of RootObj dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information isStarted: bool store: Datastore + mixProto*: MixProtocol + dhtMixProxies*: seq[SignedPeerRecord] proc toNodeId*(cid: Cid): NodeId = ## Cid to discovery id @@ -81,23 +86,45 @@ proc findPeer*( return PeerRecord.none +proc findViaMix( + d: Discovery, cid: Cid +): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + var candidates = d.dhtMixProxies + shuffle(candidates) + + for record in candidates: + let proxy = record.data + let res = await dht_proxy_client.lookupProviders(d.mixProto, proxy, cid) + if res.isErr: + warn "Mix lookup proxy failed", cid, proxy = proxy.peerId, err = res.error.msg + continue + return success res.get + + failure("All Mix lookup proxies failed (candidates=" & $candidates.len & ")") + +proc findDirect*( + d: Discovery, cid: Cid +): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + try: + return (await d.protocol.getProviders(cid.toNodeId())).mapFailure + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure("Error finding providers for block " & $cid & ": " & exc.msg) + method find*( d: Discovery, cid: Cid ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} = - ## Find block providers - ## - - try: - without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure, - error: - warn "Error finding providers for block", cid, error = error.msg - - return providers.filterIt(not (it.data.peerId == d.peerId)) - except CancelledError as exc: - warn "Error finding providers for block", cid, exc = exc.msg - raise exc - except CatchableError as exc: - warn "Error finding providers for block", cid, exc = exc.msg + let providers = + if not d.mixProto.isNil: + (await d.findViaMix(cid)).valueOr: + warn "Mix lookup failed", cid, err = error.msg + return @[] + else: + (await d.findDirect(cid)).valueOr: + warn "Direct lookup failed", cid, err = error.msg + return @[] + providers.filterIt(not (it.data.peerId == d.peerId)) method provide*(d: Discovery, cid: Cid) {.async: (raises: [CancelledError]), base.} = ## Provide a block Cid @@ -239,6 +266,7 @@ proc new*( bindPort = 0.Port, announceAddrs: openArray[MultiAddress], bootstrapNodes: openArray[SignedPeerRecord] = [], + dhtMixProxies: openArray[SignedPeerRecord] = [], store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), tableIpLimits: TableIpLimits = DefaultTableIpLimits, ): Discovery = @@ -246,7 +274,10 @@ proc new*( ## var self = Discovery( - key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store + key: key, + peerId: PeerId.init(key).expect("Should construct PeerId"), + store: store, + dhtMixProxies: @dhtMixProxies, ) self.updateAnnounceRecord(announceAddrs) diff --git a/storage/rest/api.nim b/storage/rest/api.nim index 865591fc..d7fc0e09 100644 --- a/storage/rest/api.nim +++ b/storage/rest/api.nim @@ -574,6 +574,11 @@ proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRoute "repo": $conf.dataDir, "spr": if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "", + "announceSpr": + if node.discovery.providerRecord.isSome: + node.discovery.providerRecord.get.toURI + else: + "", "announceAddresses": node.discovery.announceAddrs, "table": table, "storage": {"version": $storageVersion, "revision": $storageRevision}, diff --git a/storage/storage.nim b/storage/storage.nim index 678f5f87..ba72de2c 100644 --- a/storage/storage.nim +++ b/storage/storage.nim @@ -17,6 +17,7 @@ import pkg/chronos import pkg/taskpools import pkg/presto import pkg/libp2p +import pkg/libp2p/protocols/mix import pkg/confutils import pkg/confutils/defs import pkg/stew/io2 @@ -30,7 +31,9 @@ import ./rng as random import ./rest/api import ./stores import ./blockexchange +import ./dht_proxy/handler import ./utils/fileutils +import ./utils/mixidentity import ./discovery import ./utils/addrutils import ./utils/natutils @@ -76,6 +79,37 @@ proc start*(s: StorageServer) {.async.} = await s.storageNode.switch.start() + if s.config.mixEnabled: + let + switch = s.storageNode.switch + (mixPub, mixPriv) = loadOrGenerateMixKeys( + string(s.config.dataDir) / "mix-identity" + ).valueOr: + raise newException( + StorageError, "Failed to load or generate Mix keys: " & error.msg + ) + mixAddr = pickMixCompatibleMultiAddr(switch.peerInfo.addrs).valueOr: + raise newException(StorageError, "No Mix-compatible address among listen addrs") + mixNodeInfo = buildMixNodeInfo( + mixPub, mixPriv, switch.peerInfo.peerId, mixAddr, switch.peerInfo.privateKey + ).valueOr: + raise newException(StorageError, "Failed to build Mix node info: " & error.msg) + relayPool = loadRelayPubInfoTable(s.config.mixPoolDir).valueOr: + raise newException(StorageError, "Failed to load Mix relay pool: " & error.msg) + mixProto = MixProtocol.new(mixNodeInfo, relayPool, switch) + + mixProto.registerDestReadBehavior(DhtProxyCodec, mix.readLp(MaxLookupResponseBytes)) + await mixProto.start() + switch.mount(mixProto) + + let dhtProxyProto = DhtProxyProtocol.new(s.storageNode.discovery) + await dhtProxyProto.start() + switch.mount(dhtProxyProto) + + s.storageNode.discovery.mixProto = mixProto + + s.storageNode.engine.network.excludeRelays(relayPool.keys.toSeq) + let (announceAddrs, discoveryAddrs) = nattedAddress( s.config.nat, s.storageNode.switch.peerInfo.addrs, s.config.discoveryPort ) @@ -239,6 +273,7 @@ proc new*( announceAddrs = @[listenMultiAddr], bindPort = config.discoveryPort, bootstrapNodes = bootstrapNodes, + dhtMixProxies = config.dhtMixProxies, store = discoveryStore, ) diff --git a/storage/utils/mixidentity.nim b/storage/utils/mixidentity.nim new file mode 100644 index 00000000..c8134137 --- /dev/null +++ b/storage/utils/mixidentity.nim @@ -0,0 +1,132 @@ +## Logos Storage +## Copyright (c) 2026 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/[os, tables] + +import pkg/libp2p +import pkg/libp2p/crypto/crypto +import pkg/libp2p/protocols/mix +import pkg/libp2p/protocols/mix/[curve25519, mix_node] +import pkg/questionable/results +import pkg/stew/byteutils + +import ../errors + +const MixIdentityFileSize = 2 * FieldElementSize + +proc pickMixCompatibleMultiAddr*(addrs: openArray[MultiAddress]): Opt[MultiAddress] = + ## Mix only supports /ip4/*/tcp/* or /ip4/*/udp/*/quic-v1 multiaddrs. + for ma in addrs: + if TCP_IP.match(ma) or QUIC_V1_IP.match(ma): + return Opt.some(ma) + Opt.none(MultiAddress) + +proc loadOrGenerateMixKeys*( + path: string +): ?!tuple[mixPub: FieldElement, mixPriv: FieldElement] = + if fileExists(path): + let raw = + try: + readFile(path) + except IOError as exc: + return failure("Failed to read mix-identity from " & path & ": " & exc.msg) + + if raw.len != MixIdentityFileSize: + return failure( + "Invalid mix-identity file size at " & path & " (expected " & + $MixIdentityFileSize & ", got " & $raw.len & ")" + ) + + let + pub = bytesToFieldElement(raw.toOpenArrayByte(0, FieldElementSize - 1)).valueOr: + return failure("Bad mix pub key in " & path & ": " & error) + priv = bytesToFieldElement( + raw.toOpenArrayByte(FieldElementSize, 2 * FieldElementSize - 1) + ).valueOr: + return failure("Bad mix priv key in " & path & ": " & error) + return success((mixPub: pub, mixPriv: priv)) + + let (priv, pub) = generateKeyPair().valueOr: + return failure("Failed to generate Mix keypair: " & error) + + let dir = parentDir(path) + if dir.len > 0 and not dirExists(dir): + try: + createDir(dir) + except OSError as exc: + return failure("Failed to create directory " & dir & ": " & exc.msg) + except IOError as exc: + return failure("Failed to create directory " & dir & ": " & exc.msg) + + let blob = fieldElementToBytes(pub) & fieldElementToBytes(priv) + + try: + writeFile(path, string.fromBytes(blob)) + setFilePermissions(path, {fpUserRead, fpUserWrite}) + except IOError as exc: + return failure("Failed to write mix-identity to " & path & ": " & exc.msg) + except OSError as exc: + return failure("Failed to set permissions on " & path & ": " & exc.msg) + + success((mixPub: pub, mixPriv: priv)) + +proc buildMixNodeInfo*( + mixPub, mixPriv: FieldElement, + peerId: PeerId, + multiAddr: MultiAddress, + libp2pPriv: PrivateKey, +): ?!MixNodeInfo = + if libp2pPriv.scheme != Secp256k1: + return failure("Mix requires a Secp256k1 libp2p key; got " & $libp2pPriv.scheme) + + let libp2pPub = libp2pPriv.getPublicKey().valueOr: + return failure("Failed to derive libp2p pub key: " & $error) + + if libp2pPub.scheme != Secp256k1: + return failure("Unexpected libp2p pub key scheme: " & $libp2pPub.scheme) + + success initMixNodeInfo( + peerId = peerId, + multiAddr = multiAddr, + mixPubKey = mixPub, + mixPrivKey = mixPriv, + libp2pPubKey = libp2pPub.skkey, + libp2pPrivKey = libp2pPriv.skkey, + ) + +proc loadRelayPubInfoTable*(mixPoolDir: string): ?!Table[PeerId, MixPubInfo] = + let pubInfoDir = mixPoolDir / "pubInfo" + if not dirExists(pubInfoDir): + return failure("Relay pubInfo directory does not exist: " & pubInfoDir) + + var + t = initTable[PeerId, MixPubInfo]() + i = 0 + while true: + let entry = + try: + MixPubInfo.readFromFile(i, pubInfoDir) + except IOError as exc: + return failure("I/O error reading pubInfo at index " & $i & ": " & exc.msg) + except OSError as exc: + return failure("OS error reading pubInfo at index " & $i & ": " & exc.msg) + if entry.isErr: + break + let info = entry.get() + t[info.peerId] = info + inc i + + if t.len == 0: + return failure("No relay entries found in " & pubInfoDir) + + success t + +{.pop.}