diff --git a/Makefile b/Makefile index 56fc3b55..79968a68 100644 --- a/Makefile +++ b/Makefile @@ -72,7 +72,7 @@ lint: @golangci-lint --exclude=SA1019 run ./... --deadline=5m test: - ${GOBIN} test ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp + ${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE} ${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV} diff --git a/waku/v2/noise/pairing.go b/waku/v2/noise/pairing.go index 7048586d..777408eb 100644 --- a/waku/v2/noise/pairing.go +++ b/waku/v2/noise/pairing.go @@ -1,11 +1,12 @@ package noise import ( + "bytes" + "context" "crypto/ed25519" "crypto/rand" "errors" "sync" - "time" "github.com/waku-org/go-waku/waku/v2/protocol/pb" n "github.com/waku-org/noise" @@ -14,41 +15,31 @@ import ( var ErrPairingTimeout = errors.New("pairing has timed out") - type Sender interface { - Publish(msg *pb.WakuMessage) <-chan struct{} + Publish(ctx context.Context, contentTopic string, payload PayloadV2) error } -type Receiver { - subscribe(decoder: Decoder): Promise; - - // next message should return messages received in a content topic - // messages should be kept in a queue, meaning that nextMessage - // will call pop in the queue to remove the oldest message received - // (it's important to maintain order of received messages) - nextMessage(contentTopic: string): Promise; +type Receiver interface { + // Subscribe will return a channel to obtain next message received in a content topic + Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage } type Pairing struct { sync.RWMutex - - ContentTopic string - + ContentTopic string + msgCh <-chan *pb.WakuMessage randomFixLenVal []byte myCommittedStaticKey []byte + params PairingParameters + handshake *Handshake + authCode string + authCodeEmitted chan string + authCodeConfirmed chan bool + messenger NoiseMessenger + logger *zap.Logger - params PairingParameters - handshake *Handshake - - authCode string - authCodeEmitted chan string - authCodeConfirmed chan bool - - timeoutCh chan struct{} - - logger *zap.Logger - - started bool + started bool + completed bool } type PairingParameterOption func(*PairingParameters) error @@ -65,7 +56,8 @@ func WithInitiatorParameters(qrString string, qrMessageNametag MessageNametag) P return nil } } -func WithReceiverParameters(applicationName, applicationVersion, shardId string, myEphemeralPublicKey ed25519.PublicKey, qrMessageNameTag *MessageNametag) PairingParameterOption { + +func WithResponderParameters(applicationName, applicationVersion, shardId string, qrMessageNameTag *MessageNametag) PairingParameterOption { return func(params *PairingParameters) error { params.initiator = false if qrMessageNameTag == nil { @@ -77,20 +69,29 @@ func WithReceiverParameters(applicationName, applicationVersion, shardId string, params.qrMessageNametag = BytesToMessageNametag(b) } else { params.qrMessageNametag = *qrMessageNameTag - params.qr = NewQR(applicationName, applicationVersion, shardId, myEphemeralPublicKey, params.myCommitedStaticKey) } + params.qr = NewQR(applicationName, applicationVersion, shardId, params.ephemeralPublicKey, params.myCommitedStaticKey) return nil } } +const DefaultApplicationName = "waku-noise-sessions" +const DefaultApplicationVersion = "0.1" +const DefaultShardId = "10" + +func WithDefaultResponderParameters() PairingParameterOption { + return WithResponderParameters(DefaultApplicationName, DefaultApplicationVersion, DefaultShardId, nil) +} + type PairingParameters struct { myCommitedStaticKey []byte + ephemeralPublicKey ed25519.PublicKey initiator bool qr QR qrMessageNametag MessageNametag } -func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParameterOption, logger *zap.Logger) (*Pairing, error) { +func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParameterOption, messenger NoiseMessenger, logger *zap.Logger) (*Pairing, error) { b := make([]byte, 32) _, err := rand.Read(b) if err != nil { @@ -101,7 +102,11 @@ func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParamet var params PairingParameters params.myCommitedStaticKey = myCommitedStaticKey - opts(¶ms) + params.ephemeralPublicKey = myEphemeralKey.Public + err = opts(¶ms) + if err != nil { + return nil, err + } preMessagePKs := params.qr.ephemeralPublicKey hs, err := NewHandshake_WakuPairing_25519_ChaChaPoly_SHA256(myStaticKey, myEphemeralKey, params.initiator, params.qr.Bytes(), preMessagePKs) @@ -109,107 +114,102 @@ func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParamet return nil, err } + contentTopic := "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardId + "/proto" + + // TODO: check if subscription is removed on stop + msgCh := messenger.Subscribe(context.Background(), contentTopic) + return &Pairing{ - ContentTopic: "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardId + "/proto", - randomFixLenVal: b, // r or s depending if you're responder or initiator + ContentTopic: contentTopic, + msgCh: msgCh, + randomFixLenVal: b, // this represents r or s depending if you're responder or initiator myCommittedStaticKey: myCommitedStaticKey, authCodeEmitted: make(chan string, 1), authCodeConfirmed: make(chan bool, 1), params: params, handshake: hs, - logger: logger.Named("waku-pairing-1"), + messenger: messenger, + logger: logger.Named("waku-pairing1"), }, nil } -func (p *Pairing) PairingInfo() (string, MessageNametag) { +func (p *Pairing) PairingInfo() (qrString string, qrMessageNametag MessageNametag) { p.RLock() defer p.RUnlock() return p.params.qr.String(), p.params.qrMessageNametag } -func (p *Pairing) Execute(timeout time.Duration) error { +func (p *Pairing) Execute(ctx context.Context) error { p.RLock() if p.started { p.RUnlock() return errors.New("pairing already executed. Create new pairing object") } + defer p.messenger.Stop() + p.RUnlock() p.Lock() p.started = true - p.timeoutCh = make(chan struct{}, 1) p.Unlock() - t := time.NewTimer(timeout) - defer t.Stop() - var doneCh <-chan error if p.params.initiator { - doneCh = p.initiatorHandshake() + doneCh = p.initiatorHandshake(ctx, p.msgCh) } else { - doneCh = p.responderHandshake() + doneCh = p.responderHandshake(ctx, p.msgCh) } select { - case <-t.C: + case <-ctx.Done(): p.Lock() defer p.Unlock() - close(p.timeoutCh) return ErrPairingTimeout case err := <-doneCh: return err } } -func (p *Pairing) isAuthCodeConfirmed() (bool, error) { +func (p *Pairing) isAuthCodeConfirmed(ctx context.Context) (bool, error) { // wait for user to confirm or not, or for the whole pairing process to time out select { - case <-p.timeoutCh: + case <-ctx.Done(): return false, ErrPairingTimeout case confirmed := <-p.authCodeConfirmed: return confirmed, nil } } -/* -func executeReadStepWithNextMessage(contentTopic string, messageNametag MessageNameTag) error { - // TODO: create test unit for this function - let stopLoop = false; +func (p *Pairing) executeReadStepWithNextMessage(ctx context.Context, nextMsgChan <-chan *pb.WakuMessage, messageNametag MessageNametag) (*HandshakeStepResult, error) { + for { + select { + case <-ctx.Done(): + return nil, ErrPairingTimeout + case msg := <-nextMsgChan: + payload, err := DecodePayloadV2(msg) + if err != nil { + return nil, err + } - this.eventEmitter.once("pairingTimeout", () => { - stopLoop = true; - }); + step, err := p.handshake.Step(payload, nil, &messageNametag) + if err != nil { + if errors.Is(err, ErrNametagNotExpected) { + p.logger.Debug(err.Error()) + continue + } else { + return nil, err + } + } + return step, nil + } + } +} - this.eventEmitter.once("pairingComplete", () => { - stopLoop = true; - }); - - while (!stopLoop) { - try { - const hsMessage = await this.receiver.nextMessage(contentTopic); - const step = this.handshake.stepHandshake({ - readPayloadV2: hsMessage.payloadV2, - messageNametag, - }); - return step; - } catch (err) { - if (err instanceof MessageNametagError) { - console.debug("Unexpected message nametag", err.expectedNametag, err.actualNametag); - } else { - throw err; - } - } - } - - throw new Error("could not obtain next message"); - } -*/ - -func (p *Pairing) initiatorHandshake() (doneCh chan error) { +func (p *Pairing) initiatorHandshake(ctx context.Context, msgCh <-chan *pb.WakuMessage) (doneCh chan error) { doneCh = make(chan error, 1) go func() { - <-p.receiver.subscribe(p.ContentTopic) + defer close(doneCh) // The handshake initiator writes a Waku2 payload v2 containing the handshake message // and the (encrypted) transport message @@ -219,145 +219,236 @@ func (p *Pairing) initiatorHandshake() (doneCh chan error) { doneCh <- err return } + // We prepare a message from initiator's payload2 // At this point wakuMsg is sent over the Waku network to receiver content topic - <-p.sender.publish(p.ContentTopic, hsStep) + err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2) + if err != nil { + doneCh <- err + return + } // We generate an authorization code using the handshake state // this check has to be confirmed with a user interaction, comparing auth codes in both ends - authCode, err := p.handshake.Authcode() if err != nil { doneCh <- err return } + p.Lock() + p.authCode = authCode + p.Unlock() + p.authCodeEmitted <- authCode p.logger.Info("waiting for authcode confirmation....") - confirmed, err := p.isAuthCodeConfirmed() + confirmed, err := p.isAuthCodeConfirmed(ctx) if err != nil { doneCh <- err return } if !confirmed { p.logger.Info("authcode not confirmed") + doneCh <- errors.New("authcode not confirmed") + return } - close(doneCh) - /* - // 2nd step - // <- sB, eAsB {r} - hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag()); + // 2nd step + // <- sB, eAsB {r} + hsMessageNametag, err := p.handshake.ToMessageNametag() + if err != nil { + doneCh <- err + return + } - if (!this.handshake.hs.rs) throw new Error("invalid handshake state"); + hsStep, err = p.executeReadStepWithNextMessage(ctx, msgCh, hsMessageNametag) + if err != nil { + doneCh <- err + return + } - // Initiator further checks if receiver's commitment opens to receiver's static key received - const expectedReceiverCommittedStaticKey = commitPublicKey(this.handshake.hs.rs, hsStep.transportMessage); - if (!uint8ArrayEquals(expectedReceiverCommittedStaticKey, this.qr.committedStaticKey)) { - throw new Error("expected committed static key does not match the receiver actual committed static key"); - } + // Initiator further checks if receiver's commitment opens to receiver's static key received + expectedReceiverCommittedStaticKey := CommitPublicKey(p.handshake.RS(), hsStep.TransportMessage) + if !bytes.Equal(expectedReceiverCommittedStaticKey, p.params.qr.committedStaticKey) { + doneCh <- errors.New("expected committed static key does not match the receiver actual committed static key") + return + } - // 3rd step - // -> sA, sAeB, sAsB {s} - // Similarly as in first step, the initiator writes a Waku2 payload containing the handshake message and the (encrypted) transport message - hsStep = this.handshake.stepHandshake({ - transportMessage: this.randomFixLenVal, - messageNametag: this.handshake.hs.toMessageNametag(), - }); + // 3rd step + // -> sA, sAeB, sAsB {s} + // Similarly as in first step, the initiator writes a Waku2 payload containing the handshake message and the (encrypted) transport message + hsMessageNametag, err = p.handshake.ToMessageNametag() + if err != nil { + doneCh <- err + return + } - encoder = new NoiseHandshakeEncoder(this.contentTopic, hsStep); - await this.sender.publish(encoder, {}); + hsStep, err = p.handshake.Step(nil, p.randomFixLenVal, &hsMessageNametag) + if err != nil { + doneCh <- err + return + } - // Secure Transfer Phase - this.handshakeResult = this.handshake.finalizeHandshake(); + err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2) + if err != nil { + doneCh <- err + return + } - this.eventEmitter.emit("pairingComplete"); + // Secure Transfer Phase + if !p.handshake.HandshakeComplete() { + doneCh <- errors.New("handshake is in undefined state") + return + } - return WakuPairing.getSecureCodec(this.contentTopic, this.handshakeResult);*/ + p.Lock() + p.completed = true + p.Unlock() }() return doneCh } -func (p *Pairing) responderHandshake() (doneCh chan error) { +func (p *Pairing) responderHandshake(ctx context.Context, msgCh <-chan *pb.WakuMessage) (doneCh chan error) { doneCh = make(chan error, 1) func() { - <-p.receiver.subscribe(p.ContentTopic) + defer close(doneCh) - close(doneCh) + // the received reads the initiator's payloads, and returns the (decrypted) transport message the initiator sent + // Note that the received verifies if the received payloadV2 has the expected messageNametag set + hsStep, err := p.executeReadStepWithNextMessage(ctx, msgCh, p.params.qrMessageNametag) + if err != nil { + doneCh <- err + return + } - /* - // the received reads the initiator's payloads, and returns the (decrypted) transport message the initiator sent - // Note that the received verifies if the received payloadV2 has the expected messageNametag set - let hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.qrMessageNameTag); + initiatorCommittedStaticKey := hsStep.TransportMessage - const initiatorCommittedStaticKey = new Uint8Array(hsStep.transportMessage); + // We generate an authorization code using the handshake state + // this check has to be confirmed with a user interaction, comparing auth codes in both ends + authCode, err := p.handshake.Authcode() + if err != nil { + doneCh <- err + return + } - const confirmationPromise = this.isAuthCodeConfirmed(); - await delay(100); - this.eventEmitter.emit("authCodeGenerated", this.handshake.genAuthcode()); - console.log("Waiting for authcode confirmation..."); - const confirmed = await confirmationPromise; - if (!confirmed) { - throw new Error("authcode is not confirmed"); - } + p.Lock() + p.authCode = authCode + p.Unlock() + p.authCodeEmitted <- authCode - /* - // 2nd step - // <- sB, eAsB {r} - // Receiver writes and returns a payload - hsStep = this.handshake.stepHandshake({ - transportMessage: this.randomFixLenVal, - messageNametag: this.handshake.hs.toMessageNametag(), - }); + p.logger.Info("waiting for authcode confirmation....") - // We prepare a Waku message from receiver's payload2 - const encoder = new NoiseHandshakeEncoder(this.contentTopic, hsStep); - await this.sender.publish(encoder, {}); + confirmed, err := p.isAuthCodeConfirmed(ctx) + if err != nil { + doneCh <- err + return + } + if !confirmed { + p.logger.Info("authcode not confirmed") + doneCh <- errors.New("authcode not confirmed") + return + } - // 3rd step - // -> sA, sAeB, sAsB {s} + // 2nd step + // <- sB, eAsB {r} + // Receiver writes and returns a payload + hsMessageNametag, err := p.handshake.ToMessageNametag() + if err != nil { + doneCh <- err + return + } - // The receiver reads the initiator's payload sent by the initiator - hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag()); + hsStep, err = p.handshake.Step(nil, p.randomFixLenVal, &hsMessageNametag) + if err != nil { + doneCh <- err + return + } - if (!this.handshake.hs.rs) throw new Error("invalid handshake state"); + // We prepare a Waku message from receiver's payload2 + err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2) + if err != nil { + doneCh <- err + return + } - // The receiver further checks if the initiator's commitment opens to the initiator's static key received - const expectedInitiatorCommittedStaticKey = commitPublicKey(this.handshake.hs.rs, hsStep.transportMessage); - if (!uint8ArrayEquals(expectedInitiatorCommittedStaticKey, initiatorCommittedStaticKey)) { - throw new Error("expected committed static key does not match the initiator actual committed static key"); - } + // 3rd step + // -> sA, sAeB, sAsB {s} + hsMessageNametag, err = p.handshake.ToMessageNametag() + if err != nil { + doneCh <- err + return + } - // Secure Transfer Phase - this.handshakeResult = this.handshake.finalizeHandshake(); + // The receiver reads the initiator's payload sent by the initiator + hsStep, err = p.executeReadStepWithNextMessage(ctx, msgCh, hsMessageNametag) + if err != nil { + doneCh <- err + return + } - this.eventEmitter.emit("pairingComplete"); + // The receiver further checks if the initiator's commitment opens to the initiator's static key received + expectedInitiatorCommittedStaticKey := CommitPublicKey(p.handshake.RS(), hsStep.TransportMessage) + if !bytes.Equal(expectedInitiatorCommittedStaticKey, initiatorCommittedStaticKey) { + doneCh <- errors.New("expected committed static key does not match the initiator actual committed static key") + return + } - return WakuPairing.getSecureCodec(this.contentTopic, this.handshakeResult);*/ + // Secure Transfer Phase + if !p.handshake.HandshakeComplete() { + doneCh <- errors.New("handshake is in undefined state") + return + } + + p.Lock() + p.completed = true + p.Unlock() }() return doneCh } +func (p *Pairing) HandshakeComplete() bool { + p.RLock() + defer p.RUnlock() + return p.completed +} + +// Returns a WakuMessage with version 2 and encrypted payload +func (p *Pairing) Encrypt(plaintext []byte) (*pb.WakuMessage, error) { + payload, err := p.handshake.Encrypt(plaintext) + if err != nil { + return nil, err + } + + return EncodePayloadV2(payload) +} + +func (p *Pairing) Decrypt(msg *pb.WakuMessage) ([]byte, error) { + payload, err := DecodePayloadV2(msg) + if err != nil { + return nil, err + } + return p.handshake.Decrypt(payload) +} + func (p *Pairing) ConfirmAuthCode(confirmed bool) error { - p.Lock() + p.RLock() authcode := p.authCode - p.Unlock() + p.RUnlock() if authcode != "" { p.authCodeConfirmed <- confirmed - return nil } return errors.New("authcode has not been generated yet") } -func (p *Pairing) AuthCode() chan<- string { +func (p *Pairing) AuthCode() <-chan string { ch := make(chan string, 1) p.Lock() diff --git a/waku/v2/noise/pairing_relay_messenger.go b/waku/v2/noise/pairing_relay_messenger.go new file mode 100644 index 00000000..0f49a1d5 --- /dev/null +++ b/waku/v2/noise/pairing_relay_messenger.go @@ -0,0 +1,135 @@ +package noise + +import ( + "context" + + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" +) + +type NoiseMessenger interface { + Sender + Receiver + Stop() +} + +type contentTopicSubscription struct { + envChan chan *protocol.Envelope + msgChan chan *pb.WakuMessage +} + +type NoiseWakuRelay struct { + NoiseMessenger + relay *relay.WakuRelay + relaySub *relay.Subscription + broadcaster v2.Broadcaster + cancel context.CancelFunc + timesource timesource.Timesource + pubsubTopic string + subscriptionChPerContentTopic map[string][]contentTopicSubscription +} + +func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic *string, timesource timesource.Timesource) (*NoiseWakuRelay, error) { + var topic string + if pubsubTopic != nil { + topic = *pubsubTopic + } else { + topic = relay.DefaultWakuTopic + } + + subs, err := r.SubscribeToTopic(ctx, topic) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + + wr := &NoiseWakuRelay{ + relay: r, + relaySub: subs, + cancel: cancel, + timesource: timesource, + broadcaster: v2.NewBroadcaster(1024), + pubsubTopic: topic, + subscriptionChPerContentTopic: make(map[string][]contentTopicSubscription), + } + + go func() { + for { + select { + case <-ctx.Done(): + subs.Unsubscribe() + wr.broadcaster.Close() + return + case envelope := <-subs.C: + if envelope != nil { + wr.broadcaster.Submit(envelope) + } + } + } + }() + + return wr, nil +} + +func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage { + sub := contentTopicSubscription{ + envChan: make(chan *protocol.Envelope, 1024), + msgChan: make(chan *pb.WakuMessage, 1024), + } + + r.broadcaster.Register(&r.pubsubTopic, sub.envChan) + + subscriptionCh := r.subscriptionChPerContentTopic[contentTopic] + subscriptionCh = append(subscriptionCh, sub) + r.subscriptionChPerContentTopic[contentTopic] = subscriptionCh + + go func() { + for { + select { + case <-ctx.Done(): + return + case env := <-sub.envChan: + if env == nil { + return + } + + if env.Message().ContentTopic != contentTopic || env.Message().Version != 2 { + continue + } + + // TODO: Might make sense to create a ring buffer here, to drop messages if queue fills up + sub.msgChan <- env.Message() + } + } + }() + + return sub.msgChan +} + +func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, payload PayloadV2) error { + + message, err := EncodePayloadV2(&payload) + if err != nil { + return err + } + + message.ContentTopic = contentTopic + message.Timestamp = r.timesource.Now().UnixNano() + + _, err = r.relay.PublishToTopic(ctx, message, r.pubsubTopic) + return err +} + +func (r *NoiseWakuRelay) Stop() { + r.cancel() + for _, contentTopicSubscriptions := range r.subscriptionChPerContentTopic { + for _, c := range contentTopicSubscriptions { + close(c.envChan) + close(c.msgChan) + } + } +} diff --git a/waku/v2/noise/pairing_test.go b/waku/v2/noise/pairing_test.go new file mode 100644 index 00000000..8f962698 --- /dev/null +++ b/waku/v2/noise/pairing_test.go @@ -0,0 +1,208 @@ +package noise + +import ( + "bytes" + "context" + "crypto/rand" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + n "github.com/waku-org/noise" +) + +func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(1024), 0, timesource.NewDefaultClock(), utils.Logger()) + require.NoError(t, err) + + return host, relay +} + +func TestPairingObj1Success(t *testing.T) { + host1, relay1 := createRelayNode(t) + host2, relay2 := createRelayNode(t) + + defer host1.Close() + defer host2.Close() + defer relay1.Stop() + defer relay2.Stop() + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err := host1.Peerstore().AddProtocols(host2.ID(), string(relay.WakuRelayID_v200)) + require.NoError(t, err) + _, err = host1.Network().DialPeer(context.Background(), host2.ID()) + require.NoError(t, err) + + time.Sleep(2 * time.Second) // Wait for relay to form mesh + + bobStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + bobEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + + bobMessenger, err := NewWakuRelayMessenger(context.Background(), relay1, nil, timesource.NewDefaultClock()) + require.NoError(t, err) + + bobPairingObj, err := NewPairing(bobStaticKey, bobEphemeralKey, WithDefaultResponderParameters(), bobMessenger, utils.Logger()) + require.NoError(t, err) + + authCodeCheckCh := make(chan string, 2) + + time.Sleep(1 * time.Second) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + // Check that authcodes match + authcode1 := <-authCodeCheckCh + authcode2 := <-authCodeCheckCh + require.Equal(t, authcode1, authcode2) + }() + + // Execute in separate go routine + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := bobPairingObj.Execute(ctx) + require.NoError(t, err) + }() + + // Confirmation is done by manually + go func() { + authCode := <-bobPairingObj.AuthCode() + authCodeCheckCh <- authCode + err := bobPairingObj.ConfirmAuthCode(true) + require.NoError(t, err) + }() + + aliceStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + aliceEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + + aliceMessenger, err := NewWakuRelayMessenger(context.Background(), relay2, nil, timesource.NewDefaultClock()) + require.NoError(t, err) + + qrString, qrMessageNameTag := bobPairingObj.PairingInfo() + alicePairingObj, err := NewPairing(aliceStaticKey, aliceEphemeralKey, WithInitiatorParameters(qrString, qrMessageNameTag), aliceMessenger, utils.Logger()) + require.NoError(t, err) + + // Execute in separate go routine + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := alicePairingObj.Execute(ctx) + require.NoError(t, err) + }() + + // Alice waits for authcode and confirms it + wg.Add(1) + go func() { + defer wg.Done() + authCode := <-alicePairingObj.AuthCode() + authCodeCheckCh <- authCode + err := alicePairingObj.ConfirmAuthCode(true) + require.NoError(t, err) + }() + + wg.Wait() + + // We test read/write of random messages exchanged between Alice and Bob + // Note that we exchange more than the number of messages contained in the nametag buffer to test if they are filled correctly as the communication proceeds + // We assume messages are sent via one of waku protocols + for i := 0; i < 10*MessageNametagBufferSize; i++ { + // Alice writes to Bob + message := generateRandomBytes(t, 32) + msg, err := alicePairingObj.Encrypt(message) + require.NoError(t, err) + + readMessage, err := bobPairingObj.Decrypt(msg) + require.NoError(t, err) + require.True(t, bytes.Equal(message, readMessage)) + + // Bob writes to Alice + message = generateRandomBytes(t, 32) + msg, err = alicePairingObj.Encrypt(message) + require.NoError(t, err) + + readMessage, err = bobPairingObj.Decrypt(msg) + require.NoError(t, err) + require.True(t, bytes.Equal(message, readMessage)) + } + +} + +func TestPairingObj1ShouldTimeout(t *testing.T) { + host1, relay1 := createRelayNode(t) + host2, relay2 := createRelayNode(t) + + defer host1.Close() + defer host2.Close() + defer relay1.Stop() + defer relay2.Stop() + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err := host1.Peerstore().AddProtocols(host2.ID(), string(relay.WakuRelayID_v200)) + require.NoError(t, err) + _, err = host1.Network().DialPeer(context.Background(), host2.ID()) + require.NoError(t, err) + + time.Sleep(2 * time.Second) // Wait for relay to form mesh + + bobStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + bobEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + + bobMessenger, err := NewWakuRelayMessenger(context.Background(), relay1, nil, timesource.NewDefaultClock()) + require.NoError(t, err) + + bobPairingObj, err := NewPairing(bobStaticKey, bobEphemeralKey, WithDefaultResponderParameters(), bobMessenger, utils.Logger()) + require.NoError(t, err) + + aliceStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + aliceEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader) + + aliceMessenger, err := NewWakuRelayMessenger(context.Background(), relay2, nil, timesource.NewDefaultClock()) + require.NoError(t, err) + + qrString, qrMessageNameTag := bobPairingObj.PairingInfo() + alicePairingObj, err := NewPairing(aliceStaticKey, aliceEphemeralKey, WithInitiatorParameters(qrString, qrMessageNameTag), aliceMessenger, utils.Logger()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(2) + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := bobPairingObj.Execute(ctx) + require.ErrorIs(t, err, ErrPairingTimeout) + }() + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := alicePairingObj.Execute(ctx) + require.ErrorIs(t, err, ErrPairingTimeout) + }() + + wg.Wait() +}