Clean up whisper log delivery (#555)

This commit is contained in:
Adam Babik 2018-01-19 15:53:16 +01:00 committed by GitHub
parent 0771e7d1b7
commit c153a60dc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 17 additions and 736 deletions

View File

@ -486,7 +486,7 @@ func (s *ManagerTestSuite) TestNodeStartCrash() {
s.NoError(err) s.NoError(err)
// start node outside the manager (on the same port), so that manager node.Start() method fails // start node outside the manager (on the same port), so that manager node.Start() method fails
outsideNode, err := node.MakeNode(nodeConfig, node.LogDeliveryService{}) outsideNode, err := node.MakeNode(nodeConfig)
s.NoError(err) s.NoError(err)
err = outsideNode.Start() err = outsideNode.Start()
s.NoError(err) s.NoError(err)

View File

@ -1,405 +0,0 @@
diff --git a/common/message/message.go b/common/message/message.go
new file mode 100644
index 00000000..c9f8965c
--- /dev/null
+++ b/common/message/message.go
@@ -0,0 +1,65 @@
+package message
+
+// Direction defines a int type to indicate a message as either incoming or outgoing.
+type Direction int
+
+// consts of all message direction values.
+const (
+ IncomingMessage Direction = iota + 1
+ OutgoingMessage
+)
+
+// String returns the representation of giving direction.
+func (d Direction) String() string {
+ switch d {
+ case IncomingMessage:
+ return "IncomingMessage"
+ case OutgoingMessage:
+ return "OutgoingMessage"
+ }
+
+ return "MessageDirectionUnknown"
+}
+
+// Status defines a int type to indicate different status value of a
+// message state.
+type Status int
+
+// consts of all message delivery status.
+const (
+ PendingStatus Status = iota + 1
+ QueuedStatus
+ CachedStatus
+ SentStatus
+ ExpiredStatus
+ ProcessingStatus
+ ResentStatus
+ RejectedStatus
+ DeliveredStatus
+)
+
+// String returns the representation of giving state.
+func (s Status) String() string {
+ switch s {
+ case PendingStatus:
+ return "Pending"
+ case QueuedStatus:
+ return "Queued"
+ case CachedStatus:
+ return "Cached"
+ case SentStatus:
+ return "Sent"
+ case ProcessingStatus:
+ return "Processing"
+ case ExpiredStatus:
+ return "ExpiredTTL"
+ case ResentStatus:
+ return "Resent"
+ case RejectedStatus:
+ return "Rejected"
+ case DeliveredStatus:
+ return "Delivered"
+ }
+
+ return "unknown"
+}
diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go
index e3c2f4a9..75ef8b66 100644
--- a/whisper/whisperv5/api.go
+++ b/whisper/whisperv5/api.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -238,13 +239,17 @@ type newMessageOverride struct {
// Post a message on the Whisper network.
func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) {
var (
- symKeyGiven = len(req.SymKeyID) > 0
- pubKeyGiven = len(req.PublicKey) > 0
- err error
+ symKeyGiven = len(req.SymKeyID) > 0
+ pubKeyGiven = len(req.PublicKey) > 0
+ isP2PMessage = len(req.TargetPeer) > 0
+ err error
)
+ api.w.traceOutgoingDelivery(isP2PMessage, message.PendingStatus, &req, nil, nil, nil)
+
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrSymAsym)
return false, ErrSymAsym
}
@@ -260,6 +265,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
}
@@ -267,12 +273,15 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrNoTopics)
return false, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
if !validateSymmetricKey(params.KeySym) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidSymmetricKey)
return false, ErrInvalidSymmetricKey
}
}
@@ -281,6 +290,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidPublicKey)
return false, ErrInvalidPublicKey
}
}
@@ -288,11 +298,13 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
@@ -300,16 +312,28 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return false, fmt.Errorf("failed to parse target peer: %s", err)
}
- return true, api.w.SendP2PMessage(n.ID[:], env)
+
+ api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
+
+ if err := api.w.SendP2PMessage(n.ID[:], env); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
+ return true, err
+ }
+
+ api.w.traceOutgoingDelivery(isP2PMessage, message.DeliveredStatus, &req, env, nil, err)
+ return true, nil
}
// ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, ErrTooLowPoW)
return false, ErrTooLowPoW
}
+ api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
return true, api.w.Send(env)
}
diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go
index a6c9e610..b3bc9963 100644
--- a/whisper/whisperv5/doc.go
+++ b/whisper/whisperv5/doc.go
@@ -33,6 +33,7 @@ import (
"fmt"
"time"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/p2p"
)
@@ -99,3 +100,23 @@ type NotificationServer interface {
// Stop stops notification sending loop, releasing related resources
Stop() error
}
+
+// MessageState holds the current delivery status of a whisper p2p message.
+type MessageState struct {
+ IsP2P bool `json:"is_p2p"`
+ Reason error `json:"reason"`
+ Envelope Envelope `json:"envelope"`
+ Timestamp time.Time `json:"timestamp"`
+ Source NewMessage `json:"source"`
+ Status message.Status `json:"status"`
+ Direction message.Direction `json:"direction"`
+ Received ReceivedMessage `json:"received"`
+}
+
+// DeliveryServer represents a small message status
+// notification system where a message delivery status
+// update event is delivered to it's underline system
+// for both rpc messages and p2p messages.
+type DeliveryServer interface {
+ SendState(MessageState)
+}
diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go
index b5e893e0..71dc3b2d 100644
--- a/whisper/whisperv5/filter.go
+++ b/whisper/whisperv5/filter.go
@@ -18,10 +18,12 @@ package whisperv5
import (
"crypto/ecdsa"
+ "errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
@@ -115,15 +117,20 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
+ err := errors.New("Envelope failed to be opened")
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
}
} else {
+ err := errors.New("processing message: does not match")
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
}
}
if match && msg != nil {
log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.DeliveredStatus, nil, env, msg, nil)
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg)
}
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index d1ef2445..58d0fd6e 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@@ -77,7 +78,8 @@ type Whisper struct {
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
- mailServer MailServer // MailServer interface
+ mailServer MailServer // MailServer interface
+ deliveryServer DeliveryServer // DeliveryServer interface
notificationServer NotificationServer
}
@@ -157,6 +159,11 @@ func (w *Whisper) RegisterServer(server MailServer) {
w.mailServer = server
}
+// RegisterDeliveryServer registers notification server with Whisper
+func (w *Whisper) RegisterDeliveryServer(server DeliveryServer) {
+ w.deliveryServer = server
+}
+
// RegisterNotificationServer registers notification server with Whisper
func (w *Whisper) RegisterNotificationServer(server NotificationServer) {
w.notificationServer = server
@@ -620,8 +627,11 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var envelope Envelope
if err := packet.Decode(&envelope); err != nil {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &envelope, nil, err)
return errors.New("invalid direct message")
}
+
+ wh.traceIncomingDelivery(true, message.SentStatus, nil, &envelope, nil, nil)
wh.postEvent(&envelope, true)
}
case p2pRequestCode:
@@ -630,6 +640,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var request Envelope
if err := packet.Decode(&request); err != nil {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &request, nil, err)
return errors.New("invalid p2p request")
}
wh.mailServer.DeliverMail(p, &request)
@@ -700,16 +711,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if !wh.expirations[envelope.Expiry].Has(hash) {
wh.expirations[envelope.Expiry].Add(hash)
}
+
+ wh.traceIncomingDelivery(false, message.CachedStatus, nil, envelope, nil, nil)
}
wh.poolMu.Unlock()
if alreadyCached {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
+ wh.traceIncomingDelivery(false, message.ResentStatus, nil, envelope, nil, nil)
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock()
+
+ wh.traceIncomingDelivery(false, message.QueuedStatus, nil, envelope, nil, nil)
+
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@@ -718,6 +735,47 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return true, nil
}
+func (w *Whisper) traceIncomingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
+ w.traceDelivery(isP2P, message.IncomingMessage, status, src, env, rec, err)
+}
+
+func (w *Whisper) traceOutgoingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
+ w.traceDelivery(isP2P, message.OutgoingMessage, status, src, env, rec, err)
+}
+
+func (w *Whisper) traceDelivery(isP2P bool, dir message.Direction, status message.Status, newmsg *NewMessage, envelope *Envelope, received *ReceivedMessage, err error) {
+ if w.deliveryServer == nil {
+ return
+ }
+
+ var env Envelope
+ var rec ReceivedMessage
+ var src NewMessage
+
+ if newmsg != nil {
+ src = *newmsg
+ }
+
+ if envelope != nil {
+ env = *envelope
+ }
+
+ if received != nil {
+ rec = *received
+ }
+
+ go w.deliveryServer.SendState(MessageState{
+ Reason: err,
+ Source: src,
+ Received: rec,
+ IsP2P: isP2P,
+ Status: status,
+ Envelope: env,
+ Direction: dir,
+ Timestamp: time.Now(),
+ })
+}
+
// postEvent queues the message for further processing.
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
@@ -730,6 +788,13 @@ func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
w.checkOverflow()
w.messageQueue <- envelope
}
+
+ return
+ }
+
+ if w.deliveryServer != nil {
+ err := fmt.Errorf("Mismatch Envelope version(%d) to wanted Version(%d)", envelope.Ver(), EnvelopeVersion)
+ w.traceIncomingDelivery(isP2P, message.RejectedStatus, nil, envelope, nil, err)
}
}
@@ -759,9 +824,11 @@ func (w *Whisper) processQueue() {
return
case e = <-w.messageQueue:
+ w.traceIncomingDelivery(false, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, false)
case e = <-w.p2pMsgQueue:
+ w.traceIncomingDelivery(true, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, true)
}
}

View File

@ -13,7 +13,6 @@ We try to minimize number and amount of changes in those patches as much as poss
- `0002-les-api-status.patch` — adds StatusBackend into LES code (need to be inspected, some things can and should be done outside of les code) - `0002-les-api-status.patch` — adds StatusBackend into LES code (need to be inspected, some things can and should be done outside of les code)
- `0003-dockerfiles-wnode-swarm.patch` — adds Dockerfiles (who uses this?) - `0003-dockerfiles-wnode-swarm.patch` — adds Dockerfiles (who uses this?)
- `0004-whisper-notifications.patch` — adds Whisper notifications (need to be reviewed and documented) - `0004-whisper-notifications.patch` — adds Whisper notifications (need to be reviewed and documented)
- `0005-whisper-delivery.patch` - adds support for logs/traces of Whisper traffic (questionable, nobody used this functionality so far)
- `0006-latest-cht.patch` updates CHT root hashes, should be updated regularly to keep sync fast, until proper Trusted Checkpoint sync is not implemented as part of LES/2 protocol. - `0006-latest-cht.patch` updates CHT root hashes, should be updated regularly to keep sync fast, until proper Trusted Checkpoint sync is not implemented as part of LES/2 protocol.
- `0007-README.patch` — update upstream README.md. - `0007-README.patch` — update upstream README.md.
- `0008-tx-pool-nonce.patch` - On GetTransactionCount request with PendingBlockNumber get the nonce from transaction pool - `0008-tx-pool-nonce.patch` - On GetTransactionCount request with PendingBlockNumber get the nonce from transaction pool
@ -38,7 +37,6 @@ git clone https://github.com/ethereum/go-ethereum
# update remote url to point to our fork repo # update remote url to point to our fork repo
git remote set-url origin git@github.com:status-im/go-ethereum.git git remote set-url origin git@github.com:status-im/go-ethereum.git
``` ```
### Using existing fork repo (recommended) ### Using existing fork repo (recommended)

View File

@ -44,34 +44,6 @@ func (k *SelectedExtKey) Hex() string {
return k.Address.Hex() return k.Address.Hex()
} }
// MessageState defines a struct to hold given facts about a message stat.
type MessageState struct {
// Type defines Direction type: IncomingMessage or OutgoingMessage.
Type string `json:"type"`
// Protocol defines means of transmission in whisper: RPC or P2P.
Protocol string `json:"protocol"`
// Status defines current status of message: Pending, Delivered, Rejected, etc.
Status string `json:"status"`
// Envelope struct holding encrypted message.
Envelope []byte `json:"envelope"`
// Time in of sent time of message.
TimeSent uint32 `json:"time,omitempty"`
// Received defines time when delivery notification was received
Received time.Time `json:"received"`
// Payload associated with envelope.
Payload []byte `json:"payload,omitempty"`
// Hash defines the Envelope's hash
Hash string `json:"envelope_hash"`
// FromDevice defines the device sending message if value is extractable.
FromDevice string `json:"from_device,omitempty"`
// ToDevice defines the receiving message if value is extractable.
ToDevice string `json:"to_device,omitempty"`
// RejectionError defines the error message when message ending with a Rejected status.
RejectionError string `json:"rejection_reason,omitempty"`
// Source of message when type is Outgoing which contains raw rpc data.
Source whisper.NewMessage `json:"source,omitempty"`
}
// NodeManager defines expected methods for managing Status node // NodeManager defines expected methods for managing Status node
type NodeManager interface { type NodeManager interface {
// StartNode start Status node, fails if node is already started // StartNode start Status node, fails if node is already started

View File

@ -1,78 +0,0 @@
package node
import (
"encoding/base64"
"encoding/json"
gethcommon "github.com/ethereum/go-ethereum/common"
gethmessage "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
)
// LogDeliveryService implements the whisper.DeliveryServer which logs out
// stats of whisper.MessageState to the log.
type LogDeliveryService struct{}
// SendState logs incoming whisper.MesssageState into the log.
func (ld LogDeliveryService) SendState(state whisper.MessageState) {
var stat common.MessageState
var protocol string
var payload []byte
var from, to string
if state.IsP2P {
protocol = "P2P"
} else {
protocol = "RPC"
}
switch state.Direction {
case gethmessage.IncomingMessage:
payload = state.Received.Payload
if state.Received.Src != nil {
from = gethcommon.ToHex(crypto.FromECDSAPub(state.Received.Src))
}
if state.Received.Dst != nil {
to = gethcommon.ToHex(crypto.FromECDSAPub(state.Received.Dst))
}
case gethmessage.OutgoingMessage:
from = state.Source.Sig
if len(state.Source.PublicKey) == 0 {
to = string(state.Source.PublicKey)
} else {
to = state.Source.TargetPeer
}
}
if state.Reason != nil {
stat.RejectionError = state.Reason.Error()
}
stat.Protocol = protocol
stat.Payload = payload
stat.FromDevice = from
stat.ToDevice = to
stat.Received = state.Timestamp
stat.Source = state.Source
stat.Envelope = state.Envelope.Data
stat.Status = state.Status.String()
stat.Type = state.Direction.String()
stat.Hash = state.Envelope.Hash().String()
stat.TimeSent = state.Envelope.Expiry - state.Envelope.TTL
statdata, err := json.Marshal(stat)
if err != nil {
log.Warn("failed to marshal common.MessageStat", "err", err)
return
}
encodedStat := base64.StdEncoding.EncodeToString(statdata)
log.Debug("Message delivery notification", "state", encodedStat)
}

View File

@ -70,7 +70,7 @@ func (m *NodeManager) startNode(config *params.NodeConfig) (<-chan struct{}, err
m.initLog(config) m.initLog(config)
ethNode, err := MakeNode(config, LogDeliveryService{}) ethNode, err := MakeNode(config)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -38,7 +38,7 @@ var (
) )
// MakeNode create a geth node entity // MakeNode create a geth node entity
func MakeNode(config *params.NodeConfig, deliveryServer whisper.DeliveryServer) (*node.Node, error) { func MakeNode(config *params.NodeConfig) (*node.Node, error) {
// make sure data directory exists // make sure data directory exists
if err := os.MkdirAll(filepath.Join(config.DataDir), os.ModePerm); err != nil { if err := os.MkdirAll(filepath.Join(config.DataDir), os.ModePerm); err != nil {
return nil, err return nil, err
@ -76,7 +76,7 @@ func MakeNode(config *params.NodeConfig, deliveryServer whisper.DeliveryServer)
} }
// start Whisper service // start Whisper service
if err := activateShhService(stack, config, deliveryServer); err != nil { if err := activateShhService(stack, config); err != nil {
return nil, fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err) return nil, fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err)
} }
@ -156,7 +156,7 @@ func activateEthService(stack *node.Node, config *params.NodeConfig) error {
} }
// activateShhService configures Whisper and adds it to the given node. // activateShhService configures Whisper and adds it to the given node.
func activateShhService(stack *node.Node, config *params.NodeConfig, deliveryServer whisper.DeliveryServer) error { func activateShhService(stack *node.Node, config *params.NodeConfig) error {
if !config.WhisperConfig.Enabled { if !config.WhisperConfig.Enabled {
log.Info("SHH protocol is disabled") log.Info("SHH protocol is disabled")
return nil return nil
@ -166,10 +166,6 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, deliverySer
whisperConfig := config.WhisperConfig whisperConfig := config.WhisperConfig
whisperService := whisper.New(nil) whisperService := whisper.New(nil)
if deliveryServer != nil {
whisperService.RegisterDeliveryServer(deliveryServer)
}
// enable mail service // enable mail service
if whisperConfig.EnableMailServer { if whisperConfig.EnableMailServer {
if whisperConfig.Password == "" { if whisperConfig.Password == "" {

View File

@ -1,3 +1,8 @@
# Go Ethereum (Status fork)
This is a forked version of the official `go-ethereum` repository. For detailed information on patches applied, see [https://github.com/status-im/status-go/geth-patches/](https://github.com/status-im/status-go/geth-patches/).
# Original README
## Go Ethereum ## Go Ethereum
Official golang implementation of the Ethereum protocol. Official golang implementation of the Ethereum protocol.

View File

@ -98,8 +98,6 @@ var (
argEnode = flag.String("boot", "", "bootstrap node you want to connect to (e.g. enode://e454......08d50@52.176.211.200:16428)") argEnode = flag.String("boot", "", "bootstrap node you want to connect to (e.g. enode://e454......08d50@52.176.211.200:16428)")
argTopic = flag.String("topic", "", "topic in hexadecimal format (e.g. 70a4beef)") argTopic = flag.String("topic", "", "topic in hexadecimal format (e.g. 70a4beef)")
argSaveDir = flag.String("savedir", "", "directory where incoming messages will be saved as files") argSaveDir = flag.String("savedir", "", "directory where incoming messages will be saved as files")
argSymPass = flag.String("sympass", "", "SymKey password")
argMsPass = flag.String("mspass", "", "Mailserver password")
) )
func main() { func main() {
@ -148,13 +146,6 @@ func processArgs() {
} else if *fileExMode { } else if *fileExMode {
utils.Fatalf("Parameter 'savedir' is mandatory for file exchange mode") utils.Fatalf("Parameter 'savedir' is mandatory for file exchange mode")
} }
if len(*argSymPass) > 0 {
symPass = *argSymPass
}
if len(*argMsPass) > 0 {
msPassword = *argMsPass
}
if *echoMode { if *echoMode {
echo() echo()
@ -424,24 +415,10 @@ func run() {
} else if *fileExMode { } else if *fileExMode {
sendFilesLoop() sendFilesLoop()
} else { } else {
pingLoop() // instead of sendLoop() sendLoop()
} }
} }
func pingLoop() {
ticker := time.NewTicker(time.Second * 120)
for {
select {
case <-ticker.C:
fmt.Println("I am alive: ", time.Now())
case <-done:
return
}
}
}
func sendLoop() { func sendLoop() {
for { for {
s := scanLine("") s := scanLine("")

View File

@ -1,65 +0,0 @@
package message
// Direction defines a int type to indicate a message as either incoming or outgoing.
type Direction int
// consts of all message direction values.
const (
IncomingMessage Direction = iota + 1
OutgoingMessage
)
// String returns the representation of giving direction.
func (d Direction) String() string {
switch d {
case IncomingMessage:
return "IncomingMessage"
case OutgoingMessage:
return "OutgoingMessage"
}
return "MessageDirectionUnknown"
}
// Status defines a int type to indicate different status value of a
// message state.
type Status int
// consts of all message delivery status.
const (
PendingStatus Status = iota + 1
QueuedStatus
CachedStatus
SentStatus
ExpiredStatus
ProcessingStatus
ResentStatus
RejectedStatus
DeliveredStatus
)
// String returns the representation of giving state.
func (s Status) String() string {
switch s {
case PendingStatus:
return "Pending"
case QueuedStatus:
return "Queued"
case CachedStatus:
return "Cached"
case SentStatus:
return "Sent"
case ProcessingStatus:
return "Processing"
case ExpiredStatus:
return "ExpiredTTL"
case ResentStatus:
return "Resent"
case RejectedStatus:
return "Rejected"
case DeliveredStatus:
return "Delivered"
}
return "unknown"
}

View File

@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
@ -241,15 +240,11 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
var ( var (
symKeyGiven = len(req.SymKeyID) > 0 symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0 pubKeyGiven = len(req.PublicKey) > 0
isP2PMessage = len(req.TargetPeer) > 0
err error err error
) )
api.w.traceOutgoingDelivery(isP2PMessage, message.PendingStatus, &req, nil, nil, nil)
// user must specify either a symmetric or an asymmetric key // user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrSymAsym)
return false, ErrSymAsym return false, ErrSymAsym
} }
@ -265,7 +260,6 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set key that is used to sign the message // Set key that is used to sign the message
if len(req.Sig) > 0 { if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil { if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err return false, err
} }
} }
@ -273,15 +267,12 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set symmetric key that is used to encrypt the message // Set symmetric key that is used to encrypt the message
if symKeyGiven { if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrNoTopics)
return false, ErrNoTopics return false, ErrNoTopics
} }
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err return false, err
} }
if !validateSymmetricKey(params.KeySym) { if !validateSymmetricKey(params.KeySym) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidSymmetricKey)
return false, ErrInvalidSymmetricKey return false, ErrInvalidSymmetricKey
} }
} }
@ -290,7 +281,6 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if pubKeyGiven { if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey) params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) { if !ValidatePublicKey(params.Dst) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidPublicKey)
return false, ErrInvalidPublicKey return false, ErrInvalidPublicKey
} }
} }
@ -298,13 +288,11 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// encrypt and sent message // encrypt and sent message
whisperMsg, err := NewSentMessage(params) whisperMsg, err := NewSentMessage(params)
if err != nil { if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err return false, err
} }
env, err := whisperMsg.Wrap(params) env, err := whisperMsg.Wrap(params)
if err != nil { if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err return false, err
} }
@ -312,28 +300,16 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if len(req.TargetPeer) > 0 { if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer) n, err := discover.ParseNode(req.TargetPeer)
if err != nil { if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return false, fmt.Errorf("failed to parse target peer: %s", err) return false, fmt.Errorf("failed to parse target peer: %s", err)
} }
return true, api.w.SendP2PMessage(n.ID[:], env)
api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
if err := api.w.SendP2PMessage(n.ID[:], env); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return true, err
}
api.w.traceOutgoingDelivery(isP2PMessage, message.DeliveredStatus, &req, env, nil, err)
return true, nil
} }
// ensure that the message PoW meets the node's minimum accepted PoW // ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() { if req.PowTarget < api.w.MinPow() {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, ErrTooLowPoW)
return false, ErrTooLowPoW return false, ErrTooLowPoW
} }
api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
return true, api.w.Send(env) return true, api.w.Send(env)
} }

View File

@ -33,7 +33,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
) )
@ -100,23 +99,3 @@ type NotificationServer interface {
// Stop stops notification sending loop, releasing related resources // Stop stops notification sending loop, releasing related resources
Stop() error Stop() error
} }
// MessageState holds the current delivery status of a whisper p2p message.
type MessageState struct {
IsP2P bool `json:"is_p2p"`
Reason error `json:"reason"`
Envelope Envelope `json:"envelope"`
Timestamp time.Time `json:"timestamp"`
Source NewMessage `json:"source"`
Status message.Status `json:"status"`
Direction message.Direction `json:"direction"`
Received ReceivedMessage `json:"received"`
}
// DeliveryServer represents a small message status
// notification system where a message delivery status
// update event is delivered to it's underline system
// for both rpc messages and p2p messages.
type DeliveryServer interface {
SendState(MessageState)
}

View File

@ -18,12 +18,10 @@ package whisperv5
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"fmt" "fmt"
"sync" "sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -117,20 +115,15 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match { if match {
msg = env.Open(watcher) msg = env.Open(watcher)
if msg == nil { if msg == nil {
err := errors.New("Envelope failed to be opened")
fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i) log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
} }
} else { } else {
err := errors.New("processing message: does not match")
fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i) log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
} }
} }
if match && msg != nil { if match && msg != nil {
log.Trace("processing message: decrypted", "hash", env.Hash().Hex()) log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
fs.whisper.traceIncomingDelivery(p2pMessage, message.DeliveredStatus, nil, env, msg, nil)
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg) watcher.Trigger(msg)
} }

View File

@ -27,7 +27,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -79,7 +78,6 @@ type Whisper struct {
stats Statistics // Statistics of whisper node stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface mailServer MailServer // MailServer interface
deliveryServer DeliveryServer // DeliveryServer interface
notificationServer NotificationServer notificationServer NotificationServer
} }
@ -159,11 +157,6 @@ func (w *Whisper) RegisterServer(server MailServer) {
w.mailServer = server w.mailServer = server
} }
// RegisterDeliveryServer registers notification server with Whisper
func (w *Whisper) RegisterDeliveryServer(server DeliveryServer) {
w.deliveryServer = server
}
// RegisterNotificationServer registers notification server with Whisper // RegisterNotificationServer registers notification server with Whisper
func (w *Whisper) RegisterNotificationServer(server NotificationServer) { func (w *Whisper) RegisterNotificationServer(server NotificationServer) {
w.notificationServer = server w.notificationServer = server
@ -627,11 +620,8 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var envelope Envelope var envelope Envelope
if err := packet.Decode(&envelope); err != nil { if err := packet.Decode(&envelope); err != nil {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &envelope, nil, err)
return errors.New("invalid direct message") return errors.New("invalid direct message")
} }
wh.traceIncomingDelivery(true, message.SentStatus, nil, &envelope, nil, nil)
wh.postEvent(&envelope, true) wh.postEvent(&envelope, true)
} }
case p2pRequestCode: case p2pRequestCode:
@ -640,7 +630,6 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var request Envelope var request Envelope
if err := packet.Decode(&request); err != nil { if err := packet.Decode(&request); err != nil {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &request, nil, err)
return errors.New("invalid p2p request") return errors.New("invalid p2p request")
} }
wh.mailServer.DeliverMail(p, &request) wh.mailServer.DeliverMail(p, &request)
@ -711,22 +700,16 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if !wh.expirations[envelope.Expiry].Has(hash) { if !wh.expirations[envelope.Expiry].Has(hash) {
wh.expirations[envelope.Expiry].Add(hash) wh.expirations[envelope.Expiry].Add(hash)
} }
wh.traceIncomingDelivery(false, message.CachedStatus, nil, envelope, nil, nil)
} }
wh.poolMu.Unlock() wh.poolMu.Unlock()
if alreadyCached { if alreadyCached {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
wh.traceIncomingDelivery(false, message.ResentStatus, nil, envelope, nil, nil)
} else { } else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock() wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size() wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock() wh.statsMu.Unlock()
wh.traceIncomingDelivery(false, message.QueuedStatus, nil, envelope, nil, nil)
wh.postEvent(envelope, false) // notify the local node about the new message wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil { if wh.mailServer != nil {
wh.mailServer.Archive(envelope) wh.mailServer.Archive(envelope)
@ -735,47 +718,6 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return true, nil return true, nil
} }
func (w *Whisper) traceIncomingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
w.traceDelivery(isP2P, message.IncomingMessage, status, src, env, rec, err)
}
func (w *Whisper) traceOutgoingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
w.traceDelivery(isP2P, message.OutgoingMessage, status, src, env, rec, err)
}
func (w *Whisper) traceDelivery(isP2P bool, dir message.Direction, status message.Status, newmsg *NewMessage, envelope *Envelope, received *ReceivedMessage, err error) {
if w.deliveryServer == nil {
return
}
var env Envelope
var rec ReceivedMessage
var src NewMessage
if newmsg != nil {
src = *newmsg
}
if envelope != nil {
env = *envelope
}
if received != nil {
rec = *received
}
go w.deliveryServer.SendState(MessageState{
Reason: err,
Source: src,
Received: rec,
IsP2P: isP2P,
Status: status,
Envelope: env,
Direction: dir,
Timestamp: time.Now(),
})
}
// postEvent queues the message for further processing. // postEvent queues the message for further processing.
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than // if the version of incoming message is higher than
@ -788,13 +730,6 @@ func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
w.checkOverflow() w.checkOverflow()
w.messageQueue <- envelope w.messageQueue <- envelope
} }
return
}
if w.deliveryServer != nil {
err := fmt.Errorf("Mismatch Envelope version(%d) to wanted Version(%d)", envelope.Ver(), EnvelopeVersion)
w.traceIncomingDelivery(isP2P, message.RejectedStatus, nil, envelope, nil, err)
} }
} }
@ -824,11 +759,9 @@ func (w *Whisper) processQueue() {
return return
case e = <-w.messageQueue: case e = <-w.messageQueue:
w.traceIncomingDelivery(false, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, false) w.filters.NotifyWatchers(e, false)
case e = <-w.p2pMsgQueue: case e = <-w.p2pMsgQueue:
w.traceIncomingDelivery(true, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, true) w.filters.NotifyWatchers(e, true)
} }
} }