diff --git a/chat.go b/chat.go index 6ad694d..158b799 100644 --- a/chat.go +++ b/chat.go @@ -79,7 +79,7 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) { if len(all) != 0 { clock = all[len(all)-1].Clock } - log.Printf("[ChatViewController::readEventsLoops] retrieved %d messages", len(messages)) + log.Printf("[ChatViewController::readEventsLoop] retrieved %d messages", len(messages)) c.printMessages(true, all...) inorder = true } else { @@ -95,29 +95,35 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) { c.onError(err) return case event := <-events: - log.Printf("[ChatViewController::readEventsLoops] received an event: %+v", event) + log.Printf("[ChatViewController::readEventsLoop] received an event: %+v", event) switch ev := event.Interface.(type) { case client.EventWithError: c.onError(ev.GetError()) case client.EventWithContact: - log.Printf("[ChatViewController::readEventsLoops] selected contact %v, msg contact %v\n", contact, ev.GetContact()) if !ev.GetContact().Equal(contact) { + log.Printf("[ChatViewController::readEventsLoop] selected and received message contact are not equal: %s, %s", contact, ev.GetContact()) continue } msgev, ok := ev.(client.EventWithMessage) if !ok { + log.Printf("[ChatViewController::readEventsLoop] can not convert to EventWithMessage") continue } if !inorder { + log.Printf("[ChatViewController::readEventsLoop] not in order; skipping") continue } + msg := msgev.GetMessage() - log.Printf("[ChatViewController::readEventsLoops] received message current clock %v - msg clock %v\n", clock, msg.Clock) + log.Printf("[ChatViewController::readEventsLoop] received message %v", msg) + if msg.Clock < clock { inorder = false + log.Printf("[ChatViewController::readEventsLoop] received message is out of order") continue } + messages = append(messages, msg) } case contact = <-c.changeContact: @@ -162,6 +168,7 @@ func (c *ChatViewController) RequestMessages(params protocol.RequestOptions) err // Send sends a payload as a message. func (c *ChatViewController) Send(data []byte) error { + log.Printf("[ChatViewController::Send]") return c.messenger.Send(c.contact, data) } diff --git a/main.go b/main.go index b4a3595..2745843 100644 --- a/main.go +++ b/main.go @@ -56,6 +56,8 @@ func main() { err := os.MkdirAll(*dataDir, 0777) if err != nil { exitErr(err) + } else { + fmt.Printf("Starting in %s\n", *dataDir) } logPath := filepath.Join(*dataDir, "client.log") logFile, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) @@ -150,14 +152,32 @@ func main() { } } } + + // run in a goroutine to show the UI faster go func() { - err = messenger.Start() - if err != nil { + if err := messenger.Start(); err != nil { exitErr(err) } }() + done := make(chan bool, 1) + sigs := make(chan os.Signal, 1) + + ossignal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + log.Printf("received signal: %v", sig) + done <- true + }() + + log.Printf("starting UI...") + if !*noUI { + go func() { + <-done + exitErr(errors.New("exit with signal")) + }() + if err := setupGUI(privateKey, messenger); err != nil { exitErr(err) } @@ -165,19 +185,9 @@ func main() { if err := g.MainLoop(); err != nil && err != gocui.ErrQuit { exitErr(err) } + g.Close() } else { - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - - ossignal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func() { - sig := <-sigs - log.Printf("received signal: %v", sig) - done <- true - }() - <-done } } diff --git a/protocol/adapters/whisper_service.go b/protocol/adapters/whisper_service.go index 83cc565..4ba8edd 100644 --- a/protocol/adapters/whisper_service.go +++ b/protocol/adapters/whisper_service.go @@ -326,10 +326,10 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, req shhext. if err != nil { return } - shhextAPI := shhext.NewPublicAPI(shhextService) - log.Printf("[WhisperServiceAdapter::requestMessages] request for a chunk with %d messages\n", req.Limit) + log.Printf("[WhisperServiceAdapter::requestMessages] request for a chunk with %d messages", req.Limit) + start := time.Now() resp, err = shhextAPI.RequestMessagesSync(shhext.RetryConfig{ BaseTimeout: time.Second * 10, @@ -337,12 +337,12 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, req shhext. MaxRetries: 3, }, req) if err != nil { - log.Printf("[WhisperServiceAdapter::requestMessages] timed out. err %v\n", err) + log.Printf("[WhisperServiceAdapter::requestMessages] failed with err: %v", err) return } - log.Printf("[WhisperServiceAdapter::requestMessages] delivery for %d message took %v\n", req.Limit, time.Since(start)) - log.Printf("[WhisperServiceAdapter::requestMessages] response = %+v, err = %v\n", resp, err) + log.Printf("[WhisperServiceAdapter::requestMessages] delivery of %d message took %v", req.Limit, time.Since(start)) + log.Printf("[WhisperServiceAdapter::requestMessages] response: %+v", resp) if resp.Error != nil { err = resp.Error @@ -353,7 +353,7 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, req shhext. } req.Cursor = resp.Cursor - log.Printf("[WhisperServiceAdapter::requestMessages] request messages with cursor %v\n", req.Cursor) + log.Printf("[WhisperServiceAdapter::requestMessages] request messages with cursor %v", req.Cursor) return a.requestMessages(ctx, req, true) } diff --git a/protocol/client/messenger_v2.go b/protocol/client/messenger_v2.go index a158a96..e327498 100644 --- a/protocol/client/messenger_v2.go +++ b/protocol/client/messenger_v2.go @@ -51,31 +51,40 @@ func (m *MessengerV2) Start() error { if err != nil { return err } + if contacts[i].Type == ContactPublicKey { _, exist := m.private[contacts[i].Topic] + if exist { continue } + stream := NewStream(context.Background(), options, m.proto, NewPrivateHandler(m.db)) err := stream.Start() if err != nil { return errors.Wrap(err, "unable to start private stream") } + m.private[contacts[i].Topic] = stream } else { _, exist := m.public[contacts[i].Topic] + if exist { return fmt.Errorf("multiple public chats with same topic: %s", contacts[i].Topic) } + stream := NewStream(context.Background(), options, m.proto, NewPublicHandler(contacts[i], m.db)) err := stream.Start() if err != nil { return errors.Wrap(err, "unable to start stream") } + m.public[contacts[i].Topic] = stream } } - log.Printf("[INFO] request messages from mail sever") + + log.Printf("[Mesenger:Start] request messages from mail sever") + return m.RequestAll(context.Background(), true) } @@ -162,18 +171,21 @@ func (m *MessengerV2) Request(ctx context.Context, c Contact, options protocol.R } func (m *MessengerV2) requestHistories(ctx context.Context, histories []History, opts protocol.RequestOptions) error { - log.Printf("[messenger::RequestAll] requesting messages for chats %+v: from %d to %d\n", opts.Chats, opts.From, opts.To) + log.Printf("[Messenger::requestHistories] requesting messages for chats %+v: from %d to %d\n", opts.Chats, opts.From, opts.To) + start := time.Now() + err := m.proto.Request(ctx, opts) if err != nil { return err } - log.Printf("[messenger::RequestAll] requesting message for chats %+v finished. took %v\n", opts.Chats, time.Since(start)) + + log.Printf("[Messenger::requestHistories] requesting message for chats %+v finished in %s\n", opts.Chats, time.Since(start)) + for i := range histories { histories[i].Synced = opts.To } - err = m.db.UpdateHistories(histories) - return err + return m.db.UpdateHistories(histories) } func (m *MessengerV2) RequestAll(ctx context.Context, newest bool) error { @@ -204,6 +216,9 @@ func (m *MessengerV2) RequestAll(ctx context.Context, newest bool) error { }() } wg.Wait() + + log.Printf("[Messenger::RequestAll] finished requesting histories") + close(errors) for err := range errors { if err != nil { @@ -239,10 +254,18 @@ func (m *MessengerV2) Send(c Contact, data []byte) error { return errors.Wrap(err, "failed to prepare send options") } - hash, err := m.proto.Send(context.Background(), encodedMessage, opts) + log.Printf("[Messenger::Send] sending message") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + hash, err := m.proto.Send(ctx, encodedMessage, opts) if err != nil { return errors.Wrap(err, "can't send a message") } + + log.Printf("[Messenger::Send] sent message with hash %x", hash) + message.ID = hash message.SigPubKey = &m.identity.PublicKey _, err = m.db.SaveMessages(c, []*protocol.Message{&message}) diff --git a/protocol/client/stream.go b/protocol/client/stream.go index 1cd90b0..547d007 100644 --- a/protocol/client/stream.go +++ b/protocol/client/stream.go @@ -57,22 +57,42 @@ type PrivateStream struct { } func (priv PrivateStream) Handle(msg protocol.Message) error { - if msg.SigPubKey == nil { + publicKey := msg.SigPubKey + + if publicKey == nil { return errors.New("message should be signed") } - keyhex := PubkeyToHex(msg.SigPubKey) + contact := Contact{ Type: ContactPublicKey, State: ContactNew, - Name: keyhex, // TODO(dshulyak) replace with 3-word funny name - PublicKey: msg.SigPubKey, + Name: PubkeyToHex(publicKey), // TODO(dshulyak) replace with 3-word funny name + PublicKey: publicKey, Topic: DefaultPrivateTopic(), } - exist, err := priv.db.PublicContactExist(contact) + + exists, err := priv.db.PublicContactExist(contact) if err != nil { return errors.Wrap(err, "error verifying if public contact exist") } - if !exist { + if exists { + // TODO: replace with db.ContactByPublicKey() + contacts, err := priv.db.Contacts() + if err != nil { + return errors.Wrap(err, "error getting contacts") + } + for _, c := range contacts { + if c.PublicKey == nil { + continue + } + + // TODO: extract + if publicKey.X.Cmp(c.PublicKey.X) == 0 && publicKey.Y.Cmp(c.PublicKey.Y) == 0 { + contact = c + break + } + } + } else { err := priv.db.SaveContacts([]Contact{contact}) if err != nil { return errors.Wrap(err, "can't add new contact")