add x-check-mailserver (#7)

This commit is contained in:
Adam Babik 2018-11-26 13:28:40 +01:00 committed by GitHub
parent 597264e486
commit a290aeaa1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 479 additions and 3 deletions

View File

@ -4,9 +4,7 @@ dependencies:
dep ensure
image: AUTHOR = $(shell echo $$USER)
image: GIT_COMMIT = $(shell tag=`git describe --exact-match --tag 2>/dev/null`; \
if [ $$? -eq 0 ]; then echo $$tag | sed 's/^v\(.*\)$$/\1/'; \
else git rev-parse --short HEAD; fi)
image: GIT_COMMIT = $(shell git rev-parse --short HEAD)
image:
docker build . \
--label "commit=$(GIT_COMMIT)" \
@ -17,3 +15,4 @@ image:
build:
go build -o ./bin/pubchats ./cmd/pubchats
go build -o ./bin/bench-mailserver ./cmd/bench-mailserver
go build -o ./bin/x-check-mailserver ./cmd/x-check-mailserver

View File

@ -0,0 +1,6 @@
x-check-mailserver
==================
Cross-check of mail server outputs.
This tool can be used to verify if all provided mail servers store the same messages and detect any anomalies.

View File

@ -0,0 +1,31 @@
package main
import (
"github.com/status-im/status-go/params"
)
func newNodeConfig(fleet string, networkID uint64) (*params.NodeConfig, error) {
c, err := params.NewNodeConfig("", networkID)
if err != nil {
return nil, err
}
if err := params.WithFleet(params.FleetBeta)(c); err != nil {
return nil, err
}
c.ListenAddr = ":0"
c.MaxPeers = 10
c.IPCEnabled = true
c.HTTPEnabled = false
c.NoDiscovery = true
c.Rendezvous = false
c.ClusterConfig.Enabled = false
c.ClusterConfig.StaticNodes = nil
c.WhisperConfig.Enabled = true
c.WhisperConfig.EnableNTPSync = true
return c, nil
}

View File

@ -0,0 +1,20 @@
package main
import (
"time"
"github.com/spf13/pflag"
"github.com/status-im/status-go/params"
)
var (
fleet = pflag.StringP("fleet", "f", params.FleetBeta, "cluster fleet")
mailservers = pflag.StringArrayP("mailservers", "m", nil, "a list of mail servers")
duration = pflag.DurationP("duration", "l", time.Hour*24, "length of time span from now")
channels = pflag.StringArrayP("channel", "p", []string{"status"}, "name of the channel")
verbosity = pflag.StringP("verbosity", "v", "INFO", "verbosity level of status-go, options: crit, error, warning, info, debug")
)
func init() {
pflag.Parse()
}

View File

@ -0,0 +1,241 @@
package main
import (
"context"
"encoding/hex"
"encoding/json"
"io/ioutil"
"log"
"math/rand"
"os"
stdsignal "os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/ethereum/go-ethereum"
"github.com/status-im/go-ethereum/common"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/signal"
"github.com/status-im/statusd-bots/protocol"
"github.com/status-im/whisper/shhclient"
whisper "github.com/status-im/whisper/whisperv6"
)
func init() {
if err := logutils.OverrideRootLog(true, *verbosity, "", false); err != nil {
log.Fatalf("failed to override root log: %v\n", err)
}
}
func main() {
// handle OS signals
signals := make(chan os.Signal, 1)
stdsignal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signals
os.Exit(1)
}()
// create config
config, err := newNodeConfig(*fleet, params.MainNetworkID)
if err != nil {
log.Fatalf("failed to create a config: %v", err)
}
// collect mail servers
mailserversToCheck := *mailservers
if len(mailserversToCheck) == 0 {
// -2 to get at least two mail servers
min := rand.Intn(len(config.ClusterConfig.TrustedMailServers) - 1)
max := min + rand.Intn(len(config.ClusterConfig.TrustedMailServers)-min)
mailserversToCheck = config.ClusterConfig.TrustedMailServers[min:max]
}
config.MaxPeers = len(mailserversToCheck)
// collect mail server request signals
mailSignalsForwarder := newSignalForwarder()
defer close(mailSignalsForwarder.in)
go mailSignalsForwarder.Start()
// setup signals handler
signal.SetDefaultNodeNotificationHandler(
filterMailTypesHandler(printHandler, mailSignalsForwarder.in),
)
// setup work
workConfig := WorkUnitConfig{
From: uint32(time.Now().Add(-*duration).Unix()),
To: uint32(time.Now().Add(-5 * time.Minute).Unix()), // subtract 5 mins to cater for TTL, time skew on devices etc.
Channels: *channels,
}
var workUnites []*WorkUnit
var wg sync.WaitGroup
wg.Add(len(mailserversToCheck))
for i, enode := range mailserversToCheck {
config.ListenAddr = "127.0.0.1:" + strconv.Itoa(44300+i)
config.DataDir, err = ioutil.TempDir("", "")
if err != nil {
log.Fatalf("failed to create temp dir: %v", err)
}
nodeConfig := *config
log.Printf("using node config: %v", nodeConfig)
work := NewWorkUnit(enode, &nodeConfig)
go func(work *WorkUnit) {
if err := work.Execute(workConfig, mailSignalsForwarder); err != nil {
log.Fatalf("failed to execute work: %v", err)
}
wg.Done()
}(work)
workUnites = append(workUnites, work)
}
wg.Wait()
exitCode := 0
for i, j := 0, 1; j < len(workUnites); j++ {
workA := workUnites[i]
workB := workUnites[j]
if len(workA.Messages) != len(workB.Messages) {
exitCode = 1
}
log.Printf("%s vs %s: ", workA.MailServerEnode, workB.MailServerEnode)
log.Printf(" messages: %d vs %d", len(workA.Messages), len(workB.Messages))
}
os.Exit(exitCode)
}
func addPublicChatSymKey(c *shhclient.Client, chat string) (string, error) {
// This operation can be really slow, hence 10 seconds timeout.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return c.GenerateSymmetricKeyFromPassword(ctx, chat)
}
func subscribeMessages(c *shhclient.Client, chat, symKeyID string, messages chan<- *whisper.Message) (ethereum.Subscription, error) {
topic, err := protocol.PublicChatTopic([]byte(chat))
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return c.SubscribeMessages(ctx, whisper.Criteria{
SymKeyID: symKeyID,
MinPow: 0,
Topics: []whisper.TopicType{topic},
AllowP2P: true,
}, messages)
}
func printHandler(event string) {
log.Printf("received signal: %v\n", event)
}
type signalEnvelope struct {
Type string `json:"type"`
Event json.RawMessage `json:"event"`
}
type mailTypeEvent struct {
RequestID common.Hash `json:"requestID"`
Hash common.Hash `json:"hash"`
LastEnvelopeHash common.Hash `json:"lastEnvelopeHash"`
}
type mailTypeSignal struct {
Type string
RequestID string
LastEnvelopeID []byte
}
type signalForwarder struct {
sync.Mutex
in chan mailTypeSignal
out map[string]chan<- mailTypeSignal
}
func newSignalForwarder() *signalForwarder {
return &signalForwarder{
in: make(chan mailTypeSignal),
out: make(map[string]chan<- mailTypeSignal),
}
}
func (s *signalForwarder) Start() {
for {
sig, ok := <-s.in
if !ok {
return
}
s.Lock()
out, found := s.out[sig.RequestID]
if found {
out <- sig
}
s.Unlock()
}
}
func (s *signalForwarder) cancel(reqID []byte) {
s.Lock()
delete(s.out, hex.EncodeToString(reqID))
s.Unlock()
}
func (s *signalForwarder) Filter(reqID []byte) (<-chan mailTypeSignal, func()) {
c := make(chan mailTypeSignal)
s.Lock()
s.out[hex.EncodeToString(reqID)] = c
s.Unlock()
return c, func() { s.cancel(reqID); close(c) }
}
func filterMailTypesHandler(fn func(string), in chan<- mailTypeSignal) func(string) {
return func(event string) {
fn(event)
var envelope signalEnvelope
if err := json.Unmarshal([]byte(event), &envelope); err != nil {
log.Fatalf("faild to unmarshal signal Envelope: %v", err)
}
switch envelope.Type {
case signal.EventMailServerRequestCompleted:
var event mailTypeEvent
if err := json.Unmarshal(envelope.Event, &event); err != nil {
log.Fatalf("faild to unmarshal signal event: %v", err)
}
in <- mailTypeSignal{
envelope.Type,
hex.EncodeToString(event.RequestID.Bytes()),
event.LastEnvelopeHash.Bytes(),
}
case signal.EventMailServerRequestExpired:
var event mailTypeEvent
if err := json.Unmarshal(envelope.Event, &event); err != nil {
log.Fatalf("faild to unmarshal signal event: %v", err)
}
in <- mailTypeSignal{
envelope.Type,
hex.EncodeToString(event.Hash.Bytes()),
nil,
}
}
}
}

View File

@ -0,0 +1,179 @@
package main
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/signal"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/statusd-bots/protocol"
"github.com/status-im/whisper/shhclient"
whisper "github.com/status-im/whisper/whisperv6"
)
// WorkUnit represents a single unit of work.
type WorkUnit struct {
MailServerEnode string
Messages []*whisper.Message
config *params.NodeConfig
node *node.StatusNode
shh *shhclient.Client
shhextAPI *shhext.PublicAPI
}
// NewWorkUnit creates a new WorkUnit instance.
func NewWorkUnit(mailEnode string, config *params.NodeConfig) *WorkUnit {
return &WorkUnit{
MailServerEnode: mailEnode,
config: config,
}
}
// WorkUnitConfig configures the execution of the work.
type WorkUnitConfig struct {
From uint32
To uint32
Channels []string
}
// Execute runs the work.
func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder) error {
if err := u.startNode(); err != nil {
return fmt.Errorf("failed to start node: %v", err)
}
if err := u.addPeer(); err != nil {
return fmt.Errorf("failed to add peer: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
mailServerSymKeyID, err := u.shh.GenerateSymmetricKeyFromPassword(
ctx, protocol.MailServerPassword)
if err != nil {
return fmt.Errorf("failed to generate sym key for mail server: %v", err)
}
var topics []whisper.TopicType
for _, ch := range config.Channels {
topic, err := protocol.PublicChatTopic([]byte(ch))
if err != nil {
return fmt.Errorf("failed to create topic: %v", err)
}
topics = append(topics, topic)
}
var messageSubErrs []<-chan error
messages := make(chan *whisper.Message)
for _, ch := range config.Channels {
symKeyID, err := addPublicChatSymKey(u.shh, ch)
if err != nil {
return fmt.Errorf("failed to add sym key for channel '%s': %v", ch, err)
}
sub, err := subscribeMessages(u.shh, ch, symKeyID, messages)
if err != nil {
return fmt.Errorf("failed to subscribe for messages: %v", err)
}
defer sub.Unsubscribe()
messageSubErrs = append(messageSubErrs, sub.Err())
}
reqID, err := u.shhextAPI.RequestMessages(nil, shhext.MessagesRequest{
MailServerPeer: u.MailServerEnode,
SymKeyID: mailServerSymKeyID,
From: config.From,
To: config.To,
Limit: 1000,
Topics: topics,
Timeout: 30,
})
if err != nil {
return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err)
}
// TODO(adam): change to regular fanout. It might happen that a signal
// is received before the filter is setup.
signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID))
defer cancelSignalsFilter()
var lastEnvelopeID []byte
for {
select {
case m := <-messages:
log.Printf("received a message %s", hex.EncodeToString(m.Hash))
u.Messages = append(u.Messages, m)
case <-time.After(time.Second * 5):
// As we can not predict when messages finish to come in,
// we timeout after 5 seconds of silence.
// If lastEnvelopeID is found amoung received messages,
// it's a successful request. Otherwise, an error is returned.
for i, m := range u.Messages {
if bytes.Equal(lastEnvelopeID, m.Hash) {
log.Printf("received lastEnvelopeID %s on %d out of %d",
hex.EncodeToString(lastEnvelopeID), i, len(u.Messages))
return u.stopNode()
}
}
return fmt.Errorf("did not receive lastEnvelopeID: %s", hex.EncodeToString(lastEnvelopeID))
case err := <-merge(messageSubErrs...):
return fmt.Errorf("subscription for messages errored: %v", err)
case s := <-signals:
switch s.Type {
case signal.EventMailServerRequestCompleted:
lastEnvelopeID = s.LastEnvelopeID
case signal.EventMailServerRequestExpired:
return fmt.Errorf("request for messages expired")
}
}
}
}
func (u *WorkUnit) startNode() error {
u.node = node.New()
if err := u.node.Start(u.config); err != nil {
return fmt.Errorf("failed to start a node: %v", err)
}
rpcClient, err := u.node.GethNode().Attach()
if err != nil {
return fmt.Errorf("failed to get an rpc: %v", err)
}
u.shh = shhclient.NewClient(rpcClient)
shhextService, err := u.node.ShhExtService()
if err != nil {
return fmt.Errorf("failed go get an shhext service: %v", err)
}
u.shhextAPI = shhext.NewPublicAPI(shhextService)
return nil
}
func (u *WorkUnit) stopNode() error {
return u.node.Stop()
}
func (u *WorkUnit) addPeer() error {
if err := u.node.AddPeer(u.MailServerEnode); err != nil {
return err
}
return <-helpers.WaitForPeerAsync(
u.node.Server(),
u.MailServerEnode,
p2p.PeerEventTypeAdd,
5*time.Second,
)
}