Add datacenters support

This commit is contained in:
Ivan Danyliuk 2018-06-20 18:42:24 +02:00
parent a5dd8d5987
commit 5321afb814
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
4 changed files with 50 additions and 7 deletions

View File

@ -9,7 +9,7 @@ import (
// ClusterSource represents knowledge source of
// cluster configuration.
type ClusterSource interface {
IPs(ctx context.Context, dc, tag string) ([]string, error)
IPs(ctx context.Context, tag string) ([]string, error)
}
// Fetched implements data fetching from multiple sources
@ -28,8 +28,8 @@ func NewFetcher(cluster ClusterSource, rpc RPCClient) *Fetcher {
}
// Nodes returns the list of nodes for the given datacentre 'dc' and tag.
func (f *Fetcher) Nodes(ctx context.Context, dc, tag string) ([]*ClusterNode, error) {
ips, err := f.cluster.IPs(ctx, dc, tag)
func (f *Fetcher) Nodes(ctx context.Context, tag string) ([]*ClusterNode, error) {
ips, err := f.cluster.IPs(ctx, tag)
if err != nil {
return nil, err
}

View File

@ -17,7 +17,7 @@ func NewMockConsulSource() ClusterSource {
// Node returns the list of mock nodes for the given datacentre 'dc' and tag.
// Satisfies ClusterSource interface.
func (c *MockConsulSource) IPs(ctx context.Context, dc, tag string) ([]string, error) {
func (c *MockConsulSource) IPs(ctx context.Context, tag string) ([]string, error) {
r := bytes.NewBufferString(mockClusterIPsJSON)
return ParseConsulResponse(r)
}

View File

@ -28,8 +28,39 @@ func NewConsul(hostport string) *Consul {
}
}
// IPs returns the list of IPs for the given datacenter and tag from Consul.
func (c *Consul) IPs(ctx context.Context, dc, tag string) ([]string, error) {
// IPs returns the list of IPs for all datacenters and tag from Consul.
func (c *Consul) IPs(ctx context.Context, tag string) ([]string, error) {
url := fmt.Sprintf("http://%s/v1/catalog/datacenters", c.hostport)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("datacenters request: %s", err)
}
req = req.WithContext(ctx)
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("http call: %s", err)
}
defer resp.Body.Close()
dcs, err := ParseConsulDatacentersResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get datacenters: %s", err)
}
var ret []string
for _, dc := range dcs {
ips, err := c.IPsDC(ctx, dc, tag)
if err != nil {
return nil, fmt.Errorf("ips for datacenter %s: %s", dc, err)
}
ret = append(ret, ips...)
}
return ret, nil
}
// IPsDC returns the list of IPs for the given datacenter and tag from Consul.
func (c *Consul) IPsDC(ctx context.Context, dc, tag string) ([]string, error) {
url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@ -79,3 +110,15 @@ func ParseConsulResponse(r io.Reader) ([]string, error) {
}
return ret, nil
}
// ParseConsulDatacentersResponse parses JSON output from Consul datacenters response with
// the list of known datacenters.
func ParseConsulDatacentersResponse(r io.Reader) ([]string, error) {
var resp []string
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, fmt.Errorf("unmarshal: %s", err)
}
return resp, nil
}

View File

@ -44,7 +44,7 @@ func main() {
// BuildGraph performs new cycle of updating data from
// fetcher source and populating graph object.
func BuildGraph(ctx context.Context, fetcher *Fetcher) (*graph.Graph, error) {
nodes, err := fetcher.Nodes(ctx, "", "eth.beta")
nodes, err := fetcher.Nodes(ctx, "eth.beta")
if err != nil {
return nil, fmt.Errorf("list of ips: %s", err)
}