diff --git a/consul/rpc.go b/consul/rpc.go index c9269f4e51..27df98cef0 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -7,6 +7,7 @@ import ( "io" "math/rand" "net" + "time" ) type RPCType byte @@ -16,6 +17,10 @@ const ( rpcRaft ) +const ( + maxQueryTime = 600 * time.Second +) + // listen is used to listen for incoming RPC connections func (s *Server) listen() { for { @@ -145,3 +150,51 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, return future.Response(), nil } + +// blockingRPC is used for queries that need to wait for a +// minimum index. This is used to block and wait for changes. +func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { + var timeout <-chan time.Time + var notifyCh chan struct{} + + // Fast path non-blocking + if b.MinQueryIndex == 0 { + goto RUN_QUERY + } + + // Restrict the max query time + if b.MaxQueryTime > maxQueryTime { + b.MaxQueryTime = maxQueryTime + } + + // Ensure a time limit is set if we have an index + if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { + b.MaxQueryTime = maxQueryTime + } + + // Setup a query timeout + if b.MaxQueryTime > 0 { + timeout = time.After(b.MaxQueryTime) + } + + // Setup a notification channel for changes +SETUP_NOTIFY: + if b.MinQueryIndex > 0 { + notifyCh = make(chan struct{}, 1) + s.fsm.State().Watch(tables, notifyCh) + } + + // Run the query function +RUN_QUERY: + idx, err := run() + + // Check for minimum query time + if err == nil && idx <= b.MinQueryIndex { + select { + case <-notifyCh: + goto SETUP_NOTIFY + case <-timeout: + } + } + return err +}