diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index ee22707002..223b8eeff8 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -2,6 +2,7 @@ package consul import ( "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/serf" ) // Internal endpoint is used to query the miscellaneous info that @@ -77,9 +78,8 @@ func (m *Internal) ListKeys( respWAN, err := m.srv.KeyManagerWAN().ListKeys() ingestKeyringResponse(respWAN, reply, dc, true, err) - // Mark key rotation as being already forwarded, then forward. args.Forwarded = true - m.srv.keyringRPC("Internal.ListKeys", args, reply) + m.srv.globalRPC("Internal.ListKeys", args, reply) } return nil @@ -100,7 +100,7 @@ func (m *Internal) InstallKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.keyringRPC("Internal.InstallKey", args, reply) + m.srv.globalRPC("Internal.InstallKey", args, reply) } return nil @@ -121,7 +121,7 @@ func (m *Internal) UseKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.keyringRPC("Internal.UseKey", args, reply) + m.srv.globalRPC("Internal.UseKey", args, reply) } return nil @@ -141,8 +141,29 @@ func (m *Internal) RemoveKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.keyringRPC("Internal.RemoveKey", args, reply) + m.srv.globalRPC("Internal.RemoveKey", args, reply) } return nil } + +// ingestKeyringResponse is a helper method to pick the relative information +// from a Serf message and stuff it into a KeyringResponse. +func ingestKeyringResponse( + serfResp *serf.KeyResponse, reply *structs.KeyringResponses, + dc string, wan bool, err error) { + + errStr := "" + if err != nil { + errStr = err.Error() + } + + reply.Responses = append(reply.Responses, &structs.KeyringResponse{ + WAN: wan, + Datacenter: dc, + Messages: serfResp.Messages, + Keys: serfResp.Keys, + NumNodes: serfResp.NumNodes, + Error: errStr, + }) +} diff --git a/consul/rpc.go b/consul/rpc.go index cd5c36ebd3..6fcb07d820 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -223,6 +223,49 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ return s.connPool.RPC(server.Addr, server.Version, method, args, reply) } +// globalRPC is used to forward an RPC request to one server in each datacenter. +// This will only error for RPC-related errors. Otherwise, application-level +// errors are returned inside of the inner response objects. +func (s *Server) globalRPC(method string, args interface{}, + reply structs.CompoundResponse) error { + + rlen := len(s.remoteConsuls) + if rlen < 2 { + return nil + } + + errorCh := make(chan error) + respCh := make(chan interface{}) + + // Make a new request into each datacenter + for dc, _ := range s.remoteConsuls { + info := &structs.GenericRPC{Datacenter: dc} + go func() { + rr := reply.New() + if _, err := s.forward(method, info, args, &rr); err != nil { + errorCh <- err + return + } + respCh <- rr + }() + } + + done := 0 + for { + select { + case err := <-errorCh: + return err + case rr := <-respCh: + reply.Add(rr) + done++ + } + if done == rlen { + break + } + } + return nil +} + // raftApply is used to encode a message, run it through raft, and return // the FSM response along with any errors func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { diff --git a/consul/serf.go b/consul/serf.go index 02d0683741..ae1dacc33e 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -4,7 +4,6 @@ import ( "net" "strings" - "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" ) @@ -277,85 +276,3 @@ func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) { } } } - -// ingestKeyringResponse is a helper method to pick the relative information -// from a Serf message and stuff it into a KeyringResponse. -func ingestKeyringResponse( - serfResp *serf.KeyResponse, reply *structs.KeyringResponses, - dc string, wan bool, err error) { - - errStr := "" - if err != nil { - errStr = err.Error() - } - - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - WAN: wan, - Datacenter: dc, - Messages: serfResp.Messages, - Keys: serfResp.Keys, - NumNodes: serfResp.NumNodes, - Error: errStr, - }) -} - -// forwardKeyring handles sending an RPC request to a remote datacenter and -// funneling any errors or responses back through the provided channels. -func (s *Server) forwardKeyringRPC( - method, dc string, - args *structs.KeyringRequest, - errorCh chan<- error, - respCh chan<- *structs.KeyringResponses) { - - rr := structs.KeyringResponses{} - if err := s.forwardDC(method, dc, args, &rr); err != nil { - errorCh <- err - return - } - respCh <- &rr - return -} - -// keyringRPC is used to forward a keyring-related RPC request to one -// server in each datacenter. This will only error for RPC-related errors. -// Otherwise, application-level errors are returned inside of the inner -// response objects. -func (s *Server) keyringRPC( - method string, - args *structs.KeyringRequest, - replies *structs.KeyringResponses) error { - - rlen := len(s.remoteConsuls) - 1 - if rlen == 0 { - return nil - } - - errorCh := make(chan error) - respCh := make(chan *structs.KeyringResponses) - - for dc, _ := range s.remoteConsuls { - if dc == s.config.Datacenter { - continue - } - go s.forwardKeyringRPC(method, dc, args, errorCh, respCh) - } - - done := 0 - for { - select { - case err := <-errorCh: - return err - case rr := <-respCh: - for _, r := range rr.Responses { - replies.Responses = append(replies.Responses, r) - } - done++ - } - - if done == rlen { - break - } - } - - return nil -} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index dfaadbaa71..1b3ead9607 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -127,6 +127,16 @@ type QueryMeta struct { KnownLeader bool } +// GenericRPC is the simplest possible RPCInfo implementation +type GenericRPC struct { + Datacenter string + QueryOptions +} + +func (r *GenericRPC) RequestDatacenter() string { + return r.Datacenter +} + // RegisterRequest is used for the Catalog.Register endpoint // to register a node as providing a service. If no service // is provided, the node is registered. @@ -532,14 +542,31 @@ func Encode(t MessageType, msg interface{}) ([]byte, error) { return buf.Bytes(), err } +// CompoundResponse is an interface for gathering multiple responses. It is +// used in cross-datacenter RPC calls where more than 1 datacenter is +// expected to reply. +type CompoundResponse interface { + // Add adds a new response to the compound response + Add(interface{}) + + // New returns an empty response object which can be passed around by + // reference, and then passed to Add() later on. + New() interface{} +} + // KeyringRequest encapsulates a request to modify an encryption keyring. // It can be used for install, remove, or use key type operations. type KeyringRequest struct { - Key string - Forwarded bool + Key string + Datacenter string + Forwarded bool QueryOptions } +func (r *KeyringRequest) RequestDatacenter() string { + return r.Datacenter +} + // KeyringResponse is a unified key response and can be used for install, // remove, use, as well as listing key queries. type KeyringResponse struct { @@ -558,3 +585,12 @@ type KeyringResponses struct { Responses []*KeyringResponse QueryMeta } + +func (r *KeyringResponses) Add(v interface{}) { + val := v.(*KeyringResponses) + r.Responses = append(r.Responses, val.Responses...) +} + +func (r *KeyringResponses) New() interface{} { + return new(KeyringResponses) +}