mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-28 13:29:28 +00:00
feat: DHT queries via Mix
Part of https://github.com/logos-storage/logos-storage-pm/issues/13 Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
This commit is contained in:
parent
8f9eceaa19
commit
ca89ebd935
@ -114,7 +114,7 @@ when (NimMajor, NimMinor, NimPatch) >= (1, 6, 11):
|
||||
"BareExcept:off"
|
||||
when (NimMajor, NimMinor) >= (2, 0):
|
||||
--mm:
|
||||
orc
|
||||
refc
|
||||
|
||||
switch("define", "withoutPCRE")
|
||||
|
||||
|
||||
@ -65,6 +65,14 @@ when isMainModule:
|
||||
echo "Invalid value for --log-level. " & err.msg
|
||||
quit QuitFailure
|
||||
|
||||
if config.mixEnabled:
|
||||
if config.mixPoolDir.len == 0:
|
||||
fatal "mix-enabled requires --mix-pool-dir"
|
||||
quit QuitFailure
|
||||
if config.bootstrapNodes.len > 0 and config.dhtMixProxies.len == 0:
|
||||
fatal "mix-enabled requires at least one --dht-mix-proxy"
|
||||
quit QuitFailure
|
||||
|
||||
if err =? config.setupMetrics().errorOption:
|
||||
fatal "Failed to start metrics server", err = err.msg
|
||||
quit QuitFailure
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
|
||||
import std/tables
|
||||
import std/sequtils
|
||||
import std/sets
|
||||
|
||||
import pkg/chronos
|
||||
|
||||
@ -72,6 +73,7 @@ type
|
||||
|
||||
BlockExcNetwork* = ref object of LPProtocol
|
||||
peers*: Table[PeerId, NetworkPeer]
|
||||
excludedPeers: HashSet[PeerId]
|
||||
switch*: Switch
|
||||
handlers*: BlockExcHandlers
|
||||
request*: BlockExcRequest
|
||||
@ -258,9 +260,15 @@ proc dropPeer*(
|
||||
except CatchableError as error:
|
||||
warn "Error attempting to disconnect from peer", peer = peer, error = error.msg
|
||||
|
||||
proc excludeRelays*(self: BlockExcNetwork, peers: openArray[PeerId]) =
|
||||
for p in peers:
|
||||
self.excludedPeers.incl(p)
|
||||
|
||||
proc handlePeerJoined*(
|
||||
self: BlockExcNetwork, peer: PeerId
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if peer in self.excludedPeers:
|
||||
return
|
||||
discard self.getOrCreatePeer(peer)
|
||||
if not self.handlers.onPeerJoined.isNil:
|
||||
await self.handlers.onPeerJoined(peer)
|
||||
@ -271,6 +279,8 @@ proc handlePeerDeparted*(
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
|
||||
if peer in self.excludedPeers:
|
||||
return
|
||||
trace "Cleaning up departed peer", peer
|
||||
self.peers.del(peer)
|
||||
if not self.handlers.onPeerDeparted.isNil:
|
||||
|
||||
@ -200,6 +200,25 @@ type
|
||||
defaultValue: DefaultNetworkPreset
|
||||
.}: NetworkPreset
|
||||
|
||||
dhtMixProxies* {.
|
||||
desc: "Peers used as dht-proxy destinations when Mix is enabled",
|
||||
name: "dht-mix-proxy"
|
||||
.}: seq[SignedPeerRecord]
|
||||
|
||||
mixEnabled* {.
|
||||
desc:
|
||||
"Route DHT provider lookups through the Mix protocol via the " &
|
||||
"dht-mix-proxy. Hides the requester's identity from the proxy.",
|
||||
defaultValue: false,
|
||||
name: "mix-enabled"
|
||||
.}: bool
|
||||
|
||||
mixPoolDir* {.
|
||||
desc: "Path to the Mix relay pool (expects `pubInfo/mixNode_<i>` files inside)",
|
||||
defaultValue: "",
|
||||
name: "mix-pool-dir"
|
||||
.}: string
|
||||
|
||||
maxPeers* {.
|
||||
desc: "The maximum number of peers to connect to",
|
||||
defaultValue: 160,
|
||||
|
||||
119
storage/dht_proxy/client.nim
Normal file
119
storage/dht_proxy/client.nim
Normal file
@ -0,0 +1,119 @@
|
||||
## Logos Storage
|
||||
## Copyright (c) 2026 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils
|
||||
import std/strutils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/cid
|
||||
import pkg/libp2p/routing_record
|
||||
import pkg/libp2p/protocols/mix
|
||||
|
||||
import ../errors
|
||||
import ../logutils
|
||||
import ../utils/mixidentity
|
||||
import ./protocol
|
||||
|
||||
const DefaultLookupTimeout* = 30.seconds
|
||||
|
||||
logScope:
|
||||
topics = "storage dht-proxy client"
|
||||
|
||||
type LookupResult = object
|
||||
status: ResponseStatus
|
||||
errorKind: ErrorKind
|
||||
providers: seq[SignedPeerRecord]
|
||||
|
||||
proc requestLookup(
|
||||
conn: Connection, request: LookupRequest
|
||||
): Future[?!LookupResult] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
let encoded = request.encode()
|
||||
if encoded.len > MaxLookupRequestBytes:
|
||||
return failure(
|
||||
"Request exceeds " & $MaxLookupRequestBytes & " bytes (got " & $encoded.len & ")"
|
||||
)
|
||||
await conn.writeLp(encoded)
|
||||
|
||||
let
|
||||
respBytes = await conn.readLp(MaxLookupResponseBytes)
|
||||
resp = LookupResponse.decode(respBytes).valueOr:
|
||||
return
|
||||
failure("Failed to decode response (bytes=" & $respBytes.len & "): " & $error)
|
||||
|
||||
var providers = newSeqOfCap[SignedPeerRecord](resp.providers.len)
|
||||
for sprBytes in resp.providers:
|
||||
let res = SignedPeerRecord.decode(sprBytes)
|
||||
if res.isOk:
|
||||
providers.add(res.get)
|
||||
else:
|
||||
warn "Failed to decode SignedPeerRecord from response", err = $res.error
|
||||
|
||||
return success LookupResult(
|
||||
status: resp.status, errorKind: resp.errorKind, providers: providers
|
||||
)
|
||||
except LPStreamError as exc:
|
||||
return failure("Stream error: " & exc.msg)
|
||||
except CatchableError as exc:
|
||||
return failure("Client error: " & exc.msg)
|
||||
|
||||
proc lookupProviders*(
|
||||
mixProto: MixProtocol, proxy: PeerRecord, cid: Cid
|
||||
): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
if proxy.addresses.len == 0:
|
||||
return failure("Proxy has no addresses")
|
||||
|
||||
let mixAddr = pickMixCompatibleMultiAddr(proxy.addresses.mapIt(it.address)).valueOr:
|
||||
let dump = proxy.addresses.mapIt($it.address).join(",")
|
||||
return failure(
|
||||
"No Mix-compatible address on proxy " & $proxy.peerId & " (advertised: [" & dump &
|
||||
"])"
|
||||
)
|
||||
|
||||
let
|
||||
destination = MixDestination.init(proxy.peerId, mixAddr)
|
||||
request =
|
||||
LookupRequest(queryType: QueryType.FindProviders, queryBytes: cid.data.buffer)
|
||||
|
||||
var conn: Connection
|
||||
try:
|
||||
conn = mixProto.toConnection(
|
||||
destination,
|
||||
DhtProxyCodec,
|
||||
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(1'u8)),
|
||||
).valueOr:
|
||||
return failure("Failed to obtain Mix connection: " & error)
|
||||
|
||||
let lookupFut = requestLookup(conn, request)
|
||||
if not (await lookupFut.withTimeout(DefaultLookupTimeout)):
|
||||
lookupFut.cancelSoon()
|
||||
return failure("Mix lookup timed out after " & $DefaultLookupTimeout)
|
||||
|
||||
let lookupRes = lookupFut.read()
|
||||
if lookupRes.isErr:
|
||||
return failure(lookupRes.error)
|
||||
let lookup = lookupRes.get()
|
||||
|
||||
case lookup.status
|
||||
of ResponseStatus.Ok:
|
||||
return success lookup.providers
|
||||
of ResponseStatus.NotFound:
|
||||
return success newSeq[SignedPeerRecord]()
|
||||
of ResponseStatus.Error:
|
||||
return failure("Remote returned error: " & $lookup.errorKind)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
return failure("Mix lookup failed: " & exc.msg)
|
||||
finally:
|
||||
if not conn.isNil:
|
||||
await noCancel conn.close()
|
||||
100
storage/dht_proxy/handler.nim
Normal file
100
storage/dht_proxy/handler.nim
Normal file
@ -0,0 +1,100 @@
|
||||
## Logos Storage
|
||||
## Copyright (c) 2026 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/cid
|
||||
import pkg/libp2p/routing_record
|
||||
|
||||
import ../discovery
|
||||
import ../logutils
|
||||
import ./protocol
|
||||
|
||||
export protocol
|
||||
|
||||
logScope:
|
||||
topics = "storage dht-proxy server"
|
||||
|
||||
type DhtProxyProtocol* = ref object of LPProtocol
|
||||
discovery*: Discovery
|
||||
|
||||
proc handleFindProviders(
|
||||
self: DhtProxyProtocol, queryBytes: seq[byte]
|
||||
): Future[LookupResponse] {.async: (raises: [CancelledError]).} =
|
||||
let
|
||||
cid = Cid.init(queryBytes).valueOr:
|
||||
warn "Invalid CID in lookup request"
|
||||
return
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.InvalidCid)
|
||||
providers = (await self.discovery.findDirect(cid)).valueOr:
|
||||
warn "Direct lookup failed", cid, err = error.msg
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
|
||||
if providers.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.NotFound)
|
||||
|
||||
var encoded = newSeqOfCap[seq[byte]](providers.len)
|
||||
for spr in providers:
|
||||
let bytes = spr.encode().valueOr:
|
||||
warn "Failed to encode SignedPeerRecord", err = error
|
||||
continue
|
||||
encoded.add(bytes)
|
||||
|
||||
if encoded.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
|
||||
let packed = packProviders(encoded, MaxLookupResponseBytes).valueOr:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: error)
|
||||
|
||||
LookupResponse(status: ResponseStatus.Ok, providers: packed)
|
||||
|
||||
proc handleLookupRequest(
|
||||
self: DhtProxyProtocol, conn: Connection
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
let
|
||||
reqBytes = await conn.readLp(MaxLookupRequestBytes)
|
||||
req = LookupRequest.decode(reqBytes).valueOr:
|
||||
warn "Failed to decode lookup request"
|
||||
await conn.writeLp(
|
||||
LookupResponse(
|
||||
status: ResponseStatus.Error, errorKind: ErrorKind.DecodeFailed
|
||||
).encode()
|
||||
)
|
||||
return
|
||||
|
||||
let resp =
|
||||
case req.queryType
|
||||
of FindProviders:
|
||||
await self.handleFindProviders(req.queryBytes)
|
||||
|
||||
await conn.writeLp(resp.encode())
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except LPStreamError as exc:
|
||||
warn "Stream error", err = exc.msg
|
||||
except CatchableError as exc:
|
||||
warn "Handler error", err = exc.msg
|
||||
|
||||
proc new*(T: type DhtProxyProtocol, discovery: Discovery): DhtProxyProtocol =
|
||||
let self = DhtProxyProtocol(discovery: discovery)
|
||||
|
||||
proc handler(
|
||||
conn: Connection, proto: string
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
await self.handleLookupRequest(conn)
|
||||
finally:
|
||||
await noCancel conn.close()
|
||||
|
||||
self.handler = handler
|
||||
self.codec = DhtProxyCodec
|
||||
return self
|
||||
130
storage/dht_proxy/protocol.nim
Normal file
130
storage/dht_proxy/protocol.nim
Normal file
@ -0,0 +1,130 @@
|
||||
## Logos Storage
|
||||
## Copyright (c) 2026 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import pkg/libp2p/protobuf/minprotobuf
|
||||
import pkg/libp2p/protocols/mix
|
||||
import pkg/libp2p/routing_record
|
||||
|
||||
import ../logutils
|
||||
|
||||
const DhtProxyCodec* = "/storage/dht-proxy/1.0.0"
|
||||
|
||||
let MaxLookupRequestBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 1).expect(
|
||||
"DhtProxyCodec framing leaves no room for a Sphinx forward payload"
|
||||
)
|
||||
|
||||
let MaxLookupResponseBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 0).expect(
|
||||
"DhtProxyCodec framing leaves no room for a Sphinx reply payload"
|
||||
)
|
||||
|
||||
type
|
||||
QueryType* {.pure.} = enum
|
||||
FindProviders = 0
|
||||
|
||||
ResponseStatus* {.pure.} = enum
|
||||
Ok = 0
|
||||
NotFound = 1
|
||||
Error = 2
|
||||
|
||||
ErrorKind* {.pure.} = enum
|
||||
DecodeFailed = 0
|
||||
InvalidCid = 1
|
||||
Internal = 2
|
||||
ResponseTooLarge = 3
|
||||
|
||||
LookupRequest* = object
|
||||
queryType*: QueryType
|
||||
queryBytes*: seq[byte]
|
||||
|
||||
LookupResponse* = object
|
||||
status*: ResponseStatus
|
||||
errorKind*: ErrorKind
|
||||
providers*: seq[seq[byte]]
|
||||
|
||||
proc encode*(req: LookupRequest): seq[byte] =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write(1, req.queryType.uint32)
|
||||
pb.write(2, req.queryBytes)
|
||||
pb.finish()
|
||||
pb.buffer
|
||||
|
||||
proc encode*(resp: LookupResponse): seq[byte] =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write(1, resp.status.uint32)
|
||||
if resp.status == ResponseStatus.Error:
|
||||
pb.write(2, resp.errorKind.uint32)
|
||||
for spr in resp.providers:
|
||||
pb.write(3, spr)
|
||||
pb.finish()
|
||||
pb.buffer
|
||||
|
||||
proc decode*(_: type LookupRequest, data: openArray[byte]): ProtoResult[LookupRequest] =
|
||||
let pb = initProtoBuffer(data)
|
||||
var
|
||||
req = LookupRequest()
|
||||
qt: uint32
|
||||
|
||||
if ?pb.getField(1, qt):
|
||||
if qt > QueryType.high.uint32:
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
req.queryType = QueryType(qt)
|
||||
|
||||
discard ?pb.getField(2, req.queryBytes)
|
||||
ok(req)
|
||||
|
||||
proc decode*(
|
||||
_: type LookupResponse, data: openArray[byte]
|
||||
): ProtoResult[LookupResponse] =
|
||||
let pb = initProtoBuffer(data)
|
||||
var
|
||||
resp = LookupResponse()
|
||||
status: uint32
|
||||
|
||||
if ?pb.getField(1, status):
|
||||
if status > ResponseStatus.high.uint32:
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
resp.status = ResponseStatus(status)
|
||||
|
||||
if resp.status == ResponseStatus.Error:
|
||||
var ek: uint32
|
||||
if ?pb.getField(2, ek):
|
||||
if ek > ErrorKind.high.uint32:
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
resp.errorKind = ErrorKind(ek)
|
||||
|
||||
discard ?pb.getRepeatedField(3, resp.providers)
|
||||
|
||||
ok(resp)
|
||||
|
||||
proc packProviders*(
|
||||
providers: seq[seq[byte]], budget_bytes: int
|
||||
): Result[seq[seq[byte]], ErrorKind] =
|
||||
if providers.len == 0:
|
||||
error "packProviders called with no providers"
|
||||
return err(ErrorKind.Internal)
|
||||
|
||||
let single = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< 1])
|
||||
if single.encode().len > budget_bytes:
|
||||
return err(ErrorKind.ResponseTooLarge)
|
||||
|
||||
var
|
||||
lo = 1
|
||||
hi = providers.len
|
||||
while lo < hi:
|
||||
let
|
||||
mid = (lo + hi + 1) div 2
|
||||
test = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< mid])
|
||||
if test.encode().len <= budget_bytes:
|
||||
lo = mid
|
||||
else:
|
||||
hi = mid - 1
|
||||
|
||||
ok(providers[0 ..< lo])
|
||||
@ -11,10 +11,12 @@
|
||||
|
||||
import std/algorithm
|
||||
import std/net
|
||||
import std/random
|
||||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p/[cid, multicodec, routing_record, signed_envelope]
|
||||
import pkg/libp2p/protocols/mix
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/contractabi/address as ca
|
||||
@ -24,6 +26,7 @@ from pkg/nimcrypto import keccak256
|
||||
import ./rng as storage_rng
|
||||
import ./errors
|
||||
import ./logutils
|
||||
import ./dht_proxy/client as dht_proxy_client
|
||||
|
||||
export discv5
|
||||
|
||||
@ -45,6 +48,8 @@ type Discovery* = ref object of RootObj
|
||||
dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information
|
||||
isStarted: bool
|
||||
store: Datastore
|
||||
mixProto*: MixProtocol
|
||||
dhtMixProxies*: seq[SignedPeerRecord]
|
||||
|
||||
proc toNodeId*(cid: Cid): NodeId =
|
||||
## Cid to discovery id
|
||||
@ -81,23 +86,45 @@ proc findPeer*(
|
||||
|
||||
return PeerRecord.none
|
||||
|
||||
proc findViaMix(
|
||||
d: Discovery, cid: Cid
|
||||
): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
var candidates = d.dhtMixProxies
|
||||
shuffle(candidates)
|
||||
|
||||
for record in candidates:
|
||||
let proxy = record.data
|
||||
let res = await dht_proxy_client.lookupProviders(d.mixProto, proxy, cid)
|
||||
if res.isErr:
|
||||
warn "Mix lookup proxy failed", cid, proxy = proxy.peerId, err = res.error.msg
|
||||
continue
|
||||
return success res.get
|
||||
|
||||
failure("All Mix lookup proxies failed (candidates=" & $candidates.len & ")")
|
||||
|
||||
proc findDirect*(
|
||||
d: Discovery, cid: Cid
|
||||
): Future[?!seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
return (await d.protocol.getProviders(cid.toNodeId())).mapFailure
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
return failure("Error finding providers for block " & $cid & ": " & exc.msg)
|
||||
|
||||
method find*(
|
||||
d: Discovery, cid: Cid
|
||||
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} =
|
||||
## Find block providers
|
||||
##
|
||||
|
||||
try:
|
||||
without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure,
|
||||
error:
|
||||
warn "Error finding providers for block", cid, error = error.msg
|
||||
|
||||
return providers.filterIt(not (it.data.peerId == d.peerId))
|
||||
except CancelledError as exc:
|
||||
warn "Error finding providers for block", cid, exc = exc.msg
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "Error finding providers for block", cid, exc = exc.msg
|
||||
let providers =
|
||||
if not d.mixProto.isNil:
|
||||
(await d.findViaMix(cid)).valueOr:
|
||||
warn "Mix lookup failed", cid, err = error.msg
|
||||
return @[]
|
||||
else:
|
||||
(await d.findDirect(cid)).valueOr:
|
||||
warn "Direct lookup failed", cid, err = error.msg
|
||||
return @[]
|
||||
providers.filterIt(not (it.data.peerId == d.peerId))
|
||||
|
||||
method provide*(d: Discovery, cid: Cid) {.async: (raises: [CancelledError]), base.} =
|
||||
## Provide a block Cid
|
||||
@ -239,6 +266,7 @@ proc new*(
|
||||
bindPort = 0.Port,
|
||||
announceAddrs: openArray[MultiAddress],
|
||||
bootstrapNodes: openArray[SignedPeerRecord] = [],
|
||||
dhtMixProxies: openArray[SignedPeerRecord] = [],
|
||||
store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"),
|
||||
tableIpLimits: TableIpLimits = DefaultTableIpLimits,
|
||||
): Discovery =
|
||||
@ -246,7 +274,10 @@ proc new*(
|
||||
##
|
||||
|
||||
var self = Discovery(
|
||||
key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store
|
||||
key: key,
|
||||
peerId: PeerId.init(key).expect("Should construct PeerId"),
|
||||
store: store,
|
||||
dhtMixProxies: @dhtMixProxies,
|
||||
)
|
||||
|
||||
self.updateAnnounceRecord(announceAddrs)
|
||||
|
||||
@ -574,6 +574,11 @@ proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRoute
|
||||
"repo": $conf.dataDir,
|
||||
"spr":
|
||||
if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "",
|
||||
"announceSpr":
|
||||
if node.discovery.providerRecord.isSome:
|
||||
node.discovery.providerRecord.get.toURI
|
||||
else:
|
||||
"",
|
||||
"announceAddresses": node.discovery.announceAddrs,
|
||||
"table": table,
|
||||
"storage": {"version": $storageVersion, "revision": $storageRevision},
|
||||
|
||||
@ -17,6 +17,7 @@ import pkg/chronos
|
||||
import pkg/taskpools
|
||||
import pkg/presto
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/protocols/mix
|
||||
import pkg/confutils
|
||||
import pkg/confutils/defs
|
||||
import pkg/stew/io2
|
||||
@ -30,7 +31,9 @@ import ./rng as random
|
||||
import ./rest/api
|
||||
import ./stores
|
||||
import ./blockexchange
|
||||
import ./dht_proxy/handler
|
||||
import ./utils/fileutils
|
||||
import ./utils/mixidentity
|
||||
import ./discovery
|
||||
import ./utils/addrutils
|
||||
import ./utils/natutils
|
||||
@ -76,6 +79,37 @@ proc start*(s: StorageServer) {.async.} =
|
||||
|
||||
await s.storageNode.switch.start()
|
||||
|
||||
if s.config.mixEnabled:
|
||||
let
|
||||
switch = s.storageNode.switch
|
||||
(mixPub, mixPriv) = loadOrGenerateMixKeys(
|
||||
string(s.config.dataDir) / "mix-identity"
|
||||
).valueOr:
|
||||
raise newException(
|
||||
StorageError, "Failed to load or generate Mix keys: " & error.msg
|
||||
)
|
||||
mixAddr = pickMixCompatibleMultiAddr(switch.peerInfo.addrs).valueOr:
|
||||
raise newException(StorageError, "No Mix-compatible address among listen addrs")
|
||||
mixNodeInfo = buildMixNodeInfo(
|
||||
mixPub, mixPriv, switch.peerInfo.peerId, mixAddr, switch.peerInfo.privateKey
|
||||
).valueOr:
|
||||
raise newException(StorageError, "Failed to build Mix node info: " & error.msg)
|
||||
relayPool = loadRelayPubInfoTable(s.config.mixPoolDir).valueOr:
|
||||
raise newException(StorageError, "Failed to load Mix relay pool: " & error.msg)
|
||||
mixProto = MixProtocol.new(mixNodeInfo, relayPool, switch)
|
||||
|
||||
mixProto.registerDestReadBehavior(DhtProxyCodec, mix.readLp(MaxLookupResponseBytes))
|
||||
await mixProto.start()
|
||||
switch.mount(mixProto)
|
||||
|
||||
let dhtProxyProto = DhtProxyProtocol.new(s.storageNode.discovery)
|
||||
await dhtProxyProto.start()
|
||||
switch.mount(dhtProxyProto)
|
||||
|
||||
s.storageNode.discovery.mixProto = mixProto
|
||||
|
||||
s.storageNode.engine.network.excludeRelays(relayPool.keys.toSeq)
|
||||
|
||||
let (announceAddrs, discoveryAddrs) = nattedAddress(
|
||||
s.config.nat, s.storageNode.switch.peerInfo.addrs, s.config.discoveryPort
|
||||
)
|
||||
@ -239,6 +273,7 @@ proc new*(
|
||||
announceAddrs = @[listenMultiAddr],
|
||||
bindPort = config.discoveryPort,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
dhtMixProxies = config.dhtMixProxies,
|
||||
store = discoveryStore,
|
||||
)
|
||||
|
||||
|
||||
132
storage/utils/mixidentity.nim
Normal file
132
storage/utils/mixidentity.nim
Normal file
@ -0,0 +1,132 @@
|
||||
## Logos Storage
|
||||
## Copyright (c) 2026 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[os, tables]
|
||||
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/crypto/crypto
|
||||
import pkg/libp2p/protocols/mix
|
||||
import pkg/libp2p/protocols/mix/[curve25519, mix_node]
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import ../errors
|
||||
|
||||
const MixIdentityFileSize = 2 * FieldElementSize
|
||||
|
||||
proc pickMixCompatibleMultiAddr*(addrs: openArray[MultiAddress]): Opt[MultiAddress] =
|
||||
## Mix only supports /ip4/*/tcp/* or /ip4/*/udp/*/quic-v1 multiaddrs.
|
||||
for ma in addrs:
|
||||
if TCP_IP.match(ma) or QUIC_V1_IP.match(ma):
|
||||
return Opt.some(ma)
|
||||
Opt.none(MultiAddress)
|
||||
|
||||
proc loadOrGenerateMixKeys*(
|
||||
path: string
|
||||
): ?!tuple[mixPub: FieldElement, mixPriv: FieldElement] =
|
||||
if fileExists(path):
|
||||
let raw =
|
||||
try:
|
||||
readFile(path)
|
||||
except IOError as exc:
|
||||
return failure("Failed to read mix-identity from " & path & ": " & exc.msg)
|
||||
|
||||
if raw.len != MixIdentityFileSize:
|
||||
return failure(
|
||||
"Invalid mix-identity file size at " & path & " (expected " &
|
||||
$MixIdentityFileSize & ", got " & $raw.len & ")"
|
||||
)
|
||||
|
||||
let
|
||||
pub = bytesToFieldElement(raw.toOpenArrayByte(0, FieldElementSize - 1)).valueOr:
|
||||
return failure("Bad mix pub key in " & path & ": " & error)
|
||||
priv = bytesToFieldElement(
|
||||
raw.toOpenArrayByte(FieldElementSize, 2 * FieldElementSize - 1)
|
||||
).valueOr:
|
||||
return failure("Bad mix priv key in " & path & ": " & error)
|
||||
return success((mixPub: pub, mixPriv: priv))
|
||||
|
||||
let (priv, pub) = generateKeyPair().valueOr:
|
||||
return failure("Failed to generate Mix keypair: " & error)
|
||||
|
||||
let dir = parentDir(path)
|
||||
if dir.len > 0 and not dirExists(dir):
|
||||
try:
|
||||
createDir(dir)
|
||||
except OSError as exc:
|
||||
return failure("Failed to create directory " & dir & ": " & exc.msg)
|
||||
except IOError as exc:
|
||||
return failure("Failed to create directory " & dir & ": " & exc.msg)
|
||||
|
||||
let blob = fieldElementToBytes(pub) & fieldElementToBytes(priv)
|
||||
|
||||
try:
|
||||
writeFile(path, string.fromBytes(blob))
|
||||
setFilePermissions(path, {fpUserRead, fpUserWrite})
|
||||
except IOError as exc:
|
||||
return failure("Failed to write mix-identity to " & path & ": " & exc.msg)
|
||||
except OSError as exc:
|
||||
return failure("Failed to set permissions on " & path & ": " & exc.msg)
|
||||
|
||||
success((mixPub: pub, mixPriv: priv))
|
||||
|
||||
proc buildMixNodeInfo*(
|
||||
mixPub, mixPriv: FieldElement,
|
||||
peerId: PeerId,
|
||||
multiAddr: MultiAddress,
|
||||
libp2pPriv: PrivateKey,
|
||||
): ?!MixNodeInfo =
|
||||
if libp2pPriv.scheme != Secp256k1:
|
||||
return failure("Mix requires a Secp256k1 libp2p key; got " & $libp2pPriv.scheme)
|
||||
|
||||
let libp2pPub = libp2pPriv.getPublicKey().valueOr:
|
||||
return failure("Failed to derive libp2p pub key: " & $error)
|
||||
|
||||
if libp2pPub.scheme != Secp256k1:
|
||||
return failure("Unexpected libp2p pub key scheme: " & $libp2pPub.scheme)
|
||||
|
||||
success initMixNodeInfo(
|
||||
peerId = peerId,
|
||||
multiAddr = multiAddr,
|
||||
mixPubKey = mixPub,
|
||||
mixPrivKey = mixPriv,
|
||||
libp2pPubKey = libp2pPub.skkey,
|
||||
libp2pPrivKey = libp2pPriv.skkey,
|
||||
)
|
||||
|
||||
proc loadRelayPubInfoTable*(mixPoolDir: string): ?!Table[PeerId, MixPubInfo] =
|
||||
let pubInfoDir = mixPoolDir / "pubInfo"
|
||||
if not dirExists(pubInfoDir):
|
||||
return failure("Relay pubInfo directory does not exist: " & pubInfoDir)
|
||||
|
||||
var
|
||||
t = initTable[PeerId, MixPubInfo]()
|
||||
i = 0
|
||||
while true:
|
||||
let entry =
|
||||
try:
|
||||
MixPubInfo.readFromFile(i, pubInfoDir)
|
||||
except IOError as exc:
|
||||
return failure("I/O error reading pubInfo at index " & $i & ": " & exc.msg)
|
||||
except OSError as exc:
|
||||
return failure("OS error reading pubInfo at index " & $i & ": " & exc.msg)
|
||||
if entry.isErr:
|
||||
break
|
||||
let info = entry.get()
|
||||
t[info.peerId] = info
|
||||
inc i
|
||||
|
||||
if t.len == 0:
|
||||
return failure("No relay entries found in " & pubInfoDir)
|
||||
|
||||
success t
|
||||
|
||||
{.pop.}
|
||||
Loading…
x
Reference in New Issue
Block a user