Merge pull request #2 from waku-org/rebase-from-libp2p

rebase from libp2p
This commit is contained in:
Prem Chaitanya Prathi 2024-08-09 07:09:22 +05:30 committed by GitHub
commit 2f15576fd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 510 additions and 295 deletions

19
.github/workflows/go-check.yml vendored Normal file
View File

@ -0,0 +1,19 @@
name: Go Checks
on:
pull_request:
push:
branches: ["master"]
workflow_dispatch:
merge_group:
permissions:
contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true
jobs:
go-check:
uses: ipdxco/unified-github-workflows/.github/workflows/go-check.yml@v1.0

4
.github/workflows/go-test-config.json vendored Normal file
View File

@ -0,0 +1,4 @@
{
"skipOSes": ["windows", "macos"],
"skipRace": true
}

21
.github/workflows/go-test.yml vendored Normal file
View File

@ -0,0 +1,21 @@
name: Go Test
on:
pull_request:
push:
branches: ["master"]
workflow_dispatch:
merge_group:
permissions:
contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true
jobs:
go-test:
uses: ipdxco/unified-github-workflows/.github/workflows/go-test.yml@v1.0
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

21
.github/workflows/release-check.yml vendored Normal file
View File

@ -0,0 +1,21 @@
name: Release Checker
on:
pull_request_target:
paths: ["version.json"]
types: [ opened, synchronize, reopened, labeled, unlabeled ]
workflow_dispatch:
permissions:
contents: write
pull-requests: write
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
release-check:
uses: ipdxco/unified-github-workflows/.github/workflows/release-check.yml@v1.0
with:
sources: '["version.json"]'

21
.github/workflows/releaser.yml vendored Normal file
View File

@ -0,0 +1,21 @@
name: Releaser
on:
push:
paths: ["version.json"]
workflow_dispatch:
permissions:
contents: write
concurrency:
group: ${{ github.workflow }}-${{ github.sha }}
cancel-in-progress: true
jobs:
releaser:
uses: ipdxco/unified-github-workflows/.github/workflows/releaser.yml@v1.0
with:
sources: '["version.json"]'
secrets:
UCI_GITHUB_TOKEN: ${{ secrets.UCI_GITHUB_TOKEN }}

18
.github/workflows/tagpush.yml vendored Normal file
View File

@ -0,0 +1,18 @@
name: Tag Push Checker
on:
push:
tags:
- v*
permissions:
contents: read
issues: write
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
releaser:
uses: ipdxco/unified-github-workflows/.github/workflows/tagpush.yml@v1.0

View File

@ -43,7 +43,6 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur
info: make(map[peer.ID]*backoffHistory),
}
rand.Seed(time.Now().UnixNano()) // used for jitter
go b.cleanupLoop(ctx)
return b

View File

@ -38,7 +38,7 @@ func TestBlacklist(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -66,7 +66,7 @@ func TestBlacklist2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -99,7 +99,7 @@ func TestBlacklist3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
psubs[1].BlacklistPeer(hosts[0].ID())

View File

@ -134,7 +134,7 @@ func TestSimpleDiscovery(t *testing.T) {
server := newDiscoveryServer()
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)}
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)
@ -234,7 +234,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)}
// Put the pubsub clients into two partitions
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)

View File

@ -3,11 +3,12 @@ package pubsub
import (
"bytes"
"context"
crand "crypto/rand"
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"math/rand"
mrand "math/rand"
"sort"
"sync"
"testing"
@ -20,15 +21,13 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)
func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) {
data := make([]byte, 16)
rand.Read(data)
crand.Read(data)
for _, p := range pubs {
err := p.Publish(topic, data)
@ -42,19 +41,6 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub
}
}
func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := swarmt.GenSwarm(t)
h := bhost.NewBlankHost(netw)
t.Cleanup(func() { h.Close() })
out = append(out, h)
}
return out
}
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
@ -74,7 +60,7 @@ func denseConnect(t *testing.T, hosts []host.Host) {
func connectSome(t *testing.T, hosts []host.Host, d int) {
for i, a := range hosts {
for j := 0; j < d; j++ {
n := rand.Intn(len(hosts))
n := mrand.Intn(len(hosts))
if n == i {
j--
continue
@ -151,7 +137,7 @@ func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration)
func TestBasicFloodsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getPubsubs(ctx, hosts)
@ -173,7 +159,7 @@ func TestBasicFloodsub(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -193,7 +179,7 @@ func TestMultihops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 6)
hosts := getDefaultHosts(t, 6)
psubs := getPubsubs(ctx, hosts)
@ -235,7 +221,7 @@ func TestReconnects(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
psubs := getPubsubs(ctx, hosts)
@ -309,7 +295,7 @@ func TestNoConnection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getPubsubs(ctx, hosts)
@ -334,7 +320,7 @@ func TestSelfReceive(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host := getNetHosts(t, ctx, 1)[0]
host := getDefaultHosts(t, 1)[0]
psub, err := NewFloodSub(ctx, host)
if err != nil {
@ -368,7 +354,7 @@ func TestOneToOne(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -401,7 +387,7 @@ func TestTreeTopology(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -464,7 +450,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub", "/lsr/floodsub")
psubB := mustCreatePubSub(ctx, t, hosts[1], "/esh/floodsub")
@ -496,7 +482,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub")
psubB := mustCreatePubSub(ctx, t, hosts[1], "/lsr/floodsub")
@ -551,7 +537,7 @@ func TestSubReporting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host := getNetHosts(t, ctx, 1)[0]
host := getDefaultHosts(t, 1)[0]
psub, err := NewFloodSub(ctx, host)
if err != nil {
t.Fatal(err)
@ -593,7 +579,7 @@ func TestPeerTopicReporting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 4)
hosts := getDefaultHosts(t, 4)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -650,7 +636,7 @@ func TestSubscribeMultipleTimes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -695,7 +681,7 @@ func TestPeerDisconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -743,7 +729,7 @@ func TestWithNoSigning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts, WithNoAuthor(), WithMessageIdFn(func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
@ -788,7 +774,7 @@ func TestWithSigning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts, WithStrictSignatureVerification(true))
connect(t, hosts[0], hosts[1])
@ -830,7 +816,7 @@ func TestImproperlySignedMessageRejected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
adversary := hosts[0]
honestPeer := hosts[1]
@ -948,7 +934,7 @@ func TestMessageSender(t *testing.T) {
const topic = "foobar"
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
psubs := getPubsubs(ctx, hosts)
var msgs []*Subscription
@ -1002,7 +988,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
// use a 4mb limit; default is 1mb; we'll test with a 2mb payload.
psubs := getPubsubs(ctx, hosts, WithMaxMessageSize(1<<22))
@ -1022,7 +1008,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) {
// 2mb payload.
msg := make([]byte, 1<<21)
rand.Read(msg)
crand.Read(msg)
err := psubs[0].Publish(topic, msg)
if err != nil {
t.Fatal(err)
@ -1045,7 +1031,7 @@ func TestAnnounceRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
ps := getPubsub(ctx, hosts[0])
watcher := &announceWatcher{}
hosts[1].SetStreamHandler(FloodSubID, watcher.handleStream)
@ -1117,7 +1103,7 @@ func TestPubsubWithAssortedOptions(t *testing.T) {
return string(hash[:])
}
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts,
WithMessageIdFn(hashMsgID),
WithPeerOutboundQueueSize(10),
@ -1152,8 +1138,7 @@ func TestWithInvalidMessageAuthor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h.Close()
h := getDefaultHosts(t, 1)[0]
_, err := NewFloodSub(ctx, h, WithMessageAuthor("bogotr0n"))
if err == nil {
t.Fatal("expected error")
@ -1168,10 +1153,9 @@ func TestPreconnectedNodes(t *testing.T) {
defer cancel()
// Create hosts
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()
hosts := getDefaultHosts(t, 2)
h1 := hosts[0]
h2 := hosts[1]
opts := []Option{WithDiscovery(&dummyDiscovery{})}
// Setup first PubSub
@ -1229,10 +1213,9 @@ func TestDedupInboundStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()
hosts := getDefaultHosts(t, 2)
h1 := hosts[0]
h2 := hosts[1]
_, err := NewFloodSub(ctx, h1)
if err != nil {
@ -1247,18 +1230,30 @@ func TestDedupInboundStreams(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, err = s1.Read(nil) // force protocol negotiation to complete
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
s2, err := h2.NewStream(ctx, h1.ID(), FloodSubID)
if err != nil {
t.Fatal(err)
}
_, err = s2.Read(nil) // force protocol negotiation to complete
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
s3, err := h2.NewStream(ctx, h1.ID(), FloodSubID)
if err != nil {
t.Fatal(err)
}
_, err = s3.Read(nil) // force protocol negotiation to complete
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
// check that s1 and s2 have been reset

View File

@ -3,6 +3,7 @@ package pubsub
import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"time"
@ -543,6 +544,13 @@ func (gs *GossipSubRouter) manageAddrBook() {
for {
select {
case <-gs.p.ctx.Done():
cabCloser, ok := gs.cab.(io.Closer)
if ok {
errClose := cabCloser.Close()
if errClose != nil {
log.Warnf("failed to close addr book: %v", errClose)
}
}
return
case ev := <-sub.Out():
switch ev := ev.(type) {

View File

@ -7,15 +7,14 @@ import (
"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/host"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)
func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
t.Skip("Test disabled with go-libp2p v0.22.0") // TODO: reenable test when updating to v0.23.0
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -70,9 +69,14 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
t.Fatal(err)
}
netw := swarmt.GenSwarm(t)
defer netw.Close()
h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i]))
h, err := libp2p.New(
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.ConnectionManager(connmgrs[i]),
)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { h.Close() })
honestHosts[i] = h
honestPeers[h.ID()] = struct{}{}
}
@ -83,9 +87,9 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
WithFloodPublish(true))
// sybil squatters to be connected later
sybilHosts := getNetHosts(t, ctx, nSquatter)
sybilHosts := getDefaultHosts(t, nSquatter)
for _, h := range sybilHosts {
squatter := &sybilSquatter{h: h}
squatter := &sybilSquatter{h: h, ignoreErrors: true}
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
}
@ -139,18 +143,6 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
allHosts := append(honestHosts, sybilHosts...)
connectAll(t, allHosts)
// verify that we have a bunch of connections
for _, h := range honestHosts {
if len(h.Network().Conns()) != nHonest+nSquatter-1 {
t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns()))
}
}
// force the connection managers to trim, so we don't need to muck about with timing as much
for _, cm := range connmgrs {
cm.TrimOpenConns(ctx)
}
// we should still have conns to all the honest peers, but not the sybils
for _, h := range honestHosts {
nHonestConns := 0

View File

@ -42,7 +42,7 @@ func TestGossipSubCustomProtocols(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
gsubs := getGossipsubs(ctx, hosts[:2], WithGossipSubProtocols(protos, features))
fsub := getPubsub(ctx, hosts[2])

View File

@ -17,7 +17,7 @@ func TestGossipSubMatchingFn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 4)
h := getDefaultHosts(t, 4)
psubs := []*PubSub{
getGossipsub(ctx, h[0], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)),
getGossipsub(ctx, h[1], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)),

View File

@ -2,7 +2,7 @@ package pubsub
import (
"context"
"math/rand"
"crypto/rand"
"strconv"
"sync"
"testing"
@ -15,6 +15,7 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)
@ -25,7 +26,7 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) {
defer cancel()
// Create legitimate and attacker hosts
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
@ -142,7 +143,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
defer cancel()
// Create legitimate and attacker hosts
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
@ -195,6 +196,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})
sub := sub
go func() {
defer cancel()
@ -292,7 +294,7 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) {
defer cancel()
// Create legitimate and attacker hosts
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
@ -376,7 +378,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
defer cancel()
// Create legitimate and attacker hosts
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
@ -430,6 +432,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
Control: &pb.ControlMessage{Graft: graft},
})
sub := sub
go func() {
defer cancel()
@ -617,7 +620,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
defer cancel()
// Create legitimate and attacker hosts
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]

View File

@ -3,9 +3,10 @@ package pubsub
import (
"bytes"
"context"
crand "crypto/rand"
"fmt"
"io"
"math/rand"
mrand "math/rand"
"sync"
"sync/atomic"
"testing"
@ -13,16 +14,13 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)
@ -45,7 +43,7 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu
func TestSparseGossipsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -67,7 +65,7 @@ func TestSparseGossipsub(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -86,7 +84,7 @@ func TestSparseGossipsub(t *testing.T) {
func TestDenseGossipsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -108,7 +106,7 @@ func TestDenseGossipsub(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -127,7 +125,7 @@ func TestDenseGossipsub(t *testing.T) {
func TestGossipsubFanout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -196,7 +194,7 @@ func TestGossipsubFanout(t *testing.T) {
func TestGossipsubFanoutMaintenance(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -281,7 +279,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getGossipsubs(ctx, hosts)
@ -340,7 +338,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) {
func TestGossipsubGossip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -362,7 +360,7 @@ func TestGossipsubGossip(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -388,7 +386,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) {
t.Skip("test no longer relevant; gossip propagation has become eager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -420,7 +418,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
psubs[owner].Publish("bazcrux", msg)
@ -457,7 +455,7 @@ func TestGossipsubGossipPropagation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
hosts1 := hosts[:GossipSubD+1]
@ -537,7 +535,7 @@ func TestGossipsubGossipPropagation(t *testing.T) {
func TestGossipsubPrune(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -567,7 +565,7 @@ func TestGossipsubPrune(t *testing.T) {
for i := 0; i < 10; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -586,7 +584,7 @@ func TestGossipsubPrune(t *testing.T) {
func TestGossipsubPruneBackoffTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
// App specific score that we'll change later.
currentScoreForHost0 := int32(0)
@ -665,7 +663,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
// Don't publish from host 0, since everyone should have pruned it.
owner := rand.Intn(len(psubs)-1) + 1
owner := mrand.Intn(len(psubs)-1) + 1
psubs[owner].Publish("foobar", msg)
@ -684,7 +682,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) {
func TestGossipsubGraft(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -710,7 +708,7 @@ func TestGossipsubGraft(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -729,7 +727,7 @@ func TestGossipsubGraft(t *testing.T) {
func TestGossipsubRemovePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
@ -759,7 +757,7 @@ func TestGossipsubRemovePeer(t *testing.T) {
for i := 0; i < 10; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := 5 + rand.Intn(len(psubs)-5)
owner := 5 + mrand.Intn(len(psubs)-5)
psubs[owner].Publish("foobar", msg)
@ -779,7 +777,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getGossipsubs(ctx, hosts)
denseConnect(t, hosts)
@ -807,7 +805,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) {
for i, topic := range topics {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish(topic, msg)
@ -829,7 +827,7 @@ func TestGossipsubControlPiggyback(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getGossipsubs(ctx, hosts)
denseConnect(t, hosts)
@ -853,7 +851,7 @@ func TestGossipsubControlPiggyback(t *testing.T) {
// create a background flood of messages that overloads the queues
done := make(chan struct{})
go func() {
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
for i := 0; i < 10000; i++ {
msg := []byte("background flooooood")
psubs[owner].Publish("flood", msg)
@ -891,7 +889,7 @@ func TestGossipsubControlPiggyback(t *testing.T) {
for i, topic := range topics {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish(topic, msg)
@ -910,7 +908,7 @@ func TestGossipsubControlPiggyback(t *testing.T) {
func TestMixedGossipsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 30)
hosts := getDefaultHosts(t, 30)
gsubs := getGossipsubs(ctx, hosts[:20])
fsubs := getPubsubs(ctx, hosts[20:])
@ -934,7 +932,7 @@ func TestMixedGossipsub(t *testing.T) {
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(len(psubs))
owner := mrand.Intn(len(psubs))
psubs[owner].Publish("foobar", msg)
@ -954,7 +952,7 @@ func TestGossipsubMultihops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 6)
hosts := getDefaultHosts(t, 6)
psubs := getGossipsubs(ctx, hosts)
@ -997,7 +995,7 @@ func TestGossipsubTreeTopology(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getGossipsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -1061,7 +1059,7 @@ func TestGossipsubStarTopology(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true))
// configure the center of the star with a very low D
@ -1223,7 +1221,7 @@ func TestGossipsubDirectPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 3)
h := getDefaultHosts(t, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0], WithDirectConnectTicks(2)),
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}}), WithDirectConnectTicks(2)),
@ -1287,7 +1285,7 @@ func TestGossipSubPeerFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 3)
h := getDefaultHosts(t, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool {
return pid == h[1].ID()
@ -1329,7 +1327,7 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 3)
h := getDefaultHosts(t, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0]),
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})),
@ -1416,7 +1414,7 @@ func TestGossipsubFloodPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true))
// build the star
@ -1451,7 +1449,7 @@ func TestGossipsubEnoughPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts)
for _, ps := range psubs {
@ -1500,7 +1498,7 @@ func TestGossipsubCustomParams(t *testing.T) {
wantedMaxPendingConns := 23
params.MaxPendingConnections = wantedMaxPendingConns
hosts := getNetHosts(t, ctx, 1)
hosts := getDefaultHosts(t, 1)
psubs := getGossipsubs(ctx, hosts,
WithGossipSubParams(params))
@ -1529,7 +1527,7 @@ func TestGossipsubNegativeScore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts,
WithPeerScore(
&PeerScoreParams{
@ -1613,7 +1611,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
psubs := getGossipsubs(ctx, hosts,
WithPeerScore(
&PeerScoreParams{
@ -1701,8 +1699,7 @@ func TestGossipsubPiggybackControl(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h.Close()
h := getDefaultHosts(t, 1)[0]
ps := getGossipsub(ctx, h)
blah := peer.ID("bogotr0n")
@ -1750,7 +1747,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getGossipsubs(ctx, hosts)
sparseConnect(t, hosts)
@ -1818,7 +1815,7 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 50)
hosts := getDefaultHosts(t, 50)
// pubsubs for the first 10 hosts
psubs := getGossipsubs(ctx, hosts[:10],
WithFloodPublish(true),
@ -1919,7 +1916,7 @@ func TestGossipSubLeaveTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 2)
h := getDefaultHosts(t, 2)
psubs := []*PubSub{
getGossipsub(ctx, h[0]),
getGossipsub(ctx, h[1]),
@ -1928,13 +1925,11 @@ func TestGossipSubLeaveTopic(t *testing.T) {
connect(t, h[0], h[1])
// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test")
_, err := ps.Subscribe("test")
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
}
time.Sleep(time.Second)
@ -1990,7 +1985,7 @@ func TestGossipSubJoinTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := getNetHosts(t, ctx, 3)
h := getDefaultHosts(t, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0]),
getGossipsub(ctx, h[1]),
@ -2009,13 +2004,11 @@ func TestGossipSubJoinTopic(t *testing.T) {
router0.backoff["test"] = peerMap
// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test")
_, err := ps.Subscribe("test")
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
}
time.Sleep(time.Second)
@ -2032,7 +2025,8 @@ func TestGossipSubJoinTopic(t *testing.T) {
}
type sybilSquatter struct {
h host.Host
h host.Host
ignoreErrors bool // set to false to ignore connection/stream errors.
}
func (sq *sybilSquatter) handleStream(s network.Stream) {
@ -2040,7 +2034,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
if err != nil {
panic(err)
if !sq.ignoreErrors {
panic(err)
}
return
}
// send a subscription for test in the output stream to become candidate for GRAFT
@ -2051,7 +2048,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
topic := "test"
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}})
if err != nil {
panic(err)
if !sq.ignoreErrors {
panic(err)
}
return
}
var rpc pb.RPC
@ -2072,7 +2072,7 @@ func TestGossipsubPeerScoreInspect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
inspector := &mockPeerScoreInspector{}
psub1 := getGossipsub(ctx, hosts[0],
@ -2132,7 +2132,7 @@ func TestGossipsubPeerScoreResetTopicParams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 1)
hosts := getDefaultHosts(t, 1)
ps := getGossipsub(ctx, hosts[0],
WithPeerScore(
@ -2199,7 +2199,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
ps := getGossipsub(ctx, hosts[0])
// make a fake peer that requests everything through IWANT gossip
@ -2222,7 +2222,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) {
msgSize := 20000
for i := 0; i < nMessages; i++ {
msg := make([]byte, msgSize)
rand.Read(msg)
crand.Read(msg)
ps.Publish("test", msg)
time.Sleep(20 * time.Millisecond)
}
@ -2362,7 +2362,7 @@ func TestFragmentRPCFunction(t *testing.T) {
mkMsg := func(size int) *pb.Message {
msg := &pb.Message{}
msg.Data = make([]byte, size-4) // subtract the protobuf overhead, so msg.Size() returns requested size
rand.Read(msg.Data)
crand.Read(msg.Data)
return msg
}
@ -2476,7 +2476,7 @@ func TestFragmentRPCFunction(t *testing.T) {
messageIds := make([]string, msgsPerTopic)
for m := 0; m < msgsPerTopic; m++ {
mid := make([]byte, messageIdSize)
rand.Read(mid)
crand.Read(mid)
messageIds[m] = string(mid)
}
rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds}
@ -2497,7 +2497,7 @@ func TestFragmentRPCFunction(t *testing.T) {
// It should not be present in the fragmented messages, but smaller IDs should be
rpc.Reset()
giantIdBytes := make([]byte, limit*2)
rand.Read(giantIdBytes)
crand.Read(giantIdBytes)
rpc.Control = &pb.ControlMessage{
Iwant: []*pb.ControlIWant{
{MessageIDs: []string{"hello", string(giantIdBytes)}},
@ -2553,21 +2553,6 @@ func FuzzAppendOrMergeRPC(f *testing.F) {
})
}
func getDefaultHosts(t *testing.T, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { h.Close() })
out = append(out, h)
}
return out
}
func TestGossipsubManagesAnAddressBook(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -1,75 +0,0 @@
package pubsub
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)
var _ network.Notifiee = (*PubSubNotif)(nil)
type PubSubNotif PubSub
func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
// ignore transient connections
if c.Stat().Limited {
return
}
go func() {
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
p.newPeersPend[c.RemotePeer()] = struct{}{}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()
select {
case p.newPeers <- struct{}{}:
default:
}
}()
}
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) {
}
func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr) {
}
func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr) {
}
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Limited {
return false
}
}
return true
}
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
for _, pid := range p.host.Network().Peers() {
if isTransient(pid) {
continue
}
p.newPeersPend[pid] = struct{}{}
}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()
select {
case p.newPeers <- struct{}{}:
default:
}
}

76
notify_test.go Normal file
View File

@ -0,0 +1,76 @@
package pubsub
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
)
func TestNotifyPeerProtocolsUpdated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 2)
// Initialize id services.
{
ids1, err := identify.NewIDService(hosts[0])
if err != nil {
t.Fatal(err)
}
ids1.Start()
defer ids1.Close()
ids2, err := identify.NewIDService(hosts[1])
if err != nil {
t.Fatal(err)
}
ids2.Start()
defer ids2.Close()
}
psubs0 := getPubsub(ctx, hosts[0])
connect(t, hosts[0], hosts[1])
// Delay to make sure that peers are connected.
<-time.After(time.Millisecond * 100)
psubs1 := getPubsub(ctx, hosts[1])
// Pubsub 0 joins topic "test".
topic0, err := psubs0.Join("test")
if err != nil {
t.Fatal(err)
}
defer topic0.Close()
sub0, err := topic0.Subscribe()
if err != nil {
t.Fatal(err)
}
defer sub0.Cancel()
// Pubsub 1 joins topic "test".
topic1, err := psubs1.Join("test")
if err != nil {
t.Fatal(err)
}
defer topic1.Close()
sub1, err := topic1.Subscribe()
if err != nil {
t.Fatal(err)
}
defer sub1.Cancel()
// Delay before checking results (similar to most tests).
<-time.After(time.Millisecond * 100)
if len(topic0.ListPeers()) == 0 {
t.Fatalf("topic0 should at least have 1 peer")
}
if len(topic1.ListPeers()) == 0 {
t.Fatalf("topic1 should at least have 1 peer")
}
}

112
peer_notify.go Normal file
View File

@ -0,0 +1,112 @@
package pubsub
import (
"context"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
func (ps *PubSub) watchForNewPeers(ctx context.Context) {
// We don't bother subscribing to "connectivity" events because we always run identify after
// every new connection.
sub, err := ps.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerProtocolsUpdated{},
})
if err != nil {
log.Errorf("failed to subscribe to peer identification events: %v", err)
return
}
defer sub.Close()
ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
for _, pid := range ps.host.Network().Peers() {
if ps.host.Network().Connectedness(pid) != network.Connected {
continue
}
ps.newPeersPend[pid] = struct{}{}
}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()
select {
case ps.newPeers <- struct{}{}:
default:
}
var supportsProtocol func(protocol.ID) bool
if ps.protoMatchFunc != nil {
var supportedProtocols []func(protocol.ID) bool
for _, proto := range ps.rt.Protocols() {
supportedProtocols = append(supportedProtocols, ps.protoMatchFunc(proto))
}
supportsProtocol = func(proto protocol.ID) bool {
for _, fn := range supportedProtocols {
if (fn)(proto) {
return true
}
}
return false
}
} else {
supportedProtocols := make(map[protocol.ID]struct{})
for _, proto := range ps.rt.Protocols() {
supportedProtocols[proto] = struct{}{}
}
supportsProtocol = func(proto protocol.ID) bool {
_, ok := supportedProtocols[proto]
return ok
}
}
for ctx.Err() == nil {
var ev any
select {
case <-ctx.Done():
return
case ev = <-sub.Out():
}
var protos []protocol.ID
var peer peer.ID
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
peer = ev.Peer
protos = ev.Protocols
case event.EvtPeerProtocolsUpdated:
peer = ev.Peer
protos = ev.Added
default:
continue
}
// We don't bother checking connectivity (connected and non-"limited") here because
// we'll check when actually handling the new peer.
for _, p := range protos {
if supportsProtocol(p) {
ps.notifyNewPeer(peer)
break
}
}
}
}
func (ps *PubSub) notifyNewPeer(peer peer.ID) {
ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
ps.newPeersPend[peer] = struct{}{}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()
select {
case ps.newPeers <- struct{}{}:
default:
}
}

View File

@ -331,14 +331,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
h.SetStreamHandler(id, ps.handleNewStream)
}
}
h.Network().Notify((*PubSubNotif)(ps))
go ps.watchForNewPeers(ctx)
ps.val.Start(ps)
go ps.processLoop(ctx)
(*PubSubNotif)(ps).Initialize()
return ps, nil
}
@ -698,6 +696,8 @@ func (p *PubSub) handlePendingPeers() {
p.newPeersPrioLk.Unlock()
for pid := range newPeers {
// Make sure we have a non-limited connection. We do this late because we may have
// disconnected in the meantime.
if p.host.Network().Connectedness(pid) != network.Connected {
continue
}

View File

@ -4,13 +4,32 @@ import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
)
func getDefaultHosts(t *testing.T, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { h.Close() })
out = append(out, h)
}
return out
}
// See https://github.com/libp2p/go-libp2p-pubsub/issues/426
func TestPubSubRemovesBlacklistedPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
bl := NewMapBlacklist()

View File

@ -40,7 +40,7 @@ func TestRandomsubSmall(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getRandomsubs(ctx, hosts, 10)
connectAll(t, hosts)
@ -77,7 +77,7 @@ func TestRandomsubBig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 50)
hosts := getDefaultHosts(t, 50)
psubs := getRandomsubs(ctx, hosts, 50)
connectSome(t, hosts, 12)
@ -114,7 +114,7 @@ func TestRandomsubMixed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 40)
hosts := getDefaultHosts(t, 40)
fsubs := getPubsubs(ctx, hosts[:10])
rsubs := getRandomsubs(ctx, hosts[10:], 30)
psubs := append(fsubs, rsubs...)
@ -153,7 +153,7 @@ func TestRandomsubEnoughPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 40)
hosts := getDefaultHosts(t, 40)
fsubs := getPubsubs(ctx, hosts[:10])
rsubs := getRandomsubs(ctx, hosts[10:], 30)
psubs := append(fsubs, rsubs...)

View File

@ -19,15 +19,15 @@ func TestBasicSubscriptionFilter(t *testing.T) {
topic3 := "test3"
yes := true
subs := []*pb.RPC_SubOpts{
&pb.RPC_SubOpts{
{
Topicid: &topic1,
Subscribe: &yes,
},
&pb.RPC_SubOpts{
{
Topicid: &topic2,
Subscribe: &yes,
},
&pb.RPC_SubOpts{
{
Topicid: &topic3,
Subscribe: &yes,
},
@ -108,24 +108,24 @@ func TestSubscriptionFilterDeduplication(t *testing.T) {
yes := true
no := false
subs := []*pb.RPC_SubOpts{
&pb.RPC_SubOpts{
{
Topicid: &topic1,
Subscribe: &yes,
},
&pb.RPC_SubOpts{
{
Topicid: &topic1,
Subscribe: &yes,
},
&pb.RPC_SubOpts{
{
Topicid: &topic2,
Subscribe: &yes,
},
&pb.RPC_SubOpts{
{
Topicid: &topic2,
Subscribe: &no,
},
&pb.RPC_SubOpts{
{
Topicid: &topic3,
Subscribe: &yes,
},
@ -150,7 +150,7 @@ func TestSubscriptionFilterRPC(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
ps1 := getPubsub(ctx, hosts[0], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test1", "test2")))
ps2 := getPubsub(ctx, hosts[1], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test2", "test3")))

View File

@ -2,12 +2,8 @@ package timecache
import (
"time"
logger "github.com/ipfs/go-log/v2"
)
var log = logger.Logger("pubsub/timecache")
// Stategy is the TimeCache expiration strategy to use.
type Strategy uint8

View File

@ -99,7 +99,7 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic
const numHosts = 1
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
ps := getPubsub(ctx, hosts[0])
// Try create and cancel topic
@ -139,7 +139,7 @@ func TestTopicReuse(t *testing.T) {
const numHosts = 2
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{}))
receiver := getPubsub(ctx, hosts[1])
@ -233,7 +233,7 @@ func TestTopicEventHandlerCancel(t *testing.T) {
const numHosts = 5
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
ps := getPubsub(ctx, hosts[0])
// Try create and cancel topic
@ -265,7 +265,7 @@ func TestSubscriptionJoinNotification(t *testing.T) {
const numLateSubscribers = 10
const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), "foobar")
evts := getTopicEvts(topics)
@ -331,7 +331,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
defer cancel()
const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
psubs := getPubsubs(ctx, hosts)
topics := getTopics(psubs, "foobar")
evts := getTopicEvts(topics)
@ -416,7 +416,7 @@ func TestSubscriptionManyNotifications(t *testing.T) {
const topic = "foobar"
const numHosts = 33
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
evts := getTopicEvts(topics)
@ -521,7 +521,7 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) {
const topic = "foobar"
const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
for i := 1; i < numHosts; i++ {
@ -539,7 +539,7 @@ func TestTopicRelay(t *testing.T) {
const topic = "foobar"
const numHosts = 5
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
// [0.Rel] - [1.Rel] - [2.Sub]
@ -603,7 +603,7 @@ func TestTopicRelayReuse(t *testing.T) {
const topic = "foobar"
const numHosts = 1
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
pubsubs := getPubsubs(ctx, hosts)
topics := getTopics(pubsubs, topic)
@ -670,7 +670,7 @@ func TestTopicRelayOnClosedTopic(t *testing.T) {
const topic = "foobar"
const numHosts = 1
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
err := topics[0].Close()
@ -690,7 +690,7 @@ func TestProducePanic(t *testing.T) {
const numHosts = 5
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
ps := getPubsub(ctx, hosts[0])
// Create topic
@ -743,7 +743,7 @@ func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) {
}
// Wait for the unsubscribe messages to reach the primary peer
for len(primaryTopic.ListPeers()) < 0 {
for len(primaryTopic.ListPeers()) > 0 {
time.Sleep(time.Millisecond * 100)
}
@ -792,7 +792,7 @@ func TestMinTopicSizeNoDiscovery(t *testing.T) {
const numHosts = 3
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
sender := getPubsub(ctx, hosts[0])
receiver1 := getPubsub(ctx, hosts[1])
@ -872,7 +872,7 @@ func TestWithTopicMsgIdFunction(t *testing.T) {
const topicA, topicB = "foobarA", "foobarB"
const numHosts = 2
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
pubsubs := getPubsubs(ctx, hosts, WithMessageIdFn(func(pmsg *pb.Message) string {
hash := sha256.Sum256(pmsg.Data)
return string(hash[:])
@ -932,7 +932,7 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) {
const numHosts = 5
virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
t.Run("nil sign private key should error", func(t *testing.T) {
@ -959,7 +959,7 @@ func TestTopicRelayPublishWithKey(t *testing.T) {
const numHosts = 5
virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
// [0.Rel] - [1.Rel] - [2.Sub]
@ -1026,7 +1026,7 @@ func TestWithLocalPublication(t *testing.T) {
const topic = "test"
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
pubsubs := getPubsubs(ctx, hosts)
topics := getTopics(pubsubs, topic)
connectAll(t, hosts)

View File

@ -17,9 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)
@ -27,7 +25,7 @@ func testWithTracer(t *testing.T, tracer EventTracer) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getGossipsubs(ctx, hosts,
WithEventTracer(tracer),
// to bootstrap from star topology
@ -302,10 +300,9 @@ func TestRemoteTracer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()
hosts := getDefaultHosts(t, 2)
h1 := hosts[0]
h2 := hosts[1]
mrt := &mockRemoteTracer{}
h1.SetStreamHandler(RemoteTracerProtoID, mrt.handleStream)

View File

@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)

View File

@ -38,7 +38,7 @@ func testBasicSeqnoValidator(t *testing.T, ttl time.Duration) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getPubsubsWithOptionC(ctx, hosts,
func(i int) Option {
return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore()))
@ -86,7 +86,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 20)
hosts := getDefaultHosts(t, 20)
psubs := getPubsubsWithOptionC(ctx, hosts[:19],
func(i int) Option {
return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore()))
@ -246,7 +246,7 @@ func (r *replayActor) replay(msg *pb.Message) {
var peers []peer.ID
r.mx.Lock()
for p, _ := range r.out {
for p := range r.out {
if rng.Intn(2) > 0 {
peers = append(peers, p)
}

View File

@ -15,7 +15,7 @@ func TestRegisterUnregisterValidator(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 1)
hosts := getDefaultHosts(t, 1)
psubs := getPubsubs(ctx, hosts)
err := psubs[0].RegisterTopicValidator("foo", func(context.Context, peer.ID, *Message) bool {
@ -40,7 +40,7 @@ func TestRegisterValidatorEx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
hosts := getDefaultHosts(t, 3)
psubs := getPubsubs(ctx, hosts)
err := psubs[0].RegisterTopicValidator("test",
@ -69,7 +69,7 @@ func TestValidate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -123,7 +123,7 @@ func TestValidate2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 1)
hosts := getDefaultHosts(t, 1)
psubs := getPubsubs(ctx, hosts)
topic := "foobar"
@ -201,7 +201,7 @@ func TestValidateOverload(t *testing.T) {
for tci, tc := range tcs {
t.Run(fmt.Sprintf("%d", tci), func(t *testing.T) {
hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
@ -273,7 +273,7 @@ func TestValidateAssortedOptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
hosts := getDefaultHosts(t, 10)
psubs := getPubsubs(ctx, hosts,
WithValidateQueueSize(10),
WithValidateThrottle(10),

3
version.json Normal file
View File

@ -0,0 +1,3 @@
{
"version": "v0.11.0"
}