From 15f045596b4ace507f3c92db239cc64cc7b8ce34 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 11 Dec 2013 16:33:19 -0800 Subject: [PATCH] Adding support for cross-dc forwarding --- consul/catalog_endpoint_test.go | 36 +++++++++++++++++++++++++++++++++ consul/rpc.go | 18 +++++++++++++++-- rpc/structs.go | 1 + 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 14920a17e7..0e9677feea 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "fmt" "github.com/hashicorp/consul/rpc" "os" "testing" @@ -37,6 +38,41 @@ func TestCatalogRegister(t *testing.T) { } } +func TestCatalogRegister_ForwardDC(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerDC(t, "dc2") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.Port) + if err := s2.JoinWAN(addr); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the leaders + time.Sleep(100 * time.Millisecond) + + arg := rpc.RegisterRequest{ + Datacenter: "dc2", // SHould forward through s1 + Node: "foo", + Address: "127.0.0.1", + ServiceName: "db", + ServiceTag: "master", + ServicePort: 8000, + } + var out struct{} + if err := client.Call("Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } +} + func TestCatalogDeregister(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/rpc.go b/consul/rpc.go index e0addca259..2ff6e5959f 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/consul/rpc" "github.com/ugorji/go/codec" "io" + "math/rand" "net" ) @@ -111,8 +112,21 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{ // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error { - // TODO: Fix - return fmt.Errorf("DC forwarding not supported") + // Bail if we can't find any servers + s.remoteLock.RLock() + servers := s.remoteConsuls[dc] + if len(servers) == 0 { + s.remoteLock.RUnlock() + return rpc.ErrNoDCPath + } + + // Select a random addr + offset := rand.Int31() % int32(len(servers)) + server := servers[offset] + s.remoteLock.RUnlock() + + // Forward to remote Consul + return s.connPool.RPC(server, method, args, reply) } // raftApply is used to encode a message, run it through raft, and return diff --git a/rpc/structs.go b/rpc/structs.go index c5eaf7409b..fdad669e3d 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -8,6 +8,7 @@ import ( var ( ErrNoLeader = fmt.Errorf("No cluster leader") + ErrNoDCPath = fmt.Errorf("No path to datacenter") ) type MessageType uint8