From dd331f38c64aad7205b2d58dcad78d6cbbd470e3 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 12 Sep 2016 13:22:16 -0700 Subject: [PATCH] more code cleanup, remove debug statements, add support for multiple messages and topics --- comm.go | 6 +++- floodsub.go | 40 +++++++++++------------ floodsub_test.go | 83 ++++++++++++++++++++++++++++++++++++++---------- notify.go | 27 ++++++++-------- package.json | 4 +-- 5 files changed, 106 insertions(+), 54 deletions(-) diff --git a/comm.go b/comm.go index be8b62d..f78d54c 100644 --- a/comm.go +++ b/comm.go @@ -76,7 +76,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo err := writeMsg(&rpc.RPC) if err != nil { - log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) + log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err) dead = true go func() { p.peerDead <- s.Conn().RemotePeer() @@ -96,3 +96,7 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { }, } } + +func rpcWithMessages(msgs ...*pb.Message) *RPC { + return &RPC{RPC: pb.RPC{Publish: msgs}} +} diff --git a/floodsub.go b/floodsub.go index c71d683..10d4d15 100644 --- a/floodsub.go +++ b/floodsub.go @@ -82,7 +82,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { } h.SetStreamHandler(ID, ps.handleNewStream) - h.Network().Notify(ps) + h.Network().Notify((*PubSubNotif)(ps)) go ps.processLoop(ctx) @@ -109,6 +109,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case pid := <-p.peerDead: delete(p.peers, pid) + for _, t := range p.topics { + delete(t, pid) + } case sub := <-p.addSub: p.handleSubscriptionChange(sub) case rpc := <-p.incoming: @@ -118,13 +121,7 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } case msg := <-p.publish: - p.notifySubs(msg.Message) - - err := p.publishMessage(p.host.ID(), msg.Message) - if err != nil { - log.Error("publishing message: ", err) - continue - } + p.maybePublishMessage(p.host.ID(), msg.Message) case <-ctx.Done(): log.Info("pubsub processloop shutting down") return @@ -222,10 +219,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { } func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - msg := &Message{pmsg} - - id := msg.Message.GetFrom() + string(msg.GetSeqno()) - + id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) if p.seenMessage(id) { return } @@ -241,18 +235,20 @@ func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { } func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { - if len(msg.GetTopicIDs()) != 1 { - return fmt.Errorf("don't support publishing to multiple topics in a single message") + tosend := make(map[peer.ID]struct{}) + for _, topic := range msg.GetTopicIDs() { + tmap, ok := p.topics[topic] + if !ok { + continue + } + + for p, _ := range tmap { + tosend[p] = struct{}{} + } } - tmap, ok := p.topics[msg.GetTopicIDs()[0]] - if !ok { - return nil - } - - out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}} - - for pid := range tmap { + out := rpcWithMessages(msg) + for pid := range tosend { if pid == from || pid == peer.ID(msg.GetFrom()) { continue } diff --git a/floodsub_test.go b/floodsub_test.go index f8c2947..5bfac6f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -67,6 +67,17 @@ func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub { return psubs } +func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) { + select { + case msg := <-ch: + if !bytes.Equal(msg.GetData(), exp) { + t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) + } + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for message of: ", string(exp)) + } +} + func TestBasicFloodsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -152,7 +163,7 @@ func TestReconnects(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getNetHosts(t, ctx, 3) psubs := getPubsubs(ctx, hosts) @@ -180,7 +191,9 @@ func TestReconnects(t *testing.T) { assertReceive(t, A, msg) assertReceive(t, B, msg) - hosts[2].Close() + psubs[2].Unsub("cats") + + time.Sleep(time.Millisecond * 50) msg2 := []byte("potato") err = psubs[0].Publish("cats", msg2) @@ -189,25 +202,32 @@ func TestReconnects(t *testing.T) { } assertReceive(t, A, msg2) + select { + case _, ok := <-B: + if ok { + t.Fatal("shouldnt have gotten data on this channel") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for B chan to be closed") + } + + ch2, err := psubs[2].Subscribe("cats") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 50) - _, ok := psubs[0].peers[hosts[2].ID()] - if ok { - t.Fatal("shouldnt have this peer anymore") - } -} - -func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) { - select { - case msg := <-ch: - if !bytes.Equal(msg.GetData(), exp) { - t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) - } - case <-time.After(time.Second * 5): - t.Fatal("timed out waiting for message of: ", exp) + + nextmsg := []byte("ifps is kul") + err = psubs[0].Publish("cats", nextmsg) + if err != nil { + t.Fatal(err) } + + assertReceive(t, ch2, nextmsg) } +// make sure messages arent routed between nodes who arent subscribed func TestNoConnection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -232,3 +252,34 @@ func TestNoConnection(t *testing.T) { case <-time.After(time.Millisecond * 200): } } + +func TestSelfReceive(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host := getNetHosts(t, ctx, 1)[0] + + psub := NewFloodSub(ctx, host) + + msg := []byte("hello world") + + err := psub.Publish("foobar", msg) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 10) + + ch, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msg2 := []byte("goodbye world") + err = psub.Publish("foobar", msg2) + if err != nil { + t.Fatal(err) + } + + assertReceive(t, ch, msg2) +} diff --git a/notify.go b/notify.go index 60d9b59..e91c7e0 100644 --- a/notify.go +++ b/notify.go @@ -7,34 +7,35 @@ import ( inet "github.com/libp2p/go-libp2p/p2p/net" ) -var _ inet.Notifiee = (*PubSub)(nil) +var _ inet.Notifiee = (*PubSubNotif)(nil) -func (p *PubSub) OpenedStream(n inet.Network, s inet.Stream) { +type PubSubNotif PubSub +func (p *PubSubNotif) OpenedStream(n inet.Network, s inet.Stream) { } -func (p *PubSub) ClosedStream(n inet.Network, s inet.Stream) { - +func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) { } -func (p *PubSub) Connected(n inet.Network, c inet.Conn) { +func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) { s, err := p.host.NewStream(context.Background(), c.RemotePeer(), ID) if err != nil { - log.Error("opening new stream to peer: ", err) + log.Error("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer()) return } - p.newPeers <- s + select { + case p.newPeers <- s: + case <-p.ctx.Done(): + s.Close() + } } -func (p *PubSub) Disconnected(n inet.Network, c inet.Conn) { - +func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) { } -func (p *PubSub) Listen(n inet.Network, _ ma.Multiaddr) { - +func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) { } -func (p *PubSub) ListenClose(n inet.Network, _ ma.Multiaddr) { - +func (p *PubSubNotif) ListenClose(n inet.Network, _ ma.Multiaddr) { } diff --git a/package.json b/package.json index 96cd5f6..eca0cb9 100644 --- a/package.json +++ b/package.json @@ -9,9 +9,9 @@ "gxDependencies": [ { "author": "whyrusleeping", - "hash": "Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS", + "hash": "QmcpZpCmnfjRunzeYtXZdtcy16P2mC65CThjb7aA8sPqNY", "name": "go-libp2p", - "version": "3.4.1" + "version": "3.5.1" }, { "author": "whyrusleeping",