nwaku/waku/waku_peer_exchange/protocol.nim

295 lines
9.3 KiB
Nim
Raw Permalink Normal View History

import
std/[options, sequtils, random],
results,
chronicles,
chronos,
metrics,
libp2p/protocols/protocol,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../node/peer_manager,
../waku_core,
../discovery/waku_discv5,
./rpc,
./rpc_codec,
../common/rate_limit/request_limiter
from ../waku_core/codecs import WakuPeerExchangeCodec
export WakuPeerExchangeCodec
declarePublicGauge waku_px_peers_received_total,
"number of ENRs received via peer exchange"
declarePublicGauge waku_px_peers_received_unknown,
"number of previously unknown ENRs received via peer exchange"
declarePublicCounter waku_px_peers_sent,
"number of ENRs sent to peer exchange requesters"
declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached"
declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"]
logScope:
topics = "waku peer_exchange"
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxPeersCacheSize = 60
2024-08-23 18:01:30 +00:00
CacheRefreshInterval = 10.minutes
DefaultPXNumPeersReq* = 5.uint64()
# Error types (metric label values)
const
dialFailure = "dial_failure"
peerNotFoundFailure = "peer_not_found_failure"
decodeRpcFailure = "decode_rpc_failure"
retrievePeersDiscv5Error = "retrieve_peers_discv5_failure"
pxFailure = "px_failure"
type
WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus]
WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
enrCache*: seq[enr.Record]
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
pxLoopHandle*: Future[void]
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let rpc = PeerExchangeRpc.makeRequest(numPeers)
var buffer: seq[byte]
var callResult =
(status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string))
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
callResult = (
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some($exc.msg),
)
finally:
# close, no more data is expected
await conn.closeWithEof()
if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
return err(callResult)
let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some($decodedBuff.error),
)
)
if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
return err(
(
status_code: decodedBuff.get().response.status_code,
status_desc: decodedBuff.get().response.status_desc,
)
)
return ok(decodedBuff.get().response)
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, peer: RemotePeerInfo
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
try:
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
status_desc: some(dialFailure),
)
)
return await wpx.request(numPeers, connOpt.get())
except CatchableError:
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()),
)
)
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some(peerNotFoundFailure),
)
)
return await wpx.request(numPeers, peerOpt.get())
proc respond(
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)))
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
status_desc: some("exception dialing peer: " & exc.msg),
)
)
return ok()
proc respondError(
wpx: WakuPeerExchange,
status_code: PeerExchangeResponseStatusCode,
status_desc: Option[string],
conn: Connection,
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc)
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some("exception dialing peer: " & exc.msg),
)
)
return ok()
proc getEnrsFromCache(
wpx: WakuPeerExchange, numPeers: uint64
): seq[enr.Record] {.gcsafe.} =
if wpx.enrCache.len() == 0:
debug "peer exchange ENR cache is empty"
return @[]
# copy and shuffle
randomize()
var shuffledCache = wpx.enrCache
shuffledCache.shuffle()
# return numPeers or less if cache is smaller
return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)]
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
if peer.origin != Discv5:
trace "peer not from discv5", peer = $peer, origin = $peer.origin
return false
if peer.enr.isNone():
trace "peer has no ENR", peer = $peer
return false
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
trace "peer has mismatching cluster", peer = $peer
return false
return true
proc populateEnrCache(wpx: WakuPeerExchange) =
# share only peers that i) are reachable ii) come from discv5 iii) share cluster
let withEnr = wpx.peerManager.wakuPeerStore.getReachablePeers().filterIt(
poolFilter(wpx.cluster, it)
)
# either what we have or max cache size
var newEnrCache = newSeq[enr.Record](0)
for i in 0 ..< min(withEnr.len, MaxPeersCacheSize):
newEnrCache.add(withEnr[i].enr.get())
# swap cache for new
wpx.enrCache = newEnrCache
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
# try more aggressively to fill the cache at startup
var attempts = 50
while wpx.enrCache.len < MaxPeersCacheSize and attempts > 0:
attempts -= 1
wpx.populateEnrCache()
await sleepAsync(1.seconds)
heartbeat "Updating px enr cache", CacheRefreshInterval:
wpx.populateEnrCache()
proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn):
try:
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
(
await wpx.respondError(
PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn
)
).isOkOr:
error "Failed to respond with BAD_REQUEST:", error = $error
return
let decBuf = PeerExchangeRpc.decode(buffer)
if decBuf.isErr():
waku_px_errors.inc(labelValues = [decodeRpcFailure])
error "Failed to decode PeerExchange request", error = $decBuf.error
(
await wpx.respondError(
PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn
)
).isOkOr:
error "Failed to respond with BAD_REQUEST:", error = $error
return
trace "peer exchange request received"
let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers)
(await wpx.respond(enrs, conn)).isErrOr:
waku_px_peers_sent.inc(enrs.len().int64())
do:
(
await wpx.respondError(
PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn
)
).isOkOr:
error "Failed to respond with TOO_MANY_REQUESTS:", error = $error
# close, no data is expected
await conn.closeWithEof()
wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec
proc new*(
T: type WakuPeerExchange,
peerManager: PeerManager,
cluster: Option[uint16] = none(uint16),
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wpx = WakuPeerExchange(
peerManager: peerManager,
cluster: cluster,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wpx.initProtocolHandler()
setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
asyncSpawn wpx.updatePxEnrCache()
return wpx