diff --git a/floodsub.go b/floodsub.go index 3690f62..b44632d 100644 --- a/floodsub.go +++ b/floodsub.go @@ -3,8 +3,6 @@ package pubsub import ( "context" - pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -71,7 +69,7 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} -func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { +func (fs *FloodSubRouter) Publish(from peer.ID, msg *Message) { tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { tmap, ok := fs.p.topics[topic] @@ -84,7 +82,7 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { } } - out := rpcWithMessages(msg) + out := rpcWithMessages(msg.Message) for pid := range tosend { if pid == from || pid == peer.ID(msg.GetFrom()) { continue diff --git a/gossipsub.go b/gossipsub.go index a1c1d26..dfd1afe 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -476,8 +476,8 @@ func (gs *GossipSubRouter) connector() { } } -func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { - gs.mcache.Put(msg) +func (gs *GossipSubRouter) Publish(from peer.ID, msg *Message) { + gs.mcache.Put(msg.Message) tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { @@ -487,7 +487,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } - if gs.floodPublish { + if gs.floodPublish && msg.ReceivedFrom == gs.p.host.ID() { for p := range tmap { if gs.score.Score(p) >= gs.publishThreshold { tosend[p] = struct{}{} @@ -527,7 +527,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { } } - out := rpcWithMessages(msg) + out := rpcWithMessages(msg.Message) for pid := range tosend { if pid == from || pid == peer.ID(msg.GetFrom()) { continue diff --git a/pubsub.go b/pubsub.go index 285b1b7..a7cf901 100644 --- a/pubsub.go +++ b/pubsub.go @@ -155,7 +155,7 @@ type PubSubRouter interface { // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) // Publish is invoked to forward a new message that has been validated. - Publish(peer.ID, *pb.Message) + Publish(peer.ID, *Message) // Join notifies the router that we want to receive and forward messages in a topic. // It is invoked after the subscription announcement. Join(topic string) @@ -840,7 +840,7 @@ func (p *PubSub) pushMsg(msg *Message) { func (p *PubSub) publishMessage(msg *Message) { p.tracer.DeliverMessage(msg) p.notifySubs(msg) - p.rt.Publish(msg.ReceivedFrom, msg.Message) + p.rt.Publish(msg.ReceivedFrom, msg) } type addTopicReq struct { diff --git a/randomsub.go b/randomsub.go index ffbfc62..72c3340 100644 --- a/randomsub.go +++ b/randomsub.go @@ -3,8 +3,6 @@ package pubsub import ( "context" - pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -90,7 +88,7 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool { func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} -func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { +func (rs *RandomSubRouter) Publish(from peer.ID, msg *Message) { tosend := make(map[peer.ID]struct{}) rspeers := make(map[peer.ID]struct{}) src := peer.ID(msg.GetFrom()) @@ -127,7 +125,7 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { } } - out := rpcWithMessages(msg) + out := rpcWithMessages(msg.Message) for p := range tosend { mch, ok := rs.p.peers[p] if !ok {