From 9b8e753b1534005b7135a3754eb0cf0262208dd2 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 30 Sep 2014 10:03:47 -0700 Subject: [PATCH] consul: make forwarding to multiple datacenters parallel --- consul/internal_endpoint.go | 8 ++-- consul/keyring.go | 55 -------------------------- consul/serf.go | 79 +++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 59 deletions(-) delete mode 100644 consul/keyring.go diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index 9d8d000e9a..ee22707002 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -79,7 +79,7 @@ func (m *Internal) ListKeys( // Mark key rotation as being already forwarded, then forward. args.Forwarded = true - m.srv.forwardKeyringRPC("Internal.ListKeys", args, reply) + m.srv.keyringRPC("Internal.ListKeys", args, reply) } return nil @@ -100,7 +100,7 @@ func (m *Internal) InstallKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.forwardKeyringRPC("Internal.InstallKey", args, reply) + m.srv.keyringRPC("Internal.InstallKey", args, reply) } return nil @@ -121,7 +121,7 @@ func (m *Internal) UseKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.forwardKeyringRPC("Internal.UseKey", args, reply) + m.srv.keyringRPC("Internal.UseKey", args, reply) } return nil @@ -141,7 +141,7 @@ func (m *Internal) RemoveKey( ingestKeyringResponse(respWAN, reply, dc, true, err) args.Forwarded = true - m.srv.forwardKeyringRPC("Internal.RemoveKey", args, reply) + m.srv.keyringRPC("Internal.RemoveKey", args, reply) } return nil diff --git a/consul/keyring.go b/consul/keyring.go deleted file mode 100644 index 373523118e..0000000000 --- a/consul/keyring.go +++ /dev/null @@ -1,55 +0,0 @@ -package consul - -import ( - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/serf/serf" -) - -// ingestKeyringResponse is a helper method to pick the relative information -// from a Serf message and stuff it into a KeyringResponse. -func ingestKeyringResponse( - serfResp *serf.KeyResponse, reply *structs.KeyringResponses, - dc string, wan bool, err error) { - - errStr := "" - if err != nil { - errStr = err.Error() - } - - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - WAN: wan, - Datacenter: dc, - Messages: serfResp.Messages, - Keys: serfResp.Keys, - NumNodes: serfResp.NumNodes, - Error: errStr, - }) -} - -// forwardKeyringRPC is used to forward a keyring-related RPC request to one -// server in each datacenter. Since the net/rpc package writes replies in-place, -// we use this specialized method for dealing with keyring-related replies -// specifically by appending them to a wrapper response struct. -// -// This will only error for RPC-related errors. Otherwise, application-level -// errors are returned inside of the inner response objects. -func (s *Server) forwardKeyringRPC( - method string, - args *structs.KeyringRequest, - replies *structs.KeyringResponses) error { - - for dc, _ := range s.remoteConsuls { - if dc == s.config.Datacenter { - continue - } - rr := structs.KeyringResponses{} - if err := s.forwardDC(method, dc, args, &rr); err != nil { - return err - } - for _, r := range rr.Responses { - replies.Responses = append(replies.Responses, r) - } - } - - return nil -} diff --git a/consul/serf.go b/consul/serf.go index ae1dacc33e..bfdf5668e8 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -4,6 +4,7 @@ import ( "net" "strings" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" ) @@ -276,3 +277,81 @@ func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) { } } } + +// ingestKeyringResponse is a helper method to pick the relative information +// from a Serf message and stuff it into a KeyringResponse. +func ingestKeyringResponse( + serfResp *serf.KeyResponse, reply *structs.KeyringResponses, + dc string, wan bool, err error) { + + errStr := "" + if err != nil { + errStr = err.Error() + } + + reply.Responses = append(reply.Responses, &structs.KeyringResponse{ + WAN: wan, + Datacenter: dc, + Messages: serfResp.Messages, + Keys: serfResp.Keys, + NumNodes: serfResp.NumNodes, + Error: errStr, + }) +} + +// forwardKeyring handles sending an RPC request to a remote datacenter and +// funneling any errors or responses back through the provided channels. +func (s *Server) forwardKeyringRPC( + method, dc string, + args *structs.KeyringRequest, + errorCh chan<- error, + respCh chan<- *structs.KeyringResponses) { + + rr := structs.KeyringResponses{} + if err := s.forwardDC(method, dc, args, &rr); err != nil { + errorCh <- err + return + } + respCh <- &rr + return +} + +// keyringRPC is used to forward a keyring-related RPC request to one +// server in each datacenter. This will only error for RPC-related errors. +// Otherwise, application-level errors are returned inside of the inner +// response objects. +func (s *Server) keyringRPC( + method string, + args *structs.KeyringRequest, + replies *structs.KeyringResponses) error { + + errorCh := make(chan error) + respCh := make(chan *structs.KeyringResponses) + + for dc, _ := range s.remoteConsuls { + if dc == s.config.Datacenter { + continue + } + go s.forwardKeyringRPC(method, dc, args, errorCh, respCh) + } + + rlen := len(s.remoteConsuls) - 1 + done := 0 + for { + select { + case err := <-errorCh: + return err + case rr := <-respCh: + for _, r := range rr.Responses { + replies.Responses = append(replies.Responses, r) + } + done++ + } + + if done == rlen { + break + } + } + + return nil +}