diff --git a/cluster.go b/cluster.go index 71c5b6a..b46143f 100644 --- a/cluster.go +++ b/cluster.go @@ -9,7 +9,7 @@ import ( // ClusterSource represents knowledge source of // cluster configuration. type ClusterSource interface { - IPs(ctx context.Context, dc, tag string) ([]string, error) + IPs(ctx context.Context, tag string) ([]string, error) } // Fetched implements data fetching from multiple sources @@ -28,8 +28,8 @@ func NewFetcher(cluster ClusterSource, rpc RPCClient) *Fetcher { } // Nodes returns the list of nodes for the given datacentre 'dc' and tag. -func (f *Fetcher) Nodes(ctx context.Context, dc, tag string) ([]*ClusterNode, error) { - ips, err := f.cluster.IPs(ctx, dc, tag) +func (f *Fetcher) Nodes(ctx context.Context, tag string) ([]*ClusterNode, error) { + ips, err := f.cluster.IPs(ctx, tag) if err != nil { return nil, err } diff --git a/cluster_mock.go b/cluster_mock.go index 41c523c..0b6bc25 100644 --- a/cluster_mock.go +++ b/cluster_mock.go @@ -17,7 +17,7 @@ func NewMockConsulSource() ClusterSource { // Node returns the list of mock nodes for the given datacentre 'dc' and tag. // Satisfies ClusterSource interface. -func (c *MockConsulSource) IPs(ctx context.Context, dc, tag string) ([]string, error) { +func (c *MockConsulSource) IPs(ctx context.Context, tag string) ([]string, error) { r := bytes.NewBufferString(mockClusterIPsJSON) return ParseConsulResponse(r) } diff --git a/consul.go b/consul.go index 2aca0fa..ba2fe47 100644 --- a/consul.go +++ b/consul.go @@ -28,8 +28,39 @@ func NewConsul(hostport string) *Consul { } } -// IPs returns the list of IPs for the given datacenter and tag from Consul. -func (c *Consul) IPs(ctx context.Context, dc, tag string) ([]string, error) { +// IPs returns the list of IPs for all datacenters and tag from Consul. +func (c *Consul) IPs(ctx context.Context, tag string) ([]string, error) { + url := fmt.Sprintf("http://%s/v1/catalog/datacenters", c.hostport) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("datacenters request: %s", err) + } + req = req.WithContext(ctx) + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("http call: %s", err) + } + defer resp.Body.Close() + + dcs, err := ParseConsulDatacentersResponse(resp.Body) + if err != nil { + return nil, fmt.Errorf("get datacenters: %s", err) + } + + var ret []string + for _, dc := range dcs { + ips, err := c.IPsDC(ctx, dc, tag) + if err != nil { + return nil, fmt.Errorf("ips for datacenter %s: %s", dc, err) + } + ret = append(ret, ips...) + } + return ret, nil +} + +// IPsDC returns the list of IPs for the given datacenter and tag from Consul. +func (c *Consul) IPsDC(ctx context.Context, dc, tag string) ([]string, error) { url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag) req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -79,3 +110,15 @@ func ParseConsulResponse(r io.Reader) ([]string, error) { } return ret, nil } + +// ParseConsulDatacentersResponse parses JSON output from Consul datacenters response with +// the list of known datacenters. +func ParseConsulDatacentersResponse(r io.Reader) ([]string, error) { + var resp []string + err := json.NewDecoder(r).Decode(&resp) + if err != nil { + return nil, fmt.Errorf("unmarshal: %s", err) + } + + return resp, nil +} diff --git a/main.go b/main.go index 9799c43..b61a2fd 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { // BuildGraph performs new cycle of updating data from // fetcher source and populating graph object. func BuildGraph(ctx context.Context, fetcher *Fetcher) (*graph.Graph, error) { - nodes, err := fetcher.Nodes(ctx, "", "eth.beta") + nodes, err := fetcher.Nodes(ctx, "eth.beta") if err != nil { return nil, fmt.Errorf("list of ips: %s", err) }