chore: move SubscriptionManager under waku_core (#2025)

* chore: cherry-pick from Filter V2 RestApi PR: move FilterPushHandler and SubscriptionManager from Filter V1 to under waku_core
This commit is contained in:
NagyZoltanPeter 2023-09-12 17:08:40 +02:00 committed by GitHub
parent ebe715e9fa
commit 563b2b20a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 45 deletions

View File

@ -2,10 +2,12 @@ import
./waku_core/topics,
./waku_core/time,
./waku_core/message,
./waku_core/peers
./waku_core/peers,
./waku_core/subscription
export
topics,
time,
message,
peers
peers,
subscription

View File

@ -0,0 +1,7 @@
import
./subscription/subscription_manager,
./subscription/push_handler
export
subscription_manager,
push_handler

View File

@ -0,0 +1,13 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos
import
../topics,
../message
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}

View File

@ -0,0 +1,48 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/tables,
stew/results,
chronicles,
chronos
import
./push_handler,
../topics,
../message
## Subscription manager
type SubscriptionManager* = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init*(T: type SubscriptionManager): T =
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())
proc clear*(m: var SubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
try:
# TODO: Handle over subscription surprises
m.subscriptions[(pubsubTopic, contentTopic)]= handler
except CatchableError:
error "failed to register filter subscription", error=getCurrentExceptionMsg()
proc removeSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return
try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
asyncSpawn handler(pubsubTopic, message)
except CatchableError:
discard
proc getSubscriptionsCount*(m: SubscriptionManager): int =
m.subscriptions.len()

View File

@ -27,50 +27,7 @@ logScope:
const Defaultstring = "/waku/2/default-waku/proto"
### Client, filter subscripton manager
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}
## Subscription manager
type SubscriptionManager = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init(T: type SubscriptionManager): T =
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())
proc clear(m: var SubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
try:
m.subscriptions[(pubsubTopic, contentTopic)]= handler
except: # TODO: Fix "BareExcept" warning
error "failed to register filter subscription", error=getCurrentExceptionMsg()
proc removeSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return
try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
asyncSpawn handler(pubsubTopic, message)
except: # TODO: Fix "BareExcept" warning
discard
proc getSubscriptionsCount(m: SubscriptionManager): int =
m.subscriptions.len()
## Client
type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
type WakuFilterClientLegacy* = ref object of LPProtocol
rng: ref rand.HmacDrbgContext
peerManager: PeerManager