From ab2fef7c1b3b5f6e8e966ca5eee22ba67632de7f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 11 Sep 2016 13:56:07 -0700 Subject: [PATCH] refactor publish messaging --- comm.go | 98 +++++++++++++++++++++++ floodsub.go | 203 ++++++++++++++--------------------------------- floodsub_test.go | 25 ++++++ 3 files changed, 183 insertions(+), 143 deletions(-) create mode 100644 comm.go diff --git a/comm.go b/comm.go new file mode 100644 index 0000000..be8b62d --- /dev/null +++ b/comm.go @@ -0,0 +1,98 @@ +package floodsub + +import ( + "bufio" + "context" + "io" + + pb "github.com/whyrusleeping/go-floodsub/pb" + + ggio "github.com/gogo/protobuf/io" + proto "github.com/gogo/protobuf/proto" + inet "github.com/libp2p/go-libp2p/p2p/net" +) + +// get the initial RPC containing all of our subscriptions to send to new peers +func (p *PubSub) getHelloPacket() *RPC { + var rpc RPC + for t := range p.myTopics { + as := &pb.RPC_SubOpts{ + Topicid: proto.String(t), + Subscribe: proto.Bool(true), + } + rpc.Subscriptions = append(rpc.Subscriptions, as) + } + return &rpc +} + +func (p *PubSub) handleNewStream(s inet.Stream) { + defer s.Close() + + r := ggio.NewDelimitedReader(s, 1<<20) + for { + rpc := new(RPC) + err := r.ReadMsg(&rpc.RPC) + if err != nil { + if err != io.EOF { + log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) + } + return + } + + rpc.from = s.Conn().RemotePeer() + select { + case p.incoming <- rpc: + case <-p.ctx.Done(): + return + } + } +} + +func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) { + var dead bool + bufw := bufio.NewWriter(s) + wc := ggio.NewDelimitedWriter(bufw) + + writeMsg := func(msg proto.Message) error { + err := wc.WriteMsg(msg) + if err != nil { + return err + } + + return bufw.Flush() + } + + defer wc.Close() + for { + select { + case rpc, ok := <-outgoing: + if !ok { + return + } + if dead { + // continue in order to drain messages + continue + } + + err := writeMsg(&rpc.RPC) + if err != nil { + log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) + dead = true + go func() { + p.peerDead <- s.Conn().RemotePeer() + }() + } + + case <-ctx.Done(): + return + } + } +} + +func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { + return &RPC{ + RPC: pb.RPC{ + Subscriptions: subs, + }, + } +} diff --git a/floodsub.go b/floodsub.go index 43d8339..c71d683 100644 --- a/floodsub.go +++ b/floodsub.go @@ -1,17 +1,13 @@ package floodsub import ( - "bufio" "context" "encoding/binary" "fmt" - "io" - "sync" "time" pb "github.com/whyrusleeping/go-floodsub/pb" - ggio "github.com/gogo/protobuf/io" proto "github.com/gogo/protobuf/proto" peer "github.com/ipfs/go-libp2p-peer" logging "github.com/ipfs/go-log" @@ -23,31 +19,35 @@ import ( const ID = protocol.ID("/floodsub/1.0.0") -var ( - AddSubMessageType = "sub" - UnsubMessageType = "unsub" - PubMessageType = "pub" -) - var log = logging.Logger("floodsub") type PubSub struct { host host.Host + // incoming messages from other peers incoming chan *RPC - publish chan *Message + + // messages we are publishing out to our peers + publish chan *Message + + // addSub is a control channel for us to add and remove subscriptions + addSub chan *addSub + + // a notification channel for incoming streams from other peers newPeers chan inet.Stream + + // a notification channel for when our peers die peerDead chan peer.ID + // The set of topics we are subscribed to myTopics map[string]chan *Message - pubsubLk sync.Mutex - topics map[string]map[peer.ID]struct{} + // topics tracks which topics each of our peers are subscribed to + topics map[string]map[peer.ID]struct{} + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache - addSub chan *addSub - ctx context.Context } @@ -89,84 +89,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { return ps } -func (p *PubSub) getHelloPacket() *RPC { - var rpc RPC - for t, _ := range p.myTopics { - as := &pb.RPC_SubOpts{ - Topicid: proto.String(t), - Subscribe: proto.Bool(true), - } - rpc.Subscriptions = append(rpc.Subscriptions, as) - } - return &rpc -} - -func (p *PubSub) handleNewStream(s inet.Stream) { - defer s.Close() - - r := ggio.NewDelimitedReader(s, 1<<20) - for { - rpc := new(RPC) - err := r.ReadMsg(&rpc.RPC) - if err != nil { - if err != io.EOF { - log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) - } - return - } - - rpc.from = s.Conn().RemotePeer() - select { - case p.incoming <- rpc: - case <-p.ctx.Done(): - return - } - } -} - -func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, in <-chan *RPC) { - var dead bool - bufw := bufio.NewWriter(s) - wc := ggio.NewDelimitedWriter(bufw) - - writeMsg := func(msg proto.Message) error { - err := wc.WriteMsg(msg) - if err != nil { - return err - } - - return bufw.Flush() - } - - defer wc.Close() - for { - select { - case rpc, ok := <-in: - if !ok { - return - } - if dead { - // continue in order to drain messages - continue - } - - err := writeMsg(&rpc.RPC) - if err != nil { - log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) - dead = true - go func() { - p.peerDead <- s.Conn().RemotePeer() - }() - } - - case <-ctx.Done(): - return - } - } -} - func (p *PubSub) processLoop(ctx context.Context) { - for { select { case s := <-p.newPeers: @@ -192,16 +115,15 @@ func (p *PubSub) processLoop(ctx context.Context) { err := p.handleIncomingRPC(rpc) if err != nil { log.Error("handling RPC: ", err) + continue } case msg := <-p.publish: - err := p.recvMessage(msg.Message) - if err != nil { - log.Error("error receiving message: ", err) - } + p.notifySubs(msg.Message) - err = p.publishMessage(p.host.ID(), msg.Message) + err := p.publishMessage(p.host.ID(), msg.Message) if err != nil { log.Error("publishing message: ", err) + continue } case <-ctx.Done(): log.Info("pubsub processloop shutting down") @@ -209,9 +131,9 @@ func (p *PubSub) processLoop(ctx context.Context) { } } } -func (p *PubSub) handleSubscriptionChange(sub *addSub) { - subopt := pb.RPC_SubOpts{ +func (p *PubSub) handleSubscriptionChange(sub *addSub) { + subopt := &pb.RPC_SubOpts{ Topicid: &sub.topic, Subscribe: &sub.sub, } @@ -236,35 +158,19 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { delete(p.myTopics, sub.topic) } - out := &RPC{ - RPC: pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{ - &subopt, - }, - }, - } - + out := rpcWithSubs(subopt) for _, peer := range p.peers { peer <- out } } -func (p *PubSub) recvMessage(msg *pb.Message) error { - if len(msg.GetTopicIDs()) > 1 { - return fmt.Errorf("Dont yet handle multiple topics per message") +func (p *PubSub) notifySubs(msg *pb.Message) { + for _, topic := range msg.GetTopicIDs() { + subch, ok := p.myTopics[topic] + if ok { + subch <- &Message{msg} + } } - if len(msg.GetTopicIDs()) == 0 { - return fmt.Errorf("no topic on received message") - } - - topic := msg.GetTopicIDs()[0] - subch, ok := p.myTopics[topic] - if ok { - subch <- &Message{msg} - } else { - log.Error("received message we we'rent subscribed to") - } - return nil } func (p *PubSub) seenMessage(id string) bool { @@ -275,6 +181,15 @@ func (p *PubSub) markSeen(id string) { p.seenMessages.Add(id) } +func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { + for _, t := range msg.GetTopicIDs() { + if _, ok := p.myTopics[t]; ok { + return true + } + } + return false +} + func (p *PubSub) handleIncomingRPC(rpc *RPC) error { for _, subopt := range rpc.GetSubscriptions() { t := subopt.GetTopicid() @@ -296,33 +211,35 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { } for _, pmsg := range rpc.GetPublish() { - msg := &Message{pmsg} - - id := msg.Message.GetFrom() + string(msg.GetSeqno()) - - if p.seenMessage(id) { + if !p.subscribedToMsg(pmsg) { + log.Warning("received message we didn't subscribe to. Dropping.") continue } - if msg.GetFrom() == p.host.ID() { - log.Error("skipping message from self") - return nil - } - - p.markSeen(id) - - if err := p.recvMessage(pmsg); err != nil { - log.Error("error receiving message: ", err) - } - - err := p.publishMessage(rpc.from, pmsg) - if err != nil { - log.Error("publish message: ", err) - } + p.maybePublishMessage(rpc.from, pmsg) } return nil } +func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { + msg := &Message{pmsg} + + id := msg.Message.GetFrom() + string(msg.GetSeqno()) + + if p.seenMessage(id) { + return + } + + p.markSeen(id) + + p.notifySubs(pmsg) + + err := p.publishMessage(from, pmsg) + if err != nil { + log.Error("publish message: ", err) + } +} + func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { if len(msg.GetTopicIDs()) != 1 { return fmt.Errorf("don't support publishing to multiple topics in a single message") @@ -335,7 +252,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}} - for pid, _ := range tmap { + for pid := range tmap { if pid == from || pid == peer.ID(msg.GetFrom()) { continue } diff --git a/floodsub_test.go b/floodsub_test.go index 8e0d621..f8c2947 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -207,3 +207,28 @@ func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) { t.Fatal("timed out waiting for message of: ", exp) } } + +func TestNoConnection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + + psubs := getPubsubs(ctx, hosts) + + ch, err := psubs[5].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + err = psubs[0].Publish("foobar", []byte("TESTING")) + if err != nil { + t.Fatal(err) + } + + select { + case <-ch: + t.Fatal("shouldnt have gotten a message") + case <-time.After(time.Millisecond * 200): + } +}