feat: Mark messages as confirmed in waku2 (#2449)

This commit is contained in:
Anthony Laibe 2021-12-01 16:15:18 +01:00 committed by GitHub
parent 605a19ceb3
commit 268c8304a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 16 deletions

View File

@ -256,12 +256,11 @@ func (w *gethWakuV2Wrapper) DropPeer(peerID string) error {
} }
func (w *gethWakuV2Wrapper) ProcessingP2PMessages() bool { func (w *gethWakuV2Wrapper) ProcessingP2PMessages() bool {
// NOTE: not implemented for waku 2 return w.waku.ProcessingP2PMessages()
return false
} }
func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) { func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
// NOTE: not implemented for waku 2 w.waku.MarkP2PMessageAsProcessed(hash)
} }
type wakuV2FilterWrapper struct { type wakuV2FilterWrapper struct {

View File

@ -159,7 +159,7 @@ func (fs *Filters) Get(id string) *Filter {
// NotifyWatchers notifies any filter that has declared interest // NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic. // for the envelope's topic.
func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) { func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool {
var decodedMsg *ReceivedMessage var decodedMsg *ReceivedMessage
fs.mutex.RLock() fs.mutex.RLock()
@ -168,28 +168,31 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) {
topic, err := ExtractTopicFromContentTopic(recvMessage.Envelope.Message().ContentTopic) topic, err := ExtractTopicFromContentTopic(recvMessage.Envelope.Message().ContentTopic)
if err != nil { if err != nil {
log.Trace(err.Error(), "topic", recvMessage.Envelope.Message().ContentTopic) log.Trace(err.Error(), "topic", recvMessage.Envelope.Message().ContentTopic)
return return false
} }
var matched bool
candidates := fs.GetWatchersByTopic(*topic) candidates := fs.GetWatchersByTopic(*topic)
for _, watcher := range candidates { for _, watcher := range candidates {
match := true matched = true
if decodedMsg == nil { if decodedMsg == nil {
decodedMsg = recvMessage.Open(watcher) decodedMsg = recvMessage.Open(watcher)
if decodedMsg == nil { if decodedMsg == nil {
log.Trace("processing message: failed to open", "message", recvMessage.Hash().Hex(), "filter", watcher.id) log.Trace("processing message: failed to open", "message", recvMessage.Hash().Hex(), "filter", watcher.id)
} }
} else { } else {
match = watcher.MatchMessage(decodedMsg) matched = watcher.MatchMessage(decodedMsg)
} }
if match && decodedMsg != nil { if matched && decodedMsg != nil {
log.Trace("processing message: decrypted", "hash", recvMessage.Hash().Hex()) log.Trace("processing message: decrypted", "hash", recvMessage.Hash().Hex())
if watcher.Src == nil || IsPubKeyEqual(decodedMsg.Src, watcher.Src) { if watcher.Src == nil || IsPubKeyEqual(decodedMsg.Src, watcher.Src) {
watcher.Trigger(decodedMsg) watcher.Trigger(decodedMsg)
} }
} }
} }
return matched
} }
func (f *Filter) expectsAsymmetricEncryption() bool { func (f *Filter) expectsAsymmetricEncryption() bool {

View File

@ -14,6 +14,14 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// MessageType represents where this message comes from
type MessageType int
const (
RelayedMessageType MessageType = iota
StoreMessageType
)
// MessageParams specifies the exact way a message should be wrapped // MessageParams specifies the exact way a message should be wrapped
// into an Envelope. // into an Envelope.
type MessageParams struct { type MessageParams struct {
@ -30,6 +38,8 @@ type MessageParams struct {
type ReceivedMessage struct { type ReceivedMessage struct {
Envelope *protocol.Envelope // Wrapped Waku Message Envelope *protocol.Envelope // Wrapped Waku Message
MsgType MessageType
Data []byte Data []byte
Padding []byte Padding []byte
Signature []byte Signature []byte
@ -140,9 +150,10 @@ type MemoryMessageStore struct {
messages map[common.Hash]*ReceivedMessage messages map[common.Hash]*ReceivedMessage
} }
func NewReceivedMessage(env *protocol.Envelope) *ReceivedMessage { func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
return &ReceivedMessage{ return &ReceivedMessage{
Envelope: env, Envelope: env,
MsgType: msgType,
} }
} }

View File

@ -123,6 +123,9 @@ type Waku struct {
envelopeFeed event.Feed envelopeFeed event.Feed
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
storeMsgIDsMu sync.RWMutex
timeSource func() time.Time // source of time for waku timeSource func() time.Time // source of time for waku
logger *zap.Logger logger *zap.Logger
@ -148,6 +151,8 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
quit: make(chan struct{}), quit: make(chan struct{}),
dnsAddressCache: make(map[string][]multiaddr.Multiaddr), dnsAddressCache: make(map[string][]multiaddr.Multiaddr),
dnsAddressCacheLock: &sync.RWMutex{}, dnsAddressCacheLock: &sync.RWMutex{},
storeMsgIDs: make(map[gethcommon.Hash]bool),
storeMsgIDsMu: sync.RWMutex{},
timeSource: time.Now, timeSource: time.Now,
logger: logger, logger: logger,
} }
@ -384,7 +389,7 @@ func (w *Waku) runRelayMsgLoop() {
} }
for env := range sub.C { for env := range sub.C {
envelopeErrors, err := w.OnNewEnvelopes(env) envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType)
// TODO: should these be handled? // TODO: should these be handled?
_ = envelopeErrors _ = envelopeErrors
_ = err _ = err
@ -401,7 +406,7 @@ func (w *Waku) runFilterMsgLoop() {
case <-w.quit: case <-w.quit:
return return
case env := <-w.filterMsgChannel: case env := <-w.filterMsgChannel:
envelopeErrors, err := w.OnNewEnvelopes(env) envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType)
// TODO: should these be handled? // TODO: should these be handled?
_ = envelopeErrors _ = envelopeErrors
_ = err _ = err
@ -835,7 +840,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
w.poolMu.Unlock() w.poolMu.Unlock()
if !alreadyCached { if !alreadyCached {
envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic) envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic)
recvMessage := common.NewReceivedMessage(envelope) recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType)
w.postEvent(recvMessage) // notify the local node about the new message w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage) w.addEnvelope(recvMessage)
} }
@ -867,7 +872,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
for _, msg := range result.Messages { for _, msg := range result.Messages {
envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic) envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic)
w.logger.Debug("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash()))) w.logger.Debug("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.OnNewEnvelopes(envelope) _, err = w.OnNewEnvelopes(envelope, common.StoreMessageType)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -902,8 +907,8 @@ func (w *Waku) Stop() error {
return nil return nil
} }
func (w *Waku) OnNewEnvelopes(envelope *wakuprotocol.Envelope) ([]common.EnvelopeError, error) { func (w *Waku) OnNewEnvelopes(envelope *wakuprotocol.Envelope, msgType common.MessageType) ([]common.EnvelopeError, error) {
recvMessage := common.NewReceivedMessage(envelope) recvMessage := common.NewReceivedMessage(envelope, msgType)
envelopeErrors := make([]common.EnvelopeError, 0) envelopeErrors := make([]common.EnvelopeError, 0)
w.logger.Debug("received new envelope") w.logger.Debug("received new envelope")
@ -969,7 +974,23 @@ func (w *Waku) processQueue() {
case <-w.quit: case <-w.quit:
return return
case e := <-w.msgQueue: case e := <-w.msgQueue:
w.filters.NotifyWatchers(e) if e.MsgType == common.StoreMessageType {
// We need to insert it first, and then remove it if not matched,
// as messages are processed asynchronously
w.storeMsgIDsMu.Lock()
w.storeMsgIDs[e.Hash()] = true
w.storeMsgIDsMu.Unlock()
}
matched := w.filters.NotifyWatchers(e)
// If not matched we remove it
if !matched {
w.storeMsgIDsMu.Lock()
delete(w.storeMsgIDs, e.Hash())
w.storeMsgIDsMu.Unlock()
}
w.envelopeFeed.Send(common.EnvelopeEvent{ w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: e.Topic, Topic: e.Topic,
Hash: e.Hash(), Hash: e.Hash(),
@ -1075,6 +1096,18 @@ func (w *Waku) DropPeer(peerID string) error {
return w.node.ClosePeerById(peer.ID(peerID)) return w.node.ClosePeerById(peer.ID(peerID))
} }
func (w *Waku) ProcessingP2PMessages() bool {
w.storeMsgIDsMu.Lock()
defer w.storeMsgIDsMu.Unlock()
return len(w.storeMsgIDs) != 0
}
func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) {
w.storeMsgIDsMu.Lock()
defer w.storeMsgIDsMu.Unlock()
delete(w.storeMsgIDs, hash)
}
// validatePrivateKey checks the format of the given private key. // validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool { func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 { if k == nil || k.D == nil || k.D.Sign() == 0 {