refactor_: remove unused code from wakuv2 (#5651)
This commit is contained in:
parent
152f1c4071
commit
c68854299a
|
@ -3,7 +3,6 @@ package gethbridge
|
|||
import (
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/waku"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
wakucommon "github.com/status-im/status-go/waku/common"
|
||||
wakuv2common "github.com/status-im/status-go/wakuv2/common"
|
||||
|
@ -47,8 +46,6 @@ func NewWakuV2EnvelopeEventWrapper(envelopeEvent *wakuv2common.EnvelopeEvent) *t
|
|||
for index := range data {
|
||||
wrappedData[index] = *NewWakuV2EnvelopeErrorWrapper(&data[index])
|
||||
}
|
||||
case *wakuv2.MailServerResponse:
|
||||
wrappedData = NewWakuV2MailServerResponseWrapper(data)
|
||||
}
|
||||
return &types.EnvelopeEvent{
|
||||
Event: types.EventType(envelopeEvent.Event),
|
||||
|
|
|
@ -3,7 +3,6 @@ package gethbridge
|
|||
import (
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/waku"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
// NewWakuMailServerResponseWrapper returns a types.MailServerResponse object that mimics Geth's MailServerResponse
|
||||
|
@ -18,16 +17,3 @@ func NewWakuMailServerResponseWrapper(mailServerResponse *waku.MailServerRespons
|
|||
Error: mailServerResponse.Error,
|
||||
}
|
||||
}
|
||||
|
||||
// NewWakuV2MailServerResponseWrapper returns a types.MailServerResponse object that mimics Geth's MailServerResponse
|
||||
func NewWakuV2MailServerResponseWrapper(mailServerResponse *wakuv2.MailServerResponse) *types.MailServerResponse {
|
||||
if mailServerResponse == nil {
|
||||
panic("mailServerResponse should not be nil")
|
||||
}
|
||||
|
||||
return &types.MailServerResponse{
|
||||
LastEnvelopeHash: types.Hash(mailServerResponse.LastEnvelopeHash),
|
||||
Cursor: mailServerResponse.Cursor,
|
||||
Error: mailServerResponse.Error,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,7 +132,7 @@ func (w *gethWakuV2Wrapper) Subscribe(opts *types.SubscriptionOptions) (string,
|
|||
}
|
||||
}
|
||||
|
||||
f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PoW, opts.PubsubTopic, opts.Topics)
|
||||
f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PubsubTopic, opts.Topics)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -162,7 +162,7 @@ func (w *gethWakuV2Wrapper) UnsubscribeMany(ids []string) error {
|
|||
return w.waku.UnsubscribeMany(ids)
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, pubsubTopic string, topics [][]byte) (types.Filter, error) {
|
||||
func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pubsubTopic string, topics [][]byte) (types.Filter, error) {
|
||||
return NewWakuV2FilterWrapper(&wakucommon.Filter{
|
||||
KeyAsym: keyAsym,
|
||||
KeySym: keySym,
|
||||
|
|
|
@ -18,35 +18,11 @@
|
|||
|
||||
package common
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Waku protocol parameters
|
||||
const (
|
||||
SizeMask = byte(3) // mask used to extract the size of payload size field from the flags
|
||||
|
||||
TopicLength = 4 // in bytes
|
||||
AESKeyLength = 32 // in bytes
|
||||
KeyIDSize = 32 // in bytes
|
||||
BloomFilterSize = 64 // in bytes
|
||||
MaxTopicInterest = 10000
|
||||
|
||||
EnvelopeHeaderLength = 20
|
||||
|
||||
MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message.
|
||||
DefaultMaxMessageSize = uint32(1 << 20) // DefaultMaximumMessageSize is 1mb.
|
||||
|
||||
ExpirationCycle = time.Second
|
||||
TransmissionCycle = 300 * time.Millisecond
|
||||
|
||||
DefaultTTL = 50 // seconds
|
||||
DefaultSyncAllowance = 10 // seconds
|
||||
|
||||
MaxLimitInSyncMailRequest = 1000
|
||||
|
||||
EnvelopeTimeNotSynced uint = iota + 1
|
||||
EnvelopeOtherError
|
||||
|
||||
MaxLimitInMessagesRequest = 1000
|
||||
)
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
package common
|
||||
|
||||
// TimeSyncError error for clock skew errors.
|
||||
type TimeSyncError error
|
|
@ -43,23 +43,6 @@ const (
|
|||
|
||||
// EventEnvelopeAvailable fires when envelop is available for filters
|
||||
EventEnvelopeAvailable EventType = "envelope.available"
|
||||
|
||||
// EventMailServerRequestSent fires when such request is sent.
|
||||
EventMailServerRequestSent EventType = "mailserver.request.sent"
|
||||
|
||||
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
|
||||
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
|
||||
|
||||
// EventMailServerRequestExpired fires after mailserver the request TTL ends.
|
||||
// This event is independent and concurrent to EventMailServerRequestCompleted.
|
||||
// Request should be considered as expired only if expiry event was received first.
|
||||
EventMailServerRequestExpired EventType = "mailserver.request.expired"
|
||||
|
||||
// EventMailServerEnvelopeArchived fires after an envelope has been archived
|
||||
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
|
||||
|
||||
// EventMailServerSyncFinished fires when the sync of messages is finished.
|
||||
EventMailServerSyncFinished EventType = "mailserver.sync.finished"
|
||||
)
|
||||
|
||||
// EnvelopeEvent represents an envelope event.
|
||||
|
|
|
@ -2,8 +2,6 @@ package common
|
|||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -62,46 +60,6 @@ type ReceivedMessage struct {
|
|||
Processed atomic.Bool
|
||||
}
|
||||
|
||||
// MessagesRequest contains details of a request for historic messages.
|
||||
type MessagesRequest struct {
|
||||
// ID of the request. The current implementation requires ID to be 32-byte array,
|
||||
// however, it's not enforced for future implementation.
|
||||
ID []byte `json:"id"`
|
||||
|
||||
// From is a lower bound of time range.
|
||||
From uint32 `json:"from"`
|
||||
|
||||
// To is a upper bound of time range.
|
||||
To uint32 `json:"to"`
|
||||
|
||||
// Limit determines the number of messages sent by the mail server
|
||||
// for the current paginated request.
|
||||
Limit uint32 `json:"limit"`
|
||||
|
||||
// Cursor is used as starting point for paginated requests.
|
||||
Cursor []byte `json:"cursor"`
|
||||
|
||||
// Topics is a list of topics. A returned message should
|
||||
// belong to one of the topics from the list.
|
||||
Topics [][]byte `json:"topics"`
|
||||
}
|
||||
|
||||
func (r MessagesRequest) Validate() error {
|
||||
if len(r.ID) != common.HashLength {
|
||||
return errors.New("invalid 'ID', expected a 32-byte slice")
|
||||
}
|
||||
|
||||
if r.From > r.To {
|
||||
return errors.New("invalid 'From' value which is greater than To")
|
||||
}
|
||||
|
||||
if r.Limit > MaxLimitInMessagesRequest {
|
||||
return fmt.Errorf("invalid 'Limit' value, expected value lower than %d", MaxLimitInMessagesRequest)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnvelopeError code and optional description of the error.
|
||||
type EnvelopeError struct {
|
||||
Hash common.Hash
|
||||
|
@ -109,20 +67,6 @@ type EnvelopeError struct {
|
|||
Description string
|
||||
}
|
||||
|
||||
// ErrorToEnvelopeError converts common golang error into EnvelopeError with a code.
|
||||
func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError {
|
||||
code := EnvelopeOtherError
|
||||
switch err.(type) {
|
||||
case TimeSyncError:
|
||||
code = EnvelopeTimeNotSynced
|
||||
}
|
||||
return EnvelopeError{
|
||||
Hash: hash,
|
||||
Code: code,
|
||||
Description: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
// MessagesResponse sent as a response after processing batch of envelopes.
|
||||
type MessagesResponse struct {
|
||||
// Hash is a hash of all envelopes sent in the single batch.
|
||||
|
|
|
@ -48,26 +48,6 @@ var (
|
|||
Help: "Size of processed Waku envelopes in bytes.",
|
||||
Buckets: prom.ExponentialBuckets(256, 4, 10),
|
||||
})
|
||||
RateLimitsProcessed = prom.NewCounter(prom.CounterOpts{
|
||||
Name: "waku2_rate_limits_processed_total",
|
||||
Help: "Number of packets Waku rate limiter processed.",
|
||||
})
|
||||
RateLimitsExceeded = prom.NewCounterVec(prom.CounterOpts{
|
||||
Name: "waku2_rate_limits_exceeded_total",
|
||||
Help: "Number of times the Waku rate limits were exceeded",
|
||||
}, []string{"type"})
|
||||
BridgeSent = prom.NewCounter(prom.CounterOpts{
|
||||
Name: "waku2_bridge_sent_total",
|
||||
Help: "Number of envelopes bridged from Waku",
|
||||
})
|
||||
BridgeReceivedSucceed = prom.NewCounter(prom.CounterOpts{
|
||||
Name: "waku2_bridge_received_success_total",
|
||||
Help: "Number of envelopes bridged to Waku and successfully added",
|
||||
})
|
||||
BridgeReceivedFailed = prom.NewCounter(prom.CounterOpts{
|
||||
Name: "waku2_bridge_received_failure_total",
|
||||
Help: "Number of envelopes bridged to Waku and failed to be added",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -76,9 +56,4 @@ func init() {
|
|||
prom.MustRegister(EnvelopesCacheFailedCounter)
|
||||
prom.MustRegister(EnvelopesCachedCounter)
|
||||
prom.MustRegister(EnvelopesSizeMeter)
|
||||
prom.MustRegister(RateLimitsProcessed)
|
||||
prom.MustRegister(RateLimitsExceeded)
|
||||
prom.MustRegister(BridgeSent)
|
||||
prom.MustRegister(BridgeReceivedSucceed)
|
||||
prom.MustRegister(BridgeReceivedFailed)
|
||||
}
|
||||
|
|
|
@ -1,179 +0,0 @@
|
|||
// Copyright 2019 The Waku Library Authors.
|
||||
//
|
||||
// The Waku library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The Waku library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty off
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
// This software uses the go-ethereum library, which is licensed
|
||||
// under the GNU Lesser General Public Library, version 3 or any later.
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/status-im/status-go/waku/common"
|
||||
|
||||
gethcommon "github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
)
|
||||
|
||||
const (
|
||||
mailServerFailedPayloadPrefix = "ERROR="
|
||||
cursorSize = 36
|
||||
)
|
||||
|
||||
// MailServer represents a mail server, capable of
|
||||
// archiving the old messages for subsequent delivery
|
||||
// to the peers. Any implementation must ensure that both
|
||||
// functions are thread-safe. Also, they must return ASAP.
|
||||
// DeliverMail should use p2pMessageCode for delivery,
|
||||
// in order to bypass the expiry checks.
|
||||
type MailServer interface {
|
||||
Archive(env *common.Envelope)
|
||||
DeliverMail(peerID []byte, request *common.Envelope) // DEPRECATED; use Deliver()
|
||||
Deliver(peerID []byte, request common.MessagesRequest)
|
||||
}
|
||||
|
||||
// MailServerResponse is the response payload sent by the mailserver.
|
||||
type MailServerResponse struct {
|
||||
LastEnvelopeHash gethcommon.Hash
|
||||
Cursor []byte
|
||||
Error error
|
||||
}
|
||||
|
||||
func invalidResponseSizeError(size int) error {
|
||||
return fmt.Errorf("unexpected payload size: %d", size)
|
||||
}
|
||||
|
||||
// CreateMailServerRequestCompletedPayload creates a payload representing
|
||||
// a successful request to mailserver
|
||||
func CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash gethcommon.Hash, cursor []byte) []byte {
|
||||
payload := make([]byte, len(requestID))
|
||||
copy(payload, requestID[:])
|
||||
payload = append(payload, lastEnvelopeHash[:]...)
|
||||
payload = append(payload, cursor...)
|
||||
return payload
|
||||
}
|
||||
|
||||
// CreateMailServerRequestFailedPayload creates a payload representing
|
||||
// a failed request to a mailserver
|
||||
func CreateMailServerRequestFailedPayload(requestID gethcommon.Hash, err error) []byte {
|
||||
payload := []byte(mailServerFailedPayloadPrefix)
|
||||
payload = append(payload, requestID[:]...)
|
||||
payload = append(payload, []byte(err.Error())...)
|
||||
return payload
|
||||
}
|
||||
|
||||
// CreateMailServerEvent returns EnvelopeEvent with correct data
|
||||
// if payload corresponds to any of the know mailserver events:
|
||||
// * request completed successfully
|
||||
// * request failed
|
||||
// If the payload is unknown/unparseable, it returns `nil`
|
||||
func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
|
||||
if len(payload) < gethcommon.HashLength {
|
||||
return nil, invalidResponseSizeError(len(payload))
|
||||
}
|
||||
|
||||
event, err := tryCreateMailServerRequestFailedEvent(nodeID, payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if event != nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
return tryCreateMailServerRequestCompletedEvent(nodeID, payload)
|
||||
}
|
||||
|
||||
func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
|
||||
if len(payload) < gethcommon.HashLength+len(mailServerFailedPayloadPrefix) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
prefix, remainder := extractPrefix(payload, len(mailServerFailedPayloadPrefix))
|
||||
|
||||
if !bytes.Equal(prefix, []byte(mailServerFailedPayloadPrefix)) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var (
|
||||
requestID gethcommon.Hash
|
||||
errorMsg string
|
||||
)
|
||||
|
||||
requestID, remainder = extractHash(remainder)
|
||||
errorMsg = string(remainder)
|
||||
|
||||
event := common.EnvelopeEvent{
|
||||
Peer: nodeID,
|
||||
Hash: requestID,
|
||||
Event: common.EventMailServerRequestCompleted,
|
||||
Data: &MailServerResponse{
|
||||
Error: errors.New(errorMsg),
|
||||
},
|
||||
}
|
||||
|
||||
return &event, nil
|
||||
|
||||
}
|
||||
|
||||
func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
|
||||
// check if payload is
|
||||
// - requestID or
|
||||
// - requestID + lastEnvelopeHash or
|
||||
// - requestID + lastEnvelopeHash + cursor
|
||||
// requestID is the hash of the request envelope.
|
||||
// lastEnvelopeHash is the last envelope sent by the mail server
|
||||
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
|
||||
if len(payload) > gethcommon.HashLength*2+cursorSize {
|
||||
return nil, invalidResponseSizeError(len(payload))
|
||||
}
|
||||
|
||||
var (
|
||||
requestID gethcommon.Hash
|
||||
lastEnvelopeHash gethcommon.Hash
|
||||
cursor []byte
|
||||
)
|
||||
|
||||
requestID, remainder := extractHash(payload)
|
||||
|
||||
if len(remainder) >= gethcommon.HashLength {
|
||||
lastEnvelopeHash, remainder = extractHash(remainder)
|
||||
}
|
||||
|
||||
if len(remainder) >= cursorSize {
|
||||
cursor = remainder
|
||||
}
|
||||
|
||||
event := common.EnvelopeEvent{
|
||||
Peer: nodeID,
|
||||
Hash: requestID,
|
||||
Event: common.EventMailServerRequestCompleted,
|
||||
Data: &MailServerResponse{
|
||||
LastEnvelopeHash: lastEnvelopeHash,
|
||||
Cursor: cursor,
|
||||
},
|
||||
}
|
||||
|
||||
return &event, nil
|
||||
}
|
||||
|
||||
func extractHash(payload []byte) (gethcommon.Hash, []byte) {
|
||||
prefix, remainder := extractPrefix(payload, gethcommon.HashLength)
|
||||
return gethcommon.BytesToHash(prefix), remainder
|
||||
}
|
||||
|
||||
func extractPrefix(payload []byte, size int) ([]byte, []byte) {
|
||||
return payload[:size], payload[size:]
|
||||
}
|
|
@ -41,7 +41,6 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
|
@ -135,7 +134,6 @@ type Waku struct {
|
|||
keyMu sync.RWMutex // Mutex associated with key stores
|
||||
|
||||
envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node
|
||||
expirations map[uint32]mapset.Set // Message expiration pool
|
||||
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
|
||||
|
||||
bandwidthCounter *metrics.BandwidthCounter
|
||||
|
@ -235,7 +233,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
|
|||
privateKeys: make(map[string]*ecdsa.PrivateKey),
|
||||
symKeys: make(map[string][]byte),
|
||||
envelopeCache: newTTLCache(),
|
||||
expirations: make(map[uint32]mapset.Set),
|
||||
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
|
||||
topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100),
|
||||
connectionNotifChan: make(chan node.PeerConnection, 20),
|
||||
|
|
Loading…
Reference in New Issue