diff --git a/.gx/lastpubver b/.gx/lastpubver index 9042917..b1d543b 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -0.8.0: QmWiLbk7eE1jGePDAuS26E2A9bMK3e3PMH3dcSeRY3MEBR +0.8.1: QmRJs5veT3gnuYpLAagC3NbzixbkgwjSdUXTKfh3hMo6XM diff --git a/floodsub.go b/floodsub.go index 4a3e8cc..0d7dec5 100644 --- a/floodsub.go +++ b/floodsub.go @@ -31,14 +31,17 @@ type PubSub struct { publish chan *Message // addSub is a control channel for us to add and remove subscriptions - addSub chan *addSub + addSub chan *addSubReq - // + // get list of topics we are subscribed to getTopics chan *topicReq - // + // get chan of peers we are connected to getPeers chan *listPeerReq + // send subscription here to cancel it + cancelCh chan *Subscription + // a notification channel for incoming streams from other peers newPeers chan inet.Stream @@ -46,7 +49,7 @@ type PubSub struct { peerDead chan peer.ID // The set of topics we are subscribed to - myTopics map[string]chan *Message + myTopics map[string]map[*Subscription]struct{} // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} @@ -72,6 +75,7 @@ type RPC struct { from peer.ID } +// NewFloodSub returns a new FloodSub management object func NewFloodSub(ctx context.Context, h host.Host) *PubSub { ps := &PubSub{ host: h, @@ -80,10 +84,11 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { publish: make(chan *Message), newPeers: make(chan inet.Stream), peerDead: make(chan peer.ID), + cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), - addSub: make(chan *addSub), + addSub: make(chan *addSubReq), getTopics: make(chan *topicReq), - myTopics: make(map[string]chan *Message), + myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), seenMessages: timecache.NewTimeCache(time.Second * 30), @@ -97,6 +102,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { return ps } +// processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { for { select { @@ -130,8 +136,10 @@ func (p *PubSub) processLoop(ctx context.Context) { out = append(out, t) } treq.resp <- out + case sub := <-p.cancelCh: + p.handleRemoveSubscription(sub) case sub := <-p.addSub: - p.handleSubscriptionChange(sub) + p.handleAddSubscription(sub) case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -164,30 +172,62 @@ func (p *PubSub) processLoop(ctx context.Context) { } } -func (p *PubSub) handleSubscriptionChange(sub *addSub) { - subopt := &pb.RPC_SubOpts{ - Topicid: &sub.topic, - Subscribe: &sub.sub, +// handleRemoveSubscription removes Subscription sub from bookeeping. +// If this was the last Subscription for a given topic, it will also announce +// that this node is not subscribing to this topic anymore. +// Only called from processLoop. +func (p *PubSub) handleRemoveSubscription(sub *Subscription) { + subs := p.myTopics[sub.topic] + + if subs == nil { + return } - ch, ok := p.myTopics[sub.topic] - if sub.sub { - if ok { - // we don't allow multiple subs per topic at this point - sub.resp <- nil - return - } + sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") + close(sub.ch) + delete(subs, sub) - resp := make(chan *Message, 16) - p.myTopics[sub.topic] = resp - sub.resp <- resp - } else { - if !ok { - return - } - - close(ch) + if len(subs) == 0 { delete(p.myTopics, sub.topic) + p.announce(sub.topic, false) + } +} + +// handleAddSubscription adds a Subscription for a particular topic. If it is +// the first Subscription for the topic, it will announce that this node +// subscribes to the topic. +// Only called from processLoop. +func (p *PubSub) handleAddSubscription(req *addSubReq) { + subs := p.myTopics[req.topic] + + // announce we want this topic + if len(subs) == 0 { + p.announce(req.topic, true) + } + + // make new if not there + if subs == nil { + p.myTopics[req.topic] = make(map[*Subscription]struct{}) + subs = p.myTopics[req.topic] + } + + sub := &Subscription{ + ch: make(chan *Message, 32), + topic: req.topic, + cancelCh: p.cancelCh, + } + + p.myTopics[sub.topic][sub] = struct{}{} + + req.resp <- sub +} + +// announce announces whether or not this node is interested in a given topic +// Only called from processLoop. +func (p *PubSub) announce(topic string, sub bool) { + subopt := &pb.RPC_SubOpts{ + Topicid: &topic, + Subscribe: &sub, } out := rpcWithSubs(subopt) @@ -196,23 +236,29 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { } } +// notifySubs sends a given message to all corresponding subscribbers. +// Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) { for _, topic := range msg.GetTopicIDs() { - subch, ok := p.myTopics[topic] - if ok { - subch <- &Message{msg} + subs := p.myTopics[topic] + for f := range subs { + f.ch <- &Message{msg} } } } +// seenMessage returns whether we already saw this message before func (p *PubSub) seenMessage(id string) bool { return p.seenMessages.Has(id) } +// markSeen marks a message as seen such that seenMessage returns `true' for the given id func (p *PubSub) markSeen(id string) { p.seenMessages.Add(id) } +// subscribedToMessage returns whether we are subscribed to one of the topics +// of a given message func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { for _, t := range msg.GetTopicIDs() { if _, ok := p.myTopics[t]; ok { @@ -253,6 +299,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } +// msgID returns a unique ID of the passed Message func msgID(pmsg *pb.Message) string { return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) } @@ -303,59 +350,49 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { return nil } -type addSub struct { +type addSubReq struct { topic string - sub bool - resp chan chan *Message + resp chan *Subscription } -func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) { - return p.SubscribeComplicated(&pb.TopicDescriptor{ - Name: proto.String(topic), - }) +// Subscribe returns a new Subscription for the given topic +func (p *PubSub) Subscribe(topic string) (*Subscription, error) { + td := pb.TopicDescriptor{Name: &topic} + + return p.SubscribeByTopicDescriptor(&td) +} + +// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor +func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) { + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("auth mode not yet supported") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("encryption mode not yet supported") + } + + out := make(chan *Subscription, 1) + p.addSub <- &addSubReq{ + topic: td.GetName(), + resp: out, + } + + return <-out, nil } type topicReq struct { resp chan []string } +// GetTopics returns the topics this node is subscribed to func (p *PubSub) GetTopics() []string { out := make(chan []string, 1) p.getTopics <- &topicReq{resp: out} return <-out } -func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) { - if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { - return nil, fmt.Errorf("Auth method not yet supported") - } - - if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { - return nil, fmt.Errorf("Encryption method not yet supported") - } - - resp := make(chan chan *Message) - p.addSub <- &addSub{ - topic: td.GetName(), - resp: resp, - sub: true, - } - - outch := <-resp - if outch == nil { - return nil, fmt.Errorf("error, duplicate subscription") - } - - return outch, nil -} - -func (p *PubSub) Unsub(topic string) { - p.addSub <- &addSub{ - topic: topic, - sub: false, - } -} - +// Publish publishes data under the given topic func (p *PubSub) Publish(topic string, data []byte) error { seqno := make([]byte, 8) binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) @@ -376,6 +413,7 @@ type listPeerReq struct { topic string } +// ListPeers returns a list of peers we are connected to. func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) p.getPeers <- &listPeerReq{ diff --git a/floodsub_test.go b/floodsub_test.go index dc87cc2..18885ef 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -10,11 +10,12 @@ import ( "time" host "github.com/libp2p/go-libp2p-host" + netutil "github.com/libp2p/go-libp2p-netutil" peer "github.com/libp2p/go-libp2p-peer" - netutil "github.com/libp2p/go-libp2p/p2p/test/util" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ) -func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []<-chan *Message) { +func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { data := make([]byte, 16) rand.Read(data) @@ -34,7 +35,8 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { var out []host.Host for i := 0; i < n; i++ { - h := netutil.GenHostSwarm(t, ctx) + netw := netutil.GenSwarmNetwork(t, ctx) + h := bhost.New(netw) out = append(out, h) } @@ -85,13 +87,14 @@ func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub { return psubs } -func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) { +func assertReceive(t *testing.T, ch *Subscription, exp []byte) { select { - case msg := <-ch: + case msg := <-ch.ch: if !bytes.Equal(msg.GetData(), exp) { t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) } case <-time.After(time.Second * 5): + t.Logf("%#v\n", ch) t.Fatal("timed out waiting for message of: ", string(exp)) } } @@ -103,9 +106,9 @@ func TestBasicFloodsub(t *testing.T) { psubs := getPubsubs(ctx, hosts) - var msgs []<-chan *Message + var msgs []*Subscription for _, ps := range psubs { - subch, err := ps.Subscribe(ctx, "foobar") + subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -125,8 +128,11 @@ func TestBasicFloodsub(t *testing.T) { psubs[owner].Publish("foobar", msg) - for _, resp := range msgs { - got := <-resp + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } @@ -149,13 +155,13 @@ func TestMultihops(t *testing.T) { connect(t, hosts[3], hosts[4]) connect(t, hosts[4], hosts[5]) - var msgChs []<-chan *Message + var subs []*Subscription for i := 1; i < 6; i++ { - ch, err := psubs[i].Subscribe(ctx, "foobar") + ch, err := psubs[i].Subscribe("foobar") if err != nil { t.Fatal(err) } - msgChs = append(msgChs, ch) + subs = append(subs, ch) } time.Sleep(time.Millisecond * 100) @@ -168,7 +174,7 @@ func TestMultihops(t *testing.T) { // last node in the chain should get the message select { - case out := <-msgChs[4]: + case out := <-subs[4].ch: if !bytes.Equal(out.GetData(), msg) { t.Fatal("got wrong data") } @@ -188,12 +194,12 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A, err := psubs[1].Subscribe(ctx, "cats") + A, err := psubs[1].Subscribe("cats") if err != nil { t.Fatal(err) } - B, err := psubs[2].Subscribe(ctx, "cats") + B, err := psubs[2].Subscribe("cats") if err != nil { t.Fatal(err) } @@ -209,7 +215,7 @@ func TestReconnects(t *testing.T) { assertReceive(t, A, msg) assertReceive(t, B, msg) - psubs[2].Unsub("cats") + B.Cancel() time.Sleep(time.Millisecond * 50) @@ -221,7 +227,7 @@ func TestReconnects(t *testing.T) { assertReceive(t, A, msg2) select { - case _, ok := <-B: + case _, ok := <-B.ch: if ok { t.Fatal("shouldnt have gotten data on this channel") } @@ -229,12 +235,17 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - ch2, err := psubs[2].Subscribe(ctx, "cats") + nSubs := len(psubs[2].myTopics["cats"]) + if nSubs > 0 { + t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) + } + + ch2, err := psubs[2].Subscribe("cats") if err != nil { t.Fatal(err) } - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 100) nextmsg := []byte("ifps is kul") err = psubs[0].Publish("cats", nextmsg) @@ -254,7 +265,7 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch, err := psubs[5].Subscribe(ctx, "foobar") + ch, err := psubs[5].Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -265,7 +276,7 @@ func TestNoConnection(t *testing.T) { } select { - case <-ch: + case <-ch.ch: t.Fatal("shouldnt have gotten a message") case <-time.After(time.Millisecond * 200): } @@ -288,7 +299,7 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch, err := psub.Subscribe(ctx, "foobar") + ch, err := psub.Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -311,14 +322,14 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch, err := psubs[1].Subscribe(ctx, "foobar") + ch, err := psubs[1].Subscribe("foobar") if err != nil { t.Fatal(err) } time.Sleep(time.Millisecond * 50) - checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch}) + checkMessageRouting(t, "foobar", psubs, []*Subscription{ch}) } func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) { @@ -362,9 +373,9 @@ func TestTreeTopology(t *testing.T) { [8] -> [9] */ - var chs []<-chan *Message + var chs []*Subscription for _, ps := range psubs { - ch, err := ps.Subscribe(ctx, "fizzbuzz") + ch, err := ps.Subscribe("fizzbuzz") if err != nil { t.Fatal(err) } @@ -404,31 +415,31 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - _, err := psub.Subscribe(ctx, "foo") + fooSub, err := psub.Subscribe("foo") if err != nil { t.Fatal(err) } - _, err = psub.Subscribe(ctx, "bar") + barSub, err := psub.Subscribe("bar") if err != nil { t.Fatal(err) } assertHasTopics(t, psub, "foo", "bar") - _, err = psub.Subscribe(ctx, "baz") + _, err = psub.Subscribe("baz") if err != nil { t.Fatal(err) } assertHasTopics(t, psub, "foo", "bar", "baz") - psub.Unsub("bar") + barSub.Cancel() assertHasTopics(t, psub, "foo", "baz") - psub.Unsub("foo") + fooSub.Cancel() assertHasTopics(t, psub, "baz") - _, err = psub.Subscribe(ctx, "fish") + _, err = psub.Subscribe("fish") if err != nil { t.Fatal(err) } @@ -447,17 +458,39 @@ func TestPeerTopicReporting(t *testing.T) { connect(t, hosts[0], hosts[2]) connect(t, hosts[0], hosts[3]) - psubs[1].Subscribe(ctx, "foo") - psubs[1].Subscribe(ctx, "bar") - psubs[1].Subscribe(ctx, "baz") + _, err := psubs[1].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("bar") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("baz") + if err != nil { + t.Fatal(err) + } - psubs[2].Subscribe(ctx, "foo") - psubs[2].Subscribe(ctx, "ipfs") + _, err = psubs[2].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[2].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } - psubs[3].Subscribe(ctx, "baz") - psubs[3].Subscribe(ctx, "ipfs") + _, err = psubs[3].Subscribe("baz") + if err != nil { + t.Fatal(err) + } + _, err = psubs[3].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 10) + peers := psubs[0].ListPeers("ipfs") assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) @@ -471,17 +504,62 @@ func TestPeerTopicReporting(t *testing.T) { assertPeerList(t, peers, hosts[1].ID()) } +func TestSubscribeMultipleTimes(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + + sub1, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + sub2, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + + // make sure subscribing is finished by the time we publish + time.Sleep(1 * time.Millisecond) + + psubs[1].Publish("foo", []byte("bar")) + + msg, err := sub1.Next(ctx) + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + + data := string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } + + msg, err = sub2.Next(ctx) + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + data = string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } +} + func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) { sort.Sort(peer.IDSlice(peers)) sort.Sort(peer.IDSlice(expected)) if len(peers) != len(expected) { - t.Fatal("mismatch: %s != %s", peers, expected) + t.Fatalf("mismatch: %s != %s", peers, expected) } for i, p := range peers { if expected[i] != p { - t.Fatal("mismatch: %s != %s", peers, expected) + t.Fatalf("mismatch: %s != %s", peers, expected) } } } diff --git a/package.json b/package.json index 6855893..54b9c82 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,12 @@ "hash": "Qmb6UFbVu1grhv5o5KnouvtZ6cqdrjXj6zLejAHWunxgCt", "name": "go-libp2p-host", "version": "1.3.0" + }, + { + "author": "whyrusleeping", + "hash": "QmcDTquYLTYirqj71RRWKUWEEw3nJt11Awzun5ep8kfY7W", + "name": "go-libp2p-netutil", + "version": "0.1.0" } ], "gxVersion": "0.9.0", @@ -37,6 +43,6 @@ "license": "", "name": "floodsub", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "0.8.0" + "version": "0.8.1" } diff --git a/subscription.go b/subscription.go new file mode 100644 index 0000000..d6e930c --- /dev/null +++ b/subscription.go @@ -0,0 +1,33 @@ +package floodsub + +import ( + "context" +) + +type Subscription struct { + topic string + ch chan *Message + cancelCh chan<- *Subscription + err error +} + +func (sub *Subscription) Topic() string { + return sub.topic +} + +func (sub *Subscription) Next(ctx context.Context) (*Message, error) { + select { + case msg, ok := <-sub.ch: + if !ok { + return msg, sub.err + } + + return msg, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (sub *Subscription) Cancel() { + sub.cancelCh <- sub +}