Handle unsubscriptions and encoding of messages

This commit is contained in:
Richard Ramos 2021-03-15 17:17:36 -04:00
parent 09fe21baa5
commit cac9aa7b37
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
6 changed files with 95 additions and 90 deletions

3
go.mod
View File

@ -2,6 +2,8 @@ module github.com/status-im/go-waku
go 1.15 go 1.15
replace github.com/libp2p/go-libp2p-pubsub => github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols
require ( require (
github.com/ethereum/go-ethereum v1.10.1 github.com/ethereum/go-ethereum v1.10.1
github.com/golang/protobuf v1.4.3 github.com/golang/protobuf v1.4.3
@ -9,6 +11,7 @@ require (
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-pubsub v0.4.1 github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/minio/sha256-simd v0.1.1
github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-net v0.2.0 github.com/multiformats/go-multiaddr-net v0.2.0
github.com/status-im/status-go/eth-node v1.1.0 // indirect github.com/status-im/status-go/eth-node v1.1.0 // indirect

2
go.sum
View File

@ -785,6 +785,8 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/status-im/doubleratchet v3.0.0+incompatible h1:aJ1ejcSERpSzmWZBgtfYtiU2nF0Q8ZkGyuEPYETXkCY= github.com/status-im/doubleratchet v3.0.0+incompatible h1:aJ1ejcSERpSzmWZBgtfYtiU2nF0Q8ZkGyuEPYETXkCY=
github.com/status-im/doubleratchet v3.0.0+incompatible/go.mod h1:1sqR0+yhiM/bd+wrdX79AOt2csZuJOni0nUDzKNuqOU= github.com/status-im/doubleratchet v3.0.0+incompatible/go.mod h1:1sqR0+yhiM/bd+wrdX79AOt2csZuJOni0nUDzKNuqOU=
github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols h1:xLGSO/46TTcwu0LhqchpJUvAa31gWxW49Snwr20Q108=
github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/status-im/status-go v0.73.3 h1:0WGov/EX4x0r1hmx4DSPRwyo7BlaL1e9syCket+/3f4= github.com/status-im/status-go v0.73.3 h1:0WGov/EX4x0r1hmx4DSPRwyo7BlaL1e9syCket+/3f4=

60
main.go
View File

@ -15,33 +15,13 @@ import (
ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto" ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto"
) )
/*
func readLoop(sub *pubsub.Subscription, ctx context.Context) {
for {
msg, err := sub.Next(ctx)
if err != nil {
fmt.Println(err)
return
}
cm := new(Test)
err = json.Unmarshal(msg.Data, cm)
if err != nil {
return
}
fmt.Println("Received: " + cm.Message)
}
}
*/
func main() { func main() {
golog.SetAllLoggers(golog.LevelInfo) // Change to INFO for extra info golog.SetAllLoggers(golog.LevelInfo) // Change to INFO for extra info
hostAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5555") hostAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:60001")
extAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:5555") extAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60001")
key := "9ceff459635becbab13190132172fc9612357696c176a9e2b6e22f28a73a54ce" key := "9ceff459635becbab13190132172fc9612357696c176a9e2b6e22f28a73a54de"
prvKey, err := ethNodeCrypto.HexToECDSA(key) prvKey, err := ethNodeCrypto.HexToECDSA(key)
ctx := context.Background() ctx := context.Background()
@ -58,35 +38,45 @@ func main() {
fmt.Println("Could not subscribe:", err) fmt.Println("Could not subscribe:", err)
} }
go func(sub chan *protocol.WakuMessage) { // Read loop
go func() {
for { for {
fmt.Println("Waiting for a message...") for value := range sub.C {
x := <-sub payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None})
fmt.Println("Received a message: ", string(x.Payload)) if err != nil {
fmt.Println(err)
return
} }
}(sub)
fmt.Println("Received message:", string(payload))
//sub.Unsubscribe()
}
}
}()
// Write loop
go func() {
for { for {
time.Sleep(4 * time.Second) time.Sleep(2 * time.Second)
fmt.Println("Sending 'Hello World'...")
var contentTopic uint32 = 1 var contentTopic uint32 = 1
var version uint32 = 0 var version uint32 = 0
msg := &protocol.WakuMessage{Payload: []byte("Hello World"), Version: &version, ContentTopic: &contentTopic} payload, err := node.Encode([]byte("Hello World"), &node.KeyInfo{Kind: node.None}, 0)
msg := &protocol.WakuMessage{Payload: payload, Version: &version, ContentTopic: &contentTopic}
err = wakuNode.Publish(msg, nil) err = wakuNode.Publish(msg, nil)
if err != nil { if err != nil {
fmt.Println("ERROR SENDING MESSAGE", err) fmt.Println("Error sending a message", err)
} else { } else {
fmt.Println("Sent...") fmt.Println("Sent message...")
} }
} }
}()
// Wait for a SIGINT or SIGTERM signal // Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch <-ch
fmt.Println("Received signal, shutting down...") fmt.Println("\n\n\nReceived signal, shutting down...")
// shut the node down // shut the node down
if err := wakuNode.Stop(); err != nil { if err := wakuNode.Stop(); err != nil {

View File

@ -23,9 +23,9 @@ const (
) )
type KeyInfo struct { type KeyInfo struct {
kind KeyKind Kind KeyKind
symKey []byte SymKey []byte
privKey ecdsa.PrivateKey PrivKey ecdsa.PrivateKey
} }
// NOTICE: Extracted from status-go // NOTICE: Extracted from status-go
@ -68,21 +68,21 @@ func decryptAsymmetric(payload []byte, key *ecdsa.PrivateKey) ([]byte, error) {
return decrypted, err return decrypted, err
} }
func decodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) ([]byte, error) { func DecodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) ([]byte, error) {
switch *message.Version { switch *message.Version {
case uint32(0): case uint32(0):
return message.Payload, nil return message.Payload, nil
case uint32(1): case uint32(1):
switch keyInfo.kind { switch keyInfo.Kind {
case Symmetric: case Symmetric:
decoded, err := decryptSymmetric(message.Payload, keyInfo.symKey) decoded, err := decryptSymmetric(message.Payload, keyInfo.SymKey)
if err != nil { if err != nil {
return nil, errors.New("Couldn't decrypt using symmetric key") return nil, errors.New("Couldn't decrypt using symmetric key")
} else { } else {
return decoded, nil return decoded, nil
} }
case Asymmetric: case Asymmetric:
decoded, err := decryptAsymmetric(message.Payload, &keyInfo.privKey) decoded, err := decryptAsymmetric(message.Payload, &keyInfo.PrivKey)
if err != nil { if err != nil {
return nil, errors.New("Couldn't decrypt using asymmetric key") return nil, errors.New("Couldn't decrypt using asymmetric key")
} else { } else {
@ -187,21 +187,21 @@ func generateSecureRandomData(length int) ([]byte, error) {
return res, nil return res, nil
} }
func encode(rawPayload []byte, keyInfo *KeyInfo, version uint32) ([]byte, error) { func Encode(rawPayload []byte, keyInfo *KeyInfo, version uint32) ([]byte, error) {
switch version { switch version {
case 0: case 0:
return rawPayload, nil return rawPayload, nil
case 1: case 1:
switch keyInfo.kind { switch keyInfo.Kind {
case Symmetric: case Symmetric:
encoded, err := encryptSymmetric(rawPayload, keyInfo.symKey) encoded, err := encryptSymmetric(rawPayload, keyInfo.SymKey)
if err != nil { if err != nil {
return nil, errors.New("Couldn't encrypt using symmetric key") return nil, errors.New("Couldn't encrypt using symmetric key")
} else { } else {
return encoded, nil return encoded, nil
} }
case Asymmetric: case Asymmetric:
encoded, err := encryptAsymmetric(rawPayload, &keyInfo.privKey.PublicKey) encoded, err := encryptAsymmetric(rawPayload, &keyInfo.PrivKey.PublicKey)
if err != nil { if err != nil {
return nil, errors.New("Couldn't encrypt using asymmetric key") return nil, errors.New("Couldn't encrypt using asymmetric key")
} else { } else {

View File

@ -40,6 +40,13 @@ type MessagePair struct {
b *protocol.WakuMessage b *protocol.WakuMessage
} }
type Subscription struct {
C chan *protocol.WakuMessage
closed bool
mutex sync.Mutex
quit chan struct{}
}
type WakuNode struct { type WakuNode struct {
// peerManager *PeerManager // peerManager *PeerManager
host host.Host host host.Host
@ -86,8 +93,8 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA
libp2p.Identity(nodeKey), libp2p.Identity(nodeKey),
libp2p.DefaultTransports, // libp2p.DefaultTransports, //
libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts.
libp2p.DisableRelay(), // TODO: what is this? //libp2p.DisableRelay(), // TODO: is this needed?
libp2p.EnableNATService(), // TODO: what is this? //libp2p.EnableNATService(), // TODO: is this needed?
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -147,7 +154,7 @@ func (w *WakuNode) MountRelay() error {
return nil return nil
} }
func (node *WakuNode) Subscribe(topic *Topic) (chan *protocol.WakuMessage, error) { func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
// Subscribes to a PubSub topic. Triggers handler when receiving messages on // Subscribes to a PubSub topic. Triggers handler when receiving messages on
// this topic. TopicHandler is a method that takes a topic and some data. // this topic. TopicHandler is a method that takes a topic and some data.
// NOTE The data field SHOULD be decoded as a WakuMessage. // NOTE The data field SHOULD be decoded as a WakuMessage.
@ -166,38 +173,46 @@ func (node *WakuNode) Subscribe(topic *Topic) (chan *protocol.WakuMessage, error
return nil, err return nil, err
} }
ch := make(chan *protocol.WakuMessage) subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *protocol.WakuMessage)
subscription.quit = make(chan struct{})
go func(ctx context.Context, sub *pubsub.Subscription) { go func(ctx context.Context, sub *pubsub.Subscription) {
for { for {
select {
case <-subscription.quit:
close(subscription.C)
subscription.closed = true
return
default:
msg, err := sub.Next(ctx) msg, err := sub.Next(ctx)
if err != nil { if err != nil {
fmt.Println("ERROR RECEIVING: ", err) // TODO: use log lib
return // Should close channel? return // Should close channel?
} }
fmt.Println("Received a message!")
wakuMessage := &protocol.WakuMessage{} wakuMessage := &protocol.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
fmt.Println("ERROR DECODING: ", err) // TODO: use log lib fmt.Println("Error decoding WakuMessage: ", err) // TODO: use log lib
return return
} }
fmt.Println("Decoded a message", wakuMessage.Payload) subscription.C <- wakuMessage
}
ch <- wakuMessage
fmt.Println("Sent to channel")
} }
// TODO: how to quit channel? perhaps using select?
}(node.ctx, sub) }(node.ctx, sub)
return ch, nil return subscription, nil
} }
func (node *WakuNode) Unsubscribe() { func (subs *Subscription) Unsubscribe() {
// Unsubscribes a handler from a PubSub topic. // Unsubscribes a handler from a PubSub topic.
// TODO: subs.mutex.Lock()
defer subs.mutex.Unlock()
if !subs.closed {
close(subs.quit)
}
} }
func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) {

View File

@ -2,40 +2,35 @@
// //
// See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md // See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
// for spec. // for spec.
package protocol package protocol
import ( import (
"context" "context"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
) )
const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2")
type WakuRelaySubRouter struct { type WakuRelay struct {
*pubsub.GossipSubRouter
p *pubsub.PubSub p *pubsub.PubSub
} }
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) { func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) {
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
//opts = append(opts, pubsub.WithNoAuthor())
//opts = append(opts, pubsub.WithMessageIdFn(messageIdFn))
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
opts = append(opts, pubsub.WithNoAuthor())
gossipSub, err := pubsub.NewGossipSub(ctx, h, opts...) gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayCodec}, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
w := new(WakuRelaySubRouter) w := new(WakuRelay)
w.p = gossipSub w.p = gossipSub
return gossipSub, nil return gossipSub, nil
} }
func (ws *WakuRelaySubRouter) Protocols() []protocol.ID {
return []libp2pProtocol.ID{WakuRelayCodec, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID}
}