mirror of
https://github.com/status-im/op-geth.git
synced 2025-01-15 01:04:11 +00:00
5e4fd8e7db
* swarm/network: Revised depth calculation with tests * swarm/network: WIP remove redundant "full" function * swarm/network: WIP peerpot refactor * swarm/network: Make test methods submethod of peerpot and embed kad * swarm/network: Remove commented out code * swarm/network: Rename health test functions * swarm/network: Too many n's * swarm/network: Change hive Healthy func to accept addresses * swarm/network: Add Healthy proxy method for api in hive * swarm/network: Skip failing test out of scope for PR * swarm/network: Skip all tests dependent on SuggestPeers * swarm/network: Remove commented code and useless kad Pof member * swarm/network: Remove more unused code, add counter on depth test errors * swarm/network: WIP Create Healthy assertion tests * swarm/network: Roll back health related methods receiver change * swarm/network: Hardwire network minproxbinsize in swarm sim * swarm/network: Rework Health test to strict Pending add test for saturation And add test for as many as possible up to saturation * swarm/network: Skip discovery tests (dependent on SuggestPeer) * swarm/network: Remove useless minProxBinSize in stream * swarm/network: Remove unnecessary testing.T param to assert health * swarm/network: Implement t.Helper() in checkHealth * swarm/network: Rename check back to assert now that we have helper magic * swarm/network: Revert WaitTillHealthy change (deferred to nxt PR) * swarm/network: Kademlia tests GotNN => ConnectNN * swarm/network: Renames and comments * swarm/network: Add comments
1078 lines
33 KiB
Go
1078 lines
33 KiB
Go
// Copyright 2018 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package pss
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/rand"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/crypto/sha3"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/protocols"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
"github.com/ethereum/go-ethereum/swarm/log"
|
|
"github.com/ethereum/go-ethereum/swarm/network"
|
|
"github.com/ethereum/go-ethereum/swarm/pot"
|
|
"github.com/ethereum/go-ethereum/swarm/storage"
|
|
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
|
|
)
|
|
|
|
const (
|
|
defaultPaddingByteSize = 16
|
|
DefaultMsgTTL = time.Second * 120
|
|
defaultDigestCacheTTL = time.Second * 10
|
|
defaultSymKeyCacheCapacity = 512
|
|
digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
|
|
defaultWhisperWorkTime = 3
|
|
defaultWhisperPoW = 0.0000000001
|
|
defaultMaxMsgSize = 1024 * 1024
|
|
defaultCleanInterval = time.Second * 60 * 10
|
|
defaultOutboxCapacity = 100000
|
|
pssProtocolName = "pss"
|
|
pssVersion = 2
|
|
hasherCount = 8
|
|
)
|
|
|
|
var (
|
|
addressLength = len(pot.Address{})
|
|
)
|
|
|
|
// cache is used for preventing backwards routing
|
|
// will also be instrumental in flood guard mechanism
|
|
// and mailbox implementation
|
|
type pssCacheEntry struct {
|
|
expiresAt time.Time
|
|
}
|
|
|
|
// abstraction to enable access to p2p.protocols.Peer.Send
|
|
type senderPeer interface {
|
|
Info() *p2p.PeerInfo
|
|
ID() enode.ID
|
|
Address() []byte
|
|
Send(context.Context, interface{}) error
|
|
}
|
|
|
|
// per-key peer related information
|
|
// member `protected` prevents garbage collection of the instance
|
|
type pssPeer struct {
|
|
lastSeen time.Time
|
|
address PssAddress
|
|
protected bool
|
|
}
|
|
|
|
// Pss configuration parameters
|
|
type PssParams struct {
|
|
MsgTTL time.Duration
|
|
CacheTTL time.Duration
|
|
privateKey *ecdsa.PrivateKey
|
|
SymKeyCacheCapacity int
|
|
AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
|
|
}
|
|
|
|
// Sane defaults for Pss
|
|
func NewPssParams() *PssParams {
|
|
return &PssParams{
|
|
MsgTTL: DefaultMsgTTL,
|
|
CacheTTL: defaultDigestCacheTTL,
|
|
SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
|
|
}
|
|
}
|
|
|
|
func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
|
|
params.privateKey = privatekey
|
|
return params
|
|
}
|
|
|
|
// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
|
|
//
|
|
// Implements node.Service
|
|
type Pss struct {
|
|
*network.Kademlia // we can get the Kademlia address from this
|
|
privateKey *ecdsa.PrivateKey // pss can have it's own independent key
|
|
w *whisper.Whisper // key and encryption backend
|
|
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
|
|
|
|
// sending and forwarding
|
|
fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
|
|
fwdPoolMu sync.RWMutex
|
|
fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
|
|
fwdCacheMu sync.RWMutex
|
|
cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
|
|
msgTTL time.Duration
|
|
paddingByteSize int
|
|
capstring string
|
|
outbox chan *PssMsg
|
|
|
|
// keys and peers
|
|
pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
|
|
pubKeyPoolMu sync.RWMutex
|
|
symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
|
|
symKeyPoolMu sync.RWMutex
|
|
symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
|
|
symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
|
|
symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
|
|
|
|
// message handling
|
|
handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
|
|
handlersMu sync.RWMutex
|
|
hashPool sync.Pool
|
|
topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)
|
|
|
|
// process
|
|
quitC chan struct{}
|
|
}
|
|
|
|
func (p *Pss) String() string {
|
|
return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
|
|
}
|
|
|
|
// Creates a new Pss instance.
|
|
//
|
|
// In addition to params, it takes a swarm network Kademlia
|
|
// and a FileStore storage for message cache storage.
|
|
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
|
|
if params.privateKey == nil {
|
|
return nil, errors.New("missing private key for pss")
|
|
}
|
|
cap := p2p.Cap{
|
|
Name: pssProtocolName,
|
|
Version: pssVersion,
|
|
}
|
|
ps := &Pss{
|
|
Kademlia: k,
|
|
privateKey: params.privateKey,
|
|
w: whisper.New(&whisper.DefaultConfig),
|
|
quitC: make(chan struct{}),
|
|
|
|
fwdPool: make(map[string]*protocols.Peer),
|
|
fwdCache: make(map[pssDigest]pssCacheEntry),
|
|
cacheTTL: params.CacheTTL,
|
|
msgTTL: params.MsgTTL,
|
|
paddingByteSize: defaultPaddingByteSize,
|
|
capstring: cap.String(),
|
|
outbox: make(chan *PssMsg, defaultOutboxCapacity),
|
|
|
|
pubKeyPool: make(map[string]map[Topic]*pssPeer),
|
|
symKeyPool: make(map[string]map[Topic]*pssPeer),
|
|
symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
|
|
symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
|
|
|
|
handlers: make(map[Topic]map[*handler]bool),
|
|
topicHandlerCaps: make(map[Topic]*handlerCaps),
|
|
|
|
hashPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return sha3.NewKeccak256()
|
|
},
|
|
},
|
|
}
|
|
|
|
for i := 0; i < hasherCount; i++ {
|
|
hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
|
|
ps.hashPool.Put(hashfunc)
|
|
}
|
|
|
|
return ps, nil
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: node.Service interface
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
func (p *Pss) Start(srv *p2p.Server) error {
|
|
go func() {
|
|
ticker := time.NewTicker(defaultCleanInterval)
|
|
cacheTicker := time.NewTicker(p.cacheTTL)
|
|
defer ticker.Stop()
|
|
defer cacheTicker.Stop()
|
|
for {
|
|
select {
|
|
case <-cacheTicker.C:
|
|
p.cleanFwdCache()
|
|
case <-ticker.C:
|
|
p.cleanKeys()
|
|
case <-p.quitC:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case msg := <-p.outbox:
|
|
err := p.forward(msg)
|
|
if err != nil {
|
|
log.Error(err.Error())
|
|
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
|
|
}
|
|
case <-p.quitC:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
log.Info("Started Pss")
|
|
log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
|
|
return nil
|
|
}
|
|
|
|
func (p *Pss) Stop() error {
|
|
log.Info("Pss shutting down")
|
|
close(p.quitC)
|
|
return nil
|
|
}
|
|
|
|
var pssSpec = &protocols.Spec{
|
|
Name: pssProtocolName,
|
|
Version: pssVersion,
|
|
MaxMsgSize: defaultMaxMsgSize,
|
|
Messages: []interface{}{
|
|
PssMsg{},
|
|
},
|
|
}
|
|
|
|
func (p *Pss) Protocols() []p2p.Protocol {
|
|
return []p2p.Protocol{
|
|
{
|
|
Name: pssSpec.Name,
|
|
Version: pssSpec.Version,
|
|
Length: pssSpec.Length(),
|
|
Run: p.Run,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
|
pp := protocols.NewPeer(peer, rw, pssSpec)
|
|
p.fwdPoolMu.Lock()
|
|
p.fwdPool[peer.Info().ID] = pp
|
|
p.fwdPoolMu.Unlock()
|
|
return pp.Run(p.handlePssMsg)
|
|
}
|
|
|
|
func (p *Pss) APIs() []rpc.API {
|
|
apis := []rpc.API{
|
|
{
|
|
Namespace: "pss",
|
|
Version: "1.0",
|
|
Service: NewAPI(p),
|
|
Public: true,
|
|
},
|
|
}
|
|
apis = append(apis, p.auxAPIs...)
|
|
return apis
|
|
}
|
|
|
|
// add API methods to the pss API
|
|
// must be run before node is started
|
|
func (p *Pss) addAPI(api rpc.API) {
|
|
p.auxAPIs = append(p.auxAPIs, api)
|
|
}
|
|
|
|
// Returns the swarm Kademlia address of the pss node
|
|
func (p *Pss) BaseAddr() []byte {
|
|
return p.Kademlia.BaseAddr()
|
|
}
|
|
|
|
// Returns the pss node's public key
|
|
func (p *Pss) PublicKey() *ecdsa.PublicKey {
|
|
return &p.privateKey.PublicKey
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Message handling
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// Links a handler function to a Topic
|
|
//
|
|
// All incoming messages with an envelope Topic matching the
|
|
// topic specified will be passed to the given Handler function.
|
|
//
|
|
// There may be an arbitrary number of handler functions per topic.
|
|
//
|
|
// Returns a deregister function which needs to be called to
|
|
// deregister the handler,
|
|
func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
|
|
p.handlersMu.Lock()
|
|
defer p.handlersMu.Unlock()
|
|
handlers := p.handlers[*topic]
|
|
if handlers == nil {
|
|
handlers = make(map[*handler]bool)
|
|
p.handlers[*topic] = handlers
|
|
log.Debug("registered handler", "caps", hndlr.caps)
|
|
}
|
|
if hndlr.caps == nil {
|
|
hndlr.caps = &handlerCaps{}
|
|
}
|
|
handlers[hndlr] = true
|
|
if _, ok := p.topicHandlerCaps[*topic]; !ok {
|
|
p.topicHandlerCaps[*topic] = &handlerCaps{}
|
|
}
|
|
if hndlr.caps.raw {
|
|
p.topicHandlerCaps[*topic].raw = true
|
|
}
|
|
if hndlr.caps.prox {
|
|
p.topicHandlerCaps[*topic].prox = true
|
|
}
|
|
return func() { p.deregister(topic, hndlr) }
|
|
}
|
|
func (p *Pss) deregister(topic *Topic, hndlr *handler) {
|
|
p.handlersMu.Lock()
|
|
defer p.handlersMu.Unlock()
|
|
handlers := p.handlers[*topic]
|
|
if len(handlers) > 1 {
|
|
delete(p.handlers, *topic)
|
|
// topic caps might have changed now that a handler is gone
|
|
caps := &handlerCaps{}
|
|
for h := range handlers {
|
|
if h.caps.raw {
|
|
caps.raw = true
|
|
}
|
|
if h.caps.prox {
|
|
caps.prox = true
|
|
}
|
|
}
|
|
p.topicHandlerCaps[*topic] = caps
|
|
return
|
|
}
|
|
delete(handlers, hndlr)
|
|
}
|
|
|
|
// get all registered handlers for respective topics
|
|
func (p *Pss) getHandlers(topic Topic) map[*handler]bool {
|
|
p.handlersMu.RLock()
|
|
defer p.handlersMu.RUnlock()
|
|
return p.handlers[topic]
|
|
}
|
|
|
|
// Filters incoming messages for processing or forwarding.
|
|
// Check if address partially matches
|
|
// If yes, it CAN be for us, and we process it
|
|
// Only passes error to pss protocol handler if payload is not valid pssmsg
|
|
func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
|
|
metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
|
|
pssmsg, ok := msg.(*PssMsg)
|
|
if !ok {
|
|
return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
|
|
}
|
|
log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
|
|
if int64(pssmsg.Expire) < time.Now().Unix() {
|
|
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
|
|
log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
|
|
return nil
|
|
}
|
|
if p.checkFwdCache(pssmsg) {
|
|
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
|
|
return nil
|
|
}
|
|
p.addFwdCache(pssmsg)
|
|
|
|
psstopic := Topic(pssmsg.Payload.Topic)
|
|
|
|
// raw is simplest handler contingency to check, so check that first
|
|
var isRaw bool
|
|
if pssmsg.isRaw() {
|
|
if _, ok := p.topicHandlerCaps[psstopic]; ok {
|
|
if !p.topicHandlerCaps[psstopic].raw {
|
|
log.Debug("No handler for raw message", "topic", psstopic)
|
|
return nil
|
|
}
|
|
}
|
|
isRaw = true
|
|
}
|
|
|
|
// check if we can be recipient:
|
|
// - no prox handler on message and partial address matches
|
|
// - prox handler on message and we are in prox regardless of partial address match
|
|
// store this result so we don't calculate again on every handler
|
|
var isProx bool
|
|
if _, ok := p.topicHandlerCaps[psstopic]; ok {
|
|
isProx = p.topicHandlerCaps[psstopic].prox
|
|
}
|
|
isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
|
|
if !isRecipient {
|
|
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
|
|
return p.enqueue(pssmsg)
|
|
}
|
|
|
|
log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
|
|
if err := p.process(pssmsg, isRaw, isProx); err != nil {
|
|
qerr := p.enqueue(pssmsg)
|
|
if qerr != nil {
|
|
return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
|
|
}
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
// Entry point to processing a message for which the current node can be the intended recipient.
|
|
// Attempts symmetric and asymmetric decryption with stored keys.
|
|
// Dispatches message to all handlers matching the message topic
|
|
func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
|
|
metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
|
|
|
|
var err error
|
|
var recvmsg *whisper.ReceivedMessage
|
|
var payload []byte
|
|
var from PssAddress
|
|
var asymmetric bool
|
|
var keyid string
|
|
var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
|
|
|
|
envelope := pssmsg.Payload
|
|
psstopic := Topic(envelope.Topic)
|
|
|
|
if raw {
|
|
payload = pssmsg.Payload.Data
|
|
} else {
|
|
if pssmsg.isSym() {
|
|
keyFunc = p.processSym
|
|
} else {
|
|
asymmetric = true
|
|
keyFunc = p.processAsym
|
|
}
|
|
|
|
recvmsg, keyid, from, err = keyFunc(envelope)
|
|
if err != nil {
|
|
return errors.New("Decryption failed")
|
|
}
|
|
payload = recvmsg.Payload
|
|
}
|
|
|
|
if len(pssmsg.To) < addressLength {
|
|
if err := p.enqueue(pssmsg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
|
|
handlers := p.getHandlers(topic)
|
|
peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
|
|
for h := range handlers {
|
|
if !h.caps.raw && raw {
|
|
log.Warn("norawhandler")
|
|
continue
|
|
}
|
|
if !h.caps.prox && prox {
|
|
log.Warn("noproxhandler")
|
|
continue
|
|
}
|
|
err := (h.f)(payload, peer, asymmetric, keyid)
|
|
if err != nil {
|
|
log.Warn("Pss handler failed", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// will return false if using partial address
|
|
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
|
|
return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
|
|
}
|
|
|
|
// test match of leftmost bytes in given message to node's Kademlia address
|
|
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
|
|
local := p.Kademlia.BaseAddr()
|
|
|
|
// if a partial address matches we are possible recipient regardless of prox
|
|
// if not and prox is not set, we are surely not
|
|
if bytes.Equal(msg.To, local[:len(msg.To)]) {
|
|
|
|
return true
|
|
} else if !prox {
|
|
return false
|
|
}
|
|
|
|
depth := p.Kademlia.NeighbourhoodDepth()
|
|
po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
|
|
log.Trace("selfpossible", "po", po, "depth", depth)
|
|
|
|
return depth <= po
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Encryption
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// Links a peer ECDSA public key to a topic
|
|
//
|
|
// This is required for asymmetric message exchange
|
|
// on the given topic
|
|
//
|
|
// The value in `address` will be used as a routing hint for the
|
|
// public key / topic association
|
|
func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
|
|
if err := validateAddress(address); err != nil {
|
|
return err
|
|
}
|
|
pubkeybytes := crypto.FromECDSAPub(pubkey)
|
|
if len(pubkeybytes) == 0 {
|
|
return fmt.Errorf("invalid public key: %v", pubkey)
|
|
}
|
|
pubkeyid := common.ToHex(pubkeybytes)
|
|
psp := &pssPeer{
|
|
address: address,
|
|
}
|
|
p.pubKeyPoolMu.Lock()
|
|
if _, ok := p.pubKeyPool[pubkeyid]; !ok {
|
|
p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
|
|
}
|
|
p.pubKeyPool[pubkeyid][topic] = psp
|
|
p.pubKeyPoolMu.Unlock()
|
|
log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
|
|
return nil
|
|
}
|
|
|
|
// Automatically generate a new symkey for a topic and address hint
|
|
func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
|
|
keyid, err := p.w.GenerateSymKey()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
|
|
return keyid, nil
|
|
}
|
|
|
|
// Links a peer symmetric key (arbitrary byte sequence) to a topic
|
|
//
|
|
// This is required for symmetrically encrypted message exchange
|
|
// on the given topic
|
|
//
|
|
// The key is stored in the whisper backend.
|
|
//
|
|
// If addtocache is set to true, the key will be added to the cache of keys
|
|
// used to attempt symmetric decryption of incoming messages.
|
|
//
|
|
// Returns a string id that can be used to retrieve the key bytes
|
|
// from the whisper backend (see pss.GetSymmetricKey())
|
|
func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
|
|
if err := validateAddress(address); err != nil {
|
|
return "", err
|
|
}
|
|
return p.setSymmetricKey(key, topic, address, addtocache, true)
|
|
}
|
|
|
|
func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
|
|
keyid, err := p.w.AddSymKeyDirect(key)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
|
|
return keyid, nil
|
|
}
|
|
|
|
// adds a symmetric key to the pss key pool, and optionally adds the key
|
|
// to the collection of keys used to attempt symmetric decryption of
|
|
// incoming messages
|
|
func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
|
|
psp := &pssPeer{
|
|
address: address,
|
|
protected: protected,
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
if _, ok := p.symKeyPool[keyid]; !ok {
|
|
p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
|
|
}
|
|
p.symKeyPool[keyid][topic] = psp
|
|
p.symKeyPoolMu.Unlock()
|
|
if addtocache {
|
|
p.symKeyDecryptCacheCursor++
|
|
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
|
|
}
|
|
key, _ := p.GetSymmetricKey(keyid)
|
|
log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
|
|
}
|
|
|
|
// Returns a symmetric key byte seqyence stored in the whisper backend
|
|
// by its unique id
|
|
//
|
|
// Passes on the error value from the whisper backend
|
|
func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
|
|
symkey, err := p.w.GetSymKey(symkeyid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return symkey, nil
|
|
}
|
|
|
|
// Returns all recorded topic and address combination for a specific public key
|
|
func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
|
|
p.pubKeyPoolMu.RLock()
|
|
defer p.pubKeyPoolMu.RUnlock()
|
|
for t, peer := range p.pubKeyPool[keyid] {
|
|
topic = append(topic, t)
|
|
address = append(address, peer.address)
|
|
}
|
|
|
|
return topic, address, nil
|
|
}
|
|
|
|
func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
|
|
p.pubKeyPoolMu.RLock()
|
|
defer p.pubKeyPoolMu.RUnlock()
|
|
if peers, ok := p.pubKeyPool[keyid]; ok {
|
|
if t, ok := peers[topic]; ok {
|
|
return t.address, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
|
|
}
|
|
|
|
// Attempt to decrypt, validate and unpack a
|
|
// symmetrically encrypted message
|
|
// If successful, returns the unpacked whisper ReceivedMessage struct
|
|
// encapsulating the decrypted message, and the whisper backend id
|
|
// of the symmetric key used to decrypt the message.
|
|
// It fails if decryption of the message fails or if the message is corrupted
|
|
func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
|
|
metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
|
|
|
|
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
|
|
symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
|
|
symkey, err := p.w.GetSymKey(*symkeyid)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
recvmsg, err := envelope.OpenSymmetric(symkey)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !recvmsg.Validate() {
|
|
return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
|
|
p.symKeyPoolMu.Unlock()
|
|
p.symKeyDecryptCacheCursor++
|
|
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
|
|
return recvmsg, *symkeyid, from, nil
|
|
}
|
|
return nil, "", nil, fmt.Errorf("could not decrypt message")
|
|
}
|
|
|
|
// Attempt to decrypt, validate and unpack an
|
|
// asymmetrically encrypted message
|
|
// If successful, returns the unpacked whisper ReceivedMessage struct
|
|
// encapsulating the decrypted message, and the byte representation of
|
|
// the public key used to decrypt the message.
|
|
// It fails if decryption of message fails, or if the message is corrupted
|
|
func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
|
|
metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
|
|
|
|
recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
|
|
if err != nil {
|
|
return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
|
|
}
|
|
// check signature (if signed), strip padding
|
|
if !recvmsg.Validate() {
|
|
return nil, "", nil, fmt.Errorf("invalid message")
|
|
}
|
|
pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
|
|
var from PssAddress
|
|
p.pubKeyPoolMu.Lock()
|
|
if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
|
|
from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
|
|
}
|
|
p.pubKeyPoolMu.Unlock()
|
|
return recvmsg, pubkeyid, from, nil
|
|
}
|
|
|
|
// Symkey garbage collection
|
|
// a key is removed if:
|
|
// - it is not marked as protected
|
|
// - it is not in the incoming decryption cache
|
|
func (p *Pss) cleanKeys() (count int) {
|
|
for keyid, peertopics := range p.symKeyPool {
|
|
var expiredtopics []Topic
|
|
for topic, psp := range peertopics {
|
|
if psp.protected {
|
|
continue
|
|
}
|
|
|
|
var match bool
|
|
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
|
|
cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
|
|
if *cacheid == keyid {
|
|
match = true
|
|
}
|
|
}
|
|
if !match {
|
|
expiredtopics = append(expiredtopics, topic)
|
|
}
|
|
}
|
|
for _, topic := range expiredtopics {
|
|
p.symKeyPoolMu.Lock()
|
|
delete(p.symKeyPool[keyid], topic)
|
|
log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
|
|
p.symKeyPoolMu.Unlock()
|
|
count++
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Message sending
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
func (p *Pss) enqueue(msg *PssMsg) error {
|
|
select {
|
|
case p.outbox <- msg:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
|
|
return errors.New("outbox full")
|
|
}
|
|
|
|
// Send a raw message (any encryption is responsibility of calling client)
|
|
//
|
|
// Will fail if raw messages are disallowed
|
|
func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
|
|
if err := validateAddress(address); err != nil {
|
|
return err
|
|
}
|
|
pssMsgParams := &msgParams{
|
|
raw: true,
|
|
}
|
|
payload := &whisper.Envelope{
|
|
Data: msg,
|
|
Topic: whisper.TopicType(topic),
|
|
}
|
|
pssMsg := newPssMsg(pssMsgParams)
|
|
pssMsg.To = address
|
|
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
|
|
pssMsg.Payload = payload
|
|
p.addFwdCache(pssMsg)
|
|
err := p.enqueue(pssMsg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// if we have a proxhandler on this topic
|
|
// also deliver message to ourselves
|
|
if _, ok := p.topicHandlerCaps[topic]; ok {
|
|
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
|
|
return p.process(pssMsg, true, true)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Send a message using symmetric encryption
|
|
//
|
|
// Fails if the key id does not match any of the stored symmetric keys
|
|
func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
|
|
symkey, err := p.GetSymmetricKey(symkeyid)
|
|
if err != nil {
|
|
return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
psp, ok := p.symKeyPool[symkeyid][topic]
|
|
p.symKeyPoolMu.Unlock()
|
|
if !ok {
|
|
return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
|
|
}
|
|
return p.send(psp.address, topic, msg, false, symkey)
|
|
}
|
|
|
|
// Send a message using asymmetric encryption
|
|
//
|
|
// Fails if the key id does not match any in of the stored public keys
|
|
func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
|
|
if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
|
|
return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
|
|
}
|
|
p.pubKeyPoolMu.Lock()
|
|
psp, ok := p.pubKeyPool[pubkeyid][topic]
|
|
p.pubKeyPoolMu.Unlock()
|
|
if !ok {
|
|
return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
|
|
}
|
|
return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
|
|
}
|
|
|
|
// Send is payload agnostic, and will accept any byte slice as payload
|
|
// It generates an whisper envelope for the specified recipient and topic,
|
|
// and wraps the message payload in it.
|
|
// TODO: Implement proper message padding
|
|
func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
|
|
metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
|
|
|
|
if key == nil || bytes.Equal(key, []byte{}) {
|
|
return fmt.Errorf("Zero length key passed to pss send")
|
|
}
|
|
padding := make([]byte, p.paddingByteSize)
|
|
c, err := rand.Read(padding)
|
|
if err != nil {
|
|
return err
|
|
} else if c < p.paddingByteSize {
|
|
return fmt.Errorf("invalid padding length: %d", c)
|
|
}
|
|
wparams := &whisper.MessageParams{
|
|
TTL: defaultWhisperTTL,
|
|
Src: p.privateKey,
|
|
Topic: whisper.TopicType(topic),
|
|
WorkTime: defaultWhisperWorkTime,
|
|
PoW: defaultWhisperPoW,
|
|
Payload: msg,
|
|
Padding: padding,
|
|
}
|
|
if asymmetric {
|
|
pk, err := crypto.UnmarshalPubkey(key)
|
|
if err != nil {
|
|
return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
|
|
}
|
|
wparams.Dst = pk
|
|
} else {
|
|
wparams.KeySym = key
|
|
}
|
|
// set up outgoing message container, which does encryption and envelope wrapping
|
|
woutmsg, err := whisper.NewSentMessage(wparams)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
|
|
}
|
|
// performs encryption.
|
|
// Does NOT perform / performs negligible PoW due to very low difficulty setting
|
|
// after this the message is ready for sending
|
|
envelope, err := woutmsg.Wrap(wparams)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to perform whisper encryption: %v", err)
|
|
}
|
|
log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
|
|
|
|
// prepare for devp2p transport
|
|
pssMsgParams := &msgParams{
|
|
sym: !asymmetric,
|
|
}
|
|
pssMsg := newPssMsg(pssMsgParams)
|
|
pssMsg.To = to
|
|
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
|
|
pssMsg.Payload = envelope
|
|
err = p.enqueue(pssMsg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, ok := p.topicHandlerCaps[topic]; ok {
|
|
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
|
|
return p.process(pssMsg, true, true)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// sendFunc is a helper function that tries to send a message and returns true on success.
|
|
// It is set here for usage in production, and optionally overridden in tests.
|
|
var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
|
|
|
|
// tries to send a message, returns true if successful
|
|
func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
|
|
var isPssEnabled bool
|
|
info := sp.Info()
|
|
for _, capability := range info.Caps {
|
|
if capability == p.capstring {
|
|
isPssEnabled = true
|
|
break
|
|
}
|
|
}
|
|
if !isPssEnabled {
|
|
log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
|
|
return false
|
|
}
|
|
|
|
// get the protocol peer from the forwarding peer cache
|
|
p.fwdPoolMu.RLock()
|
|
pp := p.fwdPool[sp.Info().ID]
|
|
p.fwdPoolMu.RUnlock()
|
|
|
|
err := pp.Send(context.TODO(), msg)
|
|
if err != nil {
|
|
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
|
|
log.Error(err.Error())
|
|
}
|
|
|
|
return err == nil
|
|
}
|
|
|
|
// Forwards a pss message to the peer(s) based on recipient address according to the algorithm
|
|
// described below. The recipient address can be of any length, and the byte slice will be matched
|
|
// to the MSB slice of the peer address of the equivalent length.
|
|
//
|
|
// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
|
|
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
|
|
// partial address, it should be forwarded to all the peers matching the partial address, if there
|
|
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
|
|
// forwarding fails, the node should try to forward it to the next best peer, until the message is
|
|
// successfully forwarded to at least one peer.
|
|
func (p *Pss) forward(msg *PssMsg) error {
|
|
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
|
|
sent := 0 // number of successful sends
|
|
to := make([]byte, addressLength)
|
|
copy(to[:len(msg.To)], msg.To)
|
|
neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
|
|
|
|
// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
|
|
// but the luminosity is less. here luminosity equals the number of bits given in the destination address.
|
|
luminosityRadius := len(msg.To) * 8
|
|
|
|
// proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
|
|
pof := pot.DefaultPof(neighbourhoodDepth)
|
|
|
|
// soft threshold for msg broadcast
|
|
broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
|
|
if broadcastThreshold > luminosityRadius {
|
|
broadcastThreshold = luminosityRadius
|
|
}
|
|
|
|
var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
|
|
|
|
// if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
|
|
// call below), then peers that fall in the same proximity bin as recipient address will appear
|
|
// [at least] one bit closer, but only if these additional bits are given in the recipient address.
|
|
if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
|
|
broadcastThreshold++
|
|
onlySendOnce = true
|
|
}
|
|
|
|
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
|
|
if po < broadcastThreshold && sent > 0 {
|
|
return false // stop iterating
|
|
}
|
|
if sendFunc(p, sp, msg) {
|
|
sent++
|
|
if onlySendOnce {
|
|
return false
|
|
}
|
|
if po == addressLength*8 {
|
|
// stop iterating if successfully sent to the exact recipient (perfect match of full address)
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
|
|
// if we failed to send to anyone, re-insert message in the send-queue
|
|
if sent == 0 {
|
|
log.Debug("unable to forward to any peers")
|
|
if err := p.enqueue(msg); err != nil {
|
|
metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
|
|
log.Error(err.Error())
|
|
return err
|
|
}
|
|
}
|
|
|
|
// cache the message
|
|
p.addFwdCache(msg)
|
|
return nil
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Caching
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// cleanFwdCache is used to periodically remove expired entries from the forward cache
|
|
func (p *Pss) cleanFwdCache() {
|
|
metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
for k, v := range p.fwdCache {
|
|
if v.expiresAt.Before(time.Now()) {
|
|
delete(p.fwdCache, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
func label(b []byte) string {
|
|
return fmt.Sprintf("%04x", b[:2])
|
|
}
|
|
|
|
// add a message to the cache
|
|
func (p *Pss) addFwdCache(msg *PssMsg) error {
|
|
metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
|
|
|
|
var entry pssCacheEntry
|
|
var ok bool
|
|
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
|
|
digest := p.digest(msg)
|
|
if entry, ok = p.fwdCache[digest]; !ok {
|
|
entry = pssCacheEntry{}
|
|
}
|
|
entry.expiresAt = time.Now().Add(p.cacheTTL)
|
|
p.fwdCache[digest] = entry
|
|
return nil
|
|
}
|
|
|
|
// check if message is in the cache
|
|
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
|
|
digest := p.digest(msg)
|
|
entry, ok := p.fwdCache[digest]
|
|
if ok {
|
|
if entry.expiresAt.After(time.Now()) {
|
|
log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
|
|
metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
|
|
return true
|
|
}
|
|
metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Digest of message
|
|
func (p *Pss) digest(msg *PssMsg) pssDigest {
|
|
return p.digestBytes(msg.serialize())
|
|
}
|
|
|
|
func (p *Pss) digestBytes(msg []byte) pssDigest {
|
|
hasher := p.hashPool.Get().(hash.Hash)
|
|
defer p.hashPool.Put(hasher)
|
|
hasher.Reset()
|
|
hasher.Write(msg)
|
|
digest := pssDigest{}
|
|
key := hasher.Sum(nil)
|
|
copy(digest[:], key[:digestLength])
|
|
return digest
|
|
}
|
|
|
|
func validateAddress(addr PssAddress) error {
|
|
if len(addr) > addressLength {
|
|
return errors.New("address too long")
|
|
}
|
|
return nil
|
|
}
|