diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e074e681..9a008851 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -420,6 +420,10 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } + err = w.peermanager.SubscribeToRelayEvtBus(w.relay.(*relay.WakuRelay).Events()) + if err != nil { + return err + } w.peermanager.Start(ctx) w.registerAndMonitorReachability(ctx) } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index d5b83f85..46977de5 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -29,6 +30,7 @@ type PeerManager struct { host host.Host serviceSlots *ServiceSlots ctx context.Context + sub event.Subscription } const peerConnectivityLoopSecs = 15 @@ -80,9 +82,50 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { pm.peerConnector = pc } +func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { + var err error + pm.sub, err = bus.Subscribe(new(relay.EvtPeerTopic)) + if err != nil { + return err + } + return nil +} + +func (pm *PeerManager) peerEventLoop(ctx context.Context) { + defer pm.sub.Close() + for { + select { + case e := <-pm.sub.Out(): + peerEvt := e.(relay.EvtPeerTopic) + wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl) + peerID := peerEvt.PeerID + if peerEvt.State == relay.PEER_JOINED { + err := wps.AddPubSubTopic(peerID, peerEvt.Topic) + if err != nil { + pm.logger.Error("failed to add pubSubTopic for peer", + logging.HostID("peerID", peerID), zap.Error(err)) + } + } else if peerEvt.State == relay.PEER_LEFT { + err := wps.RemovePubSubTopic(peerID, peerEvt.Topic) + if err != nil { + pm.logger.Error("failed to remove pubSubTopic for peer", + logging.HostID("peerID", peerID), zap.Error(err)) + } + } else { + pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State))) + } + case <-ctx.Done(): + return + } + } +} + // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { pm.ctx = ctx + if pm.sub != nil { + go pm.peerEventLoop(ctx) + } go pm.connectivityLoop(ctx) } diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index f8390fbd..515d7a3d 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -55,6 +55,7 @@ type WakuPeerstore interface { Direction(p peer.ID) (network.Direction, error) AddPubSubTopic(p peer.ID, topic string) error + RemovePubSubTopic(p peer.ID, topic string) error PubSubTopics(p peer.ID) ([]string, error) SetPubSubTopics(p peer.ID, topics []string) error PeersByPubSubTopic(pubSubTopic string) peer.IDSlice @@ -157,6 +158,22 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error { return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) } +// RemovePubSubTopic removes a pubSubTopic from the peer +func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error { + existingTopics, err := ps.PubSubTopics(p) + if err != nil { + return err + } + for i := range existingTopics { + existingTopics = append(existingTopics[:i], existingTopics[i+1:]...) + } + err = ps.SetPubSubTopics(p, existingTopics) + if err != nil { + return err + } + return nil +} + // SetPubSubTopics sets pubSubTopics for a peer, it also overrides existing ones that were set previously.. func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { return ps.peerStore.Put(p, peerPubSubTopics, topics) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 6ca51c84..15ac5a95 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -57,11 +57,13 @@ type WakuRelay struct { topicsMutex sync.RWMutex wakuRelayTopics map[string]*pubsub.Topic relaySubs map[string]*pubsub.Subscription + topicEvtHanders map[string]*pubsub.TopicEventHandler events event.Bus emitters struct { EvtRelaySubscribed event.Emitter EvtRelayUnsubscribed event.Emitter + EvtPeerTopic event.Emitter } *waku_proto.CommonService @@ -77,6 +79,19 @@ type EvtRelayUnsubscribed struct { Topic string } +type PeerTopicState int + +const ( + PEER_JOINED = iota + PEER_LEFT +) + +type EvtPeerTopic struct { + Topic string + PeerID peer.ID + State PeerTopicState +} + func msgIDFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } @@ -87,6 +102,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.relaySubs = make(map[string]*pubsub.Subscription) + w.topicEvtHanders = make(map[string]*pubsub.TopicEventHandler) w.topicValidators = make(map[string][]validatorFn) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish @@ -229,6 +245,11 @@ func (w *WakuRelay) start() error { return err } + w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic)) + if err != nil { + return err + } + w.log.Info("Relay protocol started") return nil } @@ -303,6 +324,11 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } + evtHandler, err := w.addPeerTopicEventListener(pubSubTopic) + if err != nil { + return nil, err + } + w.topicEvtHanders[topic] = evtHandler w.relaySubs[topic] = sub err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic}) @@ -420,6 +446,12 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { w.relaySubs[topic].Cancel() delete(w.relaySubs, topic) + evtHandler, ok := w.topicEvtHanders[topic] + if ok { + evtHandler.Cancel() + delete(w.topicEvtHanders, topic) + } + err := w.wakuRelayTopics[topic].Close() if err != nil { return err @@ -495,3 +527,46 @@ func (w *WakuRelay) Params() pubsub.GossipSubParams { func (w *WakuRelay) Events() event.Bus { return w.events } + +func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) { + handler, err := topic.EventHandler() + if err != nil { + return nil, err + } + w.WaitGroup().Add(1) + go w.topicEventPoll(topic.String(), handler) + return handler, nil +} + +func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) { + defer w.WaitGroup().Done() + for { + evt, err := handler.NextPeerEvent(w.Context()) + if err != nil { + if err == context.Canceled { + break + } + w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err)) + continue + } + if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done. + break + } + if evt.Type == pubsub.PeerJoin { + w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_JOINED}) + if err != nil { + w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err)) + } + } else if evt.Type == pubsub.PeerLeave { + w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_LEFT}) + if err != nil { + w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err)) + } + } else { + w.log.Error("unknown event type received", zap.String("topic", topic), + zap.Int("eventType", int(evt.Type))) + } + } +}