mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 13:23:07 +00:00
Allow cancelling IWANT using IDONTWANT (#591)
As specified in the Gossipsub v1.2 spec, we should allow cancelling IWANT by IDONTWANT. That is if IDONTWANT already arrived, we should not process IWANT. However due to the code structure, we can cancel IWANT only in handleIWant. https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#cancelling-iwant
This commit is contained in:
parent
0936035d5f
commit
bf5b583843
@ -839,6 +839,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
||||
ihave := make(map[string]*pb.Message)
|
||||
for _, iwant := range ctl.GetIwant() {
|
||||
for _, mid := range iwant.GetMessageIDs() {
|
||||
// Check if that peer has sent IDONTWANT before, if so don't send them the message
|
||||
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
msg, count, ok := gs.mcache.GetForPeer(mid, p)
|
||||
if !ok {
|
||||
continue
|
||||
|
||||
@ -3079,6 +3079,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) {
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
// Test that IWANT will have no effect after IDONTWANT is sent
|
||||
func TestGossipsubIdontwantBeforeIwant(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hosts := getDefaultHosts(t, 3)
|
||||
|
||||
msgID := func(pmsg *pb.Message) string {
|
||||
// silly content-based test message-ID: just use the data as whole
|
||||
return base64.URLEncoding.EncodeToString(pmsg.Data)
|
||||
}
|
||||
|
||||
psubs := make([]*PubSub, 2)
|
||||
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
|
||||
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))
|
||||
|
||||
topic := "foobar"
|
||||
for _, ps := range psubs {
|
||||
_, err := ps.Subscribe(topic)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait a bit after the last message before checking the result
|
||||
msgWaitMax := 2 * time.Second
|
||||
msgTimer := time.NewTimer(msgWaitMax)
|
||||
|
||||
// Checks we received right messages
|
||||
msgReceived := false
|
||||
ihaveReceived := false
|
||||
checkMsgs := func() {
|
||||
if msgReceived {
|
||||
t.Fatalf("Expected no messages received after IDONWANT")
|
||||
}
|
||||
if !ihaveReceived {
|
||||
t.Fatalf("Expected IHAVE received")
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the timer to expire
|
||||
go func() {
|
||||
select {
|
||||
case <-msgTimer.C:
|
||||
checkMsgs()
|
||||
cancel()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
checkMsgs()
|
||||
}
|
||||
}()
|
||||
|
||||
newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
|
||||
// Check if it receives any message
|
||||
if len(irpc.GetPublish()) > 0 {
|
||||
msgReceived = true
|
||||
}
|
||||
// The middle peer is supposed to send IHAVE
|
||||
for _, ihave := range irpc.GetControl().GetIhave() {
|
||||
ihaveReceived = true
|
||||
mids := ihave.GetMessageIDs()
|
||||
|
||||
writeMsg(&pb.RPC{
|
||||
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}},
|
||||
})
|
||||
// Wait for the middle peer to process IDONTWANT
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
writeMsg(&pb.RPC{
|
||||
Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}},
|
||||
})
|
||||
}
|
||||
// When the middle peer connects it will send us its subscriptions
|
||||
for _, sub := range irpc.GetSubscriptions() {
|
||||
if sub.GetSubscribe() {
|
||||
// Reply by subcribing to the topic and pruning to the middle peer to make sure
|
||||
// that it's not in the mesh
|
||||
writeMsg(&pb.RPC{
|
||||
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||
Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}},
|
||||
})
|
||||
|
||||
go func() {
|
||||
// Wait for an interval to make sure the middle peer
|
||||
// received and processed the subscribe
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
data := make([]byte, 16)
|
||||
crand.Read(data)
|
||||
|
||||
// Publish the message from the first peer
|
||||
if err := psubs[0].Publish(topic, data); err != nil {
|
||||
t.Error(err)
|
||||
return // cannot call t.Fatal in a non-test goroutine
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
// Test that IDONTWANT will cleared when it's old enough
|
||||
func TestGossipsubIdontwantClear(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user