From 375c4176b911cdbf67e78e9111f0268cf4cf07fc Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 16:13:18 +0200 Subject: [PATCH] gossipsub publish --- gossipsub.go | 141 ++++++++++++++++++++++++++++++++++++++++++--------- mcache.go | 16 ++++++ 2 files changed, 134 insertions(+), 23 deletions(-) create mode 100644 mcache.go diff --git a/gossipsub.go b/gossipsub.go index d8dae5c..0d63f92 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -13,51 +13,142 @@ import ( const ( GossipSubID = protocol.ID("/meshsub/1.0.0") + + // overlay parameters + GossipSubD = 6 + GossipSubDlo = 4 + GossipSubDhi = 12 ) func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { - rt := &GossipSubRouter{} + rt := &GossipSubRouter{ + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + mcache: NewMessageCache(5), + } return NewPubSub(ctx, h, rt, opts...) } type GossipSubRouter struct { - p *PubSub + p *PubSub + peers map[peer.ID]protocol.ID // peer protocols + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + mcache *MessageCache } -func (fs *GossipSubRouter) Protocols() []protocol.ID { +func (gs *GossipSubRouter) Protocols() []protocol.ID { return []protocol.ID{GossipSubID, FloodSubID} } -func (fs *GossipSubRouter) Attach(p *PubSub) { - fs.p = p - go fs.heartbeatTimer() +func (gs *GossipSubRouter) Attach(p *PubSub) { + gs.p = p + go gs.heartbeatTimer() } -func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) { - +func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + gs.peers[p] = proto } -func (fs *GossipSubRouter) RemovePeer(peer.ID) { - +func (gs *GossipSubRouter) RemovePeer(p peer.ID) { + delete(gs.peers, p) + for _, peers := range gs.mesh { + delete(peers, p) + } + for _, peers := range gs.fanout { + delete(peers, p) + } } -func (fs *GossipSubRouter) HandleRPC(rpc *RPC) { - +func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { + // TODO } -func (fs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { +func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { + gs.mcache.Add(msg) + tosend := make(map[peer.ID]struct{}) + for _, topic := range msg.GetTopicIDs() { + // any peers in the topic? + tmap, ok := gs.p.topics[topic] + if !ok { + continue + } + + // floodsub peers + for p := range tmap { + if gs.peers[p] == FloodSubID { + tosend[p] = struct{}{} + } + } + + // gossipsub peers + gmap, ok := gs.mesh[topic] + if ok { + // direct peers in the mesh for topic + for p := range gmap { + tosend[p] = struct{}{} + } + } else { + // fanout peers, we are not in the mesh for topic + gmap, ok = gs.fanout[topic] + if !ok { + // we don't have any yet, pick some + var peers []peer.ID + for p := range tmap { + if gs.peers[p] == GossipSubID { + peers = append(peers, p) + } + } + + if len(peers) > 0 { + gmap = make(map[peer.ID]struct{}) + + shufflePeers(peers) + for _, p := range peers[:GossipSubD] { + gmap[p] = struct{}{} + } + + gs.fanout[topic] = gmap + } + } + } + + for p := range gmap { + tosend[p] = struct{}{} + } + } + + out := rpcWithMessages(msg) + for pid := range tosend { + if pid == from || pid == peer.ID(msg.GetFrom()) { + continue + } + + mch, ok := gs.p.peers[pid] + if !ok { + continue + } + + select { + case mch <- out: + default: + log.Infof("dropping message to peer %s: queue full", pid) + // Drop it. The peer is too slow. + } + } } -func (fs *GossipSubRouter) Join(topic string) { - +func (gs *GossipSubRouter) Join(topic string) { + // TODO } -func (fs *GossipSubRouter) Leave(topic string) { - +func (gs *GossipSubRouter) Leave(topic string) { + // TODO } -func (fs *GossipSubRouter) heartbeatTimer() { +func (gs *GossipSubRouter) heartbeatTimer() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -65,16 +156,20 @@ func (fs *GossipSubRouter) heartbeatTimer() { select { case <-ticker.C: select { - case fs.p.eval <- fs.heartbeat: - case <-fs.p.ctx.Done(): + case gs.p.eval <- gs.heartbeat: + case <-gs.p.ctx.Done(): return } - case <-fs.p.ctx.Done(): + case <-gs.p.ctx.Done(): return } } } -func (fs *GossipSubRouter) heartbeat() { - +func (gs *GossipSubRouter) heartbeat() { + // TODO +} + +func shufflePeers(peers []peer.ID) { + // TODO } diff --git a/mcache.go b/mcache.go new file mode 100644 index 0000000..47e869e --- /dev/null +++ b/mcache.go @@ -0,0 +1,16 @@ +package floodsub + +import ( + pb "github.com/libp2p/go-floodsub/pb" +) + +func NewMessageCache(win int) *MessageCache { + return &MessageCache{} +} + +type MessageCache struct { +} + +func (mc *MessageCache) Add(msg *pb.Message) { + // TODO +}