From b9a6e71617b02d878187c12362194f70ec9689f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 13:24:43 +0300 Subject: [PATCH] whisper: general cleanups, documentation --- envelope.go | 12 +-- filter.go | 11 ++- peer.go | 4 - whisper.go | 270 +++++++++++++++++++++++++++++----------------------- 4 files changed, 157 insertions(+), 140 deletions(-) diff --git a/envelope.go b/envelope.go index 9daaf64..0a817e2 100644 --- a/envelope.go +++ b/envelope.go @@ -24,7 +24,7 @@ type Envelope struct { Data []byte Nonce uint32 - hash common.Hash + hash common.Hash // Cached hash of the envelope to avoid rehashing every time } // NewEnvelope wraps a Whisper message with expiration and destination data @@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) { } } -// valid checks whether the claimed proof of work was indeed executed. -// TODO: Is this really useful? Isn't this always true? -func (self *Envelope) valid() bool { - d := make([]byte, 64) - copy(d[:32], self.rlpWithoutNonce()) - binary.BigEndian.PutUint32(d[60:], self.Nonce) - - return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0 -} - // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. func (self *Envelope) rlpWithoutNonce() []byte { enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) diff --git a/filter.go b/filter.go index 7258de3..8fcc45a 100644 --- a/filter.go +++ b/filter.go @@ -1,10 +1,13 @@ +// Contains the message filter for fine grained subscriptions. + package whisper import "crypto/ecdsa" +// Filter is used to subscribe to specific types of whisper messages. type Filter struct { - To *ecdsa.PublicKey - From *ecdsa.PublicKey - Topics []Topic - Fn func(*Message) + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics []Topic // Topics to watch messages on + Fn func(*Message) // Handler in case of a match } diff --git a/peer.go b/peer.go index 338166c..e50c9ec 100644 --- a/peer.go +++ b/peer.go @@ -9,10 +9,6 @@ import ( "gopkg.in/fatih/set.v0" ) -const ( - protocolVersion uint64 = 0x02 -) - type peer struct { host *Whisper peer *p2p.Peer diff --git a/whisper.go b/whisper.go index a4ec943..f51f14a 100644 --- a/whisper.go +++ b/whisper.go @@ -2,7 +2,6 @@ package whisper import ( "crypto/ecdsa" - "errors" "sync" "time" @@ -17,12 +16,16 @@ import ( ) const ( - statusMsg = 0x0 - envelopesMsg = 0x01 - whisperVersion = 0x02 + statusMsg = 0x00 + envelopesMsg = 0x01 + + protocolVersion uint64 = 0x02 + protocolName = "shh" signatureFlag = byte(1 << 7) signatureLength = 65 + + expirationTicks = 800 * time.Millisecond ) const ( @@ -42,9 +45,9 @@ type Whisper struct { protocol p2p.Protocol filters *filter.Filters - mmu sync.RWMutex - messages map[common.Hash]*Envelope - expiry map[uint32]*set.SetNonTS + mmu sync.RWMutex // Message mutex to sync the below pool + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expiry map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) quit chan struct{} @@ -63,8 +66,8 @@ func New() *Whisper { // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ - Name: "shh", - Version: uint(whisperVersion), + Name: protocolName, + Version: uint(protocolVersion), Length: 2, Run: whisper.msgHandler, } @@ -72,10 +75,64 @@ func New() *Whisper { return whisper } +// Protocol returns the whisper sub-protocol handler for this particular client. +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +} + +// Version returns the whisper sub-protocols version number. func (self *Whisper) Version() uint { return self.protocol.Version } +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key + + return key +} + +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. +func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { + return self.keys[string(crypto.FromECDSAPub(key))] != nil +} + +// GetIdentity retrieves the private key of the specified public identity. +func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { + return self.keys[string(crypto.FromECDSAPub(key))] +} + +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(options Filter) int { + filter := filter.Generic{ + Str1: string(crypto.FromECDSAPub(options.To)), + Str2: string(crypto.FromECDSAPub(options.From)), + Data: NewTopicSet(options.Topics), + Fn: func(data interface{}) { + options.Fn(data.(*Message)) + }, + } + return self.filters.Install(filter) +} + +// Unwatch removes an installed message handler. +func (self *Whisper) Unwatch(id int) { + self.filters.Uninstall(id) +} + +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + func (self *Whisper) Start() { glog.V(logger.Info).Infoln("Whisper started") go self.update() @@ -83,29 +140,7 @@ func (self *Whisper) Start() { func (self *Whisper) Stop() { close(self.quit) -} - -func (self *Whisper) Send(envelope *Envelope) error { - return self.add(envelope) -} - -func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { - key, err := crypto.GenerateKey() - if err != nil { - panic(err) - } - - self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key - - return key -} - -func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { - return self.keys[string(crypto.FromECDSAPub(key))] != nil -} - -func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { - return self.keys[string(crypto.FromECDSAPub(key))] + glog.V(logger.Info).Infoln("Whisper stopped") } // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { @@ -117,22 +152,7 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // return false // } -func (self *Whisper) Watch(opts Filter) int { - return self.filters.Install(filter.Generic{ - Str1: string(crypto.FromECDSAPub(opts.To)), - Str2: string(crypto.FromECDSAPub(opts.From)), - Data: NewTopicSet(opts.Topics), - Fn: func(data interface{}) { - opts.Fn(data.(*Message)) - }, - }) -} - -func (self *Whisper) Unwatch(id int) { - self.filters.Uninstall(id) -} - -func (self *Whisper) Messages(id int) (messages []*Message) { +/*func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { @@ -146,6 +166,36 @@ func (self *Whisper) Messages(id int) (messages []*Message) { } return +}*/ + +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. +func (self *Whisper) add(envelope *Envelope) error { + self.mmu.Lock() + defer self.mmu.Unlock() + + // Insert the message into the tracked pool + hash := envelope.Hash() + if _, ok := self.messages[hash]; ok { + glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + return nil + } + self.messages[hash] = envelope + + // Insert the message into the expiration pool for later removal + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + + // Notify the local node of a message arrival + go self.postEvent(envelope) + } + glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope) + + return nil } // Main handler for passing whisper messages to whisper peer objects @@ -182,79 +232,6 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } } -// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. -func (self *Whisper) add(envelope *Envelope) error { - if !envelope.valid() { - return errors.New("invalid pow provided for envelope") - } - - self.mmu.Lock() - defer self.mmu.Unlock() - - hash := envelope.Hash() - self.messages[hash] = envelope - if self.expiry[envelope.Expiry] == nil { - self.expiry[envelope.Expiry] = set.NewNonTS() - } - - if !self.expiry[envelope.Expiry].Has(hash) { - self.expiry[envelope.Expiry].Add(hash) - go self.postEvent(envelope) - } - glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope) - - return nil -} - -func (self *Whisper) update() { - expire := time.NewTicker(800 * time.Millisecond) -out: - for { - select { - case <-expire.C: - self.expire() - case <-self.quit: - break out - } - } -} - -func (self *Whisper) expire() { - self.mmu.Lock() - defer self.mmu.Unlock() - - now := uint32(time.Now().Unix()) - for then, hashSet := range self.expiry { - if then > now { - continue - } - - hashSet.Each(func(v interface{}) bool { - delete(self.messages, v.(common.Hash)) - return true - }) - self.expiry[then].Clear() - } -} - -func (self *Whisper) envelopes() (envelopes []*Envelope) { - self.mmu.RLock() - defer self.mmu.RUnlock() - - envelopes = make([]*Envelope, len(self.messages)) - i := 0 - for _, envelope := range self.messages { - envelopes[i] = envelope - i++ - } - - return -} - -func (self *Whisper) Protocol() p2p.Protocol { - return self.protocol -} - // postEvent opens an envelope with the configured identities and delivers the // message upstream from application processing. func (self *Whisper) postEvent(envelope *Envelope) { @@ -293,3 +270,54 @@ func createFilter(message *Message, topics []Topic) filter.Filter { Data: NewTopicSet(topics), } } + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. +func (self *Whisper) update() { + // Start a ticker to check for expirations + expire := time.NewTicker(expirationTicks) + + // Repeat updates until termination is requested + for { + select { + case <-expire.C: + self.expire() + + case <-self.quit: + return + } + } +} + +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. +func (self *Whisper) expire() { + self.mmu.Lock() + defer self.mmu.Unlock() + + now := uint32(time.Now().Unix()) + for then, hashSet := range self.expiry { + // Short circuit if a future time + if then > now { + continue + } + // Dump all expired messages and remove timestamp + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(common.Hash)) + return true + }) + self.expiry[then].Clear() + } +} + +// envelopes retrieves all the messages currently pooled by the node. +func (self *Whisper) envelopes() []*Envelope { + self.mmu.RLock() + defer self.mmu.RUnlock() + + envelopes := make([]*Envelope, 0, len(self.messages)) + for _, envelope := range self.messages { + envelopes = append(envelopes, envelope) + } + return envelopes +}