consul: make forwarding to multiple datacenters parallel

This commit is contained in:
Ryan Uber 2014-09-30 10:03:47 -07:00
parent 74c7b1239b
commit 9b8e753b15
3 changed files with 83 additions and 59 deletions

View File

@ -79,7 +79,7 @@ func (m *Internal) ListKeys(
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true
m.srv.forwardKeyringRPC("Internal.ListKeys", args, reply)
m.srv.keyringRPC("Internal.ListKeys", args, reply)
}
return nil
@ -100,7 +100,7 @@ func (m *Internal) InstallKey(
ingestKeyringResponse(respWAN, reply, dc, true, err)
args.Forwarded = true
m.srv.forwardKeyringRPC("Internal.InstallKey", args, reply)
m.srv.keyringRPC("Internal.InstallKey", args, reply)
}
return nil
@ -121,7 +121,7 @@ func (m *Internal) UseKey(
ingestKeyringResponse(respWAN, reply, dc, true, err)
args.Forwarded = true
m.srv.forwardKeyringRPC("Internal.UseKey", args, reply)
m.srv.keyringRPC("Internal.UseKey", args, reply)
}
return nil
@ -141,7 +141,7 @@ func (m *Internal) RemoveKey(
ingestKeyringResponse(respWAN, reply, dc, true, err)
args.Forwarded = true
m.srv.forwardKeyringRPC("Internal.RemoveKey", args, reply)
m.srv.keyringRPC("Internal.RemoveKey", args, reply)
}
return nil

View File

@ -1,55 +0,0 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
// ingestKeyringResponse is a helper method to pick the relative information
// from a Serf message and stuff it into a KeyringResponse.
func ingestKeyringResponse(
serfResp *serf.KeyResponse, reply *structs.KeyringResponses,
dc string, wan bool, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
WAN: wan,
Datacenter: dc,
Messages: serfResp.Messages,
Keys: serfResp.Keys,
NumNodes: serfResp.NumNodes,
Error: errStr,
})
}
// forwardKeyringRPC is used to forward a keyring-related RPC request to one
// server in each datacenter. Since the net/rpc package writes replies in-place,
// we use this specialized method for dealing with keyring-related replies
// specifically by appending them to a wrapper response struct.
//
// This will only error for RPC-related errors. Otherwise, application-level
// errors are returned inside of the inner response objects.
func (s *Server) forwardKeyringRPC(
method string,
args *structs.KeyringRequest,
replies *structs.KeyringResponses) error {
for dc, _ := range s.remoteConsuls {
if dc == s.config.Datacenter {
continue
}
rr := structs.KeyringResponses{}
if err := s.forwardDC(method, dc, args, &rr); err != nil {
return err
}
for _, r := range rr.Responses {
replies.Responses = append(replies.Responses, r)
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"net"
"strings"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
@ -276,3 +277,81 @@ func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) {
}
}
}
// ingestKeyringResponse is a helper method to pick the relative information
// from a Serf message and stuff it into a KeyringResponse.
func ingestKeyringResponse(
serfResp *serf.KeyResponse, reply *structs.KeyringResponses,
dc string, wan bool, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
WAN: wan,
Datacenter: dc,
Messages: serfResp.Messages,
Keys: serfResp.Keys,
NumNodes: serfResp.NumNodes,
Error: errStr,
})
}
// forwardKeyring handles sending an RPC request to a remote datacenter and
// funneling any errors or responses back through the provided channels.
func (s *Server) forwardKeyringRPC(
method, dc string,
args *structs.KeyringRequest,
errorCh chan<- error,
respCh chan<- *structs.KeyringResponses) {
rr := structs.KeyringResponses{}
if err := s.forwardDC(method, dc, args, &rr); err != nil {
errorCh <- err
return
}
respCh <- &rr
return
}
// keyringRPC is used to forward a keyring-related RPC request to one
// server in each datacenter. This will only error for RPC-related errors.
// Otherwise, application-level errors are returned inside of the inner
// response objects.
func (s *Server) keyringRPC(
method string,
args *structs.KeyringRequest,
replies *structs.KeyringResponses) error {
errorCh := make(chan error)
respCh := make(chan *structs.KeyringResponses)
for dc, _ := range s.remoteConsuls {
if dc == s.config.Datacenter {
continue
}
go s.forwardKeyringRPC(method, dc, args, errorCh, respCh)
}
rlen := len(s.remoteConsuls) - 1
done := 0
for {
select {
case err := <-errorCh:
return err
case rr := <-respCh:
for _, r := range rr.Responses {
replies.Responses = append(replies.Responses, r)
}
done++
}
if done == rlen {
break
}
}
return nil
}