mirror of
https://github.com/status-im/consul.git
synced 2025-02-24 11:28:40 +00:00
consul: blockingRPC is a helper method for queries that need to block
This commit is contained in:
parent
985e3a0529
commit
ffa7173953
@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RPCType byte
|
type RPCType byte
|
||||||
@ -16,6 +17,10 @@ const (
|
|||||||
rpcRaft
|
rpcRaft
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxQueryTime = 600 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// listen is used to listen for incoming RPC connections
|
// listen is used to listen for incoming RPC connections
|
||||||
func (s *Server) listen() {
|
func (s *Server) listen() {
|
||||||
for {
|
for {
|
||||||
@ -145,3 +150,51 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
|||||||
|
|
||||||
return future.Response(), nil
|
return future.Response(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// blockingRPC is used for queries that need to wait for a
|
||||||
|
// minimum index. This is used to block and wait for changes.
|
||||||
|
func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error {
|
||||||
|
var timeout <-chan time.Time
|
||||||
|
var notifyCh chan struct{}
|
||||||
|
|
||||||
|
// Fast path non-blocking
|
||||||
|
if b.MinQueryIndex == 0 {
|
||||||
|
goto RUN_QUERY
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restrict the max query time
|
||||||
|
if b.MaxQueryTime > maxQueryTime {
|
||||||
|
b.MaxQueryTime = maxQueryTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure a time limit is set if we have an index
|
||||||
|
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 {
|
||||||
|
b.MaxQueryTime = maxQueryTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup a query timeout
|
||||||
|
if b.MaxQueryTime > 0 {
|
||||||
|
timeout = time.After(b.MaxQueryTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup a notification channel for changes
|
||||||
|
SETUP_NOTIFY:
|
||||||
|
if b.MinQueryIndex > 0 {
|
||||||
|
notifyCh = make(chan struct{}, 1)
|
||||||
|
s.fsm.State().Watch(tables, notifyCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the query function
|
||||||
|
RUN_QUERY:
|
||||||
|
idx, err := run()
|
||||||
|
|
||||||
|
// Check for minimum query time
|
||||||
|
if err == nil && idx <= b.MinQueryIndex {
|
||||||
|
select {
|
||||||
|
case <-notifyCh:
|
||||||
|
goto SETUP_NOTIFY
|
||||||
|
case <-timeout:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user