more code cleanup, remove debug statements, add support for multiple

messages and topics
This commit is contained in:
Jeromy 2016-09-12 13:22:16 -07:00
parent ab2fef7c1b
commit dd331f38c6
5 changed files with 106 additions and 54 deletions

View File

@ -76,7 +76,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
err := writeMsg(&rpc.RPC)
if err != nil {
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err)
dead = true
go func() {
p.peerDead <- s.Conn().RemotePeer()
@ -96,3 +96,7 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
},
}
}
func rpcWithMessages(msgs ...*pb.Message) *RPC {
return &RPC{RPC: pb.RPC{Publish: msgs}}
}

View File

@ -82,7 +82,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
}
h.SetStreamHandler(ID, ps.handleNewStream)
h.Network().Notify(ps)
h.Network().Notify((*PubSubNotif)(ps))
go ps.processLoop(ctx)
@ -109,6 +109,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
case pid := <-p.peerDead:
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
}
case sub := <-p.addSub:
p.handleSubscriptionChange(sub)
case rpc := <-p.incoming:
@ -118,13 +121,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue
}
case msg := <-p.publish:
p.notifySubs(msg.Message)
err := p.publishMessage(p.host.ID(), msg.Message)
if err != nil {
log.Error("publishing message: ", err)
continue
}
p.maybePublishMessage(p.host.ID(), msg.Message)
case <-ctx.Done():
log.Info("pubsub processloop shutting down")
return
@ -222,10 +219,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
}
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
msg := &Message{pmsg}
id := msg.Message.GetFrom() + string(msg.GetSeqno())
id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
if p.seenMessage(id) {
return
}
@ -241,18 +235,20 @@ func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
}
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
if len(msg.GetTopicIDs()) != 1 {
return fmt.Errorf("don't support publishing to multiple topics in a single message")
}
tmap, ok := p.topics[msg.GetTopicIDs()[0]]
tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
tmap, ok := p.topics[topic]
if !ok {
return nil
continue
}
out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}}
for p, _ := range tmap {
tosend[p] = struct{}{}
}
}
for pid := range tmap {
out := rpcWithMessages(msg)
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}

View File

@ -67,6 +67,17 @@ func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub {
return psubs
}
func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) {
select {
case msg := <-ch:
if !bytes.Equal(msg.GetData(), exp) {
t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData()))
}
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for message of: ", string(exp))
}
}
func TestBasicFloodsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -152,7 +163,7 @@ func TestReconnects(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getNetHosts(t, ctx, 3)
psubs := getPubsubs(ctx, hosts)
@ -180,7 +191,9 @@ func TestReconnects(t *testing.T) {
assertReceive(t, A, msg)
assertReceive(t, B, msg)
hosts[2].Close()
psubs[2].Unsub("cats")
time.Sleep(time.Millisecond * 50)
msg2 := []byte("potato")
err = psubs[0].Publish("cats", msg2)
@ -189,25 +202,32 @@ func TestReconnects(t *testing.T) {
}
assertReceive(t, A, msg2)
select {
case _, ok := <-B:
if ok {
t.Fatal("shouldnt have gotten data on this channel")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for B chan to be closed")
}
ch2, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
_, ok := psubs[0].peers[hosts[2].ID()]
if ok {
t.Fatal("shouldnt have this peer anymore")
}
}
func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) {
select {
case msg := <-ch:
if !bytes.Equal(msg.GetData(), exp) {
t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData()))
}
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for message of: ", exp)
nextmsg := []byte("ifps is kul")
err = psubs[0].Publish("cats", nextmsg)
if err != nil {
t.Fatal(err)
}
assertReceive(t, ch2, nextmsg)
}
// make sure messages arent routed between nodes who arent subscribed
func TestNoConnection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -232,3 +252,34 @@ func TestNoConnection(t *testing.T) {
case <-time.After(time.Millisecond * 200):
}
}
func TestSelfReceive(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host := getNetHosts(t, ctx, 1)[0]
psub := NewFloodSub(ctx, host)
msg := []byte("hello world")
err := psub.Publish("foobar", msg)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 10)
ch, err := psub.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
msg2 := []byte("goodbye world")
err = psub.Publish("foobar", msg2)
if err != nil {
t.Fatal(err)
}
assertReceive(t, ch, msg2)
}

View File

@ -7,34 +7,35 @@ import (
inet "github.com/libp2p/go-libp2p/p2p/net"
)
var _ inet.Notifiee = (*PubSub)(nil)
var _ inet.Notifiee = (*PubSubNotif)(nil)
func (p *PubSub) OpenedStream(n inet.Network, s inet.Stream) {
type PubSubNotif PubSub
func (p *PubSubNotif) OpenedStream(n inet.Network, s inet.Stream) {
}
func (p *PubSub) ClosedStream(n inet.Network, s inet.Stream) {
func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) {
}
func (p *PubSub) Connected(n inet.Network, c inet.Conn) {
func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
s, err := p.host.NewStream(context.Background(), c.RemotePeer(), ID)
if err != nil {
log.Error("opening new stream to peer: ", err)
log.Error("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer())
return
}
p.newPeers <- s
select {
case p.newPeers <- s:
case <-p.ctx.Done():
s.Close()
}
}
func (p *PubSub) Disconnected(n inet.Network, c inet.Conn) {
func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) {
}
func (p *PubSub) Listen(n inet.Network, _ ma.Multiaddr) {
func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) {
}
func (p *PubSub) ListenClose(n inet.Network, _ ma.Multiaddr) {
func (p *PubSubNotif) ListenClose(n inet.Network, _ ma.Multiaddr) {
}

View File

@ -9,9 +9,9 @@
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS",
"hash": "QmcpZpCmnfjRunzeYtXZdtcy16P2mC65CThjb7aA8sPqNY",
"name": "go-libp2p",
"version": "3.4.1"
"version": "3.5.1"
},
{
"author": "whyrusleeping",