fryorcraken 994d485b49 chore!: make sharding configuration explicit (#3468)
* Reserve `networkconfig` name to waku network related settings

* Rename cluster conf to network conf

 A `NetworkConf` is a Waku network configuration.

# Conflicts:
#	tests/factory/test_waku_conf.nim

# Conflicts:
#	tests/factory/test_waku_conf.nim

* Improve sharding configuration

Smarter data types simplify the logic.

* Fixing tests

* fixup! rename to endpointConf

* wip: autosharding is a specific configuration state; treat it as
such

# Conflicts:
#	waku/factory/external_config.nim

* refactor lightpush handler

Some metrics error reporting was missing.

# Conflicts:
#	waku/waku_lightpush/protocol.nim

* test_node_factory tests pass

* remove warnings

* fix tests

* Revert eager previous replace-all command

* fix up build tools compilation

* metadata is used to store cluster id

* Mount relay routes in static sharding

* Rename activeRelayShards to subscribeShards

To make it clearer that these are the shards the node will subscribe to.

* Remove unused msg var

* Improve error handling

* Set autosharding as default, with 1 shard in network

Also makes the node subscribe to all shards in auto sharding, and to none
in static sharding.
2025-07-04 17:10:53 +10:00

161 lines
4.1 KiB
Nim

{.push raises: [].}
import
std/[options, sequtils, sets],
results,
chronicles,
chronos,
metrics,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import ../common/nimchronos, ../common/enr, ../waku_core, ../waku_enr, ./rpc
from ../waku_core/codecs import WakuMetadataCodec
export WakuMetadataCodec
# Log statements in this module are tagged with the "waku metadata" topic.
logScope:
topics = "waku metadata"
# Byte limit passed to every `readLp` call in this module (both when reading
# a peer's reply and when reading an incoming request).
const RpcResponseMaxBytes* = 1024
type
  WakuMetadata* = ref object of LPProtocol
    ## State of the Waku metadata protocol for the local node: the cluster id
    ## and shard set that are sent to peers in requests and responses.
    clusterId*: uint32 ## Cluster id advertised to remote peers.
    shards*: HashSet[uint32]
      ## Shard ids advertised to remote peers; updated by the subscription
      ## listener as pubsub topics are subscribed / unsubscribed.
    topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
      ## Source of subscription events consumed by the subscription listener.
proc respond(
    m: WakuMetadata, conn: Connection
): Future[Result[void, string]] {.async, gcsafe.} =
  ## Writes the local cluster id and shards to `conn` as a length-prefixed
  ## `WakuMetadataResponse`.
  ##
  ## Returns `ok()` when the write succeeds, otherwise `err` carrying the
  ## message of the exception caught while writing.
  let reply =
    WakuMetadataResponse(clusterId: some(m.clusterId), shards: toSeq(m.shards))

  # Exceptions from encoding/writing are captured into a Result by `catch`.
  let written = catch:
    await conn.writeLP(reply.encode().buffer)

  if written.isOk():
    return ok()
  return err(written.error.msg)
proc request*(
    m: WakuMetadata, conn: Connection
): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
  ## Sends the local cluster id and shards to the peer on `conn`, reads the
  ## peer's length-prefixed reply, and closes the connection.
  ##
  ## Returns the decoded `WakuMetadataResponse`, or an `err` string prefixed
  ## with the failing step. Failures are reported in this order of
  ## precedence: close, write, read, decode.
  let outgoing =
    WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards))

  # Write and read are both attempted; their outcomes are inspected only
  # after the connection has been closed.
  let sent = catch:
    await conn.writeLP(outgoing.encode().buffer)
  let received = catch:
    await conn.readLp(RpcResponseMaxBytes)

  # Close no matter what happened above.
  let closed = catch:
    await conn.closeWithEof()

  if closed.isErr():
    return err("close failed: " & closed.error.msg)
  if sent.isErr():
    return err("write failed: " & sent.error.msg)
  if received.isErr():
    return err("read failed: " & received.error.msg)

  let decoded = WakuMetadataResponse.decode(received.get())
  if decoded.isErr():
    return err("decode failed: " & $(decoded.error))

  return ok(decoded.get())
proc initProtocolHandler(m: WakuMetadata) =
  ## Installs the stream handler and the codec of the metadata protocol on `m`.
  proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
    ## Reads one length-prefixed message from `conn`, logs the remote and
    ## local cluster id / shards, answers through `respond`, and closes the
    ## connection when the handler exits.
    defer:
      # close, no data is expected
      await conn.closeWithEof()

    let res = catch:
      await conn.readLp(RpcResponseMaxBytes)
    let buffer = res.valueOr:
      error "Connection reading error", error = error.msg
      return

    # NOTE(review): the incoming message is a request, yet it is decoded as
    # `WakuMetadataResponse`; presumably both types share the same wire
    # layout - confirm against ./rpc before changing.
    let response = WakuMetadataResponse.decode(buffer).valueOr:
      error "Response decoding error", error = error
      return

    debug "Received WakuMetadata request",
      remoteClusterId = response.clusterId,
      remoteShards = response.shards,
      localClusterId = m.clusterId,
      localShards = m.shards,
      peer = conn.peerId

    try:
      # `respond` reports write failures through its Result rather than by
      # raising, so the Result must be inspected; discarding it would hide
      # every failed reply.
      let respondRes = await m.respond(conn)
      if respondRes.isErr():
        error "Failed to respond to WakuMetadata request", error = respondRes.error
    except CatchableError:
      error "Failed to respond to WakuMetadata request",
        error = getCurrentExceptionMsg()

  m.handler = handler
  m.codec = WakuMetadataCodec
proc new*(
    T: type WakuMetadata,
    clusterId: uint32,
    enr: Record,
    queue: AsyncEventQueue[SubscriptionEvent],
): T =
  ## Builds a `WakuMetadata` instance and installs its protocol handler.
  ##
  ## The cluster id defaults to `clusterId` and the shard set to empty. When
  ## `enr` converts to a typed record that carries relay sharding information,
  ## both the cluster id and the shards are taken from the ENR instead.
  ## `queue` is stored as the source of subscription events.
  var effectiveCluster = clusterId
  var knownShards = initHashSet[uint32]()

  let typedRecord = enr.toTyped()
  if typedRecord.isOk():
    let sharding = typedRecord.get().relaySharding()
    if sharding.isSome():
      let relayShard = sharding.get()
      effectiveCluster = uint32(relayShard.clusterId)
      for shardId in relayShard.shardIds:
        knownShards.incl(uint32(shardId))

  let wm = WakuMetadata(
    clusterId: effectiveCluster, shards: knownShards, topicSubscriptionQueue: queue
  )
  wm.initProtocolHandler()

  info "Created WakuMetadata protocol", clusterId = wm.clusterId, shards = wm.shards
  return wm
proc subscriptionsListener(wm: WakuMetadata) {.async.} =
  ## Mirrors pubsub topic subscription changes into `wm.shards`.
  ##
  ## Registers a reader on the subscription queue and, while `wm.started` is
  ## true, waits for batches of events. Events whose topic does not parse as
  ## a `RelayShard`, or whose cluster id differs from `wm.clusterId`, are
  ## skipped. The reader is unregistered once the loop ends.
  let readerKey = wm.topicSubscriptionQueue.register()

  while wm.started:
    let batch = await wm.topicSubscriptionQueue.waitEvents(readerKey)

    for evt in batch:
      let parsed = RelayShard.parse(evt.topic)
      if parsed.isErr():
        continue
      let shard = parsed.get()

      if shard.clusterId == wm.clusterId:
        case evt.kind
        of PubsubSub:
          wm.shards.incl(shard.shardId)
        of PubsubUnsub:
          wm.shards.excl(shard.shardId)
        else:
          # Other event kinds do not affect the shard set.
          discard

  wm.topicSubscriptionQueue.unregister(readerKey)
proc start*(wm: WakuMetadata) =
  ## Flags the protocol as started and launches the subscription listener as
  ## a detached background task.
  wm.started = true
  asyncSpawn subscriptionsListener(wm)
proc stop*(wm: WakuMetadata) =
  ## Clears the `started` flag. The subscription listener checks this flag at
  ## the top of its loop, so it exits once its pending wait for events
  ## returns.
  wm.started = false