diff --git a/.gitignore b/.gitignore index 5508cd7..10299e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ clustervis +clustervis-linux-amd64 web/node_modules .vscode diff --git a/cluster.go b/cluster.go index 92ff5a2..71c5b6a 100644 --- a/cluster.go +++ b/cluster.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "sort" ) @@ -8,7 +9,7 @@ import ( // ClusterSource represents knowledge source of // cluster configuration. type ClusterSource interface { - IPs(dc, tag string) ([]string, error) + IPs(ctx context.Context, dc, tag string) ([]string, error) } // Fetched implements data fetching from multiple sources @@ -27,15 +28,15 @@ func NewFetcher(cluster ClusterSource, rpc RPCClient) *Fetcher { } // Nodes returns the list of nodes for the given datacentre 'dc' and tag. -func (f *Fetcher) Nodes(dc, tag string) ([]*ClusterNode, error) { - ips, err := f.cluster.IPs(dc, tag) +func (f *Fetcher) Nodes(ctx context.Context, dc, tag string) ([]*ClusterNode, error) { + ips, err := f.cluster.IPs(ctx, dc, tag) if err != nil { return nil, err } var ret []*ClusterNode for _, ip := range ips { - nodeInfo, err := f.rpc.NodeInfo(ip) + nodeInfo, err := f.rpc.NodeInfo(ctx, ip) if err != nil { return nil, err } @@ -47,12 +48,12 @@ func (f *Fetcher) Nodes(dc, tag string) ([]*ClusterNode, error) { } // NodePeers runs `admin_peers` command for each node. -func (f *Fetcher) NodePeers(nodes []*ClusterNode) ([]*Node, []*Link, error) { +func (f *Fetcher) NodePeers(ctx context.Context, nodes []*ClusterNode) ([]*Node, []*Link, error) { m := make(map[string]*Node) var links []*Link for _, node := range nodes { // TODO: run concurrently - peers, err := f.rpc.AdminPeers(node.IP) + peers, err := f.rpc.AdminPeers(ctx, node.IP) if err != nil { log.Printf("[ERROR] Failed to get peers from %s\n", node.IP) continue diff --git a/cluster_mock.go b/cluster_mock.go index 9efa50d..41c523c 100644 --- a/cluster_mock.go +++ b/cluster_mock.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" ) // MockConsulSource implements ClusterSource for local @@ -16,7 +17,7 @@ func NewMockConsulSource() ClusterSource { // Node returns the list of mock nodes for the given datacentre 'dc' and tag. // Satisfies ClusterSource interface. -func (c *MockConsulSource) IPs(dc, tag string) ([]string, error) { +func (c *MockConsulSource) IPs(ctx context.Context, dc, tag string) ([]string, error) { r := bytes.NewBufferString(mockClusterIPsJSON) return ParseConsulResponse(r) } diff --git a/consul.go b/consul.go index 11206b6..2aca0fa 100644 --- a/consul.go +++ b/consul.go @@ -1,31 +1,45 @@ package main import ( + "context" "encoding/json" "fmt" "io" "net/http" + "time" ) // Consul implements ClusterSource for Consul. type Consul struct { hostport string + client *http.Client } +const ConsulTimeout = 5 * time.Second // it's localhost, should be in ms actually + // NewConsul creates new Consul source. It doesn't attempt // to connect or verify if address is correct. func NewConsul(hostport string) *Consul { return &Consul{ hostport: hostport, + client: &http.Client{ + Timeout: ConsulTimeout, + }, } } // IPs returns the list of IPs for the given datacenter and tag from Consul. -func (c *Consul) IPs(dc, tag string) ([]string, error) { +func (c *Consul) IPs(ctx context.Context, dc, tag string) ([]string, error) { url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag) - resp, err := http.Get(url) + req, err := http.NewRequest("GET", url, nil) if err != nil { - return nil, fmt.Errorf("http call failed: %s", err) + return nil, fmt.Errorf("request: %s", err) + } + req = req.WithContext(ctx) + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("http call: %s", err) } defer resp.Body.Close() diff --git a/jsonrpc.go b/jsonrpc.go index 8800a85..3eaf5c0 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -17,8 +17,9 @@ type PeersResponse struct { // ParsePeersResponse parses JSON-RPC 'admin_peers' response from reader r. func ParsePeersResponse(r io.Reader) ([]*p2p.PeerInfo, error) { + lr := io.LimitReader(r, 10e6) // 1MB should be more than enough var resp PeersResponse - err := json.NewDecoder(r).Decode(&resp) + err := json.NewDecoder(lr).Decode(&resp) if err != nil { return nil, err } @@ -36,8 +37,9 @@ type NodeInfoResponse struct { // ParseNodeInfoResponse parses JSON-RPC 'admin_nodeInfo' response from reader r. func ParseNodeInfoResponse(r io.Reader) (*p2p.NodeInfo, error) { + lr := io.LimitReader(r, 10e6) // 1MB should be more than enough var resp NodeInfoResponse - err := json.NewDecoder(r).Decode(&resp) + err := json.NewDecoder(lr).Decode(&resp) if err != nil { return nil, err } diff --git a/main.go b/main.go index 8b0ab2c..9799c43 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "log" @@ -32,7 +33,9 @@ func main() { fetcher := NewFetcher(cluster, rpc) ws := NewWSServer(fetcher, *updateInterval) - ws.refresh() + + // do initial refresh + ws.refresh(context.Background()) log.Printf("Starting web server...") startWeb(ws, *port) @@ -40,13 +43,13 @@ func main() { // BuildGraph performs new cycle of updating data from // fetcher source and populating graph object. -func BuildGraph(fetcher *Fetcher) (*graph.Graph, error) { - nodes, err := fetcher.Nodes("", "eth.beta") +func BuildGraph(ctx context.Context, fetcher *Fetcher) (*graph.Graph, error) { + nodes, err := fetcher.Nodes(ctx, "", "eth.beta") if err != nil { return nil, fmt.Errorf("list of ips: %s", err) } - peers, links, err := fetcher.NodePeers(nodes) + peers, links, err := fetcher.NodePeers(ctx, nodes) if err != nil { return nil, fmt.Errorf("get node peers: %s", err) } diff --git a/main_test.go b/main_test.go index 264ebf3..084be3e 100644 --- a/main_test.go +++ b/main_test.go @@ -1,6 +1,9 @@ package main -import "testing" +import ( + "context" + "testing" +) func TestGraphCreate(t *testing.T) { cluster := NewMockConsulSource() @@ -8,7 +11,8 @@ func TestGraphCreate(t *testing.T) { f := NewFetcher(cluster, rpc) - nodes, err := f.Nodes("", "eth.beta") + ctx := context.Background() + nodes, err := f.Nodes(ctx, "", "eth.beta") if err != nil { t.Fatal(err) } @@ -19,7 +23,7 @@ func TestGraphCreate(t *testing.T) { t.Fatalf("Expected %d nodes, got %d", expected, got) } - peers, links, err := f.NodePeers(nodes) + peers, links, err := f.NodePeers(ctx, nodes) if err != nil { t.Fatal(err) } diff --git a/rpc.go b/rpc.go index be5714a..8f4781f 100644 --- a/rpc.go +++ b/rpc.go @@ -2,8 +2,11 @@ package main import ( "bytes" + "context" "fmt" + "io" "net/http" + "time" "github.com/ethereum/go-ethereum/p2p" ) @@ -11,31 +14,37 @@ import ( // RPCClient defines subset of client that // can call needed methods to geth's RPC server. type RPCClient interface { - AdminPeers(ip string) ([]*Node, error) - NodeInfo(ip string) (*p2p.NodeInfo, error) + AdminPeers(ctx context.Context, ip string) ([]*Node, error) + NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error) } // HTTPRPCClient implements RPCClient for // HTTP transport. type HTTPRPCClient struct { + client *http.Client } +const HTTPRPCTimeout = 3 * time.Second + // NewHTTPRPCClient creates new HTTP RPC client for eth JSON-RPC server. func NewHTTPRPCClient() *HTTPRPCClient { - return &HTTPRPCClient{} + return &HTTPRPCClient{ + client: &http.Client{ + Timeout: ConsulTimeout, + }, + } } // AdminPeers executes `admin_peers` RPC call and parses the response. // Satisfies RPCClient interface. -func (h *HTTPRPCClient) AdminPeers(ip string) ([]*Node, error) { - data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`) - resp, err := http.Post("http://"+ip, "application/json", data) +func (h *HTTPRPCClient) AdminPeers(ctx context.Context, ip string) ([]*Node, error) { + r, err := h.postMethod(ctx, ip, "admin_peers") if err != nil { - return nil, fmt.Errorf("POST RPC request: %s", err) + return nil, fmt.Errorf("rpc admin_peers: %s", err) } - defer resp.Body.Close() + defer r.Close() - nodes, err := ParsePeersResponse(resp.Body) + nodes, err := ParsePeersResponse(r) if err != nil { return nil, fmt.Errorf("get admin peers: %s", err) } @@ -45,18 +54,37 @@ func (h *HTTPRPCClient) AdminPeers(ip string) ([]*Node, error) { // NodeInfo executes `admin_nodeInfo` RPC call and parses the response. // Satisfies RPCClient interface. -func (h *HTTPRPCClient) NodeInfo(ip string) (*p2p.NodeInfo, error) { - data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_nodeInfo","params":[],"id":1}`) - resp, err := http.Post("http://"+ip, "application/json", data) +func (h *HTTPRPCClient) NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error) { + r, err := h.postMethod(ctx, ip, "admin_nodeInfo") if err != nil { - return nil, fmt.Errorf("POST RPC request: %s", err) + return nil, fmt.Errorf("rpc admin_nodeInfo: %s", err) } - defer resp.Body.Close() + defer r.Close() - nodeInfo, err := ParseNodeInfoResponse(resp.Body) + nodeInfo, err := ParseNodeInfoResponse(r) if err != nil { return nil, fmt.Errorf("get node info: %s", err) } return nodeInfo, err } + +// postMethod performs POST RPC request for single method RPC calls without params. +// it reads body and return the whole answer. +func (h *HTTPRPCClient) postMethod(ctx context.Context, ip, method string) (io.ReadCloser, error) { + payload := fmt.Sprintf("{\"jsonrpc\":\"2.0\",\"method\":\"%s\",\"params\":[],\"id\":1}", method) + data := bytes.NewBufferString(payload) + url := "http://" + ip + req, err := http.NewRequest("POST", url, data) + if err != nil { + return nil, fmt.Errorf("request: %s", err) + } + req = req.WithContext(ctx) + + resp, err := h.client.Do(req) + if err != nil { + return nil, fmt.Errorf("POST RPC request: %s", err) + } + + return resp.Body, nil +} diff --git a/rpc_mock.go b/rpc_mock.go index 0a7b2b1..303ae5f 100644 --- a/rpc_mock.go +++ b/rpc_mock.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "fmt" "github.com/ethereum/go-ethereum/p2p" @@ -17,7 +18,7 @@ func NewMockRPCClient() *MockRPCClient { // AdminPeers simulates call to `admin_peers` RPC and parses the response. // Satisfies RPCClient interface. -func (h *MockRPCClient) AdminPeers(ip string) ([]*Node, error) { +func (h *MockRPCClient) AdminPeers(ctx context.Context, ip string) ([]*Node, error) { r := bytes.NewBufferString(mockPeers[ip]) nodes, err := ParsePeersResponse(r) if err != nil { @@ -29,7 +30,7 @@ func (h *MockRPCClient) AdminPeers(ip string) ([]*Node, error) { // AdminPeers simulates call to `admin_peers` RPC and parses the response. // Satisfies RPCClient interface. -func (h *MockRPCClient) NodeInfo(ip string) (*p2p.NodeInfo, error) { +func (h *MockRPCClient) NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error) { r := bytes.NewBufferString(mockInfo[ip]) nodeInfo, err := ParseNodeInfoResponse(r) if err != nil { diff --git a/ws.go b/ws.go index c849e11..241ca97 100644 --- a/ws.go +++ b/ws.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "log" "net/http" @@ -32,7 +33,11 @@ func NewWSServer(f *Fetcher, updateInterval time.Duration) *WSServer { go func() { t := time.NewTicker(updateInterval) for range t.C { - ws.refresh() + ctx, _ := context.WithTimeout(context.Background(), updateInterval) + if err := ws.refresh(ctx); err != nil { + // TODO: send error to ws + continue + } ws.stats.Update(ws.graph) ws.broadcastStats() } @@ -102,7 +107,7 @@ func (ws *WSServer) processRequest(c *websocket.Conn, mtype int, data []byte) { ws.updatePositions() ws.sendPositions(c) case CmdRefresh: - ws.refresh() + ws.refresh(context.TODO()) ws.broadcastStats() case CmdStats: ws.sendStats(c) @@ -123,9 +128,9 @@ func (ws *WSServer) sendMsg(c *websocket.Conn, msg *WSResponse) { } } -func (ws *WSServer) refresh() error { +func (ws *WSServer) refresh(ctx context.Context) error { log.Println("Getting peers from Status-cluster") - g, err := BuildGraph(ws.fetcher) + g, err := BuildGraph(ctx, ws.fetcher) if err != nil { log.Printf("[ERROR] Failed to fetch: %s", err) return err