mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
chore: move SubscriptionManager under waku_core (#2025)
* chore: cherry-pick from Filter V2 RestApi PR: move FilterPushHandler and SubscriptionManager from Filter V1 to under waku_core
This commit is contained in:
parent
3755d0e714
commit
5913d6e946
@ -2,10 +2,12 @@ import
|
|||||||
./waku_core/topics,
|
./waku_core/topics,
|
||||||
./waku_core/time,
|
./waku_core/time,
|
||||||
./waku_core/message,
|
./waku_core/message,
|
||||||
./waku_core/peers
|
./waku_core/peers,
|
||||||
|
./waku_core/subscription
|
||||||
|
|
||||||
export
|
export
|
||||||
topics,
|
topics,
|
||||||
time,
|
time,
|
||||||
message,
|
message,
|
||||||
peers
|
peers,
|
||||||
|
subscription
|
||||||
|
|||||||
7
waku/waku_core/subscription.nim
Normal file
7
waku/waku_core/subscription.nim
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
import
|
||||||
|
./subscription/subscription_manager,
|
||||||
|
./subscription/push_handler
|
||||||
|
|
||||||
|
export
|
||||||
|
subscription_manager,
|
||||||
|
push_handler
|
||||||
13
waku/waku_core/subscription/push_handler.nim
Normal file
13
waku/waku_core/subscription/push_handler.nim
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
chronos
|
||||||
|
|
||||||
|
import
|
||||||
|
../topics,
|
||||||
|
../message
|
||||||
|
|
||||||
|
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}
|
||||||
48
waku/waku_core/subscription/subscription_manager.nim
Normal file
48
waku/waku_core/subscription/subscription_manager.nim
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/tables,
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos
|
||||||
|
|
||||||
|
import
|
||||||
|
./push_handler,
|
||||||
|
../topics,
|
||||||
|
../message
|
||||||
|
|
||||||
|
## Subscription manager
|
||||||
|
type SubscriptionManager* = object
|
||||||
|
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
|
||||||
|
|
||||||
|
proc init*(T: type SubscriptionManager): T =
|
||||||
|
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())
|
||||||
|
|
||||||
|
proc clear*(m: var SubscriptionManager) =
|
||||||
|
m.subscriptions.clear()
|
||||||
|
|
||||||
|
proc registerSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
|
||||||
|
try:
|
||||||
|
# TODO: Handle over subscription surprises
|
||||||
|
m.subscriptions[(pubsubTopic, contentTopic)]= handler
|
||||||
|
except CatchableError:
|
||||||
|
error "failed to register filter subscription", error=getCurrentExceptionMsg()
|
||||||
|
|
||||||
|
proc removeSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
|
||||||
|
m.subscriptions.del((pubsubTopic, contentTopic))
|
||||||
|
|
||||||
|
proc notifySubscriptionHandler*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
|
||||||
|
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
|
||||||
|
asyncSpawn handler(pubsubTopic, message)
|
||||||
|
except CatchableError:
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc getSubscriptionsCount*(m: SubscriptionManager): int =
|
||||||
|
m.subscriptions.len()
|
||||||
@ -27,50 +27,7 @@ logScope:
|
|||||||
|
|
||||||
const Defaultstring = "/waku/2/default-waku/proto"
|
const Defaultstring = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
|
|
||||||
### Client, filter subscripton manager
|
|
||||||
|
|
||||||
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}
|
|
||||||
|
|
||||||
|
|
||||||
## Subscription manager
|
|
||||||
|
|
||||||
type SubscriptionManager = object
|
|
||||||
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
|
|
||||||
|
|
||||||
proc init(T: type SubscriptionManager): T =
|
|
||||||
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())
|
|
||||||
|
|
||||||
proc clear(m: var SubscriptionManager) =
|
|
||||||
m.subscriptions.clear()
|
|
||||||
|
|
||||||
proc registerSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
|
|
||||||
try:
|
|
||||||
m.subscriptions[(pubsubTopic, contentTopic)]= handler
|
|
||||||
except: # TODO: Fix "BareExcept" warning
|
|
||||||
error "failed to register filter subscription", error=getCurrentExceptionMsg()
|
|
||||||
|
|
||||||
proc removeSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
|
|
||||||
m.subscriptions.del((pubsubTopic, contentTopic))
|
|
||||||
|
|
||||||
proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
|
|
||||||
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
|
|
||||||
asyncSpawn handler(pubsubTopic, message)
|
|
||||||
except: # TODO: Fix "BareExcept" warning
|
|
||||||
discard
|
|
||||||
|
|
||||||
proc getSubscriptionsCount(m: SubscriptionManager): int =
|
|
||||||
m.subscriptions.len()
|
|
||||||
|
|
||||||
|
|
||||||
## Client
|
## Client
|
||||||
|
|
||||||
type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
|
|
||||||
|
|
||||||
type WakuFilterClientLegacy* = ref object of LPProtocol
|
type WakuFilterClientLegacy* = ref object of LPProtocol
|
||||||
rng: ref rand.HmacDrbgContext
|
rng: ref rand.HmacDrbgContext
|
||||||
peerManager: PeerManager
|
peerManager: PeerManager
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user