243 lines
6.1 KiB
Go
243 lines
6.1 KiB
Go
|
package test
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"github.com/libp2p/test-plans/dht/utils"
|
||
|
"time"
|
||
|
|
||
|
"golang.org/x/sync/errgroup"
|
||
|
|
||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||
|
|
||
|
"github.com/ipfs/go-cid"
|
||
|
u "github.com/ipfs/go-ipfs-util"
|
||
|
"github.com/testground/sdk-go/runtime"
|
||
|
)
|
||
|
|
||
|
type findProvsParams struct {
|
||
|
RecordSeed int
|
||
|
RecordCount int
|
||
|
SearchRecords bool
|
||
|
}
|
||
|
|
||
|
func getFindProvsParams(params map[string]string) findProvsParams {
|
||
|
tmpRunEnv := runtime.RunEnv{RunParams: runtime.RunParams{
|
||
|
TestInstanceParams: params,
|
||
|
}}
|
||
|
|
||
|
fpOpts := findProvsParams{
|
||
|
RecordSeed: tmpRunEnv.IntParam("record_seed"),
|
||
|
RecordCount: tmpRunEnv.IntParam("record_count"),
|
||
|
SearchRecords: tmpRunEnv.BooleanParam("search_records"),
|
||
|
}
|
||
|
|
||
|
return fpOpts
|
||
|
}
|
||
|
|
||
|
func FindProviders(runenv *runtime.RunEnv) error {
|
||
|
commonOpts := GetCommonOpts(runenv)
|
||
|
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), commonOpts.Timeout)
|
||
|
defer cancel()
|
||
|
|
||
|
ri, err := Base(ctx, runenv, commonOpts)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err := TestProviderRecords(ctx, ri); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
Teardown(ctx, ri.RunInfo)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func TestProviderRecords(ctx context.Context, ri *DHTRunInfo) error {
|
||
|
runenv := ri.RunEnv
|
||
|
node := ri.Node
|
||
|
|
||
|
fpOpts := getFindProvsParams(ri.RunEnv.RunParams.TestInstanceParams)
|
||
|
|
||
|
stager := utils.NewBatchStager(ctx, node.info.Seq, runenv.TestInstanceCount, "provider-records", ri.RunInfo)
|
||
|
|
||
|
emitRecords, searchRecords := getRecords(ri, fpOpts)
|
||
|
|
||
|
if err := stager.Begin(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
runenv.RecordMessage("start provide loop")
|
||
|
|
||
|
// If we're a member of the providing cohort, let's provide those CIDs to
|
||
|
// the network.
|
||
|
if fpOpts.RecordCount > 0 {
|
||
|
g := errgroup.Group{}
|
||
|
for index, cid := range emitRecords {
|
||
|
i := index
|
||
|
c := cid
|
||
|
g.Go(func() error {
|
||
|
p := peer.ID(c.Bytes())
|
||
|
ectx, cancel := context.WithCancel(ctx) //nolint
|
||
|
ectx = TraceQuery(ctx, runenv, node, p.Pretty(), "provider-records")
|
||
|
t := time.Now()
|
||
|
err := node.dht.Provide(ectx, c, true)
|
||
|
cancel()
|
||
|
if err == nil {
|
||
|
runenv.RecordMessage("Provided CID: %s", c)
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("time-to-provide-%d", i),
|
||
|
Unit: "ns",
|
||
|
ImprovementDir: -1,
|
||
|
}, float64(time.Since(t).Nanoseconds()))
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
})
|
||
|
}
|
||
|
|
||
|
if err := g.Wait(); err != nil {
|
||
|
_ = stager.End()
|
||
|
return fmt.Errorf("failed while providing: %s", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := stager.End(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
outputGraph(node.dht, "after_provide")
|
||
|
|
||
|
if err := stager.Begin(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if fpOpts.SearchRecords {
|
||
|
g := errgroup.Group{}
|
||
|
for _, record := range searchRecords {
|
||
|
for index, cid := range record.RecordIDs {
|
||
|
i := index
|
||
|
c := cid
|
||
|
groupID := record.GroupID
|
||
|
g.Go(func() error {
|
||
|
p := peer.ID(c.Bytes())
|
||
|
ectx, cancel := context.WithCancel(ctx) //nolint
|
||
|
ectx = TraceQuery(ctx, runenv, node, p.Pretty(), "provider-records")
|
||
|
t := time.Now()
|
||
|
|
||
|
numProvs := 0
|
||
|
provsCh := node.dht.FindProvidersAsync(ectx, c, getAllProvRecordsNum())
|
||
|
status := "done"
|
||
|
|
||
|
var tLastFound time.Time
|
||
|
provLoop:
|
||
|
for {
|
||
|
select {
|
||
|
case _, ok := <-provsCh:
|
||
|
if !ok {
|
||
|
break provLoop
|
||
|
}
|
||
|
|
||
|
tLastFound = time.Now()
|
||
|
|
||
|
if numProvs == 0 {
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("time-to-find-first|%s|%d", groupID, i),
|
||
|
Unit: "ns",
|
||
|
ImprovementDir: -1,
|
||
|
}, float64(tLastFound.Sub(t).Nanoseconds()))
|
||
|
}
|
||
|
|
||
|
numProvs++
|
||
|
case <-ctx.Done():
|
||
|
status = "incomplete"
|
||
|
break provLoop
|
||
|
}
|
||
|
}
|
||
|
cancel()
|
||
|
|
||
|
if numProvs > 0 {
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("time-to-find-last|%s|%s|%d", status, groupID, i),
|
||
|
Unit: "ns",
|
||
|
ImprovementDir: -1,
|
||
|
}, float64(tLastFound.Sub(t).Nanoseconds()))
|
||
|
} else if status != "incomplete" {
|
||
|
status = "fail"
|
||
|
}
|
||
|
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("time-to-find|%s|%s|%d", status, groupID, i),
|
||
|
Unit: "ns",
|
||
|
ImprovementDir: -1,
|
||
|
}, float64(time.Since(t).Nanoseconds()))
|
||
|
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("peers-found|%s|%s|%d", status, groupID, i),
|
||
|
Unit: "peers",
|
||
|
ImprovementDir: 1,
|
||
|
}, float64(numProvs))
|
||
|
|
||
|
runenv.RecordMetric(&runtime.MetricDefinition{
|
||
|
Name: fmt.Sprintf("peers-missing|%s|%s|%d", status, groupID, i),
|
||
|
Unit: "peers",
|
||
|
ImprovementDir: -1,
|
||
|
}, float64(ri.GroupProperties[groupID].Size-numProvs))
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := g.Wait(); err != nil {
|
||
|
_ = stager.End()
|
||
|
return fmt.Errorf("failed while finding providerss: %s", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := stager.End(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// getRecords returns the records we plan to store and those we plan to search for. It also tells other nodes via the
|
||
|
// sync service which nodes our group plans on advertising
|
||
|
func getRecords(ri *DHTRunInfo, fpOpts findProvsParams) ([]cid.Cid, []*ProviderRecordSubmission) {
|
||
|
recGen := func(groupID string, groupFPOpts findProvsParams) (out []cid.Cid) {
|
||
|
for i := 0; i < groupFPOpts.RecordCount; i++ {
|
||
|
c := fmt.Sprintf("CID %d - group %s - seeded with %d", i, groupID, groupFPOpts.RecordSeed)
|
||
|
out = append(out, cid.NewCidV0(u.Hash([]byte(c))))
|
||
|
}
|
||
|
return out
|
||
|
}
|
||
|
|
||
|
var emitRecords []cid.Cid
|
||
|
if fpOpts.RecordCount > 0 {
|
||
|
// Calculate the CIDs we're dealing with.
|
||
|
emitRecords = recGen(ri.Node.info.Group, fpOpts)
|
||
|
}
|
||
|
|
||
|
var searchRecords []*ProviderRecordSubmission
|
||
|
if fpOpts.SearchRecords {
|
||
|
for _, g := range ri.Groups {
|
||
|
gOpts := ri.GroupProperties[g]
|
||
|
groupFPOpts := getFindProvsParams(gOpts.Params)
|
||
|
if groupFPOpts.RecordCount > 0 {
|
||
|
searchRecords = append(searchRecords, &ProviderRecordSubmission{
|
||
|
RecordIDs: recGen(g, groupFPOpts),
|
||
|
GroupID: g,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return emitRecords, searchRecords
|
||
|
}
|
||
|
|
||
|
type ProviderRecordSubmission struct {
|
||
|
RecordIDs []cid.Cid
|
||
|
GroupID string
|
||
|
}
|