diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index e4e4833de..e0118db39 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -23,7 +23,7 @@ logScope: const FloodSubCodec* = "/floodsub/1.0.0" type - FloodSub = ref object of PubSub + FloodSub* = ref object of PubSub peers*: Table[string, PubSubPeer] # peerid to peer map peerTopics*: Table[string, HashSet[string]] # topic to remote peer map @@ -145,6 +145,8 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = method publish*(f: FloodSub, topic: string, data: seq[byte]) {.async, gcsafe.} = + await procCall PubSub(f).publish(topic, data) + trace "about to publish message on topic", name = topic, data = data.toHex() if data.len > 0 and topic.len > 0: let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) @@ -169,10 +171,8 @@ method unsubscribe*(f: FloodSub, for p in f.peers.values: await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) -proc newFloodSub*(peerInfo: PeerInfo): FloodSub = - new result - result.peerInfo = peerInfo - result.peers = initTable[string, PubSubPeer]() - result.topics = initTable[string, Topic]() - result.peerTopics = initTable[string, HashSet[string]]() - result.init() +method initPubSub*(f: FloodSub) = + f.peers = initTable[string, PubSubPeer]() + f.topics = initTable[string, Topic]() + f.peerTopics = initTable[string, HashSet[string]]() + f.init() diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 4b5d78909..58023d404 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -33,6 +33,7 @@ type PubSub* = ref object of LPProtocol peerInfo*: PeerInfo topics*: Table[string, Topic] # local topics + triggerSelf*: bool # flag indicating if the local handler should be triggered on publish method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = ## subscribe to a peer to send/receive pubsub messages @@ -72,4 +73,15 @@ method subscribe*(p: PubSub, method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = ## publish to a ``topic`` + if p.triggerSelf and topic in p.topics: + for h in p.topics[topic].handler: + await h(topic, data) + +method initPubSub*(p: PubSub) {.base.} = discard + +proc newPubSub*(p: typedesc[PubSub], peerInfo: PeerInfo, triggerSelf: bool = false): p = + new result + result.peerInfo = peerInfo + result.triggerSelf = triggerSelf + result.initPubSub()