nim-libp2p-experimental/libp2p/protocols/pubsub/pubsub.nim

88 lines
2.6 KiB
Nim

## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables, sets
import chronos, chronicles
import pubsubpeer,
../protocol,
../../connection,
../../peerinfo
export PubSubPeer
# chronicles log scope for this module: attaches ``topic = "PubSub"`` to the
# log statements emitted below (e.g. the ``trace`` call in ``subscribe``).
logScope:
  topic = "PubSub"
type
  # User-supplied callback. It receives the topic name and the raw message
  # bytes and returns a future that ``publish`` awaits before moving on to
  # the next handler.
  TopicHandler* = proc (topic: string,
                        data: seq[byte]):
                        Future[void] {.closure, gcsafe.}

  # A topic name together with one handler; the unit accepted by the
  # seq-based ``unsubscribe`` overload.
  TopicPair* = tuple[topic: string, handler: TopicHandler]

  # Local subscription record for a single topic.
  Topic* = object
    name*: string # topic name; ``subscribe`` sets it to the table key
    handler*: seq[TopicHandler] # every handler registered for this topic
                                # (a seq, despite the singular field name)

  # Base pubsub protocol object. The subscribe/unsubscribe/publish methods
  # below are declared ``{.base.}`` on this type.
  PubSub* = ref object of LPProtocol
    peerInfo*: PeerInfo # peer info supplied to ``newPubSub``
                        # (presumably the local peer -- TODO confirm)
    topics*: Table[string, Topic] # local topics
    triggerSelf*: bool # flag indicating if the local handler should be triggered on publish
method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} =
  ## subscribe to a peer to send/receive pubsub messages
  ##
  ## ``conn`` - the connection to the peer
  ##
  ## The base implementation is a no-op: it ignores ``conn`` and completes
  ## immediately. Being a ``{.base.}`` method, it is the dispatch root that
  ## derived pubsub types can override.
  discard
method unsubscribe*(p: PubSub,
                    topics: seq[TopicPair]) {.base, async, gcsafe.} =
  ## unsubscribe from a list of ``topic`` strings
  ##
  ## ``topics`` - ``(topic, handler)`` pairs; every registration of
  ##              ``handler`` under ``topic`` is removed
  ##
  ## Pairs that name a topic without a local subscription are ignored
  ## instead of raising ``KeyError``. The relative order of the remaining
  ## handlers is preserved. The topic entry itself stays in ``p.topics``
  ## even when its handler list becomes empty.
  for t in topics:
    if t.topic in p.topics:
      # Walk the indices back to front: removing entry ``i`` only shifts
      # elements that were already visited, so nothing is skipped and the
      # seq is never resized underneath an ``items``/``pairs`` iterator
      # (which asserts that the length stays constant while iterating).
      for i in countdown(p.topics[t.topic].handler.high, 0):
        if p.topics[t.topic].handler[i] == t.handler:
          p.topics[t.topic].handler.delete(i)
method unsubscribe*(p: PubSub,
                    topic: string,
                    handler: TopicHandler): Future[void] {.base, gcsafe.} =
  ## unsubscribe a single ``handler`` from a ``topic`` string
  ##
  ## Convenience overload: wraps the pair in a one-element seq and hands it
  ## to the seq-based ``unsubscribe``, returning that call's future.
  p.unsubscribe(@[(topic, handler)])
method subscribe*(p: PubSub,
                  topic: string,
                  handler: TopicHandler) {.base, async, gcsafe.} =
  ## register ``handler`` for ``topic``
  ##
  ## ``topic``   - a string topic to subscribe to
  ##
  ## ``handler`` - a user provided proc that will be
  ##               triggered on every received message
  ##
  ## The first subscription to a topic creates its entry in ``p.topics``;
  ## later subscriptions append their handler to the existing entry.
  if topic notin p.topics:
    trace "subscribing to topic", name = topic
    p.topics[topic] = Topic(name: topic)

  p.topics[topic].handler.add(handler)
method publish*(p: PubSub,
                topic: string,
                data: seq[byte]) {.base, async, gcsafe.} =
  ## publish to a ``topic``
  ##
  ## ``topic`` - the topic the message is published on
  ##
  ## ``data``  - the raw message payload
  ##
  ## The base implementation only performs local delivery: when
  ## ``triggerSelf`` is set and ``topic`` has local subscribers, each handler
  ## is awaited in turn with ``topic`` and ``data``. Otherwise it does
  ## nothing.
  if p.triggerSelf and topic in p.topics:
    # Iterate over a snapshot of the handler list. A handler may call
    # ``subscribe``/``unsubscribe`` for this topic while it is being awaited,
    # which would resize the live seq in the middle of the iteration.
    # Handlers added during this publish are first called on the next one;
    # handlers removed during it are still called for this message.
    let handlers = p.topics[topic].handler
    for h in handlers:
      await h(topic, data)
method initPubSub*(p: PubSub) {.base.} =
  ## initialization hook, called by ``newPubSub`` once ``peerInfo`` and
  ## ``triggerSelf`` have been assigned
  ##
  ## The base implementation does nothing. Being a ``{.base.}`` method, it
  ## can be overridden by derived pubsub types.
  discard
proc newPubSub*(p: typedesc[PubSub],
                peerInfo: PeerInfo,
                triggerSelf: bool = false): p =
  ## create a pubsub instance of type ``p``
  ##
  ## ``peerInfo``    - stored in the instance's ``peerInfo`` field
  ##
  ## ``triggerSelf`` - when ``true``, ``publish`` also invokes the local
  ##                   handlers of the published topic
  ##
  ## ``initPubSub`` is invoked on the new instance after both fields are set.
  new(result)
  result.triggerSelf = triggerSelf
  result.peerInfo = peerInfo
  result.initPubSub()