This commit is contained in:
Jan Winkelmann 2016-10-20 13:23:38 +02:00
parent b71e3adbd7
commit 3a30ab4c17
1 changed files with 23 additions and 0 deletions

View File

@ -75,6 +75,7 @@ type RPC struct {
from peer.ID from peer.ID
} }
// NewFloodSub returns a new FloodSub management object
func NewFloodSub(ctx context.Context, h host.Host) *PubSub { func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
ps := &PubSub{ ps := &PubSub{
host: h, host: h,
@ -101,6 +102,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
return ps return ps
} }
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) { func (p *PubSub) processLoop(ctx context.Context) {
for { for {
select { select {
@ -172,6 +174,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
} }
} }
// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last Subscription for a given topic, it will also announce
// that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) { func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
subs := p.myTopics[sub.topic] subs := p.myTopics[sub.topic]
@ -188,6 +194,10 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
} }
} }
// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first Subscription for the topic, it will announce that this node
// subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) { func (p *PubSub) handleAddSubscription(req *addSubReq) {
subs := p.myTopics[req.topic] subs := p.myTopics[req.topic]
@ -213,6 +223,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
req.resp <- sub req.resp <- sub
} }
// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) { func (p *PubSub) announce(topic string, sub bool) {
subopt := &pb.RPC_SubOpts{ subopt := &pb.RPC_SubOpts{
Topicid: &topic, Topicid: &topic,
@ -225,6 +237,8 @@ func (p *PubSub) announce(topic string, sub bool) {
} }
} }
// notifySubs sends a given message to all corresponding subscribbers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) { func (p *PubSub) notifySubs(msg *pb.Message) {
for _, topic := range msg.GetTopicIDs() { for _, topic := range msg.GetTopicIDs() {
subs := p.myTopics[topic] subs := p.myTopics[topic]
@ -234,14 +248,18 @@ func (p *PubSub) notifySubs(msg *pb.Message) {
} }
} }
// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool { func (p *PubSub) seenMessage(id string) bool {
return p.seenMessages.Has(id) return p.seenMessages.Has(id)
} }
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
func (p *PubSub) markSeen(id string) { func (p *PubSub) markSeen(id string) {
p.seenMessages.Add(id) p.seenMessages.Add(id)
} }
// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
for _, t := range msg.GetTopicIDs() { for _, t := range msg.GetTopicIDs() {
if _, ok := p.myTopics[t]; ok { if _, ok := p.myTopics[t]; ok {
@ -282,6 +300,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
return nil return nil
} }
// msgID returns a unique ID of the passed Message
func msgID(pmsg *pb.Message) string { func msgID(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
} }
@ -337,6 +356,7 @@ type addSubReq struct {
resp chan *Subscription resp chan *Subscription
} }
// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string) *Subscription { func (p *PubSub) Subscribe(topic string) *Subscription {
out := make(chan *Subscription, 1) out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{ p.addSub <- &addSubReq{
@ -351,12 +371,14 @@ type topicReq struct {
resp chan []string resp chan []string
} }
// GetTopics returns the topics this node is subscribed to
func (p *PubSub) GetTopics() []string { func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1) out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out} p.getTopics <- &topicReq{resp: out}
return <-out return <-out
} }
// Publish publishes data under the given topic
func (p *PubSub) Publish(topic string, data []byte) error { func (p *PubSub) Publish(topic string, data []byte) error {
seqno := make([]byte, 8) seqno := make([]byte, 8)
binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano()))
@ -377,6 +399,7 @@ type listPeerReq struct {
topic string topic string
} }
// ListPeers returns a list of peers we are connected to.
func (p *PubSub) ListPeers(topic string) []peer.ID { func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID) out := make(chan []peer.ID)
p.getPeers <- &listPeerReq{ p.getPeers <- &listPeerReq{