mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-04-14 12:13:11 +00:00
init discovery integration
This commit is contained in:
parent
dc026bbff1
commit
e1ec7ba57a
2
.gitmodules
vendored
2
.gitmodules
vendored
@ -12,7 +12,7 @@
|
||||
path = vendor/nim-libp2p
|
||||
url = https://github.com/vacp2p/nim-libp2p.git
|
||||
ignore = dirty
|
||||
branch = master
|
||||
branch = feat--logos-capability-discovery
|
||||
[submodule "vendor/nim-stew"]
|
||||
path = vendor/nim-stew
|
||||
url = https://github.com/status-im/nim-stew.git
|
||||
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996
|
||||
Subproject commit 687bbb64d48b976fd63dc4f3df4cd7f562cff586
|
||||
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, sequtils]
|
||||
import std/[options, sequtils, sugar]
|
||||
import
|
||||
chronos,
|
||||
chronicles,
|
||||
@ -8,111 +8,23 @@ import
|
||||
stew/byteutils,
|
||||
libp2p/[peerid, multiaddress, switch],
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/[kademlia, kad_disco],
|
||||
libp2p/protocols/kademlia_discovery/types as kad_types,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
libp2p/protocols/service_discovery/types
|
||||
|
||||
import waku/waku_core, waku/node/peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku extended kademlia discovery"
|
||||
topics = "waku kademlia discovery"
|
||||
|
||||
const
|
||||
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
|
||||
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
|
||||
const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
|
||||
|
||||
type
|
||||
MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].}
|
||||
NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].}
|
||||
type WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
intervalFut: Future[void]
|
||||
|
||||
ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
|
||||
WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
discoveryLoop: Future[void]
|
||||
running*: bool
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider
|
||||
isNodeStarted: NodeStartedProvider
|
||||
|
||||
proc new*(
|
||||
T: type WakuKademlia,
|
||||
switch: Switch,
|
||||
params: ExtendedKademliaDiscoveryParams,
|
||||
peerManager: PeerManager,
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider = nil,
|
||||
isNodeStarted: NodeStartedProvider = nil,
|
||||
): Result[T, string] =
|
||||
if params.bootstrapNodes.len == 0:
|
||||
info "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
switch,
|
||||
bootstrapNodes = params.bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
codec = ExtendedKademliaDiscoveryCodec,
|
||||
)
|
||||
|
||||
try:
|
||||
switch.mount(kademlia)
|
||||
except CatchableError:
|
||||
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
# Register services BEFORE starting kademlia so they are included in the
|
||||
# initial self-signed peer record published to the DHT
|
||||
if params.advertiseMix:
|
||||
if params.mixPubKey.isSome():
|
||||
let alreadyAdvertising = kademlia.startAdvertising(
|
||||
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
|
||||
)
|
||||
if alreadyAdvertising:
|
||||
warn "mix service was already being advertised"
|
||||
debug "extended kademlia advertising mix service",
|
||||
keyHex = byteutils.toHex(params.mixPubKey.get()),
|
||||
bootstrapNodes = params.bootstrapNodes.len
|
||||
else:
|
||||
warn "mix advertising enabled but no key provided"
|
||||
|
||||
info "kademlia discovery created",
|
||||
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
|
||||
|
||||
return ok(
|
||||
WakuKademlia(
|
||||
protocol: kademlia,
|
||||
peerManager: peerManager,
|
||||
running: false,
|
||||
getMixNodePoolSize: getMixNodePoolSize,
|
||||
isNodeStarted: isNodeStarted,
|
||||
)
|
||||
)
|
||||
|
||||
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
if service.id != MixProtocolID:
|
||||
trace "service is not mix protocol",
|
||||
serviceId = service.id, mixProtocolId = MixProtocolID
|
||||
return none(Curve25519Key)
|
||||
|
||||
if service.data.len != Curve25519KeySize:
|
||||
warn "invalid mix pub key length from kademlia record",
|
||||
expected = Curve25519KeySize,
|
||||
actual = service.data.len,
|
||||
dataHex = byteutils.toHex(service.data)
|
||||
return none(Curve25519Key)
|
||||
|
||||
debug "found mix protocol service",
|
||||
dataLen = service.data.len, expectedLen = Curve25519KeySize
|
||||
|
||||
let key = intoCurve25519Key(service.data)
|
||||
debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key)
|
||||
return some(key)
|
||||
|
||||
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
debug "processing kademlia record",
|
||||
peerId = record.peerId,
|
||||
numAddresses = record.addresses.len,
|
||||
@ -130,151 +42,119 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
|
||||
let protocols = record.services.mapIt(it.id)
|
||||
|
||||
var mixPubKey = none(Curve25519Key)
|
||||
for service in record.services:
|
||||
debug "checking service",
|
||||
peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
|
||||
mixPubKey = extractMixPubKey(service)
|
||||
if mixPubKey.isSome():
|
||||
debug "extracted mix public key from service", peerId = record.peerId
|
||||
break
|
||||
|
||||
if record.services.len > 0 and mixPubKey.isNone():
|
||||
debug "record has services but no valid mix key",
|
||||
peerId = record.peerId, services = record.services.mapIt(it.id)
|
||||
return none(RemotePeerInfo)
|
||||
return some(
|
||||
RemotePeerInfo.init(
|
||||
record.peerId,
|
||||
addrs = addrs,
|
||||
protocols = protocols,
|
||||
origin = PeerOrigin.Kademlia,
|
||||
mixPubKey = mixPubKey,
|
||||
record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia
|
||||
)
|
||||
)
|
||||
|
||||
proc lookupMixPeers*(
|
||||
wk: WakuKademlia
|
||||
): Future[Result[int, string]] {.async: (raises: []).} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if wk.protocol.isNil():
|
||||
return err("cannot lookup mix peers: kademlia not mounted")
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await wk.protocol.lookup(mixService)
|
||||
except CatchableError:
|
||||
return err("mix peer lookup failed: " & getCurrentExceptionMsg())
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get())
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return ok(added)
|
||||
|
||||
proc runDiscoveryLoop(
|
||||
wk: WakuKademlia, interval: Duration, minMixPeers: int
|
||||
) {.async: (raises: []).} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
wk: WakuKademlia, interval: Duration
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
info "kademlia discovery loop started", interval = interval
|
||||
|
||||
try:
|
||||
while true:
|
||||
# Wait for node to be started
|
||||
if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
|
||||
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
|
||||
while true:
|
||||
await sleepAsync(interval)
|
||||
|
||||
let res = catch:
|
||||
await wk.protocol.randomRecords()
|
||||
let records = res.valueOr:
|
||||
error "kademlia discovery lookup failed", error = res.error.msg
|
||||
continue
|
||||
|
||||
for record in records:
|
||||
let peerInfo = toRemotePeerInfo(record).valueOr:
|
||||
continue
|
||||
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await wk.protocol.randomRecords()
|
||||
except CatchableError as e:
|
||||
warn "extended kademlia discovery failed", error = e.msg
|
||||
await sleepAsync(interval)
|
||||
continue
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
debug "received random records from kademlia", numRecords = records.len
|
||||
debug "peer added via kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
#TODO peer added metric
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
debug "peer added via extended kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols,
|
||||
hasMixPubKey = peerInfo.mixPubKey.isSome()
|
||||
added.inc()
|
||||
proc lookup*(
|
||||
self: WakuKademlia, codec: string
|
||||
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
|
||||
let serviceId = hashServiceId(codec)
|
||||
|
||||
if added > 0:
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
let catchRes = catch:
|
||||
await self.protocol.lookup(serviceId)
|
||||
let lookupRes = catchRes.valueOr:
|
||||
error "kademlia discovery lookup failed", error = catchRes.error.msg
|
||||
return
|
||||
|
||||
# Targeted mix peer lookup when pool is low
|
||||
if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and
|
||||
wk.getMixNodePoolSize() < minMixPeers:
|
||||
debug "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = (await wk.lookupMixPeers()).valueOr:
|
||||
warn "targeted mix peer lookup failed", error = error
|
||||
0
|
||||
if found > 0:
|
||||
info "found mix peers via targeted kademlia lookup", count = found
|
||||
let ads = lookupRes.valueOr:
|
||||
error "kademlia discovery lookup failed", error
|
||||
return
|
||||
|
||||
await sleepAsync(interval)
|
||||
except CancelledError as e:
|
||||
debug "extended kademlia discovery loop cancelled", error = e.msg
|
||||
except CatchableError as e:
|
||||
error "extended kademlia discovery loop failed", error = e.msg
|
||||
var peerInfos = newSeqOfCap[RemotePeerInfo](ads.len)
|
||||
for ad in ads:
|
||||
let peerInfo = toRemotePeerInfo(ad.data).valueOr:
|
||||
continue
|
||||
|
||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
debug "peer added via kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
#TODO peer added metric
|
||||
|
||||
peerInfos.add(peerInfo)
|
||||
|
||||
return peerInfos
|
||||
|
||||
proc new*(
|
||||
T: type WakuKademlia,
|
||||
switch: Switch,
|
||||
peerManager: PeerManager,
|
||||
bootstrapNodes: seq[(PeerId, seq[MultiAddress])],
|
||||
providedServices: var seq[ServiceInfo],
|
||||
): T =
|
||||
if bootstrapNodes.len == 0:
|
||||
info "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
switch,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
services = providedServices,
|
||||
)
|
||||
|
||||
info "kademlia service discovery created", bootstrapNodes = bootstrapNodes.len
|
||||
|
||||
return WakuKademlia(protocol: kademlia, peerManager: peerManager)
|
||||
|
||||
proc start*(
|
||||
wk: WakuKademlia,
|
||||
interval: Duration = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if wk.running:
|
||||
return err("already running")
|
||||
self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if self.protocol.started:
|
||||
warn "Starting kad-disco twice"
|
||||
return
|
||||
|
||||
try:
|
||||
await wk.protocol.start()
|
||||
except CatchableError as e:
|
||||
return err("failed to start kademlia discovery: " & e.msg)
|
||||
await self.protocol.start()
|
||||
|
||||
wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
|
||||
self.intervalFut = self.runDiscoveryLoop(interval)
|
||||
|
||||
info "kademlia discovery started"
|
||||
return ok()
|
||||
|
||||
proc stop*(wk: WakuKademlia) {.async: (raises: []).} =
|
||||
if not wk.running:
|
||||
proc stop*(self: WakuKademlia) {.async: (raises: []).} =
|
||||
if not self.protocol.started:
|
||||
return
|
||||
|
||||
info "Stopping kademlia discovery"
|
||||
|
||||
wk.running = false
|
||||
if not self.intervalFut.isNil():
|
||||
self.intervalFut.cancelSoon()
|
||||
self.intervalFut = nil
|
||||
|
||||
if not wk.discoveryLoop.isNil():
|
||||
await wk.discoveryLoop.cancelAndWait()
|
||||
wk.discoveryLoop = nil
|
||||
if not self.protocol.isNil():
|
||||
await self.protocol.stop()
|
||||
|
||||
if not wk.protocol.isNil():
|
||||
await wk.protocol.stop()
|
||||
info "Successfully stopped kademlia discovery"
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import
|
||||
std/[options, sequtils],
|
||||
std/[options, sequtils, sugar],
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
@ -7,7 +7,9 @@ import
|
||||
libp2p/protocols/connectivity/relay/relay,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import
|
||||
./internal_config,
|
||||
@ -121,10 +123,11 @@ proc initNode(
|
||||
builder.withRateLimit(conf.rateLimit)
|
||||
builder.withCircuitRelay(relay)
|
||||
|
||||
let node = ?builder.build().mapErr(
|
||||
proc(err: string): string =
|
||||
"failed to create waku node instance: " & err
|
||||
)
|
||||
let node =
|
||||
?builder.build().mapErr(
|
||||
proc(err: string): string =
|
||||
"failed to create waku node instance: " & err
|
||||
)
|
||||
|
||||
ok(node)
|
||||
|
||||
@ -159,38 +162,18 @@ proc setupProtocols(
|
||||
error "Unrecoverable error occurred", error = msg
|
||||
quit(QuitFailure)
|
||||
|
||||
var providedServices: seq[ServiceInfo]
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
let mixConf = conf.mixConf.get()
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey))
|
||||
providedServices.add(mixService)
|
||||
|
||||
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
|
||||
# Setup extended kademlia discovery
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let mixPubKey =
|
||||
if conf.mixConf.isSome():
|
||||
some(conf.mixConf.get().mixPubKey)
|
||||
else:
|
||||
none(Curve25519Key)
|
||||
|
||||
node.wakuKademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
node.peerManager,
|
||||
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil():
|
||||
0
|
||||
else:
|
||||
node.getMixNodePoolSize(),
|
||||
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
|
||||
node.started,
|
||||
).valueOr:
|
||||
return err("failed to setup kademlia discovery: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
|
||||
@ -399,6 +382,21 @@ proc setupProtocols(
|
||||
if conf.peerExchangeDiscovery:
|
||||
await node.mountPeerExchangeClient()
|
||||
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let kademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
node.peerManager,
|
||||
conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
providedServices,
|
||||
)
|
||||
|
||||
let catchRes = catch:
|
||||
node.switch.mount(kademlia.protocol)
|
||||
if catchRes.isErr():
|
||||
return err("failed to mount kademlia discovery: " & catchRes.error.msg)
|
||||
|
||||
node.wakuKademlia = kademlia
|
||||
|
||||
return ok()
|
||||
|
||||
## Start node
|
||||
@ -451,9 +449,10 @@ proc startNode*(
|
||||
node.peerManager.start()
|
||||
|
||||
if not node.wakuKademlia.isNil():
|
||||
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
|
||||
(await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
|
||||
return err("failed to start kademlia discovery: " & error)
|
||||
let catchRes = catch:
|
||||
await node.wakuKademlia.start()
|
||||
if catchRes.isErr():
|
||||
return err("failed to start kademlia discovery: " & catchRes.error.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user