option to allow triggering own handlers on publish

This commit is contained in:
Dmitriy Ryajov 2019-10-03 16:22:49 -06:00
parent 37d7a03fba
commit 1d4b51413e
2 changed files with 20 additions and 8 deletions

View File

@ -23,7 +23,7 @@ logScope:
const FloodSubCodec* = "/floodsub/1.0.0" const FloodSubCodec* = "/floodsub/1.0.0"
type type
FloodSub = ref object of PubSub FloodSub* = ref object of PubSub
peers*: Table[string, PubSubPeer] # peerid to peer map peers*: Table[string, PubSubPeer] # peerid to peer map
peerTopics*: Table[string, HashSet[string]] # topic to remote peer map peerTopics*: Table[string, HashSet[string]] # topic to remote peer map
@ -145,6 +145,8 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) {.async, gcsafe.} = data: seq[byte]) {.async, gcsafe.} =
await procCall PubSub(f).publish(topic, data)
trace "about to publish message on topic", name = topic, data = data.toHex() trace "about to publish message on topic", name = topic, data = data.toHex()
if data.len > 0 and topic.len > 0: if data.len > 0 and topic.len > 0:
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
@ -169,10 +171,8 @@ method unsubscribe*(f: FloodSub,
for p in f.peers.values: for p in f.peers.values:
await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
proc newFloodSub*(peerInfo: PeerInfo): FloodSub = method initPubSub*(f: FloodSub) =
new result f.peers = initTable[string, PubSubPeer]()
result.peerInfo = peerInfo f.topics = initTable[string, Topic]()
result.peers = initTable[string, PubSubPeer]() f.peerTopics = initTable[string, HashSet[string]]()
result.topics = initTable[string, Topic]() f.init()
result.peerTopics = initTable[string, HashSet[string]]()
result.init()

View File

@ -33,6 +33,7 @@ type
PubSub* = ref object of LPProtocol PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo peerInfo*: PeerInfo
topics*: Table[string, Topic] # local topics topics*: Table[string, Topic] # local topics
triggerSelf*: bool # flag indicating if the local handler should be triggered on publish
method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} =
## subscribe to a peer to send/receive pubsub messages ## subscribe to a peer to send/receive pubsub messages
@ -72,4 +73,15 @@ method subscribe*(p: PubSub,
method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} =
## publish to a ``topic`` ## publish to a ``topic``
if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler:
await h(topic, data)
method initPubSub*(p: PubSub) {.base.} =
discard discard
proc newPubSub*(p: typedesc[PubSub], peerInfo: PeerInfo, triggerSelf: bool = false): p =
new result
result.peerInfo = peerInfo
result.triggerSelf = triggerSelf
result.initPubSub()