From 9e5145fb29c9df968bbec842fcb4cbab64f47b7f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 19 May 2025 17:02:21 -0700 Subject: [PATCH] Send IDONTWANT before first publish (#612) See #610 We previously send IDONTWANT only when forwarding. This has us send IDONTWANT on our initial publish as well. Helps in the case that one or more peers may also publish the same thing at around the same time (see #610 for a longer explanation) and prevents "boomerang" duplicates where a peer sends you back the message you sent before you get a chance to send it to them. This also serves as a hint to a peer that you are about to send them a certain message. --- floodsub.go | 2 +- gossipsub.go | 4 +-- gossipsub_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++- pubsub.go | 6 ++--- randomsub.go | 2 +- topic.go | 1 + 6 files changed, 70 insertions(+), 8 deletions(-) diff --git a/floodsub.go b/floodsub.go index 4c943bb..359886a 100644 --- a/floodsub.go +++ b/floodsub.go @@ -71,7 +71,7 @@ func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus { return AcceptAll } -func (fs *FloodSubRouter) PreValidation(from peer.ID, msgs []*Message) {} +func (fs *FloodSubRouter) Preprocess(from peer.ID, msgs []*Message) {} func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} diff --git a/gossipsub.go b/gossipsub.go index b5a605a..3b52efe 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -707,10 +707,10 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus { return gs.gate.AcceptFrom(p) } -// PreValidation sends the IDONTWANT control messages to all the mesh +// Preprocess sends the IDONTWANT control messages to all the mesh // peers. They need to be sent right before the validation because they // should be seen by the peers as soon as possible. -func (gs *GossipSubRouter) PreValidation(from peer.ID, msgs []*Message) { +func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) { tmids := make(map[string][]string) for _, msg := range msgs { if len(msg.GetData()) < gs.params.IDontWantMessageThreshold { diff --git a/gossipsub_test.go b/gossipsub_test.go index 72188be..2231352 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2847,7 +2847,7 @@ var _ RawTracer = &mockRawTracer{} func TestGossipsubNoIDONTWANTToMessageSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getDefaultHosts(t, 3) + hosts := getDefaultHosts(t, 2) denseConnect(t, hosts) psubs := make([]*PubSub, 2) @@ -2886,6 +2886,67 @@ func TestGossipsubNoIDONTWANTToMessageSender(t *testing.T) { t.Fatal("IDONTWANT should not be sent to the message sender") case <-time.After(time.Second): } +} + +func TestGossipsubIDONTWANTBeforeFirstPublish(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + denseConnect(t, hosts) + + psubs := make([]*PubSub, 2) + + psubs[0] = getGossipsub(ctx, hosts[0]) + rpcsReceived := make(chan string) + psubs[1] = getGossipsub(ctx, hosts[1], WithRawTracer(&mockRawTracer{ + onRecvRPC: func(rpc *RPC) { + if len(rpc.GetControl().GetIdontwant()) > 0 { + rpcsReceived <- "idontwant" + } + if len(rpc.GetPublish()) > 0 { + rpcsReceived <- "publish" + } + }, + })) + + topicString := "foobar" + var topics []*Topic + for _, ps := range psubs { + topic, err := ps.Join(topicString) + if err != nil { + t.Fatal(err) + } + topics = append(topics, topic) + + _, err = ps.Subscribe(topicString) + if err != nil { + t.Fatal(err) + } + } + time.Sleep(2 * time.Second) + + msg := make([]byte, GossipSubIDontWantMessageThreshold+1) + _ = topics[0].Publish(ctx, msg) + + timeout := time.After(5 * time.Second) + + select { + case kind := <-rpcsReceived: + if kind == "publish" { + t.Fatal("IDONTWANT should be sent before publish") + } + case <-timeout: + t.Fatal("IDONTWANT should be sent on first publish") + } + + select { + case kind := <-rpcsReceived: + if kind != "publish" { + t.Fatal("Expected publish after IDONTWANT") + } + case <-timeout: + t.Fatal("Expected publish after IDONTWANT") + } } diff --git a/pubsub.go b/pubsub.go index fae115a..e8e598b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -204,9 +204,9 @@ type PubSubRouter interface { // Allows routers with internal scoring to vet peers before committing any processing resources // to the message and implement an effective graylist and react to validation queue overload. AcceptFrom(peer.ID) AcceptStatus - // PreValidation is invoked on messages in the RPC envelope right before pushing it to + // Preprocess is invoked on messages in the RPC envelope right before pushing it to // the validation pipeline - PreValidation(from peer.ID, msgs []*Message) + Preprocess(from peer.ID, msgs []*Message) // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) @@ -1117,7 +1117,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { toPush = append(toPush, msg) } } - p.rt.PreValidation(rpc.from, toPush) + p.rt.Preprocess(rpc.from, toPush) for _, msg := range toPush { p.pushMsg(msg) } diff --git a/randomsub.go b/randomsub.go index f9f6473..fe70e43 100644 --- a/randomsub.go +++ b/randomsub.go @@ -94,7 +94,7 @@ func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus { return AcceptAll } -func (rs *RandomSubRouter) PreValidation(from peer.ID, msgs []*Message) {} +func (rs *RandomSubRouter) Preprocess(from peer.ID, msgs []*Message) {} func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} diff --git a/topic.go b/topic.go index a6ad979..b164e32 100644 --- a/topic.go +++ b/topic.go @@ -349,6 +349,7 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes } msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local} + t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg}) err := t.p.val.ValidateLocal(msg) if err != nil { return nil, err