cmd/swarm/swarm-smoke: sliding window test should not time out (#19152)

This commit is contained in:
Elad 2019-03-05 23:43:05 +07:00 committed by Anton Evangelatov
parent 81ed700157
commit 34c85def3e
2 changed files with 45 additions and 30 deletions

View File

@ -149,8 +149,9 @@ var (
} }
SwarmStoreCacheCapacity = cli.UintFlag{ SwarmStoreCacheCapacity = cli.UintFlag{
Name: "store.cache.size", Name: "store.cache.size",
Usage: "Number of recent chunks cached in memory (default 5000)", Usage: "Number of recent chunks cached in memory",
EnvVar: SwarmEnvStoreCacheCapacity, EnvVar: SwarmEnvStoreCacheCapacity,
Value: 10000,
} }
SwarmCompressedFlag = cli.BoolFlag{ SwarmCompressedFlag = cli.BoolFlag{
Name: "compressed", Name: "compressed",

View File

@ -42,23 +42,16 @@ func slidingWindowCmd(ctx *cli.Context, tuid string) error {
errc <- slidingWindow(ctx, tuid) errc <- slidingWindow(ctx, tuid)
}() }()
select { err := <-errc
case err := <-errc:
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
} }
return err return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
return fmt.Errorf("timeout after %v sec", timeout)
}
} }
func slidingWindow(ctx *cli.Context, tuid string) error { func slidingWindow(ctx *cli.Context, tuid string) error {
var hashes []uploadResult //swarm hashes of the uploads var hashes []uploadResult //swarm hashes of the uploads
nodes := len(hosts) nodes := len(hosts)
const iterationTimeout = 30 * time.Second
log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
uploadedBytes := 0 uploadedBytes := 0
networkDepth := 0 networkDepth := 0
@ -66,6 +59,7 @@ func slidingWindow(ctx *cli.Context, tuid string) error {
outer: outer:
for { for {
seed = int(time.Now().UTC().UnixNano())
log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
t1 := time.Now() t1 := time.Now()
@ -79,6 +73,7 @@ outer:
} }
metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1) metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1)
metrics.GetOrRegisterGauge("sliding-window.upload-depth", nil).Update(int64(len(hashes)))
fhash, err := digest(bytes.NewReader(randomBytes)) fhash, err := digest(bytes.NewReader(randomBytes))
if err != nil { if err != nil {
@ -90,37 +85,56 @@ outer:
hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
time.Sleep(time.Duration(syncDelay) * time.Second) time.Sleep(time.Duration(syncDelay) * time.Second)
uploadedBytes += filesize * 1000 uploadedBytes += filesize * 1000
q := make(chan struct{}, 1)
d := make(chan struct{})
defer close(q)
defer close(d)
for i, v := range hashes { for i, v := range hashes {
timeout := time.After(time.Duration(timeout) * time.Second) timeoutC := time.After(time.Duration(timeout) * time.Second)
errored = false errored = false
inner: task:
for { for {
select { select {
case <-timeout: case q <- struct{}{}:
errored = true go func() {
log.Error("error retrieving hash. timeout", "hash idx", i, "err", err) var start time.Time
metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) done := false
break inner for !done {
default: log.Info("trying to retrieve hash", "hash", v.hash)
idx := 1 + rand.Intn(len(hosts)-1) idx := 1 + rand.Intn(len(hosts)-1)
ruid := uuid.New()[:8] ruid := uuid.New()[:8]
start := time.Now() start = time.Now()
// fetch hangs when swarm dies out, so we have to jump through a bit more hoops to actually
// catch the timeout, but also allow this retry logic
err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "") err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "")
if err != nil { if err != nil {
continue inner log.Error("error fetching hash", "err", err)
continue
}
done = true
} }
metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start) metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
break inner d <- struct{}{}
}()
case <-d:
<-q
break task
case <-timeoutC:
errored = true
log.Error("error retrieving hash. timeout", "hash idx", i)
metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
break outer
default:
} }
} }
if errored {
break outer
}
networkDepth = i networkDepth = i
metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth)) metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth))
log.Info("sliding window test successfully fetched file", "currentDepth", networkDepth)
// this test might take a long time to finish - but we'd like to see metrics while they accumulate and not just when
// the test finishes. therefore emit the metrics on each iteration
emitMetrics(ctx)
} }
} }