Add timeouts and context support

This commit is contained in:
Ivan Danyliuk 2018-06-20 16:45:05 +02:00
parent ede8dcb309
commit 87fa0cc5e5
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
10 changed files with 100 additions and 40 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
clustervis
clustervis-linux-amd64
web/node_modules
.vscode

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"log"
"sort"
)
@ -8,7 +9,7 @@ import (
// ClusterSource represents knowledge source of
// cluster configuration.
type ClusterSource interface {
IPs(dc, tag string) ([]string, error)
IPs(ctx context.Context, dc, tag string) ([]string, error)
}
// Fetched implements data fetching from multiple sources
@ -27,15 +28,15 @@ func NewFetcher(cluster ClusterSource, rpc RPCClient) *Fetcher {
}
// Nodes returns the list of nodes for the given datacentre 'dc' and tag.
func (f *Fetcher) Nodes(dc, tag string) ([]*ClusterNode, error) {
ips, err := f.cluster.IPs(dc, tag)
func (f *Fetcher) Nodes(ctx context.Context, dc, tag string) ([]*ClusterNode, error) {
ips, err := f.cluster.IPs(ctx, dc, tag)
if err != nil {
return nil, err
}
var ret []*ClusterNode
for _, ip := range ips {
nodeInfo, err := f.rpc.NodeInfo(ip)
nodeInfo, err := f.rpc.NodeInfo(ctx, ip)
if err != nil {
return nil, err
}
@ -47,12 +48,12 @@ func (f *Fetcher) Nodes(dc, tag string) ([]*ClusterNode, error) {
}
// NodePeers runs `admin_peers` command for each node.
func (f *Fetcher) NodePeers(nodes []*ClusterNode) ([]*Node, []*Link, error) {
func (f *Fetcher) NodePeers(ctx context.Context, nodes []*ClusterNode) ([]*Node, []*Link, error) {
m := make(map[string]*Node)
var links []*Link
for _, node := range nodes {
// TODO: run concurrently
peers, err := f.rpc.AdminPeers(node.IP)
peers, err := f.rpc.AdminPeers(ctx, node.IP)
if err != nil {
log.Printf("[ERROR] Failed to get peers from %s\n", node.IP)
continue

View File

@ -2,6 +2,7 @@ package main
import (
"bytes"
"context"
)
// MockConsulSource implements ClusterSource for local
@ -16,7 +17,7 @@ func NewMockConsulSource() ClusterSource {
// Node returns the list of mock nodes for the given datacentre 'dc' and tag.
// Satisfies ClusterSource interface.
func (c *MockConsulSource) IPs(dc, tag string) ([]string, error) {
func (c *MockConsulSource) IPs(ctx context.Context, dc, tag string) ([]string, error) {
r := bytes.NewBufferString(mockClusterIPsJSON)
return ParseConsulResponse(r)
}

View File

@ -1,31 +1,45 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// Consul implements ClusterSource for Consul.
type Consul struct {
hostport string
client *http.Client
}
const ConsulTimeout = 5 * time.Second // it's localhost, should be in ms actually
// NewConsul creates new Consul source. It doesn't attempt
// to connect or verify if address is correct.
func NewConsul(hostport string) *Consul {
return &Consul{
hostport: hostport,
client: &http.Client{
Timeout: ConsulTimeout,
},
}
}
// IPs returns the list of IPs for the given datacenter and tag from Consul.
func (c *Consul) IPs(dc, tag string) ([]string, error) {
func (c *Consul) IPs(ctx context.Context, dc, tag string) ([]string, error) {
url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag)
resp, err := http.Get(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("http call failed: %s", err)
return nil, fmt.Errorf("request: %s", err)
}
req = req.WithContext(ctx)
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("http call: %s", err)
}
defer resp.Body.Close()

View File

@ -17,8 +17,9 @@ type PeersResponse struct {
// ParsePeersResponse parses JSON-RPC 'admin_peers' response from reader r.
func ParsePeersResponse(r io.Reader) ([]*p2p.PeerInfo, error) {
lr := io.LimitReader(r, 10e6) // 1MB should be more than enough
var resp PeersResponse
err := json.NewDecoder(r).Decode(&resp)
err := json.NewDecoder(lr).Decode(&resp)
if err != nil {
return nil, err
}
@ -36,8 +37,9 @@ type NodeInfoResponse struct {
// ParseNodeInfoResponse parses JSON-RPC 'admin_nodeInfo' response from reader r.
func ParseNodeInfoResponse(r io.Reader) (*p2p.NodeInfo, error) {
lr := io.LimitReader(r, 10e6) // 1MB should be more than enough
var resp NodeInfoResponse
err := json.NewDecoder(r).Decode(&resp)
err := json.NewDecoder(lr).Decode(&resp)
if err != nil {
return nil, err
}

11
main.go
View File

@ -1,6 +1,7 @@
package main
import (
"context"
"flag"
"fmt"
"log"
@ -32,7 +33,9 @@ func main() {
fetcher := NewFetcher(cluster, rpc)
ws := NewWSServer(fetcher, *updateInterval)
ws.refresh()
// do initial refresh
ws.refresh(context.Background())
log.Printf("Starting web server...")
startWeb(ws, *port)
@ -40,13 +43,13 @@ func main() {
// BuildGraph performs new cycle of updating data from
// fetcher source and populating graph object.
func BuildGraph(fetcher *Fetcher) (*graph.Graph, error) {
nodes, err := fetcher.Nodes("", "eth.beta")
func BuildGraph(ctx context.Context, fetcher *Fetcher) (*graph.Graph, error) {
nodes, err := fetcher.Nodes(ctx, "", "eth.beta")
if err != nil {
return nil, fmt.Errorf("list of ips: %s", err)
}
peers, links, err := fetcher.NodePeers(nodes)
peers, links, err := fetcher.NodePeers(ctx, nodes)
if err != nil {
return nil, fmt.Errorf("get node peers: %s", err)
}

View File

@ -1,6 +1,9 @@
package main
import "testing"
import (
"context"
"testing"
)
func TestGraphCreate(t *testing.T) {
cluster := NewMockConsulSource()
@ -8,7 +11,8 @@ func TestGraphCreate(t *testing.T) {
f := NewFetcher(cluster, rpc)
nodes, err := f.Nodes("", "eth.beta")
ctx := context.Background()
nodes, err := f.Nodes(ctx, "", "eth.beta")
if err != nil {
t.Fatal(err)
}
@ -19,7 +23,7 @@ func TestGraphCreate(t *testing.T) {
t.Fatalf("Expected %d nodes, got %d", expected, got)
}
peers, links, err := f.NodePeers(nodes)
peers, links, err := f.NodePeers(ctx, nodes)
if err != nil {
t.Fatal(err)
}

58
rpc.go
View File

@ -2,8 +2,11 @@ package main
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/ethereum/go-ethereum/p2p"
)
@ -11,31 +14,37 @@ import (
// RPCClient defines subset of client that
// can call needed methods to geth's RPC server.
type RPCClient interface {
AdminPeers(ip string) ([]*Node, error)
NodeInfo(ip string) (*p2p.NodeInfo, error)
AdminPeers(ctx context.Context, ip string) ([]*Node, error)
NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error)
}
// HTTPRPCClient implements RPCClient for
// HTTP transport.
type HTTPRPCClient struct {
client *http.Client
}
const HTTPRPCTimeout = 3 * time.Second
// NewHTTPRPCClient creates new HTTP RPC client for eth JSON-RPC server.
func NewHTTPRPCClient() *HTTPRPCClient {
return &HTTPRPCClient{}
return &HTTPRPCClient{
client: &http.Client{
Timeout: ConsulTimeout,
},
}
}
// AdminPeers executes `admin_peers` RPC call and parses the response.
// Satisfies RPCClient interface.
func (h *HTTPRPCClient) AdminPeers(ip string) ([]*Node, error) {
data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`)
resp, err := http.Post("http://"+ip, "application/json", data)
func (h *HTTPRPCClient) AdminPeers(ctx context.Context, ip string) ([]*Node, error) {
r, err := h.postMethod(ctx, ip, "admin_peers")
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
return nil, fmt.Errorf("rpc admin_peers: %s", err)
}
defer resp.Body.Close()
defer r.Close()
nodes, err := ParsePeersResponse(resp.Body)
nodes, err := ParsePeersResponse(r)
if err != nil {
return nil, fmt.Errorf("get admin peers: %s", err)
}
@ -45,18 +54,37 @@ func (h *HTTPRPCClient) AdminPeers(ip string) ([]*Node, error) {
// NodeInfo executes `admin_nodeInfo` RPC call and parses the response.
// Satisfies RPCClient interface.
func (h *HTTPRPCClient) NodeInfo(ip string) (*p2p.NodeInfo, error) {
data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_nodeInfo","params":[],"id":1}`)
resp, err := http.Post("http://"+ip, "application/json", data)
func (h *HTTPRPCClient) NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error) {
r, err := h.postMethod(ctx, ip, "admin_nodeInfo")
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
return nil, fmt.Errorf("rpc admin_nodeInfo: %s", err)
}
defer resp.Body.Close()
defer r.Close()
nodeInfo, err := ParseNodeInfoResponse(resp.Body)
nodeInfo, err := ParseNodeInfoResponse(r)
if err != nil {
return nil, fmt.Errorf("get node info: %s", err)
}
return nodeInfo, err
}
// postMethod performs POST RPC request for single method RPC calls without params.
// it reads body and return the whole answer.
func (h *HTTPRPCClient) postMethod(ctx context.Context, ip, method string) (io.ReadCloser, error) {
payload := fmt.Sprintf("{\"jsonrpc\":\"2.0\",\"method\":\"%s\",\"params\":[],\"id\":1}", method)
data := bytes.NewBufferString(payload)
url := "http://" + ip
req, err := http.NewRequest("POST", url, data)
if err != nil {
return nil, fmt.Errorf("request: %s", err)
}
req = req.WithContext(ctx)
resp, err := h.client.Do(req)
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
}
return resp.Body, nil
}

View File

@ -2,6 +2,7 @@ package main
import (
"bytes"
"context"
"fmt"
"github.com/ethereum/go-ethereum/p2p"
@ -17,7 +18,7 @@ func NewMockRPCClient() *MockRPCClient {
// AdminPeers simulates call to `admin_peers` RPC and parses the response.
// Satisfies RPCClient interface.
func (h *MockRPCClient) AdminPeers(ip string) ([]*Node, error) {
func (h *MockRPCClient) AdminPeers(ctx context.Context, ip string) ([]*Node, error) {
r := bytes.NewBufferString(mockPeers[ip])
nodes, err := ParsePeersResponse(r)
if err != nil {
@ -29,7 +30,7 @@ func (h *MockRPCClient) AdminPeers(ip string) ([]*Node, error) {
// AdminPeers simulates call to `admin_peers` RPC and parses the response.
// Satisfies RPCClient interface.
func (h *MockRPCClient) NodeInfo(ip string) (*p2p.NodeInfo, error) {
func (h *MockRPCClient) NodeInfo(ctx context.Context, ip string) (*p2p.NodeInfo, error) {
r := bytes.NewBufferString(mockInfo[ip])
nodeInfo, err := ParseNodeInfoResponse(r)
if err != nil {

13
ws.go
View File

@ -1,6 +1,7 @@
package main
import (
"context"
"encoding/json"
"log"
"net/http"
@ -32,7 +33,11 @@ func NewWSServer(f *Fetcher, updateInterval time.Duration) *WSServer {
go func() {
t := time.NewTicker(updateInterval)
for range t.C {
ws.refresh()
ctx, _ := context.WithTimeout(context.Background(), updateInterval)
if err := ws.refresh(ctx); err != nil {
// TODO: send error to ws
continue
}
ws.stats.Update(ws.graph)
ws.broadcastStats()
}
@ -102,7 +107,7 @@ func (ws *WSServer) processRequest(c *websocket.Conn, mtype int, data []byte) {
ws.updatePositions()
ws.sendPositions(c)
case CmdRefresh:
ws.refresh()
ws.refresh(context.TODO())
ws.broadcastStats()
case CmdStats:
ws.sendStats(c)
@ -123,9 +128,9 @@ func (ws *WSServer) sendMsg(c *websocket.Conn, msg *WSResponse) {
}
}
func (ws *WSServer) refresh() error {
func (ws *WSServer) refresh(ctx context.Context) error {
log.Println("Getting peers from Status-cluster")
g, err := BuildGraph(ws.fetcher)
g, err := BuildGraph(ctx, ws.fetcher)
if err != nil {
log.Printf("[ERROR] Failed to fetch: %s", err)
return err