2018-05-11 19:43:07 +00:00
|
|
|
// Copyright 2017 The go-ethereum Authors
|
|
|
|
// This file is part of the go-ethereum library.
|
|
|
|
//
|
|
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Lesser General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
package mailserver
|
|
|
|
|
|
|
|
import (
|
2020-01-08 11:12:23 +00:00
|
|
|
"crypto/ecdsa"
|
2021-05-14 10:55:42 +00:00
|
|
|
"encoding/binary"
|
2018-05-21 11:30:37 +00:00
|
|
|
"errors"
|
2018-05-11 19:43:07 +00:00
|
|
|
"fmt"
|
2019-02-27 14:30:08 +00:00
|
|
|
"math/rand"
|
2018-07-04 09:30:57 +00:00
|
|
|
"sync"
|
2018-05-17 11:21:04 +00:00
|
|
|
"time"
|
2018-05-11 19:43:07 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
prom "github.com/prometheus/client_golang/prometheus"
|
|
|
|
|
2018-05-11 19:43:07 +00:00
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
|
|
"github.com/ethereum/go-ethereum/rlp"
|
2020-01-08 11:12:23 +00:00
|
|
|
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
2019-11-23 17:57:05 +00:00
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
2018-06-08 11:29:50 +00:00
|
|
|
"github.com/status-im/status-go/params"
|
2020-01-08 11:12:23 +00:00
|
|
|
"github.com/status-im/status-go/waku"
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
wakucommon "github.com/status-im/status-go/waku/common"
|
2018-05-11 19:43:07 +00:00
|
|
|
)
|
|
|
|
|
2018-05-17 11:21:04 +00:00
|
|
|
// Limits applied to history queries.
const (
	// maxQueryRange is the widest span allowed between the lower and upper
	// bound of a request; decompositeRequest rejects anything larger.
	maxQueryRange = 24 * time.Hour

	// maxQueryLimit is an upper bound on a query limit.
	// NOTE(review): not referenced in this part of the file; presumably it
	// caps the number of envelopes returned per query. Confirm at the use site.
	maxQueryLimit = 1000

	// whisperTTLSafeThreshold is the number of seconds added to the current
	// time when a request omits its upper bound, so that envelopes with a
	// slightly higher timestamp are still included.
	whisperTTLSafeThreshold = 60
)
|
|
|
|
|
2018-05-21 11:30:37 +00:00
|
|
|
// Sentinel errors describing an incomplete mail server configuration.
var (
	// errDirectoryNotProvided signals that no data directory was configured.
	errDirectoryNotProvided = errors.New("data directory not provided")

	// errDecryptionMethodNotProvided signals that no decryption method was
	// configured.
	// NOTE(review): presumably this means neither a password nor an asymmetric
	// key was supplied; the check itself is not in this part of the file.
	errDecryptionMethodNotProvided = errors.New("decryption method is not provided")
)
|
|
|
|
|
2018-07-02 07:38:10 +00:00
|
|
|
// Sizes, in bytes, of the fields of a fixed-layout history request, plus the
// request processing timeout.
const (
	// timestampLength is the size of one encoded timestamp.
	timestampLength = 4

	// requestLimitLength is the size of the encoded result limit.
	requestLimitLength = 4

	// requestTimeRangeLength is the size of the lower and upper timestamps
	// that open the request payload.
	requestTimeRangeLength = 2 * timestampLength

	// processRequestTimeout bounds request processing.
	// NOTE(review): the code that applies this timeout is not in this part of
	// the file.
	processRequestTimeout = time.Minute
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// Config holds the settings used to construct a mail server.
type Config struct {
	// DataDir is the directory in which the mail server keeps its data.
	DataDir string

	// Password is the secret from which the symmetric key for decrypting
	// requests is derived.
	Password string

	// AsymKey is the asymmetric key used to decrypt requests.
	AsymKey string

	// MinimumPoW is the lowest proof-of-work accepted for a request.
	MinimumPoW float64

	// RateLimit is the maximum number of requests per second accepted from a
	// single peer.
	RateLimit int

	// DataRetention is the number of days an envelope is kept in storage.
	DataRetention int

	// PostgresEnabled selects PostgreSQL as a storage option.
	// NOTE(review): how this flag is consumed is not visible in this part of
	// the file.
	PostgresEnabled bool

	// PostgresURI is the PostgreSQL connection URI.
	PostgresURI string
}
|
2018-07-04 09:30:57 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// --------------
|
|
|
|
// WakuMailServer
|
|
|
|
// --------------
|
|
|
|
|
|
|
|
type WakuMailServer struct {
|
|
|
|
ms *mailServer
|
|
|
|
shh *waku.Waku
|
|
|
|
minRequestPoW float64
|
|
|
|
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
symFilter *wakucommon.Filter
|
|
|
|
asymFilter *wakucommon.Filter
|
2020-01-08 11:12:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *WakuMailServer) Init(waku *waku.Waku, cfg *params.WakuConfig) error {
|
|
|
|
s.shh = waku
|
|
|
|
s.minRequestPoW = cfg.MinimumPoW
|
|
|
|
|
|
|
|
config := Config{
|
|
|
|
DataDir: cfg.DataDir,
|
|
|
|
Password: cfg.MailServerPassword,
|
|
|
|
MinimumPoW: cfg.MinimumPoW,
|
|
|
|
DataRetention: cfg.MailServerDataRetention,
|
|
|
|
RateLimit: cfg.MailServerRateLimit,
|
|
|
|
PostgresEnabled: cfg.DatabaseConfig.PGConfig.Enabled,
|
|
|
|
PostgresURI: cfg.DatabaseConfig.PGConfig.URI,
|
|
|
|
}
|
|
|
|
var err error
|
|
|
|
s.ms, err = newMailServer(
|
|
|
|
config,
|
|
|
|
&wakuAdapter{},
|
|
|
|
&wakuService{Waku: waku},
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := s.setupDecryptor(config.Password, config.AsymKey); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *WakuMailServer) Close() {
|
|
|
|
s.ms.Close()
|
|
|
|
}
|
|
|
|
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
func (s *WakuMailServer) Archive(env *wakucommon.Envelope) {
|
2020-01-08 11:12:23 +00:00
|
|
|
s.ms.Archive(gethbridge.NewWakuEnvelope(env))
|
|
|
|
}
|
|
|
|
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
func (s *WakuMailServer) Deliver(peerID []byte, req wakucommon.MessagesRequest) {
|
2020-01-08 11:12:23 +00:00
|
|
|
s.ms.DeliverMail(types.BytesToHash(peerID), types.BytesToHash(req.ID), MessagesRequestPayload{
|
|
|
|
Lower: req.From,
|
|
|
|
Upper: req.To,
|
|
|
|
Bloom: req.Bloom,
|
2020-01-21 07:11:24 +00:00
|
|
|
Topics: req.Topics,
|
2020-01-08 11:12:23 +00:00
|
|
|
Limit: req.Limit,
|
|
|
|
Cursor: req.Cursor,
|
|
|
|
Batch: true,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// DEPRECATED; user Deliver instead
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
func (s *WakuMailServer) DeliverMail(peerID []byte, req *wakucommon.Envelope) {
|
2020-01-08 11:12:23 +00:00
|
|
|
payload, err := s.decodeRequest(peerID, req)
|
2018-10-19 09:09:13 +00:00
|
|
|
if err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
deliveryFailuresCounter.WithLabelValues("validation").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] request failed validaton",
|
2020-06-03 18:10:05 +00:00
|
|
|
"peerID", types.BytesToHash(peerID),
|
2020-01-08 11:12:23 +00:00
|
|
|
"requestID", req.Hash().String(),
|
|
|
|
"err", err,
|
|
|
|
)
|
|
|
|
s.ms.sendHistoricMessageErrorResponse(types.BytesToHash(peerID), types.Hash(req.Hash()), err)
|
2018-10-19 09:09:13 +00:00
|
|
|
return
|
|
|
|
}
|
2018-10-18 10:25:00 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
s.ms.DeliverMail(types.BytesToHash(peerID), types.Hash(req.Hash()), payload)
|
|
|
|
}
|
|
|
|
|
2021-05-14 10:55:42 +00:00
|
|
|
// bloomFromReceivedMessage for a given whisper.ReceivedMessage it extracts the
|
|
|
|
// used bloom filter.
|
|
|
|
func (s *WakuMailServer) bloomFromReceivedMessage(msg *wakucommon.ReceivedMessage) ([]byte, error) {
|
|
|
|
payloadSize := len(msg.Payload)
|
|
|
|
|
|
|
|
if payloadSize < 8 {
|
|
|
|
return nil, errors.New("Undersized p2p request")
|
|
|
|
} else if payloadSize == 8 {
|
|
|
|
return wakucommon.MakeFullNodeBloom(), nil
|
|
|
|
} else if payloadSize < 8+wakucommon.BloomFilterSize {
|
|
|
|
return nil, errors.New("Undersized bloom filter in p2p request")
|
|
|
|
}
|
|
|
|
|
|
|
|
return msg.Payload[8 : 8+wakucommon.BloomFilterSize], nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *WakuMailServer) decompositeRequest(peerID []byte, request *wakucommon.Envelope) (MessagesRequestPayload, error) {
|
|
|
|
var (
|
|
|
|
payload MessagesRequestPayload
|
|
|
|
err error
|
|
|
|
)
|
|
|
|
|
|
|
|
if s.minRequestPoW > 0.0 && request.PoW() < s.minRequestPoW {
|
|
|
|
return payload, fmt.Errorf("PoW() is too low")
|
|
|
|
}
|
|
|
|
|
|
|
|
decrypted := s.openEnvelope(request)
|
|
|
|
if decrypted == nil {
|
|
|
|
return payload, fmt.Errorf("failed to decrypt p2p request")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := checkMsgSignature(decrypted.Src, peerID); err != nil {
|
|
|
|
return payload, err
|
|
|
|
}
|
|
|
|
|
|
|
|
payload.Bloom, err = s.bloomFromReceivedMessage(decrypted)
|
|
|
|
if err != nil {
|
|
|
|
return payload, err
|
|
|
|
}
|
|
|
|
|
|
|
|
payload.Lower = binary.BigEndian.Uint32(decrypted.Payload[:4])
|
|
|
|
payload.Upper = binary.BigEndian.Uint32(decrypted.Payload[4:8])
|
|
|
|
|
|
|
|
if payload.Upper < payload.Lower {
|
|
|
|
err := fmt.Errorf("query range is invalid: from > to (%d > %d)", payload.Lower, payload.Upper)
|
|
|
|
return payload, err
|
|
|
|
}
|
|
|
|
|
|
|
|
lowerTime := time.Unix(int64(payload.Lower), 0)
|
|
|
|
upperTime := time.Unix(int64(payload.Upper), 0)
|
|
|
|
if upperTime.Sub(lowerTime) > maxQueryRange {
|
|
|
|
err := fmt.Errorf("query range too big for peer %s", string(peerID))
|
|
|
|
return payload, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(decrypted.Payload) >= requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength {
|
|
|
|
payload.Limit = binary.BigEndian.Uint32(decrypted.Payload[requestTimeRangeLength+wakucommon.BloomFilterSize:])
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(decrypted.Payload) == requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength+DBKeyLength {
|
|
|
|
payload.Cursor = decrypted.Payload[requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength:]
|
|
|
|
}
|
|
|
|
|
|
|
|
return payload, nil
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *WakuMailServer) setupDecryptor(password, asymKey string) error {
|
|
|
|
s.symFilter = nil
|
|
|
|
s.asymFilter = nil
|
|
|
|
|
|
|
|
if password != "" {
|
|
|
|
keyID, err := s.shh.AddSymKeyFromPassword(password)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("create symmetric key: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
symKey, err := s.shh.GetSymKey(keyID)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("save symmetric key: %v", err)
|
|
|
|
}
|
|
|
|
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
s.symFilter = &wakucommon.Filter{KeySym: symKey}
|
2020-01-08 11:12:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if asymKey != "" {
|
|
|
|
keyAsym, err := crypto.HexToECDSA(asymKey)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
s.asymFilter = &wakucommon.Filter{KeyAsym: keyAsym}
|
2020-01-08 11:12:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// openEnvelope tries to decrypt an envelope, first based on asymetric key (if
|
|
|
|
// provided) and second on the symetric key (if provided)
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
func (s *WakuMailServer) openEnvelope(request *wakucommon.Envelope) *wakucommon.ReceivedMessage {
|
2020-01-08 11:12:23 +00:00
|
|
|
if s.asymFilter != nil {
|
|
|
|
if d := request.Open(s.asymFilter); d != nil {
|
|
|
|
return d
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if s.symFilter != nil {
|
|
|
|
if d := request.Open(s.symFilter); d != nil {
|
|
|
|
return d
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
Move networking code for waku under `v0` namespace
Why make the change?
As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.
This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .
All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.
In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.
This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.
What has changed?
- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`
Things still to do
Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.
Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
2020-04-21 12:40:30 +00:00
|
|
|
func (s *WakuMailServer) decodeRequest(peerID []byte, request *wakucommon.Envelope) (MessagesRequestPayload, error) {
|
2020-01-08 11:12:23 +00:00
|
|
|
var payload MessagesRequestPayload
|
|
|
|
|
|
|
|
if s.minRequestPoW > 0.0 && request.PoW() < s.minRequestPoW {
|
|
|
|
return payload, errors.New("PoW too low")
|
|
|
|
}
|
|
|
|
|
|
|
|
decrypted := s.openEnvelope(request)
|
|
|
|
if decrypted == nil {
|
|
|
|
log.Warn("Failed to decrypt p2p request")
|
|
|
|
return payload, errors.New("failed to decrypt p2p request")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := checkMsgSignature(decrypted.Src, peerID); err != nil {
|
|
|
|
log.Warn("Check message signature failed", "err", err.Error())
|
|
|
|
return payload, fmt.Errorf("check message signature failed: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := rlp.DecodeBytes(decrypted.Payload, &payload); err != nil {
|
|
|
|
return payload, fmt.Errorf("failed to decode data: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if payload.Upper == 0 {
|
|
|
|
payload.Upper = uint32(time.Now().Unix() + whisperTTLSafeThreshold)
|
|
|
|
}
|
|
|
|
|
|
|
|
if payload.Upper < payload.Lower {
|
|
|
|
log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper)
|
|
|
|
return payload, errors.New("query range is invalid: lower > upper")
|
|
|
|
}
|
|
|
|
|
|
|
|
return payload, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// -------
|
|
|
|
// adapter
|
|
|
|
// -------
|
|
|
|
|
|
|
|
type adapter interface {
|
|
|
|
CreateRequestFailedPayload(reqID types.Hash, err error) []byte
|
|
|
|
CreateRequestCompletedPayload(reqID, lastEnvelopeHash types.Hash, cursor []byte) []byte
|
|
|
|
CreateSyncResponse(envelopes []types.Envelope, cursor []byte, final bool, err string) interface{}
|
|
|
|
CreateRawSyncResponse(envelopes []rlp.RawValue, cursor []byte, final bool, err string) interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// -----------
|
|
|
|
// wakuAdapter
|
|
|
|
// -----------
|
|
|
|
|
|
|
|
type wakuAdapter struct{}
|
|
|
|
|
|
|
|
var _ adapter = (*wakuAdapter)(nil)
|
|
|
|
|
|
|
|
func (wakuAdapter) CreateRequestFailedPayload(reqID types.Hash, err error) []byte {
|
|
|
|
return waku.CreateMailServerRequestFailedPayload(common.Hash(reqID), err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (wakuAdapter) CreateRequestCompletedPayload(reqID, lastEnvelopeHash types.Hash, cursor []byte) []byte {
|
|
|
|
return waku.CreateMailServerRequestCompletedPayload(common.Hash(reqID), common.Hash(lastEnvelopeHash), cursor)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (wakuAdapter) CreateSyncResponse(_ []types.Envelope, _ []byte, _ bool, _ string) interface{} {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (wakuAdapter) CreateRawSyncResponse(_ []rlp.RawValue, _ []byte, _ bool, _ string) interface{} {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// -------
|
|
|
|
// service
|
|
|
|
// -------
|
|
|
|
|
|
|
|
type service interface {
|
|
|
|
SendHistoricMessageResponse(peerID []byte, payload []byte) error
|
|
|
|
SendRawP2PDirect(peerID []byte, envelopes ...rlp.RawValue) error
|
|
|
|
MaxMessageSize() uint32
|
|
|
|
SendRawSyncResponse(peerID []byte, data interface{}) error // optional
|
|
|
|
SendSyncResponse(peerID []byte, data interface{}) error // optional
|
|
|
|
}
|
|
|
|
|
|
|
|
// -----------
|
|
|
|
// wakuService
|
|
|
|
// -----------
|
|
|
|
|
|
|
|
type wakuService struct {
|
|
|
|
*waku.Waku
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *wakuService) SendRawSyncResponse(peerID []byte, data interface{}) error {
|
|
|
|
return errors.New("syncing mailservers is not support by Waku")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *wakuService) SendSyncResponse(peerID []byte, data interface{}) error {
|
|
|
|
return errors.New("syncing mailservers is not support by Waku")
|
|
|
|
}
|
|
|
|
|
|
|
|
// ----------
|
|
|
|
// mailServer
|
|
|
|
// ----------
|
|
|
|
|
|
|
|
type mailServer struct {
|
|
|
|
adapter adapter
|
|
|
|
service service
|
|
|
|
db DB
|
|
|
|
cleaner *dbCleaner // removes old envelopes
|
|
|
|
muRateLimiter sync.RWMutex
|
|
|
|
rateLimiter *rateLimiter
|
|
|
|
}
|
2018-10-19 09:09:13 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func newMailServer(cfg Config, adapter adapter, service service) (*mailServer, error) {
|
|
|
|
if len(cfg.DataDir) == 0 {
|
|
|
|
return nil, errDirectoryNotProvided
|
2018-11-19 09:14:03 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// TODO: move out
|
|
|
|
if len(cfg.Password) == 0 && len(cfg.AsymKey) == 0 {
|
|
|
|
return nil, errDecryptionMethodNotProvided
|
2019-05-13 09:25:46 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
s := mailServer{
|
|
|
|
adapter: adapter,
|
|
|
|
service: service,
|
|
|
|
}
|
2018-12-06 09:48:28 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if cfg.RateLimit > 0 {
|
|
|
|
s.setupRateLimiter(time.Duration(cfg.RateLimit) * time.Second)
|
|
|
|
}
|
2018-12-06 09:48:28 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// Open database in the last step in order not to init with error
|
|
|
|
// and leave the database open by accident.
|
|
|
|
if cfg.PostgresEnabled {
|
|
|
|
log.Info("Connecting to postgres database")
|
|
|
|
database, err := NewPostgresDB(cfg.PostgresURI)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("open DB: %s", err)
|
2018-12-06 09:48:28 +00:00
|
|
|
}
|
2020-01-08 11:12:23 +00:00
|
|
|
s.db = database
|
|
|
|
log.Info("Connected to postgres database")
|
|
|
|
} else {
|
|
|
|
// Defaults to LevelDB
|
|
|
|
database, err := NewLevelDB(cfg.DataDir)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("open DB: %s", err)
|
|
|
|
}
|
|
|
|
s.db = database
|
2018-12-06 09:48:28 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if cfg.DataRetention > 0 {
|
|
|
|
// MailServerDataRetention is a number of days.
|
|
|
|
s.setupCleaner(time.Duration(cfg.DataRetention) * time.Hour * 24)
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
2018-06-27 12:22:09 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
return &s, nil
|
|
|
|
}
|
2018-10-19 09:09:13 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// setupRateLimiter in case limit is bigger than 0 it will setup an automated
|
|
|
|
// limit db cleanup.
|
|
|
|
func (s *mailServer) setupRateLimiter(limit time.Duration) {
|
|
|
|
s.rateLimiter = newRateLimiter(limit)
|
|
|
|
s.rateLimiter.Start()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mailServer) setupCleaner(retention time.Duration) {
|
|
|
|
s.cleaner = newDBCleaner(s.db, retention)
|
|
|
|
s.cleaner.Start()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mailServer) Archive(env types.Envelope) {
|
|
|
|
err := s.db.SaveEnvelope(env)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Could not save envelope", "hash", env.Hash().String())
|
2019-11-04 20:15:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPayload) {
|
2019-11-04 20:15:21 +00:00
|
|
|
timer := prom.NewTimer(mailDeliveryDuration)
|
|
|
|
defer timer.ObserveDuration()
|
|
|
|
|
|
|
|
deliveryAttemptsCounter.Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:DeliverMail] delivering mail",
|
|
|
|
"peerID", peerID.String(),
|
|
|
|
"requestID", reqID.String(),
|
2019-11-04 20:15:21 +00:00
|
|
|
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
req.SetDefaults()
|
2019-11-04 20:15:21 +00:00
|
|
|
|
2020-01-21 07:11:24 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:DeliverMail] processing request",
|
|
|
|
"peerID", peerID.String(),
|
|
|
|
"requestID", reqID.String(),
|
|
|
|
"lower", req.Lower,
|
|
|
|
"upper", req.Upper,
|
|
|
|
"bloom", req.Bloom,
|
|
|
|
"topics", req.Topics,
|
|
|
|
"limit", req.Limit,
|
|
|
|
"cursor", req.Cursor,
|
|
|
|
"batch", req.Batch,
|
|
|
|
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if err := req.Validate(); err != nil {
|
|
|
|
syncFailuresCounter.WithLabelValues("req_invalid").Inc()
|
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] request invalid",
|
|
|
|
"peerID", peerID.String(),
|
|
|
|
"requestID", reqID.String(),
|
|
|
|
"err", err,
|
|
|
|
)
|
|
|
|
s.sendHistoricMessageErrorResponse(peerID, reqID, fmt.Errorf("request is invalid: %v", err))
|
2019-11-04 20:15:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if s.exceedsPeerRequests(peerID) {
|
2019-11-04 20:15:21 +00:00
|
|
|
deliveryFailuresCounter.WithLabelValues("peer_req_limit").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] peer exceeded the limit",
|
|
|
|
"peerID", peerID.String(),
|
|
|
|
"requestID", reqID.String(),
|
|
|
|
)
|
|
|
|
s.sendHistoricMessageErrorResponse(peerID, reqID, fmt.Errorf("rate limit exceeded"))
|
2019-11-04 20:15:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if req.Batch {
|
|
|
|
requestsBatchedCounter.Inc()
|
|
|
|
}
|
2019-11-04 20:15:21 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
iter, err := s.createIterator(req)
|
2019-11-04 20:15:21 +00:00
|
|
|
if err != nil {
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] request failed",
|
|
|
|
"peerID", peerID.String(),
|
|
|
|
"requestID", reqID.String(),
|
2020-12-07 08:20:57 +00:00
|
|
|
"err", err,
|
2020-01-08 11:12:23 +00:00
|
|
|
)
|
2019-11-04 20:15:21 +00:00
|
|
|
return
|
|
|
|
}
|
2020-01-21 07:11:24 +00:00
|
|
|
defer func() { _ = iter.Release() }()
|
2019-11-04 20:15:21 +00:00
|
|
|
|
|
|
|
bundles := make(chan []rlp.RawValue, 5)
|
2020-01-08 11:12:23 +00:00
|
|
|
errCh := make(chan error)
|
2019-11-04 20:15:21 +00:00
|
|
|
cancelProcessing := make(chan struct{})
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
counter := 0
|
|
|
|
for bundle := range bundles {
|
2020-01-08 11:12:23 +00:00
|
|
|
if err := s.sendRawEnvelopes(peerID, bundle, req.Batch); err != nil {
|
2019-11-04 20:15:21 +00:00
|
|
|
close(cancelProcessing)
|
|
|
|
errCh <- err
|
|
|
|
break
|
|
|
|
}
|
|
|
|
counter++
|
|
|
|
}
|
|
|
|
close(errCh)
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:DeliverMail] finished sending bundles",
|
2019-11-04 20:15:21 +00:00
|
|
|
"peerID", peerID,
|
2020-01-08 11:12:23 +00:00
|
|
|
"requestID", reqID.String(),
|
2019-11-04 20:15:21 +00:00
|
|
|
"counter", counter,
|
|
|
|
)
|
|
|
|
}()
|
|
|
|
|
|
|
|
nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
|
|
|
|
iter,
|
2020-01-08 11:12:23 +00:00
|
|
|
req.Bloom,
|
2020-01-21 07:11:24 +00:00
|
|
|
req.Topics,
|
2020-01-08 11:12:23 +00:00
|
|
|
int(req.Limit),
|
2019-11-04 20:15:21 +00:00
|
|
|
processRequestTimeout,
|
2020-01-08 11:12:23 +00:00
|
|
|
reqID.String(),
|
2019-11-04 20:15:21 +00:00
|
|
|
bundles,
|
|
|
|
cancelProcessing,
|
|
|
|
)
|
|
|
|
|
|
|
|
// Wait for the goroutine to finish the work. It may return an error.
|
2020-01-08 11:12:23 +00:00
|
|
|
if err := <-errCh; err != nil {
|
2019-11-04 20:15:21 +00:00
|
|
|
deliveryFailuresCounter.WithLabelValues("process").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] error while processing",
|
|
|
|
"err", err,
|
|
|
|
"peerID", peerID,
|
|
|
|
"requestID", reqID,
|
|
|
|
)
|
|
|
|
s.sendHistoricMessageErrorResponse(peerID, reqID, err)
|
2019-11-04 20:15:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Processing of the request could be finished earlier due to iterator error.
|
2020-01-08 11:12:23 +00:00
|
|
|
if err := iter.Error(); err != nil {
|
2019-11-04 20:15:21 +00:00
|
|
|
deliveryFailuresCounter.WithLabelValues("iterator").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] iterator failed",
|
|
|
|
"err", err,
|
|
|
|
"peerID", peerID,
|
|
|
|
"requestID", reqID,
|
|
|
|
)
|
|
|
|
s.sendHistoricMessageErrorResponse(peerID, reqID, err)
|
2019-11-04 20:15:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:DeliverMail] sending historic message response",
|
2019-11-04 20:15:21 +00:00
|
|
|
"peerID", peerID,
|
2020-01-08 11:12:23 +00:00
|
|
|
"requestID", reqID,
|
2019-11-04 20:15:21 +00:00
|
|
|
"last", lastEnvelopeHash,
|
|
|
|
"next", nextPageCursor,
|
|
|
|
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
s.sendHistoricMessageResponse(peerID, reqID, lastEnvelopeHash, nextPageCursor)
|
2018-05-21 11:30:37 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) SyncMail(peerID types.Hash, req MessagesRequestPayload) error {
|
|
|
|
log.Info("Started syncing envelopes", "peer", peerID.String(), "req", req)
|
2018-12-06 09:48:28 +00:00
|
|
|
|
2020-12-28 09:09:45 +00:00
|
|
|
requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000)) // nolint: gosec
|
2019-02-27 14:30:08 +00:00
|
|
|
|
2019-10-22 17:32:05 +00:00
|
|
|
syncAttemptsCounter.Inc()
|
2018-12-06 09:48:28 +00:00
|
|
|
|
|
|
|
// Check rate limiting for a requesting peer.
|
2020-01-08 11:12:23 +00:00
|
|
|
if s.exceedsPeerRequests(peerID) {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("req_per_sec_limit").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error("Peer exceeded request per seconds limit", "peerID", peerID.String())
|
2018-12-06 09:48:28 +00:00
|
|
|
return fmt.Errorf("requests per seconds limit exceeded")
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
req.SetDefaults()
|
|
|
|
|
|
|
|
if err := req.Validate(); err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("req_invalid").Inc()
|
2018-12-14 11:21:34 +00:00
|
|
|
return fmt.Errorf("request is invalid: %v", err)
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
iter, err := s.createIterator(req)
|
2019-05-13 09:25:46 +00:00
|
|
|
if err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("iterator").Inc()
|
2019-05-13 09:25:46 +00:00
|
|
|
return err
|
|
|
|
}
|
2020-01-21 07:11:24 +00:00
|
|
|
defer func() { _ = iter.Release() }()
|
2018-12-06 09:48:28 +00:00
|
|
|
|
2019-05-09 10:58:02 +00:00
|
|
|
bundles := make(chan []rlp.RawValue, 5)
|
2018-12-06 09:48:28 +00:00
|
|
|
errCh := make(chan error)
|
2019-02-22 08:55:37 +00:00
|
|
|
cancelProcessing := make(chan struct{})
|
2018-12-06 09:48:28 +00:00
|
|
|
|
|
|
|
go func() {
|
|
|
|
for bundle := range bundles {
|
2020-01-08 11:12:23 +00:00
|
|
|
resp := s.adapter.CreateRawSyncResponse(bundle, nil, false, "")
|
|
|
|
if err := s.service.SendRawSyncResponse(peerID.Bytes(), resp); err != nil {
|
2019-02-22 08:55:37 +00:00
|
|
|
close(cancelProcessing)
|
2018-12-06 09:48:28 +00:00
|
|
|
errCh <- fmt.Errorf("failed to send sync response: %v", err)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
close(errCh)
|
|
|
|
}()
|
|
|
|
|
|
|
|
nextCursor, _ := s.processRequestInBundles(
|
|
|
|
iter,
|
2020-01-08 11:12:23 +00:00
|
|
|
req.Bloom,
|
2020-01-21 07:11:24 +00:00
|
|
|
req.Topics,
|
2020-01-08 11:12:23 +00:00
|
|
|
int(req.Limit),
|
2019-02-22 08:55:37 +00:00
|
|
|
processRequestTimeout,
|
2019-02-27 14:30:08 +00:00
|
|
|
requestID,
|
2018-12-06 09:48:28 +00:00
|
|
|
bundles,
|
2019-02-22 08:55:37 +00:00
|
|
|
cancelProcessing,
|
2018-12-06 09:48:28 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Wait for the goroutine to finish the work. It may return an error.
|
|
|
|
if err := <-errCh; err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("routine").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
_ = s.service.SendSyncResponse(
|
|
|
|
peerID.Bytes(),
|
|
|
|
s.adapter.CreateSyncResponse(nil, nil, false, "failed to send a response"),
|
2018-12-06 09:48:28 +00:00
|
|
|
)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Processing of the request could be finished earlier due to iterator error.
|
|
|
|
if err := iter.Error(); err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("iterator").Inc()
|
2020-01-08 11:12:23 +00:00
|
|
|
_ = s.service.SendSyncResponse(
|
|
|
|
peerID.Bytes(),
|
|
|
|
s.adapter.CreateSyncResponse(nil, nil, false, "failed to process all envelopes"),
|
2018-12-06 09:48:28 +00:00
|
|
|
)
|
2019-05-13 09:25:46 +00:00
|
|
|
return fmt.Errorf("LevelDB iterator failed: %v", err)
|
2018-12-06 09:48:28 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info("Finished syncing envelopes", "peer", peerID.String())
|
2018-12-06 09:48:28 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
err = s.service.SendSyncResponse(
|
|
|
|
peerID.Bytes(),
|
|
|
|
s.adapter.CreateSyncResponse(nil, nextCursor, true, ""),
|
|
|
|
)
|
|
|
|
if err != nil {
|
2019-10-22 17:32:05 +00:00
|
|
|
syncFailuresCounter.WithLabelValues("response_send").Inc()
|
2018-12-06 09:48:28 +00:00
|
|
|
return fmt.Errorf("failed to send the final sync response: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// Close the mailserver and its associated db connection.
|
|
|
|
func (s *mailServer) Close() {
|
|
|
|
if s.db != nil {
|
|
|
|
if err := s.db.Close(); err != nil {
|
|
|
|
log.Error("closing database failed", "err", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if s.rateLimiter != nil {
|
|
|
|
s.rateLimiter.Stop()
|
|
|
|
}
|
|
|
|
if s.cleaner != nil {
|
|
|
|
s.cleaner.Stop()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mailServer) exceedsPeerRequests(peerID types.Hash) bool {
|
2019-01-10 16:07:16 +00:00
|
|
|
s.muRateLimiter.RLock()
|
|
|
|
defer s.muRateLimiter.RUnlock()
|
|
|
|
|
|
|
|
if s.rateLimiter == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
if s.rateLimiter.IsAllowed(peerID.String()) {
|
|
|
|
s.rateLimiter.Add(peerID.String())
|
2019-01-10 16:07:16 +00:00
|
|
|
return false
|
2018-05-17 11:21:04 +00:00
|
|
|
}
|
2019-01-10 16:07:16 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info("peerID exceeded the number of requests per second", "peerID", peerID.String())
|
2019-01-10 16:07:16 +00:00
|
|
|
return true
|
2018-05-11 19:43:07 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) createIterator(req MessagesRequestPayload) (Iterator, error) {
|
2018-10-19 09:09:13 +00:00
|
|
|
var (
|
2019-11-23 17:57:05 +00:00
|
|
|
emptyHash types.Hash
|
|
|
|
emptyTopic types.TopicType
|
2019-05-09 10:58:02 +00:00
|
|
|
ku, kl *DBKey
|
2018-10-19 09:09:13 +00:00
|
|
|
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
ku = NewDBKey(req.Upper+1, emptyTopic, emptyHash)
|
|
|
|
kl = NewDBKey(req.Lower, emptyTopic, emptyHash)
|
2018-10-19 09:09:13 +00:00
|
|
|
|
2019-05-10 10:26:57 +00:00
|
|
|
query := CursorQuery{
|
|
|
|
start: kl.Bytes(),
|
|
|
|
end: ku.Bytes(),
|
2020-01-08 11:12:23 +00:00
|
|
|
cursor: req.Cursor,
|
2021-01-25 10:00:27 +00:00
|
|
|
topics: req.Topics,
|
2020-01-08 11:12:23 +00:00
|
|
|
bloom: req.Bloom,
|
|
|
|
limit: req.Limit,
|
2019-04-23 07:04:58 +00:00
|
|
|
}
|
2019-05-10 10:26:57 +00:00
|
|
|
return s.db.BuildIterator(query)
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) processRequestInBundles(
|
2019-05-10 10:26:57 +00:00
|
|
|
iter Iterator,
|
2019-02-08 15:39:24 +00:00
|
|
|
bloom []byte,
|
2020-01-21 07:11:24 +00:00
|
|
|
topics [][]byte,
|
2019-02-08 15:39:24 +00:00
|
|
|
limit int,
|
2019-02-22 08:55:37 +00:00
|
|
|
timeout time.Duration,
|
2019-02-27 14:30:08 +00:00
|
|
|
requestID string,
|
2019-05-09 10:58:02 +00:00
|
|
|
output chan<- []rlp.RawValue,
|
2019-02-22 08:55:37 +00:00
|
|
|
cancel <-chan struct{},
|
2019-11-23 17:57:05 +00:00
|
|
|
) ([]byte, types.Hash) {
|
2019-10-22 17:32:05 +00:00
|
|
|
timer := prom.NewTimer(requestsInBundlesDuration)
|
|
|
|
defer timer.ObserveDuration()
|
|
|
|
|
2018-10-19 09:09:13 +00:00
|
|
|
var (
|
2019-05-09 10:58:02 +00:00
|
|
|
bundle []rlp.RawValue
|
2018-12-06 09:48:28 +00:00
|
|
|
bundleSize uint32
|
2019-05-09 10:58:02 +00:00
|
|
|
batches [][]rlp.RawValue
|
2018-12-06 09:48:28 +00:00
|
|
|
processedEnvelopes int
|
|
|
|
processedEnvelopesSize int64
|
2018-12-11 10:23:47 +00:00
|
|
|
nextCursor []byte
|
2019-11-23 17:57:05 +00:00
|
|
|
lastEnvelopeHash types.Hash
|
2018-10-19 09:09:13 +00:00
|
|
|
)
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:processRequestInBundles] processing request",
|
2019-02-27 14:30:08 +00:00
|
|
|
"requestID", requestID,
|
2020-01-08 11:12:23 +00:00
|
|
|
"limit", limit,
|
|
|
|
)
|
2019-02-08 15:39:24 +00:00
|
|
|
|
|
|
|
// We iterate over the envelopes.
|
|
|
|
// We collect envelopes in batches.
|
|
|
|
// If there still room and we haven't reached the limit
|
|
|
|
// append and continue.
|
|
|
|
// Otherwise publish what you have so far, reset the bundle to the
|
|
|
|
// current envelope, and leave if we hit the limit
|
2019-04-23 07:04:58 +00:00
|
|
|
for iter.Next() {
|
2019-05-10 10:26:57 +00:00
|
|
|
rawValue, err := iter.GetEnvelope(bloom)
|
|
|
|
if err != nil {
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:processRequestInBundles]Failed to get envelope from iterator",
|
2019-05-10 10:26:57 +00:00
|
|
|
"err", err,
|
2020-01-08 11:12:23 +00:00
|
|
|
"requestID", requestID,
|
|
|
|
)
|
2019-05-10 10:26:57 +00:00
|
|
|
continue
|
2019-05-09 10:58:02 +00:00
|
|
|
}
|
2019-05-10 10:26:57 +00:00
|
|
|
if rawValue == nil {
|
2018-10-19 09:09:13 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2019-05-13 09:25:46 +00:00
|
|
|
key, err := iter.DBKey()
|
|
|
|
if err != nil {
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:processRequestInBundles] failed getting key",
|
|
|
|
"requestID", requestID,
|
|
|
|
)
|
2019-05-13 09:25:46 +00:00
|
|
|
break
|
|
|
|
|
|
|
|
}
|
2019-05-10 10:26:57 +00:00
|
|
|
|
2019-06-26 16:17:41 +00:00
|
|
|
// TODO(adam): this is invalid code. If the limit is 1000,
|
|
|
|
// it will only send 999 items and send a cursor.
|
2019-05-09 10:58:02 +00:00
|
|
|
lastEnvelopeHash = key.EnvelopeHash()
|
2019-02-08 15:39:24 +00:00
|
|
|
processedEnvelopes++
|
2019-05-09 10:58:02 +00:00
|
|
|
envelopeSize := uint32(len(rawValue))
|
2019-06-26 16:17:41 +00:00
|
|
|
limitReached := processedEnvelopes >= limit
|
2019-02-08 15:39:24 +00:00
|
|
|
newSize := bundleSize + envelopeSize
|
|
|
|
|
|
|
|
// If we still have some room for messages, add and continue
|
2020-01-08 11:12:23 +00:00
|
|
|
if !limitReached && newSize < s.service.MaxMessageSize() {
|
2019-05-09 10:58:02 +00:00
|
|
|
bundle = append(bundle, rawValue)
|
2018-10-19 09:09:13 +00:00
|
|
|
bundleSize = newSize
|
2018-06-27 12:22:09 +00:00
|
|
|
continue
|
2018-05-11 19:43:07 +00:00
|
|
|
}
|
|
|
|
|
2019-02-08 15:39:24 +00:00
|
|
|
// Publish if anything is in the bundle (there should always be
|
|
|
|
// something unless limit = 1)
|
|
|
|
if len(bundle) != 0 {
|
|
|
|
batches = append(batches, bundle)
|
|
|
|
processedEnvelopesSize += int64(bundleSize)
|
|
|
|
}
|
2018-10-19 09:09:13 +00:00
|
|
|
|
2019-02-08 15:39:24 +00:00
|
|
|
// Reset the bundle with the current envelope
|
2019-05-09 10:58:02 +00:00
|
|
|
bundle = []rlp.RawValue{rawValue}
|
2019-02-08 15:39:24 +00:00
|
|
|
bundleSize = envelopeSize
|
2018-10-19 09:09:13 +00:00
|
|
|
|
2019-02-08 15:39:24 +00:00
|
|
|
// Leave if we reached the limit
|
2018-10-19 09:09:13 +00:00
|
|
|
if limitReached {
|
2019-05-09 10:58:02 +00:00
|
|
|
nextCursor = key.Cursor()
|
2018-10-19 09:09:13 +00:00
|
|
|
break
|
|
|
|
}
|
2019-02-08 15:39:24 +00:00
|
|
|
}
|
2018-07-02 07:38:10 +00:00
|
|
|
|
2019-02-08 15:39:24 +00:00
|
|
|
if len(bundle) > 0 {
|
|
|
|
batches = append(batches, bundle)
|
|
|
|
processedEnvelopesSize += int64(bundleSize)
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:processRequestInBundles] publishing envelopes",
|
2019-02-27 14:30:08 +00:00
|
|
|
"requestID", requestID,
|
|
|
|
"batchesCount", len(batches),
|
|
|
|
"envelopeCount", processedEnvelopes,
|
|
|
|
"processedEnvelopesSize", processedEnvelopesSize,
|
2020-01-08 11:12:23 +00:00
|
|
|
"cursor", nextCursor,
|
|
|
|
)
|
2019-02-08 15:39:24 +00:00
|
|
|
|
|
|
|
// Publish
|
2019-11-06 14:12:05 +00:00
|
|
|
batchLoop:
|
2019-02-08 15:39:24 +00:00
|
|
|
for _, batch := range batches {
|
2019-02-22 08:55:37 +00:00
|
|
|
select {
|
|
|
|
case output <- batch:
|
|
|
|
// It might happen that during producing the batches,
|
|
|
|
// the connection with the peer goes down and
|
|
|
|
// the consumer of `output` channel exits prematurely.
|
|
|
|
// In such a case, we should stop pushing batches and exit.
|
|
|
|
case <-cancel:
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:processRequestInBundles] failed to push all batches",
|
|
|
|
"requestID", requestID,
|
|
|
|
)
|
2019-11-06 14:12:05 +00:00
|
|
|
break batchLoop
|
2019-02-22 08:55:37 +00:00
|
|
|
case <-time.After(timeout):
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error(
|
|
|
|
"[mailserver:processRequestInBundles] timed out pushing a batch",
|
|
|
|
"requestID", requestID,
|
|
|
|
)
|
2019-11-06 14:12:05 +00:00
|
|
|
break batchLoop
|
2019-02-22 08:55:37 +00:00
|
|
|
}
|
2018-05-11 19:43:07 +00:00
|
|
|
}
|
|
|
|
|
2019-10-22 17:32:05 +00:00
|
|
|
envelopesCounter.Inc()
|
|
|
|
sentEnvelopeBatchSizeMeter.Observe(float64(processedEnvelopesSize))
|
2018-05-11 19:43:07 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Info(
|
|
|
|
"[mailserver:processRequestInBundles] envelopes published",
|
|
|
|
"requestID", requestID,
|
|
|
|
)
|
2019-02-08 15:39:24 +00:00
|
|
|
close(output)
|
|
|
|
|
2018-12-06 09:48:28 +00:00
|
|
|
return nextCursor, lastEnvelopeHash
|
2018-05-11 19:43:07 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) sendRawEnvelopes(peerID types.Hash, envelopes []rlp.RawValue, batch bool) error {
|
2019-10-22 17:32:05 +00:00
|
|
|
timer := prom.NewTimer(sendRawEnvelopeDuration)
|
|
|
|
defer timer.ObserveDuration()
|
2018-11-19 09:14:03 +00:00
|
|
|
|
2018-10-19 09:09:13 +00:00
|
|
|
if batch {
|
2020-01-08 11:12:23 +00:00
|
|
|
return s.service.SendRawP2PDirect(peerID.Bytes(), envelopes...)
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, env := range envelopes {
|
2020-01-08 11:12:23 +00:00
|
|
|
if err := s.service.SendRawP2PDirect(peerID.Bytes(), env); err != nil {
|
2018-10-19 09:09:13 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) sendHistoricMessageResponse(peerID, reqID, lastEnvelopeHash types.Hash, cursor []byte) {
|
|
|
|
payload := s.adapter.CreateRequestCompletedPayload(reqID, lastEnvelopeHash, cursor)
|
|
|
|
err := s.service.SendHistoricMessageResponse(peerID.Bytes(), payload)
|
|
|
|
if err != nil {
|
|
|
|
deliveryFailuresCounter.WithLabelValues("historic_msg_resp").Inc()
|
|
|
|
log.Error(
|
|
|
|
"[mailserver:DeliverMail] error sending historic message response",
|
|
|
|
"err", err,
|
|
|
|
"peerID", peerID,
|
|
|
|
"requestID", reqID,
|
|
|
|
)
|
|
|
|
}
|
2018-06-15 15:12:31 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func (s *mailServer) sendHistoricMessageErrorResponse(peerID, reqID types.Hash, errorToReport error) {
|
|
|
|
payload := s.adapter.CreateRequestFailedPayload(reqID, errorToReport)
|
|
|
|
err := s.service.SendHistoricMessageResponse(peerID.Bytes(), payload)
|
2018-10-18 10:25:00 +00:00
|
|
|
// if we can't report an error, probably something is wrong with p2p connection,
|
|
|
|
// so we just print a log entry to document this sad fact
|
|
|
|
if err != nil {
|
2020-01-08 11:12:23 +00:00
|
|
|
log.Error("Error while reporting error response", "err", err, "peerID", peerID.String())
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
func extractBloomFromEncodedEnvelope(rawValue rlp.RawValue) ([]byte, error) {
|
2021-05-13 16:05:34 +00:00
|
|
|
var envelope wakucommon.Envelope
|
2020-01-08 11:12:23 +00:00
|
|
|
decodeErr := rlp.DecodeBytes(rawValue, &envelope)
|
|
|
|
if decodeErr != nil {
|
|
|
|
return nil, decodeErr
|
2018-05-17 11:21:04 +00:00
|
|
|
}
|
2020-01-08 11:12:23 +00:00
|
|
|
return envelope.Bloom(), nil
|
2018-05-11 19:43:07 +00:00
|
|
|
}
|
2018-05-17 11:21:04 +00:00
|
|
|
|
2020-01-08 11:12:23 +00:00
|
|
|
// checkMsgSignature returns an error in case the message is not correctly signed.
|
|
|
|
func checkMsgSignature(reqSrc *ecdsa.PublicKey, id []byte) error {
|
|
|
|
src := crypto.FromECDSAPub(reqSrc)
|
2018-05-21 11:30:37 +00:00
|
|
|
if len(src)-len(id) == 1 {
|
|
|
|
src = src[1:]
|
2018-05-17 11:21:04 +00:00
|
|
|
}
|
|
|
|
|
2018-05-21 11:30:37 +00:00
|
|
|
// if you want to check the signature, you can do it here. e.g.:
|
|
|
|
// if !bytes.Equal(peerID, src) {
|
|
|
|
if src == nil {
|
2020-01-08 11:12:23 +00:00
|
|
|
return errors.New("wrong signature of p2p request")
|
2018-05-17 11:21:04 +00:00
|
|
|
}
|
|
|
|
|
2018-05-21 11:30:37 +00:00
|
|
|
return nil
|
2018-05-17 11:21:04 +00:00
|
|
|
}
|