// node-canary tests whether a P2P peer is responding correctly. package main import ( "context" "errors" "flag" "fmt" stdlog "log" "os" "path" "path/filepath" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/api" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-protocol-go/transport/whisper/gethbridge" statusproto "github.com/status-im/status-protocol-go/types" "golang.org/x/crypto/sha3" "golang.org/x/crypto/ssh/terminal" whispertypes "github.com/status-im/status-protocol-go/transport/whisper/types" ) const ( mailboxPassword = "status-offline-inbox" ) // All general log messages in this package should be routed through this logger. var logger = log.New("package", "status-go/cmd/node-canary") var ( staticEnodeAddr = flag.String("staticnode", "", "static node enode address to test (e.g. enode://3f04db09bedc8d85a198de94c84da73aa7782fafc61b28c525ec5cca5a6cc16be7ebbb5cd001780f71d8408d35a2f6326faa1e524d9d8875294172ebec988743@172.16.238.10:30303)") mailserverEnodeAddr = flag.String("mailserver", "", "mailserver enode address to test (e.g. enode://1da276e34126e93babf24ec88aac1a7602b4cbb2e11b0961d0ab5e989ca9c261aa7f7c1c85f15550a5f1e5a5ca2305b53b9280cf5894d5ecf7d257b173136d40@167.99.209.61:30504)") publicChannel = flag.String("channel", "status", "The public channel name to retrieve historic messages from (used with 'mailserver' flag)") timeout = flag.Int("timeout", 10, "Timeout when connecting to node or fetching messages from mailserver, in seconds") period = flag.Int("period", 24*60*60, "How far in the past to request messages from mailserver, in seconds") minPow = flag.Float64("shh.pow", params.WhisperMinimumPoW, "PoW for messages to be added to queue, in float format") ttl = flag.Int("shh.ttl", params.WhisperTTL, "Time to live for messages, in seconds") homePath = flag.String("home-dir", ".", "Home directory where state is stored") logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`) logFile = flag.String("logfile", "", "Path to the log file") logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors") ) func main() { var err error var staticParsedNode, mailserverParsedNode *enode.Node if *staticEnodeAddr != "" { staticParsedNode, err = enode.ParseV4(*staticEnodeAddr) if err != nil { logger.Crit("Invalid static address specified", "staticEnodeAddr", *staticEnodeAddr, "error", err) os.Exit(1) } } if *mailserverEnodeAddr != "" { mailserverParsedNode, err = enode.ParseV4(*mailserverEnodeAddr) if err != nil { logger.Crit("Invalid mailserver address specified", "mailserverEnodeAddr", *mailserverEnodeAddr, "error", err) os.Exit(1) } } if staticParsedNode != nil { verifyStaticNodeBehavior(staticParsedNode) logger.Info("Connected to static node correctly", "address", *staticEnodeAddr) os.Exit(0) } if mailserverParsedNode != nil { verifyMailserverBehavior(mailserverParsedNode) logger.Info("Mailserver responded correctly", "address", *mailserverEnodeAddr) os.Exit(0) } logger.Crit("No address specified") os.Exit(1) } func init() { flag.Parse() colors := !(*logWithoutColors) if colors { colors = terminal.IsTerminal(int(os.Stdin.Fd())) } if err := logutils.OverrideRootLog(*logLevel != "", *logLevel, logutils.FileOptions{Filename: *logFile}, colors); err != nil { stdlog.Fatalf("Error initializing logger: %s", err) } } func verifyMailserverBehavior(mailserverNode *enode.Node) { clientBackend, err := startClientNode() if err != nil { logger.Error("Node start failed", "error", err) os.Exit(1) } defer func() { _ = clientBackend.StopNode() }() clientNode := clientBackend.StatusNode() clientGethWhisperService, err := clientNode.WhisperService() if err != nil { logger.Error("Could not retrieve Whisper service", "error", err) os.Exit(1) } clientWhisperService := gethbridge.NewGethWhisperWrapper(clientGethWhisperService) clientShhExtService, err := clientNode.ShhExtService() if err != nil { logger.Error("Could not retrieve shhext service", "error", err) os.Exit(1) } // add mailserver peer to client clientErrCh := helpers.WaitForPeerAsync(clientNode.Server(), *mailserverEnodeAddr, p2p.PeerEventTypeAdd, time.Duration(*timeout)*time.Second) err = clientNode.AddPeer(*mailserverEnodeAddr) if err != nil { logger.Error("Failed to add mailserver peer to client", "error", err) os.Exit(1) } err = <-clientErrCh if err != nil { logger.Error("Error detected while waiting for mailserver peer to be added", "error", err) os.Exit(1) } // add mailserver sym key mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword) if err != nil { logger.Error("Error adding mailserver sym key to client peer", "error", err) os.Exit(1) } mailboxPeer := mailserverNode.ID().Bytes() err = clientGethWhisperService.AllowP2PMessagesFromPeer(mailboxPeer) if err != nil { logger.Error("Failed to allow P2P messages from mailserver peer", "error", err, mailserverNode.String()) os.Exit(1) } clientRPCClient := clientNode.RPCClient() // TODO: Replace chat implementation with github.com/status-im/status-go-sdk _, topic, _, err := joinPublicChat(clientWhisperService, clientRPCClient, *publicChannel) if err != nil { logger.Error("Failed to join public chat", "error", err) os.Exit(1) } // watch for envelopes to be available in filters in the client envelopeAvailableWatcher := make(chan whispertypes.EnvelopeEvent, 1024) sub := clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher) defer sub.Unsubscribe() // watch for mailserver responses in the client mailServerResponseWatcher := make(chan whispertypes.EnvelopeEvent, 1024) sub = clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher) defer sub.Unsubscribe() // request messages from mailbox shhextAPI := shhext.NewPublicAPI(clientShhExtService) requestIDBytes, err := shhextAPI.RequestMessages(context.TODO(), shhext.MessagesRequest{ MailServerPeer: mailserverNode.String(), From: uint32(clientWhisperService.GetCurrentTime().Add(-time.Duration(*period) * time.Second).Unix()), Limit: 1, Topic: topic, SymKeyID: mailServerKeyID, Timeout: time.Duration(*timeout), }) if err != nil { logger.Error("Error requesting historic messages from mailserver", "error", err) os.Exit(2) } requestID := statusproto.BytesToHash(requestIDBytes) // wait for mailserver request sent event err = waitForMailServerRequestSent(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) if err != nil { logger.Error("Error waiting for mailserver request sent event", "error", err) os.Exit(3) } // wait for mailserver response resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) if err != nil { logger.Error("Error waiting for mailserver response", "error", err) os.Exit(3) } // wait for last envelope sent by the mailserver to be available for filters err = waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whispertypes.EventEnvelopeAvailable) if err != nil { logger.Error("Error waiting for envelopes to be available to client filter", "error", err) os.Exit(4) } } func verifyStaticNodeBehavior(staticNode *enode.Node) { clientBackend, err := startClientNode() if err != nil { logger.Error("Node start failed", "error", err) os.Exit(1) } defer func() { _ = clientBackend.StopNode() }() clientNode := clientBackend.StatusNode() // wait for peer to be added to client clientErrCh := helpers.WaitForPeerAsync(clientNode.Server(), *staticEnodeAddr, p2p.PeerEventTypeAdd, 5*time.Second) err = <-clientErrCh if err != nil { logger.Error("Error detected while waiting for static peer to be added", "error", err) os.Exit(1) } // wait to check if peer remains connected to client clientErrCh = helpers.WaitForPeerAsync(clientNode.Server(), *staticEnodeAddr, p2p.PeerEventTypeDrop, 5*time.Second) err = <-clientErrCh peers := clientNode.GethNode().Server().Peers() if len(peers) != 1 { logger.Error("Failed to add static peer", "error", err) os.Exit(1) } } // makeNodeConfig parses incoming CLI options and returns node configuration object func makeNodeConfig() (*params.NodeConfig, error) { err := error(nil) workDir := "" if path.IsAbs(*homePath) { workDir = *homePath } else { workDir, err = filepath.Abs(filepath.Dir(os.Args[0])) if err == nil { workDir = path.Join(workDir, *homePath) } } if err != nil { return nil, err } nodeConfig, err := params.NewNodeConfigWithDefaults(path.Join(workDir, ".ethereum"), uint64(params.RopstenNetworkID)) if err != nil { return nil, err } if *logLevel != "" { nodeConfig.LogLevel = *logLevel nodeConfig.LogEnabled = true } if *logFile != "" { nodeConfig.LogFile = *logFile } nodeConfig.NoDiscovery = true nodeConfig.ListenAddr = "" if *staticEnodeAddr != "" { nodeConfig.ClusterConfig.Enabled = true nodeConfig.ClusterConfig.Fleet = params.FleetUndefined nodeConfig.ClusterConfig.StaticNodes = []string{ *staticEnodeAddr, } } return whisperConfig(nodeConfig) } // whisperConfig creates node configuration object from flags func whisperConfig(nodeConfig *params.NodeConfig) (*params.NodeConfig, error) { whisperConfig := &nodeConfig.WhisperConfig whisperConfig.Enabled = true whisperConfig.LightClient = true whisperConfig.MinimumPoW = *minPow whisperConfig.TTL = *ttl whisperConfig.EnableNTPSync = true return nodeConfig, nil } func startClientNode() (*api.StatusBackend, error) { config, err := makeNodeConfig() if err != nil { return nil, err } clientBackend := api.NewStatusBackend() err = clientBackend.StartNode(config) if err != nil { return nil, err } return clientBackend, err } func joinPublicChat(w whispertypes.Whisper, rpcClient *rpc.Client, name string) (string, whispertypes.TopicType, string, error) { keyID, err := w.AddSymKeyFromPassword(name) if err != nil { return "", whispertypes.TopicType{}, "", err } h := sha3.NewLegacyKeccak256() _, err = h.Write([]byte(name)) if err != nil { return "", whispertypes.TopicType{}, "", err } fullTopic := h.Sum(nil) topic := whispertypes.BytesToTopic(fullTopic) whisperAPI := w.PublicWhisperAPI() filterID, err := whisperAPI.NewMessageFilter(whispertypes.Criteria{SymKeyID: keyID, Topics: []whispertypes.TopicType{topic}}) return keyID, topic, filterID, err } func waitForMailServerRequestSent(events chan whispertypes.EnvelopeEvent, requestID statusproto.Hash, timeout time.Duration) error { timeoutTimer := time.NewTimer(timeout) for { select { case event := <-events: if event.Hash == requestID && event.Event == whispertypes.EventMailServerRequestSent { timeoutTimer.Stop() return nil } case <-timeoutTimer.C: return errors.New("timed out waiting for mailserver request sent") } } } func waitForMailServerResponse(events chan whispertypes.EnvelopeEvent, requestID statusproto.Hash, timeout time.Duration) (*whispertypes.MailServerResponse, error) { timeoutTimer := time.NewTimer(timeout) for { select { case event := <-events: if event.Hash == requestID { resp, err := decodeMailServerResponse(event) if resp != nil || err != nil { timeoutTimer.Stop() return resp, err } } case <-timeoutTimer.C: return nil, errors.New("timed out waiting for mailserver response") } } } func decodeMailServerResponse(event whispertypes.EnvelopeEvent) (*whispertypes.MailServerResponse, error) { switch event.Event { case whispertypes.EventMailServerRequestSent: return nil, nil case whispertypes.EventMailServerRequestCompleted: resp, ok := event.Data.(*whispertypes.MailServerResponse) if !ok { return nil, errors.New("failed to convert event to a *MailServerResponse") } return resp, nil case whispertypes.EventMailServerRequestExpired: return nil, errors.New("no messages available from mailserver") default: return nil, fmt.Errorf("unexpected event type: %v", event.Event) } } func waitForEnvelopeEvents(events chan whispertypes.EnvelopeEvent, hashes []string, event whispertypes.EventType) error { check := make(map[string]struct{}) for _, hash := range hashes { check[hash] = struct{}{} } timeout := time.NewTimer(time.Second * 5) for { select { case e := <-events: if e.Event == event { delete(check, e.Hash.String()) if len(check) == 0 { timeout.Stop() return nil } } case <-timeout.C: return fmt.Errorf("timed out while waiting for event on envelopes. event: %s", event) } } }