Prem Chaitanya Prathi e4358c9718 chore: remove metadata protocol dependency on enr, relax check when nwaku is edge node (#3519)
* remove metadata protocol dep on enr, do not disconnect peers based on shards mismatch
2025-08-13 10:48:56 +05:30

150 lines
3.9 KiB
Nim

{.push raises: [].}
import
std/[options, sequtils, sets],
results,
chronicles,
chronos,
metrics,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import ../common/nimchronos, ../waku_core, ./rpc
from ../waku_core/codecs import WakuMetadataCodec
export WakuMetadataCodec
logScope:
  topics = "waku metadata"

const
  # Size limit handed to `readLp` whenever this module reads a metadata
  # request or response from a connection.
  RpcResponseMaxBytes* = 1024
type
  WakuMetadata* = ref object of LPProtocol
    ## Protocol state for the Waku metadata exchange: the local cluster id and
    ## shard set that are sent to peers in requests and responses.
    clusterId*: uint32 ## Cluster id advertised by this node.
    shards*: HashSet[uint32]
      ## Shards advertised by this node; updated by the subscription listener
      ## when a subscription queue is present.
    topicSubscriptionQueue: Option[AsyncEventQueue[SubscriptionEvent]]
      ## Optional source of pubsub subscribe/unsubscribe events. When `none`,
      ## `shards` is never updated after construction by this module.
proc respond(
    m: WakuMetadata, conn: Connection
): Future[Result[void, string]] {.async, gcsafe.} =
  ## Writes this node's cluster id and shard set to `conn` as a
  ## length-prefixed, encoded `WakuMetadataResponse`.
  ##
  ## Returns `ok()` when the write completes, otherwise `err` carrying the
  ## message of the exception caught while writing.
  let reply = WakuMetadataResponse(clusterId: some(m.clusterId), shards: toSeq(m.shards))

  let writeOutcome = catch:
    await conn.writeLP(reply.encode().buffer)

  if writeOutcome.isOk():
    return ok()

  return err(writeOutcome.error.msg)
proc request*(
    m: WakuMetadata, conn: Connection
): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
  ## Sends this node's cluster id and shards to the peer on `conn` as a
  ## `WakuMetadataRequest`, then reads and decodes the peer's
  ## `WakuMetadataResponse`.
  ##
  ## The connection is always closed before returning. Failures are reported
  ## as `err` with one of these prefixes, checked in this order:
  ## `"close failed: "`, `"write failed: "`, `"read failed: "`,
  ## `"decode failed: "`.
  let request =
    WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards))

  let writeRes = catch:
    await conn.writeLP(request.encode().buffer)

  if writeRes.isErr():
    # The request never went out, so there is no reply to wait for: skip the
    # read and only close the connection.
    let closeAfterWriteErr = catch:
      await conn.closeWithEof()
    if closeAfterWriteErr.isErr():
      return err("close failed: " & closeAfterWriteErr.error.msg)
    return err("write failed: " & writeRes.error.msg)

  let readRes = catch:
    await conn.readLp(RpcResponseMaxBytes)

  # close no matter what
  let closeRes = catch:
    await conn.closeWithEof()
  if closeRes.isErr():
    return err("close failed: " & closeRes.error.msg)

  let buffer = readRes.valueOr:
    return err("read failed: " & error.msg)

  let response = WakuMetadataResponse.decode(buffer).valueOr:
    return err("decode failed: " & $error)

  return ok(response)
proc initProtocolHandler(m: WakuMetadata) =
  ## Installs the stream handler and codec for the metadata protocol on `m`.
  ##
  ## The handler reads one length-prefixed message from the peer, logs the
  ## peer's cluster id and shards next to the local ones, answers with the
  ## local metadata via `respond`, and always closes the connection on exit.
  proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
    defer:
      # close, no data is expected
      await conn.closeWithEof()

    let res = catch:
      await conn.readLp(RpcResponseMaxBytes)
    let buffer = res.valueOr:
      error "Connection reading error", error = error.msg
      return

    # NOTE(review): the inbound message is a request but is decoded with the
    # response type. Presumably this works because both messages are built
    # from the same two fields (clusterId, shards) - confirm against ./rpc.
    let response = WakuMetadataResponse.decode(buffer).valueOr:
      error "Response decoding error", error = error
      return

    debug "Received WakuMetadata request",
      remoteClusterId = response.clusterId,
      remoteShards = response.shards,
      localClusterId = m.clusterId,
      localShards = m.shards,
      peer = conn.peerId

    try:
      # `respond` reports write failures through its Result rather than by
      # raising, so the returned value must be inspected to notice them.
      let respondRes = await m.respond(conn)
      if respondRes.isErr():
        error "Failed to respond to WakuMetadata request", error = respondRes.error
    except CatchableError:
      error "Failed to respond to WakuMetadata request",
        error = getCurrentExceptionMsg()

  m.handler = handler
  m.codec = WakuMetadataCodec
proc new*(
    T: type WakuMetadata,
    clusterId: uint32,
    shards: HashSet[uint32],
    queue: Option[AsyncEventQueue[SubscriptionEvent]],
): T =
  ## Creates a `WakuMetadata` holding the given cluster id, shard set and
  ## optional subscription event queue, installs its protocol handler, and
  ## logs the creation.
  result =
    WakuMetadata(clusterId: clusterId, shards: shards, topicSubscriptionQueue: queue)
  result.initProtocolHandler()

  info "Created WakuMetadata protocol",
    clusterId = result.clusterId, shards = result.shards
proc subscriptionsListener(wm: WakuMetadata) {.async.} =
  ## Listen for pubsub topics subscriptions changes
  ##
  ## While `wm.started` is true, waits for subscription events and keeps
  ## `wm.shards` in sync: a subscribe event adds the topic's shard, an
  ## unsubscribe event removes it. Topics that do not parse as a `RelayShard`
  ## or that belong to a different cluster are ignored. Does nothing when no
  ## subscription queue was supplied.
  if wm.topicSubscriptionQueue.isNone():
    return

  let queue = wm.topicSubscriptionQueue.get()
  let key = queue.register()

  try:
    # `started` is only re-checked after `waitEvents` returns, so the loop
    # ends on the first batch of events delivered after the flag is cleared.
    while wm.started:
      let events = await queue.waitEvents(key)

      for event in events:
        let parsedShard = RelayShard.parse(event.topic).valueOr:
          continue

        if parsedShard.clusterId != wm.clusterId:
          continue

        case event.kind
        of PubsubSub:
          wm.shards.incl(parsedShard.shardId)
        of PubsubUnsub:
          wm.shards.excl(parsedShard.shardId)
        else:
          continue
  finally:
    # Release the reader key even when waiting raises or is cancelled, not
    # only when the loop exits normally.
    queue.unregister(key)
proc start*(wm: WakuMetadata) =
  ## Marks the protocol as started and launches the subscription listener as
  ## a detached background task.
  # The listener loops only while `started` is true, so set the flag before
  # spawning it.
  wm.started = true

  let listener = wm.subscriptionsListener()
  asyncSpawn listener
proc stop*(wm: WakuMetadata) =
  ## Marks the protocol as stopped by clearing the `started` flag.
  ##
  ## The subscription listener is not cancelled here; it checks the flag at
  ## the top of its loop and therefore exits only after its current wait for
  ## events returns.
  wm.started = false