This commit is contained in:
Ivan Danyliuk 2018-06-19 17:40:46 +02:00
parent 9c930ee75b
commit a5c6a531fc
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
10 changed files with 353 additions and 123 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
clustervis

View File

@ -1,92 +1,77 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"sort"
)
// ClusterSource represents knowledge source of
// cluster configuration.
type ClusterSource interface {
Nodes(dc, tag string) ([]*Node, error)
IPs(dc, tag string) ([]string, error)
}
// ConsulSource implements Consul clients that fetches
// actual information about hosts in cluster.
type ConsulSource struct {
hostport string
// Fetched implements data fetching from multiple sources
// to get the needed peers data.
type Fetcher struct {
cluster ClusterSource
rpc RPCClient
}
// NewConsulSource creates new Consul source. It doesn't attempt
// to connect or verify if address is correct.
func NewConsulSource(hostport string) ClusterSource {
return &ConsulSource{
hostport: hostport,
// NewFetcher creates new Fetcher.
func NewFetcher(cluster ClusterSource, rpc RPCClient) *Fetcher {
return &Fetcher{
cluster: cluster,
rpc: rpc,
}
}
// Node returns the list of nodes for the given datacentre 'dc' and tag.
// Satisfies ClusterSource interface.
func (c *ConsulSource) Nodes(dc, tag string) ([]*Node, error) {
url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag)
resp, err := http.Get(url)
// Nodes returns the list of nodes for the given datacentre 'dc' and tag.
func (f *Fetcher) Nodes(dc, tag string) ([]*ClusterNode, error) {
ips, err := f.cluster.IPs(dc, tag)
if err != nil {
return nil, fmt.Errorf("http call failed: %s", err)
return nil, err
}
defer resp.Body.Close()
ips, err := ParseConsulResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get nodes list: %s", err)
var ret []*ClusterNode
for _, ip := range ips {
nodeInfo, err := f.rpc.NodeInfo(ip)
if err != nil {
return nil, err
}
node := NewClusterNode(ip, nodeInfo)
ret = append(ret, node)
}
return ret, nil
}
// NodePeers runs `admin_peers` command for each node.
func (f *Fetcher) NodePeers(nodes []*ClusterNode) ([]*Node, []*Link, error) {
m := make(map[string]*Node)
var links []*Link
for _, node := range nodes {
// TODO: run concurrently
peers, err := f.rpc.AdminPeers(node.IP)
if err != nil {
log.Printf("[ERROR] Failed to get peers from %s\n", node.IP)
continue
}
for _, peer := range peers {
m[peer.ID()] = peer
link := NewLink(node.ID, peer.ID())
links = append(links, link)
}
}
var ret []*Node
for _, ip := range ips {
// TODO: run concurrently
rpc := NewHTTPRPCClient(ip)
nodes, err := rpc.AdminPeers()
if err != nil {
log.Println("[ERROR] Failed to get peers from %s", ip)
continue
}
ret = append(ret, nodes...)
for _, node := range m {
ret = append(ret, node)
}
return ret, nil
return nil, errors.New("TBD")
}
// ConsulResponse describes response structure from Consul.
type ConsulResponse []*ConsulNodeInfo
// ConsulNodeInfo describes single node as reported by Consul.
type ConsulNodeInfo struct {
ServiceAddress string
ServicePort string
}
// ToIP converts ConsulNodeInfo fields into hostport representation of IP.
func (c *ConsulNodeInfo) ToIP() string {
return fmt.Sprintf("%s:%s", c.ServiceAddress, c.ServicePort)
}
// ParseConsulResponse parses JSON output from Consul response with
// the list of service and extracts IP addresses.
func ParseConsulResponse(r io.Reader) ([]string, error) {
var resp ConsulResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, fmt.Errorf("unmarshal Consul JSON response: %s", err)
}
ret := make([]string, len(resp))
for i := range resp {
ret[i] = resp[i].ToIP()
}
return ret, nil
sort.Slice(ret, func(i, j int) bool {
return ret[i].ID() < ret[j].ID()
})
return ret, links, nil
}

File diff suppressed because one or more lines are too long

67
consul.go Normal file
View File

@ -0,0 +1,67 @@
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
)
// Consul implements ClusterSource for Consul.
type Consul struct {
hostport string
}
// NewConsul creates new Consul source. It doesn't attempt
// to connect or verify if address is correct.
func NewConsul(hostport string) *Consul {
return &Consul{
hostport: hostport,
}
}
// IPs returns the list of IPs for the given datacenter and tag from Consul.
func (c *Consul) IPs(dc, tag string) ([]string, error) {
url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("http call failed: %s", err)
}
defer resp.Body.Close()
ips, err := ParseConsulResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get nodes list: %s", err)
}
return ips, err
}
// ConsulResponse describes response structure from Consul.
type ConsulResponse []*ConsulNodeInfo
// ConsulNodeInfo describes single node as reported by Consul.
type ConsulNodeInfo struct {
ServiceAddress string
ServicePort int
}
// ToIP converts ConsulNodeInfo fields into hostport representation of IP.
func (c *ConsulNodeInfo) ToIP() string {
return fmt.Sprintf("%s:%d", c.ServiceAddress, c.ServicePort)
}
// ParseConsulResponse parses JSON output from Consul response with
// the list of service and extracts IP addresses.
func ParseConsulResponse(r io.Reader) ([]string, error) {
var resp ConsulResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, fmt.Errorf("unmarshal Consul JSON response: %s", err)
}
ret := make([]string, len(resp))
for i := range resp {
ret[i] = resp[i].ToIP()
}
return ret, nil
}

View File

@ -7,17 +7,36 @@ import (
"github.com/ethereum/go-ethereum/p2p"
)
// JSONRPCResponse represents JSON-RPC response for `admin_peers` command from
// PeersResponse represents JSON-RPC response for `admin_peers` command from
// geth instance.
type JSONRPCResponse struct {
type PeersResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result []*p2p.PeerInfo `json:"result"`
}
// ParseResponse parses JSON-RPC 'admin_peers' response from reader r.
func ParseResponse(r io.Reader) ([]*p2p.PeerInfo, error) {
var resp JSONRPCResponse
// ParsePeersResponse parses JSON-RPC 'admin_peers' response from reader r.
func ParsePeersResponse(r io.Reader) ([]*p2p.PeerInfo, error) {
var resp PeersResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, err
}
return resp.Result, nil
}
// NodeInfoResponse represents JSON-RPC response for `admin_nodeInfo` command from
// geth instance.
type NodeInfoResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result *p2p.NodeInfo `json:"result"`
}
// ParseNodeInfoResponse parses JSON-RPC 'admin_nodeInfo' response from reader r.
func ParseNodeInfoResponse(r io.Reader) (*p2p.NodeInfo, error) {
var resp NodeInfoResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, err

63
main.go
View File

@ -6,23 +6,72 @@ import (
"log"
"github.com/divan/graph-experiments/graph"
"github.com/ethereum/go-ethereum/p2p"
)
func main() {
var consulAddr = flag.String("consul", "localhost:8500", "Host:port for consul address to query")
var (
consulAddr = flag.String("consul", "localhost:8500", "Host:port for consul address to query")
testMode = flag.Bool("test", false, "Test mode (use local test data)")
)
flag.Parse()
cluster := NewConsulSource(*consulAddr)
nodes, err := cluster.Nodes("", "eth.beta")
var rpc RPCClient
rpc = NewHTTPRPCClient()
if *testMode {
rpc = NewMockRPCClient()
}
var cluster ClusterSource
cluster = NewConsul(*consulAddr)
if *testMode {
cluster = NewMockConsulSource()
}
fetcher := NewFetcher(cluster, rpc)
nodes, err := fetcher.Nodes("", "eth.beta")
if err != nil {
log.Fatalf("Getting list of nodes: %s", err)
log.Fatalf("Getting list of ips: %s", err)
}
peers, links, err := fetcher.NodePeers(nodes)
if err != nil {
log.Fatalf("Getting list of ips: %s", err)
}
g := graph.NewGraph()
for _, node := range nodes {
_ = node
//g.AddNode(node)
for _, peer := range peers {
_ = peer
//AddPeer(g, "", node.PI)
}
for _, link := range links {
_ = link
//AddPeer(g, "", node.PI)
}
fmt.Printf("Graph has %d nodes and %d links\n", len(g.Nodes()), len(g.Links()))
}
func AddPeer(g *graph.Graph, fromID string, to *p2p.PeerInfo) {
toID := to.ID
addNode(g, fromID, false)
//addNode(g, toID, isClient(to.Name))
addNode(g, toID, false)
if g.LinkExistsByID(fromID, toID) {
return
}
if to.Network.Inbound == false {
g.AddLinkByIDs(fromID, toID)
} else {
g.AddLinkByIDs(toID, fromID)
}
}
func addNode(g *graph.Graph, id string, client bool) {
if _, err := g.NodeByID(id); err == nil {
// already exists
return
}
//node := NewNode(id)
//g.AddNode(node)
}

38
main_test.go Normal file
View File

@ -0,0 +1,38 @@
package main
import "testing"
func TestGraphCreate(t *testing.T) {
cluster := NewMockConsulSource()
rpc := NewMockRPCClient()
f := NewFetcher(cluster, rpc)
nodes, err := f.Nodes("", "eth.beta")
if err != nil {
t.Fatal(err)
}
got := len(nodes)
expected := 15
if got != expected {
t.Fatalf("Expected %d nodes, got %d", expected, got)
}
peers, links, err := f.NodePeers(nodes)
if err != nil {
t.Fatal(err)
}
got = len(peers)
expected = 49
if got != expected {
t.Fatalf("Expected %d nodes, got %d", expected, got)
}
got = len(links)
expected = 200
if got != expected {
t.Fatalf("Expected %d links, got %d", expected, got)
}
}

50
node.go
View File

@ -2,15 +2,39 @@ package main
import "github.com/ethereum/go-ethereum/p2p"
// Node represents single node information.
// Node represents single node information to be used in Graph.
type Node struct {
*p2p.PeerInfo
ID_ string `json:"ID"`
Group_ int `json:"Group"`
}
// NewNode creates new Node object for the given peerinfo.
func NewNode(peer *p2p.PeerInfo) *Node {
return &Node{
PeerInfo: peer,
ID_: peer.ID,
Group_: clientGroup(peer.Name),
}
}
// clientGroup returns group id based in server type.
func clientGroup(name string) int {
// TODO: implement
return 1
}
// ClusterNode represents single cluster node information.
type ClusterNode struct {
IP string
ID string
Type string // name field in JSON (statusd or statusIM)
}
// NewClusterNode creates new Node object for the given peerinfo.
func NewClusterNode(ip string, peer *p2p.NodeInfo) *ClusterNode {
return &ClusterNode{
IP: ip,
ID: peer.ID,
Type: peer.Name,
}
}
@ -22,3 +46,23 @@ func PeersToNodes(peers []*p2p.PeerInfo) ([]*Node, error) {
}
return ret, nil
}
// ID returns ID of the node. Satisfies graph.Node interface.
func (n *Node) ID() string {
return n.ID_
}
// Group returns group of the node. Satisfies graph.Node interface.
func (n *Node) Group() int {
return n.Group_
}
// Link represents link between two nodes.
type Link struct {
FromID, ToID string
}
// NewLinks creates link for the given IDs.
func NewLink(fromID, toID string) *Link {
return &Link{fromID, toID}
}

36
rpc.go
View File

@ -4,41 +4,59 @@ import (
"bytes"
"fmt"
"net/http"
"github.com/ethereum/go-ethereum/p2p"
)
// RPCClient defines subset of client that
// can call needed methods to geth's RPC server.
type RPCClient interface {
AdminPeers() ([]*Node, error)
AdminPeers(ip string) ([]*Node, error)
NodeInfo(ip string) (*p2p.NodeInfo, error)
}
// HTTPRPCClient implements RPCClient for
// HTTP transport.
type HTTPRPCClient struct {
IP string
}
// NewHTTPRPCClient creates new HTTP RPC client for eth JSON-RPC server.
func NewHTTPRPCClient(ip string) *HTTPRPCClient {
return &HTTPRPCClient{
IP: ip,
}
func NewHTTPRPCClient() *HTTPRPCClient {
return &HTTPRPCClient{}
}
// AdminPeers executes `admin_peers` RPC call and parses the response.
// Satisfies RPCClient interface.
func (h *HTTPRPCClient) AdminPeers() ([]*Node, error) {
func (h *HTTPRPCClient) AdminPeers(ip string) ([]*Node, error) {
data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`)
resp, err := http.Post("https://"+h.IP, "application/json", data)
resp, err := http.Post("https://"+ip, "application/json", data)
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
}
defer resp.Body.Close()
nodes, err := ParseResponse(resp.Body)
nodes, err := ParsePeersResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get admin peers: %s", err)
}
return PeersToNodes(nodes)
}
// NodeInfo executes `admin_nodeInfo` RPC call and parses the response.
// Satisfies RPCClient interface.
func (h *HTTPRPCClient) NodeInfo(ip string) (*p2p.NodeInfo, error) {
data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_nodeInfo","params":[],"id":1}`)
resp, err := http.Post("https://"+ip, "application/json", data)
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
}
defer resp.Body.Close()
nodeInfo, err := ParseNodeInfoResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get node info: %s", err)
}
return nodeInfo, err
}

File diff suppressed because one or more lines are too long