Initial commit

This commit is contained in:
Ivan Danyliuk 2018-06-19 15:06:42 +02:00
commit 9c930ee75b
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
7 changed files with 306 additions and 0 deletions

92
cluster.go Normal file
View File

@ -0,0 +1,92 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
)
// ClusterSource represents knowledge source of
// cluster configuration.
type ClusterSource interface {
Nodes(dc, tag string) ([]*Node, error)
}
// ConsulSource implements Consul clients that fetches
// actual information about hosts in cluster.
type ConsulSource struct {
hostport string
}
// NewConsulSource creates new Consul source. It doesn't attempt
// to connect or verify if address is correct.
func NewConsulSource(hostport string) ClusterSource {
return &ConsulSource{
hostport: hostport,
}
}
// Node returns the list of nodes for the given datacentre 'dc' and tag.
// Satisfies ClusterSource interface.
func (c *ConsulSource) Nodes(dc, tag string) ([]*Node, error) {
url := fmt.Sprintf("http://%s/v1/catalog/service/statusd-rpc?tag=%s", c.hostport, tag)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("http call failed: %s", err)
}
defer resp.Body.Close()
ips, err := ParseConsulResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get nodes list: %s", err)
}
var ret []*Node
for _, ip := range ips {
// TODO: run concurrently
rpc := NewHTTPRPCClient(ip)
nodes, err := rpc.AdminPeers()
if err != nil {
log.Println("[ERROR] Failed to get peers from %s", ip)
continue
}
ret = append(ret, nodes...)
}
return ret, nil
return nil, errors.New("TBD")
}
// ConsulResponse describes response structure from Consul.
type ConsulResponse []*ConsulNodeInfo
// ConsulNodeInfo describes single node as reported by Consul.
type ConsulNodeInfo struct {
ServiceAddress string
ServicePort string
}
// ToIP converts ConsulNodeInfo fields into hostport representation of IP.
func (c *ConsulNodeInfo) ToIP() string {
return fmt.Sprintf("%s:%s", c.ServiceAddress, c.ServicePort)
}
// ParseConsulResponse parses JSON output from Consul response with
// the list of service and extracts IP addresses.
func ParseConsulResponse(r io.Reader) ([]string, error) {
var resp ConsulResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, fmt.Errorf("unmarshal Consul JSON response: %s", err)
}
ret := make([]string, len(resp))
for i := range resp {
ret[i] = resp[i].ToIP()
}
return ret, nil
}

43
cluster_mock.go Normal file

File diff suppressed because one or more lines are too long

27
jsonrpc.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"encoding/json"
"io"
"github.com/ethereum/go-ethereum/p2p"
)
// JSONRPCResponse represents JSON-RPC response for `admin_peers` command from
// geth instance.
type JSONRPCResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result []*p2p.PeerInfo `json:"result"`
}
// ParseResponse parses JSON-RPC 'admin_peers' response from reader r.
func ParseResponse(r io.Reader) ([]*p2p.PeerInfo, error) {
var resp JSONRPCResponse
err := json.NewDecoder(r).Decode(&resp)
if err != nil {
return nil, err
}
return resp.Result, nil
}

28
main.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"flag"
"fmt"
"log"
"github.com/divan/graph-experiments/graph"
)
func main() {
var consulAddr = flag.String("consul", "localhost:8500", "Host:port for consul address to query")
flag.Parse()
cluster := NewConsulSource(*consulAddr)
nodes, err := cluster.Nodes("", "eth.beta")
if err != nil {
log.Fatalf("Getting list of nodes: %s", err)
}
g := graph.NewGraph()
for _, node := range nodes {
_ = node
//g.AddNode(node)
}
fmt.Printf("Graph has %d nodes and %d links\n", len(g.Nodes()), len(g.Links()))
}

24
node.go Normal file
View File

@ -0,0 +1,24 @@
package main
import "github.com/ethereum/go-ethereum/p2p"
// Node represents single node information.
type Node struct {
*p2p.PeerInfo
}
// NewNode creates new Node object for the given peerinfo.
func NewNode(peer *p2p.PeerInfo) *Node {
return &Node{
PeerInfo: peer,
}
}
// PeersToNodes converts PeerInfo to Nodes.
func PeersToNodes(peers []*p2p.PeerInfo) ([]*Node, error) {
ret := make([]*Node, len(peers))
for i := range peers {
ret[i] = NewNode(peers[i])
}
return ret, nil
}

44
rpc.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"bytes"
"fmt"
"net/http"
)
// RPCClient defines subset of client that
// can call needed methods to geth's RPC server.
type RPCClient interface {
AdminPeers() ([]*Node, error)
}
// HTTPRPCClient implements RPCClient for
// HTTP transport.
type HTTPRPCClient struct {
IP string
}
// NewHTTPRPCClient creates new HTTP RPC client for eth JSON-RPC server.
func NewHTTPRPCClient(ip string) *HTTPRPCClient {
return &HTTPRPCClient{
IP: ip,
}
}
// AdminPeers executes `admin_peers` RPC call and parses the response.
// Satisfies RPCClient interface.
func (h *HTTPRPCClient) AdminPeers() ([]*Node, error) {
data := bytes.NewBufferString(`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`)
resp, err := http.Post("https://"+h.IP, "application/json", data)
if err != nil {
return nil, fmt.Errorf("POST RPC request: %s", err)
}
defer resp.Body.Close()
nodes, err := ParseResponse(resp.Body)
if err != nil {
return nil, fmt.Errorf("get admin peers: %s", err)
}
return PeersToNodes(nodes)
}

48
rpc_mock.go Normal file

File diff suppressed because one or more lines are too long