Add `Messenger.RetrieveAll` method. Closes #24 (#26)

This commit is contained in:
Pedro Pombeiro 2019-07-26 08:19:03 +02:00 committed by Adam Babik
parent 549afb5edf
commit f5973102d8
6 changed files with 134 additions and 36 deletions

View File

@ -75,6 +75,34 @@ func (a *whisperAdapter) LeavePrivate(publicKey *ecdsa.PublicKey) error {
return a.transport.LeavePrivate(publicKey)
}
type ChatMessages struct {
Messages []*protocol.Message
Public bool
ChatID string
}
func (a *whisperAdapter) RetrieveAllMessages() ([]ChatMessages, error) {
chatMessages, err := a.transport.RetrieveAllMessages()
if err != nil {
return nil, err
}
var result []ChatMessages
for _, messages := range chatMessages {
protoMessages, err := a.handleRetrievedMessages(messages.Messages)
if err != nil {
return nil, err
}
result = append(result, ChatMessages{
Messages: protoMessages,
Public: messages.Public,
ChatID: messages.ChatID,
})
}
return result, nil
}
// RetrievePublicMessages retrieves the collected public messages.
// It implies joining a chat if it has not been joined yet.
func (a *whisperAdapter) RetrievePublicMessages(chatID string) ([]*protocol.Message, error) {
@ -83,32 +111,7 @@ func (a *whisperAdapter) RetrievePublicMessages(chatID string) ([]*protocol.Mess
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrievePublicMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
shhMessage := whisper.ToWhisperMessage(item)
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("received a public message")
statusMessage, err := a.decodeMessage(shhMessage)
if err != nil {
hlogger.Error("failed to decode message", zap.Error(err))
continue
}
switch m := statusMessage.Message.(type) {
case protocol.Message:
m.ID = statusMessage.ID
m.SigPubKey = statusMessage.SigPubKey
decodedMessages = append(decodedMessages, &m)
default:
hlogger.Error("skipped a public message of unsupported type")
}
}
return decodedMessages, nil
return a.handleRetrievedMessages(messages)
}
// RetrievePrivateMessages retrieves the collected private messages.
@ -119,7 +122,11 @@ func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrievePrivateMessages"))
return a.handleRetrievedMessages(messages)
}
func (a *whisperAdapter) handleRetrievedMessages(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) {
logger := a.logger.With(zap.String("site", "handleRetrievedMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
@ -127,7 +134,7 @@ func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("received a private message")
hlogger.Debug("handling a received message")
err := a.decryptMessage(context.Background(), shhMessage)
if err != nil {
@ -161,6 +168,8 @@ func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]
if err != nil {
return nil, err
}
default:
hlogger.Debug("skipped a message of unsupported type")
}
}
return decodedMessages, nil

View File

@ -44,14 +44,12 @@ type Messenger struct {
persistence persistence
adapter *whisperAdapter
encryptor *encryption.Protocol
logger *zap.Logger
ownMessages map[string][]*protocol.Message
featureFlags featureFlags
messagesPersistenceEnabled bool
logger *zap.Logger
shutdownTasks []func() error
shutdownTasks []func() error
}
type featureFlags struct {
@ -404,6 +402,47 @@ var (
RetrieveLastDay = RetrieveConfig{latest: true, last24Hours: true}
)
// RetrieveAll retrieves all previously fetched messages
func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) (allMessages []*protocol.Message, err error) {
latest, err := m.adapter.RetrieveAllMessages()
if err != nil {
err = errors.Wrap(err, "failed to retrieve messages")
return
}
for _, messages := range latest {
chatID := messages.ChatID
_, err = m.persistence.SaveMessages(chatID, messages.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to save messages")
}
if !messages.Public {
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chatID]; ok {
messages.Messages = append(messages.Messages, ownMessages...)
}
}
retrievedMessages, err := m.retrieveSaved(ctx, chatID, c, messages.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to get saved messages")
}
allMessages = append(allMessages, retrievedMessages...)
}
// Delete own messages as they were added to the result.
for _, messages := range latest {
if !messages.Public {
delete(m.ownMessages, messages.ChatID)
}
}
return
}
func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (messages []*protocol.Message, err error) {
var (
latest []*protocol.Message
@ -444,25 +483,31 @@ func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (
}
}
// We may need to add more messages from the past.
result, err := m.retrieveSaved(ctx, chat.ID(), c, append(latest, ownLatest...))
if err != nil {
return nil, err
}
// When our messages are returned, we can delete them.
delete(m.ownMessages, chat.ID())
return m.retrieveSaved(ctx, chat, c, append(latest, ownLatest...))
return result, nil
}
func (m *Messenger) retrieveSaved(ctx context.Context, chat Chat, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
func (m *Messenger) retrieveSaved(ctx context.Context, chatID string, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
if !m.messagesPersistenceEnabled {
return latest, nil
}
if !c.latest {
return m.persistence.Messages(chat.ID(), c.From, c.To)
return m.persistence.Messages(chatID, c.From, c.To)
}
if c.last24Hours {
to := time.Now()
from := to.Add(-time.Hour * 24)
return m.persistence.Messages(chat.ID(), from, to)
return m.persistence.Messages(chatID, from, to)
}
return latest, nil

View File

@ -58,6 +58,10 @@ type Chat struct {
Listen bool `json:"listen"`
}
func (c *Chat) IsPublic() bool {
return !c.OneToOne
}
type ChatsManager struct {
whisper *whisper.Whisper
persistence *sqlitePersistence
@ -278,6 +282,7 @@ func (s *ChatsManager) loadPartitioned(publicKey *ecdsa.PublicKey, listen bool)
Topic: filter.Topic,
Identity: publicKeyToStr(publicKey),
Listen: listen,
OneToOne: true,
}
s.chats[chatID] = chat
@ -310,6 +315,7 @@ func (s *ChatsManager) LoadNegotiated(secret NegotiatedSecret) (*Chat, error) {
Identity: publicKeyToStr(secret.PublicKey),
Negotiated: true,
Listen: true,
OneToOne: true,
}
s.chats[chat.ChatID] = chat
@ -354,6 +360,7 @@ func (s *ChatsManager) LoadDiscovery() ([]*Chat, error) {
Identity: identityStr,
Discovery: true,
Listen: true,
OneToOne: true,
}
discoveryResponse, err = s.addAsymmetric(personalDiscoveryChat.ChatID, true)
@ -373,6 +380,7 @@ func (s *ChatsManager) LoadDiscovery() ([]*Chat, error) {
Identity: identityStr,
Discovery: true,
Listen: true,
OneToOne: true,
}
discoveryResponse, err = s.addAsymmetric(discoveryChat.ChatID, true)
@ -411,6 +419,7 @@ func (s *ChatsManager) LoadPublic(chatID string) (*Chat, error) {
SymKeyID: filterAndTopic.SymKeyID,
Topic: filterAndTopic.Topic,
Listen: true,
OneToOne: false,
}
s.chats[chatID] = chat

View File

@ -167,6 +167,34 @@ func (a *WhisperServiceTransport) LeavePrivate(publicKey *ecdsa.PublicKey) error
return a.chats.Remove(chats...)
}
type ChatMessages struct {
Messages []*whisper.ReceivedMessage
Public bool
ChatID string
}
func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error) {
chatMessages := make(map[string]ChatMessages)
for _, chat := range a.chats.Chats() {
f := a.shh.GetFilter(chat.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
messages := chatMessages[chat.ChatID]
messages.ChatID = chat.ChatID
messages.Public = chat.IsPublic()
messages.Messages = append(messages.Messages, f.Retrieve()...)
}
var result []ChatMessages
for _, messages := range chatMessages {
result = append(result, messages)
}
return result, nil
}
func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whisper.ReceivedMessage, error) {
chat, err := a.chats.LoadPublic(chatID)
if err != nil {
@ -195,6 +223,7 @@ func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.Publi
if f == nil {
return nil, errors.New("failed to return a filter")
}
result = append(result, f.Retrieve()...)
}
@ -211,6 +240,7 @@ func (a *WhisperServiceTransport) RetrieveRawAll() (map[filter.Chat][]*whisper.R
if f == nil {
return nil, errors.New("failed to return a filter")
}
result[*chat] = append(result[*chat], f.Retrieve()...)
}

3
v1/README.md Normal file
View File

@ -0,0 +1,3 @@
# v1 statusproto package folder
This folder contains only data types mentioned in the specification and required for its implementation.

View File

@ -99,6 +99,8 @@ type Message struct {
Flags Flags `json:"-"`
ID []byte `json:"-"`
SigPubKey *ecdsa.PublicKey `json:"-"`
ChatID string `json:"-"`
Public bool `json:"-"`
}
func (m *Message) MarshalJSON() ([]byte, error) {