libp2p-test-plans/dht/test/store_get_value.go
Raúl Kripalani 4d4f24ad8f
migrate dht test plan from testground/testground. (#1)
* migrate dht test plan from testground/testground.

* dht: update module and import paths.

* rm redundant scripts; rm redundant line in manifest.
2020-04-22 17:47:58 +01:00

320 lines
8.4 KiB
Go

package test
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/libp2p/test-plans/dht/utils"
"github.com/testground/sdk-go/runtime"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-ipns"
ipns_pb "github.com/ipfs/go-ipns/pb"
"golang.org/x/sync/errgroup"
)
// StoreGetValue is the entry point of the store/get-value test case.
//
// It derives a context bounded by the common options' Timeout, obtains the
// run info from Base, and runs TestIPNSRecords. Teardown is invoked only when
// the records test completes without error; any error from Base or
// TestIPNSRecords is returned unchanged.
func StoreGetValue(runenv *runtime.RunEnv) error {
	opts := GetCommonOpts(runenv)

	ctx, stop := context.WithTimeout(context.Background(), opts.Timeout)
	defer stop()

	info, setupErr := Base(ctx, runenv, opts)
	if setupErr != nil {
		return setupErr
	}

	testErr := TestIPNSRecords(ctx, info)
	if testErr == nil {
		Teardown(ctx, info.RunInfo)
	}
	return testErr
}
// TestIPNSRecords runs five consecutive put/get rounds of IPNS records.
//
// The private keys to publish and the record keys to search for come from
// generateIPNSRecords. The IPNS record key of every private key to publish is
// computed once up front and reused in every round. Each round first calls
// putIPNSRecord and then getIPNSRecord with the round number; the first error
// from either aborts the test and is returned.
func TestIPNSRecords(ctx context.Context, ri *DHTRunInfo) error {
	const rounds = 5

	env := ri.RunEnv
	self := ri.Node

	params := getFindProvsParams(ri.RunEnv.RunParams.TestInstanceParams)
	batch := utils.NewBatchStager(ctx, self.info.Seq, env.TestInstanceCount, "ipns-records", ri.RunInfo)

	toEmit, toSearch, err := generateIPNSRecords(ri, params)
	if err != nil {
		return err
	}

	// Derive the DHT key under which each private key's record is stored.
	emitKeys := make([]string, 0, len(toEmit))
	for _, sk := range toEmit {
		id, idErr := peer.IDFromPrivateKey(sk)
		if idErr != nil {
			return idErr
		}
		emitKeys = append(emitKeys, ipns.RecordKey(id))
	}

	env.RecordMessage("start put loop")

	for round := 0; round < rounds; round++ {
		putErr := putIPNSRecord(ctx, ri, params, batch, round, toEmit, emitKeys)
		if putErr != nil {
			return putErr
		}

		getErr := getIPNSRecord(ctx, ri, params, batch, round, toSearch)
		if getErr != nil {
			return getErr
		}
	}

	return nil
}
// putIPNSRecord runs one "put" stage: it calls stager.Begin, publishes one
// IPNS record per entry of emitRecords (when fpOpts.RecordCount > 0), and then
// calls stager.End.
//
// Every record is created with value "/path/to/stuff/<recNum>", sequence
// number recNum and a validity of one hour from now, has the public key
// embedded, and is stored under emitRecordsKeys[i]. emitRecordsKeys must
// therefore be index-aligned with emitRecords.
//
// Individual PutValue failures are not returned: they are reported through
// runenv as a message plus a "time-to-failed-put-<i>" metric. Errors from the
// stager or from building a record are returned.
func putIPNSRecord(ctx context.Context, ri *DHTRunInfo, fpOpts findProvsParams, stager utils.Stager, recNum int, emitRecords []crypto.PrivKey, emitRecordsKeys []string) error {
	runenv := ri.RunEnv
	node := ri.Node

	if err := stager.Begin(); err != nil {
		return err
	}

	// If we're a member of the putting cohort, let's put those IPNS records to the network.
	if fpOpts.RecordCount > 0 {
		type pendingPut struct {
			index int
			key   string
			value []byte
		}

		// Build every record before starting any goroutine, so that a failure
		// here cannot return while earlier puts are still in flight.
		puts := make([]pendingPut, 0, len(emitRecords))
		for i, privKey := range emitRecords {
			record, err := ipns.Create(privKey, []byte(fmt.Sprintf("/path/to/stuff/%d", recNum)), uint64(recNum), time.Now().Add(time.Hour))
			if err != nil {
				return err
			}
			if err := ipns.EmbedPublicKey(privKey.GetPublic(), record); err != nil {
				return err
			}
			recordBytes, err := proto.Marshal(record)
			if err != nil {
				return err
			}
			puts = append(puts, pendingPut{index: i, key: emitRecordsKeys[i], value: recordBytes})
		}

		g := errgroup.Group{}
		for _, p := range puts {
			p := p // per-iteration copy for the closure (pre-Go 1.22 semantics)
			g.Go(func() error {
				// Derive the traced context from the cancellable one, so that
				// cancel() really affects the context handed to PutValue.
				ectx, cancel := context.WithCancel(ctx)
				ectx = TraceQuery(ectx, runenv, node, p.key, "ipns-records")
				t := time.Now()
				err := node.dht.PutValue(ectx, p.key, p.value)
				cancel()
				if err == nil {
					runenv.RecordMessage("Put IPNS Key: %s", p.key)
					runenv.RecordMetric(&runtime.MetricDefinition{
						Name:           fmt.Sprintf("time-to-put-%d", p.index),
						Unit:           "ns",
						ImprovementDir: -1,
					}, float64(time.Since(t).Nanoseconds()))
				} else {
					runenv.RecordMessage("Failed to Put IPNS Key: %s : err: %s", p.key, err)
					runenv.RecordMetric(&runtime.MetricDefinition{
						Name:           fmt.Sprintf("time-to-failed-put-%d", p.index),
						Unit:           "ns",
						ImprovementDir: -1,
					}, float64(time.Since(t).Nanoseconds()))
				}
				// Failures are reported as metrics only; never fail the group.
				return nil
			})
		}

		// The goroutines above always return nil, so an error is an invariant violation.
		if err := g.Wait(); err != nil {
			panic("how is there an error here?")
		}
	}

	if err := stager.End(); err != nil {
		return err
	}
	return nil
}
// getIPNSRecord runs one "get" stage: it calls stager.Begin, searches the DHT
// for every key in searchRecords (when fpOpts.SearchRecords is set), and then
// calls stager.End.
//
// One goroutine per key calls SearchValue and drains the result channel until
// it is closed or ctx is done. The last value received is decoded as an IPNS
// entry and its sequence number is compared with recNum, the sequence
// published in the current round. A result whose sequence is lower than recNum
// is stale: it is reported as "incomplete-get" and marks the lookup "fail".
//
// Search outcomes are reported through runenv metrics only and never returned.
// Errors from the stager are returned. An undecodable or empty final record is
// treated as a broken invariant and panics.
func getIPNSRecord(ctx context.Context, ri *DHTRunInfo, fpOpts findProvsParams, stager utils.Stager, recNum int, searchRecords []*RecordSubmission) error {
	runenv := ri.RunEnv
	node := ri.Node

	if err := stager.Begin(); err != nil {
		return err
	}

	if fpOpts.SearchRecords {
		// emit records a metric for which lower values are better.
		emit := func(name, unit string, value float64) {
			runenv.RecordMetric(&runtime.MetricDefinition{
				Name:           name,
				Unit:           unit,
				ImprovementDir: -1,
			}, value)
		}

		g := errgroup.Group{}
		for _, submission := range searchRecords {
			groupID := submission.GroupID
			for index, key := range submission.RecordIDs {
				// Per-iteration copies for the closure (pre-Go 1.22 semantics).
				i := index
				k := key
				g.Go(func() error {
					// Derive the traced context from the cancellable one, and
					// release it on every return path, including a failed search.
					ectx, cancel := context.WithCancel(ctx)
					defer cancel()
					ectx = TraceQuery(ectx, runenv, node, k, "ipns-records")
					t := time.Now()

					runenv.RecordMessage("Searching for IPNS Key: %s", k)

					recordCh, err := node.dht.SearchValue(ectx, k)
					if err != nil {
						runenv.RecordMessage("Failed to Search for IPNS Key: %s : err: %s", k, err)
						// Named "get", not "put": this is a failed search.
						emit(fmt.Sprintf("time-to-failed-get-%d", i), "ns", float64(time.Since(t).Nanoseconds()))
						return nil
					}

					status := "done"
					numRecs := 0
					var tLastFound time.Time
					var lastRec []byte

				searchLoop:
					for {
						select {
						case rec, ok := <-recordCh:
							if !ok {
								break searchLoop
							}

							lastRec = rec
							tLastFound = time.Now()

							if numRecs == 0 {
								emit(fmt.Sprintf("time-to-get-first|%s|%d", groupID, i), "ns", float64(tLastFound.Sub(t).Nanoseconds()))
							}
							numRecs++
						case <-ctx.Done():
							break searchLoop
						}
					}

					if numRecs > 0 {
						// status is still "done" for these two metrics; it can
						// only turn to "fail" after the staleness check below.
						emit(fmt.Sprintf("time-to-get-last|%s|%s|%d", status, groupID, i), "ns", float64(tLastFound.Sub(t).Nanoseconds()))
						emit(fmt.Sprintf("record-updates|%s|%s|%d|%d", status, groupID, recNum, i), "records", float64(numRecs))

						if len(lastRec) == 0 {
							panic("this should not be possible")
						}

						recordResult := &ipns_pb.IpnsEntry{}
						if err := recordResult.Unmarshal(lastRec); err != nil {
							panic(fmt.Errorf("received invalid IPNS record: err %v", err))
						}

						// diff counts how many sequence numbers the result is
						// behind this round. GetSequence is nil-safe.
						if diff := recNum - int(recordResult.GetSequence()); diff > 0 {
							emit(fmt.Sprintf("incomplete-get|%s|%d|%d", groupID, recNum, i), "records", float64(diff))
							status = "fail"
						}
					} else {
						status = "fail"
					}

					emit(fmt.Sprintf("time-to-get|%s|%s|%d|%d", status, groupID, recNum, i), "ns", float64(time.Since(t).Nanoseconds()))

					return nil
				})
			}
		}

		// The goroutines above always return nil, so an error is an invariant violation.
		if err := g.Wait(); err != nil {
			panic("how is this possible?")
		}
	}

	if err := stager.End(); err != nil {
		return err
	}
	return nil
}
// generateIPNSRecords returns the records we plan to store and those we plan
// to search for.
//
// emitRecords holds the private keys this node publishes records for. It is
// only populated when fpOpts.RecordCount > 0 and the node's GroupSeq is 0.
//
// searchRecords holds, for every group in ri.Groups whose parameters declare a
// positive RecordCount, the IPNS record keys derived from that group's private
// keys. It is only populated when fpOpts.SearchRecords is set.
//
// Key generation is deterministic: both the publisher and the searchers derive
// a group's keys from that group's RecordSeed and its position in ri.Groups.
func generateIPNSRecords(ri *DHTRunInfo, fpOpts findProvsParams) (emitRecords []crypto.PrivKey, searchRecords []*RecordSubmission, err error) {
	recGen := func(groupID string, groupFPOpts findProvsParams) ([]crypto.PrivKey, error) {
		// Seed from the parameters of the group the keys belong to, not from
		// the caller's own, so publisher and searchers agree on the keys even
		// when groups use different seeds.
		rng := rand.New(rand.NewSource(int64(groupFPOpts.RecordSeed)))

		// Calculate key based on group (run through the rng to do this)
		for _, g := range ri.Groups {
			rng.Int63()
			if g == groupID {
				break
			}
		}

		// Unique key per record is generated since the rng is mutated by creating the new key
		var out []crypto.PrivKey
		for i := 0; i < groupFPOpts.RecordCount; i++ {
			priv, _, err := crypto.GenerateEd25519Key(rng)
			if err != nil {
				return nil, err
			}
			out = append(out, priv)
		}
		return out, nil
	}

	if fpOpts.RecordCount > 0 && ri.Node.info.GroupSeq == 0 {
		// Generate the private keys whose IPNS records this node will publish.
		emitRecords, err = recGen(ri.Node.info.Group, fpOpts)
		if err != nil {
			return emitRecords, searchRecords, err
		}
	}

	if fpOpts.SearchRecords {
		for _, g := range ri.Groups {
			gOpts := ri.GroupProperties[g]
			groupFPOpts := getFindProvsParams(gOpts.Params)
			if groupFPOpts.RecordCount <= 0 {
				continue
			}

			recs, err := recGen(g, groupFPOpts)
			if err != nil {
				return nil, nil, err
			}

			ipnsKeys := make([]string, len(recs))
			for i, k := range recs {
				pid, err := peer.IDFromPrivateKey(k)
				if err != nil {
					return nil, nil, err
				}
				ipnsKeys[i] = ipns.RecordKey(pid)
			}

			searchRecords = append(searchRecords, &RecordSubmission{
				RecordIDs: ipnsKeys,
				GroupID:   g,
			})
		}
	}

	return emitRecords, searchRecords, nil
}
// RecordSubmission groups the record keys attributed to a single test group.
type RecordSubmission struct {
	// RecordIDs lists the record keys belonging to the group.
	RecordIDs []string
	// GroupID is the identifier of the group the keys belong to.
	GroupID string
}