add some files although still does not compile

This commit is contained in:
Ivan Folgueira Bande 2024-11-20 18:12:18 +01:00
commit a0b2dec87f
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
21 changed files with 9305 additions and 0 deletions

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "third_party/nwaku"]
path = third_party/nwaku
url = https://github.com/waku-org/nwaku

3
common/README.md Normal file
View File

@ -0,0 +1,3 @@
# Waku Common
[See here](../README.md#common)

28
common/const.go Normal file
View File

@ -0,0 +1,28 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
// Waku protocol parameters
const (
TopicLength = 4 // in bytes
AESKeyLength = 32 // in bytes
KeyIDSize = 32 // in bytes
DefaultMaxMessageSize = uint32(1 << 20) // DefaultMaximumMessageSize is 1mb.
)

56
common/events.go Normal file
View File

@ -0,0 +1,56 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
)
// EventType used to define known waku events.
type EventType string
const (
// EventEnvelopeSent fires when envelope was sent to a peer.
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
// EventEnvelopeReceived is sent once envelope was received from a peer.
// EventEnvelopeReceived must be sent to the feed even if envelope was previously in the cache.
// And event, ideally, should contain information about peer that sent envelope to us.
EventEnvelopeReceived EventType = "envelope.received"
// EventBatchAcknowledged is sent when batch of envelopes was acknowledged by a peer.
EventBatchAcknowledged EventType = "batch.acknowledged"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
)
// EnvelopeEvent represents an envelope event.
type EnvelopeEvent struct {
Event EventType
Topic TopicType
Hash common.Hash
Batch common.Hash
Peer enode.ID
Data interface{}
}

310
common/filter.go Normal file
View File

@ -0,0 +1,310 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
"crypto/ecdsa"
"fmt"
"sync"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/logutils"
)
// Filter represents a Waku message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
KeySym []byte // Key associated with the Topic
PubsubTopic string // Pubsub topic used to filter messages with
ContentTopics TopicSet // ContentTopics to filter messages with
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Messages MessageStore
}
type FilterSet = map[*Filter]struct{}
type ContentTopicToFilter = map[TopicType]FilterSet
type PubsubTopicToContentTopic = map[string]ContentTopicToFilter
// Filters represents a collection of filters
type Filters struct {
// Map of random ID to Filter
watchers map[string]*Filter
// Pubsub topic to use when no pubsub topic is specified on a filter
defaultPubsubTopic string
// map a topic to the filters that are interested in being notified when a message matches that topic
topicMatcher PubsubTopicToContentTopic
// list all the filters that will be notified of a new message, no matter what its topic is
allTopicsMatcher map[*Filter]struct{}
logger *zap.Logger
sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(defaultPubsubTopic string, logger *zap.Logger) *Filters {
return &Filters{
watchers: make(map[string]*Filter),
topicMatcher: make(PubsubTopicToContentTopic),
allTopicsMatcher: make(map[*Filter]struct{}),
defaultPubsubTopic: defaultPubsubTopic,
logger: logger,
}
}
// Install will add a new filter to the filter collection
func (fs *Filters) Install(watcher *Filter) (string, error) {
if watcher.KeySym != nil && watcher.KeyAsym != nil {
return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys")
}
id, err := GenerateRandomID()
if err != nil {
return "", err
}
fs.Lock()
defer fs.Unlock()
if fs.watchers[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
if watcher.expectsSymmetricEncryption() {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
watcher.id = id
fs.watchers[id] = watcher
fs.addTopicMatcher(watcher)
fs.logger.Debug("filters install", zap.String("id", id))
return id, err
}
// Uninstall will remove a filter whose id has been specified from
// the filter collection
func (fs *Filters) Uninstall(id string) bool {
fs.Lock()
defer fs.Unlock()
watcher := fs.watchers[id]
if watcher != nil {
fs.removeFromTopicMatchers(watcher)
delete(fs.watchers, id)
fs.logger.Debug("filters uninstall", zap.String("id", id))
return true
}
return false
}
func (fs *Filters) AllTopics() []TopicType {
var topics []TopicType
fs.Lock()
defer fs.Unlock()
for _, topicsPerPubsubTopic := range fs.topicMatcher {
for t := range topicsPerPubsubTopic {
topics = append(topics, t)
}
}
return topics
}
// addTopicMatcher adds a filter to the topic matchers.
// If the filter's Topics array is empty, it will be tried on every topic.
// Otherwise, it will be tried on the topics specified.
func (fs *Filters) addTopicMatcher(watcher *Filter) {
if len(watcher.ContentTopics) == 0 && (watcher.PubsubTopic == fs.defaultPubsubTopic || watcher.PubsubTopic == "") {
fs.allTopicsMatcher[watcher] = struct{}{}
} else {
filtersPerContentTopic, ok := fs.topicMatcher[watcher.PubsubTopic]
if !ok {
filtersPerContentTopic = make(ContentTopicToFilter)
}
for topic := range watcher.ContentTopics {
if filtersPerContentTopic[topic] == nil {
filtersPerContentTopic[topic] = make(FilterSet)
}
filtersPerContentTopic[topic][watcher] = struct{}{}
}
fs.topicMatcher[watcher.PubsubTopic] = filtersPerContentTopic
}
}
// removeFromTopicMatchers removes a filter from the topic matchers
func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
delete(fs.allTopicsMatcher, watcher)
filtersPerContentTopic, ok := fs.topicMatcher[watcher.PubsubTopic]
if !ok {
return
}
for topic := range watcher.ContentTopics {
delete(filtersPerContentTopic[topic], watcher)
}
fs.topicMatcher[watcher.PubsubTopic] = filtersPerContentTopic
}
// GetWatchersByTopic returns a slice containing the filters that
// match a specific topic
func (fs *Filters) GetWatchersByTopic(pubsubTopic string, contentTopic TopicType) []*Filter {
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
for watcher := range fs.allTopicsMatcher {
res = append(res, watcher)
}
filtersPerContentTopic, ok := fs.topicMatcher[pubsubTopic]
if !ok {
return res
}
for watcher := range filtersPerContentTopic[contentTopic] {
res = append(res, watcher)
}
return res
}
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.RLock()
defer fs.RUnlock()
return fs.watchers[id]
}
func (fs *Filters) All() []*Filter {
fs.RLock()
defer fs.RUnlock()
var filters []*Filter
for _, f := range fs.watchers {
filters = append(filters, f)
}
return filters
}
func (fs *Filters) GetFilters() map[string]*Filter {
fs.RLock()
defer fs.RUnlock()
return maps.Clone(fs.watchers)
}
// NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic.
func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool {
var decodedMsg *ReceivedMessage
fs.RLock()
defer fs.RUnlock()
var matched bool
candidates := fs.GetWatchersByTopic(recvMessage.PubsubTopic, recvMessage.ContentTopic)
if len(candidates) == 0 {
logutils.ZapLogger().Debug("no filters available for this topic",
zap.Stringer("message", recvMessage.Hash()),
zap.String("pubsubTopic", recvMessage.PubsubTopic),
zap.Stringer("contentTopic", &recvMessage.ContentTopic),
)
}
for _, watcher := range candidates {
// Messages are decrypted successfully only once
if decodedMsg == nil {
decodedMsg = recvMessage.Open(watcher)
if decodedMsg == nil {
logutils.ZapLogger().Debug("processing message: failed to open",
zap.Stringer("message", recvMessage.Hash()),
zap.String("filter", watcher.id),
)
continue
}
}
if watcher.MatchMessage(decodedMsg) {
matched = true
logutils.ZapLogger().Debug("processing message: decrypted", zap.Stringer("envelopeHash", recvMessage.Hash()))
if watcher.Src == nil || IsPubKeyEqual(decodedMsg.Src, watcher.Src) {
watcher.Trigger(decodedMsg)
}
}
}
return matched
}
func (f *Filter) expectsAsymmetricEncryption() bool {
return f.KeyAsym != nil
}
func (f *Filter) expectsSymmetricEncryption() bool {
return f.KeySym != nil
}
// Trigger adds a yet-unknown message to the filter's list of
// received messages.
func (f *Filter) Trigger(msg *ReceivedMessage) {
err := f.Messages.Add(msg)
if err != nil {
logutils.ZapLogger().Error("failed to add msg into the filters store",
zap.Stringer("hash", msg.Hash()),
zap.Error(err),
)
}
}
// Retrieve will return the list of all received messages associated
// to a filter.
func (f *Filter) Retrieve() []*ReceivedMessage {
msgs, err := f.Messages.Pop()
if err != nil {
logutils.ZapLogger().Error("failed to retrieve messages from filter store", zap.Error(err))
return nil
}
return msgs
}
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
// MatchEnvelope when checked by a previous filter).
// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst)
} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
return f.SymKeyHash == msg.SymKeyHash
} else if !f.expectsAsymmetricEncryption() && !f.expectsSymmetricEncryption() && !msg.isAsymmetricEncryption() && !msg.isSymmetricEncryption() {
return true
}
return false
}

316
common/filter_test.go Normal file
View File

@ -0,0 +1,316 @@
package common
import (
crand "crypto/rand"
mrand "math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
const testShard = "/waku/2/rs/16/32"
type FilterTestCase struct {
f *Filter
id string
alive bool
msgCnt int
}
func createLogger(t *testing.T) *zap.Logger {
config := zap.NewDevelopmentConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, err := config.Build()
require.NoError(t, err)
return logger
}
func generateFilter(t *testing.T, symmetric bool) (*Filter, error) {
var f Filter
f.Messages = NewMemoryMessageStore()
f.PubsubTopic = "test"
const topicNum = 8
f.ContentTopics = make(TopicSet, topicNum)
for i := 0; i < topicNum; i++ {
topic := make([]byte, 4)
_, err := crand.Read(topic) // nolint: gosec
require.NoError(t, err)
topic[0] = 0x01
f.ContentTopics[BytesToTopic(topic)] = struct{}{}
}
key, err := crypto.GenerateKey()
require.NoError(t, err)
f.Src = &key.PublicKey
if symmetric {
f.KeySym = make([]byte, AESKeyLength)
_, err := crand.Read(f.KeySym) // nolint: gosec
require.NoError(t, err)
f.SymKeyHash = crypto.Keccak256Hash(f.KeySym)
} else {
f.KeyAsym, err = crypto.GenerateKey()
require.NoError(t, err)
}
return &f, nil
}
func generateTestCases(t *testing.T, SizeTestFilters int) []FilterTestCase {
cases := make([]FilterTestCase, SizeTestFilters)
for i := 0; i < SizeTestFilters; i++ {
f, _ := generateFilter(t, true)
cases[i].f = f
cases[i].alive = mrand.Int()&1 == 0 // nolint: gosec
}
return cases
}
func TestInstallFilters(t *testing.T) {
const SizeTestFilters = 256
filters := NewFilters(testShard, createLogger(t))
tst := generateTestCases(t, SizeTestFilters)
var err error
var j string
for i := 0; i < SizeTestFilters; i++ {
j, err = filters.Install(tst[i].f)
require.NoError(t, err)
tst[i].id = j
require.Len(t, j, KeyIDSize*2)
}
for _, testCase := range tst {
if !testCase.alive {
filters.Uninstall(testCase.id)
}
}
for _, testCase := range tst {
fil := filters.Get(testCase.id)
exist := fil != nil
require.Equal(t, exist, testCase.alive)
}
}
func TestInstallSymKeyGeneratesHash(t *testing.T) {
filters := NewFilters(testShard, createLogger(t))
filter, _ := generateFilter(t, true)
// save the current SymKeyHash for comparison
initialSymKeyHash := filter.SymKeyHash
// ensure the SymKeyHash is invalid, for Install to recreate it
var invalid common.Hash
filter.SymKeyHash = invalid
_, err := filters.Install(filter)
require.NoError(t, err)
for i, b := range filter.SymKeyHash {
require.Equal(t, b, initialSymKeyHash[i])
}
}
func TestInstallIdenticalFilters(t *testing.T) {
filters := NewFilters(testShard, createLogger(t))
filter1, _ := generateFilter(t, true)
// Copy the first filter since some of its fields
// are randomly gnerated.
filter2 := &Filter{
KeySym: filter1.KeySym,
PubsubTopic: filter1.PubsubTopic,
ContentTopics: filter1.ContentTopics,
Messages: NewMemoryMessageStore(),
}
_, err := filters.Install(filter1)
require.NoError(t, err)
_, err = filters.Install(filter2)
require.NoError(t, err)
recvMessage := generateCompatibleReceivedMessage(t, filter1)
msg := recvMessage.Open(filter1)
require.NotNil(t, msg)
}
func TestInstallFilterWithSymAndAsymKeys(t *testing.T) {
filters := NewFilters(testShard, createLogger(t))
filter1, _ := generateFilter(t, true)
asymKey, err := crypto.GenerateKey()
require.NoError(t, err)
// Copy the first filter since some of its fields
// are randomly gnerated.
filter := &Filter{
KeySym: filter1.KeySym,
KeyAsym: asymKey,
PubsubTopic: filter1.PubsubTopic,
ContentTopics: filter1.ContentTopics,
Messages: NewMemoryMessageStore(),
}
_, err = filters.Install(filter)
require.Error(t, err)
}
func cloneFilter(orig *Filter) *Filter {
var clone Filter
clone.Messages = NewMemoryMessageStore()
clone.Src = orig.Src
clone.KeyAsym = orig.KeyAsym
clone.KeySym = orig.KeySym
clone.PubsubTopic = orig.PubsubTopic
clone.ContentTopics = orig.ContentTopics
clone.SymKeyHash = orig.SymKeyHash
return &clone
}
func generateCompatibleReceivedMessage(t *testing.T, f *Filter) *ReceivedMessage {
keyInfo := &payload.KeyInfo{}
keyInfo.Kind = payload.Symmetric
keyInfo.SymKey = f.KeySym
var version uint32 = 1
p := new(payload.Payload)
p.Data = make([]byte, 20)
_, err := crand.Read(p.Data) // nolint: gosec
require.NoError(t, err)
p.Key = keyInfo
payload, err := p.Encode(version)
require.NoError(t, err)
msg := &pb.WakuMessage{
Payload: payload,
Version: &version,
ContentTopic: maps.Keys(f.ContentTopics)[2].ContentTopic(),
Timestamp: proto.Int64(time.Now().UnixNano()),
Meta: []byte{},
}
envelope := protocol.NewEnvelope(msg, time.Now().UnixNano(), f.PubsubTopic)
result := NewReceivedMessage(envelope, "test")
result.SymKeyHash = crypto.Keccak256Hash(f.KeySym)
return result
}
func TestWatchers(t *testing.T) {
const NumFilters = 16
const NumMessages = 256
var i int
var j uint32
var e *ReceivedMessage
var x, firstID string
var err error
filters := NewFilters("/waku/2/rs/16/32", createLogger(t))
tst := generateTestCases(t, NumFilters)
for i = 0; i < NumFilters; i++ {
tst[i].f.Src = nil
x, err = filters.Install(tst[i].f)
require.NoError(t, err)
tst[i].id = x
if len(firstID) == 0 {
firstID = x
}
}
lastID := x
var envelopes [NumMessages]*ReceivedMessage
for i = 0; i < NumMessages; i++ {
j = mrand.Uint32() % NumFilters // nolint: gosec
e = generateCompatibleReceivedMessage(t, tst[j].f)
envelopes[i] = e
tst[j].msgCnt++
}
for i = 0; i < NumMessages; i++ {
filters.NotifyWatchers(envelopes[i])
}
var total int
var mail []*ReceivedMessage
var count [NumFilters]int
for i = 0; i < NumFilters; i++ {
mail = tst[i].f.Retrieve()
count[i] = len(mail)
total += len(mail)
}
require.Equal(t, total, NumMessages)
for i = 0; i < NumFilters; i++ {
mail = tst[i].f.Retrieve()
require.Zero(t, len(mail))
require.Equal(t, tst[i].msgCnt, count[i])
}
// another round with a cloned filter
clone := cloneFilter(tst[0].f)
filters.Uninstall(lastID)
total = 0
last := NumFilters - 1
tst[last].f = clone
_, err = filters.Install(clone)
require.NoError(t, err)
for i = 0; i < NumFilters; i++ {
tst[i].msgCnt = 0
count[i] = 0
}
// make sure that the first watcher receives at least one message
e = generateCompatibleReceivedMessage(t, tst[0].f)
envelopes[0] = e
tst[0].msgCnt++
for i = 1; i < NumMessages; i++ {
j = mrand.Uint32() % NumFilters // nolint: gosec
e = generateCompatibleReceivedMessage(t, tst[j].f)
envelopes[i] = e
tst[j].msgCnt++
}
for i = 0; i < NumMessages; i++ {
filters.NotifyWatchers(envelopes[i])
}
for i = 0; i < NumFilters; i++ {
mail = tst[i].f.Retrieve()
count[i] = len(mail)
total += len(mail)
}
combined := tst[0].msgCnt + tst[last].msgCnt
require.Equal(t, total, NumMessages+count[0])
require.Equal(t, combined, count[0])
require.Equal(t, combined, count[last])
for i = 1; i < NumFilters-1; i++ {
mail = tst[i].f.Retrieve()
require.Zero(t, len(mail))
require.Equal(t, tst[i].msgCnt, count[i])
}
}

239
common/helpers.go Normal file
View File

@ -0,0 +1,239 @@
package common
import (
"crypto/ecdsa"
crand "crypto/rand"
"errors"
"fmt"
mrand "math/rand"
"regexp"
"strings"
"github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/common"
)
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
} else if !ValidatePublicKey(b) {
return false
}
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}
// ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
}
// BytesToUintLittleEndian converts the slice to 64-bit unsigned integer.
func BytesToUintLittleEndian(b []byte) (res uint64) {
mul := uint64(1)
for i := 0; i < len(b); i++ {
res += uint64(b[i]) * mul
mul *= 256
}
return res
}
// BytesToUintBigEndian converts the slice to 64-bit unsigned integer.
func BytesToUintBigEndian(b []byte) (res uint64) {
for i := 0; i < len(b); i++ {
res *= 256
res += uint64(b[i])
}
return res
}
// ContainsOnlyZeros checks if the data contain only zeros.
func ContainsOnlyZeros(data []byte) bool {
for _, b := range data {
if b != 0 {
return false
}
}
return true
}
// GenerateSecureRandomData generates random data where extra security is required.
// The purpose of this function is to prevent some bugs in software or in hardware
// from delivering not-very-random data. This is especially useful for AES nonce,
// where true randomness does not really matter, but it is very important to have
// a unique nonce for every message.
func GenerateSecureRandomData(length int) ([]byte, error) {
x := make([]byte, length)
y := make([]byte, length)
res := make([]byte, length)
_, err := crand.Read(x)
if err != nil {
return nil, err
} else if !ValidateDataIntegrity(x, length) {
return nil, errors.New("crypto/rand failed to generate secure random data")
}
_, err = mrand.Read(y) // nolint: gosec
if err != nil {
return nil, err
} else if !ValidateDataIntegrity(y, length) {
return nil, errors.New("math/rand failed to generate secure random data")
}
for i := 0; i < length; i++ {
res[i] = x[i] ^ y[i]
}
if !ValidateDataIntegrity(res, length) {
return nil, errors.New("failed to generate secure random data")
}
return res, nil
}
// GenerateRandomID generates a random string, which is then returned to be used as a key id
func GenerateRandomID() (id string, err error) {
buf, err := GenerateSecureRandomData(KeyIDSize)
if err != nil {
return "", err
}
if !ValidateDataIntegrity(buf, KeyIDSize) {
return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
}
id = common.Bytes2Hex(buf)
return id, err
}
// ValidateDataIntegrity returns false if the data have the wrong or contains all zeros,
// which is the simplest and the most common bug.
func ValidateDataIntegrity(k []byte, expectedSize int) bool {
if len(k) != expectedSize {
return false
}
if expectedSize > 3 && ContainsOnlyZeros(k) {
return false
}
return true
}
func ParseDialErrors(errMsg string) []DialError {
// Regular expression to match the array of failed dial attempts
re := regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`)
match := re.FindStringSubmatch(errMsg)
if len(match) < 2 {
return nil
}
// Split the matched string into individual dial attempts
dialAttempts := strings.Split(strings.TrimSpace(match[1]), "\n")
// Regular expression to extract multiaddr and error message
reAttempt := regexp.MustCompile(`\[(.*?)\]\s*(.*)`)
var dialErrors []DialError
for _, attempt := range dialAttempts {
attempt = strings.TrimSpace(strings.Trim(attempt, "* "))
matches := reAttempt.FindStringSubmatch(attempt)
if len(matches) != 3 {
continue
}
errMsg := strings.TrimSpace(matches[2])
ma, err := multiaddr.NewMultiaddr(matches[1])
if err != nil {
continue
}
protocols := ma.Protocols()
protocolsStr := "/"
for i, protocol := range protocols {
protocolsStr += protocol.Name
if i < len(protocols)-1 {
protocolsStr += "/"
}
}
dialErrors = append(dialErrors, DialError{
Protocols: protocolsStr,
MultiAddr: matches[1],
ErrMsg: errMsg,
ErrType: CategorizeDialError(errMsg),
})
}
return dialErrors
}
// DialErrorType represents the type of dial error
type DialErrorType int
const (
ErrorUnknown DialErrorType = iota
ErrorIOTimeout
ErrorConnectionRefused
ErrorRelayCircuitFailed
ErrorRelayNoReservation
ErrorSecurityNegotiationFailed
ErrorConcurrentDialSucceeded
ErrorConcurrentDialFailed
ErrorConnectionsPerIPLimitExceeded
ErrorStreamReset
ErrorRelayResourceLimitExceeded
ErrorOpeningHopStreamToRelay
ErrorDialBackoff
)
func (det DialErrorType) String() string {
return [...]string{
"Unknown",
"I/O Timeout",
"Connection Refused",
"Relay Circuit Failed",
"Relay No Reservation",
"Security Negotiation Failed",
"Concurrent Dial Succeeded",
"Concurrent Dial Failed",
"Connections Per IP Limit Exceeded",
"Stream Reset",
"Relay Resource Limit Exceeded",
"Error Opening Hop Stream to Relay",
"Dial Backoff",
}[det]
}
func CategorizeDialError(errMsg string) DialErrorType {
switch {
case strings.Contains(errMsg, "i/o timeout"):
return ErrorIOTimeout
case strings.Contains(errMsg, "connect: connection refused"):
return ErrorConnectionRefused
case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"):
return ErrorRelayCircuitFailed
case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"):
return ErrorRelayNoReservation
case strings.Contains(errMsg, "failed to negotiate security protocol"):
return ErrorSecurityNegotiationFailed
case strings.Contains(errMsg, "concurrent active dial succeeded"):
return ErrorConcurrentDialSucceeded
case strings.Contains(errMsg, "concurrent active dial through the same relay failed"):
return ErrorConcurrentDialFailed
case strings.Contains(errMsg, "connections per ip limit exceeded"):
return ErrorConnectionsPerIPLimitExceeded
case strings.Contains(errMsg, "stream reset"):
return ErrorStreamReset
case strings.Contains(errMsg, "error opening relay circuit: RESOURCE_LIMIT_EXCEEDED"):
return ErrorRelayResourceLimitExceeded
case strings.Contains(errMsg, "error opening hop stream to relay: connection failed"):
return ErrorOpeningHopStreamToRelay
case strings.Contains(errMsg, "dial backoff"):
return ErrorDialBackoff
default:
return ErrorUnknown
}
}
// DialError represents a single dial error with its multiaddr and error message
type DialError struct {
MultiAddr string
ErrMsg string
ErrType DialErrorType
Protocols string
}

46
common/helpers_test.go Normal file
View File

@ -0,0 +1,46 @@
package common
import (
"testing"
)
var testCases = []struct {
errorString string
errorTypes []DialErrorType
}{
{
errorString: "failed to dial: failed to dial 16Uiu2HAmNFvubdwLtyScgQKMVL7Ppwvd7RZskgThtPAGqMrUfs1V: all dials failed\n * [/ip4/0.0.0.0/tcp/55136] dial tcp4 0.0.0.0:60183->146.4.106.194:55136: i/o timeout",
errorTypes: []DialErrorType{ErrorIOTimeout},
},
{
errorString: "failed to dial: failed to dial 16Uiu2HAmC1BsqZfy9exnA3DiQHAo3gdAopTQRErLUjK8WoospTwq: all dials failed\n * [/ip4/0.0.0.0/tcp/46949] dial tcp4 0.0.0.0:60183->0.0.0.0:46949: i/o timeout\n * [/ip4/0.0.0.0/tcp/51063] dial tcp4 0.0.0.0:60183->0.0.0.0:51063: i/o timeout",
errorTypes: []DialErrorType{ErrorIOTimeout, ErrorIOTimeout},
},
{
errorString: "failed to dial: failed ito dial 16Uiu2HAkyjvXPmymR5eRnvxCufRGZdfRrgjME6bmn3Xo6aprE1eo: all dials failed\n * [/ip4/0.0.0.0/tcp/443/wss/p2p/16Uiu2HAmB7Ur9HQqo3cWDPovRQjo57fxWWDaQx27WxSzDGhN4JKg/p2p-circuit] error opening relay circuit: CONNECTION_FAILED (203)\n * [/ip4/0.0.0.0/tcp/30303/p2p/16Uiu2HAmB7Ur9HQqo3cWDPovRQjo57fxWWDaQx27WxSzDGhN4JKg/p2p-circuit] concurrent active dial through the same relay failed with a protocol error\n * [/ip4/0.0.0.0/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT/p2p-circuit] error opening relay circuit: CONNECTION_FAILED (203)\n * [/ip4/0.0.0.0/tcp/443/wss/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT/p2p-circuit] concurrent active dial through the same relay failed with a protocol error",
errorTypes: []DialErrorType{ErrorRelayCircuitFailed, ErrorConcurrentDialFailed, ErrorRelayCircuitFailed, ErrorConcurrentDialFailed},
},
{
errorString: "failed to dial: failed to dial 16Uiu2HAm9QijC9d2GsGKPLLF7cZXMFEadqvN7FqhFJ2z5jdW6AFY: all dials failed\n * [/ip4/0.0.0.0/tcp/64012] dial tcp4 0.0.0.0:64012: connect: connection refused",
errorTypes: []DialErrorType{ErrorConnectionRefused},
},
{
errorString: "failed to dial: failed to dial 16Uiu2HAm7jXmopqB6BUJAQH1PKcZULfSKgj9rC9pyBRKwJGTiRHf: all dials failed\n * [/ip4/34.135.13.87/tcp/30303/p2p/16Uiu2HAm8mUZ18tBWPXDQsaF7PbCKYA35z7WB2xNZH2EVq1qS8LJ/p2p-circuit] error opening relay circuit: NO_RESERVATION (204)\n * [/ip4/34.170.192.39/tcp/30303/p2p/16Uiu2HAmMELCo218hncCtTvC2Dwbej3rbyHQcR8erXNnKGei7WPZ/p2p-circuit] error opening relay circuit: NO_RESERVATION (204)\n * [/ip4/178.72.78.116/tcp/42841] dial tcp4 0.0.0.0:60183->178.72.78.116:42841: i/o timeout",
errorTypes: []DialErrorType{ErrorRelayNoReservation, ErrorRelayNoReservation, ErrorIOTimeout},
},
{
errorString: "failed to dial: failed to dial 16Uiu2HAmMUYpufreYsUBo4A56BQDnbMwN4mhP3wMWTM4reS8ivxd: all dials failed\n * [/ip4/0.0.0.0/tcp/52957] unknown",
errorTypes: []DialErrorType{ErrorUnknown},
},
}
func TestParseDialErrors(t *testing.T) {
for _, testCase := range testCases {
parsedErrors := ParseDialErrors(testCase.errorString)
for i, err := range parsedErrors {
if err.ErrType != testCase.errorTypes[i] {
t.Errorf("Expected error type %v, got %v", testCase.errorTypes[i], err.ErrType)
}
}
}
}

208
common/message.go Normal file
View File

@ -0,0 +1,208 @@
package common
import (
"crypto/ecdsa"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/status-im/status-go/logutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
// MessageType represents where this message comes from
type MessageType = string
const (
RelayedMessageType MessageType = "relay"
StoreMessageType MessageType = "store"
SendMessageType MessageType = "send"
MissingMessageType MessageType = "missing"
)
// MessageParams specifies the exact way a message should be wrapped
// into an Envelope.
type MessageParams struct {
Src *ecdsa.PrivateKey
Dst *ecdsa.PublicKey
KeySym []byte
Topic TopicType
Payload []byte
Padding []byte
}
// ReceivedMessage represents a data packet to be received through the
// WakuV2 protocol and successfully decrypted.
type ReceivedMessage struct {
Envelope *protocol.Envelope // Wrapped Waku Message
MsgType MessageType
Data []byte
Padding []byte
Signature []byte
Sent uint32 // Time when the message was posted into the network in seconds
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
PubsubTopic string
ContentTopic TopicType
SymKeyHash common.Hash // The Keccak256Hash of the key
hash common.Hash
Processed atomic.Bool
}
// EnvelopeError code and optional description of the error.
type EnvelopeError struct {
Hash common.Hash
Code uint
Description string
}
// MessagesResponse sent as a response after processing batch of envelopes.
type MessagesResponse struct {
// Hash is a hash of all envelopes sent in the single batch.
Hash common.Hash
// Per envelope error.
Errors []EnvelopeError
}
func (msg *ReceivedMessage) isSymmetricEncryption() bool {
return msg.SymKeyHash != common.Hash{}
}
func (msg *ReceivedMessage) isAsymmetricEncryption() bool {
return msg.Dst != nil
}
// MessageStore defines interface for temporary message store.
type MessageStore interface {
Add(*ReceivedMessage) error
Pop() ([]*ReceivedMessage, error)
}
// NewMemoryMessageStore returns pointer to an instance of the MemoryMessageStore.
func NewMemoryMessageStore() *MemoryMessageStore {
return &MemoryMessageStore{
messages: map[common.Hash]*ReceivedMessage{},
}
}
// MemoryMessageStore represents messages stored in a memory hash table.
type MemoryMessageStore struct {
mu sync.Mutex
messages map[common.Hash]*ReceivedMessage
}
func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic)
if err != nil {
logutils.ZapLogger().Debug("failed to extract content topic from message",
zap.String("topic", env.Message().ContentTopic),
zap.Error(err),
)
return nil
}
return &ReceivedMessage{
Envelope: env,
MsgType: msgType,
Sent: uint32(env.Message().GetTimestamp() / int64(time.Second)),
ContentTopic: ct,
PubsubTopic: env.PubsubTopic(),
}
}
// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
func (msg *ReceivedMessage) Hash() common.Hash {
if (msg.hash == common.Hash{}) {
msg.hash = common.BytesToHash(msg.Envelope.Hash().Bytes())
}
return msg.hash
}
// Add adds message to store.
func (store *MemoryMessageStore) Add(msg *ReceivedMessage) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exist := store.messages[msg.Hash()]; !exist {
store.messages[msg.Hash()] = msg
}
return nil
}
// Pop returns all available messages and cleans the store.
func (store *MemoryMessageStore) Pop() ([]*ReceivedMessage, error) {
store.mu.Lock()
defer store.mu.Unlock()
all := make([]*ReceivedMessage, 0, len(store.messages))
for hash, msg := range store.messages {
delete(store.messages, hash)
all = append(all, msg)
}
return all, nil
}
// Open tries to decrypt an message, and populates the message fields in case of success.
func (msg *ReceivedMessage) Open(watcher *Filter) (result *ReceivedMessage) {
if watcher == nil {
return nil
}
// The API interface forbids filters doing both symmetric and asymmetric encryption.
if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() {
return nil
}
// TODO: should we update msg instead of creating a new received message?
result = new(ReceivedMessage)
keyInfo := new(payload.KeyInfo)
if watcher.expectsAsymmetricEncryption() {
keyInfo.Kind = payload.Asymmetric
keyInfo.PrivKey = watcher.KeyAsym
msg.Dst = &watcher.KeyAsym.PublicKey
} else if watcher.expectsSymmetricEncryption() {
keyInfo.Kind = payload.Symmetric
keyInfo.SymKey = watcher.KeySym
msg.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
raw, err := payload.DecodePayload(msg.Envelope.Message(), keyInfo)
if err != nil {
logutils.ZapLogger().Error("failed to decode message", zap.Error(err))
return nil
}
result.Envelope = msg.Envelope
result.Data = raw.Data
result.Padding = raw.Padding
result.Signature = raw.Signature
result.Src = raw.PubKey
result.SymKeyHash = msg.SymKeyHash
result.Dst = msg.Dst
result.Sent = uint32(msg.Envelope.Message().GetTimestamp() / int64(time.Second))
ct, err := ExtractTopicFromContentTopic(msg.Envelope.Message().ContentTopic)
if err != nil {
logutils.ZapLogger().Error("failed to decode message", zap.Error(err))
return nil
}
result.PubsubTopic = watcher.PubsubTopic
result.ContentTopic = ct
return result
}

59
common/metrics.go Normal file
View File

@ -0,0 +1,59 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
prom "github.com/prometheus/client_golang/prometheus"
)
var (
EnvelopesReceivedCounter = prom.NewCounter(prom.CounterOpts{
Name: "waku2_envelopes_received_total",
Help: "Number of envelopes received.",
})
EnvelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
Name: "waku2_envelopes_validated_total",
Help: "Number of envelopes processed successfully.",
})
EnvelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_rejected_total",
Help: "Number of envelopes rejected.",
}, []string{"reason"})
EnvelopesCacheFailedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_cache_failures_total",
Help: "Number of envelopes which failed to be cached.",
}, []string{"type"})
EnvelopesCachedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_cached_total",
Help: "Number of envelopes cached.",
}, []string{"cache"})
EnvelopesSizeMeter = prom.NewHistogram(prom.HistogramOpts{
Name: "waku2_envelopes_size_bytes",
Help: "Size of processed Waku envelopes in bytes.",
Buckets: prom.ExponentialBuckets(256, 4, 10),
})
)
func init() {
prom.MustRegister(EnvelopesReceivedCounter)
prom.MustRegister(EnvelopesRejectedCounter)
prom.MustRegister(EnvelopesCacheFailedCounter)
prom.MustRegister(EnvelopesCachedCounter)
prom.MustRegister(EnvelopesSizeMeter)
}

123
common/stats.go Normal file
View File

@ -0,0 +1,123 @@
package common
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/types"
)
type Measure struct {
Timestamp int64
Size uint64
}
type StatsTracker struct {
Uploads []Measure
Downloads []Measure
statsMutex sync.Mutex
}
const measurementPeriod = 15 * time.Second
func measure(input interface{}) (*Measure, error) {
b, err := rlp.EncodeToBytes(input)
if err != nil {
return nil, err
}
return &Measure{
Timestamp: time.Now().UnixNano(),
Size: uint64(len(b)),
}, nil
}
func (s *StatsTracker) AddUpload(input interface{}) {
go func(input interface{}) {
defer common.LogOnPanic()
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, *m)
}(input)
}
func (s *StatsTracker) AddDownload(input interface{}) {
go func(input interface{}) {
defer common.LogOnPanic()
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, *m)
}(input)
}
func (s *StatsTracker) AddUploadBytes(size uint64) {
go func(size uint64) {
defer common.LogOnPanic()
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, m)
}(size)
}
func (s *StatsTracker) AddDownloadBytes(size uint64) {
go func(size uint64) {
defer common.LogOnPanic()
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, m)
}(size)
}
func calculateAverage(measures []Measure, minTime int64) (validMeasures []Measure, rate uint64) {
for _, m := range measures {
if m.Timestamp > minTime {
// Only use recent measures
validMeasures = append(validMeasures, m)
rate += m.Size
}
}
rate /= (uint64(measurementPeriod) / uint64(1*time.Second))
return
}
func (s *StatsTracker) GetRatePerSecond() (uploadRate uint64, downloadRate uint64) {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
minTime := time.Now().Add(-measurementPeriod).UnixNano()
s.Uploads, uploadRate = calculateAverage(s.Uploads, minTime)
s.Downloads, downloadRate = calculateAverage(s.Downloads, minTime)
return
}
func (s *StatsTracker) GetStats() types.StatsSummary {
uploadRate, downloadRate := s.GetRatePerSecond()
summary := types.StatsSummary{
UploadRate: uploadRate,
DownloadRate: downloadRate,
}
return summary
}

114
common/topic.go Normal file
View File

@ -0,0 +1,114 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
"errors"
"strings"
"golang.org/x/exp/maps"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// TopicType represents a cryptographically secure, probabilistic partial
// classifications of a message, determined as the first (leftmost) 4 bytes of the
// SHA3 hash of some arbitrary data given by the original author of the message.
type TopicType [TopicLength]byte
type TopicSet map[TopicType]struct{}
func NewTopicSet(topics []TopicType) TopicSet {
s := make(TopicSet, len(topics))
for _, t := range topics {
s[t] = struct{}{}
}
return s
}
func NewTopicSetFromBytes(byteArrays [][]byte) TopicSet {
topics := make([]TopicType, len(byteArrays))
for i, byteArr := range byteArrays {
topics[i] = BytesToTopic(byteArr)
}
return NewTopicSet(topics)
}
// BytesToTopic converts from the byte array representation of a topic
// into the TopicType type.
func BytesToTopic(b []byte) (t TopicType) {
sz := TopicLength
if x := len(b); x < TopicLength {
sz = x
}
for i := 0; i < sz; i++ {
t[i] = b[i]
}
return t
}
func StringToTopic(s string) (t TopicType) {
str, _ := hexutil.Decode(s)
return BytesToTopic(str)
}
// String converts a topic byte array to a string representation.
func (t *TopicType) String() string {
return hexutil.Encode(t[:])
}
// MarshalText returns the hex representation of t.
func (t TopicType) MarshalText() ([]byte, error) {
return hexutil.Bytes(t[:]).MarshalText()
}
// UnmarshalText parses a hex representation to a topic.
func (t *TopicType) UnmarshalText(input []byte) error {
return hexutil.UnmarshalFixedText("Topic", input, t[:])
}
// Converts a topic to its 23/WAKU2-TOPICS representation
func (t TopicType) ContentTopic() string {
enc := hexutil.Encode(t[:])
return "/waku/1/" + enc + "/rfc26"
}
func ExtractTopicFromContentTopic(s string) (TopicType, error) {
p := strings.Split(s, "/")
if len(p) != 5 || p[1] != "waku" || p[2] != "1" || p[4] != "rfc26" {
return TopicType{}, errors.New("invalid content topic format")
}
str, err := hexutil.Decode(p[3])
if err != nil {
return TopicType{}, err
}
result := BytesToTopic(str)
return result, nil
}
func (t TopicSet) ContentTopics() []string {
contentTopics := make([]string, len(t))
for i, ct := range maps.Keys(t) {
contentTopics[i] = ct.ContentTopic()
}
return contentTopics
}

145
common/topic_test.go Normal file
View File

@ -0,0 +1,145 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package common
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
)
var topicStringTests = []struct {
topic TopicType
str string
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, str: "0x00000000"},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, str: "0x007f80ff"},
{topic: TopicType{0xff, 0x80, 0x7f, 0x00}, str: "0xff807f00"},
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, str: "0xf26e7779"},
}
func TestTopicSet(t *testing.T) {
tSet := NewTopicSet([]TopicType{{0x00, 0x00, 0x00, 0x00}, {0x00, 0x7f, 0x80, 0xff}})
topics := tSet.ContentTopics()
require.Equal(t, len(topics), 2)
}
func TestTopicString(t *testing.T) {
for i, tst := range topicStringTests {
s := tst.topic.String()
if s != tst.str {
t.Fatalf("failed test %d: have %s, want %s.", i, s, tst.str)
}
}
}
var bytesToTopicTests = []struct {
data []byte
topic TopicType
}{
{topic: TopicType{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte{0x8f, 0x9a, 0x2b, 0x7d}},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, data: []byte{0x00, 0x7f, 0x80, 0xff}},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte{0x00, 0x00, 0x00, 0x00}},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte{0x00, 0x00, 0x00}},
{topic: TopicType{0x01, 0x00, 0x00, 0x00}, data: []byte{0x01}},
{topic: TopicType{0x00, 0xfe, 0x00, 0x00}, data: []byte{0x00, 0xfe}},
{topic: TopicType{0xea, 0x1d, 0x43, 0x00}, data: []byte{0xea, 0x1d, 0x43}},
{topic: TopicType{0x6f, 0x3c, 0xb0, 0xdd}, data: []byte{0x6f, 0x3c, 0xb0, 0xdd, 0x0f, 0x00, 0x90}},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte{}},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: nil},
}
var unmarshalTestsGood = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x00000000"`)},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, data: []byte(`"0x007f80ff"`)},
{topic: TopicType{0xff, 0x80, 0x7f, 0x00}, data: []byte(`"0xff807f00"`)},
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, data: []byte(`"0xf26e7779"`)},
}
var unmarshalTestsBad = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x0000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x0000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"abcdefg0"`)},
}
var unmarshalTestsUgly = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x01, 0x00, 0x00, 0x00}, data: []byte(`"0x00000001"`)},
}
func TestBytesToTopic(t *testing.T) {
for i, tst := range bytesToTopicTests {
top := BytesToTopic(tst.data)
if top != tst.topic {
t.Fatalf("failed test %d: have %v, want %v.", i, t, tst.topic)
}
}
}
func TestUnmarshalTestsGood(t *testing.T) {
for i, tst := range unmarshalTestsGood {
var top TopicType
err := json.Unmarshal(tst.data, &top)
if err != nil {
t.Errorf("failed test %d. input: %v. err: %v", i, tst.data, err)
} else if top != tst.topic {
t.Errorf("failed test %d: have %v, want %v.", i, t, tst.topic)
}
}
}
func TestUnmarshalTestsBad(t *testing.T) {
// in this test UnmarshalJSON() is supposed to fail
for i, tst := range unmarshalTestsBad {
var top TopicType
err := json.Unmarshal(tst.data, &top)
if err == nil {
t.Fatalf("failed test %d. input: %v.", i, tst.data)
}
}
}
func TestUnmarshalTestsUgly(t *testing.T) {
// in this test UnmarshalJSON() is NOT supposed to fail, but result should be wrong
for i, tst := range unmarshalTestsUgly {
var top TopicType
err := json.Unmarshal(tst.data, &top)
if err != nil {
t.Errorf("failed test %d. input: %v.", i, tst.data)
} else if top == tst.topic {
t.Errorf("failed test %d: have %v, want %v.", i, top, tst.topic)
}
}
}

299
go.mod Normal file
View File

@ -0,0 +1,299 @@
module github.com/status-im/status-go
go 1.21
toolchain go1.21.8
replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.18
replace github.com/rjeczalik/notify => github.com/status-im/notify v1.0.2-status
replace github.com/docker/docker => github.com/docker/engine v1.4.2-0.20190717161051-705d9623b7c1
replace github.com/nfnt/resize => github.com/status-im/resize v0.0.0-20201215164250-7c6d9f0d3088
replace github.com/forPelevin/gomoji => github.com/status-im/gomoji v1.1.3-0.20220213022530-e5ac4a8732d4
replace github.com/mutecomm/go-sqlcipher/v4 v4.4.2 => github.com/status-im/go-sqlcipher/v4 v4.5.4-status.3
replace github.com/libp2p/go-libp2p-pubsub v0.12.0 => github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f
require (
github.com/anacrolix/torrent v1.41.0
github.com/beevik/ntp v0.3.0
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cenkalti/backoff/v3 v3.2.2
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/ethereum/go-ethereum v1.10.26
github.com/forPelevin/gomoji v1.1.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.2.0
github.com/imdario/mergo v0.3.12
github.com/ipfs/go-cid v0.4.1
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a
github.com/keighl/metabolize v0.0.0-20150915210303-97ab655d4034
github.com/kilic/bls12-381 v0.0.0-20200607163746-32e1441c8a9f
github.com/lib/pq v1.10.4
github.com/libp2p/go-libp2p v0.36.2
github.com/libp2p/go-libp2p-pubsub v0.12.0
github.com/lucasb-eyer/go-colorful v1.0.3
github.com/mat/besticon v0.0.0-20210314201728-1579f269edb7
github.com/multiformats/go-multiaddr v0.13.0
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/nfnt/resize v0.0.0-00010101000000-000000000000
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
github.com/oliamb/cutter v0.2.2
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.0
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/markdown v0.0.0-20240404192634-b7e33c6ac3d4
github.com/status-im/migrate/v4 v4.6.2-status.3
github.com/status-im/mvds v0.0.27-0.20240729032523-f6fba962c2b1
github.com/status-im/status-go/extkeys v1.1.2
github.com/status-im/tcp-shaker v1.1.1-status
github.com/status-im/zxcvbn-go v0.0.0-20220311183720-5e8676676857
github.com/stretchr/testify v1.9.0
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a
github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9
github.com/wealdtech/go-ens/v3 v3.5.0
github.com/wealdtech/go-multicodec v1.4.0
github.com/xeipuuv/gojsonschema v1.2.0
github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.26.0
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
google.golang.org/protobuf v1.34.2
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3
)
require github.com/fogleman/gg v1.3.0
require (
github.com/Masterminds/squirrel v1.5.4
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/andybalholm/brotli v1.0.5
github.com/bits-and-blooms/bloom/v3 v3.7.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gorilla/sessions v1.2.1
github.com/gorilla/websocket v1.5.3
github.com/ipfs/go-log/v2 v2.5.1
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/reedsolomon v1.12.1
github.com/ladydascalie/currency v1.6.0
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/mutecomm/go-sqlcipher/v4 v4.4.2
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
go.lsp.dev/jsonrpc2 v0.10.0
go.lsp.dev/protocol v0.12.0
go.lsp.dev/uri v0.3.0
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
golang.org/x/net v0.28.0
golang.org/x/text v0.17.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.24.0
)
require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/PuerkitoBio/goquery v1.6.1 // indirect
github.com/RoaringBitmap/roaring v0.9.4 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/anacrolix/chansync v0.3.0 // indirect
github.com/anacrolix/confluence v1.9.0 // indirect
github.com/anacrolix/dht/v2 v2.15.2-0.20220123034220-0538803801cb // indirect
github.com/anacrolix/envpprof v1.1.1 // indirect
github.com/anacrolix/go-libutp v1.3.1 // indirect
github.com/anacrolix/log v0.13.1 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/perf v1.0.0 // indirect
github.com/anacrolix/missinggo/v2 v2.5.2 // indirect
github.com/anacrolix/mmsg v1.0.0 // indirect
github.com/anacrolix/multiless v0.2.0 // indirect
github.com/anacrolix/stm v0.3.0 // indirect
github.com/anacrolix/sync v0.4.0 // indirect
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 // indirect
github.com/anacrolix/utp v0.1.0 // indirect
github.com/andybalholm/cascadia v1.2.0 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/benbjohnson/immutable v0.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/elastic/gosigar v0.14.3 // indirect
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.3.0 // indirect
github.com/golang-migrate/migrate/v4 v4.15.2 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/pointerstructure v1.2.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo/v2 v2.20.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
github.com/pion/interceptor v0.1.30 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.9 // indirect
github.com/pion/sctp v1.8.33 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pion/webrtc/v3 v3.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.46.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.3 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 // indirect
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/asm v1.1.3 // indirect
github.com/segmentio/encoding v0.3.4 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230916171518-2a77c3734dd1 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
github.com/wlynxg/anet v0.0.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/yeqown/reedsolomon v1.0.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.lsp.dev/pkg v0.0.0-20210717090340-384b27a52fb2 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.22.2 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/term v0.23.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
modernc.org/libc v1.11.82 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/memory v1.0.5 // indirect
modernc.org/sqlite v1.14.2-0.20211125151325-d4ed92c0a70f // indirect
zombiezen.com/go/sqlite v0.8.0 // indirect
)

3301
go.sum Normal file

File diff suppressed because it is too large Load Diff

3346
nwaku.go Normal file

File diff suppressed because it is too large Load Diff

56
persistence/dbkey.go Normal file
View File

@ -0,0 +1,56 @@
package persistence
import (
"encoding/binary"
"errors"
"github.com/waku-org/go-waku/waku/v2/hash"
)
const (
TimestampLength = 8
HashLength = 32
DigestLength = HashLength
PubsubTopicLength = HashLength
DBKeyLength = TimestampLength + PubsubTopicLength + DigestLength
)
type Hash [HashLength]byte
var (
// ErrInvalidByteSize is returned when DBKey can't be created
// from a byte slice because it has invalid length.
ErrInvalidByteSize = errors.New("byte slice has invalid length")
)
// DBKey key to be stored in a db.
type DBKey struct {
raw []byte
}
// Bytes returns a bytes representation of the DBKey.
func (k *DBKey) Bytes() []byte {
return k.raw
}
// NewDBKey creates a new DBKey with the given values.
func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey {
pubSubHash := make([]byte, PubsubTopicLength)
if pubsubTopic != "" {
pubSubHash = hash.SHA256([]byte(pubsubTopic))
}
var k DBKey
k.raw = make([]byte, DBKeyLength)
if senderTimestamp == 0 {
binary.BigEndian.PutUint64(k.raw, receiverTimestamp)
} else {
binary.BigEndian.PutUint64(k.raw, senderTimestamp)
}
copy(k.raw[TimestampLength:], pubSubHash[:])
copy(k.raw[TimestampLength+PubsubTopicLength:], digest)
return &k
}

424
persistence/dbstore.go Normal file
View File

@ -0,0 +1,424 @@
package persistence
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
gowakuPersistence "github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/protocol"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/status-im/status-go/common"
"go.uber.org/zap"
)
var ErrInvalidCursor = errors.New("invalid cursor")
var ErrFutureMessage = errors.New("message timestamp in the future")
var ErrMessageTooOld = errors.New("message too old")
// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second
// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
db *sql.DB
log *zap.Logger
maxMessages int
maxDuration time.Duration
wg sync.WaitGroup
cancel context.CancelFunc
}
// DBOption is an optional setting that can be used to configure the DBStore
type DBOption func(*DBStore) error
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
func WithDB(db *sql.DB) DBOption {
return func(d *DBStore) error {
d.db = db
return nil
}
}
// WithRetentionPolicy is a DBOption that specifies the max number of messages
// to be stored and duration before they're removed from the message store
func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
return func(d *DBStore) error {
d.maxDuration = maxDuration
d.maxMessages = maxMessages
return nil
}
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
result := new(DBStore)
result.log = log.Named("dbstore")
for _, opt := range options {
err := opt(result)
if err != nil {
return nil, err
}
}
return result, nil
}
func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
err := d.cleanOlderRecords()
if err != nil {
return err
}
d.wg.Add(1)
go d.checkForOlderRecords(ctx, 60*time.Second)
return nil
}
func (d *DBStore) Validate(env *protocol.Envelope) error {
n := time.Unix(0, env.Index().ReceiverTime)
upperBound := n.Add(MaxTimeVariance)
lowerBound := n.Add(-MaxTimeVariance)
// Ensure that messages don't "jump" to the front of the queue with future timestamps
if env.Message().GetTimestamp() > upperBound.UnixNano() {
return ErrFutureMessage
}
if env.Message().GetTimestamp() < lowerBound.UnixNano() {
return ErrMessageTooOld
}
return nil
}
func (d *DBStore) cleanOlderRecords() error {
d.log.Debug("Cleaning older records...")
// Delete older messages
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM store_messages WHERE receiverTimestamp < ?`
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
if err != nil {
return err
}
elapsed := time.Since(start)
d.log.Debug("deleting older records from the DB", zap.Duration("duration", elapsed))
}
// Limit number of records to a max N
if d.maxMessages > 0 {
start := time.Now()
sqlStmt := `DELETE FROM store_messages WHERE id IN (SELECT id FROM store_messages ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?)`
_, err := d.db.Exec(sqlStmt, d.maxMessages)
if err != nil {
return err
}
elapsed := time.Since(start)
d.log.Debug("deleting excess records from the DB", zap.Duration("duration", elapsed))
}
return nil
}
func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
defer common.LogOnPanic()
defer d.wg.Done()
ticker := time.NewTicker(t)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := d.cleanOlderRecords()
if err != nil {
d.log.Error("cleaning older records", zap.Error(err))
}
}
}
}
// Stop closes a DB connection
func (d *DBStore) Stop() {
if d.cancel == nil {
return
}
d.cancel()
d.wg.Wait()
d.db.Close()
}
// Put inserts a WakuMessage into the DB
func (d *DBStore) Put(env *protocol.Envelope) error {
stmt, err := d.db.Prepare("INSERT INTO store_messages (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(env.Index().ReceiverTime), env.PubsubTopic(), env.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version)
if err != nil {
return err
}
err = stmt.Close()
if err != nil {
return err
}
return nil
}
// Query retrieves messages from the DB
func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPersistence.StoredMessage, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed))
}()
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
FROM store_messages
%s
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
var conditions []string
var parameters []interface{}
paramCnt := 0
if query.PubsubTopic != "" {
paramCnt++
conditions = append(conditions, fmt.Sprintf("pubsubTopic = $%d", paramCnt))
parameters = append(parameters, query.PubsubTopic)
}
if len(query.ContentFilters) != 0 {
var ctPlaceHolder []string
for _, ct := range query.ContentFilters {
if ct.ContentTopic != "" {
paramCnt++
ctPlaceHolder = append(ctPlaceHolder, fmt.Sprintf("$%d", paramCnt))
parameters = append(parameters, ct.ContentTopic)
}
}
conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")")
}
usesCursor := false
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM store_messages WHERE id = $1)",
cursorDBKey.Bytes(),
).Scan(&exists)
if err != nil {
return nil, nil, err
}
if exists {
eqOp := ">"
if query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD {
eqOp = "<"
}
paramCnt++
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, paramCnt))
parameters = append(parameters, cursorDBKey.Bytes())
} else {
return nil, nil, ErrInvalidCursor
}
}
if query.GetStartTime() != 0 {
if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD {
paramCnt++
conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt))
startTimeDBKey := NewDBKey(uint64(query.GetStartTime()), uint64(query.GetStartTime()), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
}
if query.GetEndTime() != 0 {
if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_FORWARD {
paramCnt++
conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt))
endTimeDBKey := NewDBKey(uint64(query.GetEndTime()), uint64(query.GetEndTime()), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
}
conditionStr := ""
if len(conditions) != 0 {
conditionStr = "WHERE " + strings.Join(conditions, " AND ")
}
orderDirection := "ASC"
if query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD {
orderDirection = "DESC"
}
paramCnt++
sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt)
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
stmt, err := d.db.Prepare(sqlQuery)
if err != nil {
return nil, nil, err
}
defer stmt.Close()
pageSize := query.PagingInfo.PageSize + 1
parameters = append(parameters, pageSize)
rows, err := stmt.Query(parameters...)
if err != nil {
return nil, nil, err
}
var result []gowakuPersistence.StoredMessage
for rows.Next() {
record, err := d.GetStoredMessage(rows)
if err != nil {
return nil, nil, err
}
result = append(result, record)
}
defer rows.Close()
var cursor *storepb.Index
if len(result) != 0 {
if len(result) > int(query.PagingInfo.PageSize) {
result = result[0:query.PagingInfo.PageSize]
lastMsgIdx := len(result) - 1
cursor = protocol.NewEnvelope(result[lastMsgIdx].Message, result[lastMsgIdx].ReceiverTime, result[lastMsgIdx].PubsubTopic).Index()
}
}
// The retrieved messages list should always be in chronological order
if query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD {
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
}
return cursor, result, nil
}
// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp
// in the message table
func (d *DBStore) MostRecentTimestamp() (int64, error) {
result := sql.NullInt64{}
err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM store_messages`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return result.Int64, nil
}
// Count returns the number of rows in the message table
func (d *DBStore) Count() (int, error) {
var result int
err := d.db.QueryRow(`SELECT COUNT(*) FROM store_messages`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return result, nil
}
// GetAll returns all the stored WakuMessages
func (d *DBStore) GetAll() ([]gowakuPersistence.StoredMessage, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
d.log.Info("loading records from the DB", zap.Duration("duration", elapsed))
}()
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM store_messages ORDER BY senderTimestamp ASC")
if err != nil {
return nil, err
}
var result []gowakuPersistence.StoredMessage
defer rows.Close()
for rows.Next() {
record, err := d.GetStoredMessage(rows)
if err != nil {
return nil, err
}
result = append(result, record)
}
d.log.Info("DB returned records", zap.Int("count", len(result)))
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}
// GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`
func (d *DBStore) GetStoredMessage(row *sql.Rows) (gowakuPersistence.StoredMessage, error) {
var id []byte
var receiverTimestamp int64
var senderTimestamp int64
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil {
d.log.Error("scanning messages from db", zap.Error(err))
return gowakuPersistence.StoredMessage{}, err
}
msg := new(pb.WakuMessage)
msg.ContentTopic = contentTopic
msg.Payload = payload
msg.Timestamp = &senderTimestamp
msg.Version = &version
record := gowakuPersistence.StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
Message: msg,
}
return record, nil
}

103
persistence/queries.go Normal file
View File

@ -0,0 +1,103 @@
package persistence
import (
"database/sql"
"fmt"
)
// Queries are the sqlite queries for a given table.
type Queries struct {
deleteQuery string
existsQuery string
getQuery string
putQuery string
queryQuery string
prefixQuery string
limitQuery string
offsetQuery string
getSizeQuery string
}
// NewQueries creates a new set of queries for the passed table
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
err := CreateTable(db, tbl)
if err != nil {
return nil, err
}
return &Queries{
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2)", tbl),
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
limitQuery: ` LIMIT %d`,
offsetQuery: ` OFFSET %d`,
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
}, nil
}
// Delete returns the query for deleting a row.
func (q Queries) Delete() string {
return q.deleteQuery
}
// Exists returns the query for determining if a row exists.
func (q Queries) Exists() string {
return q.existsQuery
}
// Get returns the query for getting a row.
func (q Queries) Get() string {
return q.getQuery
}
// Put returns the query for putting a row.
func (q Queries) Put() string {
return q.putQuery
}
// Query returns the query for getting multiple rows.
func (q Queries) Query() string {
return q.queryQuery
}
// Prefix returns the query fragment for getting a rows with a key prefix.
func (q Queries) Prefix() string {
return q.prefixQuery
}
// Limit returns the query fragment for limiting results.
func (q Queries) Limit() string {
return q.limitQuery
}
// Offset returns the query fragment for returning rows from a given offset.
func (q Queries) Offset() string {
return q.offsetQuery
}
// GetSize returns the query for determining the size of a value.
func (q Queries) GetSize() string {
return q.getSizeQuery
}
// CreateTable creates the table that will persist the peers
func CreateTable(db *sql.DB, tableName string) error {
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL PRIMARY KEY ON CONFLICT REPLACE, data BYTEA);", tableName)
_, err := db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}
func Clean(db *sql.DB, tableName string) error {
// This is fully controlled by us
sqlStmt := fmt.Sprintf("DELETE FROM %s;", tableName) // nolint: gosec
_, err := db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,125 @@
package persistence
import (
"crypto/ecdsa"
"database/sql"
"errors"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
)
// DBStore is a MessageProvider that has a *sql.DB connection
type ProtectedTopicsStore struct {
db *sql.DB
log *zap.Logger
insertStmt *sql.Stmt
fetchPrivKeyStmt *sql.Stmt
deleteStmt *sql.Stmt
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore, error) {
insertStmt, err := db.Prepare("INSERT OR REPLACE INTO pubsubtopic_signing_key (topic, priv_key, pub_key) VALUES (?, ?, ?)")
if err != nil {
return nil, err
}
fetchPrivKeyStmt, err := db.Prepare("SELECT priv_key FROM pubsubtopic_signing_key WHERE topic = ?")
if err != nil {
return nil, err
}
deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?")
if err != nil {
return nil, err
}
result := new(ProtectedTopicsStore)
result.log = log.Named("protected-topics-store")
result.db = db
result.insertStmt = insertStmt
result.fetchPrivKeyStmt = fetchPrivKeyStmt
result.deleteStmt = deleteStmt
return result, nil
}
func (p *ProtectedTopicsStore) Close() error {
err := p.insertStmt.Close()
if err != nil {
return err
}
return p.fetchPrivKeyStmt.Close()
}
func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error {
var privKeyBytes []byte
if privKey != nil {
privKeyBytes = crypto.FromECDSA(privKey)
}
pubKeyBytes := crypto.FromECDSAPub(publicKey)
_, err := p.insertStmt.Exec(pubsubTopic, privKeyBytes, pubKeyBytes)
return err
}
func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error {
_, err := p.deleteStmt.Exec(pubsubTopic)
return err
}
func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) {
var privKeyBytes []byte
err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return crypto.ToECDSA(privKeyBytes)
}
type ProtectedTopic struct {
PubKey *ecdsa.PublicKey
Topic string
}
func (p *ProtectedTopicsStore) ProtectedTopics() ([]ProtectedTopic, error) {
rows, err := p.db.Query("SELECT pub_key, topic FROM pubsubtopic_signing_key")
if err != nil {
return nil, err
}
defer rows.Close()
var result []ProtectedTopic
for rows.Next() {
var pubKeyBytes []byte
var topic string
err := rows.Scan(&pubKeyBytes, &topic)
if err != nil {
return nil, err
}
pubk, err := crypto.UnmarshalPubkey(pubKeyBytes)
if err != nil {
return nil, err
}
result = append(result, ProtectedTopic{
PubKey: pubk,
Topic: topic,
})
}
return result, nil
}

1
third_party/nwaku vendored Submodule

@ -0,0 +1 @@
Subproject commit 8368ff006d386ea1597862acca5405e1edbfa9cf