Add web and websockets

This commit is contained in:
Ivan Danyliuk 2018-06-19 19:01:52 +02:00
parent de71c60d04
commit 9cd1ebae40
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
9 changed files with 25043 additions and 9 deletions

38
main.go
View File

@ -4,14 +4,17 @@ import (
"flag"
"fmt"
"log"
"time"
"github.com/divan/graph-experiments/graph"
)
func main() {
var (
consulAddr = flag.String("consul", "localhost:8500", "Host:port for consul address to query")
testMode = flag.Bool("test", false, "Test mode (use local test data)")
consulAddr = flag.String("consul", "localhost:8500", "Host:port for consul address to query")
testMode = flag.Bool("test", false, "Test mode (use local test data)")
port = flag.String("port", ":20002", "Port to bind server to")
updateInterval = flag.Duration("i", 10*time.Second, "Update interval")
)
flag.Parse()
@ -27,29 +30,46 @@ func main() {
}
fetcher := NewFetcher(cluster, rpc)
ws := NewWSServer(fetcher, *updateInterval)
ws.refresh()
log.Printf("Starting web server...")
startWeb(ws, *port)
}
// BuildGraph performs new cycle of updating data from
// fetcher source and populating graph object.
func BuildGraph(fetcher *Fetcher) (*graph.Graph, error) {
nodes, err := fetcher.Nodes("", "eth.beta")
if err != nil {
log.Fatalf("Getting list of ips: %s", err)
return nil, fmt.Errorf("list of ips: %s", err)
}
peers, links, err := fetcher.NodePeers(nodes)
if err != nil {
log.Fatalf("Getting list of ips: %s", err)
return nil, fmt.Errorf("get node peers: %s", err)
}
g := graph.NewGraph()
for _, peer := range peers {
if _, err := g.NodeByID(peer.ID()); err == nil {
// already exists
continue
}
g.AddNode(peer)
AddNode(g, peer)
}
for _, link := range links {
AddLink(g, link.FromID, link.ToID)
}
fmt.Printf("Graph has %d nodes and %d links\n", len(g.Nodes()), len(g.Links()))
return g, nil
}
// AddNode is a wrapper around adding node to graph with proper checking for duplicates.
func AddNode(g *graph.Graph, node *Node) {
if _, err := g.NodeByID(node.ID()); err == nil {
// already exists
return
}
g.AddNode(node)
}
// AddLink is a wrapper around adding link to graph with proper checking for duplicates.

View File

@ -16,6 +16,12 @@ func NewNode(peer *p2p.PeerInfo) *Node {
}
}
// IsClient returns true if node is identified as a mobile client, rather then server.
func (n *Node) IsClient() bool {
// TODO: implement
return false
}
// clientGroup returns group id based in server type.
func clientGroup(name string) int {
// TODO: implement

120
stats.go Normal file
View File

@ -0,0 +1,120 @@
package main
import (
"sync"
"time"
"github.com/divan/graph-experiments/graph"
)
type Stats struct {
mu sync.RWMutex
// current data
Clients []string
ClientsNum int
Servers []string
ServersNum int
LinksNum int
Nodes []*NodeStats
LastUpdate time.Time
// history data
Timestamps []string
ServersHist []int
ClientsHist []int
}
type NodeStats struct {
ID string
Peers []string
Clients []string
PeersNum int
ClientsNum int
IsClient bool
}
func (s *Stats) Stats() *Stats {
s.mu.RLock()
defer s.mu.RUnlock()
return s
}
func (s *Stats) Update(g *graph.Graph) {
var servers, clients []string
for _, node := range g.Nodes() {
n := node.(*Node)
if n.IsClient() {
clients = append(clients, n.ID())
} else {
servers = append(servers, n.ID())
}
}
findLinks := func(idx int) []*graph.Link {
var ret []*graph.Link
for _, link := range g.Links() {
if link.From == idx || link.To == idx {
ret = append(ret, link)
}
}
return ret
}
ns := make([]*NodeStats, 0, len(g.Nodes()))
for i, node := range g.Nodes() {
n := node.(*Node)
var peers, clients int
var peersS, clientsS []string
links := findLinks(i)
for _, link := range links {
var peer graph.Node
if i == link.From {
peer = g.Nodes()[link.To]
} else {
peer = g.Nodes()[link.From]
}
p := peer.(*Node)
if p.IsClient() {
clients++
clientsS = append(clientsS, p.ID())
} else {
peers++
peersS = append(peersS, p.ID())
}
}
nodeStat := &NodeStats{
ID: n.ID(),
IsClient: n.IsClient(),
Peers: peersS,
Clients: clientsS,
PeersNum: peers,
ClientsNum: clients,
}
ns = append(ns, nodeStat)
}
s.mu.Lock()
defer s.mu.Unlock()
s.Clients = clients
s.ClientsNum = len(clients)
s.Servers = servers
s.ServersNum = len(servers)
s.LinksNum = len(g.Links())
s.Nodes = ns
now := time.Now()
s.LastUpdate = now
// update historic data
JavascriptISOString := "2006-01-02T15:04:05.999Z07:00"
jsNow := now.UTC().Format(JavascriptISOString)
s.Timestamps = append(s.Timestamps, jsNow)
s.ServersHist = append(s.ServersHist, s.ServersNum)
s.ClientsHist = append(s.ClientsHist, s.ClientsNum)
}

21
web.go Normal file
View File

@ -0,0 +1,21 @@
//go:generate browserify web/index.js web/js/ws.js -o web/bundle.js
package main
import (
"log"
"net/http"
)
func startWeb(ws *WSServer, bind string) {
fs := http.FileServer(http.Dir("web"))
http.Handle("/", noCacheMiddleware(fs))
http.HandleFunc("/ws", ws.Handle)
log.Fatal(http.ListenAndServe(bind, nil))
}
func noCacheMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Cache-Control", "max-age=0, no-cache")
h.ServeHTTP(w, r)
})
}

24622
web/bundle.js Normal file

File diff suppressed because one or more lines are too long

147
ws.go Normal file
View File

@ -0,0 +1,147 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/divan/graph-experiments/graph"
"github.com/divan/graph-experiments/layout"
"github.com/gorilla/websocket"
)
type WSServer struct {
upgrader websocket.Upgrader
hub []*websocket.Conn
Positions []*position
layout layout.Layout
graph *graph.Graph
stats *Stats
fetcher *Fetcher
}
func NewWSServer(f *Fetcher, updateInterval time.Duration) *WSServer {
ws := &WSServer{
upgrader: websocket.Upgrader{},
stats: &Stats{},
fetcher: f,
}
go func() {
t := time.NewTicker(updateInterval)
for range t.C {
ws.refresh()
ws.stats.Update(ws.graph)
ws.broadcastStats()
}
}()
return ws
}
type WSResponse struct {
Type MsgType `json:"type"`
Positions []*position `json:"positions,omitempty"`
Graph json.RawMessage `json:"graph,omitempty"`
Stats Stats `json:"stats,omitempty"`
}
type WSRequest struct {
Cmd WSCommand `json:"cmd"`
}
type MsgType string
type WSCommand string
// WebSocket response types
const (
RespPositions MsgType = "positions"
RespGraph MsgType = "graph"
RespStats MsgType = "stats"
)
// WebSocket commands
const (
CmdInit WSCommand = "init"
CmdRefresh WSCommand = "refresh"
CmdStats WSCommand = "stats"
)
func (ws *WSServer) Handle(w http.ResponseWriter, r *http.Request) {
c, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
defer c.Close()
ws.hub = append(ws.hub, c)
for {
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", mt, err)
break
}
ws.processRequest(c, mt, message)
}
}
func (ws *WSServer) processRequest(c *websocket.Conn, mtype int, data []byte) {
var cmd WSRequest
err := json.Unmarshal(data, &cmd)
if err != nil {
log.Fatal("unmarshal command", err)
return
}
switch cmd.Cmd {
case CmdInit:
ws.sendGraphData(c)
ws.updatePositions()
ws.sendPositions(c)
case CmdRefresh:
ws.refresh()
ws.broadcastStats()
case CmdStats:
ws.sendStats(c)
}
}
func (ws *WSServer) sendMsg(c *websocket.Conn, msg *WSResponse) {
data, err := json.Marshal(msg)
if err != nil {
log.Println("write:", err)
return
}
err = c.WriteMessage(1, data)
if err != nil {
log.Println("write:", err)
return
}
}
func (ws *WSServer) refresh() {
log.Println("Getting peers from Status-cluster via SSH")
g, err := BuildGraph(ws.fetcher)
if err != nil {
log.Println("[ERROR] Failed to fetch: %s", err)
return
}
log.Printf("Loaded graph: %d nodes, %d links\n", len(g.Nodes()), len(g.Links()))
fmt.Println(g.Links()[0])
log.Printf("Initializing layout...")
repelling := layout.NewGravityForce(-50.0, layout.BarneHutMethod)
springs := layout.NewSpringForce(0.01, 5.0, layout.ForEachLink)
drag := layout.NewDragForce(0.4, layout.ForEachNode)
l := layout.New(g, repelling, springs, drag)
l.CalculateN(20)
ws.updateGraph(g, l)
ws.updatePositions()
}

39
ws_graph.go Normal file
View File

@ -0,0 +1,39 @@
package main
import (
"bytes"
"encoding/json"
"log"
"github.com/divan/graph-experiments/export"
"github.com/divan/graph-experiments/graph"
"github.com/divan/graph-experiments/layout"
"github.com/gorilla/websocket"
)
func (ws *WSServer) sendGraphData(c *websocket.Conn) {
var buf bytes.Buffer
err := export.NewJSON(&buf, false).ExportGraph(ws.graph)
if err != nil {
log.Fatal("Can't marshal graph to JSON")
}
msg := &WSResponse{
Type: RespGraph,
Graph: json.RawMessage(buf.Bytes()),
}
ws.sendMsg(c, msg)
}
func (ws *WSServer) updateGraph(g *graph.Graph, l layout.Layout) {
ws.graph = g
ws.layout = l
ws.broadcastGraphData()
}
func (ws *WSServer) broadcastGraphData() {
for i := 0; i < len(ws.hub); i++ {
ws.sendGraphData(ws.hub[i])
}
}

41
ws_positions.go Normal file
View File

@ -0,0 +1,41 @@
package main
import "github.com/gorilla/websocket"
type position struct {
X int `json:"x"`
Y int `json:"y"`
Z int `json:"z"`
}
func (ws *WSServer) sendPositions(c *websocket.Conn) {
msg := &WSResponse{
Type: RespPositions,
Positions: ws.Positions,
}
ws.sendMsg(c, msg)
}
func (ws *WSServer) updatePositions() {
// positions
nodes := ws.layout.Nodes()
positions := []*position{}
for i := 0; i < len(nodes); i++ {
pos := &position{
X: nodes[i].X,
Y: nodes[i].Y,
Z: nodes[i].Z,
}
positions = append(positions, pos)
}
ws.Positions = positions
ws.broadcastPositions()
}
func (ws *WSServer) broadcastPositions() {
for i := 0; i < len(ws.hub); i++ {
ws.sendPositions(ws.hub[i])
}
}

18
ws_stats.go Normal file
View File

@ -0,0 +1,18 @@
package main
import "github.com/gorilla/websocket"
func (ws *WSServer) sendStats(c *websocket.Conn) {
msg := &WSResponse{
Type: RespStats,
Stats: *ws.stats.Stats(),
}
ws.sendMsg(c, msg)
}
func (ws *WSServer) broadcastStats() {
for i := 0; i < len(ws.hub); i++ {
ws.sendStats(ws.hub[i])
}
}