package main import ( "chat2/pb" "context" "encoding/hex" "errors" "fmt" "strings" "sync" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln" "github.com/waku-org/go-waku/waku/v2/utils" "google.golang.org/protobuf/proto" ) // Chat represents a subscription to a single PubSub topic. Messages // can be published to the topic with Chat.Publish, and received // messages are pushed to the Messages channel. type Chat struct { ctx context.Context wg sync.WaitGroup node *node.WakuNode ui UI uiReady chan struct{} inputChan chan string options Options C chan *protocol.Envelope nick string } func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat { chat := &Chat{ ctx: ctx, node: node, options: options, nick: options.Nickname, uiReady: make(chan struct{}, 1), inputChan: make(chan string, 100), } chat.ui = NewUIModel(chat.uiReady, chat.inputChan) topics := options.Relay.Topics.Value() if len(topics) == 0 { topics = append(topics, relay.DefaultWakuTopic) } if options.Filter.Enable { cf := protocol.ContentFilter{ PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } var filterOpt filter.FilterSubscribeOption peerID, err := options.Filter.NodePeerID() if err != nil { filterOpt = filter.WithAutomaticPeerSelection() } else { filterOpt = filter.WithPeer(peerID) chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID)) } theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt) if err != nil { chat.ui.ErrorMessage(err) } else { chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified. } } else { for _, topic := range topics { sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { chat.ui.ErrorMessage(err) } else { chat.C = make(chan *protocol.Envelope) go func() { for e := range sub[0].Ch { chat.C <- e } }() } } } chat.wg.Add(7) go chat.parseInput() go chat.receiveMessages() connectionWg := sync.WaitGroup{} connectionWg.Add(2) go chat.welcomeMessage() go chat.connectionWatcher(&connectionWg, connNotifier) go chat.staticNodes(&connectionWg) go chat.discoverNodes(&connectionWg) go chat.retrieveHistory(&connectionWg) return chat } func (c *Chat) Stop() { c.wg.Wait() close(c.inputChan) } func (c *Chat) connectionWatcher(connectionWg *sync.WaitGroup, connNotifier <-chan node.PeerConnection) { defer c.wg.Done() for conn := range connNotifier { if conn.Connected { c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.String())) } else { c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.String())) } } } func (c *Chat) receiveMessages() { defer c.wg.Done() for { select { case <-c.ctx.Done(): return case value := <-c.C: msgContentTopic := value.Message().ContentTopic if msgContentTopic != c.options.ContentTopic { continue // Discard messages from other topics } msg, err := decodeMessage(c.options.ContentTopic, value.Message()) if err == nil { // send valid messages to the UI c.ui.ChatMessage(int64(msg.Timestamp), msg.Nick, string(msg.Payload)) } } } } func (c *Chat) parseInput() { defer c.wg.Done() for { select { case <-c.ctx.Done(): return case line := <-c.inputChan: c.ui.SetSending(true) go func() { defer c.ui.SetSending(false) // bail if requested if line == "/exit" { c.ui.Quit() fmt.Println("Bye!") return } // add peer if strings.HasPrefix(line, "/connect") { peer := strings.TrimPrefix(line, "/connect ") c.wg.Add(1) go func(peer string) { defer c.wg.Done() ma, err := multiaddr.NewMultiaddr(peer) if err != nil { c.ui.ErrorMessage(err) return } peerID, err := ma.ValueForProtocol(multiaddr.P_P2P) if err != nil { c.ui.ErrorMessage(err) return } c.ui.InfoMessage(fmt.Sprintf("Connecting to peer: %s", peerID)) ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second) defer cancel() err = c.node.DialPeerWithMultiAddress(ctx, ma) if err != nil { c.ui.ErrorMessage(err) } }(peer) return } // list peers if line == "/peers" { peers := c.node.Host().Network().Peers() if len(peers) == 0 { c.ui.InfoMessage("No peers available") } else { peerInfoMsg := "Peers: \n" for _, p := range peers { peerInfo := c.node.Host().Peerstore().PeerInfo(p) peerProtocols, err := c.node.Host().Peerstore().GetProtocols(p) if err != nil { c.ui.ErrorMessage(err) return } peerInfoMsg += fmt.Sprintf("• %s:\n", p.String()) var strProtocols []string for _, p := range peerProtocols { strProtocols = append(strProtocols, string(p)) } peerInfoMsg += fmt.Sprintf(" Protocols: %s\n", strings.Join(strProtocols, ", ")) peerInfoMsg += " Addresses:\n" for _, addr := range peerInfo.Addrs { peerInfoMsg += fmt.Sprintf(" - %s/p2p/%s\n", addr.String(), p.String()) } } c.ui.InfoMessage(peerInfoMsg) } return } // change nick if strings.HasPrefix(line, "/nick") { newNick := strings.TrimSpace(strings.TrimPrefix(line, "/nick ")) if newNick != "" { c.nick = newNick } else { c.ui.ErrorMessage(errors.New("invalid nickname")) } return } if line == "/help" { c.ui.InfoMessage(`Available commands: /connect multiaddress - dials a node adding it to the list of connected peers /peers - list of peers connected to this node /nick newNick - change the user's nickname /exit - closes the app`) return } c.SendMessage(line) }() } } } func (c *Chat) SendMessage(line string) { tCtx, cancel := context.WithTimeout(c.ctx, 3*time.Second) defer func() { cancel() }() err := c.publish(tCtx, line) if err != nil { if err.Error() == "validation failed" { err = errors.New("message rate violation") } c.ui.ErrorMessage(err) } } func (c *Chat) publish(ctx context.Context, message string) error { msg := &pb.Chat2Message{ Timestamp: uint64(c.node.Timesource().Now().Unix()), Nick: c.nick, Payload: []byte(message), } msgBytes, err := proto.Marshal(msg) if err != nil { return err } version := uint32(0) timestamp := utils.GetUnixEpochFrom(c.node.Timesource().Now()) keyInfo := &payload.KeyInfo{ Kind: payload.None, } p := new(payload.Payload) p.Data = msgBytes p.Key = keyInfo payload, err := p.Encode(version) if err != nil { return err } wakuMsg := &wpb.WakuMessage{ Payload: payload, Version: proto.Uint32(version), ContentTopic: options.ContentTopic, Timestamp: timestamp, } if c.options.RLNRelay.Enable { // for future version when we support more than one rln protected content topic, // we should check the message content topic as well err = c.node.RLNRelay().AppendRLNProof(wakuMsg, c.node.Timesource().Now()) if err != nil { return err } rateLimitProof, err := wrln.BytesToRateLimitProof(wakuMsg.RateLimitProof) if err != nil { return err } c.ui.InfoMessage(fmt.Sprintf("RLN Epoch: %d", rateLimitProof.Epoch.Uint64())) } if c.options.LightPush.Enable { lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()} var peerID peer.ID peerID, err = options.LightPush.NodePeerID() if err != nil { lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection()) } else { lightOpt = append(lightOpt, lightpush.WithPeer(peerID)) } _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...) } else { _, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic()) } return err } func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Message, error) { keyInfo := &payload.KeyInfo{ Kind: payload.None, } payload, err := payload.DecodePayload(wakumsg, keyInfo) if err != nil { return nil, err } msg := &pb.Chat2Message{} if err := proto.Unmarshal(payload.Data, msg); err != nil { return nil, err } return msg, nil } func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { defer c.wg.Done() connectionWg.Wait() // Wait until node connection operations are done if !c.options.Store.Enable { return } var storeOpt legacy_store.HistoryRequestOption if c.options.Store.Node == nil { c.ui.InfoMessage("No store node configured. Choosing one at random...") storeOpt = legacy_store.WithAutomaticPeerSelection() } else { peerID, err := (*c.options.Store.Node).ValueForProtocol(multiaddr.P_P2P) if err != nil { c.ui.ErrorMessage(err) return } pID, err := peer.Decode(peerID) if err != nil { c.ui.ErrorMessage(err) return } storeOpt = legacy_store.WithPeer(pID) c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", peerID)) } tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second) defer cancel() q := legacy_store.Query{ ContentTopics: []string{options.ContentTopic}, } response, err := c.node.LegacyStore().Query(tCtx, q, legacy_store.WithAutomaticRequestID(), storeOpt, legacy_store.WithPaging(false, 100)) if err != nil { c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err)) } else { if len(response.Messages) == 0 { c.ui.InfoMessage("0 historic messages available") } else { for _, msg := range response.Messages { c.C <- protocol.NewEnvelope(msg, msg.GetTimestamp(), relay.DefaultWakuTopic) } } } } func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) { defer c.wg.Done() defer connectionWg.Done() <-c.uiReady // wait until UI is ready wg := sync.WaitGroup{} wg.Add(len(options.StaticNodes)) for _, n := range options.StaticNodes { go func(addr multiaddr.Multiaddr) { defer wg.Done() ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second) defer cancel() c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String())) err := c.node.DialPeerWithMultiAddress(ctx, addr) if err != nil { c.ui.ErrorMessage(err) } }(n) } wg.Wait() } func (c *Chat) welcomeMessage() { defer c.wg.Done() <-c.uiReady // wait until UI is ready c.ui.InfoMessage("Welcome, " + c.nick + "!") c.ui.InfoMessage("type /help to see available commands \n") addrMessage := "Listening on:\n" for _, addr := range c.node.ListenAddresses() { addrMessage += " -" + addr.String() + "\n" } c.ui.InfoMessage(addrMessage) if !c.options.RLNRelay.Enable { return } credential, err := c.node.RLNRelay().IdentityCredential() if err != nil { c.ui.Quit() fmt.Println(err.Error()) } idx := c.node.RLNRelay().MembershipIndex() idTrapdoor := credential.IDTrapdoor idNullifier := credential.IDSecretHash idSecretHash := credential.IDSecretHash idCommitment := credential.IDCommitment rlnMessage := "RLN config:\n" rlnMessage += fmt.Sprintf("- Your membership index is: %d\n", idx) rlnMessage += fmt.Sprintf("- Your rln identity trapdoor is: 0x%s\n", hex.EncodeToString(idTrapdoor[:])) rlnMessage += fmt.Sprintf("- Your rln identity nullifier is: 0x%s\n", hex.EncodeToString(idNullifier[:])) rlnMessage += fmt.Sprintf("- Your rln identity secret hash is: 0x%s\n", hex.EncodeToString(idSecretHash[:])) rlnMessage += fmt.Sprintf("- Your rln identity commitment key is: 0x%s\n", hex.EncodeToString(idCommitment[:])) c.ui.InfoMessage(rlnMessage) } func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) { defer c.wg.Done() defer connectionWg.Done() <-c.uiReady // wait until UI is ready var dnsDiscoveryUrl string if options.Fleet != fleetNone { if options.Fleet == fleetTest { dnsDiscoveryUrl = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im" } else { // Connect to prod by default dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" } } if options.DNSDiscovery.Enable && options.DNSDiscovery.URL != "" { dnsDiscoveryUrl = options.DNSDiscovery.URL } if dnsDiscoveryUrl != "" { c.ui.InfoMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl)) nodes, err := dnsdisc.RetrieveNodes(c.ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) if err != nil { c.ui.ErrorMessage(errors.New(err.Error())) } else { var nodeList []peer.AddrInfo for _, n := range nodes { nodeList = append(nodeList, n.PeerInfo) } c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList)) wg := sync.WaitGroup{} wg.Add(len(nodeList)) for _, n := range nodeList { go func(ctx context.Context, info peer.AddrInfo) { defer wg.Done() ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) defer cancel() err = c.node.DialPeerWithInfo(ctx, info) if err != nil { c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.String(), err)) } }(c.ctx, n) } wg.Wait() } } }