Feat : handle dynamic peer topic sub unsub (#751)

* feat: handle dynamic peer join and leave a pubSub topic

Co-authored-by: richΛrd <info@richardramos.me>


---------

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2023-09-19 11:35:29 +05:30 committed by GitHub
parent a650469fae
commit 9b05d48318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 0 deletions

View File

@ -420,6 +420,10 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
err = w.peermanager.SubscribeToRelayEvtBus(w.relay.(*relay.WakuRelay).Events())
if err != nil {
return err
}
w.peermanager.Start(ctx) w.peermanager.Start(ctx)
w.registerAndMonitorReachability(ctx) w.registerAndMonitorReachability(ctx)
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"time" "time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
@ -29,6 +30,7 @@ type PeerManager struct {
host host.Host host host.Host
serviceSlots *ServiceSlots serviceSlots *ServiceSlots
ctx context.Context ctx context.Context
sub event.Subscription
} }
const peerConnectivityLoopSecs = 15 const peerConnectivityLoopSecs = 15
@ -80,9 +82,50 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
pm.peerConnector = pc pm.peerConnector = pc
} }
func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {
var err error
pm.sub, err = bus.Subscribe(new(relay.EvtPeerTopic))
if err != nil {
return err
}
return nil
}
func (pm *PeerManager) peerEventLoop(ctx context.Context) {
defer pm.sub.Close()
for {
select {
case e := <-pm.sub.Out():
peerEvt := e.(relay.EvtPeerTopic)
wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl)
peerID := peerEvt.PeerID
if peerEvt.State == relay.PEER_JOINED {
err := wps.AddPubSubTopic(peerID, peerEvt.Topic)
if err != nil {
pm.logger.Error("failed to add pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.Error(err))
}
} else if peerEvt.State == relay.PEER_LEFT {
err := wps.RemovePubSubTopic(peerID, peerEvt.Topic)
if err != nil {
pm.logger.Error("failed to remove pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.Error(err))
}
} else {
pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State)))
}
case <-ctx.Done():
return
}
}
}
// Start starts the processing to be done by peer manager. // Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) { func (pm *PeerManager) Start(ctx context.Context) {
pm.ctx = ctx pm.ctx = ctx
if pm.sub != nil {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx) go pm.connectivityLoop(ctx)
} }

View File

@ -55,6 +55,7 @@ type WakuPeerstore interface {
Direction(p peer.ID) (network.Direction, error) Direction(p peer.ID) (network.Direction, error)
AddPubSubTopic(p peer.ID, topic string) error AddPubSubTopic(p peer.ID, topic string) error
RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error) PubSubTopics(p peer.ID) ([]string, error)
SetPubSubTopics(p peer.ID, topics []string) error SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopic(pubSubTopic string) peer.IDSlice PeersByPubSubTopic(pubSubTopic string) peer.IDSlice
@ -157,6 +158,22 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error {
return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) return ps.peerStore.Put(p, peerPubSubTopics, existingTopics)
} }
// RemovePubSubTopic removes a pubSubTopic from the peer
func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error {
existingTopics, err := ps.PubSubTopics(p)
if err != nil {
return err
}
for i := range existingTopics {
existingTopics = append(existingTopics[:i], existingTopics[i+1:]...)
}
err = ps.SetPubSubTopics(p, existingTopics)
if err != nil {
return err
}
return nil
}
// SetPubSubTopics sets pubSubTopics for a peer, it also overrides existing ones that were set previously.. // SetPubSubTopics sets pubSubTopics for a peer, it also overrides existing ones that were set previously..
func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error {
return ps.peerStore.Put(p, peerPubSubTopics, topics) return ps.peerStore.Put(p, peerPubSubTopics, topics)

View File

@ -57,11 +57,13 @@ type WakuRelay struct {
topicsMutex sync.RWMutex topicsMutex sync.RWMutex
wakuRelayTopics map[string]*pubsub.Topic wakuRelayTopics map[string]*pubsub.Topic
relaySubs map[string]*pubsub.Subscription relaySubs map[string]*pubsub.Subscription
topicEvtHanders map[string]*pubsub.TopicEventHandler
events event.Bus events event.Bus
emitters struct { emitters struct {
EvtRelaySubscribed event.Emitter EvtRelaySubscribed event.Emitter
EvtRelayUnsubscribed event.Emitter EvtRelayUnsubscribed event.Emitter
EvtPeerTopic event.Emitter
} }
*waku_proto.CommonService *waku_proto.CommonService
@ -77,6 +79,19 @@ type EvtRelayUnsubscribed struct {
Topic string Topic string
} }
type PeerTopicState int
const (
PEER_JOINED = iota
PEER_LEFT
)
type EvtPeerTopic struct {
Topic string
PeerID peer.ID
State PeerTopicState
}
func msgIDFn(pmsg *pubsub_pb.Message) string { func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data)) return string(hash.SHA256(pmsg.Data))
} }
@ -87,6 +102,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.timesource = timesource w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription) w.relaySubs = make(map[string]*pubsub.Subscription)
w.topicEvtHanders = make(map[string]*pubsub.TopicEventHandler)
w.topicValidators = make(map[string][]validatorFn) w.topicValidators = make(map[string][]validatorFn)
w.bcaster = bcaster w.bcaster = bcaster
w.minPeersToPublish = minPeersToPublish w.minPeersToPublish = minPeersToPublish
@ -229,6 +245,11 @@ func (w *WakuRelay) start() error {
return err return err
} }
w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic))
if err != nil {
return err
}
w.log.Info("Relay protocol started") w.log.Info("Relay protocol started")
return nil return nil
} }
@ -303,6 +324,11 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
return nil, err return nil, err
} }
evtHandler, err := w.addPeerTopicEventListener(pubSubTopic)
if err != nil {
return nil, err
}
w.topicEvtHanders[topic] = evtHandler
w.relaySubs[topic] = sub w.relaySubs[topic] = sub
err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic}) err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic})
@ -420,6 +446,12 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
w.relaySubs[topic].Cancel() w.relaySubs[topic].Cancel()
delete(w.relaySubs, topic) delete(w.relaySubs, topic)
evtHandler, ok := w.topicEvtHanders[topic]
if ok {
evtHandler.Cancel()
delete(w.topicEvtHanders, topic)
}
err := w.wakuRelayTopics[topic].Close() err := w.wakuRelayTopics[topic].Close()
if err != nil { if err != nil {
return err return err
@ -495,3 +527,46 @@ func (w *WakuRelay) Params() pubsub.GossipSubParams {
func (w *WakuRelay) Events() event.Bus { func (w *WakuRelay) Events() event.Bus {
return w.events return w.events
} }
func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) {
handler, err := topic.EventHandler()
if err != nil {
return nil, err
}
w.WaitGroup().Add(1)
go w.topicEventPoll(topic.String(), handler)
return handler, nil
}
func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) {
defer w.WaitGroup().Done()
for {
evt, err := handler.NextPeerEvent(w.Context())
if err != nil {
if err == context.Canceled {
break
}
w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err))
continue
}
if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done.
break
}
if evt.Type == pubsub.PeerJoin {
w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_JOINED})
if err != nil {
w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err))
}
} else if evt.Type == pubsub.PeerLeave {
w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_LEFT})
if err != nil {
w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err))
}
} else {
w.log.Error("unknown event type received", zap.String("topic", topic),
zap.Int("eventType", int(evt.Type)))
}
}
}