2021-06-16 20:19:45 +00:00
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
"crypto/ecdsa"
"fmt"
"sync"
2023-09-13 10:50:23 +00:00
"go.uber.org/zap"
"golang.org/x/exp/maps"
2023-05-22 21:38:02 +00:00
2023-10-12 19:21:49 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
2021-06-16 20:19:45 +00:00
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
// Filter represents a Waku message filter
type Filter struct {
2023-09-13 10:50:23 +00:00
Src * ecdsa . PublicKey // Sender of the message
KeyAsym * ecdsa . PrivateKey // Private Key of recipient
KeySym [ ] byte // Key associated with the Topic
PubsubTopic string // Pubsub topic used to filter messages with
ContentTopics TopicSet // ContentTopics to filter messages with
SymKeyHash common . Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
2021-06-16 20:19:45 +00:00
Messages MessageStore
}
2023-05-22 21:38:02 +00:00
type FilterSet = map [ * Filter ] struct { }
type ContentTopicToFilter = map [ TopicType ] FilterSet
type PubsubTopicToContentTopic = map [ string ] ContentTopicToFilter
2021-06-16 20:19:45 +00:00
// Filters represents a collection of filters
type Filters struct {
2023-09-13 10:50:23 +00:00
// Map of random ID to Filter
2021-06-16 20:19:45 +00:00
watchers map [ string ] * Filter
2023-09-13 10:50:23 +00:00
// map a topic to the filters that are interested in being notified when a message matches that topic
topicMatcher PubsubTopicToContentTopic
2021-06-16 20:19:45 +00:00
2023-09-13 10:50:23 +00:00
// list all the filters that will be notified of a new message, no matter what its topic is
allTopicsMatcher map [ * Filter ] struct { }
logger * zap . Logger
sync . RWMutex
2021-06-16 20:19:45 +00:00
}
// NewFilters returns a newly created filter collection
2023-09-13 10:50:23 +00:00
func NewFilters ( logger * zap . Logger ) * Filters {
2021-06-16 20:19:45 +00:00
return & Filters {
watchers : make ( map [ string ] * Filter ) ,
2023-05-22 21:38:02 +00:00
topicMatcher : make ( PubsubTopicToContentTopic ) ,
2021-06-16 20:19:45 +00:00
allTopicsMatcher : make ( map [ * Filter ] struct { } ) ,
2023-09-13 10:50:23 +00:00
logger : logger ,
2021-06-16 20:19:45 +00:00
}
}
// Install will add a new filter to the filter collection
func ( fs * Filters ) Install ( watcher * Filter ) ( string , error ) {
if watcher . KeySym != nil && watcher . KeyAsym != nil {
return "" , fmt . Errorf ( "filters must choose between symmetric and asymmetric keys" )
}
id , err := GenerateRandomID ( )
if err != nil {
return "" , err
}
2023-09-13 10:50:23 +00:00
fs . Lock ( )
defer fs . Unlock ( )
2021-06-16 20:19:45 +00:00
if fs . watchers [ id ] != nil {
return "" , fmt . Errorf ( "failed to generate unique ID" )
}
if watcher . expectsSymmetricEncryption ( ) {
watcher . SymKeyHash = crypto . Keccak256Hash ( watcher . KeySym )
}
watcher . id = id
2023-09-13 10:50:23 +00:00
2021-06-16 20:19:45 +00:00
fs . watchers [ id ] = watcher
fs . addTopicMatcher ( watcher )
2023-09-13 10:50:23 +00:00
fs . logger . Debug ( "filters install" , zap . String ( "id" , id ) )
2021-06-16 20:19:45 +00:00
return id , err
}
// Uninstall will remove a filter whose id has been specified from
// the filter collection
func ( fs * Filters ) Uninstall ( id string ) bool {
2023-09-13 10:50:23 +00:00
fs . Lock ( )
defer fs . Unlock ( )
watcher := fs . watchers [ id ]
if watcher != nil {
fs . removeFromTopicMatchers ( watcher )
2021-06-16 20:19:45 +00:00
delete ( fs . watchers , id )
2023-09-13 10:50:23 +00:00
fs . logger . Debug ( "filters uninstall" , zap . String ( "id" , id ) )
2021-06-16 20:19:45 +00:00
return true
}
return false
}
func ( fs * Filters ) AllTopics ( ) [ ] TopicType {
var topics [ ] TopicType
2023-09-13 10:50:23 +00:00
fs . Lock ( )
defer fs . Unlock ( )
2023-05-22 21:38:02 +00:00
for _ , topicsPerPubsubTopic := range fs . topicMatcher {
for t := range topicsPerPubsubTopic {
topics = append ( topics , t )
}
2021-06-16 20:19:45 +00:00
}
return topics
}
// addTopicMatcher adds a filter to the topic matchers.
// If the filter's Topics array is empty, it will be tried on every topic.
// Otherwise, it will be tried on the topics specified.
func ( fs * Filters ) addTopicMatcher ( watcher * Filter ) {
2023-09-13 10:50:23 +00:00
if len ( watcher . ContentTopics ) == 0 && ( watcher . PubsubTopic == relay . DefaultWakuTopic || watcher . PubsubTopic == "" ) {
2021-06-16 20:19:45 +00:00
fs . allTopicsMatcher [ watcher ] = struct { } { }
} else {
2023-05-22 21:38:02 +00:00
filtersPerContentTopic , ok := fs . topicMatcher [ watcher . PubsubTopic ]
if ! ok {
filtersPerContentTopic = make ( ContentTopicToFilter )
}
2023-09-13 10:50:23 +00:00
for topic := range watcher . ContentTopics {
2023-05-22 21:38:02 +00:00
if filtersPerContentTopic [ topic ] == nil {
filtersPerContentTopic [ topic ] = make ( FilterSet )
2021-06-16 20:19:45 +00:00
}
2023-05-22 21:38:02 +00:00
filtersPerContentTopic [ topic ] [ watcher ] = struct { } { }
2021-06-16 20:19:45 +00:00
}
2023-05-22 21:38:02 +00:00
fs . topicMatcher [ watcher . PubsubTopic ] = filtersPerContentTopic
2021-06-16 20:19:45 +00:00
}
}
// removeFromTopicMatchers removes a filter from the topic matchers
func ( fs * Filters ) removeFromTopicMatchers ( watcher * Filter ) {
delete ( fs . allTopicsMatcher , watcher )
2023-05-22 21:38:02 +00:00
filtersPerContentTopic , ok := fs . topicMatcher [ watcher . PubsubTopic ]
if ! ok {
return
}
2023-09-13 10:50:23 +00:00
for topic := range watcher . ContentTopics {
2023-05-22 21:38:02 +00:00
delete ( filtersPerContentTopic [ topic ] , watcher )
2021-06-16 20:19:45 +00:00
}
2023-05-22 21:38:02 +00:00
fs . topicMatcher [ watcher . PubsubTopic ] = filtersPerContentTopic
2021-06-16 20:19:45 +00:00
}
// GetWatchersByTopic returns a slice containing the filters that
// match a specific topic
2023-05-22 21:38:02 +00:00
func ( fs * Filters ) GetWatchersByTopic ( pubsubTopic string , contentTopic TopicType ) [ ] * Filter {
2021-06-16 20:19:45 +00:00
res := make ( [ ] * Filter , 0 , len ( fs . allTopicsMatcher ) )
for watcher := range fs . allTopicsMatcher {
res = append ( res , watcher )
}
2023-05-22 21:38:02 +00:00
filtersPerContentTopic , ok := fs . topicMatcher [ pubsubTopic ]
if ! ok {
return res
}
for watcher := range filtersPerContentTopic [ contentTopic ] {
2021-06-16 20:19:45 +00:00
res = append ( res , watcher )
}
return res
}
// Get returns a filter from the collection with a specific ID
func ( fs * Filters ) Get ( id string ) * Filter {
2023-09-13 10:50:23 +00:00
fs . RLock ( )
defer fs . RUnlock ( )
2021-06-16 20:19:45 +00:00
return fs . watchers [ id ]
}
2023-09-13 10:50:23 +00:00
func ( fs * Filters ) GetFilters ( ) map [ string ] * Filter {
fs . RLock ( )
defer fs . RUnlock ( )
return maps . Clone ( fs . watchers )
}
2021-06-16 20:19:45 +00:00
// NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic.
2021-12-01 15:15:18 +00:00
func ( fs * Filters ) NotifyWatchers ( recvMessage * ReceivedMessage ) bool {
2021-06-16 20:19:45 +00:00
var decodedMsg * ReceivedMessage
2023-09-13 10:50:23 +00:00
fs . RLock ( )
defer fs . RUnlock ( )
2021-06-16 20:19:45 +00:00
2021-12-01 15:15:18 +00:00
var matched bool
2023-05-22 21:38:02 +00:00
candidates := fs . GetWatchersByTopic ( recvMessage . PubsubTopic , recvMessage . ContentTopic )
2022-12-12 22:24:36 +00:00
if len ( candidates ) == 0 {
2023-05-22 21:38:02 +00:00
log . Debug ( "no filters available for this topic" , "message" , recvMessage . Hash ( ) . Hex ( ) , "pubsubTopic" , recvMessage . PubsubTopic , "contentTopic" , recvMessage . ContentTopic . String ( ) )
2022-12-12 22:24:36 +00:00
}
2021-06-16 20:19:45 +00:00
for _ , watcher := range candidates {
2021-12-01 15:15:18 +00:00
matched = true
2021-06-16 20:19:45 +00:00
if decodedMsg == nil {
decodedMsg = recvMessage . Open ( watcher )
if decodedMsg == nil {
2022-12-12 22:24:36 +00:00
log . Debug ( "processing message: failed to open" , "message" , recvMessage . Hash ( ) . Hex ( ) , "filter" , watcher . id )
2021-06-16 20:19:45 +00:00
}
} else {
2021-12-01 15:15:18 +00:00
matched = watcher . MatchMessage ( decodedMsg )
2021-06-16 20:19:45 +00:00
}
2021-12-01 15:15:18 +00:00
if matched && decodedMsg != nil {
2022-12-12 22:24:36 +00:00
log . Debug ( "processing message: decrypted" , "hash" , recvMessage . Hash ( ) . Hex ( ) )
2021-06-16 20:19:45 +00:00
if watcher . Src == nil || IsPubKeyEqual ( decodedMsg . Src , watcher . Src ) {
watcher . Trigger ( decodedMsg )
}
}
}
2021-12-01 15:15:18 +00:00
return matched
2021-06-16 20:19:45 +00:00
}
func ( f * Filter ) expectsAsymmetricEncryption ( ) bool {
return f . KeyAsym != nil
}
func ( f * Filter ) expectsSymmetricEncryption ( ) bool {
return f . KeySym != nil
}
// Trigger adds a yet-unknown message to the filter's list of
// received messages.
func ( f * Filter ) Trigger ( msg * ReceivedMessage ) {
err := f . Messages . Add ( msg )
if err != nil {
log . Error ( "failed to add msg into the filters store" , "hash" , msg . Hash ( ) , "error" , err )
}
}
// Retrieve will return the list of all received messages associated
// to a filter.
func ( f * Filter ) Retrieve ( ) [ ] * ReceivedMessage {
msgs , err := f . Messages . Pop ( )
if err != nil {
log . Error ( "failed to retrieve messages from filter store" , "error" , err )
return nil
}
return msgs
}
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
// MatchEnvelope when checked by a previous filter).
// Topics are not checked here, since this is done by topic matchers.
func ( f * Filter ) MatchMessage ( msg * ReceivedMessage ) bool {
if f . expectsAsymmetricEncryption ( ) && msg . isAsymmetricEncryption ( ) {
return IsPubKeyEqual ( & f . KeyAsym . PublicKey , msg . Dst )
} else if f . expectsSymmetricEncryption ( ) && msg . isSymmetricEncryption ( ) {
return f . SymKeyHash == msg . SymKeyHash
}
return false
}