diff --git a/p2p/security/noise/integration_test.go b/p2p/security/noise/integration_test.go index 279ad9d6..0cb2a930 100644 --- a/p2p/security/noise/integration_test.go +++ b/p2p/security/noise/integration_test.go @@ -1,7 +1,6 @@ package noise import ( - "bufio" "context" "crypto/rand" "fmt" @@ -12,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "io" + "io/ioutil" mrand "math/rand" "testing" "time" @@ -111,8 +111,8 @@ func TestLibp2pIntegration_NoPipes(t *testing.T) { defer hb.Close() - ha.SetStreamHandler(testProtocolID, handleStream) - hb.SetStreamHandler(testProtocolID, handleStream) + ha.SetStreamHandler(testProtocolID, streamHandler(t)) + hb.SetStreamHandler(testProtocolID, streamHandler(t)) addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", hb.Addrs()[0].String(), hb.ID())) if err != nil { @@ -136,7 +136,7 @@ func TestLibp2pIntegration_NoPipes(t *testing.T) { t.Fatal(err) } - _, err = stream.Write([]byte("hello\n")) + err = writeRandomPayloadAndClose(t, stream) if err != nil { t.Fatal(err) } @@ -168,8 +168,8 @@ func TestLibp2pIntegration_WithPipes(t *testing.T) { defer hb.Close() - ha.SetStreamHandler(testProtocolID, handleStream) - hb.SetStreamHandler(testProtocolID, handleStream) + ha.SetStreamHandler(testProtocolID, streamHandler(t)) + hb.SetStreamHandler(testProtocolID, streamHandler(t)) addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", ha.Addrs()[0].String(), ha.ID())) if err != nil { @@ -193,7 +193,7 @@ func TestLibp2pIntegration_WithPipes(t *testing.T) { t.Fatal(err) } - _, err = stream.Write([]byte("hello\n")) + err = writeRandomPayloadAndClose(t, stream) if err != nil { t.Fatal(err) } @@ -225,8 +225,8 @@ func TestLibp2pIntegration_XXFallback(t *testing.T) { defer hb.Close() - ha.SetStreamHandler(testProtocolID, handleStream) - hb.SetStreamHandler(testProtocolID, handleStream) + ha.SetStreamHandler(testProtocolID, streamHandler(t)) + hb.SetStreamHandler(testProtocolID, streamHandler(t)) addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", hb.Addrs()[0].String(), hb.ID())) if err != nil { @@ -250,7 +250,7 @@ func TestLibp2pIntegration_XXFallback(t *testing.T) { t.Fatal(err) } - _, err = stream.Write([]byte("hello\n")) + err = writeRandomPayloadAndClose(t, stream) if err != nil { t.Fatal(err) } @@ -277,18 +277,38 @@ func TestConstrucingWithMaker(t *testing.T) { _ = h.Close() } -func handleStream(stream net.Stream) { - defer func() { - if err := stream.Close(); err != nil { - log.Error("error closing stream", "err", err) - } - }() +func writeRandomPayloadAndClose(t *testing.T, stream net.Stream) error { + t.Helper() + size := 1 << 24 + r := mrand.New(mrand.NewSource(42)) + start := time.Now() + lr := io.LimitReader(r, int64(size)) - rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) - msg, err := rw.Reader.ReadString('\n') + c, err := io.Copy(stream, lr) + elapsed := time.Since(start) if err != nil { - fmt.Println("stream err", err) - return + return fmt.Errorf("failed to write out bytes: %v", err) + } + t.Logf("wrote %d bytes in %dms", c, elapsed.Milliseconds()) + return stream.Close() +} + +func streamHandler(t *testing.T) func(net.Stream) { + return func(stream net.Stream) { + t.Helper() + defer func() { + if err := stream.Close(); err != nil { + t.Error("error closing stream: ", err) + } + }() + + start := time.Now() + c, err := io.Copy(ioutil.Discard, stream) + elapsed := time.Since(start) + if err != nil { + t.Error("error reading from stream: ", err) + return + } + t.Logf("read %d bytes in %dms", c, elapsed.Milliseconds()) } - fmt.Printf("got msg: %s", msg) }