status-go/wakuv2/common/message.go
kaichao 47899fd045
feat_: hash based query for outgoing messages. (#5217)
* feat_: hash based query for outgoing messages.

* chore_: more logs

* chore_: fix comments

* chore_: do not lock when send queries

* chore_: use constant for magic number

* chore_: remove message ids from query queue after ack

* chore_: fix ack clean process

* chore_: fix message resend test

* chore_: add test for waku confirm message sent.

* chore_: fix tests.

* chore_: fix more

* chore_: set store peer id when mailserver updates

* fix_: tests

* chore_: increase max hash query length

* chore_: remove debug log of ack message

* chore_: remove automatic peer selection

* chore_: mark raw message to sent after ack

* chore_: fix test

* chore_: fix test
2024-06-11 15:45:01 +08:00

257 lines
6.7 KiB
Go

package common
import (
"crypto/ecdsa"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
// MessageType represents where this message comes from
type MessageType int
const (
RelayedMessageType MessageType = iota
StoreMessageType
SendMessageType
)
// MessageParams specifies the exact way a message should be wrapped
// into an Envelope.
type MessageParams struct {
Src *ecdsa.PrivateKey
Dst *ecdsa.PublicKey
KeySym []byte
Topic TopicType
Payload []byte
Padding []byte
}
// ReceivedMessage represents a data packet to be received through the
// WakuV2 protocol and successfully decrypted.
type ReceivedMessage struct {
Envelope *protocol.Envelope // Wrapped Waku Message
MsgType MessageType
Data []byte
Padding []byte
Signature []byte
Sent uint32 // Time when the message was posted into the network in seconds
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
PubsubTopic string
ContentTopic TopicType
SymKeyHash common.Hash // The Keccak256Hash of the key
hash common.Hash
Processed atomic.Bool
}
// MessagesRequest contains details of a request for historic messages.
type MessagesRequest struct {
// ID of the request. The current implementation requires ID to be 32-byte array,
// however, it's not enforced for future implementation.
ID []byte `json:"id"`
// From is a lower bound of time range.
From uint32 `json:"from"`
// To is a upper bound of time range.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request.
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests.
Cursor []byte `json:"cursor"`
// Topics is a list of topics. A returned message should
// belong to one of the topics from the list.
Topics [][]byte `json:"topics"`
}
func (r MessagesRequest) Validate() error {
if len(r.ID) != common.HashLength {
return errors.New("invalid 'ID', expected a 32-byte slice")
}
if r.From > r.To {
return errors.New("invalid 'From' value which is greater than To")
}
if r.Limit > MaxLimitInMessagesRequest {
return fmt.Errorf("invalid 'Limit' value, expected value lower than %d", MaxLimitInMessagesRequest)
}
return nil
}
// EnvelopeError code and optional description of the error.
type EnvelopeError struct {
Hash common.Hash
Code uint
Description string
}
// ErrorToEnvelopeError converts common golang error into EnvelopeError with a code.
func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError {
code := EnvelopeOtherError
switch err.(type) {
case TimeSyncError:
code = EnvelopeTimeNotSynced
}
return EnvelopeError{
Hash: hash,
Code: code,
Description: err.Error(),
}
}
// MessagesResponse sent as a response after processing batch of envelopes.
type MessagesResponse struct {
// Hash is a hash of all envelopes sent in the single batch.
Hash common.Hash
// Per envelope error.
Errors []EnvelopeError
}
func (msg *ReceivedMessage) isSymmetricEncryption() bool {
return msg.SymKeyHash != common.Hash{}
}
func (msg *ReceivedMessage) isAsymmetricEncryption() bool {
return msg.Dst != nil
}
// MessageStore defines interface for temporary message store.
type MessageStore interface {
Add(*ReceivedMessage) error
Pop() ([]*ReceivedMessage, error)
}
// NewMemoryMessageStore returns pointer to an instance of the MemoryMessageStore.
func NewMemoryMessageStore() *MemoryMessageStore {
return &MemoryMessageStore{
messages: map[common.Hash]*ReceivedMessage{},
}
}
// MemoryMessageStore represents messages stored in a memory hash table.
type MemoryMessageStore struct {
mu sync.Mutex
messages map[common.Hash]*ReceivedMessage
}
func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic)
if err != nil {
log.Debug("failed to extract content topic from message", "topic", env.Message().ContentTopic, "err", err)
return nil
}
return &ReceivedMessage{
Envelope: env,
MsgType: msgType,
Sent: uint32(env.Message().GetTimestamp() / int64(time.Second)),
ContentTopic: ct,
PubsubTopic: env.PubsubTopic(),
}
}
// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
func (msg *ReceivedMessage) Hash() common.Hash {
if (msg.hash == common.Hash{}) {
msg.hash = common.BytesToHash(msg.Envelope.Hash().Bytes())
}
return msg.hash
}
// Add adds message to store.
func (store *MemoryMessageStore) Add(msg *ReceivedMessage) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exist := store.messages[msg.Hash()]; !exist {
store.messages[msg.Hash()] = msg
}
return nil
}
// Pop returns all available messages and cleans the store.
func (store *MemoryMessageStore) Pop() ([]*ReceivedMessage, error) {
store.mu.Lock()
defer store.mu.Unlock()
all := make([]*ReceivedMessage, 0, len(store.messages))
for hash, msg := range store.messages {
delete(store.messages, hash)
all = append(all, msg)
}
return all, nil
}
// Open tries to decrypt an message, and populates the message fields in case of success.
func (msg *ReceivedMessage) Open(watcher *Filter) (result *ReceivedMessage) {
if watcher == nil {
return nil
}
// The API interface forbids filters doing both symmetric and asymmetric encryption.
if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() {
return nil
}
// TODO: should we update msg instead of creating a new received message?
result = new(ReceivedMessage)
keyInfo := new(payload.KeyInfo)
if watcher.expectsAsymmetricEncryption() {
keyInfo.Kind = payload.Asymmetric
keyInfo.PrivKey = watcher.KeyAsym
msg.Dst = &watcher.KeyAsym.PublicKey
} else if watcher.expectsSymmetricEncryption() {
keyInfo.Kind = payload.Symmetric
keyInfo.SymKey = watcher.KeySym
msg.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
raw, err := payload.DecodePayload(msg.Envelope.Message(), keyInfo)
if err != nil {
log.Error("failed to decode message", "err", err)
return nil
}
result.Envelope = msg.Envelope
result.Data = raw.Data
result.Padding = raw.Padding
result.Signature = raw.Signature
result.Src = raw.PubKey
result.Sent = uint32(msg.Envelope.Message().GetTimestamp() / int64(time.Second))
ct, err := ExtractTopicFromContentTopic(msg.Envelope.Message().ContentTopic)
if err != nil {
log.Error("failed to decode message", "err", err)
return nil
}
result.PubsubTopic = watcher.PubsubTopic
result.ContentTopic = ct
return result
}