correctly implement flood publishing

Only applies to messages that have been published by our own peer!
This commit is contained in:
vyzo 2020-03-03 15:44:44 +02:00
parent 4f6ca1b1b7
commit 69e4102a6d
4 changed files with 10 additions and 14 deletions

View File

@ -3,8 +3,6 @@ package pubsub
import ( import (
"context" "context"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
@ -71,7 +69,7 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { func (fs *FloodSubRouter) Publish(from peer.ID, msg *Message) {
tosend := make(map[peer.ID]struct{}) tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() { for _, topic := range msg.GetTopicIDs() {
tmap, ok := fs.p.topics[topic] tmap, ok := fs.p.topics[topic]
@ -84,7 +82,7 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
} }
} }
out := rpcWithMessages(msg) out := rpcWithMessages(msg.Message)
for pid := range tosend { for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) { if pid == from || pid == peer.ID(msg.GetFrom()) {
continue continue

View File

@ -476,8 +476,8 @@ func (gs *GossipSubRouter) connector() {
} }
} }
func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { func (gs *GossipSubRouter) Publish(from peer.ID, msg *Message) {
gs.mcache.Put(msg) gs.mcache.Put(msg.Message)
tosend := make(map[peer.ID]struct{}) tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() { for _, topic := range msg.GetTopicIDs() {
@ -487,7 +487,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue continue
} }
if gs.floodPublish { if gs.floodPublish && msg.ReceivedFrom == gs.p.host.ID() {
for p := range tmap { for p := range tmap {
if gs.score.Score(p) >= gs.publishThreshold { if gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{} tosend[p] = struct{}{}
@ -527,7 +527,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
} }
} }
out := rpcWithMessages(msg) out := rpcWithMessages(msg.Message)
for pid := range tosend { for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) { if pid == from || pid == peer.ID(msg.GetFrom()) {
continue continue

View File

@ -155,7 +155,7 @@ type PubSubRouter interface {
// It is invoked after subscriptions and payload messages have been processed. // It is invoked after subscriptions and payload messages have been processed.
HandleRPC(*RPC) HandleRPC(*RPC)
// Publish is invoked to forward a new message that has been validated. // Publish is invoked to forward a new message that has been validated.
Publish(peer.ID, *pb.Message) Publish(peer.ID, *Message)
// Join notifies the router that we want to receive and forward messages in a topic. // Join notifies the router that we want to receive and forward messages in a topic.
// It is invoked after the subscription announcement. // It is invoked after the subscription announcement.
Join(topic string) Join(topic string)
@ -840,7 +840,7 @@ func (p *PubSub) pushMsg(msg *Message) {
func (p *PubSub) publishMessage(msg *Message) { func (p *PubSub) publishMessage(msg *Message) {
p.tracer.DeliverMessage(msg) p.tracer.DeliverMessage(msg)
p.notifySubs(msg) p.notifySubs(msg)
p.rt.Publish(msg.ReceivedFrom, msg.Message) p.rt.Publish(msg.ReceivedFrom, msg)
} }
type addTopicReq struct { type addTopicReq struct {

View File

@ -3,8 +3,6 @@ package pubsub
import ( import (
"context" "context"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
@ -90,7 +88,7 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {}
func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { func (rs *RandomSubRouter) Publish(from peer.ID, msg *Message) {
tosend := make(map[peer.ID]struct{}) tosend := make(map[peer.ID]struct{})
rspeers := make(map[peer.ID]struct{}) rspeers := make(map[peer.ID]struct{})
src := peer.ID(msg.GetFrom()) src := peer.ID(msg.GetFrom())
@ -127,7 +125,7 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) {
} }
} }
out := rpcWithMessages(msg) out := rpcWithMessages(msg.Message)
for p := range tosend { for p := range tosend {
mch, ok := rs.p.peers[p] mch, ok := rs.p.peers[p]
if !ok { if !ok {