Initial commit

This commit is contained in:
Ivan Danyliuk 2018-05-24 18:32:39 +03:00
commit 7a8c56631d
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
6 changed files with 233 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
p2p-health-bot
p2p-health-bot_linux

45
main.go Normal file
View File

@ -0,0 +1,45 @@
package main
import (
"flag"
"log"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go-sdk"
)
func main() {
var (
name = flag.String("name", "randomstring", "Public chat name used for this health bots")
interval = flag.Duration("interval", 5*time.Second, "Interval for health check")
rpcHost = flag.String("rpc", "http://localhost:8545", "Host:port to statusd's RPC endpoint")
isSender = flag.Bool("send", true, "Select bot role, sender or responder")
)
flag.Parse()
rpcClient, err := rpc.Dial(*rpcHost)
if err != nil {
log.Fatal(err)
}
client := sdk.New(rpcClient)
a, err := client.SignupAndLogin("password")
if err != nil {
log.Fatal(err)
}
ch, err := a.JoinPublicChannel(*name)
if err != nil {
log.Fatal(err)
}
if *isSender {
startSender(ch, *interval)
} else {
startReceiver(ch)
select {}
}
}

41
msg.go Normal file
View File

@ -0,0 +1,41 @@
package main
import (
"fmt"
"strconv"
"strings"
)
// Msg represents health check message.
type Msg string
// NewRequestMsg constructs new request health message.
func NewRequestMsg(counter int) Msg {
return Msg(fmt.Sprintf("Health Check Request|%d", counter))
}
// NewResponseMsg constructs new request health message.
func NewResponseMsg(counter int) Msg {
return Msg(fmt.Sprintf("Health Check Response|%d", counter))
}
// Counter returns counter value from health message.
func (m *Msg) Counter() (int, error) {
fields := strings.Split(string(*m), "|")
if len(fields) != 2 {
return 0, fmt.Errorf("wrong length: %s", *m)
}
c, err := strconv.ParseInt(fields[1], 10, 0)
if err != nil {
return 0, fmt.Errorf("wrong counter value: %s", err)
}
return int(c), nil
}
func (m *Msg) IsRequest() bool {
return strings.Contains(string(*m), "Health Check Request")
}
func (m *Msg) IsResponse() bool {
return strings.Contains(string(*m), "Health Check Response")
}

33
receiver.go Normal file
View File

@ -0,0 +1,33 @@
package main
import (
"log"
"github.com/status-im/status-go-sdk"
)
func startReceiver(ch *sdk.Channel) {
if _, err := ch.Subscribe(func(m *sdk.Msg) {
log.Println("[DEBUG] Got message: %v", m)
rawmsg, ok := m.Properties.(*sdk.PublishMsg)
if !ok {
log.Println("[ERROR] Wrong message props type received: %T", m.Properties)
return
}
msg := Msg(rawmsg.Text)
if msg.IsRequest() {
counter, err := msg.Counter()
if err != nil {
log.Println("[ERROR] Can't extract counter: %v", err)
}
go func(counter int) {
var body = NewResponseMsg(counter)
if err := ch.Publish(string(body)); err != nil {
log.Println("[ERROR] Can't send response: %v", err)
}
}(counter)
}
}); err != nil {
log.Fatal(err)
}
}

65
sender.go Normal file
View File

@ -0,0 +1,65 @@
package main
import (
"log"
"time"
"github.com/status-im/status-go-sdk"
)
func startSender(ch *sdk.Channel, interval time.Duration) {
var (
counter int
ticker = time.NewTicker(interval)
statsTicker = time.NewTicker(10 * time.Second)
pending = make(map[int]time.Time)
recvCh = make(chan Msg, 1000)
)
if _, err := ch.Subscribe(func(m *sdk.Msg) {
rawmsg, ok := m.Properties.(*sdk.PublishMsg)
if !ok {
log.Println("Wrong message props type received: %T", m.Properties)
return
}
msg := Msg(rawmsg.Text)
if msg.IsResponse() {
recvCh <- msg
}
}); err != nil {
log.Fatal(err)
}
stats := NewStats()
for {
select {
case <-ticker.C:
var body = NewRequestMsg(counter)
err := ch.Publish(string(body))
if err != nil {
log.Printf("[ERROR] Failed to send health message: %s", err)
continue
}
pending[counter] = time.Now()
counter++
stats.AddSent()
case msg := <-recvCh:
c, err := msg.Counter()
if err != nil {
log.Printf("[ERROR] Failed to parse health message: %s", err)
continue
}
start, ok := pending[c]
if !ok {
log.Printf("[ERROR] Received response for counter never sent (another sender bot running?): %s", err)
continue
}
delete(pending, c)
dur := time.Since(start)
stats.AddRountrip(dur)
case <-statsTicker.C:
stats.Print()
}
}
}

47
stats.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"fmt"
"sync"
"time"
)
// Stats represents messages' statistics.
type Stats struct {
mx sync.RWMutex
sent int
received int
delays []time.Duration
}
// NewStats returns new empty Stats object.
func NewStats() *Stats {
return &Stats{}
}
// AddSent adds information about sent messages.
func (s *Stats) AddSent() {
s.mx.Lock()
defer s.mx.Unlock()
s.sent++
}
// AddRoundtrip adds information about successful message roundtrip.
func (s *Stats) AddRountrip(d time.Duration) {
s.mx.Lock()
defer s.mx.Unlock()
s.received++
s.delays = append(s.delays, d)
}
// Print dumps stats to the console.
func (s *Stats) Print() {
s.mx.RLock()
defer s.mx.RUnlock()
fmt.Println("-------------------------")
fmt.Println("Time:", time.Now())
fmt.Println("Sent:", s.sent)
fmt.Println("Received:", s.received)
fmt.Println("Delays:", s.delays)
fmt.Println("-------------------------")
}