diff --git a/floodsub.go b/floodsub.go index 65d640a..1b2eb08 100644 --- a/floodsub.go +++ b/floodsub.go @@ -78,3 +78,7 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { } } } + +func (fs *FloodSubRouter) Join(topic string) {} + +func (fs *FloodSubRouter) Leave(topic string) {} diff --git a/pubsub.go b/pubsub.go index 5d92af9..220bba1 100644 --- a/pubsub.go +++ b/pubsub.go @@ -96,6 +96,8 @@ type PubSubRouter interface { RemovePeer(peer.ID) HandleRPC(*RPC) Publish(peer.ID, *pb.Message) + Join(topic string) + Leave(topic string) } type Message struct { @@ -276,6 +278,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.myTopics, sub.topic) p.announce(sub.topic, false) + p.rt.Leave(sub.topic) } } @@ -290,6 +293,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // announce we want this topic if len(subs) == 0 { p.announce(sub.topic, true) + p.rt.Join(sub.topic) } // make new if not there