From 8680a79d3a673142099178d0c5175eabdb94c42d Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 3 Feb 2018 11:30:19 +0200 Subject: [PATCH 01/50] add Join/Leave to PubSubRouter interface advanced routers will want to know when they are subscribed to a topic as they may want to send control messages. --- floodsub.go | 4 ++++ pubsub.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/floodsub.go b/floodsub.go index 65d640a..1b2eb08 100644 --- a/floodsub.go +++ b/floodsub.go @@ -78,3 +78,7 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { } } } + +func (fs *FloodSubRouter) Join(topic string) {} + +func (fs *FloodSubRouter) Leave(topic string) {} diff --git a/pubsub.go b/pubsub.go index 5d92af9..220bba1 100644 --- a/pubsub.go +++ b/pubsub.go @@ -96,6 +96,8 @@ type PubSubRouter interface { RemovePeer(peer.ID) HandleRPC(*RPC) Publish(peer.ID, *pb.Message) + Join(topic string) + Leave(topic string) } type Message struct { @@ -276,6 +278,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.myTopics, sub.topic) p.announce(sub.topic, false) + p.rt.Leave(sub.topic) } } @@ -290,6 +293,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // announce we want this topic if len(subs) == 0 { p.announce(sub.topic, true) + p.rt.Join(sub.topic) } // make new if not there From 448f38072203e4fc4f1494cca159a7ff0dce6b1d Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 16 Feb 2018 22:01:15 +0200 Subject: [PATCH 02/50] gossipsub: router outline --- gossipsub.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 gossipsub.go diff --git a/gossipsub.go b/gossipsub.go new file mode 100644 index 0000000..46f50cf --- /dev/null +++ b/gossipsub.go @@ -0,0 +1,56 @@ +package floodsub + +import ( + "context" + + pb "github.com/libp2p/go-floodsub/pb" + + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" +) + +const ( + GossipSubID = protocol.ID("/meshsub/1.0.0") +) + +func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { + rt := &GossipSubRouter{} + return NewPubSub(ctx, h, rt, opts...) +} + +type GossipSubRouter struct { + p *PubSub +} + +func (fs *GossipSubRouter) Protocols() []protocol.ID { + return []protocol.ID{GossipSubID, FloodSubID} +} + +func (fs *GossipSubRouter) Attach(p *PubSub) { + fs.p = p +} + +func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) { + +} + +func (fs *GossipSubRouter) RemovePeer(peer.ID) { + +} + +func (fs *GossipSubRouter) HandleRPC(rpc *RPC) { + +} + +func (fs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { + +} + +func (fs *GossipSubRouter) Join(topic string) { + +} + +func (fs *GossipSubRouter) Leave(topic string) { + +} From b09c9d1a48e34ced756199b918a03deb5e1ef255 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:04:47 +0200 Subject: [PATCH 03/50] check and mark seen messages prior to validation this allows us to avoid revalidating messages, either because they were concurrently received or were previously rejected by the validator. also allows us to filter invalid messages from gossip. 
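[Editor's note: for clarity, a condensed sketch of the ordering this patch establishes in pushMsg — consult and update the seen-cache first, validate second. The spawn of the throttled validation goroutine is elided, and pushMsgSketch is a hypothetical name used only for illustration; the actual change is in the diff that follows.]

// Condensed sketch (not the literal patch): the seen-cache is checked and
// marked before any validation work is started, so a message ID that was
// already received (possibly concurrently) or previously rejected by a
// validator is dropped immediately and never re-validated or re-forwarded.
func (p *PubSub) pushMsgSketch(vals []*topicVal, src peer.ID, msg *Message) {
	id := msgID(msg.Message)
	if p.seenMessage(id) {
		return // duplicate or previously rejected: nothing to do
	}
	p.markSeen(id)

	if len(vals) > 0 {
		// asynchronous, globally throttled validation (spawn elided here);
		// on success the validation path calls publishMessage itself
		return
	}

	p.publishMessage(src, msg.Message)
}

Because rejected messages are also marked seen, IHAVE gossip advertising them is later ignored (the gossipsub IHAVE handler introduced further down skips any ID for which seenMessage is true), which is what "filter invalid messages from gossip" refers to.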
--- pubsub.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pubsub.go b/pubsub.go index 220bba1..b21e5ab 100644 --- a/pubsub.go +++ b/pubsub.go @@ -245,7 +245,7 @@ func (p *PubSub) processLoop(ctx context.Context) { p.pushMsg(vals, p.host.ID(), msg) case req := <-p.sendMsg: - p.maybePublishMessage(req.from, req.msg.Message) + p.publishMessage(req.from, req.msg.Message) case req := <-p.addVal: p.addValidator(req) @@ -409,6 +409,12 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { + id := msgID(msg.Message) + if p.seenMessage(id) { + return + } + p.markSeen(id) + if len(vals) > 0 { // validation is asynchronous and globally throttled with the throttleValidate semaphore. // the purpose of the global throttle is to bound the goncurrency possible from incoming @@ -426,7 +432,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - p.maybePublishMessage(src, msg.Message) + p.publishMessage(src, msg.Message) } // validate performs validation and only sends the message if all validators succeed @@ -476,13 +482,7 @@ loop: } } -func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - id := msgID(pmsg) - if p.seenMessage(id) { - return - } - - p.markSeen(id) +func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { p.notifySubs(pmsg) p.rt.Publish(from, pmsg) } @@ -580,7 +580,8 @@ type listPeerReq struct { topic string } -// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done. +// sendReq is a request to call publishMessage. +// It is issued after message validation is done. type sendReq struct { from peer.ID msg *Message From 458c75b33fca54da76a66e6694f6986fef07fe7a Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:07:18 +0200 Subject: [PATCH 04/50] add TODO for reliable announcements --- pubsub.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pubsub.go b/pubsub.go index b21e5ab..aaea49a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -323,6 +323,8 @@ func (p *PubSub) announce(topic string, sub bool) { select { case peer <- out: default: + // TODO this needs to be reliable, schedule it for piggybacking + // in a subsequent message or retry later log.Infof("dropping announce message to peer %s: queue full", pid) } } From d6104094c8b4d749af14266c5244f5e71032e0f6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:10:24 +0200 Subject: [PATCH 05/50] increase timeCache length to 120s --- pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index aaea49a..4362b3e 100644 --- a/pubsub.go +++ b/pubsub.go @@ -139,7 +139,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), - seenMessages: timecache.NewTimeCache(time.Second * 30), + seenMessages: timecache.NewTimeCache(time.Second * 120), counter: uint64(time.Now().UnixNano()), } From b867200feed7143df8737387d3620b15d44a8c0c Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:50:14 +0200 Subject: [PATCH 06/50] gossipsub: heartbeat timer --- gossipsub.go | 24 ++++++++++++++++++++++++ pubsub.go | 7 +++++++ 2 files changed, 31 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 46f50cf..d8dae5c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -2,6 +2,7 
@@ package floodsub import ( "context" + "time" pb "github.com/libp2p/go-floodsub/pb" @@ -29,6 +30,7 @@ func (fs *GossipSubRouter) Protocols() []protocol.ID { func (fs *GossipSubRouter) Attach(p *PubSub) { fs.p = p + go fs.heartbeatTimer() } func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) { @@ -54,3 +56,25 @@ func (fs *GossipSubRouter) Join(topic string) { func (fs *GossipSubRouter) Leave(topic string) { } + +func (fs *GossipSubRouter) heartbeatTimer() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + select { + case fs.p.eval <- fs.heartbeat: + case <-fs.p.ctx.Done(): + return + } + case <-fs.p.ctx.Done(): + return + } + } +} + +func (fs *GossipSubRouter) heartbeat() { + +} diff --git a/pubsub.go b/pubsub.go index 4362b3e..3d0b3e8 100644 --- a/pubsub.go +++ b/pubsub.go @@ -82,6 +82,9 @@ type PubSub struct { // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} + // eval thunk in event loop + eval chan func() + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -135,6 +138,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), validateThrottle: make(chan struct{}, defaultValidateThrottle), + eval: make(chan func()), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), @@ -253,6 +257,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case req := <-p.rmVal: p.rmValidator(req) + case thunk := <-p.eval: + thunk() + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return From 375c4176b911cdbf67e78e9111f0268cf4cf07fc Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 16:13:18 +0200 Subject: [PATCH 07/50] gossipsub publish --- gossipsub.go | 141 ++++++++++++++++++++++++++++++++++++++++++--------- mcache.go | 16 ++++++ 2 files changed, 134 insertions(+), 23 deletions(-) create mode 100644 mcache.go diff --git a/gossipsub.go b/gossipsub.go index d8dae5c..0d63f92 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -13,51 +13,142 @@ import ( const ( GossipSubID = protocol.ID("/meshsub/1.0.0") + + // overlay parameters + GossipSubD = 6 + GossipSubDlo = 4 + GossipSubDhi = 12 ) func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { - rt := &GossipSubRouter{} + rt := &GossipSubRouter{ + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + mcache: NewMessageCache(5), + } return NewPubSub(ctx, h, rt, opts...) 
} type GossipSubRouter struct { - p *PubSub + p *PubSub + peers map[peer.ID]protocol.ID // peer protocols + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + mcache *MessageCache } -func (fs *GossipSubRouter) Protocols() []protocol.ID { +func (gs *GossipSubRouter) Protocols() []protocol.ID { return []protocol.ID{GossipSubID, FloodSubID} } -func (fs *GossipSubRouter) Attach(p *PubSub) { - fs.p = p - go fs.heartbeatTimer() +func (gs *GossipSubRouter) Attach(p *PubSub) { + gs.p = p + go gs.heartbeatTimer() } -func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) { - +func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + gs.peers[p] = proto } -func (fs *GossipSubRouter) RemovePeer(peer.ID) { - +func (gs *GossipSubRouter) RemovePeer(p peer.ID) { + delete(gs.peers, p) + for _, peers := range gs.mesh { + delete(peers, p) + } + for _, peers := range gs.fanout { + delete(peers, p) + } } -func (fs *GossipSubRouter) HandleRPC(rpc *RPC) { - +func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { + // TODO } -func (fs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { +func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { + gs.mcache.Add(msg) + tosend := make(map[peer.ID]struct{}) + for _, topic := range msg.GetTopicIDs() { + // any peers in the topic? + tmap, ok := gs.p.topics[topic] + if !ok { + continue + } + + // floodsub peers + for p := range tmap { + if gs.peers[p] == FloodSubID { + tosend[p] = struct{}{} + } + } + + // gossipsub peers + gmap, ok := gs.mesh[topic] + if ok { + // direct peers in the mesh for topic + for p := range gmap { + tosend[p] = struct{}{} + } + } else { + // fanout peers, we are not in the mesh for topic + gmap, ok = gs.fanout[topic] + if !ok { + // we don't have any yet, pick some + var peers []peer.ID + for p := range tmap { + if gs.peers[p] == GossipSubID { + peers = append(peers, p) + } + } + + if len(peers) > 0 { + gmap = make(map[peer.ID]struct{}) + + shufflePeers(peers) + for _, p := range peers[:GossipSubD] { + gmap[p] = struct{}{} + } + + gs.fanout[topic] = gmap + } + } + } + + for p := range gmap { + tosend[p] = struct{}{} + } + } + + out := rpcWithMessages(msg) + for pid := range tosend { + if pid == from || pid == peer.ID(msg.GetFrom()) { + continue + } + + mch, ok := gs.p.peers[pid] + if !ok { + continue + } + + select { + case mch <- out: + default: + log.Infof("dropping message to peer %s: queue full", pid) + // Drop it. The peer is too slow. 
+ } + } } -func (fs *GossipSubRouter) Join(topic string) { - +func (gs *GossipSubRouter) Join(topic string) { + // TODO } -func (fs *GossipSubRouter) Leave(topic string) { - +func (gs *GossipSubRouter) Leave(topic string) { + // TODO } -func (fs *GossipSubRouter) heartbeatTimer() { +func (gs *GossipSubRouter) heartbeatTimer() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -65,16 +156,20 @@ func (fs *GossipSubRouter) heartbeatTimer() { select { case <-ticker.C: select { - case fs.p.eval <- fs.heartbeat: - case <-fs.p.ctx.Done(): + case gs.p.eval <- gs.heartbeat: + case <-gs.p.ctx.Done(): return } - case <-fs.p.ctx.Done(): + case <-gs.p.ctx.Done(): return } } } -func (fs *GossipSubRouter) heartbeat() { - +func (gs *GossipSubRouter) heartbeat() { + // TODO +} + +func shufflePeers(peers []peer.ID) { + // TODO } diff --git a/mcache.go b/mcache.go new file mode 100644 index 0000000..47e869e --- /dev/null +++ b/mcache.go @@ -0,0 +1,16 @@ +package floodsub + +import ( + pb "github.com/libp2p/go-floodsub/pb" +) + +func NewMessageCache(win int) *MessageCache { + return &MessageCache{} +} + +type MessageCache struct { +} + +func (mc *MessageCache) Add(msg *pb.Message) { + // TODO +} From dd50a31c40c85e9a0541397db87bf4bd5d299e85 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 17:24:19 +0200 Subject: [PATCH 08/50] add gossipsub control messages --- pb/rpc.pb.go | 136 +++++++++++++++++++++++++++++++++++++++++++++++++-- pb/rpc.proto | 26 ++++++++++ 2 files changed, 159 insertions(+), 3 deletions(-) diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go index a5933c0..f970bbc 100644 --- a/pb/rpc.pb.go +++ b/pb/rpc.pb.go @@ -11,6 +11,11 @@ It is generated from these files: It has these top-level messages: RPC Message + ControlMessage + ControlIHave + ControlIWant + ControlGraft + ControlPrune TopicDescriptor */ package floodsub_pb @@ -97,9 +102,10 @@ func (x *TopicDescriptor_EncOpts_EncMode) UnmarshalJSON(data []byte) error { } type RPC struct { - Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"` - Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"` - XXX_unrecognized []byte `json:"-"` + Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"` + Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"` + Control *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *RPC) Reset() { *m = RPC{} } @@ -120,6 +126,13 @@ func (m *RPC) GetPublish() []*Message { return nil } +func (m *RPC) GetControl() *ControlMessage { + if m != nil { + return m.Control + } + return nil +} + type RPC_SubOpts struct { Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"` Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"` @@ -184,6 +197,118 @@ func (m *Message) GetTopicIDs() []string { return nil } +type ControlMessage struct { + Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"` + Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"` + Graft []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"` + Prune []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlMessage) Reset() { *m = ControlMessage{} } +func (m *ControlMessage) String() string { return proto.CompactTextString(m) } 
+func (*ControlMessage) ProtoMessage() {} + +func (m *ControlMessage) GetIhave() []*ControlIHave { + if m != nil { + return m.Ihave + } + return nil +} + +func (m *ControlMessage) GetIwant() []*ControlIWant { + if m != nil { + return m.Iwant + } + return nil +} + +func (m *ControlMessage) GetGraft() []*ControlGraft { + if m != nil { + return m.Graft + } + return nil +} + +func (m *ControlMessage) GetPrune() []*ControlPrune { + if m != nil { + return m.Prune + } + return nil +} + +type ControlIHave struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + MessageIDs []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlIHave) Reset() { *m = ControlIHave{} } +func (m *ControlIHave) String() string { return proto.CompactTextString(m) } +func (*ControlIHave) ProtoMessage() {} + +func (m *ControlIHave) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + +func (m *ControlIHave) GetMessageIDs() []string { + if m != nil { + return m.MessageIDs + } + return nil +} + +type ControlIWant struct { + MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlIWant) Reset() { *m = ControlIWant{} } +func (m *ControlIWant) String() string { return proto.CompactTextString(m) } +func (*ControlIWant) ProtoMessage() {} + +func (m *ControlIWant) GetMessageIDs() []string { + if m != nil { + return m.MessageIDs + } + return nil +} + +type ControlGraft struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlGraft) Reset() { *m = ControlGraft{} } +func (m *ControlGraft) String() string { return proto.CompactTextString(m) } +func (*ControlGraft) ProtoMessage() {} + +func (m *ControlGraft) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + +type ControlPrune struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlPrune) Reset() { *m = ControlPrune{} } +func (m *ControlPrune) String() string { return proto.CompactTextString(m) } +func (*ControlPrune) ProtoMessage() {} + +func (m *ControlPrune) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + // topicID = hash(topicDescriptor); (not the topic.name) type TopicDescriptor struct { Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` @@ -269,6 +394,11 @@ func init() { proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC") proto.RegisterType((*RPC_SubOpts)(nil), "floodsub.pb.RPC.SubOpts") proto.RegisterType((*Message)(nil), "floodsub.pb.Message") + proto.RegisterType((*ControlMessage)(nil), "floodsub.pb.ControlMessage") + proto.RegisterType((*ControlIHave)(nil), "floodsub.pb.ControlIHave") + proto.RegisterType((*ControlIWant)(nil), "floodsub.pb.ControlIWant") + proto.RegisterType((*ControlGraft)(nil), "floodsub.pb.ControlGraft") + proto.RegisterType((*ControlPrune)(nil), "floodsub.pb.ControlPrune") proto.RegisterType((*TopicDescriptor)(nil), "floodsub.pb.TopicDescriptor") proto.RegisterType((*TopicDescriptor_AuthOpts)(nil), "floodsub.pb.TopicDescriptor.AuthOpts") proto.RegisterType((*TopicDescriptor_EncOpts)(nil), "floodsub.pb.TopicDescriptor.EncOpts") diff --git a/pb/rpc.proto b/pb/rpc.proto index f43d3c1..a7fe600 100644 --- 
a/pb/rpc.proto +++ b/pb/rpc.proto @@ -8,6 +8,8 @@ message RPC { optional bool subscribe = 1; // subscribe or unsubcribe optional string topicid = 2; } + + optional ControlMessage control = 3; } message Message { @@ -17,6 +19,30 @@ message Message { repeated string topicIDs = 4; } +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topicID = 1; + repeated string messageIDs = 2; +} + +message ControlIWant { + repeated string messageIDs = 1; +} + +message ControlGraft { + optional string topicID = 1; +} + +message ControlPrune { + optional string topicID = 1; +} + // topicID = hash(topicDescriptor); (not the topic.name) message TopicDescriptor { optional string name = 1; From 6a177a73962e8e56d14abf120744396c0f98344e Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 18:45:10 +0200 Subject: [PATCH 09/50] handle gossipsub control messages --- comm.go | 18 ++++++++ gossipsub.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++- mcache.go | 5 +++ 3 files changed, 138 insertions(+), 1 deletion(-) diff --git a/comm.go b/comm.go index 430e6a6..182e992 100644 --- a/comm.go +++ b/comm.go @@ -104,3 +104,21 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { func rpcWithMessages(msgs ...*pb.Message) *RPC { return &RPC{RPC: pb.RPC{Publish: msgs}} } + +func rpcWithControl(msgs []*pb.Message, + ihave []*pb.ControlIHave, + iwant []*pb.ControlIWant, + graft []*pb.ControlGraft, + prune []*pb.ControlPrune) *RPC { + return &RPC{ + RPC: pb.RPC{ + Publish: msgs, + Control: &pb.ControlMessage{ + Ihave: ihave, + Iwant: iwant, + Graft: graft, + Prune: prune, + }, + }, + } +} diff --git a/gossipsub.go b/gossipsub.go index 0d63f92..2ecbb0a 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -62,7 +62,121 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) { } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { - // TODO + ctl := rpc.GetControl() + if ctl == nil { + return + } + + iwant := gs.handleIHave(ctl) + msgs := gs.handleIWant(ctl) + prune := gs.handleGraft(rpc.from, ctl) + gs.handlePrune(rpc.from, ctl) + + if len(iwant) == 0 && len(msgs) == 0 && len(prune) == 0 { + return + } + + // TODO piggyback gossip IHAVE + out := rpcWithControl(msgs, nil, iwant, nil, prune) + + mch, ok := gs.p.peers[rpc.from] + if !ok { + return + } + + select { + case mch <- out: + default: + // TODO PRUNE messages should be reliable; schedule for piggybacking or retry + log.Infof("dropping message to peer %s: queue full", rpc.from) + } +} + +func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant { + iwant := make(map[string]struct{}) + + for _, ihave := range ctl.GetIhave() { + topic := ihave.GetTopicID() + _, ok := gs.mesh[topic] + if !ok { + continue + } + + for _, mid := range ihave.GetMessageIDs() { + if gs.p.seenMessage(mid) { + continue + } + iwant[mid] = struct{}{} + } + } + + if len(iwant) == 0 { + return nil + } + + iwantlst := make([]string, 0, len(iwant)) + for mid := range iwant { + iwantlst = append(iwantlst, mid) + } + + return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}} +} + +func (gs *GossipSubRouter) handleIWant(ctl *pb.ControlMessage) []*pb.Message { + ihave := make(map[string]*pb.Message) + for _, iwant := range ctl.GetIwant() { + for _, mid := range iwant.GetMessageIDs() { + msg, ok := gs.mcache.Get(mid) + if ok { + ihave[mid] = msg + } + } + } + + if len(ihave) == 0 { + return nil + } + + msgs := 
make([]*pb.Message, 0, len(ihave)) + for _, msg := range ihave { + msgs = append(msgs, msg) + } + + return msgs +} + +func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune { + var prune []string + for _, graft := range ctl.GetGraft() { + topic := graft.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + prune = append(prune, topic) + } else { + peers[p] = struct{}{} + } + } + + if len(prune) == 0 { + return nil + } + + cprune := make([]*pb.ControlPrune, 0, len(prune)) + for _, topic := range prune { + cprune = append(cprune, &pb.ControlPrune{TopicID: &topic}) + } + + return cprune +} + +func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { + for _, prune := range ctl.GetPrune() { + topic := prune.GetTopicID() + peers, ok := gs.mesh[topic] + if ok { + delete(peers, p) + } + } } func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { diff --git a/mcache.go b/mcache.go index 47e869e..01d78f6 100644 --- a/mcache.go +++ b/mcache.go @@ -14,3 +14,8 @@ type MessageCache struct { func (mc *MessageCache) Add(msg *pb.Message) { // TODO } + +func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { + // TODO + return nil, false +} From e1fbe11c973244b2722c32cbc97661709c48b576 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 19:47:54 +0200 Subject: [PATCH 10/50] refactor Publish to use getPeers --- gossipsub.go | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 2ecbb0a..e7d469a 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -199,27 +199,16 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { // gossipsub peers gmap, ok := gs.mesh[topic] - if ok { - // direct peers in the mesh for topic - for p := range gmap { - tosend[p] = struct{}{} - } - } else { - // fanout peers, we are not in the mesh for topic + if !ok { + // we are not in the mesh for topic, use fanout peers gmap, ok = gs.fanout[topic] if !ok { - // we don't have any yet, pick some - var peers []peer.ID - for p := range tmap { - if gs.peers[p] == GossipSubID { - peers = append(peers, p) - } - } + // we don't have any, pick some + peers := gs.getPeers(topic, func(peer.ID) bool { return true }) if len(peers) > 0 { gmap = make(map[peer.ID]struct{}) - shufflePeers(peers) for _, p := range peers[:GossipSubD] { gmap[p] = struct{}{} } @@ -254,6 +243,24 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { } } +func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { + tmap, ok := gs.p.topics[topic] + if !ok { + return nil + } + + peers := make([]peer.ID, 0, len(tmap)) + for p := range tmap { + if gs.peers[p] == GossipSubID && filter(p) { + peers = append(peers, p) + } + } + + shufflePeers(peers) + + return peers +} + func (gs *GossipSubRouter) Join(topic string) { // TODO } From 34509d47b3d6aee537cc1a000edb6e24a1cf3fe0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 20:16:58 +0200 Subject: [PATCH 11/50] implement Join and Leave, refactor sendRPC --- gossipsub.go | 126 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 39 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index e7d469a..d34bfff 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -68,27 +68,19 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { } iwant := gs.handleIHave(ctl) - msgs := gs.handleIWant(ctl) + ihave := gs.handleIWant(ctl) prune := gs.handleGraft(rpc.from, ctl) gs.handlePrune(rpc.from, 
ctl) - if len(iwant) == 0 && len(msgs) == 0 && len(prune) == 0 { + if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 { return } - // TODO piggyback gossip IHAVE - out := rpcWithControl(msgs, nil, iwant, nil, prune) - - mch, ok := gs.p.peers[rpc.from] - if !ok { - return - } - - select { - case mch <- out: - default: - // TODO PRUNE messages should be reliable; schedule for piggybacking or retry - log.Infof("dropping message to peer %s: queue full", rpc.from) + out := rpcWithControl(ihave, nil, iwant, nil, prune) + if len(prune) == 0 { + gs.sendMessage(rpc.from, out) + } else { + gs.sendControl(rpc.from, out) } } @@ -207,12 +199,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { peers := gs.getPeers(topic, func(peer.ID) bool { return true }) if len(peers) > 0 { - gmap = make(map[peer.ID]struct{}) - - for _, p := range peers[:GossipSubD] { - gmap[p] = struct{}{} - } - + gmap = peerListToMap(peers[:GossipSubD]) gs.fanout[topic] = gmap } } @@ -229,17 +216,78 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } - mch, ok := gs.p.peers[pid] - if !ok { - continue - } + gs.sendMessage(pid, out) + } +} - select { - case mch <- out: - default: - log.Infof("dropping message to peer %s: queue full", pid) - // Drop it. The peer is too slow. - } +func (gs *GossipSubRouter) Join(topic string) { + gmap, ok := gs.mesh[topic] + if ok { + return + } + + gmap, ok = gs.fanout[topic] + if ok { + gs.mesh[topic] = gmap + delete(gs.fanout, topic) + } else { + peers := gs.getPeers(topic, func(peer.ID) bool { return true }) + gmap = peerListToMap(peers[:GossipSubD]) + gs.mesh[topic] = gmap + } + + for p := range gmap { + gs.sendGraft(p, topic) + } +} + +func (gs *GossipSubRouter) Leave(topic string) { + gmap, ok := gs.mesh[topic] + if !ok { + return + } + + for p := range gmap { + gs.sendPrune(p, topic) + } + delete(gs.mesh, topic) +} + +func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { + graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}} + out := rpcWithControl(nil, nil, nil, graft, nil) + gs.sendControl(p, out) +} + +func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { + prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}} + out := rpcWithControl(nil, nil, nil, nil, prune) + gs.sendControl(p, out) +} + +func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) { + gs.sendRPC(p, out, true) +} + +func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) { + gs.sendRPC(p, out, false) +} + +func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { + // TODO control messages and gossip piggyback + // - control messages (GRAFT/PRUNE) must be reliable and should + // be scheduled for piggyback or retry if the queue is full + // - gossip should be piggybacked on messages oppurtinistcally + + mch, ok := gs.p.peers[p] + if !ok { + return + } + + select { + case mch <- out: + default: + log.Infof("dropping message to peer %s: queue full", p) } } @@ -261,14 +309,6 @@ func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []p return peers } -func (gs *GossipSubRouter) Join(topic string) { - // TODO -} - -func (gs *GossipSubRouter) Leave(topic string) { - // TODO -} - func (gs *GossipSubRouter) heartbeatTimer() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -291,6 +331,14 @@ func (gs *GossipSubRouter) heartbeat() { // TODO } +func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { + pmap := make(map[peer.ID]struct{}) + for _, p := range peers { + pmap[p] = struct{}{} 
+ } + return pmap +} + func shufflePeers(peers []peer.ID) { // TODO } From 73da34138657f6bb5de21da0b9411c35e6e9c16e Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 21:24:17 +0200 Subject: [PATCH 12/50] hearbeat preliminaries: overlay management --- gossipsub.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++---- mcache.go | 8 ++++-- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index d34bfff..a399f88 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -25,7 +25,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er peers: make(map[peer.ID]protocol.ID), mesh: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}), - mcache: NewMessageCache(5), + mcache: NewMessageCache(3, 5), } return NewPubSub(ctx, h, rt, opts...) } @@ -172,7 +172,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { } func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { - gs.mcache.Add(msg) + gs.mcache.Put(msg) tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { @@ -274,10 +274,10 @@ func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) { } func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { - // TODO control messages and gossip piggyback + // TODO control message reliability and gossip piggyback // - control messages (GRAFT/PRUNE) must be reliable and should // be scheduled for piggyback or retry if the queue is full - // - gossip should be piggybacked on messages oppurtinistcally + // - gossip (IHAVE) should be piggybacked on messages oppurtinistcally mch, ok := gs.p.peers[p] if !ok { @@ -328,7 +328,70 @@ func (gs *GossipSubRouter) heartbeatTimer() { } func (gs *GossipSubRouter) heartbeat() { - // TODO + gs.mcache.Shift() + + tograft := make(map[peer.ID][]string) + toprune := make(map[peer.ID][]string) + + for topic, peers := range gs.mesh { + + if len(peers) < GossipSubDlo { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, func(p peer.ID) bool { + _, ok := peers[p] + return !ok + }) + + for _, p := range plst[:ineed] { + peers[p] = struct{}{} + topics := tograft[p] + tograft[p] = append(topics, topic) + } + } + + if len(peers) > GossipSubDhi { + idontneed := len(peers) - GossipSubD + plst := peerMapToList(peers) + shufflePeers(plst) + for _, p := range plst[:idontneed] { + delete(peers, p) + topics := toprune[p] + toprune[p] = append(topics, topic) + } + } + + // TODO gossip + } + + for p, topics := range tograft { + graft := make([]*pb.ControlGraft, 0, len(topics)) + for _, topic := range topics { + graft = append(graft, &pb.ControlGraft{TopicID: &topic}) + } + + var prune []*pb.ControlPrune + pruning, ok := toprune[p] + if ok { + delete(toprune, p) + prune = make([]*pb.ControlPrune, 0, len(pruning)) + for _, topic := range pruning { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + } + + out := rpcWithControl(nil, nil, nil, graft, prune) + gs.sendControl(p, out) + } + + for p, topics := range toprune { + prune := make([]*pb.ControlPrune, 0, len(topics)) + for _, topic := range topics { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + + out := rpcWithControl(nil, nil, nil, nil, prune) + gs.sendControl(p, out) + } } func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { @@ -339,6 +402,14 @@ func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { return pmap } +func peerMapToList(peers map[peer.ID]struct{}) []peer.ID { + plst := make([]peer.ID, 0, len(peers)) + 
for p := range peers { + plst = append(plst, p) + } + return plst +} + func shufflePeers(peers []peer.ID) { // TODO } diff --git a/mcache.go b/mcache.go index 01d78f6..0b79892 100644 --- a/mcache.go +++ b/mcache.go @@ -4,14 +4,14 @@ import ( pb "github.com/libp2p/go-floodsub/pb" ) -func NewMessageCache(win int) *MessageCache { +func NewMessageCache(gossip, history int) *MessageCache { return &MessageCache{} } type MessageCache struct { } -func (mc *MessageCache) Add(msg *pb.Message) { +func (mc *MessageCache) Put(msg *pb.Message) { // TODO } @@ -19,3 +19,7 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { // TODO return nil, false } + +func (mc *MessageCache) Shift() { + // TODO +} From bc25116516756346541e333da07b3eaf92410ad7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 10:22:53 +0200 Subject: [PATCH 13/50] clean peers that have left the topic on heartbeat --- gossipsub.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index a399f88..0cdf7c3 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -335,6 +335,15 @@ func (gs *GossipSubRouter) heartbeat() { for topic, peers := range gs.mesh { + // check whether our peers are still in the topic + for p := range peers { + _, ok := gs.p.topics[topic][p] + if !ok { + delete(peers, p) + } + } + + // do we have enough peers? if len(peers) < GossipSubDlo { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, func(p peer.ID) bool { @@ -349,10 +358,12 @@ func (gs *GossipSubRouter) heartbeat() { } } + // do we have too many peers if len(peers) > GossipSubDhi { idontneed := len(peers) - GossipSubD plst := peerMapToList(peers) shufflePeers(plst) + for _, p := range plst[:idontneed] { delete(peers, p) topics := toprune[p] @@ -363,6 +374,7 @@ func (gs *GossipSubRouter) heartbeat() { // TODO gossip } + // send coalesced GRAFT/PRUNE messages for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { From 78618fce232488405ab928737f8f95442a4149fb Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 10:43:27 +0200 Subject: [PATCH 14/50] maintain fanout peer lists on heartbeat --- gossipsub.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 0cdf7c3..849ed4c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -404,6 +404,30 @@ func (gs *GossipSubRouter) heartbeat() { out := rpcWithControl(nil, nil, nil, nil, prune) gs.sendControl(p, out) } + + // maintain our fanout for topics we are publishing but we have not joined + for topic, peers := range gs.fanout { + // check whether our peers are still in the topic + for p := range peers { + _, ok := gs.p.topics[topic][p] + if !ok { + delete(peers, p) + } + } + + // do we need more peers + if len(peers) < GossipSubD { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, func(p peer.ID) bool { + _, ok := peers[p] + return !ok + }) + + for _, p := range plst[:ineed] { + peers[p] = struct{}{} + } + } + } } func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { From 7251c64e65b83723c2d499ce04f16b072b87cf64 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 12:08:18 +0200 Subject: [PATCH 15/50] control message piggybacking logic --- gossipsub.go | 139 +++++++++++++++++++++++++++++++++------------------ mcache.go | 4 ++ 2 files changed, 94 insertions(+), 49 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 849ed4c..4f7ac3d 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -22,20 +22,24 @@ const ( func 
NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ - peers: make(map[peer.ID]protocol.ID), - mesh: make(map[string]map[peer.ID]struct{}), - fanout: make(map[string]map[peer.ID]struct{}), - mcache: NewMessageCache(3, 5), + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + gossip: make(map[peer.ID][]*pb.ControlIHave), + control: make(map[peer.ID]*pb.ControlMessage), + mcache: NewMessageCache(3, 5), } return NewPubSub(ctx, h, rt, opts...) } type GossipSubRouter struct { - p *PubSub - peers map[peer.ID]protocol.ID // peer protocols - mesh map[string]map[peer.ID]struct{} // topic meshes - fanout map[string]map[peer.ID]struct{} // topic fanout - mcache *MessageCache + p *PubSub + peers map[peer.ID]protocol.ID // peer protocols + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + gossip map[peer.ID][]*pb.ControlIHave // pending gossip + control map[peer.ID]*pb.ControlMessage // pending control messages + mcache *MessageCache } func (gs *GossipSubRouter) Protocols() []protocol.ID { @@ -77,11 +81,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { } out := rpcWithControl(ihave, nil, iwant, nil, prune) - if len(prune) == 0 { - gs.sendMessage(rpc.from, out) - } else { - gs.sendControl(rpc.from, out) - } + gs.sendRPC(rpc.from, out) } func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant { @@ -216,7 +216,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } - gs.sendMessage(pid, out) + gs.sendRPC(pid, out) } } @@ -256,28 +256,29 @@ func (gs *GossipSubRouter) Leave(topic string) { func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}} out := rpcWithControl(nil, nil, nil, graft, nil) - gs.sendControl(p, out) + gs.sendRPC(p, out) } func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}} out := rpcWithControl(nil, nil, nil, nil, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } -func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) { - gs.sendRPC(p, out, true) -} +func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { + // piggyback cotrol message retries + ctl, ok := gs.control[p] + if ok { + gs.piggybackControl(p, out, ctl) + delete(gs.control, p) + } -func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) { - gs.sendRPC(p, out, false) -} - -func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { - // TODO control message reliability and gossip piggyback - // - control messages (GRAFT/PRUNE) must be reliable and should - // be scheduled for piggyback or retry if the queue is full - // - gossip (IHAVE) should be piggybacked on messages oppurtinistcally + // piggyback gossip + ihave, ok := gs.gossip[p] + if ok { + gs.piggybackGossip(p, out, ihave) + delete(gs.gossip, p) + } mch, ok := gs.p.peers[p] if !ok { @@ -288,25 +289,12 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { case mch <- out: default: log.Infof("dropping message to peer %s: queue full", p) - } -} - -func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { - tmap, ok := gs.p.topics[topic] - if !ok { - return nil - } - - peers := make([]peer.ID, 0, len(tmap)) - for p := range tmap { - if gs.peers[p] == GossipSubID && filter(p) { - peers = append(peers, p) + // 
push control messages that need to be retried + ctl := out.GetControl() + if ctl != nil { + gs.pushControl(p, ctl) } } - - shufflePeers(peers) - - return peers } func (gs *GossipSubRouter) heartbeatTimer() { @@ -330,6 +318,10 @@ func (gs *GossipSubRouter) heartbeatTimer() { func (gs *GossipSubRouter) heartbeat() { gs.mcache.Shift() + // flush pending control message from retries and gossip + // that hasn't been piggybacked since the last heartbeat + gs.flush() + tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) @@ -371,7 +363,16 @@ func (gs *GossipSubRouter) heartbeat() { } } - // TODO gossip + // schedule gossip + mids := gs.mcache.GetGossipIDs(topic) + if len(mids) == 0 { + continue + } + + gpeers := gs.getPeers(topic, func(peer.ID) bool { return true }) + for _, p := range gpeers[:GossipSubD] { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } } // send coalesced GRAFT/PRUNE messages @@ -392,7 +393,7 @@ func (gs *GossipSubRouter) heartbeat() { } out := rpcWithControl(nil, nil, nil, graft, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } for p, topics := range toprune { @@ -402,7 +403,7 @@ func (gs *GossipSubRouter) heartbeat() { } out := rpcWithControl(nil, nil, nil, nil, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } // maintain our fanout for topics we are publishing but we have not joined @@ -430,6 +431,46 @@ func (gs *GossipSubRouter) heartbeat() { } } +func (gs *GossipSubRouter) flush() { + // TODO +} + +func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) { + gossip := gs.gossip[p] + gossip = append(gossip, ihave) + gs.gossip[p] = gossip +} + +func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { + // TODO +} + +func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) { + // TODO +} + +func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) { + // TODO +} + +func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { + tmap, ok := gs.p.topics[topic] + if !ok { + return nil + } + + peers := make([]peer.ID, 0, len(tmap)) + for p := range tmap { + if gs.peers[p] == GossipSubID && filter(p) { + peers = append(peers, p) + } + } + + shufflePeers(peers) + + return peers +} + func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { pmap := make(map[peer.ID]struct{}) for _, p := range peers { diff --git a/mcache.go b/mcache.go index 0b79892..0ab465d 100644 --- a/mcache.go +++ b/mcache.go @@ -20,6 +20,10 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { return nil, false } +func (mc *MessageCache) GetGossipIDs(topic string) []string { + return nil +} + func (mc *MessageCache) Shift() { // TODO } From 74a10cfa70ab25bb08581e0ded18cc18d51516f8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 12:36:49 +0200 Subject: [PATCH 16/50] piggybacking details --- gossipsub.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 4f7ac3d..4086e55 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -441,16 +441,61 @@ func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) { gs.gossip[p] = gossip } -func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { - // TODO +func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) { + ctl := out.GetControl() + if ctl == nil { + ctl = &pb.ControlMessage{} + out.Control = ctl + } 
+ + ctl.Ihave = ihave } -func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) { - // TODO +func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { + // remove IHAVE/IWANT from control message, gossip is not retried + ctl.Ihave = nil + ctl.Iwant = nil + gs.control[p] = ctl } func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) { - // TODO + // check control message for staleness first + var tograft []*pb.ControlGraft + var toprune []*pb.ControlPrune + + for _, graft := range ctl.GetGraft() { + topic := graft.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + continue + } + _, ok = peers[p] + if ok { + tograft = append(tograft, graft) + } + } + + for _, prune := range ctl.GetPrune() { + topic := prune.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + toprune = append(toprune, prune) + continue + } + _, ok = peers[p] + if !ok { + toprune = append(toprune, prune) + } + } + + xctl := out.Control + if xctl == nil { + xctl = &pb.ControlMessage{} + out.Control = xctl + } + + xctl.Graft = append(xctl.Graft, tograft...) + xctl.Prune = append(xctl.Prune, toprune...) } func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { From 07875f149e5e6b147400c5c2195e92e85d8f2c59 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 12:50:24 +0200 Subject: [PATCH 17/50] implement flush --- gossipsub.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 4086e55..bf20a09 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -432,7 +432,19 @@ func (gs *GossipSubRouter) heartbeat() { } func (gs *GossipSubRouter) flush() { - // TODO + // send gossip first, which will also piggyback control + for p, ihave := range gs.gossip { + delete(gs.gossip, p) + out := rpcWithControl(nil, ihave, nil, nil, nil) + gs.sendRPC(p, out) + } + + // send the remaining control messages + for p, ctl := range gs.control { + delete(gs.control, p) + out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune) + gs.sendRPC(p, out) + } } func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) { @@ -455,7 +467,9 @@ func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { // remove IHAVE/IWANT from control message, gossip is not retried ctl.Ihave = nil ctl.Iwant = nil - gs.control[p] = ctl + if ctl.Graft != nil || ctl.Prune != nil { + gs.control[p] = ctl + } } func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) { From 8fbc4e1c70685f0c04888da0203fad2ae14db60d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 13:56:23 +0200 Subject: [PATCH 18/50] implement mcache --- mcache.go | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/mcache.go b/mcache.go index 0ab465d..f501d1e 100644 --- a/mcache.go +++ b/mcache.go @@ -5,25 +5,57 @@ import ( ) func NewMessageCache(gossip, history int) *MessageCache { - return &MessageCache{} + return &MessageCache{ + msgs: make(map[string]*pb.Message), + history: make([][]CacheEntry, history), + gossip: gossip, + } } type MessageCache struct { + msgs map[string]*pb.Message + history [][]CacheEntry + gossip int +} + +type CacheEntry struct { + mid string + topics []string } func (mc *MessageCache) Put(msg *pb.Message) { - // TODO + mid := msgID(msg) + mc.msgs[mid] = msg + mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()}) } func (mc *MessageCache) 
Get(mid string) (*pb.Message, bool) { - // TODO - return nil, false + m, ok := mc.msgs[mid] + return m, ok } func (mc *MessageCache) GetGossipIDs(topic string) []string { - return nil + var mids []string + for _, entries := range mc.history[:mc.gossip] { + for _, entry := range entries { + for _, t := range entry.topics { + if t == topic { + mids = append(mids, entry.mid) + break + } + } + } + } + return mids } func (mc *MessageCache) Shift() { - // TODO + last := mc.history[len(mc.history)-1] + for _, entry := range last { + delete(mc.msgs, entry.mid) + } + for i := len(mc.history) - 2; i >= 0; i-- { + mc.history[i+1] = mc.history[i] + } + mc.history[0] = nil } From 64d35994d16ca5aaf001c47a07804fc049806eba Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 14:00:33 +0200 Subject: [PATCH 19/50] shuffle peers --- gossipsub.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index bf20a09..0eb3910 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -2,6 +2,7 @@ package floodsub import ( "context" + "math/rand" "time" pb "github.com/libp2p/go-floodsub/pb" @@ -350,7 +351,7 @@ func (gs *GossipSubRouter) heartbeat() { } } - // do we have too many peers + // do we have too many peers? if len(peers) > GossipSubDhi { idontneed := len(peers) - GossipSubD plst := peerMapToList(peers) @@ -526,7 +527,6 @@ func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []p } shufflePeers(peers) - return peers } @@ -547,5 +547,8 @@ func peerMapToList(peers map[peer.ID]struct{}) []peer.ID { } func shufflePeers(peers []peer.ID) { - // TODO + for i := range peers { + j := rand.Intn(i + 1) + peers[i], peers[j] = peers[j], peers[i] + } } From 0e288dc7412b936eac3c485f6b37d1a6a5cd60c1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 14:12:52 +0200 Subject: [PATCH 20/50] delete mesh before sending prunes on leave --- gossipsub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 0eb3910..63dfef3 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -248,10 +248,11 @@ func (gs *GossipSubRouter) Leave(topic string) { return } + delete(gs.mesh, topic) + for p := range gmap { gs.sendPrune(p, topic) } - delete(gs.mesh, topic) } func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { @@ -417,7 +418,7 @@ func (gs *GossipSubRouter) heartbeat() { } } - // do we need more peers + // do we need more peers? 
if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, func(p peer.ID) bool { From c5fe29038957c3b83cc619e98b37d37c7a325ae9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 15:10:16 +0200 Subject: [PATCH 21/50] reduce gossip amplification; don't send to mesh peers --- gossipsub.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gossipsub.go b/gossipsub.go index 63dfef3..7efc160 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -373,7 +373,11 @@ func (gs *GossipSubRouter) heartbeat() { gpeers := gs.getPeers(topic, func(peer.ID) bool { return true }) for _, p := range gpeers[:GossipSubD] { - gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + // skip mesh peers + _, ok := peers[p] + if !ok { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } } } From bd29e81e3a132b041dfc5d6bf1658fa16b445799 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 16:10:53 +0200 Subject: [PATCH 22/50] history and gossip length are named constants --- gossipsub.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gossipsub.go b/gossipsub.go index 7efc160..5a02f63 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -19,6 +19,10 @@ const ( GossipSubD = 6 GossipSubDlo = 4 GossipSubDhi = 12 + + // gossip parameters + GossipSubHistoryLength = 5 + GossipSubHistoryGossip = 3 ) func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { @@ -28,7 +32,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er fanout: make(map[string]map[peer.ID]struct{}), gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), - mcache: NewMessageCache(3, 5), + mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), } return NewPubSub(ctx, h, rt, opts...) 
} From 64cdbbabbc8a6aa67faec4d124d2092c846c41e7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 18:23:28 +0200 Subject: [PATCH 23/50] remove pending gossip and control messages on RemovePeer --- gossipsub.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 5a02f63..03cd928 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -68,6 +68,8 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) { for _, peers := range gs.fanout { delete(peers, p) } + delete(gs.gossip, p) + delete(gs.control, p) } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { From 599ccffecdea8a4287c2bd7ec7307499b05bb629 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 10:59:00 +0200 Subject: [PATCH 24/50] shift the message history window at the end of the heartbeat --- gossipsub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 03cd928..3bce11c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -324,8 +324,6 @@ func (gs *GossipSubRouter) heartbeatTimer() { } func (gs *GossipSubRouter) heartbeat() { - gs.mcache.Shift() - // flush pending control message from retries and gossip // that hasn't been piggybacked since the last heartbeat gs.flush() @@ -441,6 +439,9 @@ func (gs *GossipSubRouter) heartbeat() { } } } + + // advance the message history window + gs.mcache.Shift() } func (gs *GossipSubRouter) flush() { From 060a9bba62a994fb386e886c654f902376029df2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 11:18:56 +0200 Subject: [PATCH 25/50] mcache test --- mcache_test.go | 165 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 mcache_test.go diff --git a/mcache_test.go b/mcache_test.go new file mode 100644 index 0000000..02d26f7 --- /dev/null +++ b/mcache_test.go @@ -0,0 +1,165 @@ +package floodsub + +import ( + "encoding/binary" + "fmt" + "testing" + + pb "github.com/libp2p/go-floodsub/pb" +) + +func TestMessageCache(t *testing.T) { + mcache := NewMessageCache(3, 5) + + msgs := make([]*pb.Message, 60) + for i := range msgs { + msgs[i] = makeTestMessage(i) + } + + for i := 0; i < 10; i++ { + mcache.Put(msgs[i]) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatal("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatal("Message %d does not match cache", i) + } + } + + gids := mcache.GetGossipIDs("test") + if len(gids) != 10 { + t.Fatal("Expected 10 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + if mid != gids[i] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + + mcache.Shift() + for i := 10; i < 20; i++ { + mcache.Put(msgs[i]) + } + + for i := 0; i < 20; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatal("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatal("Message %d does not match cache", i) + } + } + + gids = mcache.GetGossipIDs("test") + if len(gids) != 20 { + t.Fatal("Expected 20 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + if mid != gids[10+i] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + + for i := 10; i < 20; i++ { + mid := msgID(msgs[i]) + if mid != gids[i-10] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + + mcache.Shift() + for i := 20; i < 30; i++ { + mcache.Put(msgs[i]) + } + + mcache.Shift() + for i := 30; i < 40; i++ { + mcache.Put(msgs[i]) + } + + mcache.Shift() + for i := 40; i < 50; i++ { + mcache.Put(msgs[i]) + } + 
+ mcache.Shift() + for i := 50; i < 60; i++ { + mcache.Put(msgs[i]) + } + + if len(mcache.msgs) != 50 { + t.Fatal("Expected 50 messages in the cache; got %d", len(mcache.msgs)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + _, ok := mcache.Get(mid) + if ok { + t.Fatal("Message %d still in cache", i) + } + } + + for i := 10; i < 60; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatal("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatal("Message %d does not match cache", i) + } + } + + gids = mcache.GetGossipIDs("test") + if len(gids) != 30 { + t.Fatal("Expected 30 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[50+i]) + if mid != gids[i] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + + for i := 10; i < 20; i++ { + mid := msgID(msgs[30+i]) + if mid != gids[i] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + + for i := 20; i < 30; i++ { + mid := msgID(msgs[10+i]) + if mid != gids[i] { + t.Fatal("GossipID mismatch for message %d", i) + } + } + +} + +func makeTestMessage(n int) *pb.Message { + seqno := make([]byte, 8) + binary.BigEndian.PutUint64(seqno, uint64(n)) + data := []byte(fmt.Sprintf("%d", n)) + return &pb.Message{ + Data: data, + TopicIDs: []string{"test"}, + From: []byte("test"), + Seqno: seqno, + } +} From 1f5959bf54374d036d39a770b63b82d4c73adc02 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 12:47:45 +0200 Subject: [PATCH 26/50] fix slice bounds issues; getCount takes care of the slicing --- gossipsub.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 3bce11c..f700f3b 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -203,10 +203,10 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gmap, ok = gs.fanout[topic] if !ok { // we don't have any, pick some - peers := gs.getPeers(topic, func(peer.ID) bool { return true }) + peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) if len(peers) > 0 { - gmap = peerListToMap(peers[:GossipSubD]) + gmap = peerListToMap(peers) gs.fanout[topic] = gmap } } @@ -238,8 +238,8 @@ func (gs *GossipSubRouter) Join(topic string) { gs.mesh[topic] = gmap delete(gs.fanout, topic) } else { - peers := gs.getPeers(topic, func(peer.ID) bool { return true }) - gmap = peerListToMap(peers[:GossipSubD]) + peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + gmap = peerListToMap(peers) gs.mesh[topic] = gmap } @@ -344,12 +344,12 @@ func (gs *GossipSubRouter) heartbeat() { // do we have enough peers? if len(peers) < GossipSubDlo { ineed := GossipSubD - len(peers) - plst := gs.getPeers(topic, func(p peer.ID) bool { + plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { _, ok := peers[p] return !ok }) - for _, p := range plst[:ineed] { + for _, p := range plst { peers[p] = struct{}{} topics := tograft[p] tograft[p] = append(topics, topic) @@ -375,8 +375,8 @@ func (gs *GossipSubRouter) heartbeat() { continue } - gpeers := gs.getPeers(topic, func(peer.ID) bool { return true }) - for _, p := range gpeers[:GossipSubD] { + gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + for _, p := range gpeers { // skip mesh peers _, ok := peers[p] if !ok { @@ -429,12 +429,12 @@ func (gs *GossipSubRouter) heartbeat() { // do we need more peers? 
if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) - plst := gs.getPeers(topic, func(p peer.ID) bool { + plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { _, ok := peers[p] return !ok }) - for _, p := range plst[:ineed] { + for _, p := range plst { peers[p] = struct{}{} } } @@ -525,7 +525,7 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control xctl.Prune = append(xctl.Prune, toprune...) } -func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { +func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { tmap, ok := gs.p.topics[topic] if !ok { return nil @@ -539,6 +539,11 @@ func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []p } shufflePeers(peers) + + if count > 0 && len(peers) > count { + peers = peers[:count] + } + return peers } From 0a82522cf603132d13a912038ef32b3caf696684 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 13:01:29 +0200 Subject: [PATCH 27/50] basic gossipsub tests --- floodsub_test.go | 10 +- gossipsub_test.go | 239 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 gossipsub_test.go diff --git a/floodsub_test.go b/floodsub_test.go index e7b421f..b0196ca 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -55,8 +55,16 @@ func connect(t *testing.T, a, b host.Host) { } func sparseConnect(t *testing.T, hosts []host.Host) { + connectSome(t, hosts, 3) +} + +func denseConnect(t *testing.T, hosts []host.Host) { + connectSome(t, hosts, 10) +} + +func connectSome(t *testing.T, hosts []host.Host, d int) { for i, a := range hosts { - for j := 0; j < 3; j++ { + for j := 0; j < d; j++ { n := rand.Intn(len(hosts)) if n == i { j-- diff --git a/gossipsub_test.go b/gossipsub_test.go new file mode 100644 index 0000000..4de35ee --- /dev/null +++ b/gossipsub_test.go @@ -0,0 +1,239 @@ +package floodsub + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "testing" + "time" + + host "github.com/libp2p/go-libp2p-host" +) + +func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { + var psubs []*PubSub + for _, h := range hs { + ps, err := NewGossipSub(ctx, h, opts...) 
+ if err != nil { + panic(err) + } + psubs = append(psubs, ps) + } + return psubs +} + +func TestSparseGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + sparseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestDenseGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestMixedGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 30) + + gsubs := getGossipsubs(ctx, hosts[:20]) + fsubs := getPubsubs(ctx, hosts[20:]) + psubs := append(gsubs, fsubs...) 
+ + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + sparseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubMultihops(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 6) + + psubs := getGossipsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[3], hosts[4]) + connect(t, hosts[4], hosts[5]) + + var subs []*Subscription + for i := 1; i < 6; i++ { + ch, err := psubs[i].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + subs = append(subs, ch) + } + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + msg := []byte("i like cats") + err := psubs[0].Publish("foobar", msg) + if err != nil { + t.Fatal(err) + } + + // last node in the chain should get the message + select { + case out := <-subs[4].ch: + if !bytes.Equal(out.GetData(), msg) { + t.Fatal("got wrong data") + } + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for message") + } +} + +func TestGossipsubTreeTopology(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + psubs := getGossipsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[1], hosts[4]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[0], hosts[5]) + connect(t, hosts[5], hosts[6]) + connect(t, hosts[5], hosts[8]) + connect(t, hosts[6], hosts[7]) + connect(t, hosts[8], hosts[9]) + + /* + [0] -> [1] -> [2] -> [3] + | L->[4] + v + [5] -> [6] -> [7] + | + v + [8] -> [9] + */ + + var chs []*Subscription + for _, ps := range psubs { + ch, err := ps.Subscribe("fizzbuzz") + if err != nil { + t.Fatal(err) + } + + chs = append(chs, ch) + } + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + assertPeerLists(t, hosts, psubs[0], 1, 5) + assertPeerLists(t, hosts, psubs[1], 0, 2, 4) + assertPeerLists(t, hosts, psubs[2], 1, 3) + + checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) +} From 626b04f2a8ed27874fb39c9694213c8121d139f5 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 13:05:26 +0200 Subject: [PATCH 28/50] fix go vet issues in mcache test --- mcache_test.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/mcache_test.go b/mcache_test.go index 02d26f7..6bc526c 100644 --- a/mcache_test.go +++ b/mcache_test.go @@ -24,23 +24,23 @@ func TestMessageCache(t *testing.T) { mid := msgID(msgs[i]) m, ok := mcache.Get(mid) if !ok { - t.Fatal("Message %d not in cache", i) + t.Fatalf("Message %d not in cache", i) } if m != msgs[i] { - t.Fatal("Message %d does not match cache", i) + t.Fatalf("Message %d does not match cache", i) } } gids := mcache.GetGossipIDs("test") if len(gids) != 10 { - t.Fatal("Expected 10 gossip IDs; got %d", len(gids)) + t.Fatalf("Expected 10 gossip IDs; got %d", len(gids)) } for i := 0; i < 10; i++ { mid := 
msgID(msgs[i]) if mid != gids[i] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } @@ -53,30 +53,30 @@ func TestMessageCache(t *testing.T) { mid := msgID(msgs[i]) m, ok := mcache.Get(mid) if !ok { - t.Fatal("Message %d not in cache", i) + t.Fatalf("Message %d not in cache", i) } if m != msgs[i] { - t.Fatal("Message %d does not match cache", i) + t.Fatalf("Message %d does not match cache", i) } } gids = mcache.GetGossipIDs("test") if len(gids) != 20 { - t.Fatal("Expected 20 gossip IDs; got %d", len(gids)) + t.Fatalf("Expected 20 gossip IDs; got %d", len(gids)) } for i := 0; i < 10; i++ { mid := msgID(msgs[i]) if mid != gids[10+i] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } for i := 10; i < 20; i++ { mid := msgID(msgs[i]) if mid != gids[i-10] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } @@ -101,14 +101,14 @@ func TestMessageCache(t *testing.T) { } if len(mcache.msgs) != 50 { - t.Fatal("Expected 50 messages in the cache; got %d", len(mcache.msgs)) + t.Fatalf("Expected 50 messages in the cache; got %d", len(mcache.msgs)) } for i := 0; i < 10; i++ { mid := msgID(msgs[i]) _, ok := mcache.Get(mid) if ok { - t.Fatal("Message %d still in cache", i) + t.Fatalf("Message %d still in cache", i) } } @@ -116,37 +116,37 @@ func TestMessageCache(t *testing.T) { mid := msgID(msgs[i]) m, ok := mcache.Get(mid) if !ok { - t.Fatal("Message %d not in cache", i) + t.Fatalf("Message %d not in cache", i) } if m != msgs[i] { - t.Fatal("Message %d does not match cache", i) + t.Fatalf("Message %d does not match cache", i) } } gids = mcache.GetGossipIDs("test") if len(gids) != 30 { - t.Fatal("Expected 30 gossip IDs; got %d", len(gids)) + t.Fatalf("Expected 30 gossip IDs; got %d", len(gids)) } for i := 0; i < 10; i++ { mid := msgID(msgs[50+i]) if mid != gids[i] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } for i := 10; i < 20; i++ { mid := msgID(msgs[30+i]) if mid != gids[i] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } for i := 20; i < 30; i++ { mid := msgID(msgs[10+i]) if mid != gids[i] { - t.Fatal("GossipID mismatch for message %d", i) + t.Fatalf("GossipID mismatch for message %d", i) } } From 66fc8adac5c28860edcf57d1e68383d92acc9657 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 13:28:58 +0200 Subject: [PATCH 29/50] more gossipsub tests --- gossipsub_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index 4de35ee..28ae3b3 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -105,6 +105,102 @@ func TestDenseGossipsub(t *testing.T) { } } +func TestGossipsubGossip(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if 
err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // wait for hearbeat to have some gossip + time.Sleep(time.Second * 1) + } + + // and wait for some gossip flushing + time.Sleep(time.Second * 2) +} + +func TestGossipsubPrune(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + // disconnect some peers from the mesh to get some PRUNEs + for _, sub := range msgs[:5] { + sub.Cancel() + } + + // wait a bit to take effect + time.Sleep(time.Millisecond * 100) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs[5:] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 553394944333283fab89200bf7c88a2c46738e79 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 13:53:31 +0200 Subject: [PATCH 30/50] finetune gossip test, add join grafting test --- gossipsub_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index 28ae3b3..00b9184 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -127,7 +127,7 @@ func TestGossipsubGossip(t *testing.T) { // wait for heartbeats to build mesh time.Sleep(time.Second * 2) - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) @@ -144,8 +144,8 @@ func TestGossipsubGossip(t *testing.T) { } } - // wait for hearbeat to have some gossip - time.Sleep(time.Second * 1) + // wait a bit to have some gossip interleaved + time.Sleep(time.Millisecond * 100) } // and wait for some gossip flushing @@ -201,6 +201,51 @@ func TestGossipsubPrune(t *testing.T) { } } +func TestGossipsubGraft(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + sparseConnect(t, hosts) + + time.Sleep(time.Second * 1) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + + // wait for announce to propagate + time.Sleep(time.Millisecond * 100) + } + + time.Sleep(time.Second * 1) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 75787fb809ef9ec2895ee149d1970d1ba952dfd1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 15:25:30 +0200 Subject: [PATCH 
31/50] moar gossipsub tests --- gossipsub_test.go | 189 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index 00b9184..09bc8d0 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -105,6 +105,78 @@ func TestDenseGossipsub(t *testing.T) { } } +func TestGossipsubFanout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + // wait for heartbeat to exercise fanout maintainance + time.Sleep(time.Second * 2) + + // subscribe the owner + subch, err := psubs[0].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + msgs = append(msgs, subch) + + // wait for a heartbeat + time.Sleep(time.Second * 1) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -152,6 +224,74 @@ func TestGossipsubGossip(t *testing.T) { time.Sleep(time.Second * 2) } +func TestGossipsubGossipPiggyback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + var xmsgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("bazcrux") + if err != nil { + t.Fatal(err) + } + + xmsgs = append(xmsgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + psubs[owner].Publish("bazcrux", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + for _, sub := range xmsgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // wait a bit to have some gossip interleaved + time.Sleep(time.Millisecond * 100) + } + + // and wait for some gossip flushing + time.Sleep(time.Second * 2) +} + func TestGossipsubPrune(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -246,6 +386,55 @@ func TestGossipsubGraft(t *testing.T) { } } +func TestGossipsubRemovePeer(t *testing.T) { + ctx, cancel 
:= context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + // disconnect some peers to exercise RemovePeer paths + for _, host := range hosts[:5] { + host.Close() + } + + // wait a heartbeat + time.Sleep(time.Second * 1) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 5 + rand.Intn(len(psubs)-5) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs[5:] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 3ecfbc2a6042f7a1cae43ea006ee01a5262bf087 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 15:52:37 +0200 Subject: [PATCH 32/50] better test for fanout maintenance --- gossipsub_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index 09bc8d0..4046d5f 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -145,9 +145,6 @@ func TestGossipsubFanout(t *testing.T) { } } - // wait for heartbeat to exercise fanout maintainance - time.Sleep(time.Second * 2) - // subscribe the owner subch, err := psubs[0].Subscribe("foobar") if err != nil { @@ -177,6 +174,86 @@ func TestGossipsubFanout(t *testing.T) { } } +func TestGossipsubFanoutMaintenance(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + // unsubscribe all peers to exercise fanout maintenance + for _, sub := range msgs { + sub.Cancel() + } + msgs = nil + + // wait for heartbeats + time.Sleep(time.Second * 2) + + // resubscribe and repeat + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 4667b0ae940fd54c7fdabc6a813cfabcac63c1fc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 16:34:25 +0200 Subject: [PATCH 33/50] 
fanout sources should emit gossip too --- gossipsub.go | 84 ++++++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index f700f3b..6835147 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -331,6 +331,7 @@ func (gs *GossipSubRouter) heartbeat() { tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) + // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { // check whether our peers are still in the topic @@ -369,23 +370,36 @@ func (gs *GossipSubRouter) heartbeat() { } } - // schedule gossip - mids := gs.mcache.GetGossipIDs(topic) - if len(mids) == 0 { - continue - } - - gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) - for _, p := range gpeers { - // skip mesh peers - _, ok := peers[p] - if !ok { - gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) - } - } + gs.emitGossip(topic, peers) } - // send coalesced GRAFT/PRUNE messages + // maintain our fanout for topics we are publishing but we have not joined + for topic, peers := range gs.fanout { + // check whether our peers are still in the topic + for p := range peers { + _, ok := gs.p.topics[topic][p] + if !ok { + delete(peers, p) + } + } + + // do we need more peers? + if len(peers) < GossipSubD { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { + _, ok := peers[p] + return !ok + }) + + for _, p := range plst { + peers[p] = struct{}{} + } + } + + gs.emitGossip(topic, peers) + } + + // send coalesced GRAFT/PRUNE messages (will piggyback gossip) for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { @@ -416,34 +430,26 @@ func (gs *GossipSubRouter) heartbeat() { gs.sendRPC(p, out) } - // maintain our fanout for topics we are publishing but we have not joined - for topic, peers := range gs.fanout { - // check whether our peers are still in the topic - for p := range peers { - _, ok := gs.p.topics[topic][p] - if !ok { - delete(peers, p) - } - } - - // do we need more peers? 
- if len(peers) < GossipSubD { - ineed := GossipSubD - len(peers) - plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - _, ok := peers[p] - return !ok - }) - - for _, p := range plst { - peers[p] = struct{}{} - } - } - } - // advance the message history window gs.mcache.Shift() } +func (gs *GossipSubRouter) emitGossip(topic string, peers map[peer.ID]struct{}) { + mids := gs.mcache.GetGossipIDs(topic) + if len(mids) == 0 { + return + } + + gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + for _, p := range gpeers { + // skip mesh peers + _, ok := peers[p] + if !ok { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } + } +} + func (gs *GossipSubRouter) flush() { // send gossip first, which will also piggyback control for p, ihave := range gs.gossip { From e8c5cf0914657b472778d41e55a6960b1f57f3c5 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 17:33:53 +0200 Subject: [PATCH 34/50] test gossip propagation with IHAVE/IWANT cycle --- gossipsub_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index 4046d5f..e21984d 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -369,6 +369,87 @@ func TestGossipsubGossipPiggyback(t *testing.T) { time.Sleep(time.Second * 2) } +func TestGossipsubGossipPropagation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 20) + psubs := getGossipsubs(ctx, hosts) + + hosts1 := hosts[:GossipSubD+1] + hosts2 := append(hosts[GossipSubD+1:], hosts[0]) + + denseConnect(t, hosts1) + denseConnect(t, hosts2) + + var msgs1 []*Subscription + for _, ps := range psubs[1 : GossipSubD+1] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs1 = append(msgs1, subch) + } + + time.Sleep(time.Second * 1) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs1 { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + time.Sleep(time.Millisecond * 100) + + var msgs2 []*Subscription + for _, ps := range psubs[GossipSubD+1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs2 = append(msgs2, subch) + } + + var collect [][]byte + for i := 0; i < 10; i++ { + for _, sub := range msgs2 { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + collect = append(collect, got.Data) + } + } + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + gotit := false + for j := 0; j < len(collect); j++ { + if bytes.Equal(msg, collect[j]) { + gotit = true + break + } + } + if !gotit { + t.Fatalf("Didn't get message %s", string(msg)) + } + } +} + func TestGossipsubPrune(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From ef730627adedd8c55c29f22402256d5326246085 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 19:03:12 +0200 Subject: [PATCH 35/50] remove unnecessary and potentially harmful check from heartbeat - the check is unnecessary because peers emit PRUNE on Leave - the check is harmful, because the ANNOUNCE message might have benn lost (or reordered after the GRAFT depending on the retry strategy), which would leave the mesh in an inconsistent state. 
--- gossipsub.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 6835147..8793977 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -334,14 +334,6 @@ func (gs *GossipSubRouter) heartbeat() { // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { - // check whether our peers are still in the topic - for p := range peers { - _, ok := gs.p.topics[topic][p] - if !ok { - delete(peers, p) - } - } - // do we have enough peers? if len(peers) < GossipSubDlo { ineed := GossipSubD - len(peers) From 285c1f0aa74307696afb3eda92e1165d718b7ca0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 19:14:07 +0200 Subject: [PATCH 36/50] add test for graft/prune coalescing in heartbeat --- gossipsub_test.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index e21984d..2a86384 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -593,6 +593,54 @@ func TestGossipsubRemovePeer(t *testing.T) { } } +func TestGossipsubGraftPruneCoalesce(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 20) + psubs := getGossipsubs(ctx, hosts) + denseConnect(t, hosts) + + var topics []string + var msgs [][]*Subscription + for i := 0; i < 100; i++ { + topic := fmt.Sprintf("topic%d", i) + + var subs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + subs = append(subs, subch) + } + + msgs = append(msgs, subs) + } + + // wait for heartbeats to build meshes + time.Sleep(time.Second * 2) + + for i, topic := range topics { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish(topic, msg) + + for _, sub := range msgs[i] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From af061f5040a8e0dd4ba172bb08bc35b2c2836e8b Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 10:18:48 +0200 Subject: [PATCH 37/50] refactor sendGraftPrune out of heartbeat --- gossipsub.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 8793977..b50de38 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -338,6 +338,7 @@ func (gs *GossipSubRouter) heartbeat() { if len(peers) < GossipSubDlo { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { + // filter our current peers _, ok := peers[p] return !ok }) @@ -379,6 +380,7 @@ func (gs *GossipSubRouter) heartbeat() { if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { + // filter our current peers _, ok := peers[p] return !ok }) @@ -392,6 +394,13 @@ func (gs *GossipSubRouter) heartbeat() { } // send coalesced GRAFT/PRUNE messages (will piggyback gossip) + gs.sendGraftPrune(tograft, toprune) + + // advance the message history window + gs.mcache.Shift() +} + +func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) { for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { @@ -422,8 +431,6 @@ func (gs *GossipSubRouter) heartbeat() { gs.sendRPC(p, out) } - // advance the message history 
window - gs.mcache.Shift() } func (gs *GossipSubRouter) emitGossip(topic string, peers map[peer.ID]struct{}) { From bfb06645818342237a44e58588469b1d99101d7a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 10:58:58 +0200 Subject: [PATCH 38/50] retry dropped ANNOUNCE messages they are very important for correct topic state in the protocol, esp gossipsub --- pubsub.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index 3d0b3e8..f739c3a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "fmt" + "math/rand" "sync/atomic" "time" @@ -330,9 +331,18 @@ func (p *PubSub) announce(topic string, sub bool) { select { case peer <- out: default: - // TODO this needs to be reliable, schedule it for piggybacking - // in a subsequent message or retry later - log.Infof("dropping announce message to peer %s: queue full", pid) + log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + go p.announceRetry(topic, sub) + } + } +} + +func (p *PubSub) announceRetry(topic string, sub bool) { + time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) + p.eval <- func() { + _, ok := p.myTopics[topic] + if (ok && sub) || (!ok && !sub) { + p.announce(topic, sub) } } } From 009efebdafd21b63f284f70691b1cc39ff75e8bd Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 11:01:14 +0200 Subject: [PATCH 39/50] harden piggybackControl don't create a control object if the graft/prune are stale and only assign the relevant fields. --- gossipsub.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index b50de38..5ff4386 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -520,14 +520,22 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control } } + if len(tograft) == 0 && len(toprune) == 0 { + return + } + xctl := out.Control if xctl == nil { xctl = &pb.ControlMessage{} out.Control = xctl } - xctl.Graft = append(xctl.Graft, tograft...) - xctl.Prune = append(xctl.Prune, toprune...) + if len(tograft) > 0 { + xctl.Graft = append(xctl.Graft, tograft...) + } + if len(toprune) > 0 { + xctl.Prune = append(xctl.Prune, toprune...) 
+ } } func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { From f5d6cf3bd1f1afa906cd573ac5190ae7c2984721 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 11:04:04 +0200 Subject: [PATCH 40/50] TestGossipsubGraftPruneCoalesce is TestGossipsubGraftPruneRetry it is really testing full queues (sized 32) and retries of control messages and announces --- gossipsub_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index 2a86384..a645b7d 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -593,7 +593,7 @@ func TestGossipsubRemovePeer(t *testing.T) { } } -func TestGossipsubGraftPruneCoalesce(t *testing.T) { +func TestGossipsubGraftPruneRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -603,8 +603,9 @@ func TestGossipsubGraftPruneCoalesce(t *testing.T) { var topics []string var msgs [][]*Subscription - for i := 0; i < 100; i++ { + for i := 0; i < 50; i++ { topic := fmt.Sprintf("topic%d", i) + topics = append(topics, topic) var subs []*Subscription for _, ps := range psubs { @@ -615,12 +616,11 @@ func TestGossipsubGraftPruneCoalesce(t *testing.T) { subs = append(subs, subch) } - msgs = append(msgs, subs) } // wait for heartbeats to build meshes - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) for i, topic := range topics { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) From a71eec5c3a1e2fda5bd9751b7092298f2161a40d Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 12:10:49 +0200 Subject: [PATCH 41/50] test control message retry piggybacking --- gossipsub_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index a645b7d..f307223 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -641,6 +641,88 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { } } +func TestGossipsubControlPiggyback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 20) + psubs := getGossipsubs(ctx, hosts) + denseConnect(t, hosts) + + for _, ps := range psubs { + subch, err := ps.Subscribe("flood") + if err != nil { + t.Fatal(err) + } + go func(sub *Subscription) { + for { + _, err := sub.Next(ctx) + if err != nil { + break + } + } + }(subch) + } + + time.Sleep(time.Second * 1) + + // create a background flood of messages that overloads the queues + done := make(chan struct{}) + go func() { + owner := rand.Intn(len(psubs)) + for i := 0; i < 1000; i++ { + msg := []byte("background flooooood") + psubs[owner].Publish("flood", msg) + } + done <- struct{}{} + }() + + time.Sleep(time.Millisecond * 20) + + // and subscribe to a bunch of topics in the meantime -- this should + // result in some dropped control messages, with subsequent piggybacking + // in the background flood + var topics []string + var msgs [][]*Subscription + for i := 0; i < 5; i++ { + topic := fmt.Sprintf("topic%d", i) + topics = append(topics, topic) + + var subs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + subs = append(subs, subch) + } + msgs = append(msgs, subs) + } + + // wait for the flood to stop + <-done + + // and test that we have functional overlays + for i, topic := range topics { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + 
psubs[owner].Publish(topic, msg) + + for _, sub := range msgs[i] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 0824316326f6cb82dc54eb83fd92d023a1f8f1d4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 22 Feb 2018 12:29:16 +0200 Subject: [PATCH 42/50] finetune GraftPruneRetry test, so that it doesn't get OOM killed --- gossipsub_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index f307223..e3c2d91 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -603,7 +603,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { var topics []string var msgs [][]*Subscription - for i := 0; i < 50; i++ { + for i := 0; i < 35; i++ { topic := fmt.Sprintf("topic%d", i) topics = append(topics, topic) From 2544ae7df940643d70866a2b2865a232759c2d97 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 6 Mar 2018 10:02:19 +0200 Subject: [PATCH 43/50] announce retry should check the pubsub context for cancellation --- pubsub.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index f739c3a..3ff2f7a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -339,12 +339,18 @@ func (p *PubSub) announce(topic string, sub bool) { func (p *PubSub) announceRetry(topic string, sub bool) { time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) - p.eval <- func() { + + retry := func() { _, ok := p.myTopics[topic] if (ok && sub) || (!ok && !sub) { p.announce(topic, sub) } } + + select { + case p.eval <- retry: + case <-p.ctx.Done(): + } } // notifySubs sends a given message to all corresponding subscribers. From a39184a0af404da56d2ae36f3032cce52cdc1701 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 6 Mar 2018 10:14:53 +0200 Subject: [PATCH 44/50] smaller net sizes for tests that exercise full queues so that travis doesn't get killed by OOM. 
--- gossipsub_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index e3c2d91..794ea5f 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -597,7 +597,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getNetHosts(t, ctx, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) @@ -645,7 +645,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getNetHosts(t, ctx, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) From c57d256a22758cfa2c1801f1bcc57e8fe262daae Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 6 Mar 2018 12:11:41 +0200 Subject: [PATCH 45/50] increase the flood length in TestGossipsubControlPiggyback --- gossipsub_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index 794ea5f..83a37d1 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -670,7 +670,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { done := make(chan struct{}) go func() { owner := rand.Intn(len(psubs)) - for i := 0; i < 1000; i++ { + for i := 0; i < 10000; i++ { msg := []byte("background flooooood") psubs[owner].Publish("flood", msg) } From e8a91d330affbfd4c1b8a283a3ac7df656e2c3b9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 10 Mar 2018 10:08:50 +0200 Subject: [PATCH 46/50] document PubSubRouter interface --- pubsub.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pubsub.go b/pubsub.go index 3ff2f7a..a6394fc 100644 --- a/pubsub.go +++ b/pubsub.go @@ -94,13 +94,25 @@ type PubSub struct { // PubSubRouter is the message router component of PubSub type PubSubRouter interface { + // Protocols returns the list of protocols supported by the router. Protocols() []protocol.ID + // Attach is invoked by the PubSub constructor to attach the router to a + // freshly initialized PubSub instance. Attach(*PubSub) + // AddPeer notifies the router that a new peer has been connected. AddPeer(peer.ID, protocol.ID) + // RemovePeer notifies the router that a peer has been disconnected. RemovePeer(peer.ID) + // HandleRPC is invoked to process control messages in the RPC envelope. + // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) + // Publish is invoked to forward a new message that has been validated. Publish(peer.ID, *pb.Message) + // Join notifies the router that we want to receive and forward messages in a topic. + // It is invoked after the subscription announcement. Join(topic string) + // Leave notifies the router that we are no longer interested in a topic. + // It is invoked after the unsubscription announcement. 
Leave(topic string) } From d6dfe83ebe42daee7f77c002d4335744cb8ae330 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 10 Mar 2018 10:14:21 +0200 Subject: [PATCH 47/50] refactor nextSeqno out of Publish --- pubsub.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pubsub.go b/pubsub.go index a6394fc..2e33315 100644 --- a/pubsub.go +++ b/pubsub.go @@ -597,10 +597,7 @@ func (p *PubSub) GetTopics() []string { // Publish publishes data under the given topic func (p *PubSub) Publish(topic string, data []byte) error { - seqno := make([]byte, 8) - counter := atomic.AddUint64(&p.counter, 1) - binary.BigEndian.PutUint64(seqno, counter) - + seqno := p.nextSeqno() p.publish <- &Message{ &pb.Message{ Data: data, @@ -612,6 +609,13 @@ func (p *PubSub) Publish(topic string, data []byte) error { return nil } +func (p *PubSub) nextSeqno() []byte { + seqno := make([]byte, 8) + counter := atomic.AddUint64(&p.counter, 1) + binary.BigEndian.PutUint64(seqno, counter) + return seqno +} + type listPeerReq struct { resp chan []peer.ID topic string From b490d117f2bf19456451913d7ff6e48ddc174635 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 10 Mar 2018 10:23:55 +0200 Subject: [PATCH 48/50] make heartbeat interval a parameter, turn all gossipsub parameters into variables --- gossipsub.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gossipsub.go b/gossipsub.go index 5ff4386..66d1734 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -14,7 +14,9 @@ import ( const ( GossipSubID = protocol.ID("/meshsub/1.0.0") +) +var ( // overlay parameters GossipSubD = 6 GossipSubDlo = 4 @@ -23,6 +25,9 @@ const ( // gossip parameters GossipSubHistoryLength = 5 GossipSubHistoryGossip = 3 + + // heartbeat interval + GossipSubHeartbeatInterval = 1 * time.Second ) func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { @@ -306,7 +311,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { } func (gs *GossipSubRouter) heartbeatTimer() { - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(GossipSubHeartbeatInterval) defer ticker.Stop() for { From 1dc8405449535d8629f3a123eda819dc58255222 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 10 Mar 2018 11:21:50 +0200 Subject: [PATCH 49/50] more docs for gossipsub router, expire fanout peers when we haven't published in a while --- gossipsub.go | 24 +++++++++++++++++++++ gossipsub_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 66d1734..2648530 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -28,13 +28,18 @@ var ( // heartbeat interval GossipSubHeartbeatInterval = 1 * time.Second + + // fanout ttl + GossipSubFanoutTTL = 60 * time.Second ) +// NewGossipSub returns a new PubSub object using GossipSubRouter as the router func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ peers: make(map[peer.ID]protocol.ID), mesh: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}), + lastpub: make(map[string]int64), gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), @@ -42,11 +47,19 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er return NewPubSub(ctx, h, rt, opts...) } +// GossipSubRouter is a router that implements the gossipsub protocol. 
+// For each topic we have joined, we maintain an overlay through which +// messages flow; this is the mesh map. +// For each topic we publish to without joining, we maintain a list of peers +// to use for injecting our messages in the overlay with stable routes; this +// is the fanout map. Fanout peer lists are expired if we don't publish any +// messages to their topic for GossipSubFanoutTTL. type GossipSubRouter struct { p *PubSub peers map[peer.ID]protocol.ID // peer protocols mesh map[string]map[peer.ID]struct{} // topic meshes fanout map[string]map[peer.ID]struct{} // topic fanout + lastpub map[string]int64 // last pubish time for fanout topics gossip map[peer.ID][]*pb.ControlIHave // pending gossip control map[peer.ID]*pb.ControlMessage // pending control messages mcache *MessageCache @@ -215,6 +228,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gs.fanout[topic] = gmap } } + gs.lastpub[topic] = time.Now().UnixNano() } for p := range gmap { @@ -242,6 +256,7 @@ func (gs *GossipSubRouter) Join(topic string) { if ok { gs.mesh[topic] = gmap delete(gs.fanout, topic) + delete(gs.lastpub, topic) } else { peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) gmap = peerListToMap(peers) @@ -371,6 +386,15 @@ func (gs *GossipSubRouter) heartbeat() { gs.emitGossip(topic, peers) } + // expire fanout for topics we haven't published to in a while + now := time.Now().UnixNano() + for topic, lastpub := range gs.lastpub { + if lastpub+int64(GossipSubFanoutTTL) < now { + delete(gs.fanout, topic) + delete(gs.lastpub, topic) + } + } + // maintain our fanout for topics we are publishing but we have not joined for topic, peers := range gs.fanout { // check whether our peers are still in the topic diff --git a/gossipsub_test.go b/gossipsub_test.go index 83a37d1..8dae7ff 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -254,6 +254,61 @@ func TestGossipsubFanoutMaintenance(t *testing.T) { } } +func TestGossipsubFanoutExpiry(t *testing.T) { + GossipSubFanoutTTL = 1 * time.Second + defer func() { GossipSubFanoutTTL = 60 * time.Second }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 10) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 5; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 { + t.Fatal("owner has no fanout") + } + + // wait for TTL to expire fanout peers in owner + time.Sleep(time.Second * 2) + + if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 { + t.Fatal("fanout hasn't expired") + } +} + func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 1b4fbb865d4944a602b27e24e8123a57095e9987 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 5 May 2018 09:48:28 +0300 Subject: [PATCH 50/50] fix NewPubsub docstring --- pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index 2e33315..4d5415a 100644 --- 
a/pubsub.go +++ b/pubsub.go @@ -133,7 +133,7 @@ type RPC struct { type Option func(*PubSub) error -// NewFloodSub returns a new PubSub management object +// NewPubSub returns a new PubSub management object func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) { ps := &PubSub{ host: h,
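
For reference, a minimal consumer-side sketch of the API these patches build up (NewGossipSub, Subscribe, Publish, Next, and the package-level gossipsub parameters introduced in patch 48). This is not part of the patch series itself: the libp2p host is assumed to be constructed and connected elsewhere, and the tuning values and topic name are purely illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	floodsub "github.com/libp2p/go-floodsub"
	host "github.com/libp2p/go-libp2p-host"
)

// subscribeAndPublish attaches a gossipsub router to an existing, already
// connected libp2p host, subscribes to a topic, publishes one message and
// waits for it to arrive on the local subscription.
func subscribeAndPublish(ctx context.Context, h host.Host) error {
	// The gossipsub parameters are package-level variables (patch 48), so a
	// consumer may tune them before constructing the router; these values
	// are illustrative, not recommendations, and they affect every router
	// created in this process.
	floodsub.GossipSubHeartbeatInterval = 700 * time.Millisecond
	floodsub.GossipSubFanoutTTL = 30 * time.Second

	ps, err := floodsub.NewGossipSub(ctx, h)
	if err != nil {
		return err
	}

	sub, err := ps.Subscribe("foobar")
	if err != nil {
		return err
	}
	defer sub.Cancel()

	if err := ps.Publish("foobar", []byte("hello mesh")); err != nil {
		return err
	}

	// the locally published message is also delivered to our own subscription
	msg, err := sub.Next(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("received: %s\n", msg.Data)
	return nil
}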