2022-09-20 13:03:34 +02:00
|
|
|
|
import
|
2024-07-29 15:53:43 -04:00
|
|
|
|
std/[options, sequtils, random, sugar],
|
2024-07-09 13:14:28 +02:00
|
|
|
|
results,
|
2022-09-20 13:03:34 +02:00
|
|
|
|
chronicles,
|
|
|
|
|
chronos,
|
|
|
|
|
metrics,
|
|
|
|
|
libp2p/protocols/protocol,
|
|
|
|
|
libp2p/crypto/crypto,
|
|
|
|
|
eth/p2p/discoveryv5/enr
|
|
|
|
|
import
|
2023-08-09 18:11:50 +01:00
|
|
|
|
../common/nimchronos,
|
2023-04-18 15:22:10 +02:00
|
|
|
|
../node/peer_manager,
|
2023-04-19 13:29:23 +02:00
|
|
|
|
../waku_core,
|
2024-04-17 21:48:20 +02:00
|
|
|
|
../discovery/waku_discv5,
|
2022-09-20 13:03:34 +02:00
|
|
|
|
./rpc,
|
2024-09-18 15:58:07 +02:00
|
|
|
|
./rpc_codec,
|
|
|
|
|
../common/rate_limit/request_limiter
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
# Metrics exported by the peer exchange protocol.
declarePublicGauge(
  waku_px_peers_received_total, "number of ENRs received via peer exchange"
)
declarePublicGauge(
  waku_px_peers_received_unknown,
  "number of previously unknown ENRs received via peer exchange",
)
declarePublicCounter(
  waku_px_peers_sent, "number of ENRs sent to peer exchange requesters"
)
declarePublicGauge(
  waku_px_peers_cached, "number of peer exchange peer ENRs cached"
)
# Errors are broken down by the single "type" label.
declarePublicCounter(waku_px_errors, "number of peer exchange errors", ["type"])
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
|
|
|
|
logScope:
|
2022-11-03 16:36:24 +01:00
|
|
|
|
topics = "waku peer_exchange"
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
|
|
|
|
# Upper bound, in bytes, used when reading a length-prefixed peer exchange RPC:
# ten times the maximum Waku message size (the 10x multiplier is a safety
# margin) plus a 64kB safety buffer for protocol overhead.
# TODO what is the expected size of a PX message? As currently specified, it
# can contain an arbitrary number of ENRs...
const DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024

# Maximum number of ENRs kept in the peer exchange cache.
const MaxPeersCacheSize = 60

# Interval between periodic refreshes of the ENR cache.
const CacheRefreshInterval = 10.minutes

# Protocol identifier used both when dialing and when registering the handler.
const WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
|
|
|
|
|
|
|
|
|
|
# Error types: fixed strings used as values of the "type" label of
# `waku_px_errors` (some are also reused as error descriptions).
const dialFailure = "dial_failure"
const peerNotFoundFailure = "peer_not_found_failure"
const decodeRpcFailure = "decode_rpc_failure"
const retrievePeersDiscv5Error = "retrieve_peers_discv5_failure"
const pxFailure = "px_failure"
|
|
|
|
|
|
|
|
|
|
type
  # Result alias used by every peer exchange proc in this module; the error
  # side carries a response status (status code plus optional description).
  WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus]

  # Peer exchange protocol instance, mounted as a libp2p protocol.
  WakuPeerExchange* = ref object of LPProtocol
    # Used to select and dial peers and to read the peer store.
    peerManager*: PeerManager
    # ENRs served to requesters; replaced wholesale on every cache refresh.
    # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
    enrCache*: seq[enr.Record]
    # When set, only peers whose ENR matches this cluster are cached.
    cluster*: Option[uint16]
    # Consulted by the protocol handler before serving an incoming request.
    requestRateLimiter*: RequestRateLimiter
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
proc request*(
    wpx: WakuPeerExchange, numPeers: uint64, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Sends a peer exchange request for `numPeers` peers over `conn` and waits
  ## for the reply.
  ##
  ## Returns the decoded response when the remote answered with `SUCCESS`.
  ## Otherwise returns an error status:
  ## - `SERVICE_UNAVAILABLE` when writing the request or reading the reply
  ##   fails (the exception message is placed in `status_desc`),
  ## - `BAD_RESPONSE` when the reply cannot be decoded,
  ## - the remote's own status when it reports a non-success code.
  ##
  ## `conn` is closed before returning, whatever the outcome.
  let rpc = PeerExchangeRpc.makeRequest(numPeers)

  var buffer: seq[byte]
  var callResult =
    (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string))
  try:
    await conn.writeLP(rpc.encode().buffer)
    buffer = await conn.readLp(DefaultMaxRpcSize.int)
  except CatchableError as exc:
    # Use a fixed label value: exception messages are free-form text and would
    # create a new metric series for every distinct message.
    waku_px_errors.inc(labelValues = [pxFailure])
    callResult = (
      status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
      status_desc: some($exc.msg),
    )
  finally:
    # close, no more data is expected
    await conn.closeWithEof()

  if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
    return err(callResult)

  let decodedBuff = PeerExchangeRpc.decode(buffer)
  if decodedBuff.isErr():
    # Count undecodable replies the same way the protocol handler counts
    # undecodable requests.
    waku_px_errors.inc(labelValues = [decodeRpcFailure])
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
        status_desc: some($decodedBuff.error),
      )
    )

  let response = decodedBuff.get().response
  if response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
    # Propagate the status reported by the remote peer unchanged.
    return err(
      (status_code: response.status_code, status_desc: response.status_desc)
    )

  return ok(response)
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
proc request*(
    wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Dials `peer` on the peer exchange codec and, when a connection is
  ## obtained, forwards to the connection-based `request` overload.
  ##
  ## Returns `DIAL_FAILURE` when the dial yields no connection and
  ## `BAD_RESPONSE` when an exception is raised while dialing.
  try:
    let dialed = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
    if dialed.isSome():
      return await wpx.request(numPeers, dialed.get())

    # The peer manager could not hand us a connection.
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
        status_desc: some(dialFailure),
      )
    )
  except CatchableError:
    let reason = "exception dialing peer: " & getCurrentExceptionMsg()
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
        status_desc: some(reason),
      )
    )
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
proc request*(
    wpx: WakuPeerExchange, numPeers: uint64
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Lets the peer manager pick a peer for the peer exchange codec and
  ## forwards to the peer-based `request` overload.
  ##
  ## Returns `SERVICE_UNAVAILABLE` (and bumps `waku_px_errors`) when no peer
  ## is available.
  let selected = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
  if selected.isSome():
    return await wpx.request(numPeers, selected.get())

  waku_px_errors.inc(labelValues = [peerNotFoundFailure])
  return err(
    (
      status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
      status_desc: some(peerNotFoundFailure),
    )
  )
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
proc respond(
    wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
  ## Writes a peer exchange response carrying the raw bytes of `enrs` to
  ## `conn`.
  ##
  ## Returns `ok()` once the response has been written, or an error status
  ## whose description carries the exception message when the write fails.
  ## The connection is not closed here.
  let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)))

  try:
    await conn.writeLP(rpc.encode().buffer)
  except CatchableError as exc:
    # Use a fixed label value: exception messages are free-form text and would
    # create a new metric series for every distinct message.
    waku_px_errors.inc(labelValues = [pxFailure])
    return err(
      (
        # NOTE(review): status code kept as-is for compatibility, although no
        # dialing happens here - confirm whether another code fits better.
        status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
        status_desc: some("exception sending response: " & exc.msg),
      )
    )

  return ok()
|
|
|
|
|
|
|
|
|
|
proc respondError(
    wpx: WakuPeerExchange,
    status_code: PeerExchangeResponseStatusCode,
    status_desc: Option[string],
    conn: Connection,
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
  ## Writes an error response with the given `status_code` and optional
  ## `status_desc` to `conn`.
  ##
  ## Returns `ok()` once the response has been written, or
  ## `SERVICE_UNAVAILABLE` with the exception message when the write fails.
  ## The connection is not closed here.
  let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc)

  try:
    await conn.writeLP(rpc.encode().buffer)
  except CatchableError as exc:
    # Use a fixed label value: exception messages are free-form text and would
    # create a new metric series for every distinct message.
    waku_px_errors.inc(labelValues = [pxFailure])
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
        status_desc: some("exception sending error response: " & exc.msg),
      )
    )

  return ok()
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
|
proc getEnrsFromCache(
    wpx: WakuPeerExchange, numPeers: uint64
): seq[enr.Record] {.gcsafe.} =
  ## Returns up to `numPeers` ENRs picked at random from the cache.
  ##
  ## Returns an empty seq when the cache is empty. When `numPeers` exceeds the
  ## cache size, the whole (shuffled) cache is returned.
  if wpx.enrCache.len() == 0:
    debug "peer exchange ENR cache is empty"
    return @[]

  # copy and shuffle
  randomize()
  var shuffledCache = wpx.enrCache
  shuffledCache.shuffle()

  # return numPeers or less if cache is smaller.
  # `numPeers` comes from the remote request: clamp it in the unsigned domain
  # first, because converting a value above `high(int)` to `int` raises a
  # RangeDefect.
  let count = min(shuffledCache.len.uint64, numPeers).int
  return shuffledCache[0 ..< count]
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2024-07-29 15:53:43 -04:00
|
|
|
|
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
  ## Decides whether `peer` may be shared through peer exchange.
  ##
  ## A peer is accepted only when it originates from discv5, carries an ENR
  ## and, if `cluster` is set, its ENR is not mismatched with that cluster.
  ## Each rejection is logged at trace level.
  result = false

  if peer.origin != Discv5:
    trace "peer not from discv5", peer = $peer, origin = $peer.origin
  elif peer.enr.isNone():
    trace "peer has no ENR", peer = $peer
  elif cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
    trace "peer has mismatching cluster", peer = $peer
  else:
    result = true
|
|
|
|
|
|
2023-04-19 16:12:00 +02:00
|
|
|
|
proc populateEnrCache(wpx: WakuPeerExchange) =
  ## Rebuilds the ENR cache from the peer store.
  ##
  ## Only peers that are reachable and pass `poolFilter` (from discv5, with an
  ## ENR, on a matching cluster) are considered; at most `MaxPeersCacheSize`
  ## of them are kept.
  let candidates =
    wpx.peerManager.peerStore.getReachablePeers().filterIt(poolFilter(wpx.cluster, it))

  # take either everything we have or as much as the cache may hold
  let keep = min(candidates.len, MaxPeersCacheSize)

  # build the replacement and swap it in as a whole
  wpx.enrCache = candidates[0 ..< keep].mapIt(it.enr.get())
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2023-04-19 16:12:00 +02:00
|
|
|
|
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
  ## Keeps the ENR cache populated.
  ##
  ## First phase: repopulate every 5 seconds until the cache holds
  ## `MaxPeersCacheSize` entries. Second phase: repopulate on every
  ## `CacheRefreshInterval` heartbeat.
  while true:
    # startup phase ends as soon as the cache is full
    if wpx.enrCache.len >= MaxPeersCacheSize:
      break
    wpx.populateEnrCache()
    await sleepAsync(5.seconds)

  heartbeat "Updating px enr cache", CacheRefreshInterval:
    wpx.populateEnrCache()
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2022-11-02 09:45:21 +01:00
|
|
|
|
proc initProtocolHandler(wpx: WakuPeerExchange) =
  ## Installs the handler for incoming peer exchange requests and sets the
  ## protocol codec on `wpx`.
  proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    ## Serves one request on `conn`: reads and decodes it, replies with ENRs
    ## from the cache, and answers with an error status when the request
    ## cannot be read or decoded, or is rejected by the rate limiter.
    var buffer: seq[byte]
    try:
      # NOTE(review): presumably the first body runs when the request is within
      # the rate limit and the `do` body when it is rejected - confirm against
      # `checkUsageLimit`.
      wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn):
        try:
          buffer = await conn.readLp(DefaultMaxRpcSize.int)
        except CatchableError as exc:
          # Use a fixed label value: exception messages are free-form text and
          # would create a new metric series for every distinct message.
          waku_px_errors.inc(labelValues = [pxFailure])

          (
            await wpx.respondError(
              PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn
            )
          ).isOkOr:
            error "Failed to respond with BAD_REQUEST:", error = $error
          return

        let decBuf = PeerExchangeRpc.decode(buffer)
        if decBuf.isErr():
          waku_px_errors.inc(labelValues = [decodeRpcFailure])
          error "Failed to decode PeerExchange request", error = $decBuf.error

          (
            await wpx.respondError(
              PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn
            )
          ).isOkOr:
            error "Failed to respond with BAD_REQUEST:", error = $error
          return

        trace "peer exchange request received"
        let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers)
        # only count the ENRs as sent when the response was actually written
        (await wpx.respond(enrs, conn)).isErrOr:
          waku_px_peers_sent.inc(enrs.len().int64())
      do:
        (
          await wpx.respondError(
            PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn
          )
        ).isOkOr:
          error "Failed to respond with TOO_MANY_REQUESTS:", error = $error
    finally:
      # close, no data is expected; done in `finally` so the early `return`
      # paths above close the connection as well
      await conn.closeWithEof()

  wpx.handler = handler
  wpx.codec = WakuPeerExchangeCodec
|
2022-09-20 13:03:34 +02:00
|
|
|
|
|
2024-07-29 15:53:43 -04:00
|
|
|
|
proc new*(
    T: type WakuPeerExchange,
    peerManager: PeerManager,
    cluster: Option[uint16] = none(uint16),
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a peer exchange protocol instance.
  ##
  ## Besides building the object this installs the protocol handler, publishes
  ## the rate limit setting through `setServiceLimitMetric` and spawns the
  ## background task that keeps the ENR cache populated.
  result = WakuPeerExchange(
    peerManager: peerManager,
    cluster: cluster,
    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
  )
  result.initProtocolHandler()
  setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
  # runs for the lifetime of the instance; not awaited here
  asyncSpawn result.updatePxEnrCache()
|