package test

import (
	"context"
	"encoding/hex"
	"fmt"
	"time"

	"github.com/libp2p/test-plans/dht/utils"

	"golang.org/x/sync/errgroup"

	"github.com/ipfs/go-cid"
	u "github.com/ipfs/go-ipfs-util"

	"github.com/libp2p/go-libp2p-core/peer"
	kbucket "github.com/libp2p/go-libp2p-kbucket"

	"github.com/testground/sdk-go/runtime"
)
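
// GetClosestPeers is the Testground entry point for the get-closest-peers
// test case. It builds the common run context, executes the query workload,
// and tears the node down.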
func GetClosestPeers(runenv *runtime.RunEnv) error {
	commonOpts := GetCommonOpts(runenv)

	ctx, cancel := context.WithTimeout(context.Background(), commonOpts.Timeout)
	defer cancel()

	ri, err := Base(ctx, runenv, commonOpts)
	if err != nil {
		return err
	}

	if err := TestGetClosestPeers(ctx, ri); err != nil {
		return err
	}
	Teardown(ctx, ri.RunInfo)

	return nil
}
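
// TestGetClosestPeers runs a GetClosestPeers query for every test CID,
// recording how long each lookup took and how close the returned peers are
// to the true closest peers in the run.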
func TestGetClosestPeers(ctx context.Context, ri *DHTRunInfo) error {
	fpOpts := getFindProvsParams(ri.RunEnv.RunParams.TestInstanceParams)
	runenv := ri.RunEnv

	// TODO: This is hacky; we should probably thread through a separate
	// GCPRecordCount variable instead of reusing the find-providers record
	// count. For now, take the largest record count across all groups.
	maxRecCount := 0
	for _, g := range ri.GroupProperties {
		gOpts := getFindProvsParams(g.Params)
		if gOpts.RecordCount > maxRecCount {
			maxRecCount = gOpts.RecordCount
		}
	}

	// Calculate the CIDs we're dealing with. The CIDs are derived
	// deterministically from the shared record seed, so every instance
	// queries the same set of keys.
	cids := func() (out []cid.Cid) {
		for i := 0; i < maxRecCount; i++ {
			c := fmt.Sprintf("CID %d - seeded with %d", i, fpOpts.RecordSeed)
			out = append(out, cid.NewCidV0(u.Hash([]byte(c))))
		}
		return out
	}()

	node := ri.Node
	others := ri.Others

	stager := utils.NewBatchStager(ctx, node.info.Seq, runenv.TestInstanceCount, "get-closest-peers", ri.RunInfo)
	if err := stager.Begin(); err != nil {
		return err
	}
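
	// Issue all of the lookups concurrently and wait for the whole batch;
	// any single failure fails the test case.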
	runenv.RecordMessage("start gcp loop")
	if fpOpts.SearchRecords {
		g := errgroup.Group{}
		for index, c := range cids {
			// Rebind the loop variables so each goroutine captures its own copy.
			i := index
			c := c
			g.Go(func() error {
				// The target key rendered as a peer ID, used only to label the query trace.
				p := peer.ID(c.Bytes())
				ectx, cancel := context.WithCancel(ctx)
				ectx = TraceQuery(ectx, runenv, node, p.Pretty(), "get-closest-peers")
				t := time.Now()
				pids, err := node.dht.GetClosestPeers(ectx, c.KeyString())
				cancel()
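
				// Drain the results channel; the DHT closes it once the
				// lookup completes.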
				peers := make([]peer.ID, 0, node.info.Properties.BucketSize)
				for p := range pids {
					peers = append(peers, p)
				}
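
				// Record latency and result-count metrics only for successful lookups.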
				if err == nil {
					runenv.RecordMetric(&runtime.MetricDefinition{
						Name:           fmt.Sprintf("time-to-gcp-%d", i),
						Unit:           "ns",
						ImprovementDir: -1,
					}, float64(time.Since(t).Nanoseconds()))

					runenv.RecordMetric(&runtime.MetricDefinition{
						Name:           fmt.Sprintf("gcp-peers-found-%d", i),
						Unit:           "peers",
						ImprovementDir: 1,
					}, float64(len(peers)))

					actualClosest := getClosestPeerRanking(node, others, c)
					outputGCP(runenv, node.info.Addrs.ID, c, peers, actualClosest)
				} else {
					runenv.RecordMessage("Error during GCP %v", err)
				}
				return err
			})
		}

		if err := g.Wait(); err != nil {
			_ = stager.End()
			return fmt.Errorf("failed while getting closest peers: %w", err)
		}
	}

	runenv.RecordMessage("done gcp loop")

	if err := stager.End(); err != nil {
		return err
	}

	return nil
}
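
// getClosestPeerRanking computes the ground-truth ordering: every peer in the
// run (including ourselves), sorted by XOR distance to the target's Kademlia key.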
func getClosestPeerRanking(me *NodeParams, others map[peer.ID]*DHTNodeInfo, target cid.Cid) []peer.ID {
	var allPeers []peer.ID
	allPeers = append(allPeers, me.dht.PeerID())
	for p := range others {
		allPeers = append(allPeers, p)
	}

	kadTarget := kbucket.ConvertKey(target.KeyString())
	return kbucket.SortClosestPeers(allPeers, kadTarget)
}
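
// outputGCP logs the peers returned by the query next to the ground-truth
// ranking so that result quality can be analyzed offline.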
func outputGCP(runenv *runtime.RunEnv, me peer.ID, target cid.Cid, peers, rankedPeers []peer.ID) {
	// Precompute human-readable forms of the returned peers for logging.
	peerStrs := make([]string, len(peers))
	kadPeerStrs := make([]string, len(peers))

	for i, p := range peers {
		peerStrs[i] = p.String()
		kadPeerStrs[i] = hex.EncodeToString(kbucket.ConvertKey(string(p)))
	}

	actualClosest := rankedPeers[:len(peers)]

	nodeLogger.Infow("gcp-results",
		"me", me.String(),
		"KadMe", kbucket.ConvertKey(string(me)),
		"target", target,
		"peers", peerStrs,
		"actual", actualClosest,
		"KadTarget", kbucket.ConvertKey(target.KeyString()),
		"KadPeers", kadPeerStrs,
		"KadActual", peerIDsToKadIDs(actualClosest),
		"Scores", gcpScore(peers, rankedPeers),
	)

	_ = nodeLogger.Sync()
}
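
// gcpScore reports, for each returned peer, how far its position in our
// result diverges from its position in the ground-truth ranking; an all-zero
// score means the query returned the true closest peers in order.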
func gcpScore(peers, rankedPeers []peer.ID) []int {
	getIndex := func(peers []peer.ID, target peer.ID) int {
		for i, p := range peers {
			if p == target {
				return i
			}
		}
		return -1
	}

	// The score is the distance between the actual ranking and our ranking.
	var scores []int
	for i, p := range peers {
		diff := getIndex(rankedPeers, p) - i
		scores = append(scores, diff)
	}
	return scores
}
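
// peerIDsToKadIDs converts peer IDs into their Kademlia keyspace equivalents.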
func peerIDsToKadIDs(peers []peer.ID) []kbucket.ID {
	kadIDs := make([]kbucket.ID, len(peers))
	for i, p := range peers {
		kadIDs[i] = kbucket.ConvertPeerID(p)
	}
	return kadIDs
}