2024-06-28 16:04:57 +05:30
|
|
|
{.push raises: [].}
|
2023-10-11 08:58:45 +02:00
|
|
|
|
|
|
|
|
import
|
2025-10-08 20:06:46 +05:30
|
|
|
std/[options, sequtils],
|
2024-07-09 13:14:28 +02:00
|
|
|
results,
|
2023-10-11 08:58:45 +02:00
|
|
|
chronicles,
|
|
|
|
|
chronos,
|
|
|
|
|
metrics,
|
|
|
|
|
libp2p/protocols/protocol,
|
|
|
|
|
libp2p/stream/connection,
|
|
|
|
|
libp2p/crypto/crypto,
|
|
|
|
|
eth/p2p/discoveryv5/enr
|
2025-08-29 18:43:29 +05:30
|
|
|
import ../common/nimchronos, ../waku_core, ./rpc, ../common/callbacks
|
2023-10-11 08:58:45 +02:00
|
|
|
|
2024-10-24 15:31:04 +03:00
|
|
|
from ../waku_core/codecs import WakuMetadataCodec
|
|
|
|
|
export WakuMetadataCodec
|
|
|
|
|
|
2023-10-11 08:58:45 +02:00
|
|
|
logScope:
|
|
|
|
|
topics = "waku metadata"
|
|
|
|
|
|
|
|
|
|
# Maximum number of bytes handed to `readLp` for every metadata message read in
# this module: both the reply awaited by `request` and the inbound message read
# by the protocol handler.
const RpcResponseMaxBytes* = 1024
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
type
  WakuMetadata* = ref object of LPProtocol
    ## State of the Waku metadata protocol instance.
    ##
    ## Both values below are what this node puts on the wire in the
    ## `WakuMetadataRequest` / `WakuMetadataResponse` messages it sends.
    clusterId*: uint32
      ## Cluster id advertised to remote peers.
    getShards: GetShards
      ## Callback invoked each time a message is built or logged to obtain the
      ## local shards; every returned item is converted to `uint32` before
      ## being sent.
|
2024-03-16 00:08:47 +01:00
|
|
|
|
|
|
|
|
proc respond(
    m: WakuMetadata, conn: Connection
): Future[Result[void, string]] {.async, gcsafe.} =
  ## Sends this node's metadata (cluster id and current shards) to the remote
  ## peer over `conn` as a `WakuMetadataResponse`.
  ##
  ## Returns `ok()` once `writeLP` completes; if encoding or writing raises a
  ## catchable error, its message is returned as `err`.
  let reply = WakuMetadataResponse(
    clusterId: some(uint32(m.clusterId)),
    shards: m.getShards().mapIt(it.uint32),
  )

  # Encoding stays inside `catch` so that anything it raises is reported the
  # same way as a failed write.
  let writeOutcome = catch:
    await conn.writeLP(reply.encode().buffer)

  if writeOutcome.isErr():
    return err(writeOutcome.error.msg)

  return ok()
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
proc request*(
    m: WakuMetadata, conn: Connection
): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
  ## Performs one metadata exchange on `conn`: writes a `WakuMetadataRequest`
  ## carrying the local cluster id and shards, reads at most
  ## `RpcResponseMaxBytes` bytes back, closes the connection and decodes the
  ## reply.
  ##
  ## On failure the returned `err` is prefixed with the failing step. The
  ## steps are checked in this order, so an earlier one hides a later one:
  ## "close failed: ", "write failed: ", "read failed: ", "decode failed: ".
  let outgoing = WakuMetadataRequest(
    clusterId: some(m.clusterId), shards: m.getShards().mapIt(it.uint32)
  )

  let sendOutcome = catch:
    await conn.writeLP(outgoing.encode().buffer)

  # NOTE(review): the read is attempted even when the write above failed.
  let recvOutcome = catch:
    await conn.readLp(RpcResponseMaxBytes)

  # The connection is closed whatever happened to the write and the read.
  let closeOutcome = catch:
    await conn.closeWithEof()

  if closeOutcome.isErr():
    return err("close failed: " & closeOutcome.error.msg)

  if sendOutcome.isErr():
    return err("write failed: " & sendOutcome.error.msg)

  if recvOutcome.isErr():
    return err("read failed: " & recvOutcome.error.msg)

  let decoded = WakuMetadataResponse.decode(recvOutcome.get())
  if decoded.isErr():
    return err("decode failed: " & $(decoded.error))

  return ok(decoded.get())
|
2023-10-11 08:58:45 +02:00
|
|
|
|
2024-03-04 15:31:37 +01:00
|
|
|
proc initProtocolHandler(m: WakuMetadata) =
  ## Installs the inbound stream handler and the protocol codec on `m`.
  ##
  ## For each inbound stream the handler reads one message of at most
  ## `RpcResponseMaxBytes` bytes, decodes it, logs the remote and local
  ## cluster id / shards, and answers with `m.respond`. The connection is
  ## closed when the handler exits, whichever path it takes.
  proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
    defer:
      # close, no data is expected
      await conn.closeWithEof()

    let res = catch:
      await conn.readLp(RpcResponseMaxBytes)
    let buffer = res.valueOr:
      error "Connection reading error", error = error.msg
      return

    # NOTE(review): the inbound message is decoded as `WakuMetadataResponse`
    # although it is logged as a request; presumably both messages share the
    # same wire layout -- confirm against ./rpc.
    let response = WakuMetadataResponse.decode(buffer).valueOr:
      error "Response decoding error", error = error
      return

    info "Received WakuMetadata request",
      remoteClusterId = response.clusterId,
      remoteShards = response.shards,
      localClusterId = m.clusterId,
      localShards = m.getShards(),
      peer = conn.peerId

    try:
      # `respond` reports a failed write through its `Result`, not by raising,
      # so the returned value has to be inspected or the failure goes
      # unnoticed.
      (await m.respond(conn)).isOkOr:
        error "Failed to respond to WakuMetadata request", error = error
    except CatchableError:
      error "Failed to respond to WakuMetadata request",
        error = getCurrentExceptionMsg()

  m.handler = handler
  m.codec = WakuMetadataCodec
|
|
|
|
|
|
2025-08-29 18:43:29 +05:30
|
|
|
proc new*(T: type WakuMetadata, clusterId: uint32, getShards: GetShards): T =
  ## Builds a `WakuMetadata` instance for `clusterId` that obtains its shards
  ## from `getShards`, installs its protocol handler and logs the creation.
  let metadata = WakuMetadata(clusterId: clusterId, getShards: getShards)

  initProtocolHandler(metadata)

  info "Created WakuMetadata protocol",
    clusterId = metadata.clusterId, shards = metadata.getShards()

  metadata
|
|
|
|
|
|
|
|
|
|
proc start*(wm: WakuMetadata) =
  ## Marks the protocol as started by setting its `started` flag.
  wm.started = true
|
|
|
|
|
|
|
|
|
|
proc stop*(wm: WakuMetadata) =
  ## Marks the protocol as stopped by clearing its `started` flag.
  wm.started = false
|