mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-13 10:23:08 +00:00
* Adapt to use chronos' TokenBucket. Removed TokenBucket and test. Bump nim-chronos -> nim-libp2p/nim-lsquic/nim-jwt -> adapt to latest libp2p changes * Fix libp2p/utility reporting that an unlisted exception can occur from closing the socket in waitForService - the -d:ssl compile flag caused it * Adapt request_limiter to the new chronos TokenBucket replenish algorithm to keep the original intent of use * Fix filter DoS protection test * Fix peer manager tests due to a change caused by the new libp2p * Adjust store test rate limit to eliminate timing-related CI test flakiness * Adjust store test rate limit to eliminate timing-related CI test flakiness - lightpush/legacy_lightpush/filter * Rework filter DoS protection test to avoid erratic CI timing causing flakiness in test results compared to local runs * Rework lightpush DoS protection test to avoid erratic CI timing causing flakiness in test results compared to local runs * Rework lightpush and legacy lightpush rate limit tests to eliminate the timing effect in CI that causes longer awaits and thus results in minting new tokens, unlike local runs
240 lines
7.2 KiB
Nim
240 lines
7.2 KiB
Nim
{.push raises: [].}
|
|
|
|
import
|
|
std/[sugar, options, sequtils, tables],
|
|
results,
|
|
chronos,
|
|
chronicles,
|
|
stew/byteutils,
|
|
libp2p/protocols/rendezvous,
|
|
libp2p/protocols/rendezvous/protobuf,
|
|
libp2p/utils/semaphore,
|
|
libp2p/utils/offsettedseq,
|
|
libp2p/crypto/curve25519,
|
|
libp2p/switch,
|
|
libp2p/utility
|
|
|
|
import metrics except collect
|
|
|
|
import
|
|
../node/peer_manager,
|
|
../common/callbacks,
|
|
../waku_enr/capabilities,
|
|
../waku_core/peers,
|
|
../waku_core/codecs,
|
|
./common,
|
|
./waku_peer_record
|
|
|
|
# Attach a common `topics` property to the chronicles log statements of this
# module.
logScope:
  topics = "waku rendezvous"
|
|
|
|
type
  WakuRendezVous* = ref object of GenericRendezVous[WakuPeerRecord]
    ## Rendezvous protocol instance specialised for `WakuPeerRecord`.
    ##
    ## Extends `GenericRendezVous` with the state needed to dial rendezvous
    ## peers and to periodically re-advertise this node.
    peerManager: PeerManager ## selects and dials peers supporting the codec
    clusterId: uint16 ## input of `computeMixNamespace` when advertising
    getShards: GetShards ## callback; only logged at construction time here
    getCapabilities: GetCapabilities
      ## callback; only logged at construction time here
    getPeerRecord: GetWakuPeerRecord
      ## callback producing the record that gets signed and advertised

    registrationInterval: timer.Duration
      ## delay slept before each periodic advertisement round
    periodicRegistrationFut: Future[void]
      ## handle of the running periodic registration loop (nil until started)
|
|
|
|
# NOTE(review): presumably the maximum accepted namespace length; it is not
# referenced in the visible part of this module - confirm it is still needed.
const MaximumNamespaceLen = 255
|
|
|
|
method discover*(
    self: WakuRendezVous, conn: Connection, d: Discover
) {.async: (raises: [CancelledError, LPStreamError]).} =
  ## Handles a Discover request `d` received on `conn`.
  ##
  ## Logs the requesting peer and the namespace at trace level, then delegates
  ## the request to the `GenericRendezVous[WakuPeerRecord]` implementation.
  # Override discover method to avoid collect macro generic instantiation issues
  trace "Received Discover", peerId = conn.peerId, ns = d.ns
  # `procCall` forces a static call to the parent method, so this override does
  # not dispatch back to itself.
  await procCall GenericRendezVous[WakuPeerRecord](self).discover(conn, d)
|
|
|
|
proc advertise*(
    self: WakuRendezVous,
    namespace: string,
    peers: seq[PeerId],
    ttl: timer.Duration = self.minDuration,
): Future[Result[void, string]] {.async: (raises: []).} =
  ## Advertises this node's signed Waku peer record under `namespace` on the
  ## given rendezvous `peers`.
  ##
  ## * `namespace` - rendezvous namespace to register under.
  ## * `peers` - rendezvous peers to dial and advertise to.
  ## * `ttl` - registration time-to-live; defaults to `self.minDuration`.
  ##
  ## Returns `err` with a description when the peer record cannot be signed or
  ## encoded, when waiting for the dials fails, when no connection could be
  ## established, or when the underlying advertisement raises. Connections
  ## opened here are closed before returning.
  trace "advertising via waku rendezvous",
    namespace = namespace, ttl = ttl, peers = $peers, peerRecord = $self.getPeerRecord()

  let se = SignedPayload[WakuPeerRecord].init(
    self.switch.peerInfo.privateKey, self.getPeerRecord()
  ).valueOr:
    return
      err("rendezvous advertisement failed: Failed to sign Waku Peer Record: " & $error)
  let sprBuff = se.encode().valueOr:
    return err("rendezvous advertisement failed: Wrong Signed Peer Record: " & $error)

  # rendezvous.advertise expects already opened connections
  # must dial first

  var futs = collect(newSeq):
    for peerId in peers:
      self.peerManager.dialPeer(peerId, self.codec)

  let dialCatch = catch:
    await allFinished(futs)

  if dialCatch.isErr():
    return err("advertise: " & dialCatch.error.msg)

  futs = dialCatch.get()

  # Keep only the dials that completed with an actual connection
  let conns = collect(newSeq):
    for fut in futs:
      let catchable = catch:
        fut.read()

      if catchable.isErr():
        warn "a rendezvous dial failed", cause = catchable.error.msg
        continue

      let connOpt = catchable.get()

      let conn = connOpt.valueOr:
        continue

      conn

  if conns.len == 0:
    return err("could not establish any connections to rendezvous peers")

  try:
    await self.advertise(namespace, ttl, peers, sprBuff)
  except CatchableError as e:
    # Only recoverable errors are turned into a result; Defects (programmer
    # bugs) are deliberately not swallowed here.
    return err("rendezvous advertisement failed: " & e.msg)
  finally:
    # Close the connections opened above, on success and on failure
    for conn in conns:
      await conn.close()
  return ok()
|
|
|
|
proc advertiseAll*(
    self: WakuRendezVous
): Future[Result[void, string]] {.async: (raises: []).} =
  ## Advertises this node on one peer chosen by the peer manager among those
  ## supporting this protocol's codec, under the namespace computed from
  ## `clusterId`.
  ##
  ## Returns `err` when no suitable peer is available, otherwise the result of
  ## `advertise`.
  trace "waku rendezvous advertisements started"

  let rendezvousPeer = self.peerManager.selectPeer(self.codec).valueOr:
    return err("could not get a peer supporting RendezVousCodec")

  # Advertise yourself on that peer
  let advertiseRes =
    await self.advertise(computeMixNamespace(self.clusterId), @[rendezvousPeer.peerId])

  trace "waku rendezvous advertisements finished"

  return advertiseRes
|
|
|
|
proc periodicRegistration(self: WakuRendezVous) {.async.} =
  ## Re-advertises this node forever, sleeping `registrationInterval` before
  ## each round.
  ##
  ## After a failed round the interval is doubled, capped at
  ## `MaxRegistrationInterval`; after a successful round it is reset to
  ## `DefaultRegistrationInterval`. The loop only ends when the future is
  ## cancelled.
  info "waku rendezvous periodic registration started",
    interval = self.registrationInterval

  # infinite loop
  while true:
    await sleepAsync(self.registrationInterval)

    (await self.advertiseAll()).isOkOr:
      info "waku rendezvous advertisements failed", error = error

      # Exponential back-off: double the interval, never exceeding the maximum
      self.registrationInterval += self.registrationInterval
      if self.registrationInterval > MaxRegistrationInterval:
        self.registrationInterval = MaxRegistrationInterval

      # Skip the reset below, otherwise the back-off would be discarded
      continue

    # Back to normal interval if no errors
    self.registrationInterval = DefaultRegistrationInterval
|
|
|
|
proc new*(
    T: type WakuRendezVous,
    switch: Switch,
    peerManager: PeerManager,
    clusterId: uint16,
    getShards: GetShards,
    getCapabilities: GetCapabilities,
    getPeerRecord: GetWakuPeerRecord,
): Result[T, string] {.raises: [].} =
  ## Builds a `WakuRendezVous` bound to `switch` and installs its stream
  ## handler.
  ##
  ## The inherited rendezvous state is initialised with the libp2p defaults
  ## (`MinimumAcceptedDuration`, `MaximumDuration`, `SemaphoreDefaultSize`) and
  ## `checkWakuPeerRecord` as record validator. The registration interval
  ## starts at `DefaultRegistrationInterval`.
  ##
  ## Returns `ok` with the new instance.
  let rngRef = newRng()
  let wrv = T(
    rng: rngRef,
    salt: string.fromBytes(generateBytes(rngRef[], 8)),
    registered: initOffsettedSeq[RegisteredData](),
    expiredDT: Moment.now() - 1.days,
    sema: newAsyncSemaphore(SemaphoreDefaultSize),
    minDuration: rendezvous.MinimumAcceptedDuration,
    maxDuration: rendezvous.MaximumDuration,
    minTTL: rendezvous.MinimumAcceptedDuration.seconds.uint64,
    maxTTL: rendezvous.MaximumDuration.seconds.uint64,
    peerRecordValidator: checkWakuPeerRecord,
  )

  # Protocol wiring
  wrv.switch = switch
  wrv.codec = WakuRendezVousCodec

  # Waku specific state
  wrv.peerManager = peerManager
  wrv.clusterId = clusterId
  wrv.getShards = getShards
  wrv.getCapabilities = getCapabilities
  wrv.getPeerRecord = getPeerRecord
  wrv.registrationInterval = DefaultRegistrationInterval

  proc handleStream(
      conn: Connection, proto: string
  ) {.async: (raises: [CancelledError]).} =
    ## Reads one message from `conn` via `readLp(4096)`, dispatches it on its
    ## type and always closes the connection afterwards.
    try:
      let payload = await conn.readLp(4096)
      let request = Message.decode(payload).tryGet()

      case request.msgType
      of MessageType.Register:
        #TODO: override this to store peers registered with us in peerstore with their info as well.
        await wrv.register(conn, request.register.tryGet(), wrv.getPeerRecord())
      of MessageType.RegisterResponse:
        trace "Got an unexpected Register Response",
          response = request.registerResponse
      of MessageType.Unregister:
        wrv.unregister(conn, request.unregister.tryGet())
      of MessageType.Discover:
        await wrv.discover(conn, request.discover.tryGet())
      of MessageType.DiscoverResponse:
        trace "Got an unexpected Discover Response",
          response = request.discoverResponse
    except CancelledError as cancelled:
      trace "cancelled rendezvous handler"
      raise cancelled
    except CatchableError as failure:
      # Decode and handling failures are only logged; the stream is dropped
      trace "exception in rendezvous handler", description = failure.msg
    finally:
      await conn.close()

  wrv.handler = handleStream

  info "waku rendezvous initialized",
    clusterId = clusterId,
    shards = getShards(),
    capabilities = getCapabilities(),
    wakuPeerRecord = getPeerRecord()

  return ok(wrv)
|
|
|
|
proc start*(self: WakuRendezVous) {.async: (raises: []).} =
  ## Starts the parent `GenericRendezVous` and then launches the periodic
  ## registration loop.
  ##
  ## Does nothing but log a warning when already started. If starting the
  ## parent is cancelled, the error is logged and the periodic registration is
  ## not launched.
  if self.started:
    warn "waku rendezvous already started"
    return

  # Start the parent GenericRendezVous (starts the register deletion loop)
  try:
    await procCall GenericRendezVous[WakuPeerRecord](self).start()
  except CancelledError as cancelled:
    error "failed to start GenericRendezVous", cause = cancelled.msg
    return

  # start registering forever
  self.periodicRegistrationFut = self.periodicRegistration()

  info "waku rendezvous discovery started"
|
|
|
|
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
  ## Stops the rendezvous discovery.
  ##
  ## Cancels the periodic registration loop, if one was started, and waits for
  ## the cancellation to complete; then stops the parent `GenericRendezVous`
  ## exactly once.
  if not self.periodicRegistrationFut.isNil():
    await self.periodicRegistrationFut.cancelAndWait()

  # Stop the parent GenericRendezVous (stops the register deletion loop)
  await GenericRendezVous[WakuPeerRecord](self).stop()

  info "waku rendezvous discovery stopped"
|