whisper/libp2p/ping_test.go

138 lines
3.7 KiB
Go

package libp2p
import (
"context"
"errors"
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/p2p/discover"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
net "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol"
"github.com/stretchr/testify/require"
)
func TestPingPong(t *testing.T) {
rng := rand.New(rand.NewSource(1))
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Secp256k1, 0, rng)
require.NoError(t, err)
h, err := libp2p.New(context.Background(),
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/9999"),
libp2p.Identity(priv),
libp2p.DisableRelay(),
)
priv, _, err = crypto.GenerateKeyPairWithReader(crypto.Secp256k1, 0, rng)
c, err := libp2p.New(context.Background(),
libp2p.Identity(priv),
libp2p.DisableRelay(),
)
require.NoError(t, err)
c.Peerstore().AddAddr(h.ID(), h.Addrs()[0], 30*time.Second)
conn := connection{id: discover.NodeID{1}, received: new(atomic.Value), period: 10 * time.Second}
conn.Update(time.Time{})
h.SetStreamHandler(pingproto, makePingHandler(conn, 1*time.Second, 10*time.Second, 10*time.Second))
s, err := c.NewStream(context.TODO(), h.ID(), pingproto)
require.NoError(t, err)
received := [1]byte{}
require.NoError(t, s.SetReadDeadline(time.Now().Add(3*time.Second)))
_, err = s.Read(received[:])
require.NoError(t, err)
require.Equal(t, pingMsg, received[0])
require.True(t, conn.IsFlaky())
_, err = s.Write([]byte{pingMsg})
require.NoError(t, err)
require.NoError(t, Eventually(func() error {
if conn.IsFlaky() {
return errors.New("connection is flaky")
}
return nil
}, 100*time.Millisecond, 1*time.Second))
require.NoError(t, Eventually(func() error {
if err := s.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return err
}
received := [1]byte{}
_, err := s.Read(received[:])
if err != nil {
return err
}
if received[0] != pongMsg {
return fmt.Errorf("not a pong: %x", received)
}
return nil
}, 100*time.Millisecond, 1*time.Second))
}
func TestConnectionClosed(t *testing.T) {
rng := rand.New(rand.NewSource(1))
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Secp256k1, 0, rng)
require.NoError(t, err)
h, err := libp2p.New(context.Background(),
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/8888"),
libp2p.Identity(priv),
libp2p.DisableRelay(),
)
priv, _, err = crypto.GenerateKeyPairWithReader(crypto.Secp256k1, 0, rng)
c, err := libp2p.New(context.Background(),
libp2p.Identity(priv),
libp2p.DisableRelay(),
)
require.NoError(t, err)
c.Peerstore().AddAddr(h.ID(), h.Addrs()[0], 30*time.Second)
conn := connection{id: discover.NodeID{1}, received: new(atomic.Value), period: 10 * time.Second}
conn.Update(time.Time{})
errors := make(chan error, 1)
h.SetStreamHandler(protocol.TestingID, func(s net.Stream) {
rcv := [1]byte{}
for {
_, err := s.Read(rcv[:])
if err != nil {
errors <- err
return
}
}
})
h.SetStreamHandler(pingproto, makePingHandler(conn, time.Second, time.Second, time.Second))
_, err = c.NewStream(context.TODO(), h.ID(), pingproto)
require.NoError(t, err)
_, err = c.NewStream(context.TODO(), h.ID(), protocol.TestingID)
require.NoError(t, err)
select {
case err := <-errors:
require.EqualError(t, err, "stream reset")
case <-time.After(5 * time.Second):
require.FailNow(t, "connection wasn't closed in expected time")
}
}
func Eventually(f func() error, period, timeout time.Duration) (err error) {
after := time.NewTimer(timeout)
defer after.Stop()
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-tick.C:
err = f()
if err == nil {
return
}
case <-after.C:
return
}
}
}