From a8a29fa17e9c5fec5874f1a4b7a2ce8baa6d00b4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 14 Sep 2016 14:12:20 -0700 Subject: [PATCH 01/12] refactor to use multiple feeds --- floodsub.go | 103 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 22 deletions(-) diff --git a/floodsub.go b/floodsub.go index 4a3e8cc..c52d0b3 100644 --- a/floodsub.go +++ b/floodsub.go @@ -39,6 +39,9 @@ type PubSub struct { // getPeers chan *listPeerReq + // + addFeedHook chan *addFeedReq + // a notification channel for incoming streams from other peers newPeers chan inet.Stream @@ -46,7 +49,7 @@ type PubSub struct { peerDead chan peer.ID // The set of topics we are subscribed to - myTopics map[string]chan *Message + myTopics map[string][]*clientFeed // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} @@ -83,7 +86,8 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { getPeers: make(chan *listPeerReq), addSub: make(chan *addSub), getTopics: make(chan *topicReq), - myTopics: make(map[string]chan *Message), + addFeedHook: make(chan *addFeedReq, 32), + myTopics: make(map[string][]*clientFeed), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), seenMessages: timecache.NewTimeCache(time.Second * 30), @@ -114,6 +118,21 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers[pid] = messages + case req := <-p.addFeedHook: + feeds, ok := p.myTopics[req.topic] + + var out chan *Message + if ok { + out = make(chan *Message, 32) + nfeed := &clientFeed{ + out: out, + ctx: req.ctx, + } + + p.myTopics[req.topic] = append(feeds, nfeed) + } + + req.resp <- out case pid := <-p.peerDead: ch, ok := p.peers[pid] if ok { @@ -170,23 +189,21 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { Subscribe: &sub.sub, } - ch, ok := p.myTopics[sub.topic] + feeds, ok := p.myTopics[sub.topic] if sub.sub { if ok { - // we don't allow multiple subs per topic at this point - sub.resp <- nil return } - resp := make(chan *Message, 16) - p.myTopics[sub.topic] = resp - sub.resp <- resp + p.myTopics[sub.topic] = nil } else { if !ok { return } - close(ch) + for _, f := range feeds { + close(f.out) + } delete(p.myTopics, sub.topic) } @@ -198,9 +215,26 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { func (p *PubSub) notifySubs(msg *pb.Message) { for _, topic := range msg.GetTopicIDs() { - subch, ok := p.myTopics[topic] - if ok { - subch <- &Message{msg} + var cleanup bool + feeds := p.myTopics[topic] + for _, f := range feeds { + select { + case f.out <- &Message{msg}: + case <-f.ctx.Done(): + close(f.out) + f.out = nil + cleanup = true + } + } + + if cleanup { + out := make([]*clientFeed, 0, len(feeds)) + for _, f := range feeds { + if f.out != nil { + out = append(out, f) + } + } + p.myTopics[topic] = out } } } @@ -310,9 +344,15 @@ type addSub struct { } func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) { - return p.SubscribeComplicated(&pb.TopicDescriptor{ + err := p.AddTopicSubscription(&pb.TopicDescriptor{ Name: proto.String(topic), }) + + if err != nil { + return nil, err + } + + return p.GetFeed(ctx, topic) } type topicReq struct { @@ -325,28 +365,47 @@ func (p *PubSub) GetTopics() []string { return <-out } -func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) { +func (p *PubSub) AddTopicSubscription(td *pb.TopicDescriptor) error { if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { - return nil, fmt.Errorf("Auth method not yet supported") + return fmt.Errorf("Auth method not yet supported") } if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { - return nil, fmt.Errorf("Encryption method not yet supported") + return fmt.Errorf("Encryption method not yet supported") } - resp := make(chan chan *Message) p.addSub <- &addSub{ topic: td.GetName(), - resp: resp, sub: true, } - outch := <-resp - if outch == nil { - return nil, fmt.Errorf("error, duplicate subscription") + return nil +} + +type addFeedReq struct { + ctx context.Context + topic string + resp chan chan *Message +} + +type clientFeed struct { + out chan *Message + ctx context.Context +} + +func (p *PubSub) GetFeed(ctx context.Context, topic string) (<-chan *Message, error) { + out := make(chan chan *Message, 1) + p.addFeedHook <- &addFeedReq{ + ctx: ctx, + topic: topic, + resp: out, } - return outch, nil + resp := <-out + if resp == nil { + return nil, fmt.Errorf("not subscribed to topic %s", topic) + } + return resp, nil } func (p *PubSub) Unsub(topic string) { From 822640a482e9171dc5ba1340177bb4a4c5e89631 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 01:01:06 +0200 Subject: [PATCH 02/12] multiple subscriptions prototype --- floodsub.go | 209 +++++++++++++++++++---------------------------- floodsub_test.go | 69 +++++++++------- subscription.go | 26 ++++++ 3 files changed, 149 insertions(+), 155 deletions(-) create mode 100644 subscription.go diff --git a/floodsub.go b/floodsub.go index c52d0b3..cdedce3 100644 --- a/floodsub.go +++ b/floodsub.go @@ -31,16 +31,16 @@ type PubSub struct { publish chan *Message // addSub is a control channel for us to add and remove subscriptions - addSub chan *addSub + addSub chan *addSubReq - // + // get list of topics we are subscribed to getTopics chan *topicReq - // + // get chan of peers we are connected to getPeers chan *listPeerReq - // - addFeedHook chan *addFeedReq + // send subscription here to cancel it + cancelCh chan *Subscription // a notification channel for incoming streams from other peers newPeers chan inet.Stream @@ -49,7 +49,7 @@ type PubSub struct { peerDead chan peer.ID // The set of topics we are subscribed to - myTopics map[string][]*clientFeed + myTopics map[string]map[*Subscription]struct{} // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} @@ -83,11 +83,11 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { publish: make(chan *Message), newPeers: make(chan inet.Stream), peerDead: make(chan peer.ID), + cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), - addSub: make(chan *addSub), + addSub: make(chan *addSubReq), getTopics: make(chan *topicReq), - addFeedHook: make(chan *addFeedReq, 32), - myTopics: make(map[string][]*clientFeed), + myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), seenMessages: timecache.NewTimeCache(time.Second * 30), @@ -118,21 +118,6 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers[pid] = messages - case req := <-p.addFeedHook: - feeds, ok := p.myTopics[req.topic] - - var out chan *Message - if ok { - out = make(chan *Message, 32) - nfeed := &clientFeed{ - out: out, - ctx: req.ctx, - } - - p.myTopics[req.topic] = append(feeds, nfeed) - } - - req.resp <- out case pid := <-p.peerDead: ch, ok := p.peers[pid] if ok { @@ -145,12 +130,16 @@ func (p *PubSub) processLoop(ctx context.Context) { } case treq := <-p.getTopics: var out []string - for t := range p.myTopics { - out = append(out, t) + for t, subs := range p.myTopics { + if len(subs) > 0 { + out = append(out, t) + } } treq.resp <- out + case sub := <-p.cancelCh: + p.handleRemoveSubscription(sub) case sub := <-p.addSub: - p.handleSubscriptionChange(sub) + p.handleAddSubscription(sub) case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -183,28 +172,51 @@ func (p *PubSub) processLoop(ctx context.Context) { } } -func (p *PubSub) handleSubscriptionChange(sub *addSub) { - subopt := &pb.RPC_SubOpts{ - Topicid: &sub.topic, - Subscribe: &sub.sub, +func (p *PubSub) handleRemoveSubscription(sub *Subscription) { + subs := p.myTopics[sub.topic] + + if subs == nil { + return } - feeds, ok := p.myTopics[sub.topic] - if sub.sub { - if ok { - return - } + sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") + close(sub.ch) + delete(subs, sub) - p.myTopics[sub.topic] = nil - } else { - if !ok { - return - } + if len(subs) == 0 { + p.announce(sub.topic, false) + } +} - for _, f := range feeds { - close(f.out) - } - delete(p.myTopics, sub.topic) +func (p *PubSub) handleAddSubscription(req *addSubReq) { + subs := p.myTopics[req.topic] + + // announce we want this topic + if len(subs) == 0 { + p.announce(req.topic, true) + } + + // make new if not there + if subs == nil { + p.myTopics[req.topic] = make(map[*Subscription]struct{}) + subs = p.myTopics[req.topic] + } + + sub := &Subscription{ + ch: make(chan *Message, 32), + topic: req.topic, + cancelCh: p.cancelCh, + } + + p.myTopics[sub.topic][sub] = struct{}{} + + req.resp <- sub +} + +func (p *PubSub) announce(topic string, sub bool) { + subopt := &pb.RPC_SubOpts{ + Topicid: &topic, + Subscribe: &sub, } out := rpcWithSubs(subopt) @@ -215,26 +227,9 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { func (p *PubSub) notifySubs(msg *pb.Message) { for _, topic := range msg.GetTopicIDs() { - var cleanup bool - feeds := p.myTopics[topic] - for _, f := range feeds { - select { - case f.out <- &Message{msg}: - case <-f.ctx.Done(): - close(f.out) - f.out = nil - cleanup = true - } - } - - if cleanup { - out := make([]*clientFeed, 0, len(feeds)) - for _, f := range feeds { - if f.out != nil { - out = append(out, f) - } - } - p.myTopics[topic] = out + subs := p.myTopics[topic] + for f := range subs { + f.ch <- &Message{msg} } } } @@ -337,22 +332,36 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { return nil } -type addSub struct { +type addSubReq struct { topic string - sub bool - resp chan chan *Message + resp chan *Subscription } -func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) { - err := p.AddTopicSubscription(&pb.TopicDescriptor{ +func (p *PubSub) Subscribe(topic string) (*Subscription, error) { + td := &pb.TopicDescriptor{ Name: proto.String(topic), - }) - - if err != nil { - return nil, err } - return p.GetFeed(ctx, topic) + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("Auth method not yet supported") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("Encryption method not yet supported") + } + + out := make(chan *Subscription, 1) + p.addSub <- &addSubReq{ + topic: topic, + resp: out, + } + + resp := <-out + if resp == nil { + return nil, fmt.Errorf("not subscribed to topic %s", topic) + } + + return resp, nil } type topicReq struct { @@ -365,56 +374,6 @@ func (p *PubSub) GetTopics() []string { return <-out } -func (p *PubSub) AddTopicSubscription(td *pb.TopicDescriptor) error { - if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { - return fmt.Errorf("Auth method not yet supported") - } - - if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { - return fmt.Errorf("Encryption method not yet supported") - } - - p.addSub <- &addSub{ - topic: td.GetName(), - sub: true, - } - - return nil -} - -type addFeedReq struct { - ctx context.Context - topic string - resp chan chan *Message -} - -type clientFeed struct { - out chan *Message - ctx context.Context -} - -func (p *PubSub) GetFeed(ctx context.Context, topic string) (<-chan *Message, error) { - out := make(chan chan *Message, 1) - p.addFeedHook <- &addFeedReq{ - ctx: ctx, - topic: topic, - resp: out, - } - - resp := <-out - if resp == nil { - return nil, fmt.Errorf("not subscribed to topic %s", topic) - } - return resp, nil -} - -func (p *PubSub) Unsub(topic string) { - p.addSub <- &addSub{ - topic: topic, - sub: false, - } -} - func (p *PubSub) Publish(topic string, data []byte) error { seqno := make([]byte, 8) binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) diff --git a/floodsub_test.go b/floodsub_test.go index dc87cc2..bac78bc 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -14,7 +14,7 @@ import ( netutil "github.com/libp2p/go-libp2p/p2p/test/util" ) -func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []<-chan *Message) { +func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { data := make([]byte, 16) rand.Read(data) @@ -85,13 +85,14 @@ func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub { return psubs } -func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) { +func assertReceive(t *testing.T, ch *Subscription, exp []byte) { select { - case msg := <-ch: + case msg := <-ch.ch: if !bytes.Equal(msg.GetData(), exp) { t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) } case <-time.After(time.Second * 5): + t.Logf("%#v\n", ch) t.Fatal("timed out waiting for message of: ", string(exp)) } } @@ -103,9 +104,9 @@ func TestBasicFloodsub(t *testing.T) { psubs := getPubsubs(ctx, hosts) - var msgs []<-chan *Message + var msgs []*Subscription for _, ps := range psubs { - subch, err := ps.Subscribe(ctx, "foobar") + subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -125,8 +126,11 @@ func TestBasicFloodsub(t *testing.T) { psubs[owner].Publish("foobar", msg) - for _, resp := range msgs { - got := <-resp + for _, sub := range msgs { + got, err := sub.Next() + if err != nil { + t.Fatal(sub.err) + } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } @@ -149,13 +153,13 @@ func TestMultihops(t *testing.T) { connect(t, hosts[3], hosts[4]) connect(t, hosts[4], hosts[5]) - var msgChs []<-chan *Message + var subs []*Subscription for i := 1; i < 6; i++ { - ch, err := psubs[i].Subscribe(ctx, "foobar") + ch, err := psubs[i].Subscribe("foobar") if err != nil { t.Fatal(err) } - msgChs = append(msgChs, ch) + subs = append(subs, ch) } time.Sleep(time.Millisecond * 100) @@ -168,7 +172,7 @@ func TestMultihops(t *testing.T) { // last node in the chain should get the message select { - case out := <-msgChs[4]: + case out := <-subs[4].ch: if !bytes.Equal(out.GetData(), msg) { t.Fatal("got wrong data") } @@ -188,12 +192,12 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A, err := psubs[1].Subscribe(ctx, "cats") + A, err := psubs[1].Subscribe("cats") if err != nil { t.Fatal(err) } - B, err := psubs[2].Subscribe(ctx, "cats") + B, err := psubs[2].Subscribe("cats") if err != nil { t.Fatal(err) } @@ -209,7 +213,7 @@ func TestReconnects(t *testing.T) { assertReceive(t, A, msg) assertReceive(t, B, msg) - psubs[2].Unsub("cats") + B.Cancel() time.Sleep(time.Millisecond * 50) @@ -221,7 +225,7 @@ func TestReconnects(t *testing.T) { assertReceive(t, A, msg2) select { - case _, ok := <-B: + case _, ok := <-B.ch: if ok { t.Fatal("shouldnt have gotten data on this channel") } @@ -229,12 +233,17 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - ch2, err := psubs[2].Subscribe(ctx, "cats") + nSubs := len(psubs[2].myTopics["cats"]) + if nSubs > 0 { + t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) + } + + ch2, err := psubs[2].Subscribe("cats") if err != nil { t.Fatal(err) } - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 100) nextmsg := []byte("ifps is kul") err = psubs[0].Publish("cats", nextmsg) @@ -254,7 +263,7 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch, err := psubs[5].Subscribe(ctx, "foobar") + ch, err := psubs[5].Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -265,7 +274,7 @@ func TestNoConnection(t *testing.T) { } select { - case <-ch: + case <-ch.ch: t.Fatal("shouldnt have gotten a message") case <-time.After(time.Millisecond * 200): } @@ -288,7 +297,7 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch, err := psub.Subscribe(ctx, "foobar") + ch, err := psub.Subscribe("foobar") if err != nil { t.Fatal(err) } @@ -311,14 +320,14 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch, err := psubs[1].Subscribe(ctx, "foobar") + ch, err := psubs[1].Subscribe("foobar") if err != nil { t.Fatal(err) } time.Sleep(time.Millisecond * 50) - checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch}) + checkMessageRouting(t, "foobar", psubs, []*Subscription{ch}) } func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) { @@ -362,9 +371,9 @@ func TestTreeTopology(t *testing.T) { [8] -> [9] */ - var chs []<-chan *Message + var chs []*Subscription for _, ps := range psubs { - ch, err := ps.Subscribe(ctx, "fizzbuzz") + ch, err := ps.Subscribe("fizzbuzz") if err != nil { t.Fatal(err) } @@ -404,31 +413,31 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - _, err := psub.Subscribe(ctx, "foo") + fooSub, err := psub.Subscribe("foo") if err != nil { t.Fatal(err) } - _, err = psub.Subscribe(ctx, "bar") + barSub, err := psub.Subscribe("bar") if err != nil { t.Fatal(err) } assertHasTopics(t, psub, "foo", "bar") - _, err = psub.Subscribe(ctx, "baz") + _, err = psub.Subscribe("baz") if err != nil { t.Fatal(err) } assertHasTopics(t, psub, "foo", "bar", "baz") - psub.Unsub("bar") + barSub.Cancel() assertHasTopics(t, psub, "foo", "baz") - psub.Unsub("foo") + fooSub.Cancel() assertHasTopics(t, psub, "baz") - _, err = psub.Subscribe(ctx, "fish") + _, err = psub.Subscribe("fish") if err != nil { t.Fatal(err) } diff --git a/subscription.go b/subscription.go new file mode 100644 index 0000000..6fd01fb --- /dev/null +++ b/subscription.go @@ -0,0 +1,26 @@ +package floodsub + +type Subscription struct { + topic string + ch chan *Message + cancelCh chan<- *Subscription + err error +} + +func (sub *Subscription) Topic() string { + return sub.topic +} + +func (sub *Subscription) Next() (*Message, error) { + msg, ok := <-sub.ch + + if !ok { + return msg, sub.err + } + + return msg, nil +} + +func (sub *Subscription) Cancel() { + sub.cancelCh <- sub +} From b71e3adbd7cc7d24a9193b57ca21c85b090ecfc4 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 01:10:29 +0200 Subject: [PATCH 03/12] first cleanup wave --- floodsub.go | 21 ++---------- floodsub_test.go | 83 +++++++++++++----------------------------------- 2 files changed, 24 insertions(+), 80 deletions(-) diff --git a/floodsub.go b/floodsub.go index cdedce3..ac6438b 100644 --- a/floodsub.go +++ b/floodsub.go @@ -337,31 +337,14 @@ type addSubReq struct { resp chan *Subscription } -func (p *PubSub) Subscribe(topic string) (*Subscription, error) { - td := &pb.TopicDescriptor{ - Name: proto.String(topic), - } - - if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { - return nil, fmt.Errorf("Auth method not yet supported") - } - - if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { - return nil, fmt.Errorf("Encryption method not yet supported") - } - +func (p *PubSub) Subscribe(topic string) *Subscription { out := make(chan *Subscription, 1) p.addSub <- &addSubReq{ topic: topic, resp: out, } - resp := <-out - if resp == nil { - return nil, fmt.Errorf("not subscribed to topic %s", topic) - } - - return resp, nil + return <-out } type topicReq struct { diff --git a/floodsub_test.go b/floodsub_test.go index bac78bc..93fa6b7 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -106,10 +106,7 @@ func TestBasicFloodsub(t *testing.T) { var msgs []*Subscription for _, ps := range psubs { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + subch := ps.Subscribe("foobar") msgs = append(msgs, subch) } @@ -155,10 +152,7 @@ func TestMultihops(t *testing.T) { var subs []*Subscription for i := 1; i < 6; i++ { - ch, err := psubs[i].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[i].Subscribe("foobar") subs = append(subs, ch) } @@ -192,20 +186,14 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A, err := psubs[1].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + A := psubs[1].Subscribe("cats") - B, err := psubs[2].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + B := psubs[2].Subscribe("cats") time.Sleep(time.Millisecond * 100) msg := []byte("apples and oranges") - err = psubs[0].Publish("cats", msg) + err := psubs[0].Publish("cats", msg) if err != nil { t.Fatal(err) } @@ -238,10 +226,7 @@ func TestReconnects(t *testing.T) { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } - ch2, err := psubs[2].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + ch2 := psubs[2].Subscribe("cats") time.Sleep(time.Millisecond * 100) @@ -263,12 +248,9 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch, err := psubs[5].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[5].Subscribe("foobar") - err = psubs[0].Publish("foobar", []byte("TESTING")) + err := psubs[0].Publish("foobar", []byte("TESTING")) if err != nil { t.Fatal(err) } @@ -297,10 +279,7 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch, err := psub.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psub.Subscribe("foobar") msg2 := []byte("goodbye world") err = psub.Publish("foobar", msg2) @@ -320,10 +299,7 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch, err := psubs[1].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[1].Subscribe("foobar") time.Sleep(time.Millisecond * 50) @@ -373,10 +349,7 @@ func TestTreeTopology(t *testing.T) { var chs []*Subscription for _, ps := range psubs { - ch, err := ps.Subscribe("fizzbuzz") - if err != nil { - t.Fatal(err) - } + ch := ps.Subscribe("fizzbuzz") chs = append(chs, ch) } @@ -413,22 +386,13 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - fooSub, err := psub.Subscribe("foo") - if err != nil { - t.Fatal(err) - } + fooSub := psub.Subscribe("foo") - barSub, err := psub.Subscribe("bar") - if err != nil { - t.Fatal(err) - } + barSub := psub.Subscribe("bar") assertHasTopics(t, psub, "foo", "bar") - _, err = psub.Subscribe("baz") - if err != nil { - t.Fatal(err) - } + _ = psub.Subscribe("baz") assertHasTopics(t, psub, "foo", "bar", "baz") @@ -437,10 +401,7 @@ func TestSubReporting(t *testing.T) { fooSub.Cancel() assertHasTopics(t, psub, "baz") - _, err = psub.Subscribe("fish") - if err != nil { - t.Fatal(err) - } + _ = psub.Subscribe("fish") assertHasTopics(t, psub, "baz", "fish") } @@ -456,15 +417,15 @@ func TestPeerTopicReporting(t *testing.T) { connect(t, hosts[0], hosts[2]) connect(t, hosts[0], hosts[3]) - psubs[1].Subscribe(ctx, "foo") - psubs[1].Subscribe(ctx, "bar") - psubs[1].Subscribe(ctx, "baz") + psubs[1].Subscribe("foo") + psubs[1].Subscribe("bar") + psubs[1].Subscribe("baz") - psubs[2].Subscribe(ctx, "foo") - psubs[2].Subscribe(ctx, "ipfs") + psubs[2].Subscribe("foo") + psubs[2].Subscribe("ipfs") - psubs[3].Subscribe(ctx, "baz") - psubs[3].Subscribe(ctx, "ipfs") + psubs[3].Subscribe("baz") + psubs[3].Subscribe("ipfs") time.Sleep(time.Millisecond * 10) peers := psubs[0].ListPeers("ipfs") From 3a30ab4c17e73eb9b2bffec8c065d34e655bc8e3 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 13:23:38 +0200 Subject: [PATCH 04/12] comments --- floodsub.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/floodsub.go b/floodsub.go index ac6438b..40f530b 100644 --- a/floodsub.go +++ b/floodsub.go @@ -75,6 +75,7 @@ type RPC struct { from peer.ID } +// NewFloodSub returns a new FloodSub management object func NewFloodSub(ctx context.Context, h host.Host) *PubSub { ps := &PubSub{ host: h, @@ -101,6 +102,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { return ps } +// processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { for { select { @@ -172,6 +174,10 @@ func (p *PubSub) processLoop(ctx context.Context) { } } +// handleRemoveSubscription removes Subscription sub from bookeeping. +// If this was the last Subscription for a given topic, it will also announce +// that this node is not subscribing to this topic anymore. +// Only called from processLoop. func (p *PubSub) handleRemoveSubscription(sub *Subscription) { subs := p.myTopics[sub.topic] @@ -188,6 +194,10 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { } } +// handleAddSubscription adds a Subscription for a particular topic. If it is +// the first Subscription for the topic, it will announce that this node +// subscribes to the topic. +// Only called from processLoop. func (p *PubSub) handleAddSubscription(req *addSubReq) { subs := p.myTopics[req.topic] @@ -213,6 +223,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { req.resp <- sub } +// announce announces whether or not this node is interested in a given topic +// Only called from processLoop. func (p *PubSub) announce(topic string, sub bool) { subopt := &pb.RPC_SubOpts{ Topicid: &topic, @@ -225,6 +237,8 @@ func (p *PubSub) announce(topic string, sub bool) { } } +// notifySubs sends a given message to all corresponding subscribbers. +// Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) { for _, topic := range msg.GetTopicIDs() { subs := p.myTopics[topic] @@ -234,14 +248,18 @@ func (p *PubSub) notifySubs(msg *pb.Message) { } } +// seenMessage returns whether we already saw this message before func (p *PubSub) seenMessage(id string) bool { return p.seenMessages.Has(id) } +// markSeen marks a message as seen such that seenMessage returns `true' for the given id func (p *PubSub) markSeen(id string) { p.seenMessages.Add(id) } +// subscribedToMessage returns whether we are subscribed to one of the topics +// of a given message func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { for _, t := range msg.GetTopicIDs() { if _, ok := p.myTopics[t]; ok { @@ -282,6 +300,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } +// msgID returns a unique ID of the passed Message func msgID(pmsg *pb.Message) string { return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) } @@ -337,6 +356,7 @@ type addSubReq struct { resp chan *Subscription } +// Subscribe returns a new Subscription for the given topic func (p *PubSub) Subscribe(topic string) *Subscription { out := make(chan *Subscription, 1) p.addSub <- &addSubReq{ @@ -351,12 +371,14 @@ type topicReq struct { resp chan []string } +// GetTopics returns the topics this node is subscribed to func (p *PubSub) GetTopics() []string { out := make(chan []string, 1) p.getTopics <- &topicReq{resp: out} return <-out } +// Publish publishes data under the given topic func (p *PubSub) Publish(topic string, data []byte) error { seqno := make([]byte, 8) binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) @@ -377,6 +399,7 @@ type listPeerReq struct { topic string } +// ListPeers returns a list of peers we are connected to. func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) p.getPeers <- &listPeerReq{ From c9b2c6c8fd982f599304237c67e22662703f2e14 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 13:46:52 +0200 Subject: [PATCH 05/12] make go vet happy (t.Fatal was provided a formatting string) --- floodsub_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 93fa6b7..d386c7a 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -446,12 +446,12 @@ func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) { sort.Sort(peer.IDSlice(expected)) if len(peers) != len(expected) { - t.Fatal("mismatch: %s != %s", peers, expected) + t.Fatalf("mismatch: %s != %s", peers, expected) } for i, p := range peers { if expected[i] != p { - t.Fatal("mismatch: %s != %s", peers, expected) + t.Fatalf("mismatch: %s != %s", peers, expected) } } } From 1c9a5765266d3903b6a0da4f2a93ff3e6362da1b Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Fri, 11 Nov 2016 16:22:47 +0100 Subject: [PATCH 06/12] Test, SubscribeByTopicDescriptor and minor improvement --- floodsub.go | 20 ++++++++++++++++---- floodsub_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/floodsub.go b/floodsub.go index 40f530b..b70c5c7 100644 --- a/floodsub.go +++ b/floodsub.go @@ -132,10 +132,8 @@ func (p *PubSub) processLoop(ctx context.Context) { } case treq := <-p.getTopics: var out []string - for t, subs := range p.myTopics { - if len(subs) > 0 { - out = append(out, t) - } + for t := range p.myTopics { + out = append(out, t) } treq.resp <- out case sub := <-p.cancelCh: @@ -190,6 +188,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { delete(subs, sub) if len(subs) == 0 { + delete(p.myTopics, sub.topic) p.announce(sub.topic, false) } } @@ -367,6 +366,19 @@ func (p *PubSub) Subscribe(topic string) *Subscription { return <-out } +// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor +func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) { + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("auth mode not yet supported") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("encryption mode not yet supported") + } + + return p.Subscribe(td.GetName()), nil +} + type topicReq struct { resp chan []string } diff --git a/floodsub_test.go b/floodsub_test.go index d386c7a..371cca5 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -441,6 +441,45 @@ func TestPeerTopicReporting(t *testing.T) { assertPeerList(t, peers, hosts[1].ID()) } +func TestSubscribeMultipleTimes(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + + sub1 := psubs[0].Subscribe("foo") + sub2 := psubs[0].Subscribe("foo") + + // make sure subscribing is finished by the time we publish + time.Sleep(1 * time.Millisecond) + + psubs[1].Publish("foo", []byte("bar")) + + msg, err := sub1.Next() + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + + data := string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } + + msg, err = sub2.Next() + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + data = string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } +} + func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) { sort.Sort(peer.IDSlice(peers)) sort.Sort(peer.IDSlice(expected)) From ae48a15d7bf94357d4efd66ec9a6748ea061672a Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Sat, 12 Nov 2016 00:47:53 +0100 Subject: [PATCH 07/12] make Subscribe go through SubscribeByTopicDescriptor --- floodsub.go | 18 ++++---- floodsub_test.go | 116 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 101 insertions(+), 33 deletions(-) diff --git a/floodsub.go b/floodsub.go index b70c5c7..0d7dec5 100644 --- a/floodsub.go +++ b/floodsub.go @@ -356,14 +356,10 @@ type addSubReq struct { } // Subscribe returns a new Subscription for the given topic -func (p *PubSub) Subscribe(topic string) *Subscription { - out := make(chan *Subscription, 1) - p.addSub <- &addSubReq{ - topic: topic, - resp: out, - } +func (p *PubSub) Subscribe(topic string) (*Subscription, error) { + td := pb.TopicDescriptor{Name: &topic} - return <-out + return p.SubscribeByTopicDescriptor(&td) } // SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor @@ -376,7 +372,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscripti return nil, fmt.Errorf("encryption mode not yet supported") } - return p.Subscribe(td.GetName()), nil + out := make(chan *Subscription, 1) + p.addSub <- &addSubReq{ + topic: td.GetName(), + resp: out, + } + + return <-out, nil } type topicReq struct { diff --git a/floodsub_test.go b/floodsub_test.go index 371cca5..83c114c 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -106,7 +106,10 @@ func TestBasicFloodsub(t *testing.T) { var msgs []*Subscription for _, ps := range psubs { - subch := ps.Subscribe("foobar") + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } msgs = append(msgs, subch) } @@ -152,7 +155,10 @@ func TestMultihops(t *testing.T) { var subs []*Subscription for i := 1; i < 6; i++ { - ch := psubs[i].Subscribe("foobar") + ch, err := psubs[i].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } subs = append(subs, ch) } @@ -186,14 +192,20 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A := psubs[1].Subscribe("cats") + A, err := psubs[1].Subscribe("cats") + if err != nil { + t.Fatal(err) + } - B := psubs[2].Subscribe("cats") + B, err := psubs[2].Subscribe("cats") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 100) msg := []byte("apples and oranges") - err := psubs[0].Publish("cats", msg) + err = psubs[0].Publish("cats", msg) if err != nil { t.Fatal(err) } @@ -226,7 +238,10 @@ func TestReconnects(t *testing.T) { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } - ch2 := psubs[2].Subscribe("cats") + ch2, err := psubs[2].Subscribe("cats") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 100) @@ -248,9 +263,12 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch := psubs[5].Subscribe("foobar") + ch, err := psubs[5].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } - err := psubs[0].Publish("foobar", []byte("TESTING")) + err = psubs[0].Publish("foobar", []byte("TESTING")) if err != nil { t.Fatal(err) } @@ -279,7 +297,10 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch := psub.Subscribe("foobar") + ch, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } msg2 := []byte("goodbye world") err = psub.Publish("foobar", msg2) @@ -299,7 +320,10 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch := psubs[1].Subscribe("foobar") + ch, err := psubs[1].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 50) @@ -349,7 +373,10 @@ func TestTreeTopology(t *testing.T) { var chs []*Subscription for _, ps := range psubs { - ch := ps.Subscribe("fizzbuzz") + ch, err := ps.Subscribe("fizzbuzz") + if err != nil { + t.Fatal(err) + } chs = append(chs, ch) } @@ -386,13 +413,22 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - fooSub := psub.Subscribe("foo") + fooSub, err := psub.Subscribe("foo") + if err != nil { + t.Fatal(err) + } - barSub := psub.Subscribe("bar") + barSub, err := psub.Subscribe("bar") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "foo", "bar") - _ = psub.Subscribe("baz") + _, err = psub.Subscribe("baz") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "foo", "bar", "baz") @@ -401,7 +437,10 @@ func TestSubReporting(t *testing.T) { fooSub.Cancel() assertHasTopics(t, psub, "baz") - _ = psub.Subscribe("fish") + _, err = psub.Subscribe("fish") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "baz", "fish") } @@ -417,17 +456,38 @@ func TestPeerTopicReporting(t *testing.T) { connect(t, hosts[0], hosts[2]) connect(t, hosts[0], hosts[3]) - psubs[1].Subscribe("foo") - psubs[1].Subscribe("bar") - psubs[1].Subscribe("baz") + _, err := psubs[1].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("bar") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("baz") + if err != nil { + t.Fatal(err) + } - psubs[2].Subscribe("foo") - psubs[2].Subscribe("ipfs") + _, err = psubs[2].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[2].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } - psubs[3].Subscribe("baz") - psubs[3].Subscribe("ipfs") + _, err = psubs[3].Subscribe("baz") + if err != nil { + t.Fatal(err) + } + _, err = psubs[3].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 1) peers := psubs[0].ListPeers("ipfs") assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) @@ -450,8 +510,14 @@ func TestSubscribeMultipleTimes(t *testing.T) { connect(t, hosts[0], hosts[1]) - sub1 := psubs[0].Subscribe("foo") - sub2 := psubs[0].Subscribe("foo") + sub1, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + sub2, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } // make sure subscribing is finished by the time we publish time.Sleep(1 * time.Millisecond) From 539e4b6b45370be4ce2f7ba5ad42db149f961fac Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Sun, 13 Nov 2016 01:31:02 +0100 Subject: [PATCH 08/12] longer sleep in test so race becomes less likely --- floodsub_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/floodsub_test.go b/floodsub_test.go index 83c114c..6ab4690 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -487,7 +487,8 @@ func TestPeerTopicReporting(t *testing.T) { t.Fatal(err) } - time.Sleep(time.Millisecond * 1) + time.Sleep(time.Millisecond * 10) + peers := psubs[0].ListPeers("ipfs") assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) From 25b8aad61fe4dad01811c69ed3350b59597fdd3f Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 17 Nov 2016 11:27:57 +0100 Subject: [PATCH 09/12] add ctx to sub.Next for cancellation --- floodsub_test.go | 6 +++--- subscription.go | 19 +++++++++++++------ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 6ab4690..f99a88a 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -127,7 +127,7 @@ func TestBasicFloodsub(t *testing.T) { psubs[owner].Publish("foobar", msg) for _, sub := range msgs { - got, err := sub.Next() + got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } @@ -525,7 +525,7 @@ func TestSubscribeMultipleTimes(t *testing.T) { psubs[1].Publish("foo", []byte("bar")) - msg, err := sub1.Next() + msg, err := sub1.Next(ctx) if err != nil { t.Fatalf("unexpected error: %v.", err) } @@ -536,7 +536,7 @@ func TestSubscribeMultipleTimes(t *testing.T) { t.Fatalf("data is %s, expected %s.", data, "bar") } - msg, err = sub2.Next() + msg, err = sub2.Next(ctx) if err != nil { t.Fatalf("unexpected error: %v.", err) } diff --git a/subscription.go b/subscription.go index 6fd01fb..d6e930c 100644 --- a/subscription.go +++ b/subscription.go @@ -1,5 +1,9 @@ package floodsub +import ( + "context" +) + type Subscription struct { topic string ch chan *Message @@ -11,14 +15,17 @@ func (sub *Subscription) Topic() string { return sub.topic } -func (sub *Subscription) Next() (*Message, error) { - msg, ok := <-sub.ch +func (sub *Subscription) Next(ctx context.Context) (*Message, error) { + select { + case msg, ok := <-sub.ch: + if !ok { + return msg, sub.err + } - if !ok { - return msg, sub.err + return msg, nil + case <-ctx.Done(): + return nil, ctx.Err() } - - return msg, nil } func (sub *Subscription) Cancel() { From 32d57f202a75acdbefc0588b625b02274d6b97ba Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Sat, 19 Nov 2016 23:13:16 +0100 Subject: [PATCH 10/12] update to go-libp2p-netutil --- floodsub_test.go | 6 ++++-- package.json | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index f99a88a..18885ef 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -10,8 +10,9 @@ import ( "time" host "github.com/libp2p/go-libp2p-host" + netutil "github.com/libp2p/go-libp2p-netutil" peer "github.com/libp2p/go-libp2p-peer" - netutil "github.com/libp2p/go-libp2p/p2p/test/util" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ) func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { @@ -34,7 +35,8 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { var out []host.Host for i := 0; i < n; i++ { - h := netutil.GenHostSwarm(t, ctx) + netw := netutil.GenSwarmNetwork(t, ctx) + h := bhost.New(netw) out = append(out, h) } diff --git a/package.json b/package.json index 6855893..afb3ca4 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,12 @@ "hash": "Qmb6UFbVu1grhv5o5KnouvtZ6cqdrjXj6zLejAHWunxgCt", "name": "go-libp2p-host", "version": "1.3.0" + }, + { + "author": "whyrusleeping", + "hash": "QmbUDXBMqSe4VCRgTMeAfyBh1T3GBnELEBXobZDL7cjVgs", + "name": "go-libp2p-netutil", + "version": "0.1.1" } ], "gxVersion": "0.9.0", From be649ae2c3f2c9732209253a5754edeac56ffef4 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Sat, 19 Nov 2016 23:38:19 +0100 Subject: [PATCH 11/12] downgrade libp2p-netutil to 0.1.0 --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index afb3ca4..53ffaaa 100644 --- a/package.json +++ b/package.json @@ -33,9 +33,9 @@ }, { "author": "whyrusleeping", - "hash": "QmbUDXBMqSe4VCRgTMeAfyBh1T3GBnELEBXobZDL7cjVgs", + "hash": "QmcDTquYLTYirqj71RRWKUWEEw3nJt11Awzun5ep8kfY7W", "name": "go-libp2p-netutil", - "version": "0.1.1" + "version": "0.1.0" } ], "gxVersion": "0.9.0", From 4e943ef7f8a99af6295951285feeffdec76396d6 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Mon, 21 Nov 2016 16:36:28 +0100 Subject: [PATCH 12/12] gx publish 0.8.1 --- .gx/lastpubver | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index 9042917..b1d543b 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -0.8.0: QmWiLbk7eE1jGePDAuS26E2A9bMK3e3PMH3dcSeRY3MEBR +0.8.1: QmRJs5veT3gnuYpLAagC3NbzixbkgwjSdUXTKfh3hMo6XM diff --git a/package.json b/package.json index 53ffaaa..54b9c82 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,6 @@ "license": "", "name": "floodsub", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "0.8.0" + "version": "0.8.1" }