diff --git a/go.mod b/go.mod index 828e6479..eeddbc6f 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/status-im/go-waku go 1.15 +replace github.com/libp2p/go-libp2p-pubsub => github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols + require ( github.com/ethereum/go-ethereum v1.10.1 github.com/golang/protobuf v1.4.3 @@ -9,6 +11,7 @@ require ( github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-pubsub v0.4.1 + github.com/minio/sha256-simd v0.1.1 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr-net v0.2.0 github.com/status-im/status-go/eth-node v1.1.0 // indirect diff --git a/go.sum b/go.sum index 5041ce8a..fabd1d86 100644 --- a/go.sum +++ b/go.sum @@ -785,6 +785,8 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/status-im/doubleratchet v3.0.0+incompatible h1:aJ1ejcSERpSzmWZBgtfYtiU2nF0Q8ZkGyuEPYETXkCY= github.com/status-im/doubleratchet v3.0.0+incompatible/go.mod h1:1sqR0+yhiM/bd+wrdX79AOt2csZuJOni0nUDzKNuqOU= +github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols h1:xLGSO/46TTcwu0LhqchpJUvAa31gWxW49Snwr20Q108= +github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/status-go v0.73.3 h1:0WGov/EX4x0r1hmx4DSPRwyo7BlaL1e9syCket+/3f4= diff --git a/main.go b/main.go index 5018c249..e1f8b9ec 100644 --- a/main.go +++ b/main.go @@ -15,33 +15,13 @@ import ( ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto" ) -/* -func readLoop(sub *pubsub.Subscription, ctx context.Context) { - for { - msg, err := sub.Next(ctx) - if err != nil { - fmt.Println(err) - return - } - - cm := new(Test) - err = json.Unmarshal(msg.Data, cm) - if err != nil { - return - } - - fmt.Println("Received: " + cm.Message) - } -} -*/ - func main() { golog.SetAllLoggers(golog.LevelInfo) // Change to INFO for extra info - hostAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5555") - extAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:5555") + hostAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:60001") + extAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60001") - key := "9ceff459635becbab13190132172fc9612357696c176a9e2b6e22f28a73a54ce" + key := "9ceff459635becbab13190132172fc9612357696c176a9e2b6e22f28a73a54de" prvKey, err := ethNodeCrypto.HexToECDSA(key) ctx := context.Background() @@ -58,35 +38,45 @@ func main() { fmt.Println("Could not subscribe:", err) } - go func(sub chan *protocol.WakuMessage) { + // Read loop + go func() { for { - fmt.Println("Waiting for a message...") - x := <-sub - fmt.Println("Received a message: ", string(x.Payload)) + for value := range sub.C { + payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None}) + if err != nil { + fmt.Println(err) + return + } + + fmt.Println("Received message:", string(payload)) + //sub.Unsubscribe() + } } - }(sub) + }() - for { - time.Sleep(4 * time.Second) - fmt.Println("Sending 'Hello World'...") + // Write loop + go func() { + for { + time.Sleep(2 * time.Second) + var contentTopic uint32 = 1 + var version uint32 = 0 - var contentTopic uint32 = 1 - var version uint32 = 0 - - msg := &protocol.WakuMessage{Payload: []byte("Hello World"), Version: &version, ContentTopic: &contentTopic} - err = wakuNode.Publish(msg, nil) - if err != nil { - fmt.Println("ERROR SENDING MESSAGE", err) - } else { - fmt.Println("Sent...") + payload, err := node.Encode([]byte("Hello World"), &node.KeyInfo{Kind: node.None}, 0) + msg := &protocol.WakuMessage{Payload: payload, Version: &version, ContentTopic: &contentTopic} + err = wakuNode.Publish(msg, nil) + if err != nil { + fmt.Println("Error sending a message", err) + } else { + fmt.Println("Sent message...") + } } - } + }() // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch - fmt.Println("Received signal, shutting down...") + fmt.Println("\n\n\nReceived signal, shutting down...") // shut the node down if err := wakuNode.Stop(); err != nil { diff --git a/waku/v2/node/waku_payload.go b/waku/v2/node/waku_payload.go index dcfaddde..204bc061 100644 --- a/waku/v2/node/waku_payload.go +++ b/waku/v2/node/waku_payload.go @@ -23,9 +23,9 @@ const ( ) type KeyInfo struct { - kind KeyKind - symKey []byte - privKey ecdsa.PrivateKey + Kind KeyKind + SymKey []byte + PrivKey ecdsa.PrivateKey } // NOTICE: Extracted from status-go @@ -68,21 +68,21 @@ func decryptAsymmetric(payload []byte, key *ecdsa.PrivateKey) ([]byte, error) { return decrypted, err } -func decodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) ([]byte, error) { +func DecodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) ([]byte, error) { switch *message.Version { case uint32(0): return message.Payload, nil case uint32(1): - switch keyInfo.kind { + switch keyInfo.Kind { case Symmetric: - decoded, err := decryptSymmetric(message.Payload, keyInfo.symKey) + decoded, err := decryptSymmetric(message.Payload, keyInfo.SymKey) if err != nil { return nil, errors.New("Couldn't decrypt using symmetric key") } else { return decoded, nil } case Asymmetric: - decoded, err := decryptAsymmetric(message.Payload, &keyInfo.privKey) + decoded, err := decryptAsymmetric(message.Payload, &keyInfo.PrivKey) if err != nil { return nil, errors.New("Couldn't decrypt using asymmetric key") } else { @@ -187,21 +187,21 @@ func generateSecureRandomData(length int) ([]byte, error) { return res, nil } -func encode(rawPayload []byte, keyInfo *KeyInfo, version uint32) ([]byte, error) { +func Encode(rawPayload []byte, keyInfo *KeyInfo, version uint32) ([]byte, error) { switch version { case 0: return rawPayload, nil case 1: - switch keyInfo.kind { + switch keyInfo.Kind { case Symmetric: - encoded, err := encryptSymmetric(rawPayload, keyInfo.symKey) + encoded, err := encryptSymmetric(rawPayload, keyInfo.SymKey) if err != nil { return nil, errors.New("Couldn't encrypt using symmetric key") } else { return encoded, nil } case Asymmetric: - encoded, err := encryptAsymmetric(rawPayload, &keyInfo.privKey.PublicKey) + encoded, err := encryptAsymmetric(rawPayload, &keyInfo.PrivKey.PublicKey) if err != nil { return nil, errors.New("Couldn't encrypt using asymmetric key") } else { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index aa4d58bb..6af44f4a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -40,6 +40,13 @@ type MessagePair struct { b *protocol.WakuMessage } +type Subscription struct { + C chan *protocol.WakuMessage + closed bool + mutex sync.Mutex + quit chan struct{} +} + type WakuNode struct { // peerManager *PeerManager host host.Host @@ -84,10 +91,10 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA opts := []libp2p.Option{ libp2p.ListenAddrs(multiAddresses...), libp2p.Identity(nodeKey), - libp2p.DefaultTransports, // - libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. - libp2p.DisableRelay(), // TODO: what is this? - libp2p.EnableNATService(), // TODO: what is this? + libp2p.DefaultTransports, // + libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. + //libp2p.DisableRelay(), // TODO: is this needed? + //libp2p.EnableNATService(), // TODO: is this needed? } ctx, cancel := context.WithCancel(ctx) @@ -147,7 +154,7 @@ func (w *WakuNode) MountRelay() error { return nil } -func (node *WakuNode) Subscribe(topic *Topic) (chan *protocol.WakuMessage, error) { +func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { // Subscribes to a PubSub topic. Triggers handler when receiving messages on // this topic. TopicHandler is a method that takes a topic and some data. // NOTE The data field SHOULD be decoded as a WakuMessage. @@ -166,38 +173,46 @@ func (node *WakuNode) Subscribe(topic *Topic) (chan *protocol.WakuMessage, error return nil, err } - ch := make(chan *protocol.WakuMessage) + subscription := new(Subscription) + subscription.closed = false + subscription.C = make(chan *protocol.WakuMessage) + subscription.quit = make(chan struct{}) go func(ctx context.Context, sub *pubsub.Subscription) { for { - msg, err := sub.Next(ctx) - if err != nil { - fmt.Println("ERROR RECEIVING: ", err) // TODO: use log lib - return // Should close channel? - } - - fmt.Println("Received a message!") - - wakuMessage := &protocol.WakuMessage{} - if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - fmt.Println("ERROR DECODING: ", err) // TODO: use log lib + select { + case <-subscription.quit: + close(subscription.C) + subscription.closed = true return + default: + msg, err := sub.Next(ctx) + + if err != nil { + return // Should close channel? + } + + wakuMessage := &protocol.WakuMessage{} + if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { + fmt.Println("Error decoding WakuMessage: ", err) // TODO: use log lib + return + } + + subscription.C <- wakuMessage } - - fmt.Println("Decoded a message", wakuMessage.Payload) - - ch <- wakuMessage - fmt.Println("Sent to channel") } - // TODO: how to quit channel? perhaps using select? }(node.ctx, sub) - return ch, nil + return subscription, nil } -func (node *WakuNode) Unsubscribe() { +func (subs *Subscription) Unsubscribe() { // Unsubscribes a handler from a PubSub topic. - // TODO: + subs.mutex.Lock() + defer subs.mutex.Unlock() + if !subs.closed { + close(subs.quit) + } } func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { diff --git a/waku/v2/protocol/waku_relay.go b/waku/v2/protocol/waku_relay.go index 6938a052..0f9258f1 100644 --- a/waku/v2/protocol/waku_relay.go +++ b/waku/v2/protocol/waku_relay.go @@ -2,40 +2,35 @@ // // See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md // for spec. - package protocol import ( "context" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" ) const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") -type WakuRelaySubRouter struct { - *pubsub.GossipSubRouter +type WakuRelay struct { p *pubsub.PubSub } func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) { + // Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn + //opts = append(opts, pubsub.WithNoAuthor()) + //opts = append(opts, pubsub.WithMessageIdFn(messageIdFn)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - opts = append(opts, pubsub.WithNoAuthor()) - gossipSub, err := pubsub.NewGossipSub(ctx, h, opts...) + gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayCodec}, opts...) if err != nil { return nil, err } - w := new(WakuRelaySubRouter) + w := new(WakuRelay) w.p = gossipSub return gossipSub, nil } - -func (ws *WakuRelaySubRouter) Protocols() []protocol.ID { - return []libp2pProtocol.ID{WakuRelayCodec, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID} -}