diff --git a/floodsub.go b/floodsub.go index b44632d..4fe7669 100644 --- a/floodsub.go +++ b/floodsub.go @@ -69,7 +69,9 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} -func (fs *FloodSubRouter) Publish(from peer.ID, msg *Message) { +func (fs *FloodSubRouter) Publish(msg *Message) { + from := msg.ReceivedFrom + tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { tmap, ok := fs.p.topics[topic] diff --git a/gossipsub.go b/gossipsub.go index 43b7b36..f67f181 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -484,8 +484,9 @@ func (gs *GossipSubRouter) connector() { } } -func (gs *GossipSubRouter) Publish(from peer.ID, msg *Message) { +func (gs *GossipSubRouter) Publish(msg *Message) { gs.mcache.Put(msg.Message) + from := msg.ReceivedFrom tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { @@ -495,7 +496,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *Message) { continue } - if gs.floodPublish && msg.ReceivedFrom == gs.p.host.ID() { + if gs.floodPublish && from == gs.p.host.ID() { for p := range tmap { if gs.score.Score(p) >= gs.publishThreshold { tosend[p] = struct{}{} diff --git a/pubsub.go b/pubsub.go index a7cf901..4977e6d 100644 --- a/pubsub.go +++ b/pubsub.go @@ -155,7 +155,7 @@ type PubSubRouter interface { // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) // Publish is invoked to forward a new message that has been validated. - Publish(peer.ID, *Message) + Publish(*Message) // Join notifies the router that we want to receive and forward messages in a topic. // It is invoked after the subscription announcement. Join(topic string) @@ -840,7 +840,7 @@ func (p *PubSub) pushMsg(msg *Message) { func (p *PubSub) publishMessage(msg *Message) { p.tracer.DeliverMessage(msg) p.notifySubs(msg) - p.rt.Publish(msg.ReceivedFrom, msg) + p.rt.Publish(msg) } type addTopicReq struct { diff --git a/randomsub.go b/randomsub.go index 72c3340..42bbd5c 100644 --- a/randomsub.go +++ b/randomsub.go @@ -88,7 +88,9 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool { func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} -func (rs *RandomSubRouter) Publish(from peer.ID, msg *Message) { +func (rs *RandomSubRouter) Publish(msg *Message) { + from := msg.ReceivedFrom + tosend := make(map[peer.ID]struct{}) rspeers := make(map[peer.ID]struct{}) src := peer.ID(msg.GetFrom())