libp2p-test-plans/dht/utils/identifiers.go

110 lines
2.7 KiB
Go

package utils
import (
"context"
"sort"
"github.com/testground/sdk-go/sync"
)
func GetGroupsAndSeqs(ctx context.Context, ri *RunInfo, groupOrder int) (groupSeq, testSeq int, err error) {
groupSeq, err = getGroupSeq(ctx, ri)
if err != nil {
return
}
if err = setGroupInfo(ctx, ri, groupOrder); err != nil {
return
}
ri.RunEnv.RecordMessage("past group info")
testSeq = getNodeID(ri, groupSeq)
return
}
// getGroupSeq returns the sequence number of this test instance within its group
func getGroupSeq(ctx context.Context, ri *RunInfo) (int, error) {
seq, err := ri.Client.SignalAndWait(ctx, sync.State(ri.RunEnv.TestGroupID), ri.RunEnv.TestGroupInstanceCount)
seq-- // make 0-indexed
return int(seq), err
}
// setGroupInfo uses the sync service to determine which groups are part of the test and to get their sizes.
// This information is set on the passed in RunInfo.
func setGroupInfo(ctx context.Context, ri *RunInfo, groupOrder int) error {
gi := &GroupInfo{
ID: ri.RunEnv.TestGroupID,
Size: ri.RunEnv.TestGroupInstanceCount,
Order: groupOrder,
Params: ri.RunEnv.TestInstanceParams,
}
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
groupInfoCh := make(chan *GroupInfo)
ri.Client.MustPublishSubscribe(subCtx, GroupIDTopic, gi, groupInfoCh)
groupOrderMap := make(map[int][]string)
groups := make(map[string]*GroupInfo)
for i := 0; i < ri.RunEnv.TestInstanceCount; i++ {
select {
case g, more := <-groupInfoCh:
if !more {
break
}
if _, ok := groups[g.ID]; !ok {
groups[g.ID] = g
groupOrderMap[g.Order] = append(groupOrderMap[g.Order], g.ID)
}
case <-ctx.Done():
return ctx.Err()
}
}
ri.RunEnv.RecordMessage("there are %d groups %v", len(groups), groups)
sortedGroups := make([]string, 0, len(groups))
sortedOrderNums := make([]int, 0, len(groupOrderMap))
for order := range groupOrderMap {
sortedOrderNums = append(sortedOrderNums, order)
}
sort.Ints(sortedOrderNums)
for i := 0; i < len(sortedOrderNums); i++ {
sort.Strings(groupOrderMap[i])
sortedGroups = append(sortedGroups, groupOrderMap[i]...)
}
ri.Groups = sortedGroups
ri.GroupProperties = groups
ri.RunEnv.RecordMessage("sortedGroup order %v", sortedGroups)
return nil
}
// getNodeID returns the sequence number of this test instance within the test
func getNodeID(ri *RunInfo, seq int) int {
id := seq
for _, g := range ri.Groups {
if g == ri.RunEnv.TestGroupID {
break
}
id += ri.GroupProperties[g].Size
}
return id
}
// GroupIDTopic represents a subtree under the test run's sync tree where peers
// participating in this distributed test advertise their groups.
var GroupIDTopic = sync.NewTopic("groupIDs", &GroupInfo{})
type GroupInfo struct {
ID string
Size int
Order int
Params map[string]string
}