88 lines
2.6 KiB
Nim
88 lines
2.6 KiB
Nim
## Nim-LibP2P
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import tables, sets
|
|
import chronos, chronicles
|
|
import pubsubpeer,
|
|
../protocol,
|
|
../../connection,
|
|
../../peerinfo
|
|
|
|
export PubSubPeer
|
|
|
|
logScope:
|
|
topic = "PubSub"
|
|
|
|
type
|
|
TopicHandler* = proc (topic: string,
|
|
data: seq[byte]):
|
|
Future[void] {.closure, gcsafe.}
|
|
|
|
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
|
|
|
Topic* = object
|
|
name*: string
|
|
handler*: seq[TopicHandler]
|
|
|
|
PubSub* = ref object of LPProtocol
|
|
peerInfo*: PeerInfo
|
|
topics*: Table[string, Topic] # local topics
|
|
triggerSelf*: bool # flag indicating if the local handler should be triggered on publish
|
|
|
|
method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} =
|
|
## subscribe to a peer to send/receive pubsub messages
|
|
discard
|
|
|
|
method unsubscribe*(p: PubSub,
|
|
topics: seq[TopicPair]) {.base, async, gcsafe.} =
|
|
## unsubscribe from a list of ``topic`` strings
|
|
for t in topics:
|
|
for i, h in p.topics[t.topic].handler:
|
|
if h == t.handler:
|
|
p.topics[t.topic].handler.del(i)
|
|
|
|
method unsubscribe*(p: PubSub,
|
|
topic: string,
|
|
handler: TopicHandler): Future[void] {.base, gcsafe.} =
|
|
## unsubscribe from a ``topic`` string
|
|
result = p.unsubscribe(@[(topic, handler)])
|
|
|
|
method subscribe*(p: PubSub,
|
|
topic: string,
|
|
handler: TopicHandler)
|
|
{.base, async, gcsafe.} =
|
|
## subscribe to a topic
|
|
##
|
|
## ``topic`` - a string topic to subscribe to
|
|
##
|
|
## ``handler`` - is a user provided proc
|
|
## that will be triggered
|
|
## on every received message
|
|
##
|
|
if not p.topics.contains(topic):
|
|
trace "subscribing to topic", name = topic
|
|
p.topics[topic] = Topic(name: topic)
|
|
|
|
p.topics[topic].handler.add(handler)
|
|
|
|
method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} =
|
|
## publish to a ``topic``
|
|
if p.triggerSelf and topic in p.topics:
|
|
for h in p.topics[topic].handler:
|
|
await h(topic, data)
|
|
|
|
method initPubSub*(p: PubSub) {.base.} =
|
|
discard
|
|
|
|
proc newPubSub*(p: typedesc[PubSub], peerInfo: PeerInfo, triggerSelf: bool = false): p =
|
|
new result
|
|
result.peerInfo = peerInfo
|
|
result.triggerSelf = triggerSelf
|
|
result.initPubSub()
|