send much more data in integration test

This commit is contained in:
Yusef Napora 2020-01-29 09:05:19 -05:00
parent 8c0011194e
commit 297dd7fae9
1 changed files with 41 additions and 21 deletions

View File

@ -1,7 +1,6 @@
package noise
import (
"bufio"
"context"
"crypto/rand"
"fmt"
@ -12,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"io"
"io/ioutil"
mrand "math/rand"
"testing"
"time"
@ -111,8 +111,8 @@ func TestLibp2pIntegration_NoPipes(t *testing.T) {
defer hb.Close()
ha.SetStreamHandler(testProtocolID, handleStream)
hb.SetStreamHandler(testProtocolID, handleStream)
ha.SetStreamHandler(testProtocolID, streamHandler(t))
hb.SetStreamHandler(testProtocolID, streamHandler(t))
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", hb.Addrs()[0].String(), hb.ID()))
if err != nil {
@ -136,7 +136,7 @@ func TestLibp2pIntegration_NoPipes(t *testing.T) {
t.Fatal(err)
}
_, err = stream.Write([]byte("hello\n"))
err = writeRandomPayloadAndClose(t, stream)
if err != nil {
t.Fatal(err)
}
@ -168,8 +168,8 @@ func TestLibp2pIntegration_WithPipes(t *testing.T) {
defer hb.Close()
ha.SetStreamHandler(testProtocolID, handleStream)
hb.SetStreamHandler(testProtocolID, handleStream)
ha.SetStreamHandler(testProtocolID, streamHandler(t))
hb.SetStreamHandler(testProtocolID, streamHandler(t))
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", ha.Addrs()[0].String(), ha.ID()))
if err != nil {
@ -193,7 +193,7 @@ func TestLibp2pIntegration_WithPipes(t *testing.T) {
t.Fatal(err)
}
_, err = stream.Write([]byte("hello\n"))
err = writeRandomPayloadAndClose(t, stream)
if err != nil {
t.Fatal(err)
}
@ -225,8 +225,8 @@ func TestLibp2pIntegration_XXFallback(t *testing.T) {
defer hb.Close()
ha.SetStreamHandler(testProtocolID, handleStream)
hb.SetStreamHandler(testProtocolID, handleStream)
ha.SetStreamHandler(testProtocolID, streamHandler(t))
hb.SetStreamHandler(testProtocolID, streamHandler(t))
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", hb.Addrs()[0].String(), hb.ID()))
if err != nil {
@ -250,7 +250,7 @@ func TestLibp2pIntegration_XXFallback(t *testing.T) {
t.Fatal(err)
}
_, err = stream.Write([]byte("hello\n"))
err = writeRandomPayloadAndClose(t, stream)
if err != nil {
t.Fatal(err)
}
@ -277,18 +277,38 @@ func TestConstrucingWithMaker(t *testing.T) {
_ = h.Close()
}
func handleStream(stream net.Stream) {
defer func() {
if err := stream.Close(); err != nil {
log.Error("error closing stream", "err", err)
}
}()
func writeRandomPayloadAndClose(t *testing.T, stream net.Stream) error {
t.Helper()
size := 1 << 24
r := mrand.New(mrand.NewSource(42))
start := time.Now()
lr := io.LimitReader(r, int64(size))
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
msg, err := rw.Reader.ReadString('\n')
c, err := io.Copy(stream, lr)
elapsed := time.Since(start)
if err != nil {
fmt.Println("stream err", err)
return
return fmt.Errorf("failed to write out bytes: %v", err)
}
t.Logf("wrote %d bytes in %dms", c, elapsed.Milliseconds())
return stream.Close()
}
func streamHandler(t *testing.T) func(net.Stream) {
return func(stream net.Stream) {
t.Helper()
defer func() {
if err := stream.Close(); err != nil {
t.Error("error closing stream: ", err)
}
}()
start := time.Now()
c, err := io.Copy(ioutil.Discard, stream)
elapsed := time.Since(start)
if err != nil {
t.Error("error reading from stream: ", err)
return
}
t.Logf("read %d bytes in %dms", c, elapsed.Milliseconds())
}
fmt.Printf("got msg: %s", msg)
}