diff --git a/floodsub.go b/floodsub.go index 4c943bb..359886a 100644 --- a/floodsub.go +++ b/floodsub.go @@ -71,7 +71,7 @@ func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus { return AcceptAll } -func (fs *FloodSubRouter) PreValidation(from peer.ID, msgs []*Message) {} +func (fs *FloodSubRouter) Preprocess(from peer.ID, msgs []*Message) {} func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} diff --git a/gossipsub.go b/gossipsub.go index b5a605a..3b52efe 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -707,10 +707,10 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus { return gs.gate.AcceptFrom(p) } -// PreValidation sends the IDONTWANT control messages to all the mesh +// Preprocess sends the IDONTWANT control messages to all the mesh // peers. They need to be sent right before the validation because they // should be seen by the peers as soon as possible. -func (gs *GossipSubRouter) PreValidation(from peer.ID, msgs []*Message) { +func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) { tmids := make(map[string][]string) for _, msg := range msgs { if len(msg.GetData()) < gs.params.IDontWantMessageThreshold { diff --git a/gossipsub_test.go b/gossipsub_test.go index 72188be..2231352 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2847,7 +2847,7 @@ var _ RawTracer = &mockRawTracer{} func TestGossipsubNoIDONTWANTToMessageSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getDefaultHosts(t, 3) + hosts := getDefaultHosts(t, 2) denseConnect(t, hosts) psubs := make([]*PubSub, 2) @@ -2886,6 +2886,67 @@ func TestGossipsubNoIDONTWANTToMessageSender(t *testing.T) { t.Fatal("IDONTWANT should not be sent to the message sender") case <-time.After(time.Second): } +} + +func TestGossipsubIDONTWANTBeforeFirstPublish(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + denseConnect(t, hosts) + + psubs := make([]*PubSub, 2) + + psubs[0] = getGossipsub(ctx, hosts[0]) + rpcsReceived := make(chan string) + psubs[1] = getGossipsub(ctx, hosts[1], WithRawTracer(&mockRawTracer{ + onRecvRPC: func(rpc *RPC) { + if len(rpc.GetControl().GetIdontwant()) > 0 { + rpcsReceived <- "idontwant" + } + if len(rpc.GetPublish()) > 0 { + rpcsReceived <- "publish" + } + }, + })) + + topicString := "foobar" + var topics []*Topic + for _, ps := range psubs { + topic, err := ps.Join(topicString) + if err != nil { + t.Fatal(err) + } + topics = append(topics, topic) + + _, err = ps.Subscribe(topicString) + if err != nil { + t.Fatal(err) + } + } + time.Sleep(2 * time.Second) + + msg := make([]byte, GossipSubIDontWantMessageThreshold+1) + _ = topics[0].Publish(ctx, msg) + + timeout := time.After(5 * time.Second) + + select { + case kind := <-rpcsReceived: + if kind == "publish" { + t.Fatal("IDONTWANT should be sent before publish") + } + case <-timeout: + t.Fatal("IDONTWANT should be sent on first publish") + } + + select { + case kind := <-rpcsReceived: + if kind != "publish" { + t.Fatal("Expected publish after IDONTWANT") + } + case <-timeout: + t.Fatal("Expected publish after IDONTWANT") + } } diff --git a/pubsub.go b/pubsub.go index fae115a..e8e598b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -204,9 +204,9 @@ type PubSubRouter interface { // Allows routers with internal scoring to vet peers before committing any processing resources // to the message and implement an effective graylist and react to validation queue overload. AcceptFrom(peer.ID) AcceptStatus - // PreValidation is invoked on messages in the RPC envelope right before pushing it to + // Preprocess is invoked on messages in the RPC envelope right before pushing it to // the validation pipeline - PreValidation(from peer.ID, msgs []*Message) + Preprocess(from peer.ID, msgs []*Message) // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) @@ -1117,7 +1117,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { toPush = append(toPush, msg) } } - p.rt.PreValidation(rpc.from, toPush) + p.rt.Preprocess(rpc.from, toPush) for _, msg := range toPush { p.pushMsg(msg) } diff --git a/randomsub.go b/randomsub.go index f9f6473..fe70e43 100644 --- a/randomsub.go +++ b/randomsub.go @@ -94,7 +94,7 @@ func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus { return AcceptAll } -func (rs *RandomSubRouter) PreValidation(from peer.ID, msgs []*Message) {} +func (rs *RandomSubRouter) Preprocess(from peer.ID, msgs []*Message) {} func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} diff --git a/topic.go b/topic.go index a6ad979..b164e32 100644 --- a/topic.go +++ b/topic.go @@ -349,6 +349,7 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes } msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local} + t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg}) err := t.p.val.ValidateLocal(msg) if err != nil { return nil, err