From cd8f8aaf6201b4eaa61757dfceb436bb57582c55 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Mon, 10 Jun 2024 10:10:09 +0300 Subject: [PATCH] fix_: cli not logging all messages (#5317) * fix_: cli not logging all messages * refactor_: PR comments: refactor interactive send messages and marshal before --- cmd/status-cli/message.go | 110 +++++++++++++++++++++++-------------- cmd/status-cli/serve.go | 76 +++++++++++++++---------- cmd/status-cli/simulate.go | 16 ++---- cmd/status-cli/util.go | 3 +- 4 files changed, 123 insertions(+), 82 deletions(-) diff --git a/cmd/status-cli/message.go b/cmd/status-cli/message.go index 519b214cf..005edff40 100644 --- a/cmd/status-cli/message.go +++ b/cmd/status-cli/message.go @@ -3,6 +3,7 @@ package main import ( "bufio" "context" + "log/slog" "os" "strings" "sync" @@ -12,17 +13,15 @@ import ( "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" - - "github.com/urfave/cli/v2" ) -func (cli *StatusCLI) sendContactRequest(cCtx *cli.Context, toID string) error { +func (cli *StatusCLI) sendContactRequest(ctx context.Context, toID string) error { cli.logger.Info("send contact request, contact public key: ", toID) request := &requests.SendContactRequest{ ID: toID, Message: "Hello!", } - resp, err := cli.messenger.SendContactRequest(cCtx.Context, request) + resp, err := cli.messenger.SendContactRequest(ctx, request) cli.logger.Info("function SendContactRequest response.messages: ", resp.Messages()) if err != nil { return err @@ -31,9 +30,9 @@ func (cli *StatusCLI) sendContactRequest(cCtx *cli.Context, toID string) error { return nil } -func (cli *StatusCLI) sendContactRequestAcceptance(cCtx *cli.Context, msgID string) error { +func (cli *StatusCLI) sendContactRequestAcceptance(ctx context.Context, msgID string) error { cli.logger.Info("accept contact request, message ID: ", msgID) - resp, err := cli.messenger.AcceptContactRequest(cCtx.Context, &requests.AcceptContactRequest{ID: types.Hex2Bytes(msgID)}) + resp, err := cli.messenger.AcceptContactRequest(ctx, &requests.AcceptContactRequest{ID: types.Hex2Bytes(msgID)}) if err != nil { return err } @@ -102,48 +101,75 @@ func (cli *StatusCLI) retrieveMessagesLoop(ctx context.Context, tick time.Durati } } -func (cli *StatusCLI) sendMessageLoop(ctx context.Context, tick time.Duration, wg *sync.WaitGroup, sem chan struct{}, cancel context.CancelFunc) { - defer wg.Done() - - ticker := time.NewTicker(tick) - defer ticker.Stop() - +// interactiveSendMessageLoop reads input from stdin and sends it as a direct message to the first mutual contact. +// +// If multiple CLIs are provided, it will send messages in a round-robin fashion: +// 1st input message will be from Alice, 2nd from Bob, 3rd from Alice, and so on. +func interactiveSendMessageLoop(ctx context.Context, clis ...*StatusCLI) { reader := bufio.NewReader(os.Stdin) + i := -1 + n := len(clis) + if n == 0 { + slog.Error("at least 1 CLI needed") + return + } for { - select { - case <-ticker.C: - if len(cli.messenger.MutualContacts()) == 0 { - continue - } - sem <- struct{}{} - cli.logger.Info("Enter your message to send: (type 'quit' or 'q' to exit)") - message, err := reader.ReadString('\n') - if err != nil { - <-sem - cli.logger.Error("failed to read input", err) - continue - } + i++ + if i >= n { + i = 0 + } + cli := clis[i] // round robin cli selection - message = strings.TrimSpace(message) - if message == "quit" || message == "q" || strings.Contains(message, "\x03") { - cancel() - <-sem + if len(cli.messenger.MutualContacts()) == 0 { + // waits for 1 second before trying again + time.Sleep(1 * time.Second) + continue + } + cli.logger.Info("Enter your message to send: (type 'quit' or 'q' to exit)") + + message, err := readInput(ctx, reader) + if err != nil { + if err == context.Canceled { return } - if message == "" { - <-sem - continue - } - - err = cli.sendDirectMessage(ctx, message) - time.Sleep(WaitingInterval) - <-sem - if err != nil { - cli.logger.Error("failed to send direct message: ", err) - continue - } - case <-ctx.Done(): + cli.logger.Error("failed to read input", err) + continue + } + message = strings.TrimSpace(message) + if message == "quit" || message == "q" || strings.Contains(message, "\x03") { return } + if message == "" { + continue + } + if err = cli.sendDirectMessage(ctx, message); err != nil { + cli.logger.Error("failed to send direct message: ", err) + continue + } + } +} + +// readInput reads input from the reader and respects context cancellation +func readInput(ctx context.Context, reader *bufio.Reader) (string, error) { + inputCh := make(chan string, 1) + errCh := make(chan error, 1) + + // Start a goroutine to read input + go func() { + input, err := reader.ReadString('\n') + if err != nil { + errCh <- err + return + } + inputCh <- input + }() + + select { + case <-ctx.Done(): + return "", ctx.Err() + case input := <-inputCh: + return input, nil + case err := <-errCh: + return "", err } } diff --git a/cmd/status-cli/serve.go b/cmd/status-cli/serve.go index 75117303b..29be29d39 100644 --- a/cmd/status-cli/serve.go +++ b/cmd/status-cli/serve.go @@ -2,26 +2,21 @@ package main import ( "context" + "encoding/json" "log" "os" "os/signal" - "sync" "syscall" + "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/protobuf" + msignal "github.com/status-im/status-go/signal" + "github.com/urfave/cli/v2" "go.uber.org/zap" ) func serve(cCtx *cli.Context) error { - ctx, cancel := context.WithCancel(cCtx.Context) - - go func() { - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - <-sig - cancel() - }() - rawLogger, err := zap.NewDevelopment() if err != nil { log.Fatalf("Error initializing logger: %v", err) @@ -38,44 +33,69 @@ func serve(cCtx *cli.Context) error { apiModules := cCtx.String(APIModulesFlag) telemetryUrl := cCtx.String(TelemetryServerURLFlag) - cli, err := start(cCtx, name, port, apiModules, telemetryUrl) + cli, err := start(name, port, apiModules, telemetryUrl) if err != nil { return err } defer cli.stop() - // Retrieve for messages - var wg sync.WaitGroup - msgCh := make(chan string) + // Using the mobile signal handler to listen for received messages + // because if we call messenger.RetrieveAll() from different routines we will miss messages in one of them + // and the retrieve messages loop is started when starting a node, so we needed a different appproach, + // alternatively we could have implemented another notification mechanism in the messenger, but this signal is already in place + msignal.SetMobileSignalHandler(msignal.MobileSignalHandler(func(s []byte) { + var ev MobileSignalEvent + if err := json.Unmarshal(s, &ev); err != nil { + logger.Errorf("unmarshaling signal event: %v", err) + return + } - wg.Add(1) - go cli.retrieveMessagesLoop(ctx, RetrieveInterval, msgCh, &wg) + if ev.Type == msignal.EventNewMessages { + for _, message := range ev.Event.Messages { + logger.Infof("message received: %v (ID=%v)", message.Text, message.ID) + // if request contact, accept it + if message.ContentType == protobuf.ChatMessage_SYSTEM_MESSAGE_MUTUAL_EVENT_SENT { + if err = cli.sendContactRequestAcceptance(cCtx.Context, message.ID); err != nil { + logger.Errorf("accepting contact request: %v", err) + return + } + } + } + } + })) - // Send and accept contact request + // Send contact request dest := cCtx.String(AddFlag) if dest != "" { - err := cli.sendContactRequest(cCtx, dest) + err := cli.sendContactRequest(cCtx.Context, dest) if err != nil { return err } } + // nightly testrunner looks for this log to consider node as started + logger.Info("retrieve messages...") + + ctx, cancel := context.WithCancel(cCtx.Context) go func() { - msgID := <-msgCh - err = cli.sendContactRequestAcceptance(cCtx, msgID) - if err != nil { - logger.Error(err) - return - } + // Wait for signal to exit + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + cancel() }() // Send message if mutual contact exists - sem := make(chan struct{}, 1) - wg.Add(1) - go cli.sendMessageLoop(ctx, SendInterval, &wg, sem, cancel) + interactiveSendMessageLoop(ctx, cli) - wg.Wait() logger.Info("Exiting") return nil } + +type MobileSignalEvent struct { + Type string `json:"type"` + Event struct { + Messages []*common.Message `json:"messages"` + } `json:"event"` +} diff --git a/cmd/status-cli/simulate.go b/cmd/status-cli/simulate.go index 15aee6978..35b895933 100644 --- a/cmd/status-cli/simulate.go +++ b/cmd/status-cli/simulate.go @@ -39,13 +39,13 @@ func simulate(cCtx *cli.Context) error { apiModules := cCtx.String(APIModulesFlag) telemetryUrl := cCtx.String(TelemetryServerURLFlag) - alice, err := start(cCtx, "Alice", 0, apiModules, telemetryUrl) + alice, err := start("Alice", 0, apiModules, telemetryUrl) if err != nil { return err } defer alice.stop() - charlie, err := start(cCtx, "Charlie", 0, apiModules, telemetryUrl) + charlie, err := start("Charlie", 0, apiModules, telemetryUrl) if err != nil { return err } @@ -63,27 +63,23 @@ func simulate(cCtx *cli.Context) error { // Send contact request from Alice to Charlie, charlie accept the request time.Sleep(WaitingInterval) destID := charlie.messenger.GetSelfContact().ID - err = alice.sendContactRequest(cCtx, destID) + err = alice.sendContactRequest(ctx, destID) if err != nil { return err } msgID := <-msgCh - err = charlie.sendContactRequestAcceptance(cCtx, msgID) + err = charlie.sendContactRequestAcceptance(ctx, msgID) if err != nil { return err } + time.Sleep(WaitingInterval) // Send DM between alice to charlie interactive := cCtx.Bool(InteractiveFlag) if interactive { - sem := make(chan struct{}, 1) - wg.Add(1) - go alice.sendMessageLoop(ctx, SendInterval, &wg, sem, cancel) - wg.Add(1) - go charlie.sendMessageLoop(ctx, SendInterval, &wg, sem, cancel) + interactiveSendMessageLoop(ctx, alice, charlie) } else { - time.Sleep(WaitingInterval) for i := 0; i < cCtx.Int(CountFlag); i++ { err = alice.sendDirectMessage(ctx, fmt.Sprintf("message from alice, number: %d", i+1)) if err != nil { diff --git a/cmd/status-cli/util.go b/cmd/status-cli/util.go index d5a8fa154..952a0b606 100644 --- a/cmd/status-cli/util.go +++ b/cmd/status-cli/util.go @@ -12,7 +12,6 @@ import ( "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/services/wakuv2ext" - "github.com/urfave/cli/v2" "go.uber.org/zap" ) @@ -34,7 +33,7 @@ func setupLogger(file string) *zap.Logger { return logutils.ZapLogger() } -func start(cCtx *cli.Context, name string, port int, apiModules string, telemetryUrl string) (*StatusCLI, error) { +func start(name string, port int, apiModules string, telemetryUrl string) (*StatusCLI, error) { namedLogger := logger.Named(name) namedLogger.Info("starting messager")