mirror of https://github.com/status-im/consul.git
consul: generalize multi-DC RPC call broadcasts
This commit is contained in:
parent
c11f6b5152
commit
9556347609
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Internal endpoint is used to query the miscellaneous info that
|
// Internal endpoint is used to query the miscellaneous info that
|
||||||
|
@ -77,9 +78,8 @@ func (m *Internal) ListKeys(
|
||||||
respWAN, err := m.srv.KeyManagerWAN().ListKeys()
|
respWAN, err := m.srv.KeyManagerWAN().ListKeys()
|
||||||
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
||||||
|
|
||||||
// Mark key rotation as being already forwarded, then forward.
|
|
||||||
args.Forwarded = true
|
args.Forwarded = true
|
||||||
m.srv.keyringRPC("Internal.ListKeys", args, reply)
|
m.srv.globalRPC("Internal.ListKeys", args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -100,7 +100,7 @@ func (m *Internal) InstallKey(
|
||||||
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
||||||
|
|
||||||
args.Forwarded = true
|
args.Forwarded = true
|
||||||
m.srv.keyringRPC("Internal.InstallKey", args, reply)
|
m.srv.globalRPC("Internal.InstallKey", args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -121,7 +121,7 @@ func (m *Internal) UseKey(
|
||||||
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
||||||
|
|
||||||
args.Forwarded = true
|
args.Forwarded = true
|
||||||
m.srv.keyringRPC("Internal.UseKey", args, reply)
|
m.srv.globalRPC("Internal.UseKey", args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -141,8 +141,29 @@ func (m *Internal) RemoveKey(
|
||||||
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
ingestKeyringResponse(respWAN, reply, dc, true, err)
|
||||||
|
|
||||||
args.Forwarded = true
|
args.Forwarded = true
|
||||||
m.srv.keyringRPC("Internal.RemoveKey", args, reply)
|
m.srv.globalRPC("Internal.RemoveKey", args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ingestKeyringResponse is a helper method to pick the relative information
|
||||||
|
// from a Serf message and stuff it into a KeyringResponse.
|
||||||
|
func ingestKeyringResponse(
|
||||||
|
serfResp *serf.KeyResponse, reply *structs.KeyringResponses,
|
||||||
|
dc string, wan bool, err error) {
|
||||||
|
|
||||||
|
errStr := ""
|
||||||
|
if err != nil {
|
||||||
|
errStr = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
|
||||||
|
WAN: wan,
|
||||||
|
Datacenter: dc,
|
||||||
|
Messages: serfResp.Messages,
|
||||||
|
Keys: serfResp.Keys,
|
||||||
|
NumNodes: serfResp.NumNodes,
|
||||||
|
Error: errStr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -223,6 +223,49 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
||||||
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
|
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// globalRPC is used to forward an RPC request to one server in each datacenter.
|
||||||
|
// This will only error for RPC-related errors. Otherwise, application-level
|
||||||
|
// errors are returned inside of the inner response objects.
|
||||||
|
func (s *Server) globalRPC(method string, args interface{},
|
||||||
|
reply structs.CompoundResponse) error {
|
||||||
|
|
||||||
|
rlen := len(s.remoteConsuls)
|
||||||
|
if rlen < 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
errorCh := make(chan error)
|
||||||
|
respCh := make(chan interface{})
|
||||||
|
|
||||||
|
// Make a new request into each datacenter
|
||||||
|
for dc, _ := range s.remoteConsuls {
|
||||||
|
info := &structs.GenericRPC{Datacenter: dc}
|
||||||
|
go func() {
|
||||||
|
rr := reply.New()
|
||||||
|
if _, err := s.forward(method, info, args, &rr); err != nil {
|
||||||
|
errorCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respCh <- rr
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
done := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errorCh:
|
||||||
|
return err
|
||||||
|
case rr := <-respCh:
|
||||||
|
reply.Add(rr)
|
||||||
|
done++
|
||||||
|
}
|
||||||
|
if done == rlen {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// raftApply is used to encode a message, run it through raft, and return
|
// raftApply is used to encode a message, run it through raft, and return
|
||||||
// the FSM response along with any errors
|
// the FSM response along with any errors
|
||||||
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -277,85 +276,3 @@ func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ingestKeyringResponse is a helper method to pick the relative information
|
|
||||||
// from a Serf message and stuff it into a KeyringResponse.
|
|
||||||
func ingestKeyringResponse(
|
|
||||||
serfResp *serf.KeyResponse, reply *structs.KeyringResponses,
|
|
||||||
dc string, wan bool, err error) {
|
|
||||||
|
|
||||||
errStr := ""
|
|
||||||
if err != nil {
|
|
||||||
errStr = err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
|
|
||||||
WAN: wan,
|
|
||||||
Datacenter: dc,
|
|
||||||
Messages: serfResp.Messages,
|
|
||||||
Keys: serfResp.Keys,
|
|
||||||
NumNodes: serfResp.NumNodes,
|
|
||||||
Error: errStr,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// forwardKeyring handles sending an RPC request to a remote datacenter and
|
|
||||||
// funneling any errors or responses back through the provided channels.
|
|
||||||
func (s *Server) forwardKeyringRPC(
|
|
||||||
method, dc string,
|
|
||||||
args *structs.KeyringRequest,
|
|
||||||
errorCh chan<- error,
|
|
||||||
respCh chan<- *structs.KeyringResponses) {
|
|
||||||
|
|
||||||
rr := structs.KeyringResponses{}
|
|
||||||
if err := s.forwardDC(method, dc, args, &rr); err != nil {
|
|
||||||
errorCh <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
respCh <- &rr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// keyringRPC is used to forward a keyring-related RPC request to one
|
|
||||||
// server in each datacenter. This will only error for RPC-related errors.
|
|
||||||
// Otherwise, application-level errors are returned inside of the inner
|
|
||||||
// response objects.
|
|
||||||
func (s *Server) keyringRPC(
|
|
||||||
method string,
|
|
||||||
args *structs.KeyringRequest,
|
|
||||||
replies *structs.KeyringResponses) error {
|
|
||||||
|
|
||||||
rlen := len(s.remoteConsuls) - 1
|
|
||||||
if rlen == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
errorCh := make(chan error)
|
|
||||||
respCh := make(chan *structs.KeyringResponses)
|
|
||||||
|
|
||||||
for dc, _ := range s.remoteConsuls {
|
|
||||||
if dc == s.config.Datacenter {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
go s.forwardKeyringRPC(method, dc, args, errorCh, respCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
done := 0
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case err := <-errorCh:
|
|
||||||
return err
|
|
||||||
case rr := <-respCh:
|
|
||||||
for _, r := range rr.Responses {
|
|
||||||
replies.Responses = append(replies.Responses, r)
|
|
||||||
}
|
|
||||||
done++
|
|
||||||
}
|
|
||||||
|
|
||||||
if done == rlen {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -127,6 +127,16 @@ type QueryMeta struct {
|
||||||
KnownLeader bool
|
KnownLeader bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenericRPC is the simplest possible RPCInfo implementation
|
||||||
|
type GenericRPC struct {
|
||||||
|
Datacenter string
|
||||||
|
QueryOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GenericRPC) RequestDatacenter() string {
|
||||||
|
return r.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterRequest is used for the Catalog.Register endpoint
|
// RegisterRequest is used for the Catalog.Register endpoint
|
||||||
// to register a node as providing a service. If no service
|
// to register a node as providing a service. If no service
|
||||||
// is provided, the node is registered.
|
// is provided, the node is registered.
|
||||||
|
@ -532,14 +542,31 @@ func Encode(t MessageType, msg interface{}) ([]byte, error) {
|
||||||
return buf.Bytes(), err
|
return buf.Bytes(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CompoundResponse is an interface for gathering multiple responses. It is
|
||||||
|
// used in cross-datacenter RPC calls where more than 1 datacenter is
|
||||||
|
// expected to reply.
|
||||||
|
type CompoundResponse interface {
|
||||||
|
// Add adds a new response to the compound response
|
||||||
|
Add(interface{})
|
||||||
|
|
||||||
|
// New returns an empty response object which can be passed around by
|
||||||
|
// reference, and then passed to Add() later on.
|
||||||
|
New() interface{}
|
||||||
|
}
|
||||||
|
|
||||||
// KeyringRequest encapsulates a request to modify an encryption keyring.
|
// KeyringRequest encapsulates a request to modify an encryption keyring.
|
||||||
// It can be used for install, remove, or use key type operations.
|
// It can be used for install, remove, or use key type operations.
|
||||||
type KeyringRequest struct {
|
type KeyringRequest struct {
|
||||||
Key string
|
Key string
|
||||||
|
Datacenter string
|
||||||
Forwarded bool
|
Forwarded bool
|
||||||
QueryOptions
|
QueryOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *KeyringRequest) RequestDatacenter() string {
|
||||||
|
return r.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
// KeyringResponse is a unified key response and can be used for install,
|
// KeyringResponse is a unified key response and can be used for install,
|
||||||
// remove, use, as well as listing key queries.
|
// remove, use, as well as listing key queries.
|
||||||
type KeyringResponse struct {
|
type KeyringResponse struct {
|
||||||
|
@ -558,3 +585,12 @@ type KeyringResponses struct {
|
||||||
Responses []*KeyringResponse
|
Responses []*KeyringResponse
|
||||||
QueryMeta
|
QueryMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *KeyringResponses) Add(v interface{}) {
|
||||||
|
val := v.(*KeyringResponses)
|
||||||
|
r.Responses = append(r.Responses, val.Responses...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *KeyringResponses) New() interface{} {
|
||||||
|
return new(KeyringResponses)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue