diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go index 3dddb6953..a2c75a41c 100644 --- a/whisper/whisperv6/api.go +++ b/whisper/whisperv6/api.go @@ -36,6 +36,7 @@ const ( filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds ) +// List of errors var ( ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key") ErrInvalidSymmetricKey = errors.New("invalid symmetric key") @@ -116,12 +117,17 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) return true, api.w.SetMaxMessageSize(size) } -// SetMinPow sets the minimum PoW for a message before it is accepted. +// SetMinPoW sets the minimum PoW, and notifies the peers. func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { return true, api.w.SetMinimumPoW(pow) } -// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages. +// SetBloomFilter sets the new value of bloom filter, and notifies the peers. +func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) { + return true, api.w.SetBloomFilter(bloom) +} + +// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages. // Note: This function is not adding new nodes, the node needs to exists as a peer. func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) { n, err := discover.ParseNode(enode) @@ -169,7 +175,7 @@ func (api *PublicWhisperAPI) GetPublicKey(ctx context.Context, id string) (hexut return crypto.FromECDSAPub(&key.PublicKey), nil } -// GetPublicKey returns the private key associated with the given key. The key is the hex +// GetPrivateKey returns the private key associated with the given key. The key is the hex // encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62. func (api *PublicWhisperAPI) GetPrivateKey(ctx context.Context, id string) (hexutil.Bytes, error) { key, err := api.w.GetPrivateKey(id) @@ -272,7 +278,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { return false, err } - if !validateSymmetricKey(params.KeySym) { + if !validateDataIntegrity(params.KeySym, aesKeyLength) { return false, ErrInvalidSymmetricKey } } @@ -378,7 +384,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc. if err != nil { return nil, err } - if !validateSymmetricKey(key) { + if !validateDataIntegrity(key, aesKeyLength) { return nil, ErrInvalidSymmetricKey } filter.KeySym = key @@ -550,7 +556,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) { if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { return "", err } - if !validateSymmetricKey(keySym) { + if !validateDataIntegrity(keySym, aesKeyLength) { return "", ErrInvalidSymmetricKey } } @@ -562,7 +568,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) { } if len(req.Topics) > 0 { - topics = make([][]byte, 1) + topics = make([][]byte, 0, len(req.Topics)) for _, topic := range req.Topics { topics = append(topics, topic[:]) } diff --git a/whisper/whisperv6/config.go b/whisper/whisperv6/config.go index d7f817aa2..61419de00 100644 --- a/whisper/whisperv6/config.go +++ b/whisper/whisperv6/config.go @@ -16,11 +16,13 @@ package whisperv6 +// Config represents the configuration state of a whisper node. type Config struct { MaxMessageSize uint32 `toml:",omitempty"` MinimumAcceptedPOW float64 `toml:",omitempty"` } +// DefaultConfig represents (shocker!) the default configuration. var DefaultConfig = Config{ MaxMessageSize: DefaultMaxMessageSize, MinimumAcceptedPOW: DefaultMinimumPoW, diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index e64dd2f42..d5d7fed60 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -27,6 +27,9 @@ Whisper is a pure identity-based messaging system. Whisper provides a low-level or prejudiced by the low-level hardware attributes and characteristics, particularly the notion of singular endpoints. */ + +// Contains the Whisper protocol constant definitions + package whisperv6 import ( @@ -34,39 +37,46 @@ import ( "time" ) +// Whisper protocol parameters const ( - EnvelopeVersion = uint64(0) - ProtocolVersion = uint64(5) - ProtocolVersionStr = "5.0" - ProtocolName = "shh" - - statusCode = 0 // used by whisper protocol - messagesCode = 1 // normal whisper message - p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) - p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol - NumberOfMessageCodes = 64 - - paddingMask = byte(3) + ProtocolVersion = uint64(6) // Protocol version number + ProtocolVersionStr = "6.0" // The same, as a string + ProtocolName = "shh" // Nickname of the protocol in geth + + // whisper protocol message codes, according to EIP-627 + statusCode = 0 // used by whisper protocol + messagesCode = 1 // normal whisper message + powRequirementCode = 2 // PoW requirement + bloomFilterExCode = 3 // bloom filter exchange + p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol + p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) + NumberOfMessageCodes = 128 + + SizeMask = byte(3) // mask used to extract the size of payload size field from the flags signatureFlag = byte(4) - TopicLength = 4 - signatureLength = 65 - aesKeyLength = 32 - AESNonceLength = 12 - keyIdSize = 32 + TopicLength = 4 // in bytes + signatureLength = 65 // in bytes + aesKeyLength = 32 // in bytes + aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize() + keyIDSize = 32 // in bytes + bloomFilterSize = 64 // in bytes + flagsLength = 1 + + EnvelopeHeaderLength = 20 MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) DefaultMinimumPoW = 0.2 - padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24) + padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol messageQueueLimit = 1024 expirationCycle = time.Second transmissionCycle = 300 * time.Millisecond - DefaultTTL = 50 // seconds - SynchAllowance = 10 // seconds + DefaultTTL = 50 // seconds + DefaultSyncAllowance = 10 // seconds ) type unknownVersionError uint64 diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go index a5f4770b0..c7bea2bb9 100644 --- a/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go @@ -36,76 +36,60 @@ import ( // Envelope represents a clear-text data packet to transmit through the Whisper // network. Its contents may or may not be encrypted and signed. type Envelope struct { - Version []byte - Expiry uint32 - TTL uint32 - Topic TopicType - AESNonce []byte - Data []byte - EnvNonce uint64 - - pow float64 // Message-specific PoW as described in the Whisper specification. - hash common.Hash // Cached hash of the envelope to avoid rehashing every time. - // Don't access hash directly, use Hash() function instead. + Expiry uint32 + TTL uint32 + Topic TopicType + Data []byte + Nonce uint64 + + pow float64 // Message-specific PoW as described in the Whisper specification. + + // the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom() + hash common.Hash // Cached hash of the envelope to avoid rehashing every time. + bloom []byte } // size returns the size of envelope as it is sent (i.e. public fields only) func (e *Envelope) size() int { - return 20 + len(e.Version) + len(e.AESNonce) + len(e.Data) + return EnvelopeHeaderLength + len(e.Data) } // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. func (e *Envelope) rlpWithoutNonce() []byte { - res, _ := rlp.EncodeToBytes([]interface{}{e.Version, e.Expiry, e.TTL, e.Topic, e.AESNonce, e.Data}) + res, _ := rlp.EncodeToBytes([]interface{}{e.Expiry, e.TTL, e.Topic, e.Data}) return res } // NewEnvelope wraps a Whisper message with expiration and destination data // included into an envelope for network forwarding. -func NewEnvelope(ttl uint32, topic TopicType, aesNonce []byte, msg *sentMessage) *Envelope { +func NewEnvelope(ttl uint32, topic TopicType, msg *sentMessage) *Envelope { env := Envelope{ - Version: make([]byte, 1), - Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()), - TTL: ttl, - Topic: topic, - AESNonce: aesNonce, - Data: msg.Raw, - EnvNonce: 0, - } - - if EnvelopeVersion < 256 { - env.Version[0] = byte(EnvelopeVersion) - } else { - panic("please increase the size of Envelope.Version before releasing this version") + Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()), + TTL: ttl, + Topic: topic, + Data: msg.Raw, + Nonce: 0, } return &env } -func (e *Envelope) IsSymmetric() bool { - return len(e.AESNonce) > 0 -} - -func (e *Envelope) isAsymmetric() bool { - return !e.IsSymmetric() -} - -func (e *Envelope) Ver() uint64 { - return bytesToUintLittleEndian(e.Version) -} - // Seal closes the envelope by spending the requested amount of time as a proof // of work on hashing the data. func (e *Envelope) Seal(options *MessageParams) error { - var target, bestBit int if options.PoW == 0 { - // adjust for the duration of Seal() execution only if execution time is predefined unconditionally + // PoW is not required + return nil + } + + var target, bestBit int + if options.PoW < 0 { + // target is not set - the function should run for a period + // of time specified in WorkTime param. Since we can predict + // the execution time, we can also adjust Expiry. e.Expiry += options.WorkTime } else { target = e.powToFirstBit(options.PoW) - if target < 1 { - target = 1 - } } buf := make([]byte, 64) @@ -119,7 +103,7 @@ func (e *Envelope) Seal(options *MessageParams) error { d := new(big.Int).SetBytes(crypto.Keccak256(buf)) firstBit := math.FirstBitSet(d) if firstBit > bestBit { - e.EnvNonce, bestBit = nonce, firstBit + e.Nonce, bestBit = nonce, firstBit if target > 0 && bestBit >= target { return nil } @@ -135,6 +119,8 @@ func (e *Envelope) Seal(options *MessageParams) error { return nil } +// PoW computes (if necessary) and returns the proof of work target +// of the envelope. func (e *Envelope) PoW() float64 { if e.pow == 0 { e.calculatePoW(0) @@ -146,7 +132,7 @@ func (e *Envelope) calculatePoW(diff uint32) { buf := make([]byte, 64) h := crypto.Keccak256(e.rlpWithoutNonce()) copy(buf[:32], h) - binary.BigEndian.PutUint64(buf[56:], e.EnvNonce) + binary.BigEndian.PutUint64(buf[56:], e.Nonce) d := new(big.Int).SetBytes(crypto.Keccak256(buf)) firstBit := math.FirstBitSet(d) x := gmath.Pow(2, float64(firstBit)) @@ -161,7 +147,11 @@ func (e *Envelope) powToFirstBit(pow float64) int { x *= float64(e.TTL) bits := gmath.Log2(x) bits = gmath.Ceil(bits) - return int(bits) + res := int(bits) + if res < 1 { + res = 1 + } + return res } // Hash returns the SHA3 hash of the envelope, calculating it if not yet done. @@ -209,7 +199,7 @@ func (e *Envelope) OpenAsymmetric(key *ecdsa.PrivateKey) (*ReceivedMessage, erro // OpenSymmetric tries to decrypt an envelope, potentially encrypted with a particular key. func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) { msg = &ReceivedMessage{Raw: e.Data} - err = msg.decryptSymmetric(key, e.AESNonce) + err = msg.decryptSymmetric(key) if err != nil { msg = nil } @@ -218,12 +208,17 @@ func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) { // Open tries to decrypt an envelope, and populates the message fields in case of success. func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) { - if e.isAsymmetric() { + // The API interface forbids filters doing both symmetric and asymmetric encryption. + if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() { + return nil + } + + if watcher.expectsAsymmetricEncryption() { msg, _ = e.OpenAsymmetric(watcher.KeyAsym) if msg != nil { msg.Dst = &watcher.KeyAsym.PublicKey } - } else if e.IsSymmetric() { + } else if watcher.expectsSymmetricEncryption() { msg, _ = e.OpenSymmetric(watcher.KeySym) if msg != nil { msg.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym) @@ -231,7 +226,7 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) { } if msg != nil { - ok := msg.Validate() + ok := msg.ValidateAndParse() if !ok { return nil } @@ -240,7 +235,33 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) { msg.TTL = e.TTL msg.Sent = e.Expiry - e.TTL msg.EnvelopeHash = e.Hash() - msg.EnvelopeVersion = e.Ver() } return msg } + +// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most). +func (e *Envelope) Bloom() []byte { + if e.bloom == nil { + e.bloom = TopicToBloom(e.Topic) + } + return e.bloom +} + +// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes) +func TopicToBloom(topic TopicType) []byte { + b := make([]byte, bloomFilterSize) + var index [3]int + for j := 0; j < 3; j++ { + index[j] = int(topic[j]) + if (topic[3] & (1 << uint(j))) != 0 { + index[j] += 256 + } + } + + for j := 0; j < 3; j++ { + byteIndex := index[j] / 8 + bitIndex := index[j] % 8 + b[byteIndex] = (1 << uint(bitIndex)) + } + return b +} diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go index 5cb371b7d..eb0c65fa3 100644 --- a/whisper/whisperv6/filter.go +++ b/whisper/whisperv6/filter.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// Filter represents a Whisper message filter type Filter struct { Src *ecdsa.PublicKey // Sender of the message KeyAsym *ecdsa.PrivateKey // Private Key of recipient @@ -39,12 +40,14 @@ type Filter struct { mutex sync.RWMutex } +// Filters represents a collection of filters type Filters struct { watchers map[string]*Filter whisper *Whisper mutex sync.RWMutex } +// NewFilters returns a newly created filter collection func NewFilters(w *Whisper) *Filters { return &Filters{ watchers: make(map[string]*Filter), @@ -52,7 +55,12 @@ func NewFilters(w *Whisper) *Filters { } } +// Install will add a new filter to the filter collection func (fs *Filters) Install(watcher *Filter) (string, error) { + if watcher.KeySym != nil && watcher.KeyAsym != nil { + return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys") + } + if watcher.Messages == nil { watcher.Messages = make(map[common.Hash]*ReceivedMessage) } @@ -77,6 +85,8 @@ func (fs *Filters) Install(watcher *Filter) (string, error) { return id, err } +// Uninstall will remove a filter whose id has been specified from +// the filter collection func (fs *Filters) Uninstall(id string) bool { fs.mutex.Lock() defer fs.mutex.Unlock() @@ -87,12 +97,15 @@ func (fs *Filters) Uninstall(id string) bool { return false } +// Get returns a filter from the collection with a specific ID func (fs *Filters) Get(id string) *Filter { fs.mutex.RLock() defer fs.mutex.RUnlock() return fs.watchers[id] } +// NotifyWatchers notifies any filter that has declared interest +// for the envelope's topic. func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { var msg *ReceivedMessage @@ -136,9 +149,9 @@ func (f *Filter) processEnvelope(env *Envelope) *ReceivedMessage { msg := env.Open(f) if msg != nil { return msg - } else { - log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex()) } + + log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex()) } else { log.Trace("processing envelope: does not match", "hash", env.Hash().Hex()) } @@ -153,6 +166,8 @@ func (f *Filter) expectsSymmetricEncryption() bool { return f.KeySym != nil } +// Trigger adds a yet-unknown message to the filter's list of +// received messages. func (f *Filter) Trigger(msg *ReceivedMessage) { f.mutex.Lock() defer f.mutex.Unlock() @@ -162,6 +177,8 @@ func (f *Filter) Trigger(msg *ReceivedMessage) { } } +// Retrieve will return the list of all received messages associated +// to a filter. func (f *Filter) Retrieve() (all []*ReceivedMessage) { f.mutex.Lock() defer f.mutex.Unlock() @@ -175,6 +192,9 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) { return all } +// MatchMessage checks if the filter matches an already decrypted +// message (i.e. a Message that has already been handled by +// MatchEnvelope when checked by a previous filter) func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { if f.PoW > 0 && msg.PoW < f.PoW { return false @@ -188,19 +208,18 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { return false } +// MatchEnvelope checks if it's worth decrypting the message. If +// it returns `true`, client code is expected to attempt decrypting +// the message and subsequently call MatchMessage. func (f *Filter) MatchEnvelope(envelope *Envelope) bool { if f.PoW > 0 && envelope.pow < f.PoW { return false } - if f.expectsAsymmetricEncryption() && envelope.isAsymmetric() { - return f.MatchTopic(envelope.Topic) - } else if f.expectsSymmetricEncryption() && envelope.IsSymmetric() { - return f.MatchTopic(envelope.Topic) - } - return false + return f.MatchTopic(envelope.Topic) } +// MatchTopic checks that the filter captures a given topic. func (f *Filter) MatchTopic(topic TopicType) bool { if len(f.Topics) == 0 { // any topic matches @@ -216,8 +235,12 @@ func (f *Filter) MatchTopic(topic TopicType) bool { } func matchSingleTopic(topic TopicType, bt []byte) bool { - if len(bt) > 4 { - bt = bt[:4] + if len(bt) > TopicLength { + bt = bt[:TopicLength] + } + + if len(bt) < TopicLength { + return false } for j, b := range bt { @@ -228,6 +251,7 @@ func matchSingleTopic(topic TopicType, bt []byte) bool { return true } +// IsPubKeyEqual checks that two public keys are equal func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool { if !ValidatePublicKey(a) { return false diff --git a/whisper/whisperv6/gen_criteria_json.go b/whisper/whisperv6/gen_criteria_json.go index 52a4d3cb6..1a428d6df 100644 --- a/whisper/whisperv6/gen_criteria_json.go +++ b/whisper/whisperv6/gen_criteria_json.go @@ -10,6 +10,7 @@ import ( var _ = (*criteriaOverride)(nil) +// MarshalJSON marshals type Criteria to a json string func (c Criteria) MarshalJSON() ([]byte, error) { type Criteria struct { SymKeyID string `json:"symKeyID"` @@ -29,14 +30,15 @@ func (c Criteria) MarshalJSON() ([]byte, error) { return json.Marshal(&enc) } +// UnmarshalJSON unmarshals type Criteria to a json string func (c *Criteria) UnmarshalJSON(input []byte) error { type Criteria struct { - SymKeyID *string `json:"symKeyID"` - PrivateKeyID *string `json:"privateKeyID"` - Sig hexutil.Bytes `json:"sig"` - MinPow *float64 `json:"minPow"` - Topics []TopicType `json:"topics"` - AllowP2P *bool `json:"allowP2P"` + SymKeyID *string `json:"symKeyID"` + PrivateKeyID *string `json:"privateKeyID"` + Sig *hexutil.Bytes `json:"sig"` + MinPow *float64 `json:"minPow"` + Topics []TopicType `json:"topics"` + AllowP2P *bool `json:"allowP2P"` } var dec Criteria if err := json.Unmarshal(input, &dec); err != nil { @@ -49,7 +51,7 @@ func (c *Criteria) UnmarshalJSON(input []byte) error { c.PrivateKeyID = *dec.PrivateKeyID } if dec.Sig != nil { - c.Sig = dec.Sig + c.Sig = *dec.Sig } if dec.MinPow != nil { c.MinPow = *dec.MinPow diff --git a/whisper/whisperv6/gen_message_json.go b/whisper/whisperv6/gen_message_json.go index 27b46752b..6218f5df6 100644 --- a/whisper/whisperv6/gen_message_json.go +++ b/whisper/whisperv6/gen_message_json.go @@ -10,6 +10,7 @@ import ( var _ = (*messageOverride)(nil) +// MarshalJSON marshals type Message to a json string func (m Message) MarshalJSON() ([]byte, error) { type Message struct { Sig hexutil.Bytes `json:"sig,omitempty"` @@ -35,24 +36,25 @@ func (m Message) MarshalJSON() ([]byte, error) { return json.Marshal(&enc) } +// UnmarshalJSON unmarshals type Message to a json string func (m *Message) UnmarshalJSON(input []byte) error { type Message struct { - Sig hexutil.Bytes `json:"sig,omitempty"` - TTL *uint32 `json:"ttl"` - Timestamp *uint32 `json:"timestamp"` - Topic *TopicType `json:"topic"` - Payload hexutil.Bytes `json:"payload"` - Padding hexutil.Bytes `json:"padding"` - PoW *float64 `json:"pow"` - Hash hexutil.Bytes `json:"hash"` - Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"` + Sig *hexutil.Bytes `json:"sig,omitempty"` + TTL *uint32 `json:"ttl"` + Timestamp *uint32 `json:"timestamp"` + Topic *TopicType `json:"topic"` + Payload *hexutil.Bytes `json:"payload"` + Padding *hexutil.Bytes `json:"padding"` + PoW *float64 `json:"pow"` + Hash *hexutil.Bytes `json:"hash"` + Dst *hexutil.Bytes `json:"recipientPublicKey,omitempty"` } var dec Message if err := json.Unmarshal(input, &dec); err != nil { return err } if dec.Sig != nil { - m.Sig = dec.Sig + m.Sig = *dec.Sig } if dec.TTL != nil { m.TTL = *dec.TTL @@ -64,19 +66,19 @@ func (m *Message) UnmarshalJSON(input []byte) error { m.Topic = *dec.Topic } if dec.Payload != nil { - m.Payload = dec.Payload + m.Payload = *dec.Payload } if dec.Padding != nil { - m.Padding = dec.Padding + m.Padding = *dec.Padding } if dec.PoW != nil { m.PoW = *dec.PoW } if dec.Hash != nil { - m.Hash = dec.Hash + m.Hash = *dec.Hash } if dec.Dst != nil { - m.Dst = dec.Dst + m.Dst = *dec.Dst } return nil } diff --git a/whisper/whisperv6/gen_newmessage_json.go b/whisper/whisperv6/gen_newmessage_json.go index d16011a57..75a1279ae 100644 --- a/whisper/whisperv6/gen_newmessage_json.go +++ b/whisper/whisperv6/gen_newmessage_json.go @@ -10,6 +10,7 @@ import ( var _ = (*newMessageOverride)(nil) +// MarshalJSON marshals type NewMessage to a json string func (n NewMessage) MarshalJSON() ([]byte, error) { type NewMessage struct { SymKeyID string `json:"symKeyID"` @@ -37,18 +38,19 @@ func (n NewMessage) MarshalJSON() ([]byte, error) { return json.Marshal(&enc) } +// UnmarshalJSON unmarshals type NewMessage to a json string func (n *NewMessage) UnmarshalJSON(input []byte) error { type NewMessage struct { - SymKeyID *string `json:"symKeyID"` - PublicKey hexutil.Bytes `json:"pubKey"` - Sig *string `json:"sig"` - TTL *uint32 `json:"ttl"` - Topic *TopicType `json:"topic"` - Payload hexutil.Bytes `json:"payload"` - Padding hexutil.Bytes `json:"padding"` - PowTime *uint32 `json:"powTime"` - PowTarget *float64 `json:"powTarget"` - TargetPeer *string `json:"targetPeer"` + SymKeyID *string `json:"symKeyID"` + PublicKey *hexutil.Bytes `json:"pubKey"` + Sig *string `json:"sig"` + TTL *uint32 `json:"ttl"` + Topic *TopicType `json:"topic"` + Payload *hexutil.Bytes `json:"payload"` + Padding *hexutil.Bytes `json:"padding"` + PowTime *uint32 `json:"powTime"` + PowTarget *float64 `json:"powTarget"` + TargetPeer *string `json:"targetPeer"` } var dec NewMessage if err := json.Unmarshal(input, &dec); err != nil { @@ -58,7 +60,7 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error { n.SymKeyID = *dec.SymKeyID } if dec.PublicKey != nil { - n.PublicKey = dec.PublicKey + n.PublicKey = *dec.PublicKey } if dec.Sig != nil { n.Sig = *dec.Sig @@ -70,10 +72,10 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error { n.Topic = *dec.Topic } if dec.Payload != nil { - n.Payload = dec.Payload + n.Payload = *dec.Payload } if dec.Padding != nil { - n.Padding = dec.Padding + n.Padding = *dec.Padding } if dec.PowTime != nil { n.PowTime = *dec.PowTime diff --git a/whisper/whisperv6/message.go b/whisper/whisperv6/message.go index 0815f07a2..b8318cbe8 100644 --- a/whisper/whisperv6/message.go +++ b/whisper/whisperv6/message.go @@ -25,6 +25,7 @@ import ( crand "crypto/rand" "encoding/binary" "errors" + mrand "math/rand" "strconv" "github.com/ethereum/go-ethereum/common" @@ -33,7 +34,8 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// Options specifies the exact way a message should be wrapped into an Envelope. +// MessageParams specifies the exact way a message should be wrapped +// into an Envelope. type MessageParams struct { TTL uint32 Src *ecdsa.PrivateKey @@ -54,13 +56,14 @@ type sentMessage struct { } // ReceivedMessage represents a data packet to be received through the -// Whisper protocol. +// Whisper protocol and successfully decrypted. type ReceivedMessage struct { Raw []byte Payload []byte Padding []byte Signature []byte + Salt []byte PoW float64 // Proof of work as described in the Whisper spec Sent uint32 // Time when the message was posted into the network @@ -69,9 +72,8 @@ type ReceivedMessage struct { Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message) Topic TopicType - SymKeyHash common.Hash // The Keccak256Hash of the key, associated with the Topic - EnvelopeHash common.Hash // Message envelope hash to act as a unique id - EnvelopeVersion uint64 + SymKeyHash common.Hash // The Keccak256Hash of the key + EnvelopeHash common.Hash // Message envelope hash to act as a unique id } func isMessageSigned(flags byte) bool { @@ -86,79 +88,62 @@ func (msg *ReceivedMessage) isAsymmetricEncryption() bool { return msg.Dst != nil } -// NewMessage creates and initializes a non-signed, non-encrypted Whisper message. +// NewSentMessage creates and initializes a non-signed, non-encrypted Whisper message. func NewSentMessage(params *MessageParams) (*sentMessage, error) { + const payloadSizeFieldMaxSize = 4 msg := sentMessage{} - msg.Raw = make([]byte, 1, len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit) + msg.Raw = make([]byte, 1, + flagsLength+payloadSizeFieldMaxSize+len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit) msg.Raw[0] = 0 // set all the flags to zero - err := msg.appendPadding(params) - if err != nil { - return nil, err - } + msg.addPayloadSizeField(params.Payload) msg.Raw = append(msg.Raw, params.Payload...) - return &msg, nil + err := msg.appendPadding(params) + return &msg, err } -// getSizeOfLength returns the number of bytes necessary to encode the entire size padding (including these bytes) -func getSizeOfLength(b []byte) (sz int, err error) { - sz = intSize(len(b)) // first iteration - sz = intSize(len(b) + sz) // second iteration - if sz > 3 { - err = errors.New("oversized padding parameter") - } - return sz, err +// addPayloadSizeField appends the auxiliary field containing the size of payload +func (msg *sentMessage) addPayloadSizeField(payload []byte) { + fieldSize := getSizeOfPayloadSizeField(payload) + field := make([]byte, 4) + binary.LittleEndian.PutUint32(field, uint32(len(payload))) + field = field[:fieldSize] + msg.Raw = append(msg.Raw, field...) + msg.Raw[0] |= byte(fieldSize) } -// sizeOfIntSize returns minimal number of bytes necessary to encode an integer value -func intSize(i int) (s int) { - for s = 1; i >= 256; s++ { - i /= 256 +// getSizeOfPayloadSizeField returns the number of bytes necessary to encode the size of payload +func getSizeOfPayloadSizeField(payload []byte) int { + s := 1 + for i := len(payload); i >= 256; i /= 256 { + s++ } return s } -// appendPadding appends the pseudorandom padding bytes and sets the padding flag. -// The last byte contains the size of padding (thus, its size must not exceed 256). +// appendPadding appends the padding specified in params. +// If no padding is provided in params, then random padding is generated. func (msg *sentMessage) appendPadding(params *MessageParams) error { - rawSize := len(params.Payload) + 1 + if len(params.Padding) != 0 { + // padding data was provided by the Dapp, just use it as is + msg.Raw = append(msg.Raw, params.Padding...) + return nil + } + + rawSize := flagsLength + getSizeOfPayloadSizeField(params.Payload) + len(params.Payload) if params.Src != nil { rawSize += signatureLength } odd := rawSize % padSizeLimit - - if len(params.Padding) != 0 { - padSize := len(params.Padding) - padLengthSize, err := getSizeOfLength(params.Padding) - if err != nil { - return err - } - totalPadSize := padSize + padLengthSize - buf := make([]byte, 8) - binary.LittleEndian.PutUint32(buf, uint32(totalPadSize)) - buf = buf[:padLengthSize] - msg.Raw = append(msg.Raw, buf...) - msg.Raw = append(msg.Raw, params.Padding...) - msg.Raw[0] |= byte(padLengthSize) // number of bytes indicating the padding size - } else if odd != 0 { - totalPadSize := padSizeLimit - odd - if totalPadSize > 255 { - // this algorithm is only valid if padSizeLimit < 256. - // if padSizeLimit will ever change, please fix the algorithm - // (please see also ReceivedMessage.extractPadding() function). - panic("please fix the padding algorithm before releasing new version") - } - buf := make([]byte, totalPadSize) - _, err := crand.Read(buf[1:]) - if err != nil { - return err - } - if totalPadSize > 6 && !validateSymmetricKey(buf) { - return errors.New("failed to generate random padding of size " + strconv.Itoa(totalPadSize)) - } - buf[0] = byte(totalPadSize) - msg.Raw = append(msg.Raw, buf...) - msg.Raw[0] |= byte(0x1) // number of bytes indicating the padding size + paddingSize := padSizeLimit - odd + pad := make([]byte, paddingSize) + _, err := crand.Read(pad) + if err != nil { + return err + } + if !validateDataIntegrity(pad, paddingSize) { + return errors.New("failed to generate random padding of size " + strconv.Itoa(paddingSize)) } + msg.Raw = append(msg.Raw, pad...) return nil } @@ -171,11 +156,11 @@ func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error { return nil } - msg.Raw[0] |= signatureFlag + msg.Raw[0] |= signatureFlag // it is important to set this flag before signing hash := crypto.Keccak256(msg.Raw) signature, err := crypto.Sign(hash, key) if err != nil { - msg.Raw[0] &= ^signatureFlag // clear the flag + msg.Raw[0] &= (0xFF ^ signatureFlag) // clear the flag return err } msg.Raw = append(msg.Raw, signature...) @@ -196,31 +181,56 @@ func (msg *sentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error { // encryptSymmetric encrypts a message with a topic key, using AES-GCM-256. // nonce size should be 12 bytes (see cipher.gcmStandardNonceSize). -func (msg *sentMessage) encryptSymmetric(key []byte) (nonce []byte, err error) { - if !validateSymmetricKey(key) { - return nil, errors.New("invalid key provided for symmetric encryption") +func (msg *sentMessage) encryptSymmetric(key []byte) (err error) { + if !validateDataIntegrity(key, aesKeyLength) { + return errors.New("invalid key provided for symmetric encryption, size: " + strconv.Itoa(len(key))) } - block, err := aes.NewCipher(key) if err != nil { - return nil, err + return err } aesgcm, err := cipher.NewGCM(block) if err != nil { - return nil, err + return err + } + salt, err := generateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key + if err != nil { + return err } + encrypted := aesgcm.Seal(nil, salt, msg.Raw, nil) + msg.Raw = append(encrypted, salt...) + return nil +} - // never use more than 2^32 random nonces with a given key - nonce = make([]byte, aesgcm.NonceSize()) - _, err = crand.Read(nonce) +// generateSecureRandomData generates random data where extra security is required. +// The purpose of this function is to prevent some bugs in software or in hardware +// from delivering not-very-random data. This is especially useful for AES nonce, +// where true randomness does not really matter, but it is very important to have +// a unique nonce for every message. +func generateSecureRandomData(length int) ([]byte, error) { + x := make([]byte, length) + y := make([]byte, length) + res := make([]byte, length) + + _, err := crand.Read(x) if err != nil { return nil, err - } else if !validateSymmetricKey(nonce) { - return nil, errors.New("crypto/rand failed to generate nonce") + } else if !validateDataIntegrity(x, length) { + return nil, errors.New("crypto/rand failed to generate secure random data") } - - msg.Raw = aesgcm.Seal(nil, nonce, msg.Raw, nil) - return nonce, nil + _, err = mrand.Read(y) + if err != nil { + return nil, err + } else if !validateDataIntegrity(y, length) { + return nil, errors.New("math/rand failed to generate secure random data") + } + for i := 0; i < length; i++ { + res[i] = x[i] ^ y[i] + } + if !validateDataIntegrity(res, length) { + return nil, errors.New("failed to generate secure random data") + } + return res, nil } // Wrap bundles the message into an Envelope to transmit over the network. @@ -233,11 +243,10 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er return nil, err } } - var nonce []byte if options.Dst != nil { err = msg.encryptAsymmetric(options.Dst) } else if options.KeySym != nil { - nonce, err = msg.encryptSymmetric(options.KeySym) + err = msg.encryptSymmetric(options.KeySym) } else { err = errors.New("unable to encrypt the message: neither symmetric nor assymmetric key provided") } @@ -245,7 +254,7 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er return nil, err } - envelope = NewEnvelope(options.TTL, options.Topic, nonce, msg) + envelope = NewEnvelope(options.TTL, options.Topic, msg) if err = envelope.Seal(options); err != nil { return nil, err } @@ -254,7 +263,13 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er // decryptSymmetric decrypts a message with a topic key, using AES-GCM-256. // nonce size should be 12 bytes (see cipher.gcmStandardNonceSize). -func (msg *ReceivedMessage) decryptSymmetric(key []byte, nonce []byte) error { +func (msg *ReceivedMessage) decryptSymmetric(key []byte) error { + // symmetric messages are expected to contain the 12-byte nonce at the end of the payload + if len(msg.Raw) < aesNonceLength { + return errors.New("missing salt or invalid payload in symmetric message") + } + salt := msg.Raw[len(msg.Raw)-aesNonceLength:] + block, err := aes.NewCipher(key) if err != nil { return err @@ -263,15 +278,12 @@ func (msg *ReceivedMessage) decryptSymmetric(key []byte, nonce []byte) error { if err != nil { return err } - if len(nonce) != aesgcm.NonceSize() { - log.Error("decrypting the message", "AES nonce size", len(nonce)) - return errors.New("wrong AES nonce size") - } - decrypted, err := aesgcm.Open(nil, nonce, msg.Raw, nil) + decrypted, err := aesgcm.Open(nil, salt, msg.Raw[:len(msg.Raw)-aesNonceLength], nil) if err != nil { return err } msg.Raw = decrypted + msg.Salt = salt return nil } @@ -284,8 +296,8 @@ func (msg *ReceivedMessage) decryptAsymmetric(key *ecdsa.PrivateKey) error { return err } -// Validate checks the validity and extracts the fields in case of success -func (msg *ReceivedMessage) Validate() bool { +// ValidateAndParse checks the message validity and extracts the fields in case of success. +func (msg *ReceivedMessage) ValidateAndParse() bool { end := len(msg.Raw) if end < 1 { return false @@ -296,41 +308,32 @@ func (msg *ReceivedMessage) Validate() bool { if end <= 1 { return false } - msg.Signature = msg.Raw[end:] + msg.Signature = msg.Raw[end : end+signatureLength] msg.Src = msg.SigToPubKey() if msg.Src == nil { return false } } - padSize, ok := msg.extractPadding(end) - if !ok { - return false + beg := 1 + payloadSize := 0 + sizeOfPayloadSizeField := int(msg.Raw[0] & SizeMask) // number of bytes indicating the size of payload + if sizeOfPayloadSizeField != 0 { + payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField])) + if payloadSize+1 > end { + return false + } + beg += sizeOfPayloadSizeField + msg.Payload = msg.Raw[beg : beg+payloadSize] } - msg.Payload = msg.Raw[1+padSize : end] + beg += payloadSize + msg.Padding = msg.Raw[beg:end] return true } -// extractPadding extracts the padding from raw message. -// although we don't support sending messages with padding size -// exceeding 255 bytes, such messages are perfectly valid, and -// can be successfully decrypted. -func (msg *ReceivedMessage) extractPadding(end int) (int, bool) { - paddingSize := 0 - sz := int(msg.Raw[0] & paddingMask) // number of bytes indicating the entire size of padding (including these bytes) - // could be zero -- it means no padding - if sz != 0 { - paddingSize = int(bytesToUintLittleEndian(msg.Raw[1 : 1+sz])) - if paddingSize < sz || paddingSize+1 > end { - return 0, false - } - msg.Padding = msg.Raw[1+sz : 1+paddingSize] - } - return paddingSize, true -} - -// Recover retrieves the public key of the message signer. +// SigToPubKey returns the public key associated to the message's +// signature. func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey { defer func() { recover() }() // in case of invalid signature @@ -342,7 +345,7 @@ func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey { return pub } -// hash calculates the SHA3 checksum of the message flags, payload and padding. +// hash calculates the SHA3 checksum of the message flags, payload size field, payload and padding. func (msg *ReceivedMessage) hash() []byte { if isMessageSigned(msg.Raw[0]) { sz := len(msg.Raw) - signatureLength diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index ac7b3b12b..4ef0f3c43 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -18,6 +18,7 @@ package whisperv6 import ( "fmt" + "math" "time" "github.com/ethereum/go-ethereum/common" @@ -27,12 +28,16 @@ import ( set "gopkg.in/fatih/set.v0" ) -// peer represents a whisper protocol peer connection. +// Peer represents a whisper protocol peer connection. type Peer struct { - host *Whisper - peer *p2p.Peer - ws p2p.MsgReadWriter - trusted bool + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + trusted bool + powRequirement float64 + bloomFilter []byte + fullNode bool known *set.Set // Messages already known by the peer to avoid wasting bandwidth @@ -42,62 +47,93 @@ type Peer struct { // newPeer creates a new whisper peer object, but does not run the handshake itself. func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer { return &Peer{ - host: host, - peer: remote, - ws: rw, - trusted: false, - known: set.New(), - quit: make(chan struct{}), + host: host, + peer: remote, + ws: rw, + trusted: false, + powRequirement: 0.0, + known: set.New(), + quit: make(chan struct{}), + bloomFilter: makeFullNodeBloom(), + fullNode: true, } } // start initiates the peer updater, periodically broadcasting the whisper packets // into the network. -func (p *Peer) start() { - go p.update() - log.Trace("start", "peer", p.ID()) +func (peer *Peer) start() { + go peer.update() + log.Trace("start", "peer", peer.ID()) } // stop terminates the peer updater, stopping message forwarding to it. -func (p *Peer) stop() { - close(p.quit) - log.Trace("stop", "peer", p.ID()) +func (peer *Peer) stop() { + close(peer.quit) + log.Trace("stop", "peer", peer.ID()) } // handshake sends the protocol initiation status message to the remote peer and // verifies the remote status too. -func (p *Peer) handshake() error { +func (peer *Peer) handshake() error { // Send the handshake status message asynchronously errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.ws, statusCode, ProtocolVersion) + pow := peer.host.MinPow() + powConverted := math.Float64bits(pow) + bloom := peer.host.BloomFilter() + errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom) }() + // Fetch the remote status packet and verify protocol match - packet, err := p.ws.ReadMsg() + packet, err := peer.ws.ReadMsg() if err != nil { return err } if packet.Code != statusCode { - return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code) + return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code) } s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + _, err = s.List() + if err != nil { + return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err) + } peerVersion, err := s.Uint() if err != nil { - return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err) + return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err) } if peerVersion != ProtocolVersion { - return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) + return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion) + } + + // only version is mandatory, subsequent parameters are optional + powRaw, err := s.Uint() + if err == nil { + pow := math.Float64frombits(powRaw) + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID()) + } + peer.powRequirement = pow + + var bloom []byte + err = s.Decode(&bloom) + if err == nil { + sz := len(bloom) + if sz != bloomFilterSize && sz != 0 { + return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz) + } + peer.setBloomFilter(bloom) + } } - // Wait until out own status is consumed too + if err := <-errc; err != nil { - return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err) + return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) } return nil } // update executes periodic operations on the peer, including message transmission // and expiration. -func (p *Peer) update() { +func (peer *Peer) update() { // Start the tickers for the updates expire := time.NewTicker(expirationCycle) transmit := time.NewTicker(transmissionCycle) @@ -106,15 +142,15 @@ func (p *Peer) update() { for { select { case <-expire.C: - p.expire() + peer.expire() case <-transmit.C: - if err := p.broadcast(); err != nil { - log.Trace("broadcast failed", "reason", err, "peer", p.ID()) + if err := peer.broadcast(); err != nil { + log.Trace("broadcast failed", "reason", err, "peer", peer.ID()) return } - case <-p.quit: + case <-peer.quit: return } } @@ -148,27 +184,62 @@ func (peer *Peer) expire() { // broadcast iterates over the collection of envelopes and transmits yet unknown // ones over the network. -func (p *Peer) broadcast() error { - var cnt int - envelopes := p.host.Envelopes() +func (peer *Peer) broadcast() error { + envelopes := peer.host.Envelopes() + bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { - if !p.marked(envelope) { - err := p2p.Send(p.ws, messagesCode, envelope) - if err != nil { - return err - } else { - p.mark(envelope) - cnt++ - } + if !peer.marked(envelope) && envelope.PoW() >= peer.powRequirement && peer.bloomMatch(envelope) { + bundle = append(bundle, envelope) } } - if cnt > 0 { - log.Trace("broadcast", "num. messages", cnt) + + if len(bundle) > 0 { + // transmit the batch of envelopes + if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil { + return err + } + + // mark envelopes only if they were successfully sent + for _, e := range bundle { + peer.mark(e) + } + + log.Trace("broadcast", "num. messages", len(bundle)) } return nil } -func (p *Peer) ID() []byte { - id := p.peer.ID() +// ID returns a peer's id +func (peer *Peer) ID() []byte { + id := peer.peer.ID() return id[:] } + +func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error { + i := math.Float64bits(pow) + return p2p.Send(peer.ws, powRequirementCode, i) +} + +func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error { + return p2p.Send(peer.ws, bloomFilterExCode, bloom) +} + +func (peer *Peer) bloomMatch(env *Envelope) bool { + return peer.fullNode || bloomFilterMatch(peer.bloomFilter, env.Bloom()) +} + +func (peer *Peer) setBloomFilter(bloom []byte) { + peer.bloomFilter = bloom + peer.fullNode = isFullNode(bloom) + if peer.fullNode && peer.bloomFilter == nil { + peer.bloomFilter = makeFullNodeBloom() + } +} + +func makeFullNodeBloom() []byte { + bloom := make([]byte, bloomFilterSize) + for i := 0; i < bloomFilterSize; i++ { + bloom[i] = 0xFF + } + return bloom +} diff --git a/whisper/whisperv6/topic.go b/whisper/whisperv6/topic.go index bf5da01e3..4dd8f283c 100644 --- a/whisper/whisperv6/topic.go +++ b/whisper/whisperv6/topic.go @@ -23,11 +23,13 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ) -// Topic represents a cryptographically secure, probabilistic partial +// TopicType represents a cryptographically secure, probabilistic partial // classifications of a message, determined as the first (left) 4 bytes of the // SHA3 hash of some arbitrary data given by the original author of the message. type TopicType [TopicLength]byte +// BytesToTopic converts from the byte array representation of a topic +// into the TopicType type. func BytesToTopic(b []byte) (t TopicType) { sz := TopicLength if x := len(b); x < TopicLength { diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index e2b884f3d..600f9cb28 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -19,9 +19,9 @@ package whisperv6 import ( "bytes" "crypto/ecdsa" - crand "crypto/rand" "crypto/sha256" "fmt" + "math" "runtime" "sync" "time" @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/syndtr/goleveldb/leveldb/errors" "golang.org/x/crypto/pbkdf2" @@ -37,6 +38,8 @@ import ( set "gopkg.in/fatih/set.v0" ) +// Statistics holds several message-related counter for analytics +// purposes. type Statistics struct { messagesCleared int memoryCleared int @@ -46,9 +49,12 @@ type Statistics struct { } const ( - minPowIdx = iota // Minimal PoW required by the whisper node - maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx // Indicator of message queue overflow + minPowIdx // Minimal PoW required by the whisper node + minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time + bloomFilterIdx // Bloom filter for topics of interest for this node + bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time ) // Whisper represents a dark communication interface through the Ethereum @@ -74,6 +80,8 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed + syncAllowance int // maximum time in seconds allowed to process the whisper-related messages + statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -87,14 +95,15 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + syncAllowance: DefaultSyncAllowance, } whisper.filters = NewFilters(whisper) @@ -121,30 +130,74 @@ func New(cfg *Config) *Whisper { return whisper } -func (w *Whisper) MinPow() float64 { - val, _ := w.settings.Load(minPowIdx) +// MinPow returns the PoW value required by this node. +func (whisper *Whisper) MinPow() float64 { + val, exist := whisper.settings.Load(minPowIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } + v, ok := val.(float64) + if !ok { + log.Error("Error loading minPowIdx, using default") + return DefaultMinimumPoW + } + return v +} + +// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited +// time after PoW was changed. If sufficient time have elapsed or no change of PoW +// have ever occurred, the return value will be the same as return value of MinPow(). +func (whisper *Whisper) MinPowTolerance() float64 { + val, exist := whisper.settings.Load(minPowToleranceIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } return val.(float64) } +// BloomFilter returns the aggregated bloom filter for all the topics of interest. +// The nodes are required to send only messages that match the advertised bloom filter. +// If a message does not match the bloom, it will tantamount to spam, and the peer will +// be disconnected. +func (whisper *Whisper) BloomFilter() []byte { + val, exist := whisper.settings.Load(bloomFilterIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + +// BloomFilterTolerance returns the bloom filter which is tolerated for a limited +// time after new bloom was advertised to the peers. If sufficient time have elapsed +// or no change of bloom filter have ever occurred, the return value will be the same +// as return value of BloomFilter(). +func (whisper *Whisper) BloomFilterTolerance() []byte { + val, exist := whisper.settings.Load(bloomFilterToleranceIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + // MaxMessageSize returns the maximum accepted message size. -func (w *Whisper) MaxMessageSize() uint32 { - val, _ := w.settings.Load(maxMsgSizeIdx) +func (whisper *Whisper) MaxMessageSize() uint32 { + val, _ := whisper.settings.Load(maxMsgSizeIdx) return val.(uint32) } // Overflow returns an indication if the message queue is full. -func (w *Whisper) Overflow() bool { - val, _ := w.settings.Load(overflowIdx) +func (whisper *Whisper) Overflow() bool { + val, _ := whisper.settings.Load(overflowIdx) return val.(bool) } // APIs returns the RPC descriptors the Whisper implementation offers -func (w *Whisper) APIs() []rpc.API { +func (whisper *Whisper) APIs() []rpc.API { return []rpc.API{ { Namespace: ProtocolName, Version: ProtocolVersionStr, - Service: NewPublicWhisperAPI(w), + Service: NewPublicWhisperAPI(whisper), Public: true, }, } @@ -152,43 +205,120 @@ func (w *Whisper) APIs() []rpc.API { // RegisterServer registers MailServer interface. // MailServer will process all the incoming messages with p2pRequestCode. -func (w *Whisper) RegisterServer(server MailServer) { - w.mailServer = server +func (whisper *Whisper) RegisterServer(server MailServer) { + whisper.mailServer = server } // Protocols returns the whisper sub-protocols ran by this particular client. -func (w *Whisper) Protocols() []p2p.Protocol { - return []p2p.Protocol{w.protocol} +func (whisper *Whisper) Protocols() []p2p.Protocol { + return []p2p.Protocol{whisper.protocol} } // Version returns the whisper sub-protocols version number. -func (w *Whisper) Version() uint { - return w.protocol.Version +func (whisper *Whisper) Version() uint { + return whisper.protocol.Version } // SetMaxMessageSize sets the maximal message size allowed by this node -func (w *Whisper) SetMaxMessageSize(size uint32) error { +func (whisper *Whisper) SetMaxMessageSize(size uint32) error { if size > MaxMessageSize { return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) } - w.settings.Store(maxMsgSizeIdx, size) + whisper.settings.Store(maxMsgSizeIdx, size) + return nil +} + +// SetBloomFilter sets the new bloom filter +func (whisper *Whisper) SetBloomFilter(bloom []byte) error { + if len(bloom) != bloomFilterSize { + return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) + } + + b := make([]byte, bloomFilterSize) + copy(b, bloom) + + whisper.settings.Store(bloomFilterIdx, b) + whisper.notifyPeersAboutBloomFilterChange(b) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(whisper.syncAllowance) * time.Second) + whisper.settings.Store(bloomFilterToleranceIdx, b) + }() + return nil } // SetMinimumPoW sets the minimal PoW required by this node -func (w *Whisper) SetMinimumPoW(val float64) error { - if val <= 0.0 { +func (whisper *Whisper) SetMinimumPoW(val float64) error { + if val < 0.0 { return fmt.Errorf("invalid PoW: %f", val) } - w.settings.Store(minPowIdx, val) + + whisper.settings.Store(minPowIdx, val) + whisper.notifyPeersAboutPowRequirementChange(val) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(whisper.syncAllowance) * time.Second) + whisper.settings.Store(minPowToleranceIdx, val) + }() + return nil } +// SetMinimumPowTest sets the minimal PoW in test environment +func (whisper *Whisper) SetMinimumPowTest(val float64) { + whisper.settings.Store(minPowIdx, val) + whisper.notifyPeersAboutPowRequirementChange(val) + whisper.settings.Store(minPowToleranceIdx, val) +} + +func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { + arr := whisper.getPeers() + for _, p := range arr { + err := p.notifyAboutPowRequirementChange(pow) + if err != nil { + // allow one retry + err = p.notifyAboutPowRequirementChange(pow) + } + if err != nil { + log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err) + } + } +} + +func (whisper *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := whisper.getPeers() + for _, p := range arr { + err := p.notifyAboutBloomFilterChange(bloom) + if err != nil { + // allow one retry + err = p.notifyAboutBloomFilterChange(bloom) + } + if err != nil { + log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err) + } + } +} + +func (whisper *Whisper) getPeers() []*Peer { + arr := make([]*Peer, len(whisper.peers)) + i := 0 + whisper.peerMu.Lock() + for p := range whisper.peers { + arr[i] = p + i++ + } + whisper.peerMu.Unlock() + return arr +} + // getPeer retrieves peer by ID -func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { - w.peerMu.Lock() - defer w.peerMu.Unlock() - for p := range w.peers { +func (whisper *Whisper) getPeer(peerID []byte) (*Peer, error) { + whisper.peerMu.Lock() + defer whisper.peerMu.Unlock() + for p := range whisper.peers { id := p.peer.ID() if bytes.Equal(peerID, id[:]) { return p, nil @@ -199,8 +329,8 @@ func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { // AllowP2PMessagesFromPeer marks specific peer trusted, // which will allow it to send historic (expired) messages. -func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } @@ -213,8 +343,8 @@ func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { // request and respond with a number of peer-to-peer messages (possibly expired), // which are not supposed to be forwarded any further. // The whisper protocol is agnostic of the format and contents of envelope. -func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } @@ -223,22 +353,22 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) err } // SendP2PMessage sends a peer-to-peer message to a specific peer. -func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { - p, err := w.getPeer(peerID) +func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { + p, err := whisper.getPeer(peerID) if err != nil { return err } - return w.SendP2PDirect(p, envelope) + return whisper.SendP2PDirect(p, envelope) } // SendP2PDirect sends a peer-to-peer message to a specific peer. -func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { - return p2p.Send(peer.ws, p2pCode, envelope) +func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { + return p2p.Send(peer.ws, p2pMessageCode, envelope) } // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. -func (w *Whisper) NewKeyPair() (string, error) { +func (whisper *Whisper) NewKeyPair() (string, error) { key, err := crypto.GenerateKey() if err != nil || !validatePrivateKey(key) { key, err = crypto.GenerateKey() // retry once @@ -255,55 +385,55 @@ func (w *Whisper) NewKeyPair() (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.privateKeys[id] != nil { + if whisper.privateKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.privateKeys[id] = key + whisper.privateKeys[id] = key return id, nil } // DeleteKeyPair deletes the specified key if it exists. -func (w *Whisper) DeleteKeyPair(key string) bool { - w.keyMu.Lock() - defer w.keyMu.Unlock() +func (whisper *Whisper) DeleteKeyPair(key string) bool { + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.privateKeys[key] != nil { - delete(w.privateKeys, key) + if whisper.privateKeys[key] != nil { + delete(whisper.privateKeys, key) return true } return false } // AddKeyPair imports a asymmetric private key and returns it identifier. -func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { +func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { id, err := GenerateRandomID() if err != nil { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - w.privateKeys[id] = key - w.keyMu.Unlock() + whisper.keyMu.Lock() + whisper.privateKeys[id] = key + whisper.keyMu.Unlock() return id, nil } // HasKeyPair checks if the the whisper node is configured with the private key // of the specified public pair. -func (w *Whisper) HasKeyPair(id string) bool { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - return w.privateKeys[id] != nil +func (whisper *Whisper) HasKeyPair(id string) bool { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + return whisper.privateKeys[id] != nil } // GetPrivateKey retrieves the private key of the specified identity. -func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - key := w.privateKeys[id] +func (whisper *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + key := whisper.privateKeys[id] if key == nil { return nil, fmt.Errorf("invalid id") } @@ -312,12 +442,11 @@ func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { // GenerateSymKey generates a random symmetric key and stores it under id, // which is then returned. Will be used in the future for session key exchange. -func (w *Whisper) GenerateSymKey() (string, error) { - key := make([]byte, aesKeyLength) - _, err := crand.Read(key) +func (whisper *Whisper) GenerateSymKey() (string, error) { + key, err := generateSecureRandomData(aesKeyLength) if err != nil { return "", err - } else if !validateSymmetricKey(key) { + } else if !validateDataIntegrity(key, aesKeyLength) { return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data") } @@ -326,18 +455,18 @@ func (w *Whisper) GenerateSymKey() (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.symKeys[id] = key + whisper.symKeys[id] = key return id, nil } // AddSymKeyDirect stores the key, and returns its id. -func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { +func (whisper *Whisper) AddSymKeyDirect(key []byte) (string, error) { if len(key) != aesKeyLength { return "", fmt.Errorf("wrong key size: %d", len(key)) } @@ -347,85 +476,108 @@ func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - w.symKeys[id] = key + whisper.symKeys[id] = key return id, nil } // AddSymKeyFromPassword generates the key from password, stores it, and returns its id. -func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) { +func (whisper *Whisper) AddSymKeyFromPassword(password string) (string, error) { id, err := GenerateRandomID() if err != nil { return "", fmt.Errorf("failed to generate ID: %s", err) } - if w.HasSymKey(id) { + if whisper.HasSymKey(id) { return "", fmt.Errorf("failed to generate unique ID") } - derived, err := deriveKeyMaterial([]byte(password), EnvelopeVersion) + // kdf should run no less than 0.1 seconds on an average computer, + // because it's an once in a session experience + derived := pbkdf2.Key([]byte(password), nil, 65356, aesKeyLength, sha256.New) if err != nil { return "", err } - w.keyMu.Lock() - defer w.keyMu.Unlock() + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() // double check is necessary, because deriveKeyMaterial() is very slow - if w.symKeys[id] != nil { + if whisper.symKeys[id] != nil { return "", fmt.Errorf("critical error: failed to generate unique ID") } - w.symKeys[id] = derived + whisper.symKeys[id] = derived return id, nil } // HasSymKey returns true if there is a key associated with the given id. // Otherwise returns false. -func (w *Whisper) HasSymKey(id string) bool { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - return w.symKeys[id] != nil +func (whisper *Whisper) HasSymKey(id string) bool { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + return whisper.symKeys[id] != nil } // DeleteSymKey deletes the key associated with the name string if it exists. -func (w *Whisper) DeleteSymKey(id string) bool { - w.keyMu.Lock() - defer w.keyMu.Unlock() - if w.symKeys[id] != nil { - delete(w.symKeys, id) +func (whisper *Whisper) DeleteSymKey(id string) bool { + whisper.keyMu.Lock() + defer whisper.keyMu.Unlock() + if whisper.symKeys[id] != nil { + delete(whisper.symKeys, id) return true } return false } // GetSymKey returns the symmetric key associated with the given id. -func (w *Whisper) GetSymKey(id string) ([]byte, error) { - w.keyMu.RLock() - defer w.keyMu.RUnlock() - if w.symKeys[id] != nil { - return w.symKeys[id], nil +func (whisper *Whisper) GetSymKey(id string) ([]byte, error) { + whisper.keyMu.RLock() + defer whisper.keyMu.RUnlock() + if whisper.symKeys[id] != nil { + return whisper.symKeys[id], nil } return nil, fmt.Errorf("non-existent key ID") } // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Whisper) Subscribe(f *Filter) (string, error) { - return w.filters.Install(f) +func (whisper *Whisper) Subscribe(f *Filter) (string, error) { + s, err := whisper.filters.Install(f) + if err == nil { + whisper.updateBloomFilter(f) + } + return s, err +} + +// updateBloomFilter recalculates the new value of bloom filter, +// and informs the peers if necessary. +func (whisper *Whisper) updateBloomFilter(f *Filter) { + aggregate := make([]byte, bloomFilterSize) + for _, t := range f.Topics { + top := BytesToTopic(t) + b := TopicToBloom(top) + aggregate = addBloom(aggregate, b) + } + + if !bloomFilterMatch(whisper.BloomFilter(), aggregate) { + // existing bloom filter must be updated + aggregate = addBloom(whisper.BloomFilter(), aggregate) + whisper.SetBloomFilter(aggregate) + } } // GetFilter returns the filter by id. -func (w *Whisper) GetFilter(id string) *Filter { - return w.filters.Get(id) +func (whisper *Whisper) GetFilter(id string) *Filter { + return whisper.filters.Get(id) } // Unsubscribe removes an installed message handler. -func (w *Whisper) Unsubscribe(id string) error { - ok := w.filters.Uninstall(id) +func (whisper *Whisper) Unsubscribe(id string) error { + ok := whisper.filters.Uninstall(id) if !ok { return fmt.Errorf("Unsubscribe: Invalid ID") } @@ -434,8 +586,8 @@ func (w *Whisper) Unsubscribe(id string) error { // Send injects a message into the whisper send queue, to be distributed in the // network in the coming cycles. -func (w *Whisper) Send(envelope *Envelope) error { - ok, err := w.add(envelope) +func (whisper *Whisper) Send(envelope *Envelope) error { + ok, err := whisper.add(envelope) if err != nil { return err } @@ -447,13 +599,13 @@ func (w *Whisper) Send(envelope *Envelope) error { // Start implements node.Service, starting the background data propagation thread // of the Whisper protocol. -func (w *Whisper) Start(*p2p.Server) error { +func (whisper *Whisper) Start(*p2p.Server) error { log.Info("started whisper v." + ProtocolVersionStr) - go w.update() + go whisper.update() numCPU := runtime.NumCPU() for i := 0; i < numCPU; i++ { - go w.processQueue() + go whisper.processQueue() } return nil @@ -461,26 +613,26 @@ func (w *Whisper) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Whisper protocol. -func (w *Whisper) Stop() error { - close(w.quit) +func (whisper *Whisper) Stop() error { + close(whisper.quit) log.Info("whisper stopped") return nil } // HandlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. -func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { +func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { // Create the new peer and start tracking it - whisperPeer := newPeer(wh, peer, rw) + whisperPeer := newPeer(whisper, peer, rw) - wh.peerMu.Lock() - wh.peers[whisperPeer] = struct{}{} - wh.peerMu.Unlock() + whisper.peerMu.Lock() + whisper.peers[whisperPeer] = struct{}{} + whisper.peerMu.Unlock() defer func() { - wh.peerMu.Lock() - delete(wh.peers, whisperPeer) - wh.peerMu.Unlock() + whisper.peerMu.Lock() + delete(whisper.peers, whisperPeer) + whisper.peerMu.Unlock() }() // Run the peer handshake and state updates @@ -490,11 +642,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisperPeer.start() defer whisperPeer.stop() - return wh.runMessageLoop(whisperPeer, rw) + return whisper.runMessageLoop(whisperPeer, rw) } // runMessageLoop reads and processes inbound messages directly to merge into client-global state. -func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { +func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { for { // fetch the next packet packet, err := rw.ReadMsg() @@ -502,7 +654,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("message loop", "peer", p.peer.ID(), "err", err) return err } - if packet.Size > wh.MaxMessageSize() { + if packet.Size > whisper.MaxMessageSize() { log.Warn("oversized message received", "peer", p.peer.ID()) return errors.New("oversized message received") } @@ -513,20 +665,53 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("unxepected status message received", "peer", p.peer.ID()) case messagesCode: // decode the contained envelopes - var envelope Envelope - if err := packet.Decode(&envelope); err != nil { - log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid envelopes") + } + + trouble := false + for _, env := range envelopes { + cached, err := whisper.add(env) + if err != nil { + trouble = true + log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) + } + if cached { + p.mark(env) + } + } + + if trouble { return errors.New("invalid envelope") } - cached, err := wh.add(&envelope) + case powRequirementCode: + s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + i, err := s.Uint() if err != nil { - log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid envelope") + log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid powRequirementCode message") + } + f := math.Float64frombits(i) + if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 { + log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid value in powRequirementCode message") + } + p.powRequirement = f + case bloomFilterExCode: + var bloom []byte + err := packet.Decode(&bloom) + if err == nil && len(bloom) != bloomFilterSize { + err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) } - if cached { - p.mark(&envelope) + + if err != nil { + log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid bloom filter exchange message") } - case p2pCode: + p.setBloomFilter(bloom) + case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and // therefore might not satisfy the PoW, expiry and other requirements. @@ -537,17 +722,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid direct message") } - wh.postEvent(&envelope, true) + whisper.postEvent(&envelope, true) } case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. - if wh.mailServer != nil { + if whisper.mailServer != nil { var request Envelope if err := packet.Decode(&request); err != nil { log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid p2p request") } - wh.mailServer.DeliverMail(p, &request) + whisper.mailServer.DeliverMail(p, &request) } default: // New message types might be implemented in the future versions of Whisper. @@ -561,130 +746,126 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. -func (wh *Whisper) add(envelope *Envelope) (bool, error) { +func (whisper *Whisper) add(envelope *Envelope) (bool, error) { now := uint32(time.Now().Unix()) sent := envelope.Expiry - envelope.TTL if sent > now { - if sent-SynchAllowance > now { + if sent-DefaultSyncAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) - } else { - // recalculate PoW, adjusted for the time difference, plus one second for latency - envelope.calculatePoW(sent - now + 1) } + // recalculate PoW, adjusted for the time difference, plus one second for latency + envelope.calculatePoW(sent - now + 1) } if envelope.Expiry < now { - if envelope.Expiry+SynchAllowance*2 < now { + if envelope.Expiry+DefaultSyncAllowance*2 < now { return false, fmt.Errorf("very old message") - } else { - log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error } + log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error } - if uint32(envelope.size()) > wh.MaxMessageSize() { + if uint32(envelope.size()) > whisper.MaxMessageSize() { return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } - if len(envelope.Version) > 4 { - return false, fmt.Errorf("oversized version [%x]", envelope.Hash()) - } - - aesNonceSize := len(envelope.AESNonce) - if aesNonceSize != 0 && aesNonceSize != AESNonceLength { - // the standard AES GCM nonce size is 12 bytes, - // but constant gcmStandardNonceSize cannot be accessed (not exported) - return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) + if envelope.PoW() < whisper.MinPow() { + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by MinPowTolerance() + // for a short period of peer synchronization. + if envelope.PoW() < whisper.MinPowTolerance() { + return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + } } - if envelope.PoW() < wh.MinPow() { - log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error + if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) { + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by BloomFilterTolerance() + // for a short period of peer synchronization. + if !bloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) { + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", + envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic) + } } hash := envelope.Hash() - wh.poolMu.Lock() - _, alreadyCached := wh.envelopes[hash] + whisper.poolMu.Lock() + _, alreadyCached := whisper.envelopes[hash] if !alreadyCached { - wh.envelopes[hash] = envelope - if wh.expirations[envelope.Expiry] == nil { - wh.expirations[envelope.Expiry] = set.NewNonTS() + whisper.envelopes[hash] = envelope + if whisper.expirations[envelope.Expiry] == nil { + whisper.expirations[envelope.Expiry] = set.NewNonTS() } - if !wh.expirations[envelope.Expiry].Has(hash) { - wh.expirations[envelope.Expiry].Add(hash) + if !whisper.expirations[envelope.Expiry].Has(hash) { + whisper.expirations[envelope.Expiry].Add(hash) } } - wh.poolMu.Unlock() + whisper.poolMu.Unlock() if alreadyCached { log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) } else { log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) - wh.statsMu.Lock() - wh.stats.memoryUsed += envelope.size() - wh.statsMu.Unlock() - wh.postEvent(envelope, false) // notify the local node about the new message - if wh.mailServer != nil { - wh.mailServer.Archive(envelope) + whisper.statsMu.Lock() + whisper.stats.memoryUsed += envelope.size() + whisper.statsMu.Unlock() + whisper.postEvent(envelope, false) // notify the local node about the new message + if whisper.mailServer != nil { + whisper.mailServer.Archive(envelope) } } return true, nil } // postEvent queues the message for further processing. -func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { - // if the version of incoming message is higher than - // currently supported version, we can not decrypt it, - // and therefore just ignore this message - if envelope.Ver() <= EnvelopeVersion { - if isP2P { - w.p2pMsgQueue <- envelope - } else { - w.checkOverflow() - w.messageQueue <- envelope - } +func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) { + if isP2P { + whisper.p2pMsgQueue <- envelope + } else { + whisper.checkOverflow() + whisper.messageQueue <- envelope } } // checkOverflow checks if message queue overflow occurs and reports it if necessary. -func (w *Whisper) checkOverflow() { - queueSize := len(w.messageQueue) +func (whisper *Whisper) checkOverflow() { + queueSize := len(whisper.messageQueue) if queueSize == messageQueueLimit { - if !w.Overflow() { - w.settings.Store(overflowIdx, true) + if !whisper.Overflow() { + whisper.settings.Store(overflowIdx, true) log.Warn("message queue overflow") } } else if queueSize <= messageQueueLimit/2 { - if w.Overflow() { - w.settings.Store(overflowIdx, false) + if whisper.Overflow() { + whisper.settings.Store(overflowIdx, false) log.Warn("message queue overflow fixed (back to normal)") } } } // processQueue delivers the messages to the watchers during the lifetime of the whisper node. -func (w *Whisper) processQueue() { +func (whisper *Whisper) processQueue() { var e *Envelope for { select { - case <-w.quit: + case <-whisper.quit: return - case e = <-w.messageQueue: - w.filters.NotifyWatchers(e, false) + case e = <-whisper.messageQueue: + whisper.filters.NotifyWatchers(e, false) - case e = <-w.p2pMsgQueue: - w.filters.NotifyWatchers(e, true) + case e = <-whisper.p2pMsgQueue: + whisper.filters.NotifyWatchers(e, true) } } } // update loops until the lifetime of the whisper node, updating its internal // state by expiring stale messages from the pool. -func (w *Whisper) update() { +func (whisper *Whisper) update() { // Start a ticker to check for expirations expire := time.NewTicker(expirationCycle) @@ -692,9 +873,9 @@ func (w *Whisper) update() { for { select { case <-expire.C: - w.expire() + whisper.expire() - case <-w.quit: + case <-whisper.quit: return } } @@ -702,46 +883,46 @@ func (w *Whisper) update() { // expire iterates over all the expiration timestamps, removing all stale // messages from the pools. -func (w *Whisper) expire() { - w.poolMu.Lock() - defer w.poolMu.Unlock() +func (whisper *Whisper) expire() { + whisper.poolMu.Lock() + defer whisper.poolMu.Unlock() - w.statsMu.Lock() - defer w.statsMu.Unlock() - w.stats.reset() + whisper.statsMu.Lock() + defer whisper.statsMu.Unlock() + whisper.stats.reset() now := uint32(time.Now().Unix()) - for expiry, hashSet := range w.expirations { + for expiry, hashSet := range whisper.expirations { if expiry < now { // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { - sz := w.envelopes[v.(common.Hash)].size() - delete(w.envelopes, v.(common.Hash)) - w.stats.messagesCleared++ - w.stats.memoryCleared += sz - w.stats.memoryUsed -= sz + sz := whisper.envelopes[v.(common.Hash)].size() + delete(whisper.envelopes, v.(common.Hash)) + whisper.stats.messagesCleared++ + whisper.stats.memoryCleared += sz + whisper.stats.memoryUsed -= sz return true }) - w.expirations[expiry].Clear() - delete(w.expirations, expiry) + whisper.expirations[expiry].Clear() + delete(whisper.expirations, expiry) } } } // Stats returns the whisper node statistics. -func (w *Whisper) Stats() Statistics { - w.statsMu.Lock() - defer w.statsMu.Unlock() +func (whisper *Whisper) Stats() Statistics { + whisper.statsMu.Lock() + defer whisper.statsMu.Unlock() - return w.stats + return whisper.stats } // Envelopes retrieves all the messages currently pooled by the node. -func (w *Whisper) Envelopes() []*Envelope { - w.poolMu.RLock() - defer w.poolMu.RUnlock() +func (whisper *Whisper) Envelopes() []*Envelope { + whisper.poolMu.RLock() + defer whisper.poolMu.RUnlock() - all := make([]*Envelope, 0, len(w.envelopes)) - for _, envelope := range w.envelopes { + all := make([]*Envelope, 0, len(whisper.envelopes)) + for _, envelope := range whisper.envelopes { all = append(all, envelope) } return all @@ -749,13 +930,13 @@ func (w *Whisper) Envelopes() []*Envelope { // Messages iterates through all currently floating envelopes // and retrieves all the messages, that this filter could decrypt. -func (w *Whisper) Messages(id string) []*ReceivedMessage { +func (whisper *Whisper) Messages(id string) []*ReceivedMessage { result := make([]*ReceivedMessage, 0) - w.poolMu.RLock() - defer w.poolMu.RUnlock() + whisper.poolMu.RLock() + defer whisper.poolMu.RUnlock() - if filter := w.filters.Get(id); filter != nil { - for _, env := range w.envelopes { + if filter := whisper.filters.Get(id); filter != nil { + for _, env := range whisper.envelopes { msg := filter.processEnvelope(env) if msg != nil { result = append(result, msg) @@ -766,11 +947,11 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage { } // isEnvelopeCached checks if envelope with specific hash has already been received and cached. -func (w *Whisper) isEnvelopeCached(hash common.Hash) bool { - w.poolMu.Lock() - defer w.poolMu.Unlock() +func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool { + whisper.poolMu.Lock() + defer whisper.poolMu.Unlock() - _, exist := w.envelopes[hash] + _, exist := whisper.envelopes[hash] return exist } @@ -796,9 +977,16 @@ func validatePrivateKey(k *ecdsa.PrivateKey) bool { return ValidatePublicKey(&k.PublicKey) } -// validateSymmetricKey returns false if the key contains all zeros -func validateSymmetricKey(k []byte) bool { - return len(k) > 0 && !containsOnlyZeros(k) +// validateDataIntegrity returns false if the data have the wrong or contains all zeros, +// which is the simplest and the most common bug. +func validateDataIntegrity(k []byte, expectedSize int) bool { + if len(k) != expectedSize { + return false + } + if expectedSize > 3 && containsOnlyZeros(k) { + return false + } + return true } // containsOnlyZeros checks if the data contain only zeros. @@ -830,29 +1018,51 @@ func BytesToUintBigEndian(b []byte) (res uint64) { return res } -// deriveKeyMaterial derives symmetric key material from the key or password. -// pbkdf2 is used for security, in case people use password instead of randomly generated keys. -func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error) { - if version == 0 { - // kdf should run no less than 0.1 seconds on average compute, - // because it's a once in a session experience - derivedKey := pbkdf2.Key(key, nil, 65356, aesKeyLength, sha256.New) - return derivedKey, nil - } else { - return nil, unknownVersionError(version) - } -} - // GenerateRandomID generates a random string, which is then returned to be used as a key id func GenerateRandomID() (id string, err error) { - buf := make([]byte, keyIdSize) - _, err = crand.Read(buf) + buf, err := generateSecureRandomData(keyIDSize) if err != nil { return "", err } - if !validateSymmetricKey(buf) { + if !validateDataIntegrity(buf, keyIDSize) { return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data") } id = common.Bytes2Hex(buf) return id, err } + +func isFullNode(bloom []byte) bool { + if bloom == nil { + return true + } + for _, b := range bloom { + if b != 255 { + return false + } + } + return true +} + +func bloomFilterMatch(filter, sample []byte) bool { + if filter == nil { + return true + } + + for i := 0; i < bloomFilterSize; i++ { + f := filter[i] + s := sample[i] + if (f | s) != f { + return false + } + } + + return true +} + +func addBloom(a, b []byte) []byte { + c := make([]byte, bloomFilterSize) + for i := 0; i < bloomFilterSize; i++ { + c[i] = a[i] | b[i] + } + return c +}