mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
feat: update various protocols to autoshard (#1857)
* feat: update FILTER & LIGHTPUSH to autoshard
This commit is contained in:
parent
e760a34c62
commit
51fee9d744
@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =
|
||||
|
||||
if not c.node.wakuLightPush.isNil():
|
||||
# Attempt lightpush
|
||||
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
|
||||
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
|
||||
else:
|
||||
asyncSpawn c.node.publish(DefaultPubsubTopic, message)
|
||||
|
||||
@ -267,7 +267,7 @@ proc writeAndPrint(c: Chat) {.async.} =
|
||||
if not c.node.wakuFilter.isNil():
|
||||
echo "unsubscribing from content filters..."
|
||||
|
||||
await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
|
||||
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)
|
||||
|
||||
echo "quitting..."
|
||||
|
||||
@ -473,7 +473,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
trace "Hit filter handler", contentTopic=msg.contentTopic
|
||||
chat.printReceivedMessage(msg)
|
||||
|
||||
await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
|
||||
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)
|
||||
|
||||
else:
|
||||
error "Filter not mounted. Couldn't parse conf.filternode",
|
||||
@ -488,7 +488,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
chat.printReceivedMessage(msg)
|
||||
|
||||
let topic = DefaultPubsubTopic
|
||||
node.subscribe(topic, handler)
|
||||
await node.subscribe(some(topic), @[ContentTopic("")], handler)
|
||||
|
||||
when defined(rln):
|
||||
if conf.rlnRelay:
|
||||
|
||||
@ -135,7 +135,7 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
||||
error "failed to parse content topic", error=res.error
|
||||
quit(QuitFailure)
|
||||
|
||||
let shardsRes = contentTopicsRes.mapIt(singleHighestWeigthShard(it.get()))
|
||||
let shardsRes = contentTopicsRes.mapIt(getShard(it.get()))
|
||||
|
||||
for res in shardsRes:
|
||||
if res.isErr():
|
||||
@ -363,7 +363,7 @@ proc setupProtocols(node: WakuNode,
|
||||
# TODO autoshard content topics only once.
|
||||
# Already checked for errors in app.init
|
||||
let contentTopics = conf.contentTopics.mapIt(NsContentTopic.parse(it).expect("Parsing"))
|
||||
let shards = contentTopics.mapIt($(singleHighestWeigthShard(it).expect("Sharding")))
|
||||
let shards = contentTopics.mapIt($(getShard(it).expect("Sharding")))
|
||||
|
||||
let pubsubTopics = conf.topics & conf.pubsubTopics & shards
|
||||
try:
|
||||
|
||||
@ -115,4 +115,4 @@ suite "Waku Lightpush":
|
||||
requestError == error
|
||||
|
||||
## Cleanup
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
@ -1,6 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
@ -43,7 +44,7 @@ suite "WakuNode - Filter":
|
||||
filterPushHandlerFut.complete((pubsubTopic, msg))
|
||||
|
||||
## When
|
||||
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
|
||||
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
|
||||
|
||||
# Wait for subscription to take effect
|
||||
waitFor sleepAsync(100.millis)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
@ -54,7 +55,7 @@ suite "WakuNode - Lightpush":
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
## When
|
||||
await lightNode.lightpushPublish(DefaultPubsubTopic, message)
|
||||
await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
|
||||
|
||||
## Then
|
||||
check await completionFutRelay.withTimeout(5.seconds)
|
||||
|
||||
@ -13,7 +13,6 @@ suite "Waku Message - Content topics namespacing":
|
||||
## Given
|
||||
var ns = NsContentTopic()
|
||||
ns.generation = none(int)
|
||||
ns.bias = Unbiased
|
||||
ns.application = "toychat"
|
||||
ns.version = "2"
|
||||
ns.name = "huilong"
|
||||
@ -39,7 +38,6 @@ suite "Waku Message - Content topics namespacing":
|
||||
let ns = nsRes.get()
|
||||
check:
|
||||
ns.generation == none(int)
|
||||
ns.bias == Unbiased
|
||||
ns.application == "toychat"
|
||||
ns.version == "2"
|
||||
ns.name == "huilong"
|
||||
@ -47,7 +45,7 @@ suite "Waku Message - Content topics namespacing":
|
||||
|
||||
test "Parse content topic string - Valid string with sharding":
|
||||
## Given
|
||||
let topic = "/0/lower20/toychat/2/huilong/proto"
|
||||
let topic = "/0/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let nsRes = NsContentTopic.parse(topic)
|
||||
@ -58,7 +56,6 @@ suite "Waku Message - Content topics namespacing":
|
||||
let ns = nsRes.get()
|
||||
check:
|
||||
ns.generation == some(0)
|
||||
ns.bias == Lower20
|
||||
ns.application == "toychat"
|
||||
ns.version == "2"
|
||||
ns.name == "huilong"
|
||||
@ -122,11 +119,11 @@ suite "Waku Message - Content topics namespacing":
|
||||
let err = ns.tryError()
|
||||
check:
|
||||
err.kind == ParsingErrorKind.InvalidFormat
|
||||
err.cause == "invalid topic structure"
|
||||
err.cause == "generation should be a numeric value"
|
||||
|
||||
test "Parse content topic string - Invalid string: non numeric generation":
|
||||
## Given
|
||||
let topic = "/first/unbiased/toychat/2/huilong/proto"
|
||||
let topic = "/first/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let ns = NsContentTopic.parse(topic)
|
||||
@ -139,21 +136,6 @@ suite "Waku Message - Content topics namespacing":
|
||||
err.kind == ParsingErrorKind.InvalidFormat
|
||||
err.cause == "generation should be a numeric value"
|
||||
|
||||
test "Parse content topic string - Invalid string: invalid bias":
|
||||
## Given
|
||||
let topic = "/0/no/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let ns = NsContentTopic.parse(topic)
|
||||
|
||||
## Then
|
||||
assert ns.isErr(), $ns.get()
|
||||
|
||||
let err = ns.tryError()
|
||||
check:
|
||||
err.kind == ParsingErrorKind.InvalidFormat
|
||||
err.cause == "bias should be one of; unbiased, lower20 or higher80"
|
||||
|
||||
suite "Waku Message - Pub-sub topics namespacing":
|
||||
|
||||
test "Stringify named sharding pub-sub topic":
|
||||
|
||||
@ -4,7 +4,6 @@ import
|
||||
std/options,
|
||||
std/strutils,
|
||||
std/sugar,
|
||||
std/algorithm,
|
||||
std/random,
|
||||
stew/results,
|
||||
testutils/unittests
|
||||
@ -34,88 +33,60 @@ suite "Waku Sharding":
|
||||
|
||||
let enc = "cbor"
|
||||
|
||||
NsContentTopic.init(none(int), Unbiased, app, version, name, enc)
|
||||
NsContentTopic.init(none(int), app, version, name, enc)
|
||||
|
||||
test "Implicit content topic generation":
|
||||
## Given
|
||||
let topic = "/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let ns = NsContentTopic.parse(topic).expect("Parsing")
|
||||
|
||||
let paramRes = shardCount(ns)
|
||||
let parseRes = NsContentTopic.parse(topic)
|
||||
|
||||
## Then
|
||||
assert paramRes.isOk(), paramRes.error
|
||||
assert parseRes.isOk(), $parseRes.error
|
||||
|
||||
let count = paramRes.get()
|
||||
let nsTopic = parseRes.get()
|
||||
check:
|
||||
count == GenerationZeroShardsCount
|
||||
ns.bias == Unbiased
|
||||
nsTopic.generation == none(int)
|
||||
|
||||
test "Valid content topic":
|
||||
## Given
|
||||
let topic = "/0/lower20/toychat/2/huilong/proto"
|
||||
let topic = "/0/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let ns = NsContentTopic.parse(topic).expect("Parsing")
|
||||
|
||||
let paramRes = shardCount(ns)
|
||||
let parseRes = NsContentTopic.parse(topic)
|
||||
|
||||
## Then
|
||||
assert paramRes.isOk(), paramRes.error
|
||||
assert parseRes.isOk(), $parseRes.error
|
||||
|
||||
let count = paramRes.get()
|
||||
let nsTopic = parseRes.get()
|
||||
check:
|
||||
count == GenerationZeroShardsCount
|
||||
ns.bias == Lower20
|
||||
nsTopic.generation.get() == 0
|
||||
|
||||
test "Invalid content topic generation":
|
||||
## Given
|
||||
let topic = "/1/unbiased/toychat/2/huilong/proto"
|
||||
let topic = "/1/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let ns = NsContentTopic.parse(topic).expect("Parsing")
|
||||
|
||||
let paramRes = shardCount(ns)
|
||||
let shardRes = getShard(ns)
|
||||
|
||||
## Then
|
||||
assert paramRes.isErr(), $paramRes.get()
|
||||
assert shardRes.isErr(), $shardRes.get()
|
||||
|
||||
let err = paramRes.error
|
||||
let err = shardRes.error
|
||||
check:
|
||||
err == "Generation > 0 are not supported yet"
|
||||
|
||||
test "Weigths bias":
|
||||
#[ test "Sorted shard list":
|
||||
## Given
|
||||
let count = 5
|
||||
|
||||
## When
|
||||
let anonWeigths = biasedWeights(count, ShardingBias.Lower20)
|
||||
let speedWeigths = biasedWeights(count, ShardingBias.Higher80)
|
||||
|
||||
## Then
|
||||
check:
|
||||
anonWeigths[0] == 2.0
|
||||
anonWeigths[1] == 1.0
|
||||
anonWeigths[2] == 1.0
|
||||
anonWeigths[3] == 1.0
|
||||
anonWeigths[4] == 1.0
|
||||
|
||||
speedWeigths[0] == 1.0
|
||||
speedWeigths[1] == 2.0
|
||||
speedWeigths[2] == 2.0
|
||||
speedWeigths[3] == 2.0
|
||||
speedWeigths[4] == 2.0
|
||||
|
||||
test "Sorted shard list":
|
||||
## Given
|
||||
let topic = "/0/unbiased/toychat/2/huilong/proto"
|
||||
let topic = "/0/toychat/2/huilong/proto"
|
||||
|
||||
## When
|
||||
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")
|
||||
let count = shardCount(contentTopic).expect("Valid parameters")
|
||||
let weights = biasedWeights(count, contentTopic.bias)
|
||||
let weights = repeat(1.0, count)
|
||||
|
||||
let shardsRes = weightedShardList(contentTopic, count, weights)
|
||||
|
||||
@ -125,7 +96,7 @@ suite "Waku Sharding":
|
||||
let shards = shardsRes.get()
|
||||
check:
|
||||
shards.len == count
|
||||
isSorted(shards, hashOrder)
|
||||
isSorted(shards, hashOrder) ]#
|
||||
|
||||
test "Shard Choice Reproducibility":
|
||||
## Given
|
||||
@ -134,15 +105,11 @@ suite "Waku Sharding":
|
||||
## When
|
||||
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")
|
||||
|
||||
let res = singleHighestWeigthShard(contentTopic)
|
||||
let pubsub = getGenZeroShard(contentTopic, GenerationZeroShardsCount)
|
||||
|
||||
## Then
|
||||
assert res.isOk(), res.error
|
||||
|
||||
let pubsubTopic = res.get()
|
||||
|
||||
check:
|
||||
pubsubTopic == NsPubsubTopic.staticSharding(ClusterIndex, 3)
|
||||
pubsub == NsPubsubTopic.staticSharding(ClusterIndex, 3)
|
||||
|
||||
test "Shard Choice Simulation":
|
||||
## Given
|
||||
@ -154,7 +121,7 @@ suite "Waku Sharding":
|
||||
|
||||
## When
|
||||
for topic in topics:
|
||||
let pubsub = singleHighestWeigthShard(topic).expect("Valid Topic")
|
||||
let pubsub = getShard(topic).expect("Valid Topic")
|
||||
counts[pubsub.shard] += 1
|
||||
|
||||
## Then
|
||||
|
||||
@ -10,7 +10,6 @@ import
|
||||
../../../waku/node/peer_manager,
|
||||
../../../waku/waku_filter_v2,
|
||||
../../../waku/waku_filter_v2/client,
|
||||
../../../waku/waku_filter_v2/rpc,
|
||||
../../../waku/waku_core,
|
||||
../testlib/common,
|
||||
../testlib/wakucore
|
||||
|
||||
@ -115,4 +115,4 @@ suite "Waku Store - query handler":
|
||||
error.kind == HistoryErrorKind.BAD_REQUEST
|
||||
|
||||
## Cleanup
|
||||
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
||||
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
||||
@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
|
||||
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
|
||||
filterFut.complete((pubsubTopic, msg))
|
||||
|
||||
waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
|
||||
waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)
|
||||
|
||||
waitFor sleepAsync(100.millis)
|
||||
|
||||
|
||||
@ -61,9 +61,9 @@ procSuite "Waku v2 JSON-RPC API - Filter":
|
||||
|
||||
let contentFilters = @[
|
||||
ContentFilter(contentTopic: DefaultContentTopic),
|
||||
ContentFilter(contentTopic: ContentTopic("2")),
|
||||
ContentFilter(contentTopic: ContentTopic("3")),
|
||||
ContentFilter(contentTopic: ContentTopic("4")),
|
||||
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content2/proto")),
|
||||
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content3/proto")),
|
||||
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content4/proto")),
|
||||
]
|
||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
|
||||
check:
|
||||
|
||||
@ -90,7 +90,7 @@ suite "Waku v2 Rest API - Filter":
|
||||
]
|
||||
|
||||
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
|
||||
pubsubTopic: DefaultPubsubTopic)
|
||||
pubsubTopic: some(DefaultPubsubTopic))
|
||||
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
|
||||
|
||||
# Then
|
||||
@ -106,7 +106,7 @@ suite "Waku v2 Rest API - Filter":
|
||||
restFilterTest.messageCache.isSubscribed("4")
|
||||
|
||||
# When - error case
|
||||
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
|
||||
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string))
|
||||
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
|
||||
|
||||
check:
|
||||
@ -137,7 +137,7 @@ suite "Waku v2 Rest API - Filter":
|
||||
|
||||
# When
|
||||
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
|
||||
pubsubTopic: DefaultPubsubTopic)
|
||||
pubsubTopic: some(DefaultPubsubTopic))
|
||||
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
|
||||
|
||||
# Then
|
||||
|
||||
@ -21,8 +21,6 @@ logScope:
|
||||
topics = "waku node jsonrpc filter_api"
|
||||
|
||||
|
||||
const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
const futTimeout* = 5.seconds # Max time to wait for futures
|
||||
|
||||
|
||||
@ -32,7 +30,7 @@ type
|
||||
|
||||
proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
|
||||
|
||||
server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool:
|
||||
server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool:
|
||||
## Subscribes a node to a list of content filters
|
||||
debug "post_waku_v2_filter_v1_subscription"
|
||||
|
||||
@ -40,9 +38,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
|
||||
if peerOpt.isNone():
|
||||
raise newException(ValueError, "no suitable remote filter peers")
|
||||
|
||||
let
|
||||
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
|
||||
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
|
||||
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
|
||||
cache.addMessage(msg.contentTopic, msg)
|
||||
@ -57,13 +53,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
|
||||
|
||||
return true
|
||||
|
||||
server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool:
|
||||
server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool:
|
||||
## Unsubscribes a node from a list of content filters
|
||||
debug "delete_waku_v2_filter_v1_subscription"
|
||||
|
||||
let
|
||||
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
|
||||
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
|
||||
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
|
||||
if not await unsubFut.withTimeout(futTimeout):
|
||||
|
||||
@ -25,7 +25,7 @@ type FilterWakuMessage* = object
|
||||
type FilterGetMessagesResponse* = seq[FilterWakuMessage]
|
||||
|
||||
type FilterSubscriptionsRequest* = object
|
||||
pubsubTopic*: PubSubTopic
|
||||
pubsubTopic*: Option[PubSubTopic]
|
||||
contentFilters*: seq[ContentTopic]
|
||||
|
||||
#### Type conversion
|
||||
@ -146,6 +146,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
|
||||
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
|
||||
|
||||
value = FilterSubscriptionsRequest(
|
||||
pubsubTopic: pubsubTopic.get(),
|
||||
pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
|
||||
contentFilters: contentFilters.get()
|
||||
)
|
||||
@ -4,7 +4,7 @@ else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[hashes, options, tables, strutils, sequtils, os],
|
||||
std/[hashes, options, sugar, tables, strutils, sequtils, os],
|
||||
chronos, chronicles, metrics,
|
||||
stew/results,
|
||||
stew/byteutils,
|
||||
@ -365,7 +365,7 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
|
||||
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec))
|
||||
|
||||
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
|
||||
proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic],
|
||||
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
|
||||
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
|
||||
if node.wakuFilterClientLegacy.isNil():
|
||||
@ -379,8 +379,6 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
|
||||
|
||||
let remotePeer = remotePeerRes.value
|
||||
|
||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||
|
||||
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
|
||||
# TODO: Move this logic to wakunode2 app
|
||||
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
|
||||
@ -389,14 +387,44 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
|
||||
|
||||
await handler(pubsubTopic, message)
|
||||
|
||||
let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer)
|
||||
if subRes.isOk():
|
||||
info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
|
||||
else:
|
||||
error "failed filter subscription", error=subRes.error
|
||||
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||
if pubsubTopic.isSome():
|
||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
|
||||
|
||||
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
|
||||
let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer)
|
||||
|
||||
if res.isOk():
|
||||
info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
|
||||
else:
|
||||
error "failed filter subscription", error=res.error
|
||||
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||
else:
|
||||
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
error "can't get shard", error=topicMapRes.error
|
||||
return
|
||||
else: topicMapRes.get()
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
||||
let content = topics.mapIt($it)
|
||||
node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer)
|
||||
|
||||
let finished = await allFinished(futures)
|
||||
|
||||
for fut in finished:
|
||||
let res = fut.read()
|
||||
|
||||
if res.isErr():
|
||||
error "failed filter subscription", error=res.error
|
||||
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics
|
||||
|
||||
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic],
|
||||
peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
|
||||
## Unsubscribe from a content filter.
|
||||
if node.wakuFilterClientLegacy.isNil():
|
||||
@ -410,17 +438,45 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
|
||||
|
||||
let remotePeer = remotePeerRes.value
|
||||
|
||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||
if pubsubTopic.isSome():
|
||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
|
||||
|
||||
let unsubRes = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
|
||||
if unsubRes.isOk():
|
||||
info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
|
||||
let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer)
|
||||
|
||||
if res.isOk():
|
||||
info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
|
||||
else:
|
||||
error "failed filter unsubscription", error=res.error
|
||||
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
||||
else:
|
||||
error "failed filter unsubscription", error=unsubRes.error
|
||||
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
||||
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
error "can't get shard", error = topicMapRes.error
|
||||
return
|
||||
else: topicMapRes.get()
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
||||
let content = topics.mapIt($it)
|
||||
node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer=remotePeer)
|
||||
|
||||
let finished = await allFinished(futures)
|
||||
|
||||
for fut in finished:
|
||||
let res = fut.read()
|
||||
|
||||
if res.isErr():
|
||||
error "failed filter unsubscription", error=res.error
|
||||
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
||||
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
|
||||
proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
|
||||
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
|
||||
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
|
||||
if node.wakuFilterClientLegacy.isNil():
|
||||
@ -435,7 +491,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
|
||||
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
|
||||
proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
|
||||
deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} =
|
||||
## Unsubscribe from a content filter.
|
||||
if node.wakuFilterClientLegacy.isNil():
|
||||
@ -623,7 +679,7 @@ proc mountLightPushClient*(node: WakuNode) =
|
||||
|
||||
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
|
||||
|
||||
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||
## Returns whether relaying was successful or not.
|
||||
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||
@ -631,12 +687,23 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
|
||||
if node.wakuLightpushClient.isNil():
|
||||
return err("waku lightpush client is nil")
|
||||
|
||||
debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer.peerId
|
||||
if pubsubTopic.isSome():
|
||||
debug "publishing message with lightpush", pubsubTopic=pubsubTopic.get(), contentTopic=message.contentTopic, peer=peer.peerId
|
||||
return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer)
|
||||
|
||||
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
||||
let topicMapRes = parseSharding(pubsubTopic, message.contentTopic)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
return err(topicMapRes.error)
|
||||
else: topicMapRes.get()
|
||||
|
||||
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
||||
debug "publishing message with lightpush", pubsubTopic=pubsub, contentTopic=message.contentTopic, peer=peer.peerId
|
||||
return await node.wakuLightpushClient.publish($pubsub, message, peer)
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.async, gcsafe,
|
||||
proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[void] {.async, gcsafe,
|
||||
deprecated: "Use 'node.lightpushPublish()' instead".} =
|
||||
if node.wakuLightpushClient.isNil():
|
||||
error "failed to publish message", error="waku lightpush client is nil"
|
||||
|
||||
@ -26,25 +26,18 @@ const DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
## Namespaced content topic
|
||||
|
||||
type ShardingBias* = enum
|
||||
Unbiased = "unbiased"
|
||||
Lower20 = "lower20"
|
||||
Higher80 = "higher80"
|
||||
|
||||
type
|
||||
NsContentTopic* = object
|
||||
generation*: Option[int]
|
||||
bias*: ShardingBias
|
||||
application*: string
|
||||
version*: string
|
||||
name*: string
|
||||
encoding*: string
|
||||
|
||||
proc init*(T: type NsContentTopic, generation: Option[int], bias: ShardingBias,
|
||||
proc init*(T: type NsContentTopic, generation: Option[int],
|
||||
application: string, version: string, name: string, encoding: string): T =
|
||||
NsContentTopic(
|
||||
generation: generation,
|
||||
bias: bias,
|
||||
application: application,
|
||||
version: version,
|
||||
name: name,
|
||||
@ -56,16 +49,13 @@ proc init*(T: type NsContentTopic, generation: Option[int], bias: ShardingBias,
|
||||
proc `$`*(topic: NsContentTopic): string =
|
||||
## Returns a string representation of a namespaced topic
|
||||
## in the format `/<application>/<version>/<topic-name>/<encoding>`
|
||||
## Autosharding adds 2 optional prefixes `/<gen#>/bias
|
||||
## Autosharding adds 1 optional prefix `/<gen#>
|
||||
|
||||
var formatted = ""
|
||||
|
||||
if topic.generation.isSome():
|
||||
formatted = formatted & "/" & $topic.generation.get()
|
||||
|
||||
if topic.bias != ShardingBias.Unbiased:
|
||||
formatted = formatted & "/" & $topic.bias
|
||||
|
||||
formatted & "/" & topic.application & "/" & topic.version & "/" & topic.name & "/" & topic.encoding
|
||||
|
||||
# Deserialization
|
||||
@ -73,7 +63,7 @@ proc `$`*(topic: NsContentTopic): string =
|
||||
proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[NsContentTopic] =
|
||||
## Splits a namespaced topic string into its constituent parts.
|
||||
## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>`
|
||||
## Autosharding adds 2 optional prefixes `/<gen#>/bias
|
||||
## Autosharding adds 1 optional prefix `/<gen#>
|
||||
|
||||
if not topic.startsWith("/"):
|
||||
return err(ParsingError.invalidFormat("topic must start with slash"))
|
||||
@ -98,8 +88,8 @@ proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[N
|
||||
if enc.len == 0:
|
||||
return err(ParsingError.missingPart("encoding"))
|
||||
|
||||
return ok(NsContentTopic.init(none(int), Unbiased, app, ver, name, enc))
|
||||
of 6:
|
||||
return ok(NsContentTopic.init(none(int), app, ver, name, enc))
|
||||
of 5:
|
||||
if parts[0].len == 0:
|
||||
return err(ParsingError.missingPart("generation"))
|
||||
|
||||
@ -108,31 +98,23 @@ proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[N
|
||||
except ValueError:
|
||||
return err(ParsingError.invalidFormat("generation should be a numeric value"))
|
||||
|
||||
if parts[1].len == 0:
|
||||
return err(ParsingError.missingPart("sharding-bias"))
|
||||
|
||||
let bias = try:
|
||||
parseEnum[ShardingBias](parts[1])
|
||||
except ValueError:
|
||||
return err(ParsingError.invalidFormat("bias should be one of; unbiased, lower20 or higher80"))
|
||||
|
||||
let app = parts[2]
|
||||
let app = parts[1]
|
||||
if app.len == 0:
|
||||
return err(ParsingError.missingPart("appplication"))
|
||||
|
||||
let ver = parts[3]
|
||||
let ver = parts[2]
|
||||
if ver.len == 0:
|
||||
return err(ParsingError.missingPart("version"))
|
||||
|
||||
let name = parts[4]
|
||||
let name = parts[3]
|
||||
if name.len == 0:
|
||||
return err(ParsingError.missingPart("topic-name"))
|
||||
|
||||
let enc = parts[5]
|
||||
let enc = parts[4]
|
||||
if enc.len == 0:
|
||||
return err(ParsingError.missingPart("encoding"))
|
||||
|
||||
return ok(NsContentTopic.init(some(gen), bias, app, ver, name, enc))
|
||||
return ok(NsContentTopic.init(some(gen), app, ver, name, enc))
|
||||
else:
|
||||
return err(ParsingError.invalidFormat("invalid topic structure"))
|
||||
|
||||
|
||||
@ -10,9 +10,7 @@ else:
|
||||
import
|
||||
nimcrypto,
|
||||
std/options,
|
||||
std/math,
|
||||
std/sequtils,
|
||||
std/algorithm,
|
||||
std/tables,
|
||||
stew/endians2,
|
||||
stew/results,
|
||||
stew/byteutils
|
||||
@ -22,14 +20,77 @@ import
|
||||
./pubsub_topic
|
||||
|
||||
## For indices allocation and other magic numbers refer to RFC 51
|
||||
const ClusterIndex* = 49152
|
||||
const GenerationZeroShardsCount* = 5
|
||||
const ClusterIndex* = 1
|
||||
const GenerationZeroShardsCount* = 8
|
||||
|
||||
type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]]
|
||||
proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic =
|
||||
let bytes = toBytes(topic.application) & toBytes(topic.version)
|
||||
|
||||
proc shardCount*(topic: NsContentTopic): Result[int, string] =
|
||||
## Returns the total shard count, sharding selection bias
|
||||
## and the shard name from the content topic.
|
||||
let hash = sha256.digest(bytes)
|
||||
|
||||
# We only use the last 64 bits of the hash as having more shards is unlikely.
|
||||
let hashValue = uint64.fromBytesBE(hash.data[24..31])
|
||||
|
||||
# This is equilavent to modulo shard count but faster
|
||||
let shard = hashValue and uint64((count - 1))
|
||||
|
||||
NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard))
|
||||
|
||||
proc getShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
|
||||
## Compute the (pubsub topic) shard to use for this content topic.
|
||||
|
||||
if topic.generation.isNone():
|
||||
## Implicit generation # is 0 for all content topic
|
||||
return ok(getGenZeroShard(topic, GenerationZeroShardsCount))
|
||||
|
||||
case topic.generation.get():
|
||||
of 0: return ok(getGenZeroShard(topic, GenerationZeroShardsCount))
|
||||
else: return err("Generation > 0 are not supported yet")
|
||||
|
||||
proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] =
|
||||
var topics: seq[ContentTopic]
|
||||
when contentTopics is seq[ContentTopic]:
|
||||
topics = contentTopics
|
||||
else:
|
||||
topics = @[contentTopics]
|
||||
|
||||
var topicMap = initTable[NsPubsubTopic, seq[NsContentTopic]]()
|
||||
for contentTopic in topics:
|
||||
let parseRes = NsContentTopic.parse(contentTopic)
|
||||
|
||||
let content =
|
||||
if parseRes.isErr():
|
||||
return err("Cannot parse content topic: " & $parseRes.error)
|
||||
else: parseRes.get()
|
||||
|
||||
let pubsub =
|
||||
if pubsubTopic.isSome():
|
||||
let parseRes = NsPubsubTopic.parse(pubsubTopic.get())
|
||||
|
||||
if parseRes.isErr():
|
||||
return err("Cannot parse pubsub topic: " & $parseRes.error)
|
||||
else: parseRes.get()
|
||||
else:
|
||||
let shardsRes = getShard(content)
|
||||
|
||||
if shardsRes.isErr():
|
||||
return err("Cannot autoshard content topic: " & $shardsRes.error)
|
||||
else: shardsRes.get()
|
||||
|
||||
if not topicMap.hasKey(pubsub):
|
||||
topicMap[pubsub] = @[]
|
||||
|
||||
try:
|
||||
topicMap[pubsub].add(content)
|
||||
except CatchableError:
|
||||
return err(getCurrentExceptionMsg())
|
||||
|
||||
ok(topicMap)
|
||||
|
||||
#type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]]
|
||||
|
||||
#[ proc shardCount*(topic: NsContentTopic): Result[int, string] =
|
||||
## Returns the total shard count from the content topic.
|
||||
let shardCount =
|
||||
if topic.generation.isNone():
|
||||
## Implicit generation # is 0 for all content topic
|
||||
@ -41,34 +102,15 @@ proc shardCount*(topic: NsContentTopic): Result[int, string] =
|
||||
else:
|
||||
return err("Generation > 0 are not supported yet")
|
||||
|
||||
ok((shardCount))
|
||||
ok((shardCount)) ]#
|
||||
|
||||
proc biasedWeights*(shardCount: int, bias: ShardingBias): seq[float64] =
|
||||
var weights = repeat(1.0, shardCount)
|
||||
#[ proc applyWeight(hashValue: uint64, weight: float64): float64 =
|
||||
(-weight) / math.ln(float64(hashValue) / float64(high(uint64))) ]#
|
||||
|
||||
case bias:
|
||||
of Unbiased:
|
||||
return weights
|
||||
of Lower20:
|
||||
# we choose the lower 20% of shards and double their weigths
|
||||
let index = shardCount div 5
|
||||
for i in (0..<index):
|
||||
weights[i] *= 2.0
|
||||
of Higher80:
|
||||
# we choose the higher 80% of shards and double their weigths
|
||||
let index = shardCount div 5
|
||||
for i in (index..<shardCount):
|
||||
weights[i] *= 2.0
|
||||
#[ proc hashOrder*(x, y: (NsPubsubTopic, float64)): int =
|
||||
cmp(x[1], y[1]) ]#
|
||||
|
||||
weights
|
||||
|
||||
proc applyWeight(hashValue: uint64, weight: float64): float64 =
|
||||
(-weight) / math.ln(float64(hashValue) / float64(high(uint64)))
|
||||
|
||||
proc hashOrder*(x, y: (NsPubsubTopic, float64)): int =
|
||||
cmp(x[1], y[1])
|
||||
|
||||
proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[float64]): Result[ShardsPriority, string] =
|
||||
#[ proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[float64]): Result[ShardsPriority, string] =
|
||||
## Returns the ordered list of shards and their priority values.
|
||||
if weightList.len < shardCount:
|
||||
return err("Must provide weights for every shards")
|
||||
@ -91,15 +133,15 @@ proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[
|
||||
|
||||
list.sort(hashOrder)
|
||||
|
||||
ok(list)
|
||||
ok(list) ]#
|
||||
|
||||
proc singleHighestWeigthShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
|
||||
#[ proc singleHighestWeigthShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
|
||||
let count = ? shardCount(topic)
|
||||
|
||||
let weights = biasedWeights(count, topic.bias)
|
||||
let weights = repeat(1.0, count)
|
||||
|
||||
let list = ? weightedShardList(topic, count, weights)
|
||||
|
||||
let (pubsub, _) = list[list.len - 1]
|
||||
|
||||
ok(pubsub)
|
||||
ok(pubsub) ]#
|
||||
@ -66,6 +66,6 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
|
||||
|
||||
return ok()
|
||||
|
||||
proc publish*(wl: WakuLightPushClient, pubsubTopic: PubsubTopic, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let pushRequest = PushRequest(pubsubTopic: pubsubTopic, message: message)
|
||||
return await wl.sendPushRequest(pushRequest, peer)
|
||||
proc publish*(wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||
return await wl.sendPushRequest(pushRequest, peer)
|
||||
Loading…
x
Reference in New Issue
Block a user