mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 13:53:06 +00:00
Relay incoming messages
Added a possibility to relay incoming messages if there are existing relays
This commit is contained in:
parent
0d884f3e90
commit
3336559a27
34
pubsub.go
34
pubsub.go
@ -607,8 +607,8 @@ func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
|
||||
}
|
||||
|
||||
// handleRemoveSubscription removes Subscription sub from bookeeping.
|
||||
// If this was the last Subscription for a given topic, it will also announce
|
||||
// that this node is not subscribing to this topic anymore.
|
||||
// If this was the last subscription and no more relays exist for a given topic,
|
||||
// it will also announce that this node is not subscribing to this topic anymore.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
||||
subs := p.mySubs[sub.topic]
|
||||
@ -634,8 +634,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
||||
}
|
||||
|
||||
// handleAddSubscription adds a Subscription for a particular topic. If it is
|
||||
// the first Subscription for the topic, it will announce that this node
|
||||
// subscribes to the topic.
|
||||
// the first subscription and no relays exist so far for the topic, it will
|
||||
// announce that this node subscribes to the topic.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
||||
sub := req.sub
|
||||
@ -661,8 +661,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
||||
}
|
||||
|
||||
// handleAddRelay adds a relay for a particular topic. If it is
|
||||
// the first relay for the topic, it will announce that this node
|
||||
// relays for the topic.
|
||||
// the first relay and no subscriptions exist so far for the topic , it will
|
||||
// announce that this node relays for the topic.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleAddRelay(req *addRelayReq) {
|
||||
topic := req.topic
|
||||
@ -685,8 +685,9 @@ func (p *PubSub) handleAddRelay(req *addRelayReq) {
|
||||
}
|
||||
|
||||
// handleRemoveRelay removes one relay reference from bookkeeping.
|
||||
// If this was the last relay reference for a given topic, it will also
|
||||
// announce that this node is not relaying for this topic anymore.
|
||||
// If this was the last relay reference and no more subscriptions exist
|
||||
// for a given topic, it will also announce that this node is not relaying
|
||||
// for this topic anymore.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleRemoveRelay(topic string) {
|
||||
if p.myRelays[topic] == 0 {
|
||||
@ -820,6 +821,21 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// canRelayMsg returns whether we are able to relay for one of the topics
|
||||
// of a given message
|
||||
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
|
||||
if len(p.myRelays) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, t := range msg.GetTopicIDs() {
|
||||
if relays := p.myRelays[t]; relays != 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
|
||||
if t, ok := p.myTopics[topic]; ok {
|
||||
t.sendNotification(PeerEvent{PeerLeave, pid})
|
||||
@ -865,7 +881,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||
}
|
||||
|
||||
for _, pmsg := range rpc.GetPublish() {
|
||||
if !p.subscribedToMsg(pmsg) {
|
||||
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
|
||||
log.Warning("received message we didn't subscribe to. Dropping.")
|
||||
continue
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user