From 3a30ab4c17e73eb9b2bffec8c065d34e655bc8e3 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 13:23:38 +0200 Subject: [PATCH] comments --- floodsub.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/floodsub.go b/floodsub.go index ac6438b..40f530b 100644 --- a/floodsub.go +++ b/floodsub.go @@ -75,6 +75,7 @@ type RPC struct { from peer.ID } +// NewFloodSub returns a new FloodSub management object func NewFloodSub(ctx context.Context, h host.Host) *PubSub { ps := &PubSub{ host: h, @@ -101,6 +102,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { return ps } +// processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { for { select { @@ -172,6 +174,10 @@ func (p *PubSub) processLoop(ctx context.Context) { } } +// handleRemoveSubscription removes Subscription sub from bookeeping. +// If this was the last Subscription for a given topic, it will also announce +// that this node is not subscribing to this topic anymore. +// Only called from processLoop. func (p *PubSub) handleRemoveSubscription(sub *Subscription) { subs := p.myTopics[sub.topic] @@ -188,6 +194,10 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { } } +// handleAddSubscription adds a Subscription for a particular topic. If it is +// the first Subscription for the topic, it will announce that this node +// subscribes to the topic. +// Only called from processLoop. func (p *PubSub) handleAddSubscription(req *addSubReq) { subs := p.myTopics[req.topic] @@ -213,6 +223,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { req.resp <- sub } +// announce announces whether or not this node is interested in a given topic +// Only called from processLoop. func (p *PubSub) announce(topic string, sub bool) { subopt := &pb.RPC_SubOpts{ Topicid: &topic, @@ -225,6 +237,8 @@ func (p *PubSub) announce(topic string, sub bool) { } } +// notifySubs sends a given message to all corresponding subscribbers. +// Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) { for _, topic := range msg.GetTopicIDs() { subs := p.myTopics[topic] @@ -234,14 +248,18 @@ func (p *PubSub) notifySubs(msg *pb.Message) { } } +// seenMessage returns whether we already saw this message before func (p *PubSub) seenMessage(id string) bool { return p.seenMessages.Has(id) } +// markSeen marks a message as seen such that seenMessage returns `true' for the given id func (p *PubSub) markSeen(id string) { p.seenMessages.Add(id) } +// subscribedToMessage returns whether we are subscribed to one of the topics +// of a given message func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { for _, t := range msg.GetTopicIDs() { if _, ok := p.myTopics[t]; ok { @@ -282,6 +300,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } +// msgID returns a unique ID of the passed Message func msgID(pmsg *pb.Message) string { return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) } @@ -337,6 +356,7 @@ type addSubReq struct { resp chan *Subscription } +// Subscribe returns a new Subscription for the given topic func (p *PubSub) Subscribe(topic string) *Subscription { out := make(chan *Subscription, 1) p.addSub <- &addSubReq{ @@ -351,12 +371,14 @@ type topicReq struct { resp chan []string } +// GetTopics returns the topics this node is subscribed to func (p *PubSub) GetTopics() []string { out := make(chan []string, 1) p.getTopics <- &topicReq{resp: out} return <-out } +// Publish publishes data under the given topic func (p *PubSub) Publish(topic string, data []byte) error { seqno := make([]byte, 8) binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) @@ -377,6 +399,7 @@ type listPeerReq struct { topic string } +// ListPeers returns a list of peers we are connected to. func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) p.getPeers <- &listPeerReq{