From 3336559a2799fa8f588d47f0b5049e65aefc0535 Mon Sep 17 00:00:00 2001 From: Lukasz Zimnoch Date: Wed, 29 Apr 2020 15:06:23 +0200 Subject: [PATCH] Relay incoming messages Added a possibility to relay incoming messages if there are existing relays --- pubsub.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/pubsub.go b/pubsub.go index 3ac27fb..2e1c0b6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -607,8 +607,8 @@ func (p *PubSub) handleRemoveTopic(req *rmTopicReq) { } // handleRemoveSubscription removes Subscription sub from bookeeping. -// If this was the last Subscription for a given topic, it will also announce -// that this node is not subscribing to this topic anymore. +// If this was the last subscription and no more relays exist for a given topic, +// it will also announce that this node is not subscribing to this topic anymore. // Only called from processLoop. func (p *PubSub) handleRemoveSubscription(sub *Subscription) { subs := p.mySubs[sub.topic] @@ -634,8 +634,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { } // handleAddSubscription adds a Subscription for a particular topic. If it is -// the first Subscription for the topic, it will announce that this node -// subscribes to the topic. +// the first subscription and no relays exist so far for the topic, it will +// announce that this node subscribes to the topic. // Only called from processLoop. func (p *PubSub) handleAddSubscription(req *addSubReq) { sub := req.sub @@ -661,8 +661,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { } // handleAddRelay adds a relay for a particular topic. If it is -// the first relay for the topic, it will announce that this node -// relays for the topic. +// the first relay and no subscriptions exist so far for the topic , it will +// announce that this node relays for the topic. // Only called from processLoop. func (p *PubSub) handleAddRelay(req *addRelayReq) { topic := req.topic @@ -685,8 +685,9 @@ func (p *PubSub) handleAddRelay(req *addRelayReq) { } // handleRemoveRelay removes one relay reference from bookkeeping. -// If this was the last relay reference for a given topic, it will also -// announce that this node is not relaying for this topic anymore. +// If this was the last relay reference and no more subscriptions exist +// for a given topic, it will also announce that this node is not relaying +// for this topic anymore. // Only called from processLoop. func (p *PubSub) handleRemoveRelay(topic string) { if p.myRelays[topic] == 0 { @@ -820,6 +821,21 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } +// canRelayMsg returns whether we are able to relay for one of the topics +// of a given message +func (p *PubSub) canRelayMsg(msg *pb.Message) bool { + if len(p.myRelays) == 0 { + return false + } + + for _, t := range msg.GetTopicIDs() { + if relays := p.myRelays[t]; relays != 0 { + return true + } + } + return false +} + func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if t, ok := p.myTopics[topic]; ok { t.sendNotification(PeerEvent{PeerLeave, pid}) @@ -865,7 +881,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { } for _, pmsg := range rpc.GetPublish() { - if !p.subscribedToMsg(pmsg) { + if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) { log.Warning("received message we didn't subscribe to. Dropping.") continue }