From 606757fdbb752ba31e740f380cec33edb4ada12e Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Wed, 11 Oct 2023 08:58:45 +0200 Subject: [PATCH] feat: add new metadata protocol (#2062) --- apps/wakunode2/external_config.nim | 5 ++ apps/wakunode2/internal_config.nim | 11 +-- tests/test_peer_manager.nim | 44 ++++++++++ tests/test_waku_metadata.nim | 50 ++++++++++++ tests/test_waku_protobufs.nim | 45 +++++++++++ tests/testlib/wakunode.nim | 10 ++- waku/node/config.nim | 3 + waku/node/peer_manager/peer_manager.nim | 52 +++++++++++- waku/node/waku_node.nim | 12 ++- waku/waku_metadata.nim | 10 +++ waku/waku_metadata/protocol.nim | 103 ++++++++++++++++++++++++ waku/waku_metadata/rpc.nim | 76 +++++++++++++++++ 12 files changed, 408 insertions(+), 13 deletions(-) create mode 100644 tests/test_waku_metadata.nim create mode 100644 tests/test_waku_protobufs.nim create mode 100644 waku/waku_metadata.nim create mode 100644 waku/waku_metadata/protocol.nim create mode 100644 waku/waku_metadata/rpc.nim diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index ac0c5cd30..0ec5c1fb0 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -54,6 +54,11 @@ type name: "log-format" .}: logging.LogFormat ## General node config + clusterId* {. + desc: "Cluster id that the node is running in. Node in a different cluster id is disconnected." + defaultValue: 0 + name: "cluster-id" }: uint32 + agentString* {. defaultValue: "nwaku", desc: "Node agent string which is used as identifier in network" diff --git a/apps/wakunode2/internal_config.nim b/apps/wakunode2/internal_config.nim index a5de21eee..06fc659bb 100644 --- a/apps/wakunode2/internal_config.nim +++ b/apps/wakunode2/internal_config.nim @@ -22,12 +22,12 @@ proc validateExtMultiAddrs*(vals: seq[string]): return ok(multiaddrs) proc dnsResolve*(domain: string, conf: WakuNodeConf): Future[Result[string, string]] {.async} = - + # Use conf's DNS servers var nameServers: seq[TransportAddress] for ip in conf.dnsAddrsNameServers: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - + let dnsResolver = DnsResolver.new(nameServers) # Resolve domain IP @@ -93,18 +93,19 @@ proc networkConfiguration*(conf: WakuNodeConf, if dns4DomainName.isSome() and extIp.isNone(): try: let dnsRes = waitFor dnsResolve(conf.dns4DomainName, conf) - + if dnsRes.isErr(): return err($dnsRes.error) # Pass error down the stack - + extIp = some(ValidIpAddress.init(dnsRes.get())) except CatchableError: return err("Could not update extIp to resolved DNS IP: " & getCurrentExceptionMsg()) - + # Wrap in none because NetConfig does not have a default constructor # TODO: We could change bindIp in NetConfig to be something less restrictive # than ValidIpAddress, which doesn't allow default construction let netConfigRes = NetConfig.init( + clusterId = conf.clusterId, bindIp = conf.listenAddress, bindPort = Port(uint16(conf.tcpPort) + conf.portsShift), extIp = extIp, diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index cbd6d7ac2..b3ef488d8 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -26,6 +26,7 @@ import ../../waku/waku_filter, ../../waku/waku_lightpush, ../../waku/waku_peer_exchange, + ../../waku/waku_metadata, ./testlib/common, ./testlib/testutils, ./testlib/wakucore, @@ -38,6 +39,8 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo()) + await sleepAsync(chronos.milliseconds(500)) + check: connOk == true nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId) @@ -53,6 +56,8 @@ procSuite "Peer Manager": # Dial node2 from node1 let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) + await sleepAsync(chronos.milliseconds(500)) + # Check connection check: conn.isSome() @@ -145,6 +150,7 @@ procSuite "Peer Manager": let nonExistentPeer = nonExistentPeerRes.value require: (await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false + await sleepAsync(chronos.milliseconds(500)) check: # Cannot connect to node2 @@ -153,6 +159,8 @@ procSuite "Peer Manager": # Successful connection require: (await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true + await sleepAsync(chronos.milliseconds(500)) + check: # Currently connected to node2 nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected @@ -229,6 +237,8 @@ procSuite "Peer Manager": require: (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true + await sleepAsync(chronos.milliseconds(500)) + check: # Currently connected to node2 node1.peerManager.peerStore.peers().len == 1 @@ -257,6 +267,36 @@ procSuite "Peer Manager": await allFutures([node1.stop(), node2.stop(), node3.stop()]) + asyncTest "Peer manager drops conections to peers on different networks": + let clusterId1 = 1.uint32 + let clusterId2 = 2.uint32 + + let + # different network + node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1) + + # same network + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2) + node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2) + + # Start nodes + await allFutures([node1.start(), node2.start(), node3.start()]) + + # 1->2 (fails) + let conn1 = await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec) + + # 1->3 (fails) + let conn2 = await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec) + + # 2->3 (succeeds) + let conn3 = await node2.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec) + + check: + conn1.isNone + conn2.isNone + conn3.isSome + + # TODO: nwaku/issues/1377 xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let @@ -370,6 +410,8 @@ procSuite "Peer Manager": (await nodes[2].peerManager.connectRelay(peerInfos[0])) == true (await nodes[3].peerManager.connectRelay(peerInfos[0])) == true + await sleepAsync(chronos.milliseconds(500)) + check: # Peerstore track all three peers nodes[0].peerManager.peerStore.peers().len == 3 @@ -749,6 +791,7 @@ procSuite "Peer Manager": # 2 in connections discard await nodes[1].peerManager.connectRelay(pInfos[0]) discard await nodes[2].peerManager.connectRelay(pInfos[0]) + await sleepAsync(chronos.milliseconds(500)) # but one is pruned check nodes[0].peerManager.switch.connManager.getConnections().len == 1 @@ -756,6 +799,7 @@ procSuite "Peer Manager": # 2 out connections discard await nodes[0].peerManager.connectRelay(pInfos[3]) discard await nodes[0].peerManager.connectRelay(pInfos[4]) + await sleepAsync(chronos.milliseconds(500)) # they are also prunned check nodes[0].peerManager.switch.connManager.getConnections().len == 1 diff --git a/tests/test_waku_metadata.nim b/tests/test_waku_metadata.nim new file mode 100644 index 000000000..687ef7344 --- /dev/null +++ b/tests/test_waku_metadata.nim @@ -0,0 +1,50 @@ +{.used.} + +import + std/[options, sequtils, tables], + testutils/unittests, + chronos, + chronicles, + stew/shims/net, + libp2p/switch, + libp2p/peerId, + libp2p/crypto/crypto, + libp2p/multistream, + libp2p/muxers/muxer, + eth/keys, + eth/p2p/discoveryv5/enr +import + ../../waku/waku_node, + ../../waku/node/peer_manager, + ../../waku/waku_discv5, + ../../waku/waku_metadata, + ./testlib/wakucore, + ./testlib/wakunode + + +procSuite "Waku Metadata Protocol": + + # TODO: Add tests with shards when ready + asyncTest "request() returns the supported metadata of the peer": + let clusterId = 10.uint32 + let + node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId) + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId) + + # Start nodes + await allFutures([node1.start(), node2.start()]) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec) + require: + connOpt.isSome + + # Request metadata + let response1 = await node2.wakuMetadata.request(connOpt.get()) + + # Check the response or dont even continue + require: + response1.isOk + + check: + response1.get().clusterId.get() == clusterId diff --git a/tests/test_waku_protobufs.nim b/tests/test_waku_protobufs.nim new file mode 100644 index 000000000..b06eee847 --- /dev/null +++ b/tests/test_waku_protobufs.nim @@ -0,0 +1,45 @@ +{.used.} + +import + std/[options, sequtils, tables], + testutils/unittests, + chronos, + chronicles +import + ../../waku/waku_metadata, + ../../waku/waku_metadata/rpc, + ./testlib/wakucore, + ./testlib/wakunode + + +procSuite "Waku Protobufs": + # TODO: Missing test coverage in many encode/decode protobuf functions + + test "WakuMetadataResponse": + let res = WakuMetadataResponse( + clusterId: some(7), + shards: @[10, 23, 33], + ) + + let buffer = res.encode() + + let decodedBuff = WakuMetadataResponse.decode(buffer.buffer) + check: + decodedBuff.isOk() + decodedBuff.get().clusterId.get() == res.clusterId.get() + decodedBuff.get().shards == res.shards + + test "WakuMetadataRequest": + let req = WakuMetadataRequest( + clusterId: some(5), + shards: @[100, 2, 0], + ) + + let buffer = req.encode() + + let decodedBuff = WakuMetadataRequest.decode(buffer.buffer) + check: + decodedBuff.isOk() + decodedBuff.get().clusterId.get() == req.clusterId.get() + decodedBuff.get().shards == req.shards + diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 5920d3fca..4e3b3cf7e 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -54,20 +54,21 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), + clusterId: uint32 = 0.uint32, peerStoreCapacity = none(int)): WakuNode = - + var resolvedExtIp = extIp - # Update extPort to default value if it's missing and there's an extIp or a DNS domain + # Update extPort to default value if it's missing and there's an extIp or a DNS domain let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000)) else: extPort - + if dns4DomainName.isSome() and extIp.isNone(): let conf = defaultTestWakuNodeConf() - # If there's an error resolving the IP, an exception is thrown and test fails + # If there's an error resolving the IP, an exception is thrown and test fails let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf) if dnsRes.isErr(): raise newException(Defect, $dnsRes.error) @@ -76,6 +77,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, let netConfigRes = NetConfig.init( bindIp = bindIp, + clusterId = clusterId, bindPort = bindPort, extIp = resolvedExtIp, extPort = extPort, diff --git a/waku/node/config.nim b/waku/node/config.nim index b1817904d..71cfaf8bc 100644 --- a/waku/node/config.nim +++ b/waku/node/config.nim @@ -16,6 +16,7 @@ import type NetConfig* = object hostAddress*: MultiAddress + clusterId*: uint32 wsHostAddress*: Option[MultiAddress] hostExtAddress*: Option[MultiAddress] wsExtAddress*: Option[MultiAddress] @@ -69,6 +70,7 @@ proc init*(T: type NetConfig, wssEnabled: bool = false, dns4DomainName = none(string), discv5UdpPort = none(Port), + clusterId: uint32 = 0, wakuFlags = none(CapabilitiesBitfield)): NetConfigResult = ## Initialize and validate waku node network configuration @@ -137,6 +139,7 @@ proc init*(T: type NetConfig, ok(NetConfig( hostAddress: hostAddress, + clusterId: clusterId, wsHostAddress: wsHostAddress, hostExtAddress: hostExtAddress, wsExtAddress: wsExtAddress, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index a705ed450..009d0879e 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -17,6 +17,7 @@ import ../../waku_core, ../../waku_relay, ../../waku_enr/sharding, + ../../waku_metadata, ./peer_store/peer_storage, ./waku_peer_store @@ -66,6 +67,7 @@ type PeerManager* = ref object of RootObj switch*: Switch peerStore*: PeerStore + wakuMetadata*: WakuMetadata initialBackoffInSec*: int backoffFactor*: int maxFailedAttempts*: int @@ -138,7 +140,7 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: - pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) + pm.storage.insertOrReplace(remotePeerInfo.peerId, remotePeerInfo, NotConnected) # Connects to a given node. Note that this function uses `connect` and # does not provide a protocol. Streams for relay (gossipsub) are created @@ -231,6 +233,7 @@ proc dialPeer(pm: PeerManager, proc loadFromStorage(pm: PeerManager) = debug "loading peers from storage" # Load peers from storage, if available + var amount = 0 proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) = trace "loading peer", peerId=peerId, connectedness=connectedness @@ -250,12 +253,15 @@ proc loadFromStorage(pm: PeerManager) = pm.peerStore[DisconnectBook][peerId] = disconnectTime pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin + amount.inc() + let res = pm.storage.getAll(onData) if res.isErr: warn "failed to load peers from storage", err = res.error waku_peers_errors.inc(labelValues = ["storage_load_failure"]) - else: - debug "successfully queried peer storage" + return + + debug "successfully queried peer storage", amount = amount proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = @@ -315,6 +321,44 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = direction = if event.initiator: Outbound else: Inbound connectedness = Connected + var clusterOk = false + var reason = "" + # To prevent metadata protocol from breaking prev nodes, by now we only + # disconnect if the clusterid is specified. + if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0: + block wakuMetadata: + var conn: Connection + try: + conn = await pm.switch.dial(peerId, WakuMetadataCodec) + except CatchableError: + reason = "waku metadata codec not supported" + break wakuMetadata + + # request metadata from connecting peer + let metadata = (await pm.wakuMetadata.request(conn)).valueOr: + reason = "failed waku metadata codec request" + break wakuMetadata + + # does not report any clusterId + let clusterId = metadata.clusterId.valueOr: + reason = "empty clusterId reported" + break wakuMetadata + + # drop it if it doesnt match our network id + if pm.wakuMetadata.clusterId != clusterId: + reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId + break wakuMetadata + + # reaching here means the clusterId matches + clusterOk = true + + if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk: + info "disconnecting from peer", peerId=peerId, reason=reason + asyncSpawn(pm.switch.disconnect(peerId)) + pm.peerStore.delete(peerId) + + # TODO: Take action depending on the supported shards as reported by metadata + let ip = pm.getPeerIp(peerId) if ip.isSome: pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) @@ -346,6 +390,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = proc new*(T: type PeerManager, switch: Switch, + wakuMetadata: WakuMetadata = nil, maxRelayPeers: Option[int] = none(int), storage: PeerStorage = nil, initialBackoffInSec = InitialBackoffInSec, @@ -388,6 +433,7 @@ proc new*(T: type PeerManager, let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10) let pm = PeerManager(switch: switch, + wakuMetadata: wakuMetadata, peerStore: switch.peerStore, storage: storage, initialBackoffInSec: initialBackoffInSec, diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index eb04d15f0..83207024e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -35,6 +35,7 @@ import ../waku_filter_v2, ../waku_filter_v2/client as filter_client, ../waku_lightpush, + ../waku_metadata, ../waku_lightpush/client as lightpush_client, ../waku_enr, ../waku_dnsdisc, @@ -95,6 +96,7 @@ type wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange + wakuMetadata*: WakuMetadata enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -143,7 +145,7 @@ proc new*(T: type WakuNode, let queue = newAsyncEventQueue[SubscriptionEvent](30) - return WakuNode( + let node = WakuNode( peerManager: peerManager, switch: switch, rng: rng, @@ -152,6 +154,14 @@ proc new*(T: type WakuNode, topicSubscriptionQueue: queue ) + # mount metadata protocol + let metadata = WakuMetadata.new(netConfig.clusterId) + node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec)) + node.wakuMetadata = metadata + peerManager.wakuMetadata = metadata + + return node + proc peerInfo*(node: WakuNode): PeerInfo = node.switch.peerInfo diff --git a/waku/waku_metadata.nim b/waku/waku_metadata.nim new file mode 100644 index 000000000..efa9b4234 --- /dev/null +++ b/waku/waku_metadata.nim @@ -0,0 +1,10 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + ./waku_metadata/protocol + +export + protocol diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim new file mode 100644 index 000000000..a8b5ae227 --- /dev/null +++ b/waku/waku_metadata/protocol.nim @@ -0,0 +1,103 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, sequtils, random], + stew/results, + chronicles, + chronos, + metrics, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../waku_core, + ./rpc + +logScope: + topics = "waku metadata" + +const WakuMetadataCodec* = "/vac/waku/metadata/1.0.0" +const RpcResponseMaxBytes* = 1024 + +type + WakuMetadata* = ref object of LPProtocol + clusterId*: uint32 + shards*: seq[uint32] + +proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} = + try: + await conn.writeLP(WakuMetadataResponse( + clusterId: some(m.clusterId), + shards: m.shards + ).encode().buffer) + except CatchableError as exc: + return err(exc.msg) + + return ok() + +proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} = + var buffer: seq[byte] + var error: string + try: + await conn.writeLP(WakuMetadataRequest( + clusterId: some(m.clusterId), + shards: m.shards, + ).encode().buffer) + buffer = await conn.readLp(RpcResponseMaxBytes) + except CatchableError as exc: + error = $exc.msg + finally: + # close, no more data is expected + await conn.closeWithEof() + + if error.len > 0: + return err("write/read failed: " & error) + + let decodedBuff = WakuMetadataResponse.decode(buffer) + if decodedBuff.isErr(): + return err("decode failed: " & $decodedBuff.error) + + echo decodedBuff.get().clusterId + return ok(decodedBuff.get()) + +proc initProtocolHandler*(m: WakuMetadata) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + var buffer: seq[byte] + try: + buffer = await conn.readLp(RpcResponseMaxBytes) + except CatchableError as exc: + return + + let decBuf = WakuMetadataResponse.decode(buffer) + if decBuf.isErr(): + return + + let response = decBuf.get() + debug "Received WakuMetadata request", + remoteClusterId=response.clusterId, + remoteShards=response.shards, + localClusterId=m.clusterId, + localShards=m.shards + + discard await m.respond(conn) + + # close, no data is expected + await conn.closeWithEof() + + m.handler = handle + m.codec = WakuMetadataCodec + +proc new*(T: type WakuMetadata, clusterId: uint32): T = + let m = WakuMetadata( + clusterId: clusterId, + # TODO: must be updated real time + shards: @[], + ) + m.initProtocolHandler() + info "Created WakuMetadata protocol", clusterId=clusterId + return m diff --git a/waku/waku_metadata/rpc.nim b/waku/waku_metadata/rpc.nim new file mode 100644 index 000000000..dd6034d81 --- /dev/null +++ b/waku/waku_metadata/rpc.nim @@ -0,0 +1,76 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options + +import + ../common/protobuf + +type + WakuMetadataRequest* = object + clusterId*: Option[uint32] + shards*: seq[uint32] + +type + WakuMetadataResponse* = object + clusterId*: Option[uint32] + shards*: seq[uint32] + +proc encode*(rpc: WakuMetadataRequest): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.clusterId) + + for shard in rpc.shards: + pb.write3(2, shard) + pb.finish3() + + pb + +proc decode*(T: type WakuMetadataRequest, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = WakuMetadataRequest() + + var clusterId: uint64 + if not ?pb.getField(1, clusterId): + rpc.clusterId = none(uint32) + else: + rpc.clusterId = some(clusterId.uint32) + + var shards: seq[uint64] + if ?pb.getRepeatedField(2, shards): + for shard in shards: + rpc.shards.add(shard.uint32) + + ok(rpc) + +proc encode*(rpc: WakuMetadataResponse): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.clusterId) + + for shard in rpc.shards: + pb.write3(2, shard) + pb.finish3() + + pb + +proc decode*(T: type WakuMetadataResponse, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = WakuMetadataResponse() + + var clusterId: uint64 + if not ?pb.getField(1, clusterId): + rpc.clusterId = none(uint32) + else: + rpc.clusterId = some(clusterId.uint32) + + var shards: seq[uint64] + if ?pb.getRepeatedField(2, shards): + for shard in shards: + rpc.shards.add(shard.uint32) + + ok(rpc)