Refactor/waku tidy (#1957)

* Refactor tidy of waku package

* Added deprecation warning on whisper README.md

* Appeasing the lint gods and testing is good

* Place Whisper deprecation warning in the correct package README

:facepalm

* Implementing changes after team feedback

* More offerings to the lint gods

* Remove apparently redundant context params

* Correctly handle concurrent HandlePeer err

* Revert "Remove apparently redundant context params"

This reverts commit 557dbd0d64.

* Added note to waku/api.go about context
This commit is contained in:
Samuel Hawksby-Robinson 2020-04-30 13:52:48 +01:00 committed by GitHub
parent aa7f591587
commit b025db235f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 192 additions and 121 deletions

View File

@ -70,6 +70,8 @@ type Info struct {
MaxMessageSize uint32 `json:"maxMessageSize"` // Maximum accepted message size MaxMessageSize uint32 `json:"maxMessageSize"` // Maximum accepted message size
} }
// Context is used higher up the food-chain and without significant refactoring is not a simple thing to remove / change
// Info returns diagnostic information about the waku node. // Info returns diagnostic information about the waku node.
func (api *PublicWakuAPI) Info(ctx context.Context) Info { func (api *PublicWakuAPI) Info(ctx context.Context) Info {
return Info{ return Info{
@ -216,12 +218,6 @@ type NewMessage struct {
TargetPeer string `json:"targetPeer"` TargetPeer string `json:"targetPeer"`
} }
type newMessageOverride struct { // nolint: deadcode,unused
PublicKey hexutil.Bytes
Payload hexutil.Bytes
Padding hexutil.Bytes
}
// Post posts a message on the Waku network. // Post posts a message on the Waku network.
// returns the hash of the message in case of success. // returns the hash of the message in case of success.
func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Bytes, error) { func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Bytes, error) {
@ -363,10 +359,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
} }
} }
for i, bt := range crit.Topics { for _, bt := range crit.Topics {
if len(bt) == 0 || len(bt) > 4 {
return nil, fmt.Errorf("subscribe: topic %d has wrong size: %d", i, len(bt))
}
filter.Topics = append(filter.Topics, bt[:]) filter.Topics = append(filter.Topics, bt[:])
} }
@ -442,14 +435,6 @@ type Message struct {
P2P bool `json:"bool,omitempty"` P2P bool `json:"bool,omitempty"`
} }
type messageOverride struct { // nolint: deadcode,unused
Sig hexutil.Bytes
Payload hexutil.Bytes
Padding hexutil.Bytes
Hash hexutil.Bytes
Dst hexutil.Bytes
}
// ToWakuMessage converts an internal message into an API version. // ToWakuMessage converts an internal message into an API version.
func ToWakuMessage(message *common.ReceivedMessage) *Message { func ToWakuMessage(message *common.ReceivedMessage) *Message {
msg := Message{ msg := Message{

View File

@ -52,7 +52,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
} }
found := false found := false
candidates := w.filters.GetWatchersByTopic(common.TopicType(t1)) candidates := w.filters.GetWatchersByTopic(t1)
for _, f := range candidates { for _, f := range candidates {
if len(f.Topics) == 2 { if len(f.Topics) == 2 {
if bytes.Equal(f.Topics[0], t1[:]) && bytes.Equal(f.Topics[1], t2[:]) { if bytes.Equal(f.Topics[0], t1[:]) && bytes.Equal(f.Topics[1], t2[:]) {

View File

@ -91,15 +91,15 @@ func (e *Envelope) Seal(options *MessageParams) error {
target = e.powToFirstBit(options.PoW) target = e.powToFirstBit(options.PoW)
} }
rlp := e.rlpWithoutNonce() rwn := e.rlpWithoutNonce()
buf := make([]byte, len(rlp)+8) buf := make([]byte, len(rwn)+8)
copy(buf, rlp) copy(buf, rwn)
asAnInt := new(big.Int) asAnInt := new(big.Int)
finish := time.Now().Add(time.Duration(options.WorkTime) * time.Second).UnixNano() finish := time.Now().Add(time.Duration(options.WorkTime) * time.Second).UnixNano()
for nonce := uint64(0); time.Now().UnixNano() < finish; { for nonce := uint64(0); time.Now().UnixNano() < finish; {
for i := 0; i < 1024; i++ { for i := 0; i < 1024; i++ {
binary.BigEndian.PutUint64(buf[len(rlp):], nonce) binary.BigEndian.PutUint64(buf[len(rwn):], nonce)
h := crypto.Keccak256(buf) h := crypto.Keccak256(buf)
asAnInt.SetBytes(h) asAnInt.SetBytes(h)
leadingZeros := 256 - asAnInt.BitLen() leadingZeros := 256 - asAnInt.BitLen()
@ -130,14 +130,14 @@ func (e *Envelope) PoW() float64 {
} }
func (e *Envelope) CalculatePoW(diff uint32) { func (e *Envelope) CalculatePoW(diff uint32) {
rlp := e.rlpWithoutNonce() rwn := e.rlpWithoutNonce()
buf := make([]byte, len(rlp)+8) buf := make([]byte, len(rwn)+8)
copy(buf, rlp) copy(buf, rwn)
binary.BigEndian.PutUint64(buf[len(rlp):], e.Nonce) binary.BigEndian.PutUint64(buf[len(rwn):], e.Nonce)
powHash := new(big.Int).SetBytes(crypto.Keccak256(buf)) powHash := new(big.Int).SetBytes(crypto.Keccak256(buf))
leadingZeroes := 256 - powHash.BitLen() leadingZeroes := 256 - powHash.BitLen()
x := math.Pow(2, float64(leadingZeroes)) x := math.Pow(2, float64(leadingZeroes))
x /= float64(len(rlp)) x /= float64(len(rwn))
x /= float64(e.TTL + diff) x /= float64(e.TTL + diff)
e.pow = x e.pow = x
} }
@ -266,7 +266,7 @@ func TopicToBloom(topic TopicType) []byte {
for j := 0; j < 3; j++ { for j := 0; j < 3; j++ {
byteIndex := index[j] / 8 byteIndex := index[j] / 8
bitIndex := index[j] % 8 bitIndex := index[j] % 8
b[byteIndex] = (1 << uint(bitIndex)) b[byteIndex] = 1 << uint(bitIndex)
} }
return b return b
} }

View File

@ -85,7 +85,7 @@ func generateTestCases(t *testing.T, SizeTestFilters int) []FilterTestCase {
for i := 0; i < SizeTestFilters; i++ { for i := 0; i < SizeTestFilters; i++ {
f, _ := generateFilter(t, true) f, _ := generateFilter(t, true)
cases[i].f = f cases[i].f = f
cases[i].alive = mrand.Int()&int(1) == 0 // nolint: gosec cases[i].alive = mrand.Int()&1 == 0 // nolint: gosec
} }
return cases return cases
} }

View File

@ -220,7 +220,7 @@ func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error {
hash := crypto.Keccak256(msg.Raw) hash := crypto.Keccak256(msg.Raw)
signature, err := crypto.Sign(hash, key) signature, err := crypto.Sign(hash, key)
if err != nil { if err != nil {
msg.Raw[0] &= (0xFF ^ signatureFlag) // clear the flag msg.Raw[0] &= 0xFF ^ signatureFlag // clear the flag
return err return err
} }
msg.Raw = append(msg.Raw, signature...) msg.Raw = append(msg.Raw, signature...)

View File

@ -26,6 +26,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -187,7 +189,10 @@ func TestMessageSeal(t *testing.T) {
target := 32.0 target := 32.0
params.WorkTime = 4 params.WorkTime = 4
params.PoW = target params.PoW = target
env.Seal(params) // nolint: errcheck err = env.Seal(params)
if err != nil {
t.Logf("failed to seal envelope: %s", err)
}
env.CalculatePoW(0) env.CalculatePoW(0)
pow := env.PoW() pow := env.PoW()
@ -195,9 +200,11 @@ func TestMessageSeal(t *testing.T) {
t.Fatalf("failed Wrap with seed %d: pow < target (%f vs. %f).", seed, pow, target) t.Fatalf("failed Wrap with seed %d: pow < target (%f vs. %f).", seed, pow, target)
} }
// Seal should fail as WorkTime is significantly lower than PoW would require
params.WorkTime = 1 params.WorkTime = 1
params.PoW = 1000000000.0 params.PoW = 1000000000.0
env.Seal(params) // nolint: errcheck err = env.Seal(params)
require.EqualError(t, err, "failed to reach the PoW target, specified pow time (1 seconds) was insufficient")
env.CalculatePoW(0) env.CalculatePoW(0)
pow = env.PoW() pow = env.PoW()
if pow < 2*target { if pow < 2*target {

View File

@ -122,8 +122,8 @@ type PeerRateLimiter struct {
func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandler) *PeerRateLimiter { func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandler) *PeerRateLimiter {
if cfg == nil { if cfg == nil {
copy := defaultPeerRateLimiterConfig cfgCopy := defaultPeerRateLimiterConfig
cfg = &copy cfg = &cfgCopy
} }
return &PeerRateLimiter{ return &PeerRateLimiter{
@ -138,11 +138,20 @@ func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandl
} }
func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runLoop runLoop) error { func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runLoop runLoop) error {
in, out := p2p.MsgPipe()
defer in.Close()
defer out.Close()
errC := make(chan error, 1) errC := make(chan error, 1)
in, out := p2p.MsgPipe()
defer func() {
if err := in.Close(); err != nil {
errC <- err
}
}()
defer func() {
if err := out.Close(); err != nil {
errC <- err
}
}()
// Read from the original reader and write to the message pipe. // Read from the original reader and write to the message pipe.
go func() { go func() {
for { for {

View File

@ -11,7 +11,6 @@ import (
) )
func TestEncodeDecodeRLP(t *testing.T) { func TestEncodeDecodeRLP(t *testing.T) {
initRLPKeyFields()
pow := math.Float64bits(6.02) pow := math.Float64bits(6.02)
lightNodeEnabled := true lightNodeEnabled := true
confirmationsEnabled := true confirmationsEnabled := true

View File

@ -21,7 +21,6 @@ package waku
import ( import (
"bytes" "bytes"
"crypto/ecdsa" "crypto/ecdsa"
"fmt"
mrand "math/rand" mrand "math/rand"
"net" "net"
"sync" "sync"
@ -104,9 +103,7 @@ var unexpectedMessage = []byte("per rectum ad astra")
var masterBloomFilter []byte var masterBloomFilter []byte
var masterPow = 0.00000001 var masterPow = 0.00000001
var round = 1 var round = 1
var debugMode = false
var prevTime time.Time var prevTime time.Time
var cntPrev int
func TestSimulationBloomFilter(t *testing.T) { func TestSimulationBloomFilter(t *testing.T) {
// create a chain of waku nodes, // create a chain of waku nodes,
@ -129,7 +126,7 @@ func TestSimulationBloomFilter(t *testing.T) {
checkPowExchange(t) checkPowExchange(t)
// send new pow and bloom exchange messages // send new pow and bloom exchange messages
resetParams(t) resetParams()
// node #1 sends one expected (decryptable) message // node #1 sends one expected (decryptable) message
sendMsg(t, true, 1) sendMsg(t, true, 1)
@ -144,7 +141,7 @@ func TestSimulationBloomFilter(t *testing.T) {
stopServers() stopServers()
} }
func resetParams(t *testing.T) { func resetParams() {
// change pow only for node zero // change pow only for node zero
masterPow = 7777777.0 masterPow = 7777777.0
_ = nodes[0].waku.SetMinimumPoW(masterPow, true) _ = nodes[0].waku.SetMinimumPoW(masterPow, true)
@ -338,15 +335,6 @@ func checkTestStatus() {
cnt++ cnt++
} }
} }
if debugMode {
if cntPrev != cnt {
fmt.Printf(" %v \t number of nodes that have received all msgs: %d, number of peers per node: %v \n",
time.Since(prevTime), cnt, arr)
prevTime = time.Now()
cntPrev = cnt
}
}
} }
func isTestComplete() bool { func isTestComplete() bool {
@ -425,7 +413,7 @@ func TestPeerBasic(t *testing.T) {
func checkPowExchangeForNodeZero(t *testing.T) { func checkPowExchangeForNodeZero(t *testing.T) {
const iterations = 200 const iterations = 200
for j := 0; j < iterations; j++ { for j := 0; j < iterations; j++ {
lastCycle := (j == iterations-1) lastCycle := j == iterations-1
ok := checkPowExchangeForNodeZeroOnce(t, lastCycle) ok := checkPowExchangeForNodeZeroOnce(t, lastCycle)
if ok { if ok {
break break
@ -490,7 +478,7 @@ func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool {
func checkBloomFilterExchange(t *testing.T) { func checkBloomFilterExchange(t *testing.T) {
const iterations = 200 const iterations = 200
for j := 0; j < iterations; j++ { for j := 0; j < iterations; j++ {
lastCycle := (j == iterations-1) lastCycle := j == iterations-1
ok := checkBloomFilterExchangeOnce(t, lastCycle) ok := checkBloomFilterExchangeOnce(t, lastCycle)
if ok { if ok {
break break

View File

@ -860,9 +860,6 @@ func (w *Waku) AddSymKeyFromPassword(password string) (string, error) {
// kdf should run no less than 0.1 seconds on an average computer, // kdf should run no less than 0.1 seconds on an average computer,
// because it's an once in a session experience // because it's an once in a session experience
derived := pbkdf2.Key([]byte(password), nil, 65356, common.AESKeyLength, sha256.New) derived := pbkdf2.Key([]byte(password), nil, 65356, common.AESKeyLength, sha256.New)
if err != nil {
return "", err
}
w.keyMu.Lock() w.keyMu.Lock()
defer w.keyMu.Unlock() defer w.keyMu.Unlock()
@ -971,7 +968,7 @@ func (w *Waku) GetFilter(id string) *common.Filter {
func (w *Waku) Unsubscribe(id string) error { func (w *Waku) Unsubscribe(id string) error {
ok := w.filters.Uninstall(id) ok := w.filters.Uninstall(id)
if !ok { if !ok {
return fmt.Errorf("Unsubscribe: Invalid ID") return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id)
} }
return nil return nil
} }
@ -1091,12 +1088,12 @@ func (w *Waku) OnMessagesRequest(request common.MessagesRequest, p common.Peer)
} }
func (w *Waku) OnP2PRequestCompleted(payload []byte, p common.Peer) error { func (w *Waku) OnP2PRequestCompleted(payload []byte, p common.Peer) error {
event, err := CreateMailServerEvent(p.EnodeID(), payload) msEvent, err := CreateMailServerEvent(p.EnodeID(), payload)
if err != nil { if err != nil {
return fmt.Errorf("invalid p2p request complete payload: %v", err) return fmt.Errorf("invalid p2p request complete payload: %v", err)
} }
w.postP2P(*event) w.postP2P(*msEvent)
return nil return nil
} }
@ -1314,16 +1311,16 @@ func (w *Waku) processP2P() {
case <-w.quit: case <-w.quit:
return return
case e := <-w.p2pMsgQueue: case e := <-w.p2pMsgQueue:
switch event := e.(type) { switch evn := e.(type) {
case *common.Envelope: case *common.Envelope:
w.filters.NotifyWatchers(event, true) w.filters.NotifyWatchers(evn, true)
w.envelopeFeed.Send(common.EnvelopeEvent{ w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: event.Topic, Topic: evn.Topic,
Hash: event.Hash(), Hash: evn.Hash(),
Event: common.EventEnvelopeAvailable, Event: common.EventEnvelopeAvailable,
}) })
case common.EnvelopeEvent: case common.EnvelopeEvent:
w.envelopeFeed.Send(event) w.envelopeFeed.Send(evn)
} }
} }
} }

View File

@ -46,7 +46,7 @@ var seed int64
// InitSingleTest should be called in the beginning of every // InitSingleTest should be called in the beginning of every
// test, which uses RNG, in order to make the tests // test, which uses RNG, in order to make the tests
// reproduciblity independent of their sequence. // reproducibility independent of their sequence.
func InitSingleTest() { func InitSingleTest() {
seed = time.Now().Unix() seed = time.Now().Unix()
mrand.Seed(seed) mrand.Seed(seed)
@ -404,9 +404,6 @@ func TestSymKeyManagement(t *testing.T) {
if !w.HasSymKey(id2) { if !w.HasSymKey(id2) {
t.Fatalf("failed to delete first key: second key does not exist.") t.Fatalf("failed to delete first key: second key does not exist.")
} }
if k1 != nil {
t.Fatalf("failed to delete first key.")
}
if k2 == nil { if k2 == nil {
t.Fatalf("failed to delete first key: second key is nil.") t.Fatalf("failed to delete first key: second key is nil.")
} }
@ -430,12 +427,6 @@ func TestSymKeyManagement(t *testing.T) {
if w.HasSymKey(id2) { if w.HasSymKey(id2) {
t.Fatalf("failed to delete second key: still exist.") t.Fatalf("failed to delete second key: still exist.")
} }
if k1 != nil {
t.Fatalf("failed to delete second key: first key is not nil.")
}
if k2 != nil {
t.Fatalf("failed to delete second key: second key is not nil.")
}
randomKey = make([]byte, common.AESKeyLength+1) randomKey = make([]byte, common.AESKeyLength+1)
mrand.Read(randomKey) // nolint: gosec mrand.Read(randomKey) // nolint: gosec
@ -484,12 +475,16 @@ func TestExpiry(t *testing.T) {
t.Fatal("failed to set min pow") t.Fatal("failed to set min pow")
} }
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck defer func() {
handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
err = w.Start(nil) err = w.Start(nil)
if err != nil { if err != nil {
t.Fatal("failed to start waku") t.Fatal("failed to start waku")
} }
defer w.Stop() // nolint: errcheck defer func() {
handleError(t, w.Stop())
}()
params, err := generateMessageParams() params, err := generateMessageParams()
if err != nil { if err != nil {
@ -550,12 +545,18 @@ func TestCustomization(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New(nil, nil) w := New(nil, nil)
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck defer func() {
defer w.SetMaxMessageSize(common.DefaultMaxMessageSize) // nolint: errcheck handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
defer func() {
handleError(t, w.SetMaxMessageSize(common.DefaultMaxMessageSize))
}()
if err := w.Start(nil); err != nil { if err := w.Start(nil); err != nil {
t.Fatal("failed to start node") t.Fatal("failed to start node")
} }
defer w.Stop() // nolint: errcheck defer func() {
handleError(t, w.Stop())
}()
const smallPoW = 0.00001 const smallPoW = 0.00001
@ -643,13 +644,19 @@ func TestSymmetricSendCycle(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New(nil, nil) w := New(nil, nil)
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck defer func() {
defer w.SetMaxMessageSize(common.DefaultMaxMessageSize) // nolint: errcheck handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
defer func() {
handleError(t, w.SetMaxMessageSize(common.DefaultMaxMessageSize))
}()
err := w.Start(nil) err := w.Start(nil)
if err != nil { if err != nil {
t.Fatal("failed to start node") t.Fatal("failed to start node")
} }
defer w.Stop() // nolint: errcheck defer func() {
handleError(t, w.Stop())
}()
filter1, err := generateFilter(t, true) filter1, err := generateFilter(t, true)
if err != nil { if err != nil {
@ -735,12 +742,18 @@ func TestSymmetricSendCycleWithTopicInterest(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New(nil, nil) w := New(nil, nil)
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck defer func() {
defer w.SetMaxMessageSize(common.DefaultMaxMessageSize) // nolint: errcheck handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
defer func() {
handleError(t, w.SetMaxMessageSize(common.DefaultMaxMessageSize))
}()
if err := w.Start(nil); err != nil { if err := w.Start(nil); err != nil {
t.Fatal("could not start node") t.Fatal("could not start node")
} }
defer w.Stop() // nolint: errcheck defer func() {
handleError(t, w.Stop())
}()
filter1, err := generateFilter(t, true) filter1, err := generateFilter(t, true)
if err != nil { if err != nil {
@ -826,10 +839,19 @@ func TestSymmetricSendWithoutAKey(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New(nil, nil) w := New(nil, nil)
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck if err := w.Start(nil); err != nil {
defer w.SetMaxMessageSize(common.DefaultMaxMessageSize) // nolint: errcheck t.Errorf("failed to start waku: '%s'", err)
w.Start(nil) // nolint: errcheck }
defer w.Stop() // nolint: errcheck
defer func() {
handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
defer func() {
handleError(t, w.SetMaxMessageSize(common.DefaultMaxMessageSize))
}()
defer func() {
handleError(t, w.Stop())
}()
filter, err := generateFilter(t, true) filter, err := generateFilter(t, true)
if err != nil { if err != nil {
@ -894,10 +916,18 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New(nil, nil) w := New(nil, nil)
defer w.SetMinimumPoW(common.DefaultMinimumPoW, false) // nolint: errcheck if err := w.Start(nil); err != nil {
defer w.SetMaxMessageSize(common.DefaultMaxMessageSize) // nolint: errcheck t.Errorf("failed to start waku: '%s'", err)
w.Start(nil) // nolint: errcheck }
defer w.Stop() // nolint: errcheck defer func() {
handleError(t, w.SetMinimumPoW(common.DefaultMinimumPoW, false))
}()
defer func() {
handleError(t, w.SetMaxMessageSize(common.DefaultMaxMessageSize))
}()
defer func() {
handleError(t, w.Stop())
}()
filter, err := generateFilter(t, true) filter, err := generateFilter(t, true)
if err != nil { if err != nil {
@ -1061,15 +1091,27 @@ func TestHandleP2PMessageCode(t *testing.T) {
InitSingleTest() InitSingleTest()
w1 := New(nil, nil) w1 := New(nil, nil)
w1.SetMinimumPoW(0.0000001, false) // nolint: errcheck if err := w1.SetMinimumPoW(0.0000001, false); err != nil {
w1.Start(nil) // nolint: errcheck t.Error(err)
}
if err := w1.Start(nil); err != nil {
t.Error(err)
}
defer w1.Stop() // nolint: errcheck defer func() {
handleError(t, w1.Stop())
}()
w2 := New(nil, nil) w2 := New(nil, nil)
w2.SetMinimumPoW(0.0000001, false) // nolint: errcheck if err := w2.SetMinimumPoW(0.0000001, false); err != nil {
w2.Start(nil) // nolint: errcheck t.Error(err)
defer w2.Stop() // nolint: errcheck }
if err := w2.Start(nil); err != nil {
t.Error(err)
}
defer func() {
handleError(t, w2.Stop())
}()
envelopeEvents := make(chan common.EnvelopeEvent, 10) envelopeEvents := make(chan common.EnvelopeEvent, 10)
sub := w1.SubscribeEnvelopeEvents(envelopeEvents) sub := w1.SubscribeEnvelopeEvents(envelopeEvents)
@ -1102,8 +1144,12 @@ func TestHandleP2PMessageCode(t *testing.T) {
case err := <-errorc: case err := <-errorc:
t.Log(err) t.Log(err)
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
rw1.Close() if err := rw1.Close(); err != nil {
rw2.Close() t.Error(err)
}
if err := rw2.Close(); err != nil {
t.Error(err)
}
} }
}() }()
@ -1138,7 +1184,9 @@ func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) {
}() }()
// so that actual read won't hang forever // so that actual read won't hang forever
time.AfterFunc(5*time.Second, func() { time.AfterFunc(5*time.Second, func() {
rw1.Close() if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}) })
require.NoError( require.NoError(
t, t,
@ -1182,7 +1230,9 @@ func TestConfirmationReceived(t *testing.T) {
case err := <-errorc: case err := <-errorc:
t.Log(err) t.Log(err)
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
rw1.Close() if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
} }
}() }()
pow := math.Float64bits(w.MinPow()) pow := math.Float64bits(w.MinPow())
@ -1239,8 +1289,12 @@ func TestMessagesResponseWithError(t *testing.T) {
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}) p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe() rw1, rw2 := p2p.MsgPipe()
defer func() { defer func() {
rw1.Close() if err := rw1.Close(); err != nil {
rw2.Close() t.Errorf("error closing MsgPipe 1, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe 2, '%s'", err)
}
}() }()
errorc := make(chan error, 1) errorc := make(chan error, 1)
go func() { go func() {
@ -1321,7 +1375,9 @@ func testConfirmationEvents(t *testing.T, envelope common.Envelope, envelopeErro
errorc <- err errorc <- err
}() }()
time.AfterFunc(5*time.Second, func() { time.AfterFunc(5*time.Second, func() {
rw1.Close() if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}) })
pow := math.Float64bits(w.MinPow()) pow := math.Float64bits(w.MinPow())
@ -1419,7 +1475,9 @@ func TestEventsWithoutConfirmation(t *testing.T) {
errorc <- err errorc <- err
}() }()
time.AfterFunc(5*time.Second, func() { time.AfterFunc(5*time.Second, func() {
rw1.Close() if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}) })
pow := math.Float64bits(w.MinPow()) pow := math.Float64bits(w.MinPow())
@ -1491,15 +1549,19 @@ func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) {
} }
rw1, rw2 := p2p.MsgPipe() rw1, rw2 := p2p.MsgPipe()
defer func() { defer func() {
rw1.Close() if err := rw1.Close(); err != nil {
rw2.Close() t.Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}() }()
p1 := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) p1 := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
p2 := p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"shh", 6}}) p2 := p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"shh", 6}})
w1, w2 := New(c, nil), New(c, nil) w1, w2 := New(c, nil), New(c, nil)
errc := make(chan error) errc := make(chan error)
go func() { go func() {
w1.HandlePeer(p2, rw2) // nolint: errcheck errc <- w1.HandlePeer(p2, rw2)
}() }()
go func() { go func() {
errc <- w2.HandlePeer(p1, rw1) errc <- w2.HandlePeer(p1, rw1)
@ -1519,7 +1581,9 @@ func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
case <-time.After(time.Second): case <-time.After(time.Second):
} }
rw2.Close() if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
select { select {
case err := <-errc: case err := <-errc:
require.Error(t, err, "p2p: read or write on closed message pipe") require.Error(t, err, "p2p: read or write on closed message pipe")
@ -1532,7 +1596,9 @@ func TestRequestSentEventWithExpiry(t *testing.T) {
w := New(nil, nil) w := New(nil, nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw := discardPipe() rw := discardPipe()
defer rw.Close() defer func() {
handleError(t, rw.Close())
}()
w.peers[v0.NewPeer(w, p, rw, nil)] = struct{}{} w.peers[v0.NewPeer(w, p, rw, nil)] = struct{}{}
events := make(chan common.EnvelopeEvent, 1) events := make(chan common.EnvelopeEvent, 1)
sub := w.SubscribeEnvelopeEvents(events) sub := w.SubscribeEnvelopeEvents(events)
@ -1598,8 +1664,12 @@ func TestRateLimiterIntegration(t *testing.T) {
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}) p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe() rw1, rw2 := p2p.MsgPipe()
defer func() { defer func() {
rw1.Close() if err := rw1.Close(); err != nil {
rw2.Close() t.Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}() }()
errorc := make(chan error, 1) errorc := make(chan error, 1)
go func() { go func() {
@ -1629,7 +1699,9 @@ func TestRateLimiterIntegration(t *testing.T) {
func TestMailserverCompletionEvent(t *testing.T) { func TestMailserverCompletionEvent(t *testing.T) {
w1 := New(nil, nil) w1 := New(nil, nil)
require.NoError(t, w1.Start(nil)) require.NoError(t, w1.Start(nil))
defer w1.Stop() // nolint: errcheck defer func() {
handleError(t, w1.Stop())
}()
rw1, rw2 := p2p.MsgPipe() rw1, rw2 := p2p.MsgPipe()
peer1 := v0.NewPeer(w1, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil) peer1 := v0.NewPeer(w1, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
@ -1638,7 +1710,9 @@ func TestMailserverCompletionEvent(t *testing.T) {
w2 := New(nil, nil) w2 := New(nil, nil)
require.NoError(t, w2.Start(nil)) require.NoError(t, w2.Start(nil))
defer w2.Stop() // nolint: errcheck defer func() {
handleError(t, w2.Stop())
}()
peer2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil) peer2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil)
peer2.SetPeerTrusted(true) peer2.SetPeerTrusted(true)
@ -1650,11 +1724,10 @@ func TestMailserverCompletionEvent(t *testing.T) {
envelopes := []*common.Envelope{{Data: []byte{1}}, {Data: []byte{2}}} envelopes := []*common.Envelope{{Data: []byte{1}}, {Data: []byte{2}}}
go func() { go func() {
require.NoError(t, peer2.Start()) require.NoError(t, peer2.Start())
require.NoError(t, p2p.Send(rw2, v0.P2PMessageCode, envelopes)) require.NoError(t, p2p.Send(rw2, v0.P2PMessageCode, envelopes))
require.NoError(t, p2p.Send(rw2, v0.P2PRequestCompleteCode, [100]byte{})) // 2 hashes + cursor size require.NoError(t, p2p.Send(rw2, v0.P2PRequestCompleteCode, [100]byte{})) // 2 hashes + cursor size
rw2.Close() require.NoError(t, rw2.Close())
}() }()
require.NoError(t, peer1.Start(), "p2p: read or write on closed message pipe") require.NoError(t, peer1.Start(), "p2p: read or write on closed message pipe")
@ -1672,13 +1745,19 @@ func TestMailserverCompletionEvent(t *testing.T) {
count++ count++
case common.EventMailServerRequestCompleted: case common.EventMailServerRequestCompleted:
require.Equal(t, count, len(envelopes), require.Equal(t, count, len(envelopes),
"all envelope.avaiable events mut be recevied before request is compelted") "all envelope.available events mut be received before request is completed")
return return
} }
} }
} }
} }
func handleError(t *testing.T, err error) {
if err != nil {
t.Logf("deferred function error: '%s'", err)
}
}
func generateFilter(t *testing.T, symmetric bool) (*common.Filter, error) { func generateFilter(t *testing.T, symmetric bool) (*common.Filter, error) {
var f common.Filter var f common.Filter
f.Messages = common.NewMemoryMessageStore() f.Messages = common.NewMemoryMessageStore()

7
whisper/README.md Normal file
View File

@ -0,0 +1,7 @@
# Whisper
Go implementation of the [Whisper specifications](https://geth.ethereum.org/docs/whisper/whisper-overview)
# Deprecation Warning!
This package is **DEPRECATED** and, except for security patches, should not receive any further updates.