Abstract asynchronous client dispatch

:100644 100644 d3ecc28 053dd73 M	test/dht_test.go
This commit is contained in:
Cole Brown 2018-11-13 18:47:10 -05:00
parent 8a39ab2f7a
commit e468aef215
2 changed files with 257 additions and 31 deletions

View File

@ -3,27 +3,66 @@ package test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"reflect"
"testing"
"time"
crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-daemon/p2pclient"
pb "github.com/libp2p/go-libp2p-daemon/pb"
peer "github.com/libp2p/go-libp2p-peer"
)
func clientRequestAsync(t *testing.T, client *p2pclient.Client, method string, arg interface{}) interface{} {
argv := reflect.ValueOf(arg)
methodv := reflect.ValueOf(client).MethodByName(method)
elemtype := methodv.Type().Out(0)
streaming := false
if elemtype.Kind() == reflect.Chan {
streaming = true
elemtype = elemtype.Elem()
}
chantype := reflect.ChanOf(reflect.BothDir, elemtype)
outcv := reflect.MakeChan(chantype, 10)
go func() {
defer outcv.Close()
args := []reflect.Value{
reflect.ValueOf(context.Background()),
argv,
}
if !streaming {
args = args[1:]
}
res := methodv.Call(args)
if err, ok := res[1].Interface().(error); ok {
t.Fatalf("request failed: %s", err)
}
if !streaming {
outcv.Send(res[0])
return
}
for {
v, ok := res[0].Recv()
if !ok {
break
}
outcv.Send(v)
}
}()
return outcv.Interface()
}
func TestDHTFindPeer(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
id := randPeerID(t)
infoc := make(chan p2pclient.PeerInfo)
go func() {
defer close(infoc)
info, err := client.FindPeer(id)
if err != nil {
t.Fatal(err)
}
infoc <- info
}()
infoc := clientRequestAsync(t, client, "FindPeer", id).(chan p2pclient.PeerInfo)
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_FIND_PEER)
findPeerResponse := wrapDhtResponse(peerInfoResponse(t, id))
@ -44,22 +83,110 @@ func TestDHTFindPeer(t *testing.T) {
}
}
func TestDHTGetPublicKey(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
id := randPeerID(t)
key := randPubKey(t)
keyc := clientRequestAsync(t, client, "GetPublicKey", id).(chan crypto.PubKey)
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_GET_PUBLIC_KEY)
keybytes, err := key.Bytes()
if err != nil {
t.Fatal(err)
}
getKeyResponse := wrapDhtResponse(valueResponse(keybytes))
conn.SendMessage(t, getKeyResponse)
select {
case reskey := <-keyc:
if !key.Equals(reskey) {
t.Fatal("keys did not match")
}
case <-time.After(testTimeout):
t.Fatal("timed out waiting for peer info")
}
}
func TestDHTGetValue(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
key := randString(t)
value := make([]byte, 10)
rand.Read(value)
valuec := clientRequestAsync(t, client, "GetValue", key).(chan []byte)
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_GET_VALUE)
getKeyResponse := wrapDhtResponse(valueResponse(value))
conn.SendMessage(t, getKeyResponse)
select {
case resvalue := <-valuec:
if !bytes.Equal(resvalue, value) {
t.Fatal("value did not match")
}
case <-time.After(testTimeout):
t.Fatal("timed out waiting for peer info")
}
}
func TestDHTPutValue(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
key := randString(t)
value := make([]byte, 10)
rand.Read(value)
donec := make(chan struct{})
go func() {
err := client.PutValue(key, value)
if err != nil {
t.Fatal(err)
}
donec <- struct{}{}
}()
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_PUT_VALUE)
putValueResponse := wrapDhtResponse(nil)
conn.SendMessage(t, putValueResponse)
select {
case <-donec:
case <-time.After(testTimeout):
t.Fatal("timed out waiting for response")
}
}
func TestDHTProvide(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
cid := randCid(t)
donec := make(chan struct{})
go func() {
err := client.Provide(cid)
if err != nil {
t.Fatal(err)
}
donec <- struct{}{}
}()
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_PROVIDE)
provideResponse := wrapDhtResponse(nil)
conn.SendMessage(t, provideResponse)
select {
case <-donec:
case <-time.After(testTimeout):
t.Fatal("timed out waiting for response")
}
}
func TestDHTFindPeersConnectedToPeer(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
ids := randPeerIDs(t, 3)
infoc := make(chan p2pclient.PeerInfo)
go func(out chan p2pclient.PeerInfo) {
infoc, err := client.FindPeersConnectedToPeer(context.Background(), ids[0])
if err != nil {
t.Fatal(err)
}
for info := range infoc {
out <- info
}
close(out)
}(infoc)
infoc := clientRequestAsync(t, client, "FindPeersConnectedToPeer", ids[0]).(chan p2pclient.PeerInfo)
conn := daemon.ExpectConn(t)
req := conn.ExpectDHTRequestType(t, pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER)
@ -87,18 +214,8 @@ func TestDHTFindProviders(t *testing.T) {
defer closer()
ids := randPeerIDs(t, 3)
infoc := make(chan p2pclient.PeerInfo)
contentID := randCid(t)
go func(out chan p2pclient.PeerInfo) {
infoc, err := client.FindProviders(context.Background(), contentID)
if err != nil {
t.Fatal(err)
}
for info := range infoc {
out <- info
}
close(out)
}(infoc)
infoc := clientRequestAsync(t, client, "FindProviders", contentID).(chan p2pclient.PeerInfo)
conn := daemon.ExpectConn(t)
req := conn.ExpectDHTRequestType(t, pb.DHTRequest_FIND_PROVIDERS)
@ -120,3 +237,64 @@ func TestDHTFindProviders(t *testing.T) {
t.Fatalf("expected 2 responses, got %d", i)
}
}
func TestDHTGetClosestPeers(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
ids := randPeerIDs(t, 2)
key := randString(t)
idc := clientRequestAsync(t, client, "GetClosestPeers", key).(chan peer.ID)
conn := daemon.ExpectConn(t)
req := conn.ExpectDHTRequestType(t, pb.DHTRequest_GET_CLOSEST_PEERS)
if req.GetKey() != key {
t.Fatal("request key didn't match expected key")
}
fmt.Println("we good")
resps := make([]*pb.DHTResponse, 2)
for i, id := range ids {
resps[i] = peerIDResponse(t, id)
}
conn.SendStreamAsync(t, resps)
i := 0
for range idc {
i++
}
if i != 2 {
t.Fatalf("expected 2 responses, got %d", i)
}
}
func TestDHTSearchValue(t *testing.T) {
daemon, client, closer := createMockDaemonClientPair(t)
defer closer()
key := randString(t)
values := make([][]byte, 2)
for i := range values {
values[i] = make([]byte, 10)
rand.Read(values[i])
}
valuec := clientRequestAsync(t, client, "SearchValue", key).(chan []byte)
conn := daemon.ExpectConn(t)
conn.ExpectDHTRequestType(t, pb.DHTRequest_SEARCH_VALUE)
resps := make([]*pb.DHTResponse, 2)
for i, value := range values {
resps[i] = valueResponse(value)
}
conn.SendStreamAsync(t, resps)
expiry := time.Now().Add(testTimeout)
for i := 0; i < 2; i++ {
select {
case resvalue := <-valuec:
if !bytes.Equal(resvalue, values[i]) {
t.Fatalf("value %d did not match", i)
}
case <-time.After(time.Until(expiry)):
t.Fatal("timed out waiting for values")
}
}
}

View File

@ -3,12 +3,14 @@ package test
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io/ioutil"
"os"
"testing"
cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
p2pd "github.com/libp2p/go-libp2p-daemon"
"github.com/libp2p/go-libp2p-daemon/p2pclient"
pb "github.com/libp2p/go-libp2p-daemon/pb"
@ -107,6 +109,38 @@ func randCid(t *testing.T) cid.Cid {
return id
}
func randCids(t *testing.T, n int) []cid.Cid {
ids := make([]cid.Cid, n)
for i := 0; i < n; i++ {
ids[i] = randCid(t)
}
return ids
}
func randString(t *testing.T) string {
buf := make([]byte, 10)
rand.Read(buf)
return hex.EncodeToString(buf)
}
func randStrings(t *testing.T, n int) []string {
out := make([]string, n)
for i := 0; i < n; i++ {
buf := make([]byte, 10)
rand.Read(buf)
out[i] = hex.EncodeToString(buf)
}
return out
}
func randPubKey(t *testing.T) crypto.PubKey {
_, pub, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
t.Fatalf("generating pubkey: %s", err)
}
return pub
}
func wrapDhtResponse(dht *pb.DHTResponse) *pb.Response {
return &pb.Response{
Type: pb.Response_OK.Enum(),
@ -127,3 +161,17 @@ func peerInfoResponse(t *testing.T, id peer.ID) *pb.DHTResponse {
},
}
}
func peerIDResponse(t *testing.T, id peer.ID) *pb.DHTResponse {
return &pb.DHTResponse{
Type: pb.DHTResponse_VALUE.Enum(),
Value: []byte(id),
}
}
func valueResponse(buf []byte) *pb.DHTResponse {
return &pb.DHTResponse{
Type: pb.DHTResponse_VALUE.Enum(),
Value: buf,
}
}