diff --git a/comm.go b/comm.go
index d0d40da..ba8a12d 100644
--- a/comm.go
+++ b/comm.go
@@ -19,7 +19,7 @@ import (
 // get the initial RPC containing all of our subscriptions to send to new peers
 func (p *PubSub) getHelloPacket() *RPC {
 	var rpc RPC
-	for t := range p.myTopics {
+	for t := range p.mySubs {
 		as := &pb.RPC_SubOpts{
 			Topicid:   proto.String(t),
 			Subscribe: proto.Bool(true),
diff --git a/discovery.go b/discovery.go
new file mode 100644
index 0000000..b69f209
--- /dev/null
+++ b/discovery.go
@@ -0,0 +1,336 @@
+package pubsub
+
+import (
+	"context"
+	"math/rand"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/discovery"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/peer"
+	discimpl "github.com/libp2p/go-libp2p-discovery"
+)
+
+var (
+	// poll interval
+
+	// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
+	DiscoveryPollInitialDelay = 0 * time.Millisecond
+	// DiscoveryPollInterval is approximately how long the discovery system waits between checks for whether
+	// more peers are needed for any topic
+	DiscoveryPollInterval = 1 * time.Second
+)
+
+type DiscoverOpt func(*discoverOptions) error
+
+type discoverOptions struct {
+	connFactory BackoffConnectorFactory
+	opts        []discovery.Option
+}
+
+func defaultDiscoverOptions() *discoverOptions {
+	rng := rand.New(rand.NewSource(rand.Int63()))
+	minBackoff, maxBackoff := time.Second*10, time.Hour
+	cacheSize := 100
+	dialTimeout := time.Minute * 2
+	discoverOpts := &discoverOptions{
+		connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) {
+			backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rng)
+			return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff)
+		},
+	}
+
+	return discoverOpts
+}
+
+// discover represents the discovery pipeline.
+// The discovery pipeline handles advertising and discovery of peers
+type discover struct {
+	p *PubSub
+
+	// discovery assists in discovering and advertising peers for a topic
+	discovery discovery.Discovery
+
+	// advertising tracks which topics are being advertised
+	advertising map[string]context.CancelFunc
+
+	// discoverQ handles continuing peer discovery
+	discoverQ chan *discoverReq
+
+	// ongoing tracks ongoing discovery requests
+	ongoing map[string]struct{}
+
+	// done handles completion of a discovery request
+	done chan string
+
+	// connector handles connecting to new peers found via discovery
+	connector *discimpl.BackoffConnector
+
+	// options are the set of options to be used to complete struct construction in Start
+	options *discoverOptions
+}
+
+// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
+// The router ultimately decides whether it is ready or not; the given size is just a suggestion.
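+//
+// A hedged usage sketch (the topic handle t, the payload data, and ctx are
+// hypothetical; WithReadiness and Topic.Publish are part of this change):
+//
+//	// Block until the router reports at least 3 peers in the topic, then publish.
+//	if err := t.Publish(ctx, data, WithReadiness(MinTopicSize(3))); err != nil {
+//		// handle the publish error
+//	}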
+func MinTopicSize(size int) RouterReady { + return func(rt PubSubRouter, topic string) (bool, error) { + return rt.EnoughPeers(topic, size), nil + } +} + +// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop +func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error { + if d.discovery == nil || p == nil { + return nil + } + + d.p = p + d.advertising = make(map[string]context.CancelFunc) + d.discoverQ = make(chan *discoverReq, 32) + d.ongoing = make(map[string]struct{}) + d.done = make(chan string) + + conn, err := d.options.connFactory(p.host) + if err != nil { + return err + } + d.connector = conn + + go d.discoverLoop() + go d.pollTimer() + + return nil +} + +func (d *discover) pollTimer() { + select { + case <-time.After(DiscoveryPollInitialDelay): + case <-d.p.ctx.Done(): + return + } + + select { + case d.p.eval <- d.requestDiscovery: + case <-d.p.ctx.Done(): + return + } + + ticker := time.NewTicker(DiscoveryPollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + select { + case d.p.eval <- d.requestDiscovery: + case <-d.p.ctx.Done(): + return + } + case <-d.p.ctx.Done(): + return + } + } +} + +func (d *discover) requestDiscovery() { + for t := range d.p.myTopics { + if !d.p.rt.EnoughPeers(t, 0) { + d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)} + } + } +} + +func (d *discover) discoverLoop() { + for { + select { + case discover := <-d.discoverQ: + topic := discover.topic + + if _, ok := d.ongoing[topic]; ok { + discover.done <- struct{}{} + continue + } + + d.ongoing[topic] = struct{}{} + + go func() { + d.handleDiscovery(d.p.ctx, topic, discover.opts) + select { + case d.done <- topic: + case <-d.p.ctx.Done(): + } + discover.done <- struct{}{} + }() + case topic := <-d.done: + delete(d.ongoing, topic) + case <-d.p.ctx.Done(): + return + } + } +} + +// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe. +func (d *discover) Advertise(topic string) { + if d.discovery == nil { + return + } + + advertisingCtx, cancel := context.WithCancel(d.p.ctx) + + if _, ok := d.advertising[topic]; ok { + cancel() + return + } + d.advertising[topic] = cancel + + go func() { + next, err := d.discovery.Advertise(advertisingCtx, topic) + if err != nil { + log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error()) + } + + t := time.NewTimer(next) + for { + select { + case <-t.C: + next, err = d.discovery.Advertise(advertisingCtx, topic) + if err != nil { + log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error()) + } + t.Reset(next) + case <-advertisingCtx.Done(): + t.Stop() + return + } + } + }() +} + +// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe. +func (d *discover) StopAdvertise(topic string) { + if d.discovery == nil { + return + } + + if advertiseCancel, ok := d.advertising[topic]; ok { + advertiseCancel() + delete(d.advertising, topic) + } +} + +// Discover searches for additional peers interested in a given topic +func (d *discover) Discover(topic string, opts ...discovery.Option) { + if d.discovery == nil { + return + } + + d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)} +} + +// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise. 
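+//
+// Each pass of the loop below evaluates ready on the pubsub event loop, queues
+// a discovery request for the topic if the router is not yet ready, and backs
+// off for 100ms before retrying; it returns false once either context is done.
+//
+// Since ready is any RouterReady, callers may supply their own predicate; a
+// hedged sketch (the helper name readyOrDeadline is hypothetical):
+//
+//	func readyOrDeadline(deadline time.Time, size int) RouterReady {
+//		return func(rt PubSubRouter, topic string) (bool, error) {
+//			if time.Now().After(deadline) {
+//				return true, nil // stop waiting and publish anyway
+//			}
+//			return rt.EnoughPeers(topic, size), nil
+//		}
+//	}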
+func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool { + if d.discovery == nil { + return true + } + + t := time.NewTimer(time.Hour) + if !t.Stop() { + <-t.C + } + + for { + // Check if ready for publishing + bootstrapped := make(chan bool, 1) + select { + case d.p.eval <- func() { + done, _ := ready(d.p.rt, topic) + bootstrapped <- done + }: + if <-bootstrapped { + return true + } + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + // If not ready discover more peers + disc := &discoverReq{topic, opts, make(chan struct{}, 1)} + select { + case d.discoverQ <- disc: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + select { + case <-disc.done: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + t.Reset(time.Millisecond * 100) + select { + case <-t.C: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + } +} + +func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) { + discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + peerCh, err := d.discovery.FindPeers(discoverCtx, topic, opts...) + if err != nil { + log.Debugf("error finding peers for topic %s: %v", topic, err) + return + } + + d.connector.Connect(ctx, peerCh) +} + +type discoverReq struct { + topic string + opts []discovery.Option + done chan struct{} +} + +type pubSubDiscovery struct { + discovery.Discovery + opts []discovery.Option +} + +func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + return d.Discovery.Advertise(ctx, "floodsub:"+ns, append(opts, d.opts...)...) +} + +func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + return d.Discovery.FindPeers(ctx, "floodsub:"+ns, append(opts, d.opts...)...) 
+} + +// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem +func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt { + return func(d *discoverOptions) error { + d.opts = opts + return nil + } +} + +// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host +type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error) + +// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers +func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt { + return func(d *discoverOptions) error { + d.connFactory = connFactory + return nil + } +} diff --git a/discovery_test.go b/discovery_test.go new file mode 100644 index 0000000..59e5181 --- /dev/null +++ b/discovery_test.go @@ -0,0 +1,292 @@ +package pubsub + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" +) + +type mockDiscoveryServer struct { + mx sync.Mutex + db map[string]map[peer.ID]*discoveryRegistration +} + +type discoveryRegistration struct { + info peer.AddrInfo + ttl time.Duration +} + +func newDiscoveryServer() *mockDiscoveryServer { + return &mockDiscoveryServer{ + db: make(map[string]map[peer.ID]*discoveryRegistration), + } +} + +func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok { + peers = make(map[peer.ID]*discoveryRegistration) + s.db[ns] = peers + } + peers[info.ID] = &discoveryRegistration{info, ttl} + return ttl, nil +} + +func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok || len(peers) == 0 { + emptyCh := make(chan peer.AddrInfo) + close(emptyCh) + return emptyCh, nil + } + + count := len(peers) + if count > limit { + count = limit + } + ch := make(chan peer.AddrInfo, count) + numSent := 0 + for _, reg := range peers { + if numSent == count { + break + } + numSent++ + ch <- reg.info + } + close(ch) + + return ch, nil +} + +func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool { + s.mx.Lock() + defer s.mx.Unlock() + + if peers, ok := s.db[ns]; ok { + _, ok := peers[pid] + return ok + } + return false +} + +type mockDiscoveryClient struct { + host host.Host + server *mockDiscoveryServer +} + +func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return 0, err + } + + return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl) +} + +func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + var options discovery.Options + err := options.Apply(opts...) 
+ if err != nil { + return nil, err + } + + return d.server.FindPeers(ns, options.Limit) +} + +func TestSimpleDiscovery(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup Discovery server and pubsub clients + const numHosts = 20 + const topic = "foobar" + + server := newDiscoveryServer() + discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)} + + hosts := getNetHosts(t, ctx, numHosts) + psubs := make([]*PubSub, numHosts) + topicHandlers := make([]*Topic, numHosts) + + for i, h := range hosts { + disc := &mockDiscoveryClient{h, server} + ps := getPubsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) + psubs[i] = ps + topicHandlers[i], _ = ps.Join(topic) + } + + // Subscribe with all but one pubsub instance + msgs := make([]*Subscription, numHosts) + for i, th := range topicHandlers[1:] { + subch, err := th.Subscribe() + if err != nil { + t.Fatal(err) + } + + msgs[i+1] = subch + } + + // Wait for the advertisements to go through then check that they did + for { + server.mx.Lock() + numPeers := len(server.db["floodsub:foobar"]) + server.mx.Unlock() + if numPeers == numHosts-1 { + break + } else { + time.Sleep(time.Millisecond * 100) + } + } + + for i, h := range hosts[1:] { + if !server.hasPeerRecord("floodsub:"+topic, h.ID()) { + t.Fatalf("Server did not register host %d with ID: %s", i+1, h.ID().Pretty()) + } + } + + // Try subscribing followed by publishing a single message + subch, err := topicHandlers[0].Subscribe() + if err != nil { + t.Fatal(err) + } + msgs[0] = subch + + msg := []byte("first message") + if err := topicHandlers[0].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // Try random peers sending messages and make sure they are received + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup Discovery server and pubsub clients + partitionSize := GossipSubDlo - 1 + numHosts := partitionSize * 2 + const ttl = 1 * time.Minute + + const topic = "foobar" + + server1, server2 := newDiscoveryServer(), newDiscoveryServer() + discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)} + + // Put the pubsub clients into two partitions + hosts := getNetHosts(t, ctx, numHosts) + psubs := make([]*PubSub, numHosts) + topicHandlers := make([]*Topic, numHosts) + + for i, h := range hosts { + s := server1 + if i >= partitionSize { + s = server2 + } + disc := &mockDiscoveryClient{h, s} + ps := getGossipsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) + psubs[i] = ps + topicHandlers[i], _ = ps.Join(topic) + } + + msgs := make([]*Subscription, numHosts) + for i, th := range topicHandlers { + subch, err := th.Subscribe() + if err != nil { + t.Fatal(err) + } + + msgs[i] = subch + } + + // Wait for network to finish forming then join the 
partitions via discovery + for _, ps := range psubs { + waitUntilGossipsubMeshCount(ps, topic, partitionSize-1) + } + + for i := 0; i < partitionSize; i++ { + if _, err := server1.Advertise("floodsub:"+topic, *host.InfoFromHost(hosts[i+partitionSize]), ttl); err != nil { + t.Fatal(err) + } + } + + // test the mesh + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(numHosts) + + if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) { + done := false + doneCh := make(chan bool, 1) + rt := ps.rt.(*GossipSubRouter) + for !done { + ps.eval <- func() { + doneCh <- len(rt.mesh[topic]) == count + } + done = <-doneCh + if !done { + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/floodsub.go b/floodsub.go index 0d57c99..f70a0a7 100644 --- a/floodsub.go +++ b/floodsub.go @@ -11,7 +11,8 @@ import ( ) const ( - FloodSubID = protocol.ID("/floodsub/1.0.0") + FloodSubID = protocol.ID("/floodsub/1.0.0") + FloodSubTopicSearchSize = 5 ) // NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps. @@ -44,6 +45,24 @@ func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {} func (fs *FloodSubRouter) RemovePeer(peer.ID) {} +func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { + // check all peers in the topic + tmap, ok := fs.p.topics[topic] + if !ok { + return false + } + + if suggested == 0 { + suggested = FloodSubTopicSearchSize + } + + if len(tmap) >= suggested { + return true + } + + return false +} + func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { diff --git a/floodsub_test.go b/floodsub_test.go index 465114b..8182a47 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -254,7 +254,7 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - nSubs := len(psubs[2].myTopics["cats"]) + nSubs := len(psubs[2].mySubs["cats"]) if nSubs > 0 { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } @@ -1064,341 +1064,6 @@ func TestImproperlySignedMessageRejected(t *testing.T) { } } -func TestSubscriptionJoinNotification(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const numLateSubscribers = 10 - const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Have some peers subscribe earlier than other peers. - // This exercises whether we get subscription notifications from - // existing peers. 
- for i, ps := range psubs[numLateSubscribers:] { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - time.Sleep(time.Millisecond * 100) - - // Have the rest subscribe - for i, ps := range psubs[:numLateSubscribers] { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i+numLateSubscribers] = subch - } - - wg := sync.WaitGroup{} - for i := 0; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-1 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound { - if len(peersFound) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - } -} - -func TestSubscriptionLeaveNotification(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Subscribe all peers and wait until they've all been found - for i, ps := range psubs { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - time.Sleep(time.Millisecond * 100) - - wg := sync.WaitGroup{} - for i := 0; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-1 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound { - if len(peersFound) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - } - - // Test removing peers and verifying that they cause events - msgs[1].Cancel() - hosts[2].Close() - psubs[0].BlacklistPeer(hosts[3].ID()) - - leavingPeers := make(map[peer.ID]struct{}) - for len(leavingPeers) < 3 { - event, err := msgs[0].NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerLeave { - leavingPeers[event.Peer] = struct{}{} - } - } - - if _, ok := leavingPeers[hosts[1].ID()]; !ok { - t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event")) - } - if _, ok := leavingPeers[hosts[2].ID()]; !ok { - t.Fatal(fmt.Errorf("closing host did not cause a leave event")) - } - if _, ok := leavingPeers[hosts[3].ID()]; !ok { - t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event")) - } -} - -func TestSubscriptionManyNotifications(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Subscribe all peers except one and wait until they've all been found - for i := 1; i < numHosts; i++ { - subch, err := psubs[i].Subscribe(topic) - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - 
time.Sleep(time.Millisecond * 100) - - wg := sync.WaitGroup{} - for i := 1; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-2 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound[1:] { - if len(peersFound) != numHosts-2 { - t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2) - } - } - - // Wait for remaining peer to find other peers - for len(psubs[0].ListPeers(topic)) < numHosts-1 { - time.Sleep(time.Millisecond * 100) - } - - // Subscribe the remaining peer and check that all the events came through - sub, err := psubs[0].Subscribe(topic) - if err != nil { - t.Fatal(err) - } - - msgs[0] = sub - - peerState := readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - - for _, e := range peerState { - if e != PeerJoin { - t.Fatal("non Join event occurred") - } - } - - // Unsubscribe all peers except one and check that all the events came through - for i := 1; i < numHosts; i++ { - msgs[i].Cancel() - } - - // Wait for remaining peer to disconnect from the other peers - for len(psubs[0].ListPeers(topic)) != 0 { - time.Sleep(time.Millisecond * 100) - } - - peerState = readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - - for _, e := range peerState { - if e != PeerLeave { - t.Fatal("non Leave event occurred") - } - } -} - -func TestSubscriptionNotificationSubUnSub(t *testing.T) { - // Resubscribe and Unsubscribe a peers and check the state for consistency - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - psubs := getPubsubs(ctx, hosts) - - for i := 1; i < numHosts; i++ { - connect(t, hosts[0], hosts[i]) - } - time.Sleep(time.Millisecond * 100) - - notifSubThenUnSub(ctx, t, topic, psubs) -} - -func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string, - psubs []*PubSub) { - - ps := psubs[0] - msgs := make([]*Subscription, len(psubs)) - checkSize := len(psubs) - 1 - - // Subscribe all peers to the topic - var err error - for i, ps := range psubs { - msgs[i], err = ps.Subscribe(topic) - if err != nil { - t.Fatal(err) - } - } - - sub := msgs[0] - - // Wait for the primary peer to be connected to the other peers - for len(ps.ListPeers(topic)) < checkSize { - time.Sleep(time.Millisecond * 100) - } - - // Unsubscribe all peers except the primary - for i := 1; i < checkSize+1; i++ { - msgs[i].Cancel() - } - - // Wait for the unsubscribe messages to reach the primary peer - for len(ps.ListPeers(topic)) < 0 { - time.Sleep(time.Millisecond * 100) - } - - // read all available events and verify that there are no events to process - // this is because every peer that joined also left - peerState := readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != 0 { - for p, s := range peerState { - fmt.Println(p, s) - } - t.Fatalf("Received incorrect events. 
%d extra events", len(peerState)) - } -} - -func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType { - peerState := make(map[peer.ID]EventType) - for { - ctx, _ := context.WithTimeout(ctx, time.Millisecond*100) - event, err := sub.NextPeerEvent(ctx) - if err == context.DeadlineExceeded { - break - } else if err != nil { - t.Fatal(err) - } - - e, ok := peerState[event.Peer] - if !ok { - peerState[event.Peer] = event.Type - } else if e != event.Type { - delete(peerState, event.Peer) - } - } - return peerState -} - func TestMessageSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go.mod b/go.mod index d3e1404..9171192 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,14 @@ module github.com/libp2p/go-libp2p-pubsub require ( - github.com/gogo/protobuf v1.2.1 + github.com/gogo/protobuf v1.3.1 github.com/hashicorp/golang-lru v0.5.1 github.com/ipfs/go-log v0.0.1 - github.com/libp2p/go-libp2p-blankhost v0.1.1 - github.com/libp2p/go-libp2p-core v0.0.1 - github.com/libp2p/go-libp2p-swarm v0.1.0 - github.com/multiformats/go-multiaddr v0.0.4 + github.com/libp2p/go-libp2p-blankhost v0.1.4 + github.com/libp2p/go-libp2p-core v0.2.4 + github.com/libp2p/go-libp2p-discovery v0.2.0 + github.com/libp2p/go-libp2p-swarm v0.2.2 + github.com/multiformats/go-multiaddr v0.1.1 github.com/multiformats/go-multistream v0.1.0 github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee ) diff --git a/go.sum b/go.sum index 2352c3c..626e219 100644 --- a/go.sum +++ b/go.sum @@ -1,45 +1,80 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= +github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= +github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 h1:A/EVblehb75cUgXA5njHPn0kLAsykn6mJGz7rnmW5W0= +github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod 
h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= +github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy 
v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms= +github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= +github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= @@ -52,10 +87,14 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -68,41 +107,61 @@ github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOS github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD21CIEy5eYd1Hlp0juHY0= github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc= +github.com/libp2p/go-eventbus v0.1.0 h1:mlawomSAjjkk97QnYiEmHsLu7E136+2oCWSHRUvMfzQ= +github.com/libp2p/go-eventbus v0.1.0/go.mod h1:vROgu5cs5T7cv7POWlWxBaVLxfSegC5UGQf8A2eEmx4= github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= -github.com/libp2p/go-libp2p-blankhost v0.1.1 h1:X919sCh+KLqJcNRApj43xCSiQRYqOSI88Fdf55ngf78= github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro= +github.com/libp2p/go-libp2p-blankhost v0.1.4 h1:I96SWjR4rK9irDHcHq3XHN6hawCRTPUADzkJacgZLvk= +github.com/libp2p/go-libp2p-blankhost v0.1.4/go.mod h1:oJF0saYsAXQCSfDq254GMNmLNz6ZTHTOvtF4ZydUvwU= github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= -github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= +github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= +github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI= +github.com/libp2p/go-libp2p-core v0.2.2/go.mod h1:8fcwTbsG2B+lTgRJ1ICZtiM5GWCWZVoVrLaDRvIRng0= +github.com/libp2p/go-libp2p-core v0.2.4 h1:Et6ykkTwI6PU44tr8qUF9k43vP0aduMNniShAbUJJw8= +github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= +github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY= +github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= -github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY= github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY= -github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E= github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY= -github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ= +github.com/libp2p/go-libp2p-peerstore v0.1.3 h1:wMgajt1uM2tMiqf4M+4qWKVyyFc8SfA+84VV9glZq1M= 
+github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI= github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8= -github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s= +github.com/libp2p/go-libp2p-secio v0.2.0 h1:ywzZBsWEEz2KNTn5RtzauEDq5RFEefPsttXYwAWqHng= +github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g= github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evllckjebkdiY5ta4= +github.com/libp2p/go-libp2p-swarm v0.2.2 h1:T4hUpgEs2r371PweU3DuH7EOmBIdTBCwWs+FLcgx3bQ= +github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaTNyBcHImCxRpPKU= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= +github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= +github.com/libp2p/go-libp2p-testing v0.1.0 h1:WaFRj/t3HdMZGNZqnU2pS7pDRBmMeoDx7/HDNpeyT9U= +github.com/libp2p/go-libp2p-testing v0.1.0/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0= github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw= github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA= github.com/libp2p/go-libp2p-yamux v0.2.0 h1:TSPZ5cMMz/wdoYsye/wU1TE4G3LDGMoeEN0xgnCKU/I= github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8= github.com/libp2p/go-maddr-filter v0.0.4 h1:hx8HIuuwk34KePddrp2mM5ivgPkZ09JH4AvsALRbFUs= github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q= +github.com/libp2p/go-maddr-filter v0.0.5 h1:CW3AgbMO6vUvT4kf87y4N+0P8KUl2aqLYhrGyDUbLSg= +github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M= github.com/libp2p/go-mplex v0.0.3/go.mod h1:pK5yMLmOoBR1pNCqDlA2GQrdAVTMkqFalaTWe7l4Yd0= github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= -github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0= github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= +github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= +github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= +github.com/libp2p/go-openssl v0.0.2/go.mod h1:v8Zw2ijCSWBQi8Pq5GAixw6DbFfa9u6VIYDXnvOXkc0= +github.com/libp2p/go-openssl v0.0.3 h1:wjlG7HvQkt4Fq4cfH33Ivpwp0omaElYEi9z26qaIkIk= +github.com/libp2p/go-openssl v0.0.3/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA= github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4= @@ -110,14 +169,20 @@ github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA2 github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg= 
github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc= -github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQtiRQD60/8o= github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc= +github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw= +github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY= github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg= github.com/libp2p/go-yamux v1.2.2/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= @@ -125,28 +190,42 @@ github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+ github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.1.0 h1:U41/2erhAKcmSI14xh/ZTUdBPOzDOIfS93ibzUSl8KM= github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= +github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.2/go.mod 
h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4= github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= +github.com/multiformats/go-multiaddr v0.1.0/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= +github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE= +github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= -github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8= github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= -github.com/multiformats/go-multiaddr-fmt v0.0.1 h1:5YjeOIzbX8OTKVaN72aOzGIYW7PnrZrnkDyOfAWRSMA= github.com/multiformats/go-multiaddr-fmt v0.0.1/go.mod h1:aBYjqL4T/7j4Qx+R73XSv/8JsgnRFlf0w2KGLCmXl3Q= +github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= +github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multiaddr-net v0.0.1 h1:76O59E3FavvHqNg7jvzWzsPSW5JSi/ek0E4eiDVbg9g= github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU= +github.com/multiformats/go-multiaddr-net v0.1.0 h1:ZepO8Ezwovd+7b5XPPDhQhayk1yt0AJpzQBpq9fejx4= +github.com/multiformats/go-multiaddr-net v0.1.0/go.mod h1:5JNbcfBOP4dnhoZOv10JJVkJO0pCCEf8mTnipAo2UZQ= +github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= +github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg= +github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ= github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -158,20 +237,34 @@ github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday v1.5.2/go.mod 
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= @@ -181,22 +274,48 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1: github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= +github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= +go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50= +go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo= golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= @@ -204,16 +323,39 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 
[remaining go.sum dependency-hash updates elided]
diff --git a/gossipsub.go b/gossipsub.go
index 9b8c3ad..1b2026c 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -94,6 +94,35 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
 	delete(gs.control, p)
 }
 
+func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
+	// check all peers in the topic
+	tmap, ok := gs.p.topics[topic]
+	if !ok {
+		return false
+	}
+
+	fsPeers, gsPeers := 0, 0
+	// floodsub peers
+	for p := range tmap {
+		if gs.peers[p] == FloodSubID {
+			fsPeers++
+		}
+	}
+
+	// gossipsub peers
+	gsPeers = len(gs.mesh[topic])
+
+	if suggested == 0 {
+		suggested = GossipSubDlo
+	}
+
+	if fsPeers+gsPeers >= suggested || gsPeers >= GossipSubDhi {
+		return true
+	}
+
+	return false
+}
+
 func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
 	ctl := rpc.GetControl()
 	if ctl == nil {
diff --git a/gossipsub_test.go b/gossipsub_test.go
index 67af2b7..90041cc 100644
--- a/gossipsub_test.go
+++ b/gossipsub_test.go
@@ -11,14 +11,18 @@ import (
 	"github.com/libp2p/go-libp2p-core/host"
 )
 
+func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub {
+	ps, err := NewGossipSub(ctx, h, opts...)
+	if err != nil {
+		panic(err)
+	}
+	return ps
+}
+
 func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub {
 	var psubs []*PubSub
 	for _, h := range hs {
-		ps, err := NewGossipSub(ctx, h, opts...)
-		if err != nil {
-			panic(err)
-		}
-		psubs = append(psubs, ps)
+		psubs = append(psubs, getGossipsub(ctx, h, opts...))
 	}
 	return psubs
 }
diff --git a/pubsub.go b/pubsub.go
index 7e34dad..cffb83b 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -12,6 +12,7 @@ import (
 	pb "github.com/libp2p/go-libp2p-pubsub/pb"
 
 	"github.com/libp2p/go-libp2p-core/crypto"
+	"github.com/libp2p/go-libp2p-core/discovery"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -42,6 +43,8 @@ type PubSub struct {
 
 	val *validation
 
+	disc *discover
+
 	// incoming messages from other peers
 	incoming chan *RPC
 
@@ -60,6 +63,12 @@ type PubSub struct {
 	// send subscription here to cancel it
 	cancelCh chan *Subscription
 
+	// addTopic is a channel for adding a Topic tracker
+	addTopic chan *addTopicReq
+
+	// rmTopic is a channel for removing a Topic tracker
+	rmTopic chan *rmTopicReq
+
 	// a notification channel for new peer connections
 	newPeers chan peer.ID
 
@@ -73,7 +82,10 @@ type PubSub struct {
 	peerDead chan peer.ID
 
 	// The set of topics we are subscribed to
-	myTopics map[string]map[*Subscription]struct{}
+	mySubs map[string]map[*Subscription]struct{}
+
+	// The set of topics we are interested in
+	myTopics map[string]*Topic
 
 	// topics tracks which topics each of our peers are subscribed to
 	topics map[string]map[peer.ID]struct{}
@@ -120,6 +132,9 @@ type PubSubRouter interface {
 	AddPeer(peer.ID, protocol.ID)
 	// RemovePeer notifies the router that a peer has been disconnected.
 	RemovePeer(peer.ID)
+	// EnoughPeers returns whether the router is ready to publish to the topic, i.e. whether it has enough peers.
+	// suggested (if greater than 0) hints at how many peers the router should want; 0 means use the router's default.
+	EnoughPeers(topic string, suggested int) bool
 	// HandleRPC is invoked to process control messages in the RPC envelope.
 	// It is invoked after subscriptions and payload messages have been processed.
 	HandleRPC(*RPC)
@@ -158,6 +173,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		ctx:          ctx,
 		rt:           rt,
 		val:          newValidation(),
+		disc:         &discover{},
 		signID:       h.ID(),
 		signKey:      h.Peerstore().PrivKey(h.ID()),
 		signStrict:   true,
@@ -170,12 +186,15 @@
 		cancelCh:     make(chan *Subscription),
 		getPeers:     make(chan *listPeerReq),
 		addSub:       make(chan *addSubReq),
+		addTopic:     make(chan *addTopicReq),
+		rmTopic:      make(chan *rmTopicReq),
 		getTopics:    make(chan *topicReq),
 		sendMsg:      make(chan *Message, 32),
 		addVal:       make(chan *addValReq),
 		rmVal:        make(chan *rmValReq),
 		eval:         make(chan func()),
-		myTopics:     make(map[string]map[*Subscription]struct{}),
+		myTopics:     make(map[string]*Topic),
+		mySubs:       make(map[string]map[*Subscription]struct{}),
 		topics:       make(map[string]map[peer.ID]struct{}),
 		peers:        make(map[peer.ID]chan *RPC),
 		blacklist:    NewMapBlacklist(),
@@ -195,6 +214,10 @@
 		return nil, fmt.Errorf("strict signature verification enabled but message signing is disabled")
 	}
 
+	if err := ps.disc.Start(ps); err != nil {
+		return nil, err
+	}
+
 	rt.Attach(ps)
 
 	for _, id := range rt.Protocols() {
@@ -263,6 +286,23 @@ func WithBlacklist(b Blacklist) Option {
 	}
 }
 
+// WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
+func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
+	return func(p *PubSub) error {
+		discoverOpts := defaultDiscoverOptions()
+		for _, opt := range opts {
+			err := opt(discoverOpts)
+			if err != nil {
+				return err
+			}
+		}
+
+		p.disc.discovery = &pubSubDiscovery{Discovery: d, opts: discoverOpts.opts}
+		p.disc.options = discoverOpts
+		return nil
+	}
+}
+
 // processLoop handles all inputs arriving on the channels
 func (p *PubSub) processLoop(ctx context.Context) {
 	defer func() {
@@ -345,10 +385,14 @@ func (p *PubSub) processLoop(ctx context.Context) {
 		case treq := <-p.getTopics:
 			var out []string
-			for t := range p.myTopics {
+			for t := range p.mySubs {
 				out = append(out, t)
 			}
 			treq.resp <- out
+		case topic := <-p.addTopic:
+			p.handleAddTopic(topic)
+		case topic := <-p.rmTopic:
+			p.handleRemoveTopic(topic)
 		case sub := <-p.cancelCh:
 			p.handleRemoveSubscription(sub)
 		case sub := <-p.addSub:
@@ -412,12 +456,47 @@
 	}
 }
 
+// handleAddTopic adds a tracker for a particular topic.
+// Only called from processLoop.
+func (p *PubSub) handleAddTopic(req *addTopicReq) {
+	topic := req.topic
+	topicID := topic.topic
+
+	t, ok := p.myTopics[topicID]
+	if ok {
+		req.resp <- t
+		return
+	}
+
+	p.myTopics[topicID] = topic
+	req.resp <- topic
+}
+
+// handleRemoveTopic removes a Topic tracker from bookkeeping.
+// Only called from processLoop.
+func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
+	topic := p.myTopics[req.topic.topic]
+
+	if topic == nil {
+		req.resp <- nil
+		return
+	}
+
+	if len(topic.evtHandlers) == 0 && len(p.mySubs[req.topic.topic]) == 0 {
+		delete(p.myTopics, topic.topic)
+		req.resp <- nil
+		return
+	}
+
+	req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
+}
+
 // handleRemoveSubscription removes Subscription sub from bookkeeping.
 // If this was the last Subscription for a given topic, it will also announce
 // that this node is not subscribing to this topic anymore.
 // Only called from processLoop.
 func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
-	subs := p.myTopics[sub.topic]
+	subs := p.mySubs[sub.topic]
 
 	if subs == nil {
 		return
@@ -428,7 +507,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
 	delete(subs, sub)
 
 	if len(subs) == 0 {
-		delete(p.myTopics, sub.topic)
+		delete(p.mySubs, sub.topic)
+		p.disc.StopAdvertise(sub.topic)
 		p.announce(sub.topic, false)
 		p.rt.Leave(sub.topic)
 	}
@@ -440,28 +520,23 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
 // Only called from processLoop.
 func (p *PubSub) handleAddSubscription(req *addSubReq) {
 	sub := req.sub
-	subs := p.myTopics[sub.topic]
+	subs := p.mySubs[sub.topic]
 
 	// announce we want this topic
 	if len(subs) == 0 {
+		p.disc.Advertise(sub.topic)
 		p.announce(sub.topic, true)
 		p.rt.Join(sub.topic)
 	}
 
 	// make new if not there
 	if subs == nil {
-		p.myTopics[sub.topic] = make(map[*Subscription]struct{})
-		subs = p.myTopics[sub.topic]
+		p.mySubs[sub.topic] = make(map[*Subscription]struct{})
 	}
 
-	tmap := p.topics[sub.topic]
-
-	for p := range tmap {
-		sub.evtLog[p] = PeerJoin
-	}
 	sub.cancelCh = p.cancelCh
 
-	p.myTopics[sub.topic][sub] = struct{}{}
+	p.mySubs[sub.topic][sub] = struct{}{}
 
 	req.resp <- sub
 }
@@ -489,7 +564,7 @@ func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
 	time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
 
 	retry := func() {
-		_, ok := p.myTopics[topic]
+		_, ok := p.mySubs[topic]
 		if (ok && sub) || (!ok && !sub) {
 			p.doAnnounceRetry(pid, topic, sub)
 		}
@@ -525,7 +600,7 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
 // Only called from processLoop.
 func (p *PubSub) notifySubs(msg *Message) {
 	for _, topic := range msg.GetTopicIDs() {
-		subs := p.myTopics[topic]
+		subs := p.mySubs[topic]
 		for f := range subs {
 			select {
 			case f.ch <- msg:
@@ -559,12 +634,12 @@ func (p *PubSub) markSeen(id string) bool {
 // subscribedToMsg returns whether we are subscribed to one of the topics
 // of a given message
 func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
-	if len(p.myTopics) == 0 {
+	if len(p.mySubs) == 0 {
 		return false
 	}
 
 	for _, t := range msg.GetTopicIDs() {
-		if _, ok := p.myTopics[t]; ok {
+		if _, ok := p.mySubs[t]; ok {
 			return true
 		}
 	}
@@ -572,10 +647,8 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
 }
 
 func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
-	if subs, ok := p.myTopics[topic]; ok {
-		for s := range subs {
-			s.sendNotification(PeerEvent{PeerLeave, pid})
-		}
+	if t, ok := p.myTopics[topic]; ok {
+		t.sendNotification(PeerEvent{PeerLeave, pid})
 	}
 }
 
@@ -591,11 +664,9 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
 			if _, ok = tmap[rpc.from]; !ok {
 				tmap[rpc.from] = struct{}{}
-				if subs, ok := p.myTopics[t]; ok {
+				if topic, ok := p.myTopics[t]; ok {
 					peer := rpc.from
-					for s := range subs {
-						s.sendNotification(PeerEvent{PeerJoin, peer})
-					}
+					topic.sendNotification(PeerEvent{PeerJoin, peer})
 				}
 			}
 		} else {
@@ -670,6 +741,67 @@ func (p *PubSub) publishMessage(msg *Message) {
 	p.rt.Publish(msg.ReceivedFrom, msg.Message)
 }
 
+type addTopicReq struct {
+	topic *Topic
+	resp  chan *Topic
+}
+
+type rmTopicReq struct {
+	topic *Topic
+	resp  chan error
+}
+
+type TopicOptions struct{}
+
+type TopicOpt func(t *Topic) error
+
+// Join joins the topic and returns a Topic handle. Only one Topic handle should
+// exist per topic, and Join will error if a handle for this topic already exists.
+func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
+	t, ok, err := p.tryJoin(topic, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	if !ok {
+		return nil, fmt.Errorf("topic already exists")
+	}
+
+	return t, nil
+}
+
+// tryJoin is an internal function that tries to join a topic.
+// Returns the topic if it can be created or found.
+// Returns true if the topic was newly created, false otherwise.
+// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed.
+func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
+	t := &Topic{
+		p:           p,
+		topic:       topic,
+		evtHandlers: make(map[*TopicEventHandler]struct{}),
+	}
+
+	for _, opt := range opts {
+		err := opt(t)
+		if err != nil {
+			return nil, false, err
+		}
+	}
+
+	resp := make(chan *Topic, 1)
+	t.p.addTopic <- &addTopicReq{
+		topic: t,
+		resp:  resp,
+	}
+	returnedTopic := <-resp
+
+	if returnedTopic != t {
+		return returnedTopic, false, nil
+	}
+
+	return t, true, nil
+}
+
 type addSubReq struct {
 	sub  *Subscription
 	resp chan *Subscription
@@ -680,6 +812,8 @@ type SubOpt func(sub *Subscription) error
 
 // Subscribe returns a new Subscription for the given topic.
 // Note that subscription is not an instantaneous operation. It may take some time
 // before the subscription is processed by the pubsub main loop and propagated to our peers.
+//
+// Deprecated: use pubsub.Join() and topic.Subscribe() instead
 func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
 	td := pb.TopicDescriptor{Name: &topic}
 
@@ -687,6 +821,8 @@ func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
 }
 
 // SubscribeByTopicDescriptor lets you subscribe to a topic using a pb.TopicDescriptor.
+//
+// Deprecated: use pubsub.Join() and topic.Subscribe() instead
 func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) {
 	if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
 		return nil, fmt.Errorf("auth mode not yet supported")
@@ -696,30 +832,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
 		return nil, fmt.Errorf("encryption mode not yet supported")
 	}
 
-	sub := &Subscription{
-		topic: td.GetName(),
-		ctx:   p.ctx,
-
-		ch:        make(chan *Message, 32),
-		peerEvtCh: make(chan PeerEvent, 1),
-		evtLog:    make(map[peer.ID]EventType),
-		evtLogCh:  make(chan struct{}, 1),
+	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
+	topic, _, err := p.tryJoin(td.GetName())
+	if err != nil {
+		return nil, err
 	}
 
-	for _, opt := range opts {
-		err := opt(sub)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	out := make(chan *Subscription, 1)
-	p.addSub <- &addSubReq{
-		sub:  sub,
-		resp: out,
-	}
-
-	return <-out, nil
+	return topic.Subscribe(opts...)
 }
 
 type topicReq struct {
@@ -734,24 +853,16 @@ func (p *PubSub) GetTopics() []string {
 }
 
 // Publish publishes data to the given topic.
-func (p *PubSub) Publish(topic string, data []byte) error {
-	seqno := p.nextSeqno()
-	id := p.host.ID()
-	m := &pb.Message{
-		Data:     data,
-		TopicIDs: []string{topic},
-		From:     []byte(id),
-		Seqno:    seqno,
+//
+// Deprecated: use pubsub.Join() and topic.Publish() instead
+func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
+	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
+	t, _, err := p.tryJoin(topic)
+	if err != nil {
+		return err
 	}
-	if p.signKey != nil {
-		m.From = []byte(p.signID)
-		err := signMessage(p.signID, p.signKey, m)
-		if err != nil {
-			return err
-		}
-	}
-	p.publish <- &Message{m, id}
-	return nil
+
+	return t.Publish(context.TODO(), data, opts...)
 }
 
 func (p *PubSub) nextSeqno() []byte {
diff --git a/randomsub.go b/randomsub.go
index e4304d2..9435c71 100644
--- a/randomsub.go
+++ b/randomsub.go
@@ -49,6 +49,41 @@ func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
 	delete(rs.peers, p)
 }
 
+func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
+	// check all peers in the topic
+	tmap, ok := rs.p.topics[topic]
+	if !ok {
+		return false
+	}
+
+	fsPeers := 0
+	rsPeers := 0
+
+	// count floodsub and randomsub peers
+	for p := range tmap {
+		switch rs.peers[p] {
+		case FloodSubID:
+			fsPeers++
+		case RandomSubID:
+			rsPeers++
+		}
+	}
+
+	if suggested == 0 {
+		suggested = RandomSubD
+	}
+
+	if fsPeers+rsPeers >= suggested {
+		return true
+	}
+
+	if rsPeers >= RandomSubD {
+		return true
+	}
+
+	return false
+}
+
 func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {}
 
 func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) {
diff --git a/subscription.go b/subscription.go
index b3ddf83..3d773e8 100644
--- a/subscription.go
+++ b/subscription.go
@@ -2,35 +2,19 @@ package pubsub
 
 import (
 	"context"
-	"github.com/libp2p/go-libp2p-core/peer"
-	"sync"
-)
-
-type EventType int
-
-const (
-	PeerJoin EventType = iota
-	PeerLeave
 )
 
+// Subscription handles the details of a particular Topic subscription.
+// There may be many subscriptions for a given Topic.
 type Subscription struct {
 	topic    string
 	ch       chan *Message
 	cancelCh chan<- *Subscription
-	err      error
 	ctx      context.Context
-
-	peerEvtCh chan PeerEvent
-	evtLogMx  sync.Mutex
-	evtLog    map[peer.ID]EventType
-	evtLogCh  chan struct{}
-}
-
-type PeerEvent struct {
-	Type EventType
-	Peer peer.ID
+	err      error
 }
 
+// Topic returns the topic string associated with the Subscription
 func (sub *Subscription) Topic() string {
 	return sub.topic
 }
@@ -49,6 +33,8 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
 	}
 }
 
+// Cancel closes the subscription. If this is the last active subscription then pubsub
+// will send an unsubscribe announcement to the network.
 func (sub *Subscription) Cancel() {
 	select {
 	case sub.cancelCh <- sub:
@@ -59,65 +45,3 @@ func (sub *Subscription) Cancel() {
 func (sub *Subscription) close() {
 	close(sub.ch)
 }
-
-func (sub *Subscription) sendNotification(evt PeerEvent) {
-	sub.evtLogMx.Lock()
-	defer sub.evtLogMx.Unlock()
-
-	sub.addToEventLog(evt)
-}
-
-// addToEventLog assumes a lock has been taken to protect the event log
-func (sub *Subscription) addToEventLog(evt PeerEvent) {
-	e, ok := sub.evtLog[evt.Peer]
-	if !ok {
-		sub.evtLog[evt.Peer] = evt.Type
-		// send signal that an event has been added to the event log
-		select {
-		case sub.evtLogCh <- struct{}{}:
-		default:
-		}
-	} else if e != evt.Type {
-		delete(sub.evtLog, evt.Peer)
-	}
-}
-
-// pullFromEventLog assumes a lock has been taken to protect the event log
-func (sub *Subscription) pullFromEventLog() (PeerEvent, bool) {
-	for k, v := range sub.evtLog {
-		evt := PeerEvent{Peer: k, Type: v}
-		delete(sub.evtLog, k)
-		return evt, true
-	}
-	return PeerEvent{}, false
-}
-
-// NextPeerEvent returns the next event regarding subscribed peers
-// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
-// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
-// all events will eventually be received from NextPeerEvent.
-func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
-	for {
-		sub.evtLogMx.Lock()
-		evt, ok := sub.pullFromEventLog()
-		if ok {
-			// make sure an event log signal is available if there are events in the event log
-			if len(sub.evtLog) > 0 {
-				select {
-				case sub.evtLogCh <- struct{}{}:
-				default:
-				}
-			}
-			sub.evtLogMx.Unlock()
-			return evt, nil
-		}
-		sub.evtLogMx.Unlock()
-
-		select {
-		case <-sub.evtLogCh:
-			continue
-		case <-ctx.Done():
-			return PeerEvent{}, ctx.Err()
-		}
-	}
-}
diff --git a/topic.go b/topic.go
new file mode 100644
index 0000000..2ccc4a4
--- /dev/null
+++ b/topic.go
@@ -0,0 +1,254 @@
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+// Topic is the handle for a pubsub topic
+type Topic struct {
+	p     *PubSub
+	topic string
+
+	evtHandlerMux sync.RWMutex
+	evtHandlers   map[*TopicEventHandler]struct{}
+}
+
+// EventHandler creates a handle for topic-specific events.
+// Multiple event handlers may be created and will operate independently of each other.
+func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
+	h := &TopicEventHandler{
+		err: nil,
+
+		evtLog:   make(map[peer.ID]EventType),
+		evtLogCh: make(chan struct{}, 1),
+	}
+
+	for _, opt := range opts {
+		err := opt(h)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	done := make(chan struct{}, 1)
+	t.p.eval <- func() {
+		tmap := t.p.topics[t.topic]
+		for p := range tmap {
+			h.evtLog[p] = PeerJoin
+		}
+
+		t.evtHandlerMux.Lock()
+		t.evtHandlers[h] = struct{}{}
+		t.evtHandlerMux.Unlock()
+		done <- struct{}{}
+	}
+
+	<-done
+
+	return h, nil
+}
+
+func (t *Topic) sendNotification(evt PeerEvent) {
+	t.evtHandlerMux.RLock()
+	defer t.evtHandlerMux.RUnlock()
+
+	for h := range t.evtHandlers {
+		h.sendNotification(evt)
+	}
+}
+
+// Subscribe returns a new Subscription for the topic.
+// Note that subscription is not an instantaneous operation. It may take some time
+// before the subscription is processed by the pubsub main loop and propagated to our peers.
+func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
+	sub := &Subscription{
+		topic: t.topic,
+		ch:    make(chan *Message, 32),
+		ctx:   t.p.ctx,
+	}
+
+	for _, opt := range opts {
+		err := opt(sub)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	out := make(chan *Subscription, 1)
+
+	t.p.disc.Discover(sub.topic)
+
+	t.p.addSub <- &addSubReq{
+		sub:  sub,
+		resp: out,
+	}
+
+	return <-out, nil
+}
+
+// RouterReady is a function that decides if a router is ready to publish
+type RouterReady func(rt PubSubRouter, topic string) (bool, error)
+
+type PublishOptions struct {
+	ready RouterReady
+}
+
+type PubOpt func(pub *PublishOptions) error
+
+// Publish publishes data to topic.
+func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
+	seqno := t.p.nextSeqno()
+	id := t.p.host.ID()
+	m := &pb.Message{
+		Data:     data,
+		TopicIDs: []string{t.topic},
+		From:     []byte(id),
+		Seqno:    seqno,
+	}
+	if t.p.signKey != nil {
+		m.From = []byte(t.p.signID)
+		err := signMessage(t.p.signID, t.p.signKey, m)
+		if err != nil {
+			return err
+		}
+	}
+
+	pub := &PublishOptions{}
+	for _, opt := range opts {
+		err := opt(pub)
+		if err != nil {
+			return err
+		}
+	}
+
+	if pub.ready != nil {
+		t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
+	}
+
+	t.p.publish <- &Message{m, id}
+
+	return nil
+}
+
+// WithReadiness returns a publishing option for only publishing when the router is ready.
+// This option is not useful unless PubSub is also using WithDiscovery.
+func WithReadiness(ready RouterReady) PubOpt {
+	return func(pub *PublishOptions) error {
+		pub.ready = ready
+		return nil
+	}
+}
+
+// Close closes down the topic. It returns an error if there are any active event
+// handlers or subscriptions; closing an already-closed topic is not an error.
+func (t *Topic) Close() error {
+	req := &rmTopicReq{t, make(chan error, 1)}
+	t.p.rmTopic <- req
+	return <-req.resp
+}
+
+// ListPeers returns a list of peers we are connected to in the given topic.
+func (t *Topic) ListPeers() []peer.ID {
+	return t.p.ListPeers(t.topic)
+}
+
+type EventType int
+
+const (
+	PeerJoin EventType = iota
+	PeerLeave
+)
+
+// TopicEventHandler is used to manage topic-specific events. No Subscription is required to receive events.
+type TopicEventHandler struct {
+	topic *Topic
+	err   error
+
+	evtLogMx sync.Mutex
+	evtLog   map[peer.ID]EventType
+	evtLogCh chan struct{}
+}
+
+type TopicEventHandlerOpt func(t *TopicEventHandler) error
+
+type PeerEvent struct {
+	Type EventType
+	Peer peer.ID
+}
+
+// Cancel closes the topic event handler
+func (t *TopicEventHandler) Cancel() {
+	topic := t.topic
+	t.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()")
+
+	topic.evtHandlerMux.Lock()
+	delete(topic.evtHandlers, t)
+	topic.evtHandlerMux.Unlock()
+}
+
+func (t *TopicEventHandler) sendNotification(evt PeerEvent) {
+	t.evtLogMx.Lock()
+	t.addToEventLog(evt)
+	t.evtLogMx.Unlock()
+}
+
+// addToEventLog assumes a lock has been taken to protect the event log
+func (t *TopicEventHandler) addToEventLog(evt PeerEvent) {
+	e, ok := t.evtLog[evt.Peer]
+	if !ok {
+		t.evtLog[evt.Peer] = evt.Type
+		// send signal that an event has been added to the event log
+		select {
+		case t.evtLogCh <- struct{}{}:
+		default:
+		}
+	} else if e != evt.Type {
+		delete(t.evtLog, evt.Peer)
+	}
+}
+
+// pullFromEventLog assumes a lock has been taken to protect the event log
+func (t *TopicEventHandler) pullFromEventLog() (PeerEvent, bool) {
+	for k, v := range t.evtLog {
+		evt := PeerEvent{Peer: k, Type: v}
+		delete(t.evtLog, k)
+		return evt, true
+	}
+	return PeerEvent{}, false
+}
+
+// NextPeerEvent returns the next event regarding subscribed peers.
+// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
+// Unless a peer both Joins and Leaves before NextPeerEvent emits either event,
+// all events will eventually be received from NextPeerEvent.
+func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
+	for {
+		t.evtLogMx.Lock()
+		evt, ok := t.pullFromEventLog()
+		if ok {
+			// make sure an event log signal is available if there are events in the event log
+			if len(t.evtLog) > 0 {
+				select {
+				case t.evtLogCh <- struct{}{}:
+				default:
+				}
+			}
+			t.evtLogMx.Unlock()
+			return evt, nil
+		}
+		t.evtLogMx.Unlock()
+
+		select {
+		case <-t.evtLogCh:
+			continue
+		case <-ctx.Done():
+			return PeerEvent{}, ctx.Err()
+		}
+	}
+}
diff --git a/topic_test.go b/topic_test.go
new file mode 100644
index 0000000..8ea6d1e
--- /dev/null
+++ b/topic_test.go
@@ -0,0 +1,421 @@
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic {
+	topics := make([]*Topic, len(psubs))
+
+	for i, ps := range psubs {
+		t, err := ps.Join(topicID, opts...)
+		if err != nil {
+			panic(err)
+		}
+		topics[i] = t
+	}
+
+	return topics
+}
+
+func getTopicEvts(topics []*Topic, opts ...TopicEventHandlerOpt) []*TopicEventHandler {
+	handlers := make([]*TopicEventHandler, len(topics))
+
+	for i, t := range topics {
+		h, err := t.EventHandler(opts...)
+		if err != nil {
+			panic(err)
+		}
+		handlers[i] = h
+	}
+
+	return handlers
+}
+
+func TestTopicClose(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const numHosts = 1
+	topicID := "foobar"
+	hosts := getNetHosts(t, ctx, numHosts)
+	ps := getPubsub(ctx, hosts[0])
+
+	// Try to create and close a topic
+	topic, err := ps.Join(topicID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := topic.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Try to close a topic while there's an outstanding subscription
+	topic, err = ps.Join(topicID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sub, err := topic.Subscribe()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := topic.Close(); err == nil {
+		t.Fatal("expected an error closing a topic with an open subscription")
+	}
+
+	// Check that the topic closes properly after canceling the outstanding subscription
+	sub.Cancel()
+	time.Sleep(time.Millisecond * 100)
+
+	if err := topic.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestSubscriptionJoinNotification(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const numLateSubscribers = 10
+	const numHosts = 20
+	hosts := getNetHosts(t, ctx, numHosts)
+	topics := getTopics(getPubsubs(ctx, hosts), "foobar")
+	evts := getTopicEvts(topics)
+
+	subs := make([]*Subscription, numHosts)
+	topicPeersFound := make([]map[peer.ID]struct{}, numHosts)
+
+	// Have some peers subscribe earlier than other peers.
+	// This exercises whether we get subscription notifications from
+	// existing peers.
+	for i, topic := range topics[numLateSubscribers:] {
+		subch, err := topic.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		subs[i] = subch
+	}
+
+	connectAll(t, hosts)
+
+	time.Sleep(time.Millisecond * 100)
+
+	// Have the rest subscribe
+	for i, topic := range topics[:numLateSubscribers] {
+		subch, err := topic.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		subs[i+numLateSubscribers] = subch
+	}
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < numHosts; i++ {
+		peersFound := make(map[peer.ID]struct{})
+		topicPeersFound[i] = peersFound
+		evt := evts[i]
+		wg.Add(1)
+		go func(peersFound map[peer.ID]struct{}) {
+			defer wg.Done()
+			for len(peersFound) < numHosts-1 {
+				event, err := evt.NextPeerEvent(ctx)
+				if err != nil {
+					panic(err)
+				}
+				if event.Type == PeerJoin {
+					peersFound[event.Peer] = struct{}{}
+				}
+			}
+		}(peersFound)
+	}
+
+	wg.Wait()
+	for _, peersFound := range topicPeersFound {
+		if len(peersFound) != numHosts-1 {
+			t.Fatal("incorrect number of peers found")
+		}
+	}
+}
+
+func TestSubscriptionLeaveNotification(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const numHosts = 20
+	hosts := getNetHosts(t, ctx, numHosts)
+	psubs := getPubsubs(ctx, hosts)
+	topics := getTopics(psubs, "foobar")
+	evts := getTopicEvts(topics)
+
+	subs := make([]*Subscription, numHosts)
+	topicPeersFound := make([]map[peer.ID]struct{}, numHosts)
+
+	// Subscribe all peers and wait until they've all been found
+	for i, topic := range topics {
+		subch, err := topic.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		subs[i] = subch
+	}
+
+	connectAll(t, hosts)
+
+	time.Sleep(time.Millisecond * 100)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < numHosts; i++ {
+		peersFound := make(map[peer.ID]struct{})
+		topicPeersFound[i] = peersFound
+		evt := evts[i]
+		wg.Add(1)
+		go func(peersFound map[peer.ID]struct{}) {
+			defer wg.Done()
+			for len(peersFound) < numHosts-1 {
+				event, err := evt.NextPeerEvent(ctx)
+				if err != nil {
+					panic(err)
+				}
+				if event.Type == PeerJoin {
+					peersFound[event.Peer] = struct{}{}
+				}
+			}
+		}(peersFound)
+	}
+
+	wg.Wait()
+	for _, peersFound := range topicPeersFound {
+		if len(peersFound) != numHosts-1 {
+			t.Fatal("incorrect number of peers found")
+		}
+	}
+
+	// Test removing peers and verifying that they cause events
+	subs[1].Cancel()
+	hosts[2].Close()
+	psubs[0].BlacklistPeer(hosts[3].ID())
+
+	leavingPeers := make(map[peer.ID]struct{})
+	for len(leavingPeers) < 3 {
+		event, err := evts[0].NextPeerEvent(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if event.Type == PeerLeave {
+			leavingPeers[event.Peer] = struct{}{}
+		}
+	}
+
+	if _, ok := leavingPeers[hosts[1].ID()]; !ok {
+		t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event"))
+	}
+	if _, ok := leavingPeers[hosts[2].ID()]; !ok {
+		t.Fatal(fmt.Errorf("closing host did not cause a leave event"))
+	}
+	if _, ok := leavingPeers[hosts[3].ID()]; !ok {
+		t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
+	}
+}
+
+func TestSubscriptionManyNotifications(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const topic = "foobar"
+
+	const numHosts = 33
+	hosts := getNetHosts(t, ctx, numHosts)
+	topics := getTopics(getPubsubs(ctx, hosts), topic)
+	evts := getTopicEvts(topics)
+
+	subs := make([]*Subscription, numHosts)
+	topicPeersFound := make([]map[peer.ID]struct{}, numHosts)
+
+	// Subscribe all peers except one and wait until they've all been found
+	for i := 1; i < numHosts; i++ {
+		subch, err := topics[i].Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		subs[i] = subch
+	}
+
+	connectAll(t, hosts)
+
+	time.Sleep(time.Millisecond * 100)
+
+	wg := sync.WaitGroup{}
+	for i := 1; i < numHosts; i++ {
+		peersFound := make(map[peer.ID]struct{})
+		topicPeersFound[i] = peersFound
+		evt := evts[i]
+		wg.Add(1)
+		go func(peersFound map[peer.ID]struct{}) {
+			defer wg.Done()
+			for len(peersFound) < numHosts-2 {
+				event, err := evt.NextPeerEvent(ctx)
+				if err != nil {
+					panic(err)
+				}
+				if event.Type == PeerJoin {
+					peersFound[event.Peer] = struct{}{}
+				}
+			}
+		}(peersFound)
+	}
+
+	wg.Wait()
+	for _, peersFound := range topicPeersFound[1:] {
+		if len(peersFound) != numHosts-2 {
+			t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2)
+		}
+	}
+
+	// Wait for remaining peer to find other peers
+	remPeerTopic, remPeerEvts := topics[0], evts[0]
+	for len(remPeerTopic.ListPeers()) < numHosts-1 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// Subscribe the remaining peer and check that all the events came through
+	sub, err := remPeerTopic.Subscribe()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subs[0] = sub
+
+	peerState := readAllQueuedEvents(ctx, t, remPeerEvts)
+
+	if len(peerState) != numHosts-1 {
+		t.Fatal("incorrect number of peers found")
+	}
+
+	for _, e := range peerState {
+		if e != PeerJoin {
+			t.Fatal("non Join event occurred")
+		}
+	}
+
+	// Unsubscribe all peers except one and check that all the events came through
+	for i := 1; i < numHosts; i++ {
+		subs[i].Cancel()
+	}
+
+	// Wait for remaining peer to disconnect from the other peers
+	for len(topics[0].ListPeers()) != 0 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	peerState = readAllQueuedEvents(ctx, t, remPeerEvts)
+
+	if len(peerState) != numHosts-1 {
+		t.Fatal("incorrect number of peers found")
+	}
+
+	for _, e := range peerState {
+		if e != PeerLeave {
+			t.Fatal("non Leave event occurred")
+		}
+	}
+}
+
+func TestSubscriptionNotificationSubUnSub(t *testing.T) {
+	// Subscribe and unsubscribe peers and check the state for consistency
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const topic = "foobar"
+
+	const numHosts = 35
+	hosts := getNetHosts(t, ctx, numHosts)
+	topics := getTopics(getPubsubs(ctx, hosts), topic)
+
+	for i := 1; i < numHosts; i++ {
+		connect(t, hosts[0], hosts[i])
+	}
+	time.Sleep(time.Millisecond * 100)
+
+	notifSubThenUnSub(ctx, t, topics)
+}
+
+func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) {
+	primaryTopic := topics[0]
+	msgs := make([]*Subscription, len(topics))
+	checkSize := len(topics) - 1
+
+	// Subscribe all peers to the topic
+	var err error
+	for i, topic := range topics {
+		msgs[i], err = topic.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Wait for the primary peer to be connected to the other peers
+	for len(primaryTopic.ListPeers()) < checkSize {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// Unsubscribe all peers except the primary
+	for i := 1; i < checkSize+1; i++ {
+		msgs[i].Cancel()
+	}
+
+	// Wait for the unsubscribe messages to reach the primary peer
+	for len(primaryTopic.ListPeers()) > 0 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// Read all available events and verify that there are none left to process,
+	// since every peer that joined also left
+	primaryEvts, err := primaryTopic.EventHandler()
+	if err != nil {
+		t.Fatal(err)
+	}
+	peerState := readAllQueuedEvents(ctx, t, primaryEvts)
+
+	if len(peerState) != 0 {
+		for p, s := range peerState {
+			fmt.Println(p, s)
+		}
+		t.Fatalf("Received incorrect events. %d extra events", len(peerState))
+	}
+}
+
+func readAllQueuedEvents(ctx context.Context, t *testing.T, evt *TopicEventHandler) map[peer.ID]EventType {
+	peerState := make(map[peer.ID]EventType)
+	for {
+		ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
+		event, err := evt.NextPeerEvent(ctx)
+		cancel()
+
+		if err == context.DeadlineExceeded {
+			break
+		} else if err != nil {
+			t.Fatal(err)
+		}
+
+		e, ok := peerState[event.Peer]
+		if !ok {
+			peerState[event.Peer] = event.Type
+		} else if e != event.Type {
+			delete(peerState, event.Peer)
+		}
+	}
+	return peerState
+}
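Postscript: a minimal sketch (editor's illustration, not part of the patch) of how the new Topic API introduced above fits together: Join once per topic, Subscribe for messages, and Publish through the handle. The host argument and the topic name are assumptions of the example.

package example

import (
	"context"
	"fmt"

	pubsub "github.com/libp2p/go-libp2p-pubsub"

	"github.com/libp2p/go-libp2p-core/host"
)

func runTopic(ctx context.Context, h host.Host) error {
	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		return err
	}

	// Join returns the single Topic handle for this topic; a second Join for
	// the same topic on the same PubSub instance returns an error.
	topic, err := ps.Join("foobar")
	if err != nil {
		return err
	}

	sub, err := topic.Subscribe()
	if err != nil {
		return err
	}

	if err := topic.Publish(ctx, []byte("hello")); err != nil {
		return err
	}

	msg, err := sub.Next(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("received: %s\n", msg.Data)

	// Cancellation is processed asynchronously by the pubsub main loop, so an
	// immediate Close may still report an outstanding subscription (the tests
	// above sleep briefly before closing for the same reason).
	sub.Cancel()
	return topic.Close()
}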
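The discovery integration is opt-in. Below is a sketch of wiring a discovery.Discovery into PubSub and gating a publish on readiness, using the WithDiscovery, WithReadiness, and MinTopicSize pieces from this patch. How the discovery.Discovery itself is built (e.g. from a DHT-backed routing discovery) is out of scope and assumed to be handled by the caller.

package example

import (
	"context"

	pubsub "github.com/libp2p/go-libp2p-pubsub"

	"github.com/libp2p/go-libp2p-core/discovery"
	"github.com/libp2p/go-libp2p-core/host"
)

func publishWhenReady(ctx context.Context, h host.Host, d discovery.Discovery, data []byte) error {
	// WithDiscovery starts the advertising/polling pipeline added in discovery.go.
	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(d))
	if err != nil {
		return err
	}

	topic, err := ps.Join("foobar")
	if err != nil {
		return err
	}

	// WithReadiness gates the publish on the discovery Bootstrap step;
	// MinTopicSize(3) suggests at least 3 peers, though the router may
	// apply its own threshold (see EnoughPeers above).
	return topic.Publish(ctx, data, pubsub.WithReadiness(pubsub.MinTopicSize(3)))
}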
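Finally, peer join/leave tracking moved from Subscription.NextPeerEvent to the new TopicEventHandler, and no Subscription is needed to receive events. A sketch of consuming those events until the context is cancelled:

package example

import (
	"context"
	"fmt"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func watchPeers(ctx context.Context, topic *pubsub.Topic) error {
	handler, err := topic.EventHandler()
	if err != nil {
		return err
	}
	defer handler.Cancel()

	for {
		// NextPeerEvent blocks until a join/leave event arrives or ctx is done.
		ev, err := handler.NextPeerEvent(ctx)
		if err != nil {
			return err // typically ctx.Err()
		}
		switch ev.Type {
		case pubsub.PeerJoin:
			fmt.Println("peer joined:", ev.Peer)
		case pubsub.PeerLeave:
			fmt.Println("peer left:", ev.Peer)
		}
	}
}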