From 7a8c56631d55673761e0d9a362b70b38bb7fe2e3 Mon Sep 17 00:00:00 2001 From: Ivan Danyliuk Date: Thu, 24 May 2018 18:32:39 +0300 Subject: [PATCH] Initial commit --- .gitignore | 2 ++ main.go | 45 +++++++++++++++++++++++++++++++++++++ msg.go | 41 +++++++++++++++++++++++++++++++++ receiver.go | 33 +++++++++++++++++++++++++++ sender.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++ stats.go | 47 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 233 insertions(+) create mode 100644 .gitignore create mode 100644 main.go create mode 100644 msg.go create mode 100644 receiver.go create mode 100644 sender.go create mode 100644 stats.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..996d2f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +p2p-health-bot +p2p-health-bot_linux diff --git a/main.go b/main.go new file mode 100644 index 0000000..1b9afc2 --- /dev/null +++ b/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "flag" + "log" + + "time" + + "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go-sdk" +) + +func main() { + var ( + name = flag.String("name", "randomstring", "Public chat name used for this health bots") + interval = flag.Duration("interval", 5*time.Second, "Interval for health check") + rpcHost = flag.String("rpc", "http://localhost:8545", "Host:port to statusd's RPC endpoint") + isSender = flag.Bool("send", true, "Select bot role, sender or responder") + ) + flag.Parse() + + rpcClient, err := rpc.Dial(*rpcHost) + if err != nil { + log.Fatal(err) + } + + client := sdk.New(rpcClient) + + a, err := client.SignupAndLogin("password") + if err != nil { + log.Fatal(err) + } + + ch, err := a.JoinPublicChannel(*name) + if err != nil { + log.Fatal(err) + } + + if *isSender { + startSender(ch, *interval) + } else { + startReceiver(ch) + select {} + } +} diff --git a/msg.go b/msg.go new file mode 100644 index 0000000..5a25913 --- /dev/null +++ b/msg.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "strconv" + "strings" +) + +// Msg represents health check message. +type Msg string + +// NewRequestMsg constructs new request health message. +func NewRequestMsg(counter int) Msg { + return Msg(fmt.Sprintf("Health Check Request|%d", counter)) +} + +// NewResponseMsg constructs new request health message. +func NewResponseMsg(counter int) Msg { + return Msg(fmt.Sprintf("Health Check Response|%d", counter)) +} + +// Counter returns counter value from health message. +func (m *Msg) Counter() (int, error) { + fields := strings.Split(string(*m), "|") + if len(fields) != 2 { + return 0, fmt.Errorf("wrong length: %s", *m) + } + c, err := strconv.ParseInt(fields[1], 10, 0) + if err != nil { + return 0, fmt.Errorf("wrong counter value: %s", err) + } + return int(c), nil +} + +func (m *Msg) IsRequest() bool { + return strings.Contains(string(*m), "Health Check Request") +} + +func (m *Msg) IsResponse() bool { + return strings.Contains(string(*m), "Health Check Response") +} diff --git a/receiver.go b/receiver.go new file mode 100644 index 0000000..e86f75f --- /dev/null +++ b/receiver.go @@ -0,0 +1,33 @@ +package main + +import ( + "log" + + "github.com/status-im/status-go-sdk" +) + +func startReceiver(ch *sdk.Channel) { + if _, err := ch.Subscribe(func(m *sdk.Msg) { + log.Println("[DEBUG] Got message: %v", m) + rawmsg, ok := m.Properties.(*sdk.PublishMsg) + if !ok { + log.Println("[ERROR] Wrong message props type received: %T", m.Properties) + return + } + msg := Msg(rawmsg.Text) + if msg.IsRequest() { + counter, err := msg.Counter() + if err != nil { + log.Println("[ERROR] Can't extract counter: %v", err) + } + go func(counter int) { + var body = NewResponseMsg(counter) + if err := ch.Publish(string(body)); err != nil { + log.Println("[ERROR] Can't send response: %v", err) + } + }(counter) + } + }); err != nil { + log.Fatal(err) + } +} diff --git a/sender.go b/sender.go new file mode 100644 index 0000000..2540481 --- /dev/null +++ b/sender.go @@ -0,0 +1,65 @@ +package main + +import ( + "log" + "time" + + "github.com/status-im/status-go-sdk" +) + +func startSender(ch *sdk.Channel, interval time.Duration) { + var ( + counter int + ticker = time.NewTicker(interval) + statsTicker = time.NewTicker(10 * time.Second) + pending = make(map[int]time.Time) + recvCh = make(chan Msg, 1000) + ) + + if _, err := ch.Subscribe(func(m *sdk.Msg) { + rawmsg, ok := m.Properties.(*sdk.PublishMsg) + if !ok { + log.Println("Wrong message props type received: %T", m.Properties) + return + } + msg := Msg(rawmsg.Text) + if msg.IsResponse() { + recvCh <- msg + } + }); err != nil { + log.Fatal(err) + } + + stats := NewStats() + + for { + select { + case <-ticker.C: + var body = NewRequestMsg(counter) + err := ch.Publish(string(body)) + if err != nil { + log.Printf("[ERROR] Failed to send health message: %s", err) + continue + } + pending[counter] = time.Now() + counter++ + stats.AddSent() + case msg := <-recvCh: + c, err := msg.Counter() + if err != nil { + log.Printf("[ERROR] Failed to parse health message: %s", err) + continue + } + start, ok := pending[c] + if !ok { + log.Printf("[ERROR] Received response for counter never sent (another sender bot running?): %s", err) + continue + } + delete(pending, c) + dur := time.Since(start) + stats.AddRountrip(dur) + case <-statsTicker.C: + stats.Print() + } + } +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..772bbfa --- /dev/null +++ b/stats.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +// Stats represents messages' statistics. +type Stats struct { + mx sync.RWMutex + sent int + received int + delays []time.Duration +} + +// NewStats returns new empty Stats object. +func NewStats() *Stats { + return &Stats{} +} + +// AddSent adds information about sent messages. +func (s *Stats) AddSent() { + s.mx.Lock() + defer s.mx.Unlock() + s.sent++ +} + +// AddRoundtrip adds information about successful message roundtrip. +func (s *Stats) AddRountrip(d time.Duration) { + s.mx.Lock() + defer s.mx.Unlock() + s.received++ + s.delays = append(s.delays, d) +} + +// Print dumps stats to the console. +func (s *Stats) Print() { + s.mx.RLock() + defer s.mx.RUnlock() + fmt.Println("-------------------------") + fmt.Println("Time:", time.Now()) + fmt.Println("Sent:", s.sent) + fmt.Println("Received:", s.received) + fmt.Println("Delays:", s.delays) + fmt.Println("-------------------------") +}