From 5a225809cdfbd6144a3257ff1ec688c41fadb270 Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Wed, 13 Mar 2024 10:58:13 +0100 Subject: [PATCH] fix: enable autosharding in any cluster (#2505) --- tests/waku_core/topics/test_sharding.nim | 119 ++++++++++++++--------- tests/waku_relay/test_wakunode_relay.nim | 10 +- tests/wakunode_rest/test_rest_relay.nim | 11 +-- waku/factory/internal_config.nim | 24 ++--- waku/factory/node_factory.nim | 5 +- waku/node/waku_node.nim | 23 +++-- waku/waku_api/rest/relay/handlers.nim | 2 +- waku/waku_core/topics/sharding.nim | 41 ++++---- waku/waku_enr/sharding.nim | 10 +- 9 files changed, 134 insertions(+), 111 deletions(-) diff --git a/tests/waku_core/topics/test_sharding.nim b/tests/waku_core/topics/test_sharding.nim index 102bb8219..fbab1a4b4 100644 --- a/tests/waku_core/topics/test_sharding.nim +++ b/tests/waku_core/topics/test_sharding.nim @@ -1,19 +1,9 @@ -import - std/[ - options, - tables - ], - testutils/unittests +import std/[options, tables], testutils/unittests +import ../../../../waku/waku_core/topics, ../../testlib/[wakucore, tables, testutils] -import - ../../../../waku/waku_core/topics, - ../../testlib/[ - wakucore, - tables, - testutils - ] - +const GenerationZeroShardsCount = 8 +const ClusterId = 1 suite "Autosharding": const @@ -22,78 +12,96 @@ suite "Autosharding": contentTopicShort = "/toychat/2/huilong/proto" contentTopicFull = "/0/toychat/2/huilong/proto" contentTopicInvalid = "/1/toychat/2/huilong/proto" - suite "getGenZeroShard": test "Generate Gen0 Shard": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # Given two valid topics - let + let nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value() nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value() - + # When we generate a gen0 shard from them - let - nsPubsubTopic1 = getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount) - nsPubsubTopic2 = getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount) - + let + nsPubsubTopic1 = + sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount) + nsPubsubTopic2 = + sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount) + # Then the generated shards are valid check: nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3) nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3) - + suite "getShard from NsContentTopic": test "Generate Gen0 Shard with topic.generation==none": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) + # When we get a shard from a topic without generation - let nsPubsubTopic = getShard(contentTopicShort) - + let nsPubsubTopic = sharding.getShard(contentTopicShort) + # Then the generated shard is valid check: nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==0": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from a gen0 topic - let nsPubsubTopic = getShard(contentTopicFull) - + let nsPubsubTopic = sharding.getShard(contentTopicFull) + # Then the generated shard is valid check: nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==other": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from ain invalid content topic - let nsPubsubTopic = getShard(contentTopicInvalid) - + let nsPubsubTopic = sharding.getShard(contentTopicInvalid) + # Then the generated shard is valid check: nsPubsubTopic.error() == "Generation > 0 are not supported yet" suite "getShard from ContentTopic": test "Generate Gen0 Shard with topic.generation==none": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from it - let nsPubsubTopic = getShard(contentTopicShort) - + let nsPubsubTopic = sharding.getShard(contentTopicShort) + # Then the generated shard is valid check: nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3) - + test "Generate Gen0 Shard with topic.generation==0": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from it - let nsPubsubTopic = getShard(contentTopicFull) - + let nsPubsubTopic = sharding.getShard(contentTopicFull) + # Then the generated shard is valid check: nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3) - + test "Generate Gen0 Shard with topic.generation==other": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from it - let nsPubsubTopic = getShard(contentTopicInvalid) - + let nsPubsubTopic = sharding.getShard(contentTopicInvalid) + # Then the generated shard is valid check: nsPubsubTopic.error() == "Generation > 0 are not supported yet" - + test "Generate Gen0 Shard invalid topic": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When we get a shard from it - let nsPubsubTopic = getShard("invalid") + let nsPubsubTopic = sharding.getShard("invalid") # Then the generated shard is valid check: @@ -101,52 +109,69 @@ suite "Autosharding": suite "parseSharding": test "contentTopics is ContentTopic": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with contentTopic as string - let topicMap = parseSharding(some(pubsubTopic04), contentTopicShort) + let topicMap = sharding.parseSharding(some(pubsubTopic04), contentTopicShort) # Then the topicMap is valid check: topicMap.value() == {pubsubTopic04: @[contentTopicShort]} test "contentTopics is seq[ContentTopic]": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with contentTopic as string seq - let topicMap = parseSharding(some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"]) + let topicMap = sharding.parseSharding( + some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"] + ) # Then the topicMap is valid check: topicMap.value() == {pubsubTopic04: @[contentTopicShort, "/0/foo/1/bar/proto"]} test "pubsubTopic is none": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with pubsubTopic as none - let topicMap = parseSharding(PubsubTopic.none(), contentTopicShort) + let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicShort) # Then the topicMap is valid check: topicMap.value() == {pubsubTopic13: @[contentTopicShort]} test "content parse error": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with pubsubTopic as none with invalid content - let topicMap = parseSharding(PubsubTopic.none(), "invalid") + let topicMap = sharding.parseSharding(PubsubTopic.none(), "invalid") # Then the topicMap is valid check: - topicMap.error() == "Cannot parse content topic: invalid format: topic must start with slash" + topicMap.error() == + "Cannot parse content topic: invalid format: topic must start with slash" test "pubsubTopic parse error": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with pubsubTopic as none with invalid content - let topicMap = parseSharding(some("invalid"), contentTopicShort) + let topicMap = sharding.parseSharding(some("invalid"), contentTopicShort) # Then the topicMap is valid check: - topicMap.error() == "Cannot parse pubsub topic: invalid format: must start with /waku/2" + topicMap.error() == + "Cannot parse pubsub topic: invalid format: must start with /waku/2" test "pubsubTopic getShard error": + let sharding = + Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) # When calling with pubsubTopic as none with invalid content - let topicMap = parseSharding(PubsubTopic.none(), contentTopicInvalid) + let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicInvalid) # Then the topicMap is valid check: - topicMap.error() == "Cannot autoshard content topic: Generation > 0 are not supported yet" + topicMap.error() == + "Cannot autoshard content topic: Generation > 0 are not supported yet" xtest "catchable error on add to topicMap": # TODO: Trigger a CatchableError or mock diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index d98ef0b3d..15c1f09e6 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -506,10 +506,11 @@ suite "WakuNode - Relay": await node.start() await node.mountRelay() + require node.mountSharding(1, 1).isOk ## Given let - shard = "/waku/2/rs/1/1" + shard = "/waku/2/rs/1/0" contentTopicA = DefaultContentTopic contentTopicB = ContentTopic("/waku/2/default-content1/proto") contentTopicC = ContentTopic("/waku/2/default-content2/proto") @@ -520,10 +521,9 @@ suite "WakuNode - Relay": ): Future[void] {.gcsafe, raises: [Defect].} = discard pubsubTopic discard message - - assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard" - assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard" - assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard" + assert shard == node.wakuSharding.getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard" + assert shard == node.wakuSharding.getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard" + assert shard == node.wakuSharding.getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard" ## When node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index a287c0329..02a4b7c00 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -263,6 +263,7 @@ suite "Waku v2 Rest API - Relay": let node = testWakuNode() await node.start() await node.mountRelay() + require node.mountSharding(1, 8).isOk var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -276,13 +277,11 @@ suite "Waku v2 Rest API - Relay": restServer.start() let contentTopics = @[ - ContentTopic("/waku/2/default-content1/proto"), - ContentTopic("/waku/2/default-content2/proto"), - ContentTopic("/waku/2/default-content3/proto") + ContentTopic("/app-1/2/default-content/proto"), + ContentTopic("/app-2/2/default-content/proto"), + ContentTopic("/app-3/2/default-content/proto") ] - let shards = contentTopics.mapIt(getShard(it).expect("Valid Shard")).deduplicate() - # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) let response = await client.relayPostAutoSubscriptionsV1(contentTopics) @@ -300,7 +299,7 @@ suite "Waku v2 Rest API - Relay": check: # Node should be subscribed to all shards - toSeq(node.wakuRelay.subscribedTopics).len == shards.len + node.wakuRelay.subscribedTopics == @["/waku/2/rs/1/7", "/waku/2/rs/1/2", "/waku/2/rs/1/5"] await restServer.stop() await restServer.closeWait() diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 132821060..50cb36160 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -31,24 +31,14 @@ proc enrConfiguration*(conf: WakuNodeConf, netConfig: NetConfig, key: crypto.Pri enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - let topics = - if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: - let shardsRes = conf.contentTopics.mapIt(getShard(it)) - for res in shardsRes: - if res.isErr(): - error "failed to shard content topic", error=res.error - return err($res.error) + let shards: seq[uint16] = + # no shards configured + if conf.shards.len == 0: toSeq(0.. 0 or conf.contentTopics.len > 0: # TODO autoshard content topics only once. # Already checked for errors in app.init - let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard")) + let shards = conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard")) conf.pubsubTopics & shards else: conf.topics diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index fe016c15f..e4fea1db6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -97,6 +97,7 @@ type wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata + wakuSharding*: Sharding enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -199,6 +200,12 @@ proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = return ok() +## Waku Sharding +proc mountSharding*(node: WakuNode, clusterId: uint32, shardCount: uint32): Result[void, string] = + info "mounting sharding", clusterId=clusterId, shardCount=shardCount + node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) + return ok() + ## Waku relay proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = @@ -255,7 +262,7 @@ proc subscribe*(node: WakuNode, subscription: SubscriptionEvent, handler = none( let (pubsubTopic, contentTopicOp) = case subscription.kind: of ContentSub: - let shard = getShard((subscription.topic)).valueOr: + let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: error "Autosharding error", error=error return @@ -288,7 +295,7 @@ proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) = let (pubsubTopic, contentTopicOp) = case subscription.kind: of ContentUnsub: - let shard = getShard((subscription.topic)).valueOr: + let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: error "Autosharding error", error=error return @@ -329,7 +336,7 @@ proc publish*( return err(msg) let pubsubTopic = pubsubTopicOp.valueOr: - getShard(message.contentTopic).valueOr: + node.wakuSharding.getShard(message.contentTopic).valueOr: let msg = "Autosharding error: " & error error "publish error", msg=msg return err(msg) @@ -514,7 +521,7 @@ proc legacyFilterSubscribe*(node: WakuNode, error "failed legacy filter subscription", error=res.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) else: - let topicMapRes = parseSharding(pubsubTopic, contentTopics) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) let topicMap = if topicMapRes.isErr(): @@ -580,7 +587,7 @@ proc filterSubscribe*(node: WakuNode, return subRes else: - let topicMapRes = parseSharding(pubsubTopic, contentTopics) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) let topicMap = if topicMapRes.isErr(): @@ -642,7 +649,7 @@ proc legacyFilterUnsubscribe*(node: WakuNode, error "failed filter unsubscription", error=res.error waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) else: - let topicMapRes = parseSharding(pubsubTopic, contentTopics) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) let topicMap = if topicMapRes.isErr(): @@ -705,7 +712,7 @@ proc filterUnsubscribe*(node: WakuNode, return unsubRes else: # pubsubTopic.isNone - let topicMapRes = parseSharding(pubsubTopic, contentTopics) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) let topicMap = if topicMapRes.isErr(): @@ -947,7 +954,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message debug "publishing message with lightpush", pubsubTopic=pubsubTopic.get(), contentTopic=message.contentTopic, peer=peer.peerId return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer) - let topicMapRes = parseSharding(pubsubTopic, message.contentTopic) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic) let topicMap = if topicMapRes.isErr(): diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 00e94caa7..fca9f17a1 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -211,7 +211,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes var message: WakuMessage = req.toWakuMessage(version = 0).valueOr: return RestApiResponse.badRequest() - let pubsubTopic = getShard(message.contentTopic).valueOr: + let pubsubTopic = node.wakuSharding.getShard(message.contentTopic).valueOr: let msg = "Autosharding error: " & error error "publish error", msg=msg return RestApiResponse.badRequest("Failed to publish. " & msg) diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index c62bacfbf..ab8a32983 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -19,11 +19,16 @@ import ./content_topic, ./pubsub_topic -## For indices allocation and other magic numbers refer to RFC 64 -const ClusterId* = 1 -const GenerationZeroShardsCount* = 8 +type Sharding* = object + clusterId*: uint32 + # TODO: generations could be stored in a table here + shardCountGenZero*: uint32 -proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic = +proc new*(T: type Sharding, clusterId: uint32, shardCount: uint32): T = + return Sharding(clusterId: clusterId, shardCountGenZero: shardCount) + + +proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubTopic = let bytes = toBytes(topic.application) & toBytes(topic.version) let hash = sha256.digest(bytes) @@ -33,35 +38,35 @@ proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic = # This is equilavent to modulo shard count but faster let shard = hashValue and uint64((count - 1)) - - NsPubsubTopic.staticSharding(ClusterId, uint16(shard)) -proc getShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] = + NsPubsubTopic.staticSharding(uint16(s.clusterId), uint16(shard)) + +proc getShard*(s: Sharding, topic: NsContentTopic): Result[NsPubsubTopic, string] = ## Compute the (pubsub topic) shard to use for this content topic. - + if topic.generation.isNone(): ## Implicit generation # is 0 for all content topic - return ok(getGenZeroShard(topic, GenerationZeroShardsCount)) - + return ok(s.getGenZeroShard(topic, int(s.shardCountGenZero))) + case topic.generation.get(): - of 0: return ok(getGenZeroShard(topic, GenerationZeroShardsCount)) + of 0: return ok(s.getGenZeroShard(topic, int(s.shardCountGenZero))) else: return err("Generation > 0 are not supported yet") -proc getShard*(topic: ContentTopic): Result[PubsubTopic, string] = +proc getShard*(s: Sharding, topic: ContentTopic): Result[PubsubTopic, string] = let parsedTopic = NsContentTopic.parse(topic).valueOr: return err($error) - let shard = ?getShard(parsedTopic) + let shard = ?s.getShard(parsedTopic) ok($shard) -proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] = +proc parseSharding*(s: Sharding, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] = var topics: seq[ContentTopic] when contentTopics is seq[ContentTopic]: topics = contentTopics else: topics = @[contentTopics] - + var topicMap = initTable[NsPubsubTopic, seq[NsContentTopic]]() for contentTopic in topics: let parseRes = NsContentTopic.parse(contentTopic) @@ -79,7 +84,7 @@ proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopi return err("Cannot parse pubsub topic: " & $parseRes.error) else: parseRes.get() else: - let shardsRes = getShard(content) + let shardsRes = s.getShard(content) if shardsRes.isErr(): return err("Cannot autoshard content topic: " & $shardsRes.error) @@ -87,7 +92,7 @@ proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopi if not topicMap.hasKey(pubsub): topicMap[pubsub] = @[] - + try: topicMap[pubsub].add(content) except CatchableError: @@ -152,4 +157,4 @@ proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopi let (pubsub, _) = list[list.len - 1] - ok(pubsub) ]# \ No newline at end of file + ok(pubsub) ]# diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 07f78bb5b..f5045ed6d 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -27,14 +27,8 @@ const type RelayShards* = object - clusterId: uint16 - shardIds: seq[uint16] - -func clusterId*(rs: RelayShards): uint16 = - rs.clusterId - -func shardIds*(rs: RelayShards): seq[uint16] = - rs.shardIds + clusterId*: uint16 + shardIds*: seq[uint16] func topics*(rs: RelayShards): seq[NsPubsubTopic] = rs.shardIds.mapIt(NsPubsubTopic.staticSharding(rs.clusterId, it))