Create temporary whisperv6 patch file. Closes #636

This commit is contained in:
Pedro Pombeiro 2018-02-09 18:46:21 +01:00
parent f5393274ff
commit 9976018978
No known key found for this signature in database
GPG Key ID: A65DEB11E4BBC647
16 changed files with 3504 additions and 569 deletions

2
Gopkg.lock generated
View File

@ -109,7 +109,7 @@
"whisper/notifications",
"whisper/whisperv5"
]
revision = "6bba5c70ba424f5efb448536ae3d000f96e7cc9e"
revision = "09f08d50335df2d8c9b9f062b18f0ebd3a84133d"
source = "https://github.com/status-im/go-ethereum.git"
[[projects]]

File diff suppressed because it is too large Load Diff

View File

@ -14,3 +14,19 @@ index e3c2f4a97..96d895fdc 100644
)
var (
diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go
index a2c75a41c..4d3662880 100644
--- a/whisper/whisperv6/api.go
+++ b/whisper/whisperv6/api.go
@@ -33,7 +33,10 @@ import (
)
const (
- filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds
+ // HACK: make the filter essentially never timeout (1 year of timeout time)
+ // It's a hack, but that simplifies rebasing process, because the patch consists
+ // only of 1 LoC change (excluding this comment).
+ filterTimeout = 525600 * 60 // filters are considered timeout out after filterTimeout seconds
)
// List of errors

View File

@ -18,13 +18,15 @@ Instructions for creating a patch from the command line:
# Patches
- [`0000-accounts-hd-keys.patch`](./0000-accounts-hd-keys.patch) — adds support for HD extended keys (links/docs?)
- [`0002-les-api-status.patch`](./0002-les-api-status.patch) — adds StatusBackend into LES code (need to be inspected, some things can and should be done outside of les code
- [`0003-dockerfiles-wnode-swarm.patch`](./0003-dockerfiles-wnode-swarm.patch) — adds Dockerfiles (who uses this?)
- [`0004-whisper-notifications.patch`](./0004-whisper-notifications.patch) — adds Whisper notifications (need to be reviewed and documented)
- [`0006-latest-cht.patch`](./0006-latest-cht.patch) updates CHT root hashes, should be updated regularly to keep sync fast, until proper Trusted Checkpoint sync is not implemented as part of LES/2 protocol.
- [`0007-README.patch`](./0007-README.patch) — update upstream README.md.
- [`0010-geth-17-fix-npe-in-filter-system.patch`](./0010-geth-17-fix-npe-in-filter-system.patch) - Temp patch for 1.7.x to fix a NPE in the filter system.
- [`0000-accounts-hd-keys.patch`](./0000-accounts-hd-keys.patch) — adds support for HD extended keys (links/docs?)
- [`0002-les-api-status.patch`](./0002-les-api-status.patch) — adds StatusBackend into LES code (need to be inspected, some things can and should be done outside of les code
- [`0003-dockerfiles-wnode-swarm.patch`](./0003-dockerfiles-wnode-swarm.patch) — adds Dockerfiles (who uses this?)
- [`0004-whisper-notifications.patch`](./0004-whisper-notifications.patch) — adds Whisper notifications (need to be reviewed and documented)
- [`0006-latest-cht.patch`](./0006-latest-cht.patch) updates CHT root hashes, should be updated regularly to keep sync fast, until proper Trusted Checkpoint sync is not implemented as part of LES/2 protocol.
- [`0007-README.patch`](./0007-README.patch) — update upstream README.md.
- [`0009-whisper-envelopes-tracing.patch`](./0009-whisper-envelopes-tracing.patch) — adds Whisper envelope tracing (need to be reviewed and documented)
- [`0010-geth-17-fix-npe-in-filter-system.patch`](./0010-geth-17-fix-npe-in-filter-system.patch) - Temp patch for 1.7.x to fix a NPE in the filter system.
- [`0011-geth-17-whisperv6-70fbc87.patch`](./0011-geth-17-whisperv6-70fbc87.patch) - Temp patch for 1.7.x to update whisper v6 to the upstream version at the `70fbc87` SHA1.
# Updating upstream version
@ -37,47 +39,53 @@ When a new stable release of `go-ethereum` comes out, we need to upgrade our for
#### I. In our fork at /status-im/go-ethereum.
1. Remove the local `develop` branch.
```bash
git branch -D develop
```
2. Pull upstream release branch into `develop` branch.
```bash
git pull git@github.com:ethereum/go-ethereum.git <release_branch>:develop
```
In our case `<release_branch>` would be `release/1.7` because the current stable version is
1.7.x.
```bash
git branch -D develop
```
3. Apply patches
```bash
for patch in $GOPATH/src/github.com/status-im/status-go/_assets/patches/geth/*.patch;
do
patch -p1 < $patch;
done
```
1. Pull upstream release branch into `develop` branch.
Once patches applied, you might want to inspect changes between current vendored version and newly patched version by this command:
```bash
diff -Nru -x "*_test.go" -x "vendor" -x ".git" -x "tests" -x "build" --brief $GOPATH/src/github.com/status-im/go-ethereum $GOPATH/src/github.com/status-im/status-go/vendor/github.com/ethereum/go-ethereum
```
```bash
git pull git@github.com:ethereum/go-ethereum.git <release_branch>:develop
```
In our case `<release_branch>` would be `release/1.7` because the current stable version is
1.7.x.
4. Push `develop` branch to our remote, rewriting history
```bash
git push -f origin develop
```
1. Apply patches
```bash
for patch in $GOPATH/src/github.com/status-im/status-go/_assets/patches/geth/*.patch;
do
patch -p1 < $patch;
done
```
Once patches applied, you might want to inspect changes between current vendored version and newly patched version by this command:
```bash
diff -Nru -x "*_test.go" -x "vendor" -x ".git" -x "tests" -x "build" --brief $GOPATH/src/github.com/status-im/go-ethereum $GOPATH/src/github.com/status-im/status-go/vendor/github.com/ethereum/go-ethereum
```
1. Push `develop` branch to our remote, rewriting history
```bash
git push -f origin develop
```
#### II. In status-go repository
1. Update vendored `go-ethereum` (note that we use upstream's address there, we override the download link to our fork address in `Gopkg.toml`)
```bash
dep ensure --update github.com/ethereum/go-ethereum
```
`Gopkg.lock` will change and files within `vendor/ethereum/go-ethereum`.
```bash
dep ensure --update github.com/ethereum/go-ethereum
```
2. Run tests
```bash
make ci
```
`Gopkg.lock` will change and files within `vendor/ethereum/go-ethereum`.
3. Commit & push changes, create a PR
1. Run tests
```bash
make ci
```
1. Commit & push changes, create a PR

View File

@ -33,9 +33,13 @@ import (
)
const (
filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds
// HACK: make the filter essentially never timeout (1 year of timeout time)
// It's a hack, but that simplifies rebasing process, because the patch consists
// only of 1 LoC change (excluding this comment).
filterTimeout = 525600 * 60 // filters are considered timeout out after filterTimeout seconds
)
// List of errors
var (
ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key")
ErrInvalidSymmetricKey = errors.New("invalid symmetric key")
@ -116,12 +120,17 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32)
return true, api.w.SetMaxMessageSize(size)
}
// SetMinPow sets the minimum PoW for a message before it is accepted.
// SetMinPoW sets the minimum PoW, and notifies the peers.
func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) {
return true, api.w.SetMinimumPoW(pow)
}
// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages.
// SetBloomFilter sets the new value of bloom filter, and notifies the peers.
func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) {
return true, api.w.SetBloomFilter(bloom)
}
// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages.
// Note: This function is not adding new nodes, the node needs to exists as a peer.
func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) {
n, err := discover.ParseNode(enode)
@ -169,7 +178,7 @@ func (api *PublicWhisperAPI) GetPublicKey(ctx context.Context, id string) (hexut
return crypto.FromECDSAPub(&key.PublicKey), nil
}
// GetPublicKey returns the private key associated with the given key. The key is the hex
// GetPrivateKey returns the private key associated with the given key. The key is the hex
// encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62.
func (api *PublicWhisperAPI) GetPrivateKey(ctx context.Context, id string) (hexutil.Bytes, error) {
key, err := api.w.GetPrivateKey(id)
@ -272,7 +281,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return false, err
}
if !validateSymmetricKey(params.KeySym) {
if !validateDataIntegrity(params.KeySym, aesKeyLength) {
return false, ErrInvalidSymmetricKey
}
}
@ -378,7 +387,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc.
if err != nil {
return nil, err
}
if !validateSymmetricKey(key) {
if !validateDataIntegrity(key, aesKeyLength) {
return nil, ErrInvalidSymmetricKey
}
filter.KeySym = key
@ -550,7 +559,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return "", err
}
if !validateSymmetricKey(keySym) {
if !validateDataIntegrity(keySym, aesKeyLength) {
return "", ErrInvalidSymmetricKey
}
}
@ -562,7 +571,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
}
if len(req.Topics) > 0 {
topics = make([][]byte, 1)
topics = make([][]byte, 0, len(req.Topics))
for _, topic := range req.Topics {
topics = append(topics, topic[:])
}

View File

@ -16,11 +16,13 @@
package whisperv6
// Config represents the configuration state of a whisper node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
}
// DefaultConfig represents (shocker!) the default configuration.
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPOW: DefaultMinimumPoW,

View File

@ -27,6 +27,9 @@ Whisper is a pure identity-based messaging system. Whisper provides a low-level
or prejudiced by the low-level hardware attributes and characteristics,
particularly the notion of singular endpoints.
*/
// Contains the Whisper protocol constant definitions
package whisperv6
import (
@ -34,39 +37,46 @@ import (
"time"
)
// Whisper protocol parameters
const (
EnvelopeVersion = uint64(0)
ProtocolVersion = uint64(5)
ProtocolVersionStr = "5.0"
ProtocolName = "shh"
ProtocolVersion = uint64(6) // Protocol version number
ProtocolVersionStr = "6.0" // The same, as a string
ProtocolName = "shh" // Nickname of the protocol in geth
statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol
NumberOfMessageCodes = 64
// whisper protocol message codes, according to EIP-627
statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
paddingMask = byte(3)
SizeMask = byte(3) // mask used to extract the size of payload size field from the flags
signatureFlag = byte(4)
TopicLength = 4
signatureLength = 65
aesKeyLength = 32
AESNonceLength = 12
keyIdSize = 32
TopicLength = 4 // in bytes
signatureLength = 65 // in bytes
aesKeyLength = 32 // in bytes
aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
keyIDSize = 32 // in bytes
bloomFilterSize = 64 // in bytes
flagsLength = 1
EnvelopeHeaderLength = 20
MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message.
DefaultMaxMessageSize = uint32(1024 * 1024)
DefaultMinimumPoW = 0.2
padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24)
padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol
messageQueueLimit = 1024
expirationCycle = time.Second
transmissionCycle = 300 * time.Millisecond
DefaultTTL = 50 // seconds
SynchAllowance = 10 // seconds
DefaultTTL = 50 // seconds
DefaultSyncAllowance = 10 // seconds
)
type unknownVersionError uint64

View File

@ -36,76 +36,60 @@ import (
// Envelope represents a clear-text data packet to transmit through the Whisper
// network. Its contents may or may not be encrypted and signed.
type Envelope struct {
Version []byte
Expiry uint32
TTL uint32
Topic TopicType
AESNonce []byte
Data []byte
EnvNonce uint64
Expiry uint32
TTL uint32
Topic TopicType
Data []byte
Nonce uint64
pow float64 // Message-specific PoW as described in the Whisper specification.
hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
// Don't access hash directly, use Hash() function instead.
pow float64 // Message-specific PoW as described in the Whisper specification.
// the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom()
hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
bloom []byte
}
// size returns the size of envelope as it is sent (i.e. public fields only)
func (e *Envelope) size() int {
return 20 + len(e.Version) + len(e.AESNonce) + len(e.Data)
return EnvelopeHeaderLength + len(e.Data)
}
// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
func (e *Envelope) rlpWithoutNonce() []byte {
res, _ := rlp.EncodeToBytes([]interface{}{e.Version, e.Expiry, e.TTL, e.Topic, e.AESNonce, e.Data})
res, _ := rlp.EncodeToBytes([]interface{}{e.Expiry, e.TTL, e.Topic, e.Data})
return res
}
// NewEnvelope wraps a Whisper message with expiration and destination data
// included into an envelope for network forwarding.
func NewEnvelope(ttl uint32, topic TopicType, aesNonce []byte, msg *sentMessage) *Envelope {
func NewEnvelope(ttl uint32, topic TopicType, msg *sentMessage) *Envelope {
env := Envelope{
Version: make([]byte, 1),
Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()),
TTL: ttl,
Topic: topic,
AESNonce: aesNonce,
Data: msg.Raw,
EnvNonce: 0,
}
if EnvelopeVersion < 256 {
env.Version[0] = byte(EnvelopeVersion)
} else {
panic("please increase the size of Envelope.Version before releasing this version")
Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()),
TTL: ttl,
Topic: topic,
Data: msg.Raw,
Nonce: 0,
}
return &env
}
func (e *Envelope) IsSymmetric() bool {
return len(e.AESNonce) > 0
}
func (e *Envelope) isAsymmetric() bool {
return !e.IsSymmetric()
}
func (e *Envelope) Ver() uint64 {
return bytesToUintLittleEndian(e.Version)
}
// Seal closes the envelope by spending the requested amount of time as a proof
// of work on hashing the data.
func (e *Envelope) Seal(options *MessageParams) error {
var target, bestBit int
if options.PoW == 0 {
// adjust for the duration of Seal() execution only if execution time is predefined unconditionally
// PoW is not required
return nil
}
var target, bestBit int
if options.PoW < 0 {
// target is not set - the function should run for a period
// of time specified in WorkTime param. Since we can predict
// the execution time, we can also adjust Expiry.
e.Expiry += options.WorkTime
} else {
target = e.powToFirstBit(options.PoW)
if target < 1 {
target = 1
}
}
buf := make([]byte, 64)
@ -119,7 +103,7 @@ func (e *Envelope) Seal(options *MessageParams) error {
d := new(big.Int).SetBytes(crypto.Keccak256(buf))
firstBit := math.FirstBitSet(d)
if firstBit > bestBit {
e.EnvNonce, bestBit = nonce, firstBit
e.Nonce, bestBit = nonce, firstBit
if target > 0 && bestBit >= target {
return nil
}
@ -135,6 +119,8 @@ func (e *Envelope) Seal(options *MessageParams) error {
return nil
}
// PoW computes (if necessary) and returns the proof of work target
// of the envelope.
func (e *Envelope) PoW() float64 {
if e.pow == 0 {
e.calculatePoW(0)
@ -146,7 +132,7 @@ func (e *Envelope) calculatePoW(diff uint32) {
buf := make([]byte, 64)
h := crypto.Keccak256(e.rlpWithoutNonce())
copy(buf[:32], h)
binary.BigEndian.PutUint64(buf[56:], e.EnvNonce)
binary.BigEndian.PutUint64(buf[56:], e.Nonce)
d := new(big.Int).SetBytes(crypto.Keccak256(buf))
firstBit := math.FirstBitSet(d)
x := gmath.Pow(2, float64(firstBit))
@ -161,7 +147,11 @@ func (e *Envelope) powToFirstBit(pow float64) int {
x *= float64(e.TTL)
bits := gmath.Log2(x)
bits = gmath.Ceil(bits)
return int(bits)
res := int(bits)
if res < 1 {
res = 1
}
return res
}
// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
@ -209,7 +199,7 @@ func (e *Envelope) OpenAsymmetric(key *ecdsa.PrivateKey) (*ReceivedMessage, erro
// OpenSymmetric tries to decrypt an envelope, potentially encrypted with a particular key.
func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) {
msg = &ReceivedMessage{Raw: e.Data}
err = msg.decryptSymmetric(key, e.AESNonce)
err = msg.decryptSymmetric(key)
if err != nil {
msg = nil
}
@ -218,12 +208,17 @@ func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) {
// Open tries to decrypt an envelope, and populates the message fields in case of success.
func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
if e.isAsymmetric() {
// The API interface forbids filters doing both symmetric and asymmetric encryption.
if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() {
return nil
}
if watcher.expectsAsymmetricEncryption() {
msg, _ = e.OpenAsymmetric(watcher.KeyAsym)
if msg != nil {
msg.Dst = &watcher.KeyAsym.PublicKey
}
} else if e.IsSymmetric() {
} else if watcher.expectsSymmetricEncryption() {
msg, _ = e.OpenSymmetric(watcher.KeySym)
if msg != nil {
msg.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
@ -231,7 +226,7 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
}
if msg != nil {
ok := msg.Validate()
ok := msg.ValidateAndParse()
if !ok {
return nil
}
@ -240,7 +235,33 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
msg.TTL = e.TTL
msg.Sent = e.Expiry - e.TTL
msg.EnvelopeHash = e.Hash()
msg.EnvelopeVersion = e.Ver()
}
return msg
}
// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most).
func (e *Envelope) Bloom() []byte {
if e.bloom == nil {
e.bloom = TopicToBloom(e.Topic)
}
return e.bloom
}
// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
func TopicToBloom(topic TopicType) []byte {
b := make([]byte, bloomFilterSize)
var index [3]int
for j := 0; j < 3; j++ {
index[j] = int(topic[j])
if (topic[3] & (1 << uint(j))) != 0 {
index[j] += 256
}
}
for j := 0; j < 3; j++ {
byteIndex := index[j] / 8
bitIndex := index[j] % 8
b[byteIndex] = (1 << uint(bitIndex))
}
return b
}

View File

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// Filter represents a Whisper message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
@ -39,12 +40,14 @@ type Filter struct {
mutex sync.RWMutex
}
// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
whisper *Whisper
mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
watchers: make(map[string]*Filter),
@ -52,7 +55,12 @@ func NewFilters(w *Whisper) *Filters {
}
}
// Install will add a new filter to the filter collection
func (fs *Filters) Install(watcher *Filter) (string, error) {
if watcher.KeySym != nil && watcher.KeyAsym != nil {
return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys")
}
if watcher.Messages == nil {
watcher.Messages = make(map[common.Hash]*ReceivedMessage)
}
@ -77,6 +85,8 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
return id, err
}
// Uninstall will remove a filter whose id has been specified from
// the filter collection
func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
@ -87,12 +97,15 @@ func (fs *Filters) Uninstall(id string) bool {
return false
}
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
return fs.watchers[id]
}
// NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic.
func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
var msg *ReceivedMessage
@ -136,9 +149,9 @@ func (f *Filter) processEnvelope(env *Envelope) *ReceivedMessage {
msg := env.Open(f)
if msg != nil {
return msg
} else {
log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex())
}
log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex())
} else {
log.Trace("processing envelope: does not match", "hash", env.Hash().Hex())
}
@ -153,6 +166,8 @@ func (f *Filter) expectsSymmetricEncryption() bool {
return f.KeySym != nil
}
// Trigger adds a yet-unknown message to the filter's list of
// received messages.
func (f *Filter) Trigger(msg *ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -162,6 +177,8 @@ func (f *Filter) Trigger(msg *ReceivedMessage) {
}
}
// Retrieve will return the list of all received messages associated
// to a filter.
func (f *Filter) Retrieve() (all []*ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -175,6 +192,9 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) {
return all
}
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
// MatchEnvelope when checked by a previous filter)
func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.PoW > 0 && msg.PoW < f.PoW {
return false
@ -188,19 +208,18 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
return false
}
// MatchEnvelope checks if it's worth decrypting the message. If
// it returns `true`, client code is expected to attempt decrypting
// the message and subsequently call MatchMessage.
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
if f.PoW > 0 && envelope.pow < f.PoW {
return false
}
if f.expectsAsymmetricEncryption() && envelope.isAsymmetric() {
return f.MatchTopic(envelope.Topic)
} else if f.expectsSymmetricEncryption() && envelope.IsSymmetric() {
return f.MatchTopic(envelope.Topic)
}
return false
return f.MatchTopic(envelope.Topic)
}
// MatchTopic checks that the filter captures a given topic.
func (f *Filter) MatchTopic(topic TopicType) bool {
if len(f.Topics) == 0 {
// any topic matches
@ -216,8 +235,12 @@ func (f *Filter) MatchTopic(topic TopicType) bool {
}
func matchSingleTopic(topic TopicType, bt []byte) bool {
if len(bt) > 4 {
bt = bt[:4]
if len(bt) > TopicLength {
bt = bt[:TopicLength]
}
if len(bt) < TopicLength {
return false
}
for j, b := range bt {
@ -228,6 +251,7 @@ func matchSingleTopic(topic TopicType, bt []byte) bool {
return true
}
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false

View File

@ -10,6 +10,7 @@ import (
var _ = (*criteriaOverride)(nil)
// MarshalJSON marshals type Criteria to a json string
func (c Criteria) MarshalJSON() ([]byte, error) {
type Criteria struct {
SymKeyID string `json:"symKeyID"`
@ -29,14 +30,15 @@ func (c Criteria) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
// UnmarshalJSON unmarshals type Criteria to a json string
func (c *Criteria) UnmarshalJSON(input []byte) error {
type Criteria struct {
SymKeyID *string `json:"symKeyID"`
PrivateKeyID *string `json:"privateKeyID"`
Sig hexutil.Bytes `json:"sig"`
MinPow *float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P *bool `json:"allowP2P"`
SymKeyID *string `json:"symKeyID"`
PrivateKeyID *string `json:"privateKeyID"`
Sig *hexutil.Bytes `json:"sig"`
MinPow *float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P *bool `json:"allowP2P"`
}
var dec Criteria
if err := json.Unmarshal(input, &dec); err != nil {
@ -49,7 +51,7 @@ func (c *Criteria) UnmarshalJSON(input []byte) error {
c.PrivateKeyID = *dec.PrivateKeyID
}
if dec.Sig != nil {
c.Sig = dec.Sig
c.Sig = *dec.Sig
}
if dec.MinPow != nil {
c.MinPow = *dec.MinPow

View File

@ -10,6 +10,7 @@ import (
var _ = (*messageOverride)(nil)
// MarshalJSON marshals type Message to a json string
func (m Message) MarshalJSON() ([]byte, error) {
type Message struct {
Sig hexutil.Bytes `json:"sig,omitempty"`
@ -35,24 +36,25 @@ func (m Message) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
// UnmarshalJSON unmarshals type Message to a json string
func (m *Message) UnmarshalJSON(input []byte) error {
type Message struct {
Sig hexutil.Bytes `json:"sig,omitempty"`
TTL *uint32 `json:"ttl"`
Timestamp *uint32 `json:"timestamp"`
Topic *TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PoW *float64 `json:"pow"`
Hash hexutil.Bytes `json:"hash"`
Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"`
Sig *hexutil.Bytes `json:"sig,omitempty"`
TTL *uint32 `json:"ttl"`
Timestamp *uint32 `json:"timestamp"`
Topic *TopicType `json:"topic"`
Payload *hexutil.Bytes `json:"payload"`
Padding *hexutil.Bytes `json:"padding"`
PoW *float64 `json:"pow"`
Hash *hexutil.Bytes `json:"hash"`
Dst *hexutil.Bytes `json:"recipientPublicKey,omitempty"`
}
var dec Message
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.Sig != nil {
m.Sig = dec.Sig
m.Sig = *dec.Sig
}
if dec.TTL != nil {
m.TTL = *dec.TTL
@ -64,19 +66,19 @@ func (m *Message) UnmarshalJSON(input []byte) error {
m.Topic = *dec.Topic
}
if dec.Payload != nil {
m.Payload = dec.Payload
m.Payload = *dec.Payload
}
if dec.Padding != nil {
m.Padding = dec.Padding
m.Padding = *dec.Padding
}
if dec.PoW != nil {
m.PoW = *dec.PoW
}
if dec.Hash != nil {
m.Hash = dec.Hash
m.Hash = *dec.Hash
}
if dec.Dst != nil {
m.Dst = dec.Dst
m.Dst = *dec.Dst
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
var _ = (*newMessageOverride)(nil)
// MarshalJSON marshals type NewMessage to a json string
func (n NewMessage) MarshalJSON() ([]byte, error) {
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
@ -37,18 +38,19 @@ func (n NewMessage) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
// UnmarshalJSON unmarshals type NewMessage to a json string
func (n *NewMessage) UnmarshalJSON(input []byte) error {
type NewMessage struct {
SymKeyID *string `json:"symKeyID"`
PublicKey hexutil.Bytes `json:"pubKey"`
Sig *string `json:"sig"`
TTL *uint32 `json:"ttl"`
Topic *TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PowTime *uint32 `json:"powTime"`
PowTarget *float64 `json:"powTarget"`
TargetPeer *string `json:"targetPeer"`
SymKeyID *string `json:"symKeyID"`
PublicKey *hexutil.Bytes `json:"pubKey"`
Sig *string `json:"sig"`
TTL *uint32 `json:"ttl"`
Topic *TopicType `json:"topic"`
Payload *hexutil.Bytes `json:"payload"`
Padding *hexutil.Bytes `json:"padding"`
PowTime *uint32 `json:"powTime"`
PowTarget *float64 `json:"powTarget"`
TargetPeer *string `json:"targetPeer"`
}
var dec NewMessage
if err := json.Unmarshal(input, &dec); err != nil {
@ -58,7 +60,7 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.SymKeyID = *dec.SymKeyID
}
if dec.PublicKey != nil {
n.PublicKey = dec.PublicKey
n.PublicKey = *dec.PublicKey
}
if dec.Sig != nil {
n.Sig = *dec.Sig
@ -70,10 +72,10 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.Topic = *dec.Topic
}
if dec.Payload != nil {
n.Payload = dec.Payload
n.Payload = *dec.Payload
}
if dec.Padding != nil {
n.Padding = dec.Padding
n.Padding = *dec.Padding
}
if dec.PowTime != nil {
n.PowTime = *dec.PowTime

View File

@ -25,6 +25,7 @@ import (
crand "crypto/rand"
"encoding/binary"
"errors"
mrand "math/rand"
"strconv"
"github.com/ethereum/go-ethereum/common"
@ -33,7 +34,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// Options specifies the exact way a message should be wrapped into an Envelope.
// MessageParams specifies the exact way a message should be wrapped
// into an Envelope.
type MessageParams struct {
TTL uint32
Src *ecdsa.PrivateKey
@ -54,13 +56,14 @@ type sentMessage struct {
}
// ReceivedMessage represents a data packet to be received through the
// Whisper protocol.
// Whisper protocol and successfully decrypted.
type ReceivedMessage struct {
Raw []byte
Payload []byte
Padding []byte
Signature []byte
Salt []byte
PoW float64 // Proof of work as described in the Whisper spec
Sent uint32 // Time when the message was posted into the network
@ -69,9 +72,8 @@ type ReceivedMessage struct {
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Topic TopicType
SymKeyHash common.Hash // The Keccak256Hash of the key, associated with the Topic
EnvelopeHash common.Hash // Message envelope hash to act as a unique id
EnvelopeVersion uint64
SymKeyHash common.Hash // The Keccak256Hash of the key
EnvelopeHash common.Hash // Message envelope hash to act as a unique id
}
func isMessageSigned(flags byte) bool {
@ -86,79 +88,62 @@ func (msg *ReceivedMessage) isAsymmetricEncryption() bool {
return msg.Dst != nil
}
// NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
// NewSentMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewSentMessage(params *MessageParams) (*sentMessage, error) {
const payloadSizeFieldMaxSize = 4
msg := sentMessage{}
msg.Raw = make([]byte, 1, len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit)
msg.Raw = make([]byte, 1,
flagsLength+payloadSizeFieldMaxSize+len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit)
msg.Raw[0] = 0 // set all the flags to zero
err := msg.appendPadding(params)
if err != nil {
return nil, err
}
msg.addPayloadSizeField(params.Payload)
msg.Raw = append(msg.Raw, params.Payload...)
return &msg, nil
err := msg.appendPadding(params)
return &msg, err
}
// getSizeOfLength returns the number of bytes necessary to encode the entire size padding (including these bytes)
func getSizeOfLength(b []byte) (sz int, err error) {
sz = intSize(len(b)) // first iteration
sz = intSize(len(b) + sz) // second iteration
if sz > 3 {
err = errors.New("oversized padding parameter")
}
return sz, err
// addPayloadSizeField appends the auxiliary field containing the size of payload
func (msg *sentMessage) addPayloadSizeField(payload []byte) {
fieldSize := getSizeOfPayloadSizeField(payload)
field := make([]byte, 4)
binary.LittleEndian.PutUint32(field, uint32(len(payload)))
field = field[:fieldSize]
msg.Raw = append(msg.Raw, field...)
msg.Raw[0] |= byte(fieldSize)
}
// sizeOfIntSize returns minimal number of bytes necessary to encode an integer value
func intSize(i int) (s int) {
for s = 1; i >= 256; s++ {
i /= 256
// getSizeOfPayloadSizeField returns the number of bytes necessary to encode the size of payload
func getSizeOfPayloadSizeField(payload []byte) int {
s := 1
for i := len(payload); i >= 256; i /= 256 {
s++
}
return s
}
// appendPadding appends the pseudorandom padding bytes and sets the padding flag.
// The last byte contains the size of padding (thus, its size must not exceed 256).
// appendPadding appends the padding specified in params.
// If no padding is provided in params, then random padding is generated.
func (msg *sentMessage) appendPadding(params *MessageParams) error {
rawSize := len(params.Payload) + 1
if len(params.Padding) != 0 {
// padding data was provided by the Dapp, just use it as is
msg.Raw = append(msg.Raw, params.Padding...)
return nil
}
rawSize := flagsLength + getSizeOfPayloadSizeField(params.Payload) + len(params.Payload)
if params.Src != nil {
rawSize += signatureLength
}
odd := rawSize % padSizeLimit
if len(params.Padding) != 0 {
padSize := len(params.Padding)
padLengthSize, err := getSizeOfLength(params.Padding)
if err != nil {
return err
}
totalPadSize := padSize + padLengthSize
buf := make([]byte, 8)
binary.LittleEndian.PutUint32(buf, uint32(totalPadSize))
buf = buf[:padLengthSize]
msg.Raw = append(msg.Raw, buf...)
msg.Raw = append(msg.Raw, params.Padding...)
msg.Raw[0] |= byte(padLengthSize) // number of bytes indicating the padding size
} else if odd != 0 {
totalPadSize := padSizeLimit - odd
if totalPadSize > 255 {
// this algorithm is only valid if padSizeLimit < 256.
// if padSizeLimit will ever change, please fix the algorithm
// (please see also ReceivedMessage.extractPadding() function).
panic("please fix the padding algorithm before releasing new version")
}
buf := make([]byte, totalPadSize)
_, err := crand.Read(buf[1:])
if err != nil {
return err
}
if totalPadSize > 6 && !validateSymmetricKey(buf) {
return errors.New("failed to generate random padding of size " + strconv.Itoa(totalPadSize))
}
buf[0] = byte(totalPadSize)
msg.Raw = append(msg.Raw, buf...)
msg.Raw[0] |= byte(0x1) // number of bytes indicating the padding size
paddingSize := padSizeLimit - odd
pad := make([]byte, paddingSize)
_, err := crand.Read(pad)
if err != nil {
return err
}
if !validateDataIntegrity(pad, paddingSize) {
return errors.New("failed to generate random padding of size " + strconv.Itoa(paddingSize))
}
msg.Raw = append(msg.Raw, pad...)
return nil
}
@ -171,11 +156,11 @@ func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error {
return nil
}
msg.Raw[0] |= signatureFlag
msg.Raw[0] |= signatureFlag // it is important to set this flag before signing
hash := crypto.Keccak256(msg.Raw)
signature, err := crypto.Sign(hash, key)
if err != nil {
msg.Raw[0] &= ^signatureFlag // clear the flag
msg.Raw[0] &= (0xFF ^ signatureFlag) // clear the flag
return err
}
msg.Raw = append(msg.Raw, signature...)
@ -196,31 +181,56 @@ func (msg *sentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error {
// encryptSymmetric encrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *sentMessage) encryptSymmetric(key []byte) (nonce []byte, err error) {
if !validateSymmetricKey(key) {
return nil, errors.New("invalid key provided for symmetric encryption")
func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
if !validateDataIntegrity(key, aesKeyLength) {
return errors.New("invalid key provided for symmetric encryption, size: " + strconv.Itoa(len(key)))
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
return err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
return err
}
salt, err := generateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key
if err != nil {
return err
}
encrypted := aesgcm.Seal(nil, salt, msg.Raw, nil)
msg.Raw = append(encrypted, salt...)
return nil
}
// never use more than 2^32 random nonces with a given key
nonce = make([]byte, aesgcm.NonceSize())
_, err = crand.Read(nonce)
// generateSecureRandomData generates random data where extra security is required.
// The purpose of this function is to prevent some bugs in software or in hardware
// from delivering not-very-random data. This is especially useful for AES nonce,
// where true randomness does not really matter, but it is very important to have
// a unique nonce for every message.
func generateSecureRandomData(length int) ([]byte, error) {
x := make([]byte, length)
y := make([]byte, length)
res := make([]byte, length)
_, err := crand.Read(x)
if err != nil {
return nil, err
} else if !validateSymmetricKey(nonce) {
return nil, errors.New("crypto/rand failed to generate nonce")
} else if !validateDataIntegrity(x, length) {
return nil, errors.New("crypto/rand failed to generate secure random data")
}
msg.Raw = aesgcm.Seal(nil, nonce, msg.Raw, nil)
return nonce, nil
_, err = mrand.Read(y)
if err != nil {
return nil, err
} else if !validateDataIntegrity(y, length) {
return nil, errors.New("math/rand failed to generate secure random data")
}
for i := 0; i < length; i++ {
res[i] = x[i] ^ y[i]
}
if !validateDataIntegrity(res, length) {
return nil, errors.New("failed to generate secure random data")
}
return res, nil
}
// Wrap bundles the message into an Envelope to transmit over the network.
@ -233,11 +243,10 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
return nil, err
}
}
var nonce []byte
if options.Dst != nil {
err = msg.encryptAsymmetric(options.Dst)
} else if options.KeySym != nil {
nonce, err = msg.encryptSymmetric(options.KeySym)
err = msg.encryptSymmetric(options.KeySym)
} else {
err = errors.New("unable to encrypt the message: neither symmetric nor assymmetric key provided")
}
@ -245,7 +254,7 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
return nil, err
}
envelope = NewEnvelope(options.TTL, options.Topic, nonce, msg)
envelope = NewEnvelope(options.TTL, options.Topic, msg)
if err = envelope.Seal(options); err != nil {
return nil, err
}
@ -254,7 +263,13 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
// decryptSymmetric decrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *ReceivedMessage) decryptSymmetric(key []byte, nonce []byte) error {
func (msg *ReceivedMessage) decryptSymmetric(key []byte) error {
// symmetric messages are expected to contain the 12-byte nonce at the end of the payload
if len(msg.Raw) < aesNonceLength {
return errors.New("missing salt or invalid payload in symmetric message")
}
salt := msg.Raw[len(msg.Raw)-aesNonceLength:]
block, err := aes.NewCipher(key)
if err != nil {
return err
@ -263,15 +278,12 @@ func (msg *ReceivedMessage) decryptSymmetric(key []byte, nonce []byte) error {
if err != nil {
return err
}
if len(nonce) != aesgcm.NonceSize() {
log.Error("decrypting the message", "AES nonce size", len(nonce))
return errors.New("wrong AES nonce size")
}
decrypted, err := aesgcm.Open(nil, nonce, msg.Raw, nil)
decrypted, err := aesgcm.Open(nil, salt, msg.Raw[:len(msg.Raw)-aesNonceLength], nil)
if err != nil {
return err
}
msg.Raw = decrypted
msg.Salt = salt
return nil
}
@ -284,8 +296,8 @@ func (msg *ReceivedMessage) decryptAsymmetric(key *ecdsa.PrivateKey) error {
return err
}
// Validate checks the validity and extracts the fields in case of success
func (msg *ReceivedMessage) Validate() bool {
// ValidateAndParse checks the message validity and extracts the fields in case of success.
func (msg *ReceivedMessage) ValidateAndParse() bool {
end := len(msg.Raw)
if end < 1 {
return false
@ -296,41 +308,32 @@ func (msg *ReceivedMessage) Validate() bool {
if end <= 1 {
return false
}
msg.Signature = msg.Raw[end:]
msg.Signature = msg.Raw[end : end+signatureLength]
msg.Src = msg.SigToPubKey()
if msg.Src == nil {
return false
}
}
padSize, ok := msg.extractPadding(end)
if !ok {
return false
beg := 1
payloadSize := 0
sizeOfPayloadSizeField := int(msg.Raw[0] & SizeMask) // number of bytes indicating the size of payload
if sizeOfPayloadSizeField != 0 {
payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
if payloadSize+1 > end {
return false
}
beg += sizeOfPayloadSizeField
msg.Payload = msg.Raw[beg : beg+payloadSize]
}
msg.Payload = msg.Raw[1+padSize : end]
beg += payloadSize
msg.Padding = msg.Raw[beg:end]
return true
}
// extractPadding extracts the padding from raw message.
// although we don't support sending messages with padding size
// exceeding 255 bytes, such messages are perfectly valid, and
// can be successfully decrypted.
func (msg *ReceivedMessage) extractPadding(end int) (int, bool) {
paddingSize := 0
sz := int(msg.Raw[0] & paddingMask) // number of bytes indicating the entire size of padding (including these bytes)
// could be zero -- it means no padding
if sz != 0 {
paddingSize = int(bytesToUintLittleEndian(msg.Raw[1 : 1+sz]))
if paddingSize < sz || paddingSize+1 > end {
return 0, false
}
msg.Padding = msg.Raw[1+sz : 1+paddingSize]
}
return paddingSize, true
}
// Recover retrieves the public key of the message signer.
// SigToPubKey returns the public key associated to the message's
// signature.
func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey {
defer func() { recover() }() // in case of invalid signature
@ -342,7 +345,7 @@ func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey {
return pub
}
// hash calculates the SHA3 checksum of the message flags, payload and padding.
// hash calculates the SHA3 checksum of the message flags, payload size field, payload and padding.
func (msg *ReceivedMessage) hash() []byte {
if isMessageSigned(msg.Raw[0]) {
sz := len(msg.Raw) - signatureLength

View File

@ -18,6 +18,7 @@ package whisperv6
import (
"fmt"
"math"
"time"
"github.com/ethereum/go-ethereum/common"
@ -27,12 +28,16 @@ import (
set "gopkg.in/fatih/set.v0"
)
// peer represents a whisper protocol peer connection.
// Peer represents a whisper protocol peer connection.
type Peer struct {
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
powRequirement float64
bloomFilter []byte
fullNode bool
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
@ -42,62 +47,93 @@ type Peer struct {
// newPeer creates a new whisper peer object, but does not run the handshake itself.
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
host: host,
peer: remote,
ws: rw,
trusted: false,
known: set.New(),
quit: make(chan struct{}),
host: host,
peer: remote,
ws: rw,
trusted: false,
powRequirement: 0.0,
known: set.New(),
quit: make(chan struct{}),
bloomFilter: makeFullNodeBloom(),
fullNode: true,
}
}
// start initiates the peer updater, periodically broadcasting the whisper packets
// into the network.
func (p *Peer) start() {
go p.update()
log.Trace("start", "peer", p.ID())
func (peer *Peer) start() {
go peer.update()
log.Trace("start", "peer", peer.ID())
}
// stop terminates the peer updater, stopping message forwarding to it.
func (p *Peer) stop() {
close(p.quit)
log.Trace("stop", "peer", p.ID())
func (peer *Peer) stop() {
close(peer.quit)
log.Trace("stop", "peer", peer.ID())
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
func (p *Peer) handshake() error {
func (peer *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
go func() {
errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
pow := peer.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter()
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()
// Fetch the remote status packet and verify protocol match
packet, err := p.ws.ReadMsg()
packet, err := peer.ws.ReadMsg()
if err != nil {
return err
}
if packet.Code != statusCode {
return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code)
return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
}
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
_, err = s.List()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
}
peerVersion, err := s.Uint()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err)
return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
}
if peerVersion != ProtocolVersion {
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
}
// Wait until out own status is consumed too
// only version is mandatory, subsequent parameters are optional
powRaw, err := s.Uint()
if err == nil {
pow := math.Float64frombits(powRaw)
if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
}
peer.powRequirement = pow
var bloom []byte
err = s.Decode(&bloom)
if err == nil {
sz := len(bloom)
if sz != bloomFilterSize && sz != 0 {
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
}
peer.setBloomFilter(bloom)
}
}
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err)
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
func (p *Peer) update() {
func (peer *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
transmit := time.NewTicker(transmissionCycle)
@ -106,15 +142,15 @@ func (p *Peer) update() {
for {
select {
case <-expire.C:
p.expire()
peer.expire()
case <-transmit.C:
if err := p.broadcast(); err != nil {
log.Trace("broadcast failed", "reason", err, "peer", p.ID())
if err := peer.broadcast(); err != nil {
log.Trace("broadcast failed", "reason", err, "peer", peer.ID())
return
}
case <-p.quit:
case <-peer.quit:
return
}
}
@ -148,27 +184,62 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (p *Peer) broadcast() error {
var cnt int
envelopes := p.host.Envelopes()
func (peer *Peer) broadcast() error {
envelopes := peer.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !p.marked(envelope) {
err := p2p.Send(p.ws, messagesCode, envelope)
if err != nil {
return err
} else {
p.mark(envelope)
cnt++
}
if !peer.marked(envelope) && envelope.PoW() >= peer.powRequirement && peer.bloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
if cnt > 0 {
log.Trace("broadcast", "num. messages", cnt)
if len(bundle) > 0 {
// transmit the batch of envelopes
if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
peer.mark(e)
}
log.Trace("broadcast", "num. messages", len(bundle))
}
return nil
}
func (p *Peer) ID() []byte {
id := p.peer.ID()
// ID returns a peer's id
func (peer *Peer) ID() []byte {
id := peer.peer.ID()
return id[:]
}
func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(peer.ws, powRequirementCode, i)
}
func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(peer.ws, bloomFilterExCode, bloom)
}
func (peer *Peer) bloomMatch(env *Envelope) bool {
return peer.fullNode || bloomFilterMatch(peer.bloomFilter, env.Bloom())
}
func (peer *Peer) setBloomFilter(bloom []byte) {
peer.bloomFilter = bloom
peer.fullNode = isFullNode(bloom)
if peer.fullNode && peer.bloomFilter == nil {
peer.bloomFilter = makeFullNodeBloom()
}
}
func makeFullNodeBloom() []byte {
bloom := make([]byte, bloomFilterSize)
for i := 0; i < bloomFilterSize; i++ {
bloom[i] = 0xFF
}
return bloom
}

View File

@ -23,11 +23,13 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
)
// Topic represents a cryptographically secure, probabilistic partial
// TopicType represents a cryptographically secure, probabilistic partial
// classifications of a message, determined as the first (left) 4 bytes of the
// SHA3 hash of some arbitrary data given by the original author of the message.
type TopicType [TopicLength]byte
// BytesToTopic converts from the byte array representation of a topic
// into the TopicType type.
func BytesToTopic(b []byte) (t TopicType) {
sz := TopicLength
if x := len(b); x < TopicLength {

File diff suppressed because it is too large Load Diff