mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-06 22:19:30 +00:00
Combines five dep-and-build changes that all flow from the libp2p v2.0.0
upgrade and the move to the extracted libp2p_mix / mix-rln plugin stack:
waku.nimble:
* libp2p: ff8d51857 -> c43199378 (release/v2.0.0 tip; sha-pinned until
vacp2p cuts a v2.0.0 tag).
* Drop the bare `zlib < 0.2` cap — no longer needed by the upgraded
libp2p.
* websock: bare ">= 0.4.0" — replaces the d4cd68b URL+SHA workaround
that pinned through a libp2p commit-specific websock SHA.
* nim-json-rpc: switch to chaitanyaprem/nim-json-rpc#f05fad25 — relaxes
websock cap to allow >=0.4.0. TODO: revert to status-im/nim-json-rpc
once status-im/nim-json-rpc#277 merges and a tag is cut.
* lsquic: bare ">= 0.4.1" (drops URL form).
* Add mix-rln-spam-protection-plugin pin (23b278b4) and nim-libp2p-mix
pin (50c4ab4f — PR #14 HEAD); the plugin pins the same libp2p_mix
SHA so the diamond dep collapses to a single source.
waku/factory/waku.nim:
* Explicit HPService.setup(switch) / AutonatService.setup(switch)
calls. libp2p v2.0.0's Service lifecycle refactor (libp2p#2462)
removed switch.start's auto-setup loop, so any caller that assigns
directly to switch.services (we do) is responsible for calling
setup() themselves. Without it, AutonatService.addressMapper stays
nil and peerInfo.expandAddrs SIGSEGVs during start(). Wrapped in
try/except for ServiceSetupError so a setup failure surfaces as a
logged error rather than a crash.
Build / scripts:
* scripts/build_rln_mix.sh removed and Makefile simplified — librln
is now a single shared archive built from zerokit's `stateless`
features (no separate librln_mix archive).
* simulations/mixnet/build_setup.sh + setup_credentials.nim updated
to use librln_v2.0.2.a directly and run RLN keystore setup before
nodes start.
Validated:
* Cold local-cache nimble setup --localdeps -y.
* wakunode2 and chat2mix link cleanly.
* Mixnet roundtrip sim: [PASS] bob received message from alice.
* RLN proof generation + verification on every in-path mix node:
5 gen_called == 5 verified, 0 SPAM_PROOF_* errors.
239 lines
7.3 KiB
Nim
239 lines
7.3 KiB
Nim
{.push raises: [].}
|
|
|
|
import
|
|
std/[sugar, options, sequtils, tables],
|
|
results,
|
|
chronos,
|
|
chronicles,
|
|
stew/byteutils,
|
|
libp2p/protocols/rendezvous,
|
|
libp2p/protocols/rendezvous/protobuf,
|
|
libp2p/crypto/rng,
|
|
libp2p/utils/offsettedseq,
|
|
libp2p/crypto/curve25519,
|
|
libp2p/switch,
|
|
libp2p/utility
|
|
|
|
import metrics except collect
|
|
|
|
import
|
|
../node/peer_manager,
|
|
../common/callbacks,
|
|
../waku_enr/capabilities,
|
|
../waku_core/peers,
|
|
../waku_core/codecs,
|
|
./common,
|
|
./waku_peer_record
|
|
|
|
logScope:
|
|
topics = "waku rendezvous"
|
|
|
|
type WakuRendezVous* = ref object of GenericRendezVous[WakuPeerRecord]
|
|
peerManager: PeerManager
|
|
clusterId: uint16
|
|
getShards: GetShards
|
|
getCapabilities: GetCapabilities
|
|
getPeerRecord: GetWakuPeerRecord
|
|
|
|
registrationInterval: timer.Duration
|
|
periodicRegistrationFut: Future[void]
|
|
|
|
const MaximumNamespaceLen = 255
|
|
|
|
method discover*(
|
|
self: WakuRendezVous, conn: Connection, d: Discover
|
|
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
|
# Override discover method to avoid collect macro generic instantiation issues
|
|
trace "Received Discover", peerId = conn.peerId, ns = d.ns
|
|
await procCall GenericRendezVous[WakuPeerRecord](self).discover(conn, d)
|
|
|
|
proc advertise*(
|
|
self: WakuRendezVous,
|
|
namespace: string,
|
|
peers: seq[PeerId],
|
|
ttl: timer.Duration = self.config.minDuration,
|
|
): Future[Result[void, string]] {.async: (raises: []).} =
|
|
trace "advertising via waku rendezvous",
|
|
namespace = namespace, ttl = ttl, peers = $peers, peerRecord = $self.getPeerRecord()
|
|
let se = SignedPayload[WakuPeerRecord].init(
|
|
self.switch.peerInfo.privateKey, self.getPeerRecord()
|
|
).valueOr:
|
|
return
|
|
err("rendezvous advertisement failed: Failed to sign Waku Peer Record: " & $error)
|
|
let sprBuff = se.encode().valueOr:
|
|
return err("rendezvous advertisement failed: Wrong Signed Peer Record: " & $error)
|
|
|
|
# rendezvous.advertise expects already opened connections
|
|
# must dial first
|
|
|
|
var futs = collect(newSeq):
|
|
for peerId in peers:
|
|
self.peerManager.dialPeer(peerId, self.codec)
|
|
|
|
let dialCatch = catch:
|
|
await allFinished(futs)
|
|
|
|
if dialCatch.isErr():
|
|
return err("advertise: " & dialCatch.error.msg)
|
|
|
|
futs = dialCatch.get()
|
|
|
|
let conns = collect(newSeq):
|
|
for fut in futs:
|
|
let catchable = catch:
|
|
fut.read()
|
|
|
|
if catchable.isErr():
|
|
warn "a rendezvous dial failed", cause = catchable.error.msg
|
|
continue
|
|
|
|
let connOpt = catchable.get()
|
|
|
|
let conn = connOpt.valueOr:
|
|
continue
|
|
|
|
conn
|
|
|
|
if conns.len == 0:
|
|
return err("could not establish any connections to rendezvous peers")
|
|
|
|
try:
|
|
await self.advertise(namespace, ttl, peers, sprBuff)
|
|
except Exception as e:
|
|
return err("rendezvous advertisement failed: " & e.msg)
|
|
finally:
|
|
for conn in conns:
|
|
await conn.close()
|
|
return ok()
|
|
|
|
proc advertiseAll*(
|
|
self: WakuRendezVous
|
|
): Future[Result[void, string]] {.async: (raises: []).} =
|
|
trace "waku rendezvous advertisements started"
|
|
|
|
let rpi = self.peerManager.selectPeer(self.codec).valueOr:
|
|
return err("could not get a peer supporting RendezVousCodec")
|
|
|
|
let namespace = computeMixNamespace(self.clusterId)
|
|
|
|
# Advertise yourself on that peer
|
|
let res = await self.advertise(namespace, @[rpi.peerId])
|
|
|
|
trace "waku rendezvous advertisements finished"
|
|
|
|
return res
|
|
|
|
proc periodicRegistration(self: WakuRendezVous) {.async.} =
|
|
info "waku rendezvous periodic registration started",
|
|
interval = self.registrationInterval
|
|
|
|
# infinite loop
|
|
while true:
|
|
await sleepAsync(self.registrationInterval)
|
|
|
|
(await self.advertiseAll()).isOkOr:
|
|
info "waku rendezvous advertisements failed", error = error
|
|
|
|
if self.registrationInterval > MaxRegistrationInterval:
|
|
self.registrationInterval = MaxRegistrationInterval
|
|
else:
|
|
self.registrationInterval += self.registrationInterval
|
|
|
|
# Back to normal interval if no errors
|
|
self.registrationInterval = DefaultRegistrationInterval
|
|
|
|
proc new*(
|
|
T: type WakuRendezVous,
|
|
switch: Switch,
|
|
peerManager: PeerManager,
|
|
clusterId: uint16,
|
|
getShards: GetShards,
|
|
getCapabilities: GetCapabilities,
|
|
getPeerRecord: GetWakuPeerRecord,
|
|
): Result[T, string] {.raises: [].} =
|
|
let rng = newRng()
|
|
let wrv = T(
|
|
rng: rng,
|
|
# libp2p 1.15.3: generateBytes now takes the libp2p Rng directly
|
|
# (used to take the underlying ref HmacDrbgContext via `rng[]`).
|
|
salt: string.fromBytes(generateBytes(rng, 8)),
|
|
registered: initOffsettedSeq[RegisteredData](),
|
|
expiredDT: Moment.now() - 1.days,
|
|
sema: newAsyncSemaphore(SemaphoreDefaultSize),
|
|
# libp2p 1.15.3 moved minDuration/maxDuration/minTTL/maxTTL onto
|
|
# GenericRendezVous.config (RendezVousConfig).
|
|
config: RendezVousConfig(
|
|
minDuration: rendezvous.MinimumAcceptedDuration,
|
|
maxDuration: rendezvous.MaximumDuration,
|
|
minTTL: rendezvous.MinimumAcceptedDuration.seconds.uint64,
|
|
maxTTL: rendezvous.MaximumDuration.seconds.uint64,
|
|
),
|
|
peerRecordValidator: checkWakuPeerRecord,
|
|
)
|
|
|
|
wrv.peerManager = peerManager
|
|
wrv.clusterId = clusterId
|
|
wrv.getShards = getShards
|
|
wrv.getCapabilities = getCapabilities
|
|
wrv.registrationInterval = DefaultRegistrationInterval
|
|
wrv.getPeerRecord = getPeerRecord
|
|
wrv.switch = switch
|
|
wrv.codec = WakuRendezVousCodec
|
|
|
|
proc handleStream(
|
|
conn: Connection, proto: string
|
|
) {.async: (raises: [CancelledError]).} =
|
|
try:
|
|
let
|
|
buf = await conn.readLp(4096)
|
|
msg = Message.decode(buf).tryGet()
|
|
case msg.msgType
|
|
of MessageType.Register:
|
|
#TODO: override this to store peers registered with us in peerstore with their info as well.
|
|
await wrv.register(conn, msg.register.tryGet(), wrv.getPeerRecord())
|
|
of MessageType.RegisterResponse:
|
|
trace "Got an unexpected Register Response", response = msg.registerResponse
|
|
of MessageType.Unregister:
|
|
wrv.unregister(conn, msg.unregister.tryGet())
|
|
of MessageType.Discover:
|
|
await wrv.discover(conn, msg.discover.tryGet())
|
|
of MessageType.DiscoverResponse:
|
|
trace "Got an unexpected Discover Response", response = msg.discoverResponse
|
|
except CancelledError as exc:
|
|
trace "cancelled rendezvous handler"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
trace "exception in rendezvous handler", description = exc.msg
|
|
finally:
|
|
await conn.close()
|
|
|
|
wrv.handler = handleStream
|
|
|
|
info "waku rendezvous initialized",
|
|
clusterId = clusterId,
|
|
shards = getShards(),
|
|
capabilities = getCapabilities(),
|
|
wakuPeerRecord = getPeerRecord()
|
|
|
|
return ok(wrv)
|
|
|
|
method start*(self: WakuRendezVous) {.async: (raises: [CancelledError]).} =
|
|
# Start the parent GenericRendezVous (starts the register deletion loop)
|
|
if self.started:
|
|
warn "waku rendezvous already started"
|
|
return
|
|
await procCall GenericRendezVous[WakuPeerRecord](self).start()
|
|
# start registering forever
|
|
self.periodicRegistrationFut = self.periodicRegistration()
|
|
|
|
info "waku rendezvous discovery started"
|
|
|
|
method stop*(self: WakuRendezVous) {.async: (raises: []).} =
|
|
if not self.periodicRegistrationFut.isNil():
|
|
await self.periodicRegistrationFut.cancelAndWait()
|
|
|
|
# Stop the parent GenericRendezVous (stops the register deletion loop)
|
|
await procCall GenericRendezVous[WakuPeerRecord](self).stop()
|
|
|
|
info "waku rendezvous discovery stopped"
|