diff --git a/comm.go b/comm.go index 430e6a6..182e992 100644 --- a/comm.go +++ b/comm.go @@ -104,3 +104,21 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { func rpcWithMessages(msgs ...*pb.Message) *RPC { return &RPC{RPC: pb.RPC{Publish: msgs}} } + +func rpcWithControl(msgs []*pb.Message, + ihave []*pb.ControlIHave, + iwant []*pb.ControlIWant, + graft []*pb.ControlGraft, + prune []*pb.ControlPrune) *RPC { + return &RPC{ + RPC: pb.RPC{ + Publish: msgs, + Control: &pb.ControlMessage{ + Ihave: ihave, + Iwant: iwant, + Graft: graft, + Prune: prune, + }, + }, + } +} diff --git a/floodsub.go b/floodsub.go index 65d640a..1b2eb08 100644 --- a/floodsub.go +++ b/floodsub.go @@ -78,3 +78,7 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { } } } + +func (fs *FloodSubRouter) Join(topic string) {} + +func (fs *FloodSubRouter) Leave(topic string) {} diff --git a/floodsub_test.go b/floodsub_test.go index e7b421f..b0196ca 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -55,8 +55,16 @@ func connect(t *testing.T, a, b host.Host) { } func sparseConnect(t *testing.T, hosts []host.Host) { + connectSome(t, hosts, 3) +} + +func denseConnect(t *testing.T, hosts []host.Host) { + connectSome(t, hosts, 10) +} + +func connectSome(t *testing.T, hosts []host.Host, d int) { for i, a := range hosts { - for j := 0; j < 3; j++ { + for j := 0; j < d; j++ { n := rand.Intn(len(hosts)) if n == i { j-- diff --git a/gossipsub.go b/gossipsub.go new file mode 100644 index 0000000..2648530 --- /dev/null +++ b/gossipsub.go @@ -0,0 +1,613 @@ +package floodsub + +import ( + "context" + "math/rand" + "time" + + pb "github.com/libp2p/go-floodsub/pb" + + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" +) + +const ( + GossipSubID = protocol.ID("/meshsub/1.0.0") +) + +var ( + // overlay parameters + GossipSubD = 6 + GossipSubDlo = 4 + GossipSubDhi = 12 + + // gossip parameters + GossipSubHistoryLength = 5 + GossipSubHistoryGossip = 3 + + // heartbeat interval + GossipSubHeartbeatInterval = 1 * time.Second + + // fanout ttl + GossipSubFanoutTTL = 60 * time.Second +) + +// NewGossipSub returns a new PubSub object using GossipSubRouter as the router +func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { + rt := &GossipSubRouter{ + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + lastpub: make(map[string]int64), + gossip: make(map[peer.ID][]*pb.ControlIHave), + control: make(map[peer.ID]*pb.ControlMessage), + mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), + } + return NewPubSub(ctx, h, rt, opts...) +} + +// GossipSubRouter is a router that implements the gossipsub protocol. +// For each topic we have joined, we maintain an overlay through which +// messages flow; this is the mesh map. +// For each topic we publish to without joining, we maintain a list of peers +// to use for injecting our messages in the overlay with stable routes; this +// is the fanout map. Fanout peer lists are expired if we don't publish any +// messages to their topic for GossipSubFanoutTTL. 
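+// Gossip about recently seen messages is queued per peer (the gossip map) and
+// emitted at each heartbeat, piggybacking on other outgoing RPCs where possible;
+// control messages that were dropped because a peer's queue was full are retained
+// in the control map and retried the same way. The mcache holds recently
+// forwarded messages so that IWANT requests can be answered.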
+type GossipSubRouter struct {
+	p       *PubSub
+	peers   map[peer.ID]protocol.ID          // peer protocols
+	mesh    map[string]map[peer.ID]struct{}  // topic meshes
+	fanout  map[string]map[peer.ID]struct{}  // topic fanout
+	lastpub map[string]int64                 // last publish time for fanout topics
+	gossip  map[peer.ID][]*pb.ControlIHave   // pending gossip
+	control map[peer.ID]*pb.ControlMessage   // pending control messages
+	mcache  *MessageCache
+}
+
+func (gs *GossipSubRouter) Protocols() []protocol.ID {
+	return []protocol.ID{GossipSubID, FloodSubID}
+}
+
+func (gs *GossipSubRouter) Attach(p *PubSub) {
+	gs.p = p
+	go gs.heartbeatTimer()
+}
+
+func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
+	gs.peers[p] = proto
+}
+
+func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
+	delete(gs.peers, p)
+	for _, peers := range gs.mesh {
+		delete(peers, p)
+	}
+	for _, peers := range gs.fanout {
+		delete(peers, p)
+	}
+	delete(gs.gossip, p)
+	delete(gs.control, p)
+}
+
+func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
+	ctl := rpc.GetControl()
+	if ctl == nil {
+		return
+	}
+
+	iwant := gs.handleIHave(ctl)
+	ihave := gs.handleIWant(ctl)
+	prune := gs.handleGraft(rpc.from, ctl)
+	gs.handlePrune(rpc.from, ctl)
+
+	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
+		return
+	}
+
+	out := rpcWithControl(ihave, nil, iwant, nil, prune)
+	gs.sendRPC(rpc.from, out)
+}
+
+func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant {
+	iwant := make(map[string]struct{})
+
+	for _, ihave := range ctl.GetIhave() {
+		topic := ihave.GetTopicID()
+		_, ok := gs.mesh[topic]
+		if !ok {
+			continue
+		}
+
+		for _, mid := range ihave.GetMessageIDs() {
+			if gs.p.seenMessage(mid) {
+				continue
+			}
+			iwant[mid] = struct{}{}
+		}
+	}
+
+	if len(iwant) == 0 {
+		return nil
+	}
+
+	iwantlst := make([]string, 0, len(iwant))
+	for mid := range iwant {
+		iwantlst = append(iwantlst, mid)
+	}
+
+	return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
+}
+
+func (gs *GossipSubRouter) handleIWant(ctl *pb.ControlMessage) []*pb.Message {
+	ihave := make(map[string]*pb.Message)
+	for _, iwant := range ctl.GetIwant() {
+		for _, mid := range iwant.GetMessageIDs() {
+			msg, ok := gs.mcache.Get(mid)
+			if ok {
+				ihave[mid] = msg
+			}
+		}
+	}
+
+	if len(ihave) == 0 {
+		return nil
+	}
+
+	msgs := make([]*pb.Message, 0, len(ihave))
+	for _, msg := range ihave {
+		msgs = append(msgs, msg)
+	}
+
+	return msgs
+}
+
+func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
+	var prune []string
+	for _, graft := range ctl.GetGraft() {
+		topic := graft.GetTopicID()
+		peers, ok := gs.mesh[topic]
+		if !ok {
+			prune = append(prune, topic)
+		} else {
+			peers[p] = struct{}{}
+		}
+	}
+
+	if len(prune) == 0 {
+		return nil
+	}
+
+	cprune := make([]*pb.ControlPrune, 0, len(prune))
+	for _, topic := range prune {
+		cprune = append(cprune, &pb.ControlPrune{TopicID: &topic})
+	}
+
+	return cprune
+}
+
+func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
+	for _, prune := range ctl.GetPrune() {
+		topic := prune.GetTopicID()
+		peers, ok := gs.mesh[topic]
+		if ok {
+			delete(peers, p)
+		}
+	}
+}
+
+func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
+	gs.mcache.Put(msg)
+
+	tosend := make(map[peer.ID]struct{})
+	for _, topic := range msg.GetTopicIDs() {
+		// any peers in the topic?
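+		// flood the message to all floodsub peers in the topic, and to the
+		// gossipsub overlay: the mesh if we have joined the topic, the fanout otherwise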
+		tmap, ok := gs.p.topics[topic]
+		if !ok {
+			continue
+		}
+
+		// floodsub peers
+		for p := range tmap {
+			if gs.peers[p] == FloodSubID {
+				tosend[p] = struct{}{}
+			}
+		}
+
+		// gossipsub peers
+		gmap, ok := gs.mesh[topic]
+		if !ok {
+			// we are not in the mesh for topic, use fanout peers
+			gmap, ok = gs.fanout[topic]
+			if !ok {
+				// we don't have any, pick some
+				peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true })
+
+				if len(peers) > 0 {
+					gmap = peerListToMap(peers)
+					gs.fanout[topic] = gmap
+				}
+			}
+			gs.lastpub[topic] = time.Now().UnixNano()
+		}
+
+		for p := range gmap {
+			tosend[p] = struct{}{}
+		}
+	}
+
+	out := rpcWithMessages(msg)
+	for pid := range tosend {
+		if pid == from || pid == peer.ID(msg.GetFrom()) {
+			continue
+		}
+
+		gs.sendRPC(pid, out)
+	}
+}
+
+func (gs *GossipSubRouter) Join(topic string) {
+	gmap, ok := gs.mesh[topic]
+	if ok {
+		return
+	}
+
+	gmap, ok = gs.fanout[topic]
+	if ok {
+		gs.mesh[topic] = gmap
+		delete(gs.fanout, topic)
+		delete(gs.lastpub, topic)
+	} else {
+		peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true })
+		gmap = peerListToMap(peers)
+		gs.mesh[topic] = gmap
+	}
+
+	for p := range gmap {
+		gs.sendGraft(p, topic)
+	}
+}
+
+func (gs *GossipSubRouter) Leave(topic string) {
+	gmap, ok := gs.mesh[topic]
+	if !ok {
+		return
+	}
+
+	delete(gs.mesh, topic)
+
+	for p := range gmap {
+		gs.sendPrune(p, topic)
+	}
+}
+
+func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
+	graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
+	out := rpcWithControl(nil, nil, nil, graft, nil)
+	gs.sendRPC(p, out)
+}
+
+func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
+	prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
+	out := rpcWithControl(nil, nil, nil, nil, prune)
+	gs.sendRPC(p, out)
+}
+
+func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
+	// piggyback control message retries
+	ctl, ok := gs.control[p]
+	if ok {
+		gs.piggybackControl(p, out, ctl)
+		delete(gs.control, p)
+	}
+
+	// piggyback gossip
+	ihave, ok := gs.gossip[p]
+	if ok {
+		gs.piggybackGossip(p, out, ihave)
+		delete(gs.gossip, p)
+	}
+
+	mch, ok := gs.p.peers[p]
+	if !ok {
+		return
+	}
+
+	select {
+	case mch <- out:
+	default:
+		log.Infof("dropping message to peer %s: queue full", p)
+		// push control messages that need to be retried
+		ctl := out.GetControl()
+		if ctl != nil {
+			gs.pushControl(p, ctl)
+		}
+	}
+}
+
+func (gs *GossipSubRouter) heartbeatTimer() {
+	ticker := time.NewTicker(GossipSubHeartbeatInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			select {
+			case gs.p.eval <- gs.heartbeat:
+			case <-gs.p.ctx.Done():
+				return
+			}
+		case <-gs.p.ctx.Done():
+			return
+		}
+	}
+}
+
+func (gs *GossipSubRouter) heartbeat() {
+	// flush pending control messages from retries and gossip
+	// that hasn't been piggybacked since the last heartbeat
+	gs.flush()
+
+	tograft := make(map[peer.ID][]string)
+	toprune := make(map[peer.ID][]string)
+
+	// maintain the mesh for topics we have joined
+	for topic, peers := range gs.mesh {
+
+		// do we have enough peers?
+		if len(peers) < GossipSubDlo {
+			ineed := GossipSubD - len(peers)
+			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
+				// filter our current peers
+				_, ok := peers[p]
+				return !ok
+			})
+
+			for _, p := range plst {
+				peers[p] = struct{}{}
+				topics := tograft[p]
+				tograft[p] = append(topics, topic)
+			}
+		}
+
+		// do we have too many peers?
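+		// if we are over GossipSubDhi, drop a random subset of mesh links
+		// so the mesh shrinks back to GossipSubD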
+ if len(peers) > GossipSubDhi { + idontneed := len(peers) - GossipSubD + plst := peerMapToList(peers) + shufflePeers(plst) + + for _, p := range plst[:idontneed] { + delete(peers, p) + topics := toprune[p] + toprune[p] = append(topics, topic) + } + } + + gs.emitGossip(topic, peers) + } + + // expire fanout for topics we haven't published to in a while + now := time.Now().UnixNano() + for topic, lastpub := range gs.lastpub { + if lastpub+int64(GossipSubFanoutTTL) < now { + delete(gs.fanout, topic) + delete(gs.lastpub, topic) + } + } + + // maintain our fanout for topics we are publishing but we have not joined + for topic, peers := range gs.fanout { + // check whether our peers are still in the topic + for p := range peers { + _, ok := gs.p.topics[topic][p] + if !ok { + delete(peers, p) + } + } + + // do we need more peers? + if len(peers) < GossipSubD { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { + // filter our current peers + _, ok := peers[p] + return !ok + }) + + for _, p := range plst { + peers[p] = struct{}{} + } + } + + gs.emitGossip(topic, peers) + } + + // send coalesced GRAFT/PRUNE messages (will piggyback gossip) + gs.sendGraftPrune(tograft, toprune) + + // advance the message history window + gs.mcache.Shift() +} + +func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) { + for p, topics := range tograft { + graft := make([]*pb.ControlGraft, 0, len(topics)) + for _, topic := range topics { + graft = append(graft, &pb.ControlGraft{TopicID: &topic}) + } + + var prune []*pb.ControlPrune + pruning, ok := toprune[p] + if ok { + delete(toprune, p) + prune = make([]*pb.ControlPrune, 0, len(pruning)) + for _, topic := range pruning { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + } + + out := rpcWithControl(nil, nil, nil, graft, prune) + gs.sendRPC(p, out) + } + + for p, topics := range toprune { + prune := make([]*pb.ControlPrune, 0, len(topics)) + for _, topic := range topics { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + + out := rpcWithControl(nil, nil, nil, nil, prune) + gs.sendRPC(p, out) + } + +} + +func (gs *GossipSubRouter) emitGossip(topic string, peers map[peer.ID]struct{}) { + mids := gs.mcache.GetGossipIDs(topic) + if len(mids) == 0 { + return + } + + gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + for _, p := range gpeers { + // skip mesh peers + _, ok := peers[p] + if !ok { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } + } +} + +func (gs *GossipSubRouter) flush() { + // send gossip first, which will also piggyback control + for p, ihave := range gs.gossip { + delete(gs.gossip, p) + out := rpcWithControl(nil, ihave, nil, nil, nil) + gs.sendRPC(p, out) + } + + // send the remaining control messages + for p, ctl := range gs.control { + delete(gs.control, p) + out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune) + gs.sendRPC(p, out) + } +} + +func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) { + gossip := gs.gossip[p] + gossip = append(gossip, ihave) + gs.gossip[p] = gossip +} + +func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) { + ctl := out.GetControl() + if ctl == nil { + ctl = &pb.ControlMessage{} + out.Control = ctl + } + + ctl.Ihave = ihave +} + +func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { + // remove IHAVE/IWANT from control message, gossip is not retried + ctl.Ihave = nil + ctl.Iwant = nil + if 
ctl.Graft != nil || ctl.Prune != nil { + gs.control[p] = ctl + } +} + +func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) { + // check control message for staleness first + var tograft []*pb.ControlGraft + var toprune []*pb.ControlPrune + + for _, graft := range ctl.GetGraft() { + topic := graft.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + continue + } + _, ok = peers[p] + if ok { + tograft = append(tograft, graft) + } + } + + for _, prune := range ctl.GetPrune() { + topic := prune.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + toprune = append(toprune, prune) + continue + } + _, ok = peers[p] + if !ok { + toprune = append(toprune, prune) + } + } + + if len(tograft) == 0 && len(toprune) == 0 { + return + } + + xctl := out.Control + if xctl == nil { + xctl = &pb.ControlMessage{} + out.Control = xctl + } + + if len(tograft) > 0 { + xctl.Graft = append(xctl.Graft, tograft...) + } + if len(toprune) > 0 { + xctl.Prune = append(xctl.Prune, toprune...) + } +} + +func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { + tmap, ok := gs.p.topics[topic] + if !ok { + return nil + } + + peers := make([]peer.ID, 0, len(tmap)) + for p := range tmap { + if gs.peers[p] == GossipSubID && filter(p) { + peers = append(peers, p) + } + } + + shufflePeers(peers) + + if count > 0 && len(peers) > count { + peers = peers[:count] + } + + return peers +} + +func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { + pmap := make(map[peer.ID]struct{}) + for _, p := range peers { + pmap[p] = struct{}{} + } + return pmap +} + +func peerMapToList(peers map[peer.ID]struct{}) []peer.ID { + plst := make([]peer.ID, 0, len(peers)) + for p := range peers { + plst = append(plst, p) + } + return plst +} + +func shufflePeers(peers []peer.ID) { + for i := range peers { + j := rand.Intn(i + 1) + peers[i], peers[j] = peers[j], peers[i] + } +} diff --git a/gossipsub_test.go b/gossipsub_test.go new file mode 100644 index 0000000..8dae7ff --- /dev/null +++ b/gossipsub_test.go @@ -0,0 +1,912 @@ +package floodsub + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "testing" + "time" + + host "github.com/libp2p/go-libp2p-host" +) + +func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { + var psubs []*PubSub + for _, h := range hs { + ps, err := NewGossipSub(ctx, h, opts...) 
+ if err != nil { + panic(err) + } + psubs = append(psubs, ps) + } + return psubs +} + +func TestSparseGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + sparseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestDenseGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubFanout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + // subscribe the owner + subch, err := psubs[0].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + msgs = append(msgs, subch) + + // wait for a heartbeat + time.Sleep(time.Second * 1) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubFanoutMaintenance(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := 
[]byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + // unsubscribe all peers to exercise fanout maintenance + for _, sub := range msgs { + sub.Cancel() + } + msgs = nil + + // wait for heartbeats + time.Sleep(time.Second * 2) + + // resubscribe and repeat + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubFanoutExpiry(t *testing.T) { + GossipSubFanoutTTL = 1 * time.Second + defer func() { GossipSubFanoutTTL = 60 * time.Second }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 10) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 5; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 { + t.Fatal("owner has no fanout") + } + + // wait for TTL to expire fanout peers in owner + time.Sleep(time.Second * 2) + + if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 { + t.Fatal("fanout hasn't expired") + } +} + +func TestGossipsubGossip(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // wait a bit to have some gossip interleaved + time.Sleep(time.Millisecond * 100) + } + + // and wait for some gossip flushing + time.Sleep(time.Second * 2) +} + +func TestGossipsubGossipPiggyback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + var xmsgs []*Subscription 
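+	// subscribe everyone to a second topic as well, so gossip and control
+	// traffic for both topics can be piggybacked on the same RPCs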
+ for _, ps := range psubs { + subch, err := ps.Subscribe("bazcrux") + if err != nil { + t.Fatal(err) + } + + xmsgs = append(xmsgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + psubs[owner].Publish("bazcrux", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + for _, sub := range xmsgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // wait a bit to have some gossip interleaved + time.Sleep(time.Millisecond * 100) + } + + // and wait for some gossip flushing + time.Sleep(time.Second * 2) +} + +func TestGossipsubGossipPropagation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 20) + psubs := getGossipsubs(ctx, hosts) + + hosts1 := hosts[:GossipSubD+1] + hosts2 := append(hosts[GossipSubD+1:], hosts[0]) + + denseConnect(t, hosts1) + denseConnect(t, hosts2) + + var msgs1 []*Subscription + for _, ps := range psubs[1 : GossipSubD+1] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs1 = append(msgs1, subch) + } + + time.Sleep(time.Second * 1) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs1 { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + time.Sleep(time.Millisecond * 100) + + var msgs2 []*Subscription + for _, ps := range psubs[GossipSubD+1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs2 = append(msgs2, subch) + } + + var collect [][]byte + for i := 0; i < 10; i++ { + for _, sub := range msgs2 { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + collect = append(collect, got.Data) + } + } + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + gotit := false + for j := 0; j < len(collect); j++ { + if bytes.Equal(msg, collect[j]) { + gotit = true + break + } + } + if !gotit { + t.Fatalf("Didn't get message %s", string(msg)) + } + } +} + +func TestGossipsubPrune(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + // disconnect some peers from the mesh to get some PRUNEs + for _, sub := range msgs[:5] { + sub.Cancel() + } + + // wait a bit to take effect + time.Sleep(time.Millisecond * 100) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs[5:] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong 
message!") + } + } + } +} + +func TestGossipsubGraft(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + sparseConnect(t, hosts) + + time.Sleep(time.Second * 1) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + + // wait for announce to propagate + time.Sleep(time.Millisecond * 100) + } + + time.Sleep(time.Second * 1) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubRemovePeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 20) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + // disconnect some peers to exercise RemovePeer paths + for _, host := range hosts[:5] { + host.Close() + } + + // wait a heartbeat + time.Sleep(time.Second * 1) + + for i := 0; i < 10; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 5 + rand.Intn(len(psubs)-5) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs[5:] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubGraftPruneRetry(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + psubs := getGossipsubs(ctx, hosts) + denseConnect(t, hosts) + + var topics []string + var msgs [][]*Subscription + for i := 0; i < 35; i++ { + topic := fmt.Sprintf("topic%d", i) + topics = append(topics, topic) + + var subs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + subs = append(subs, subch) + } + msgs = append(msgs, subs) + } + + // wait for heartbeats to build meshes + time.Sleep(time.Second * 5) + + for i, topic := range topics { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish(topic, msg) + + for _, sub := range msgs[i] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubControlPiggyback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + psubs := getGossipsubs(ctx, hosts) + denseConnect(t, hosts) + + for _, ps := range psubs { + subch, err := ps.Subscribe("flood") + if err != nil { + t.Fatal(err) + } + go func(sub *Subscription) { + for { + _, err := sub.Next(ctx) + if err != nil { + break + } + } + }(subch) + } + + time.Sleep(time.Second * 1) + + // create a background flood of messages that overloads the queues + done := make(chan struct{}) + go func() { + owner := rand.Intn(len(psubs)) + for i := 0; i < 
10000; i++ { + msg := []byte("background flooooood") + psubs[owner].Publish("flood", msg) + } + done <- struct{}{} + }() + + time.Sleep(time.Millisecond * 20) + + // and subscribe to a bunch of topics in the meantime -- this should + // result in some dropped control messages, with subsequent piggybacking + // in the background flood + var topics []string + var msgs [][]*Subscription + for i := 0; i < 5; i++ { + topic := fmt.Sprintf("topic%d", i) + topics = append(topics, topic) + + var subs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + subs = append(subs, subch) + } + msgs = append(msgs, subs) + } + + // wait for the flood to stop + <-done + + // and test that we have functional overlays + for i, topic := range topics { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish(topic, msg) + + for _, sub := range msgs[i] { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestMixedGossipsub(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 30) + + gsubs := getGossipsubs(ctx, hosts[:20]) + fsubs := getPubsubs(ctx, hosts[20:]) + psubs := append(gsubs, fsubs...) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + sparseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubMultihops(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 6) + + psubs := getGossipsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[3], hosts[4]) + connect(t, hosts[4], hosts[5]) + + var subs []*Subscription + for i := 1; i < 6; i++ { + ch, err := psubs[i].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + subs = append(subs, ch) + } + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + msg := []byte("i like cats") + err := psubs[0].Publish("foobar", msg) + if err != nil { + t.Fatal(err) + } + + // last node in the chain should get the message + select { + case out := <-subs[4].ch: + if !bytes.Equal(out.GetData(), msg) { + t.Fatal("got wrong data") + } + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for message") + } +} + +func TestGossipsubTreeTopology(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + psubs := getGossipsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[1], hosts[4]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[0], hosts[5]) + connect(t, hosts[5], hosts[6]) + connect(t, hosts[5], hosts[8]) + connect(t, hosts[6], hosts[7]) + connect(t, hosts[8], hosts[9]) + + /* + [0] -> [1] -> [2] -> [3] + | L->[4] + v + [5] -> [6] -> [7] + 
| + v + [8] -> [9] + */ + + var chs []*Subscription + for _, ps := range psubs { + ch, err := ps.Subscribe("fizzbuzz") + if err != nil { + t.Fatal(err) + } + + chs = append(chs, ch) + } + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + assertPeerLists(t, hosts, psubs[0], 1, 5) + assertPeerLists(t, hosts, psubs[1], 0, 2, 4) + assertPeerLists(t, hosts, psubs[2], 1, 3) + + checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) +} diff --git a/mcache.go b/mcache.go new file mode 100644 index 0000000..f501d1e --- /dev/null +++ b/mcache.go @@ -0,0 +1,61 @@ +package floodsub + +import ( + pb "github.com/libp2p/go-floodsub/pb" +) + +func NewMessageCache(gossip, history int) *MessageCache { + return &MessageCache{ + msgs: make(map[string]*pb.Message), + history: make([][]CacheEntry, history), + gossip: gossip, + } +} + +type MessageCache struct { + msgs map[string]*pb.Message + history [][]CacheEntry + gossip int +} + +type CacheEntry struct { + mid string + topics []string +} + +func (mc *MessageCache) Put(msg *pb.Message) { + mid := msgID(msg) + mc.msgs[mid] = msg + mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()}) +} + +func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { + m, ok := mc.msgs[mid] + return m, ok +} + +func (mc *MessageCache) GetGossipIDs(topic string) []string { + var mids []string + for _, entries := range mc.history[:mc.gossip] { + for _, entry := range entries { + for _, t := range entry.topics { + if t == topic { + mids = append(mids, entry.mid) + break + } + } + } + } + return mids +} + +func (mc *MessageCache) Shift() { + last := mc.history[len(mc.history)-1] + for _, entry := range last { + delete(mc.msgs, entry.mid) + } + for i := len(mc.history) - 2; i >= 0; i-- { + mc.history[i+1] = mc.history[i] + } + mc.history[0] = nil +} diff --git a/mcache_test.go b/mcache_test.go new file mode 100644 index 0000000..6bc526c --- /dev/null +++ b/mcache_test.go @@ -0,0 +1,165 @@ +package floodsub + +import ( + "encoding/binary" + "fmt" + "testing" + + pb "github.com/libp2p/go-floodsub/pb" +) + +func TestMessageCache(t *testing.T) { + mcache := NewMessageCache(3, 5) + + msgs := make([]*pb.Message, 60) + for i := range msgs { + msgs[i] = makeTestMessage(i) + } + + for i := 0; i < 10; i++ { + mcache.Put(msgs[i]) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatalf("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatalf("Message %d does not match cache", i) + } + } + + gids := mcache.GetGossipIDs("test") + if len(gids) != 10 { + t.Fatalf("Expected 10 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + if mid != gids[i] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + + mcache.Shift() + for i := 10; i < 20; i++ { + mcache.Put(msgs[i]) + } + + for i := 0; i < 20; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatalf("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatalf("Message %d does not match cache", i) + } + } + + gids = mcache.GetGossipIDs("test") + if len(gids) != 20 { + t.Fatalf("Expected 20 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + if mid != gids[10+i] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + + for i := 10; i < 20; i++ { + mid := msgID(msgs[i]) + if mid != gids[i-10] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + + mcache.Shift() + for i := 20; i < 30; i++ { + 
mcache.Put(msgs[i]) + } + + mcache.Shift() + for i := 30; i < 40; i++ { + mcache.Put(msgs[i]) + } + + mcache.Shift() + for i := 40; i < 50; i++ { + mcache.Put(msgs[i]) + } + + mcache.Shift() + for i := 50; i < 60; i++ { + mcache.Put(msgs[i]) + } + + if len(mcache.msgs) != 50 { + t.Fatalf("Expected 50 messages in the cache; got %d", len(mcache.msgs)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[i]) + _, ok := mcache.Get(mid) + if ok { + t.Fatalf("Message %d still in cache", i) + } + } + + for i := 10; i < 60; i++ { + mid := msgID(msgs[i]) + m, ok := mcache.Get(mid) + if !ok { + t.Fatalf("Message %d not in cache", i) + } + + if m != msgs[i] { + t.Fatalf("Message %d does not match cache", i) + } + } + + gids = mcache.GetGossipIDs("test") + if len(gids) != 30 { + t.Fatalf("Expected 30 gossip IDs; got %d", len(gids)) + } + + for i := 0; i < 10; i++ { + mid := msgID(msgs[50+i]) + if mid != gids[i] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + + for i := 10; i < 20; i++ { + mid := msgID(msgs[30+i]) + if mid != gids[i] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + + for i := 20; i < 30; i++ { + mid := msgID(msgs[10+i]) + if mid != gids[i] { + t.Fatalf("GossipID mismatch for message %d", i) + } + } + +} + +func makeTestMessage(n int) *pb.Message { + seqno := make([]byte, 8) + binary.BigEndian.PutUint64(seqno, uint64(n)) + data := []byte(fmt.Sprintf("%d", n)) + return &pb.Message{ + Data: data, + TopicIDs: []string{"test"}, + From: []byte("test"), + Seqno: seqno, + } +} diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go index a5933c0..f970bbc 100644 --- a/pb/rpc.pb.go +++ b/pb/rpc.pb.go @@ -11,6 +11,11 @@ It is generated from these files: It has these top-level messages: RPC Message + ControlMessage + ControlIHave + ControlIWant + ControlGraft + ControlPrune TopicDescriptor */ package floodsub_pb @@ -97,9 +102,10 @@ func (x *TopicDescriptor_EncOpts_EncMode) UnmarshalJSON(data []byte) error { } type RPC struct { - Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"` - Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"` - XXX_unrecognized []byte `json:"-"` + Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"` + Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"` + Control *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *RPC) Reset() { *m = RPC{} } @@ -120,6 +126,13 @@ func (m *RPC) GetPublish() []*Message { return nil } +func (m *RPC) GetControl() *ControlMessage { + if m != nil { + return m.Control + } + return nil +} + type RPC_SubOpts struct { Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"` Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"` @@ -184,6 +197,118 @@ func (m *Message) GetTopicIDs() []string { return nil } +type ControlMessage struct { + Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"` + Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"` + Graft []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"` + Prune []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlMessage) Reset() { *m = ControlMessage{} } +func (m *ControlMessage) String() string { return proto.CompactTextString(m) } +func 
(*ControlMessage) ProtoMessage() {} + +func (m *ControlMessage) GetIhave() []*ControlIHave { + if m != nil { + return m.Ihave + } + return nil +} + +func (m *ControlMessage) GetIwant() []*ControlIWant { + if m != nil { + return m.Iwant + } + return nil +} + +func (m *ControlMessage) GetGraft() []*ControlGraft { + if m != nil { + return m.Graft + } + return nil +} + +func (m *ControlMessage) GetPrune() []*ControlPrune { + if m != nil { + return m.Prune + } + return nil +} + +type ControlIHave struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + MessageIDs []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlIHave) Reset() { *m = ControlIHave{} } +func (m *ControlIHave) String() string { return proto.CompactTextString(m) } +func (*ControlIHave) ProtoMessage() {} + +func (m *ControlIHave) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + +func (m *ControlIHave) GetMessageIDs() []string { + if m != nil { + return m.MessageIDs + } + return nil +} + +type ControlIWant struct { + MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlIWant) Reset() { *m = ControlIWant{} } +func (m *ControlIWant) String() string { return proto.CompactTextString(m) } +func (*ControlIWant) ProtoMessage() {} + +func (m *ControlIWant) GetMessageIDs() []string { + if m != nil { + return m.MessageIDs + } + return nil +} + +type ControlGraft struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlGraft) Reset() { *m = ControlGraft{} } +func (m *ControlGraft) String() string { return proto.CompactTextString(m) } +func (*ControlGraft) ProtoMessage() {} + +func (m *ControlGraft) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + +type ControlPrune struct { + TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlPrune) Reset() { *m = ControlPrune{} } +func (m *ControlPrune) String() string { return proto.CompactTextString(m) } +func (*ControlPrune) ProtoMessage() {} + +func (m *ControlPrune) GetTopicID() string { + if m != nil && m.TopicID != nil { + return *m.TopicID + } + return "" +} + // topicID = hash(topicDescriptor); (not the topic.name) type TopicDescriptor struct { Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` @@ -269,6 +394,11 @@ func init() { proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC") proto.RegisterType((*RPC_SubOpts)(nil), "floodsub.pb.RPC.SubOpts") proto.RegisterType((*Message)(nil), "floodsub.pb.Message") + proto.RegisterType((*ControlMessage)(nil), "floodsub.pb.ControlMessage") + proto.RegisterType((*ControlIHave)(nil), "floodsub.pb.ControlIHave") + proto.RegisterType((*ControlIWant)(nil), "floodsub.pb.ControlIWant") + proto.RegisterType((*ControlGraft)(nil), "floodsub.pb.ControlGraft") + proto.RegisterType((*ControlPrune)(nil), "floodsub.pb.ControlPrune") proto.RegisterType((*TopicDescriptor)(nil), "floodsub.pb.TopicDescriptor") proto.RegisterType((*TopicDescriptor_AuthOpts)(nil), "floodsub.pb.TopicDescriptor.AuthOpts") proto.RegisterType((*TopicDescriptor_EncOpts)(nil), "floodsub.pb.TopicDescriptor.EncOpts") diff --git a/pb/rpc.proto b/pb/rpc.proto index f43d3c1..a7fe600 100644 --- a/pb/rpc.proto 
+++ b/pb/rpc.proto @@ -8,6 +8,8 @@ message RPC { optional bool subscribe = 1; // subscribe or unsubcribe optional string topicid = 2; } + + optional ControlMessage control = 3; } message Message { @@ -17,6 +19,30 @@ message Message { repeated string topicIDs = 4; } +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topicID = 1; + repeated string messageIDs = 2; +} + +message ControlIWant { + repeated string messageIDs = 1; +} + +message ControlGraft { + optional string topicID = 1; +} + +message ControlPrune { + optional string topicID = 1; +} + // topicID = hash(topicDescriptor); (not the topic.name) message TopicDescriptor { optional string name = 1; diff --git a/pubsub.go b/pubsub.go index 5d92af9..4d5415a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "fmt" + "math/rand" "sync/atomic" "time" @@ -82,6 +83,9 @@ type PubSub struct { // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} + // eval thunk in event loop + eval chan func() + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -90,12 +94,26 @@ type PubSub struct { // PubSubRouter is the message router component of PubSub type PubSubRouter interface { + // Protocols returns the list of protocols supported by the router. Protocols() []protocol.ID + // Attach is invoked by the PubSub constructor to attach the router to a + // freshly initialized PubSub instance. Attach(*PubSub) + // AddPeer notifies the router that a new peer has been connected. AddPeer(peer.ID, protocol.ID) + // RemovePeer notifies the router that a peer has been disconnected. RemovePeer(peer.ID) + // HandleRPC is invoked to process control messages in the RPC envelope. + // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) + // Publish is invoked to forward a new message that has been validated. Publish(peer.ID, *pb.Message) + // Join notifies the router that we want to receive and forward messages in a topic. + // It is invoked after the subscription announcement. + Join(topic string) + // Leave notifies the router that we are no longer interested in a topic. + // It is invoked after the unsubscription announcement. 
+ Leave(topic string) } type Message struct { @@ -115,7 +133,7 @@ type RPC struct { type Option func(*PubSub) error -// NewFloodSub returns a new PubSub management object +// NewPubSub returns a new PubSub management object func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) { ps := &PubSub{ host: h, @@ -133,11 +151,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), validateThrottle: make(chan struct{}, defaultValidateThrottle), + eval: make(chan func()), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), - seenMessages: timecache.NewTimeCache(time.Second * 30), + seenMessages: timecache.NewTimeCache(time.Second * 120), counter: uint64(time.Now().UnixNano()), } @@ -243,7 +262,7 @@ func (p *PubSub) processLoop(ctx context.Context) { p.pushMsg(vals, p.host.ID(), msg) case req := <-p.sendMsg: - p.maybePublishMessage(req.from, req.msg.Message) + p.publishMessage(req.from, req.msg.Message) case req := <-p.addVal: p.addValidator(req) @@ -251,6 +270,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case req := <-p.rmVal: p.rmValidator(req) + case thunk := <-p.eval: + thunk() + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return @@ -276,6 +298,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.myTopics, sub.topic) p.announce(sub.topic, false) + p.rt.Leave(sub.topic) } } @@ -290,6 +313,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // announce we want this topic if len(subs) == 0 { p.announce(sub.topic, true) + p.rt.Join(sub.topic) } // make new if not there @@ -319,11 +343,28 @@ func (p *PubSub) announce(topic string, sub bool) { select { case peer <- out: default: - log.Infof("dropping announce message to peer %s: queue full", pid) + log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + go p.announceRetry(topic, sub) } } } +func (p *PubSub) announceRetry(topic string, sub bool) { + time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) + + retry := func() { + _, ok := p.myTopics[topic] + if (ok && sub) || (!ok && !sub) { + p.announce(topic, sub) + } + } + + select { + case p.eval <- retry: + case <-p.ctx.Done(): + } +} + // notifySubs sends a given message to all corresponding subscribers. // Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) { @@ -405,6 +446,12 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { + id := msgID(msg.Message) + if p.seenMessage(id) { + return + } + p.markSeen(id) + if len(vals) > 0 { // validation is asynchronous and globally throttled with the throttleValidate semaphore. 
 	// the purpose of the global throttle is to bound the concurrency possible from incoming
@@ -422,7 +469,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
 		return
 	}
 
-	p.maybePublishMessage(src, msg.Message)
+	p.publishMessage(src, msg.Message)
 }
 
 // validate performs validation and only sends the message if all validators succeed
@@ -472,13 +519,7 @@ loop:
 	}
 }
 
-func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
-	id := msgID(pmsg)
-	if p.seenMessage(id) {
-		return
-	}
-
-	p.markSeen(id)
+func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) {
 	p.notifySubs(pmsg)
 	p.rt.Publish(from, pmsg)
 }
@@ -556,10 +597,7 @@ func (p *PubSub) GetTopics() []string {
 
 // Publish publishes data under the given topic
 func (p *PubSub) Publish(topic string, data []byte) error {
-	seqno := make([]byte, 8)
-	counter := atomic.AddUint64(&p.counter, 1)
-	binary.BigEndian.PutUint64(seqno, counter)
-
+	seqno := p.nextSeqno()
 	p.publish <- &Message{
 		&pb.Message{
 			Data:     data,
@@ -571,12 +609,20 @@ func (p *PubSub) Publish(topic string, data []byte) error {
 	return nil
 }
 
+func (p *PubSub) nextSeqno() []byte {
+	seqno := make([]byte, 8)
+	counter := atomic.AddUint64(&p.counter, 1)
+	binary.BigEndian.PutUint64(seqno, counter)
+	return seqno
+}
+
 type listPeerReq struct {
 	resp  chan []peer.ID
 	topic string
}

-// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done.
+// sendReq is a request to call publishMessage.
+// It is issued after message validation is done.
 type sendReq struct {
 	from peer.ID
 	msg  *Message
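Not part of the diff: a minimal usage sketch of the new router from a consumer's point of view. It assumes a host.Host that has already been constructed and connected to peers (host setup is outside this change) and only uses APIs visible above (NewGossipSub, Subscribe, Publish, Next, GossipSubHeartbeatInterval); treat it as illustrative rather than as part of the change.

package example

import (
	"context"
	"fmt"
	"time"

	floodsub "github.com/libp2p/go-floodsub"
	host "github.com/libp2p/go-libp2p-host"
)

// runGossip drives the gossipsub router over an existing, connected host h.
func runGossip(ctx context.Context, h host.Host) error {
	ps, err := floodsub.NewGossipSub(ctx, h)
	if err != nil {
		return err
	}

	sub, err := ps.Subscribe("foobar")
	if err != nil {
		return err
	}

	// allow a couple of heartbeats for GRAFTs to build the topic mesh
	time.Sleep(2 * floodsub.GossipSubHeartbeatInterval)

	if err := ps.Publish("foobar", []byte("hello mesh")); err != nil {
		return err
	}

	msg, err := sub.Next(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("received: %s\n", string(msg.Data))
	return nil
}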