agent/cache-types/ca-leaf: proper result for timeout, race on setting CA

This commit is contained in:
Mitchell Hashimoto 2018-04-16 12:25:35 +02:00
parent fcb15e15ae
commit be873d2558
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A

View File

@ -77,8 +77,11 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// Block on the events that wake us up.
select {
case <-timeoutCh:
// TODO: what is the right error for a timeout?
return result, fmt.Errorf("timeout")
// On a timeout, we just return the empty result and no error.
// It isn't an error to timeout, its just the limit of time the
// caching system wants us to block for. By returning an empty result
// the caching system will ignore.
return result, nil
case err := <-newRootCACh:
// A new root CA triggers us to refresh the leaf certificate.
@ -122,6 +125,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
if c.issuedCerts == nil {
c.issuedCerts = make(map[string]*structs.IssuedCert)
}
c.issuedCerts[reqReal.Service] = &reply
lastCert = &reply
}
@ -156,8 +160,24 @@ func (c *ConnectCALeaf) waitNewRootCA(ch chan<- error, timeout time.Duration) {
return
}
// Set the new index
atomic.StoreUint64(&c.caIndex, roots.QueryMeta.Index)
// We do a loop here because there can be multiple waitNewRootCA calls
// happening simultaneously. Each Fetch kicks off one call. These are
// multiplexed through Cache.Get which should ensure we only ever
// actually make a single RPC call. However, there is a race to set
// the caIndex field so do a basic CAS loop here.
for {
// We only set our index if its newer than what is previously set.
old := atomic.LoadUint64(&c.caIndex)
if old == roots.Index || old > roots.Index {
break
}
// Set the new index atomically. If the caIndex value changed
// in the meantime, retry.
if atomic.CompareAndSwapUint64(&c.caIndex, old, roots.Index) {
break
}
}
// Trigger the channel since we updated.
ch <- nil