mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-02 14:03:06 +00:00
feat(noise): finish pairing impl and test
This commit is contained in:
parent
5511e55227
commit
4df6aa98bc
2
Makefile
2
Makefile
@ -72,7 +72,7 @@ lint:
|
||||
@golangci-lint --exclude=SA1019 run ./... --deadline=5m
|
||||
|
||||
test:
|
||||
${GOBIN} test ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp
|
||||
${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp
|
||||
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE}
|
||||
${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}
|
||||
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
package noise
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
n "github.com/waku-org/noise"
|
||||
@ -14,41 +15,31 @@ import (
|
||||
|
||||
var ErrPairingTimeout = errors.New("pairing has timed out")
|
||||
|
||||
|
||||
type Sender interface {
|
||||
Publish(msg *pb.WakuMessage) <-chan struct{}
|
||||
Publish(ctx context.Context, contentTopic string, payload PayloadV2) error
|
||||
}
|
||||
|
||||
type Receiver {
|
||||
subscribe(decoder: Decoder<NoiseHandshakeMessage>): Promise<void>;
|
||||
|
||||
// next message should return messages received in a content topic
|
||||
// messages should be kept in a queue, meaning that nextMessage
|
||||
// will call pop in the queue to remove the oldest message received
|
||||
// (it's important to maintain order of received messages)
|
||||
nextMessage(contentTopic: string): Promise<NoiseHandshakeMessage>;
|
||||
type Receiver interface {
|
||||
// Subscribe will return a channel to obtain next message received in a content topic
|
||||
Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage
|
||||
}
|
||||
|
||||
type Pairing struct {
|
||||
sync.RWMutex
|
||||
|
||||
ContentTopic string
|
||||
|
||||
ContentTopic string
|
||||
msgCh <-chan *pb.WakuMessage
|
||||
randomFixLenVal []byte
|
||||
myCommittedStaticKey []byte
|
||||
params PairingParameters
|
||||
handshake *Handshake
|
||||
authCode string
|
||||
authCodeEmitted chan string
|
||||
authCodeConfirmed chan bool
|
||||
messenger NoiseMessenger
|
||||
logger *zap.Logger
|
||||
|
||||
params PairingParameters
|
||||
handshake *Handshake
|
||||
|
||||
authCode string
|
||||
authCodeEmitted chan string
|
||||
authCodeConfirmed chan bool
|
||||
|
||||
timeoutCh chan struct{}
|
||||
|
||||
logger *zap.Logger
|
||||
|
||||
started bool
|
||||
started bool
|
||||
completed bool
|
||||
}
|
||||
|
||||
type PairingParameterOption func(*PairingParameters) error
|
||||
@ -65,7 +56,8 @@ func WithInitiatorParameters(qrString string, qrMessageNametag MessageNametag) P
|
||||
return nil
|
||||
}
|
||||
}
|
||||
func WithReceiverParameters(applicationName, applicationVersion, shardId string, myEphemeralPublicKey ed25519.PublicKey, qrMessageNameTag *MessageNametag) PairingParameterOption {
|
||||
|
||||
func WithResponderParameters(applicationName, applicationVersion, shardId string, qrMessageNameTag *MessageNametag) PairingParameterOption {
|
||||
return func(params *PairingParameters) error {
|
||||
params.initiator = false
|
||||
if qrMessageNameTag == nil {
|
||||
@ -77,20 +69,29 @@ func WithReceiverParameters(applicationName, applicationVersion, shardId string,
|
||||
params.qrMessageNametag = BytesToMessageNametag(b)
|
||||
} else {
|
||||
params.qrMessageNametag = *qrMessageNameTag
|
||||
params.qr = NewQR(applicationName, applicationVersion, shardId, myEphemeralPublicKey, params.myCommitedStaticKey)
|
||||
}
|
||||
params.qr = NewQR(applicationName, applicationVersion, shardId, params.ephemeralPublicKey, params.myCommitedStaticKey)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
const DefaultApplicationName = "waku-noise-sessions"
|
||||
const DefaultApplicationVersion = "0.1"
|
||||
const DefaultShardId = "10"
|
||||
|
||||
func WithDefaultResponderParameters() PairingParameterOption {
|
||||
return WithResponderParameters(DefaultApplicationName, DefaultApplicationVersion, DefaultShardId, nil)
|
||||
}
|
||||
|
||||
type PairingParameters struct {
|
||||
myCommitedStaticKey []byte
|
||||
ephemeralPublicKey ed25519.PublicKey
|
||||
initiator bool
|
||||
qr QR
|
||||
qrMessageNametag MessageNametag
|
||||
}
|
||||
|
||||
func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParameterOption, logger *zap.Logger) (*Pairing, error) {
|
||||
func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParameterOption, messenger NoiseMessenger, logger *zap.Logger) (*Pairing, error) {
|
||||
b := make([]byte, 32)
|
||||
_, err := rand.Read(b)
|
||||
if err != nil {
|
||||
@ -101,7 +102,11 @@ func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParamet
|
||||
|
||||
var params PairingParameters
|
||||
params.myCommitedStaticKey = myCommitedStaticKey
|
||||
opts(¶ms)
|
||||
params.ephemeralPublicKey = myEphemeralKey.Public
|
||||
err = opts(¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
preMessagePKs := params.qr.ephemeralPublicKey
|
||||
hs, err := NewHandshake_WakuPairing_25519_ChaChaPoly_SHA256(myStaticKey, myEphemeralKey, params.initiator, params.qr.Bytes(), preMessagePKs)
|
||||
@ -109,107 +114,102 @@ func NewPairing(myStaticKey n.DHKey, myEphemeralKey n.DHKey, opts PairingParamet
|
||||
return nil, err
|
||||
}
|
||||
|
||||
contentTopic := "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardId + "/proto"
|
||||
|
||||
// TODO: check if subscription is removed on stop
|
||||
msgCh := messenger.Subscribe(context.Background(), contentTopic)
|
||||
|
||||
return &Pairing{
|
||||
ContentTopic: "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardId + "/proto",
|
||||
randomFixLenVal: b, // r or s depending if you're responder or initiator
|
||||
ContentTopic: contentTopic,
|
||||
msgCh: msgCh,
|
||||
randomFixLenVal: b, // this represents r or s depending if you're responder or initiator
|
||||
myCommittedStaticKey: myCommitedStaticKey,
|
||||
authCodeEmitted: make(chan string, 1),
|
||||
authCodeConfirmed: make(chan bool, 1),
|
||||
params: params,
|
||||
handshake: hs,
|
||||
logger: logger.Named("waku-pairing-1"),
|
||||
messenger: messenger,
|
||||
logger: logger.Named("waku-pairing1"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *Pairing) PairingInfo() (string, MessageNametag) {
|
||||
func (p *Pairing) PairingInfo() (qrString string, qrMessageNametag MessageNametag) {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
return p.params.qr.String(), p.params.qrMessageNametag
|
||||
}
|
||||
|
||||
func (p *Pairing) Execute(timeout time.Duration) error {
|
||||
func (p *Pairing) Execute(ctx context.Context) error {
|
||||
p.RLock()
|
||||
if p.started {
|
||||
p.RUnlock()
|
||||
return errors.New("pairing already executed. Create new pairing object")
|
||||
}
|
||||
|
||||
defer p.messenger.Stop()
|
||||
|
||||
p.RUnlock()
|
||||
p.Lock()
|
||||
p.started = true
|
||||
p.timeoutCh = make(chan struct{}, 1)
|
||||
p.Unlock()
|
||||
|
||||
t := time.NewTimer(timeout)
|
||||
defer t.Stop()
|
||||
|
||||
var doneCh <-chan error
|
||||
if p.params.initiator {
|
||||
doneCh = p.initiatorHandshake()
|
||||
doneCh = p.initiatorHandshake(ctx, p.msgCh)
|
||||
} else {
|
||||
doneCh = p.responderHandshake()
|
||||
doneCh = p.responderHandshake(ctx, p.msgCh)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-ctx.Done():
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
close(p.timeoutCh)
|
||||
return ErrPairingTimeout
|
||||
case err := <-doneCh:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pairing) isAuthCodeConfirmed() (bool, error) {
|
||||
func (p *Pairing) isAuthCodeConfirmed(ctx context.Context) (bool, error) {
|
||||
// wait for user to confirm or not, or for the whole pairing process to time out
|
||||
select {
|
||||
case <-p.timeoutCh:
|
||||
case <-ctx.Done():
|
||||
return false, ErrPairingTimeout
|
||||
case confirmed := <-p.authCodeConfirmed:
|
||||
return confirmed, nil
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func executeReadStepWithNextMessage(contentTopic string, messageNametag MessageNameTag) error {
|
||||
// TODO: create test unit for this function
|
||||
let stopLoop = false;
|
||||
func (p *Pairing) executeReadStepWithNextMessage(ctx context.Context, nextMsgChan <-chan *pb.WakuMessage, messageNametag MessageNametag) (*HandshakeStepResult, error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ErrPairingTimeout
|
||||
case msg := <-nextMsgChan:
|
||||
payload, err := DecodePayloadV2(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
this.eventEmitter.once("pairingTimeout", () => {
|
||||
stopLoop = true;
|
||||
});
|
||||
step, err := p.handshake.Step(payload, nil, &messageNametag)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNametagNotExpected) {
|
||||
p.logger.Debug(err.Error())
|
||||
continue
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return step, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.eventEmitter.once("pairingComplete", () => {
|
||||
stopLoop = true;
|
||||
});
|
||||
|
||||
while (!stopLoop) {
|
||||
try {
|
||||
const hsMessage = await this.receiver.nextMessage(contentTopic);
|
||||
const step = this.handshake.stepHandshake({
|
||||
readPayloadV2: hsMessage.payloadV2,
|
||||
messageNametag,
|
||||
});
|
||||
return step;
|
||||
} catch (err) {
|
||||
if (err instanceof MessageNametagError) {
|
||||
console.debug("Unexpected message nametag", err.expectedNametag, err.actualNametag);
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error("could not obtain next message");
|
||||
}
|
||||
*/
|
||||
|
||||
func (p *Pairing) initiatorHandshake() (doneCh chan error) {
|
||||
func (p *Pairing) initiatorHandshake(ctx context.Context, msgCh <-chan *pb.WakuMessage) (doneCh chan error) {
|
||||
doneCh = make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
<-p.receiver.subscribe(p.ContentTopic)
|
||||
defer close(doneCh)
|
||||
|
||||
// The handshake initiator writes a Waku2 payload v2 containing the handshake message
|
||||
// and the (encrypted) transport message
|
||||
@ -219,145 +219,236 @@ func (p *Pairing) initiatorHandshake() (doneCh chan error) {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// We prepare a message from initiator's payload2
|
||||
// At this point wakuMsg is sent over the Waku network to receiver content topic
|
||||
<-p.sender.publish(p.ContentTopic, hsStep)
|
||||
err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// We generate an authorization code using the handshake state
|
||||
// this check has to be confirmed with a user interaction, comparing auth codes in both ends
|
||||
|
||||
authCode, err := p.handshake.Authcode()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
p.Lock()
|
||||
p.authCode = authCode
|
||||
p.Unlock()
|
||||
|
||||
p.authCodeEmitted <- authCode
|
||||
|
||||
p.logger.Info("waiting for authcode confirmation....")
|
||||
|
||||
confirmed, err := p.isAuthCodeConfirmed()
|
||||
confirmed, err := p.isAuthCodeConfirmed(ctx)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
if !confirmed {
|
||||
p.logger.Info("authcode not confirmed")
|
||||
doneCh <- errors.New("authcode not confirmed")
|
||||
return
|
||||
}
|
||||
|
||||
close(doneCh)
|
||||
/*
|
||||
// 2nd step
|
||||
// <- sB, eAsB {r}
|
||||
hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag());
|
||||
// 2nd step
|
||||
// <- sB, eAsB {r}
|
||||
hsMessageNametag, err := p.handshake.ToMessageNametag()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if (!this.handshake.hs.rs) throw new Error("invalid handshake state");
|
||||
hsStep, err = p.executeReadStepWithNextMessage(ctx, msgCh, hsMessageNametag)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Initiator further checks if receiver's commitment opens to receiver's static key received
|
||||
const expectedReceiverCommittedStaticKey = commitPublicKey(this.handshake.hs.rs, hsStep.transportMessage);
|
||||
if (!uint8ArrayEquals(expectedReceiverCommittedStaticKey, this.qr.committedStaticKey)) {
|
||||
throw new Error("expected committed static key does not match the receiver actual committed static key");
|
||||
}
|
||||
// Initiator further checks if receiver's commitment opens to receiver's static key received
|
||||
expectedReceiverCommittedStaticKey := CommitPublicKey(p.handshake.RS(), hsStep.TransportMessage)
|
||||
if !bytes.Equal(expectedReceiverCommittedStaticKey, p.params.qr.committedStaticKey) {
|
||||
doneCh <- errors.New("expected committed static key does not match the receiver actual committed static key")
|
||||
return
|
||||
}
|
||||
|
||||
// 3rd step
|
||||
// -> sA, sAeB, sAsB {s}
|
||||
// Similarly as in first step, the initiator writes a Waku2 payload containing the handshake message and the (encrypted) transport message
|
||||
hsStep = this.handshake.stepHandshake({
|
||||
transportMessage: this.randomFixLenVal,
|
||||
messageNametag: this.handshake.hs.toMessageNametag(),
|
||||
});
|
||||
// 3rd step
|
||||
// -> sA, sAeB, sAsB {s}
|
||||
// Similarly as in first step, the initiator writes a Waku2 payload containing the handshake message and the (encrypted) transport message
|
||||
hsMessageNametag, err = p.handshake.ToMessageNametag()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
encoder = new NoiseHandshakeEncoder(this.contentTopic, hsStep);
|
||||
await this.sender.publish(encoder, {});
|
||||
hsStep, err = p.handshake.Step(nil, p.randomFixLenVal, &hsMessageNametag)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Secure Transfer Phase
|
||||
this.handshakeResult = this.handshake.finalizeHandshake();
|
||||
err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
this.eventEmitter.emit("pairingComplete");
|
||||
// Secure Transfer Phase
|
||||
if !p.handshake.HandshakeComplete() {
|
||||
doneCh <- errors.New("handshake is in undefined state")
|
||||
return
|
||||
}
|
||||
|
||||
return WakuPairing.getSecureCodec(this.contentTopic, this.handshakeResult);*/
|
||||
p.Lock()
|
||||
p.completed = true
|
||||
p.Unlock()
|
||||
}()
|
||||
|
||||
return doneCh
|
||||
}
|
||||
|
||||
func (p *Pairing) responderHandshake() (doneCh chan error) {
|
||||
func (p *Pairing) responderHandshake(ctx context.Context, msgCh <-chan *pb.WakuMessage) (doneCh chan error) {
|
||||
doneCh = make(chan error, 1)
|
||||
|
||||
func() {
|
||||
<-p.receiver.subscribe(p.ContentTopic)
|
||||
defer close(doneCh)
|
||||
|
||||
close(doneCh)
|
||||
// the received reads the initiator's payloads, and returns the (decrypted) transport message the initiator sent
|
||||
// Note that the received verifies if the received payloadV2 has the expected messageNametag set
|
||||
hsStep, err := p.executeReadStepWithNextMessage(ctx, msgCh, p.params.qrMessageNametag)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
// the received reads the initiator's payloads, and returns the (decrypted) transport message the initiator sent
|
||||
// Note that the received verifies if the received payloadV2 has the expected messageNametag set
|
||||
let hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.qrMessageNameTag);
|
||||
initiatorCommittedStaticKey := hsStep.TransportMessage
|
||||
|
||||
const initiatorCommittedStaticKey = new Uint8Array(hsStep.transportMessage);
|
||||
// We generate an authorization code using the handshake state
|
||||
// this check has to be confirmed with a user interaction, comparing auth codes in both ends
|
||||
authCode, err := p.handshake.Authcode()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
const confirmationPromise = this.isAuthCodeConfirmed();
|
||||
await delay(100);
|
||||
this.eventEmitter.emit("authCodeGenerated", this.handshake.genAuthcode());
|
||||
console.log("Waiting for authcode confirmation...");
|
||||
const confirmed = await confirmationPromise;
|
||||
if (!confirmed) {
|
||||
throw new Error("authcode is not confirmed");
|
||||
}
|
||||
p.Lock()
|
||||
p.authCode = authCode
|
||||
p.Unlock()
|
||||
|
||||
p.authCodeEmitted <- authCode
|
||||
|
||||
/*
|
||||
// 2nd step
|
||||
// <- sB, eAsB {r}
|
||||
// Receiver writes and returns a payload
|
||||
hsStep = this.handshake.stepHandshake({
|
||||
transportMessage: this.randomFixLenVal,
|
||||
messageNametag: this.handshake.hs.toMessageNametag(),
|
||||
});
|
||||
p.logger.Info("waiting for authcode confirmation....")
|
||||
|
||||
// We prepare a Waku message from receiver's payload2
|
||||
const encoder = new NoiseHandshakeEncoder(this.contentTopic, hsStep);
|
||||
await this.sender.publish(encoder, {});
|
||||
confirmed, err := p.isAuthCodeConfirmed(ctx)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
if !confirmed {
|
||||
p.logger.Info("authcode not confirmed")
|
||||
doneCh <- errors.New("authcode not confirmed")
|
||||
return
|
||||
}
|
||||
|
||||
// 3rd step
|
||||
// -> sA, sAeB, sAsB {s}
|
||||
// 2nd step
|
||||
// <- sB, eAsB {r}
|
||||
// Receiver writes and returns a payload
|
||||
hsMessageNametag, err := p.handshake.ToMessageNametag()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// The receiver reads the initiator's payload sent by the initiator
|
||||
hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag());
|
||||
hsStep, err = p.handshake.Step(nil, p.randomFixLenVal, &hsMessageNametag)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if (!this.handshake.hs.rs) throw new Error("invalid handshake state");
|
||||
// We prepare a Waku message from receiver's payload2
|
||||
err = p.messenger.Publish(ctx, p.ContentTopic, hsStep.Payload2)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// The receiver further checks if the initiator's commitment opens to the initiator's static key received
|
||||
const expectedInitiatorCommittedStaticKey = commitPublicKey(this.handshake.hs.rs, hsStep.transportMessage);
|
||||
if (!uint8ArrayEquals(expectedInitiatorCommittedStaticKey, initiatorCommittedStaticKey)) {
|
||||
throw new Error("expected committed static key does not match the initiator actual committed static key");
|
||||
}
|
||||
// 3rd step
|
||||
// -> sA, sAeB, sAsB {s}
|
||||
hsMessageNametag, err = p.handshake.ToMessageNametag()
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Secure Transfer Phase
|
||||
this.handshakeResult = this.handshake.finalizeHandshake();
|
||||
// The receiver reads the initiator's payload sent by the initiator
|
||||
hsStep, err = p.executeReadStepWithNextMessage(ctx, msgCh, hsMessageNametag)
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
this.eventEmitter.emit("pairingComplete");
|
||||
// The receiver further checks if the initiator's commitment opens to the initiator's static key received
|
||||
expectedInitiatorCommittedStaticKey := CommitPublicKey(p.handshake.RS(), hsStep.TransportMessage)
|
||||
if !bytes.Equal(expectedInitiatorCommittedStaticKey, initiatorCommittedStaticKey) {
|
||||
doneCh <- errors.New("expected committed static key does not match the initiator actual committed static key")
|
||||
return
|
||||
}
|
||||
|
||||
return WakuPairing.getSecureCodec(this.contentTopic, this.handshakeResult);*/
|
||||
// Secure Transfer Phase
|
||||
if !p.handshake.HandshakeComplete() {
|
||||
doneCh <- errors.New("handshake is in undefined state")
|
||||
return
|
||||
}
|
||||
|
||||
p.Lock()
|
||||
p.completed = true
|
||||
p.Unlock()
|
||||
}()
|
||||
return doneCh
|
||||
}
|
||||
|
||||
func (p *Pairing) HandshakeComplete() bool {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
return p.completed
|
||||
}
|
||||
|
||||
// Returns a WakuMessage with version 2 and encrypted payload
|
||||
func (p *Pairing) Encrypt(plaintext []byte) (*pb.WakuMessage, error) {
|
||||
payload, err := p.handshake.Encrypt(plaintext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return EncodePayloadV2(payload)
|
||||
}
|
||||
|
||||
func (p *Pairing) Decrypt(msg *pb.WakuMessage) ([]byte, error) {
|
||||
payload, err := DecodePayloadV2(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.handshake.Decrypt(payload)
|
||||
}
|
||||
|
||||
func (p *Pairing) ConfirmAuthCode(confirmed bool) error {
|
||||
p.Lock()
|
||||
p.RLock()
|
||||
authcode := p.authCode
|
||||
p.Unlock()
|
||||
p.RUnlock()
|
||||
|
||||
if authcode != "" {
|
||||
p.authCodeConfirmed <- confirmed
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("authcode has not been generated yet")
|
||||
}
|
||||
|
||||
func (p *Pairing) AuthCode() chan<- string {
|
||||
func (p *Pairing) AuthCode() <-chan string {
|
||||
ch := make(chan string, 1)
|
||||
|
||||
p.Lock()
|
||||
|
||||
135
waku/v2/noise/pairing_relay_messenger.go
Normal file
135
waku/v2/noise/pairing_relay_messenger.go
Normal file
@ -0,0 +1,135 @@
|
||||
package noise
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
)
|
||||
|
||||
type NoiseMessenger interface {
|
||||
Sender
|
||||
Receiver
|
||||
Stop()
|
||||
}
|
||||
|
||||
type contentTopicSubscription struct {
|
||||
envChan chan *protocol.Envelope
|
||||
msgChan chan *pb.WakuMessage
|
||||
}
|
||||
|
||||
type NoiseWakuRelay struct {
|
||||
NoiseMessenger
|
||||
relay *relay.WakuRelay
|
||||
relaySub *relay.Subscription
|
||||
broadcaster v2.Broadcaster
|
||||
cancel context.CancelFunc
|
||||
timesource timesource.Timesource
|
||||
pubsubTopic string
|
||||
subscriptionChPerContentTopic map[string][]contentTopicSubscription
|
||||
}
|
||||
|
||||
func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic *string, timesource timesource.Timesource) (*NoiseWakuRelay, error) {
|
||||
var topic string
|
||||
if pubsubTopic != nil {
|
||||
topic = *pubsubTopic
|
||||
} else {
|
||||
topic = relay.DefaultWakuTopic
|
||||
}
|
||||
|
||||
subs, err := r.SubscribeToTopic(ctx, topic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
wr := &NoiseWakuRelay{
|
||||
relay: r,
|
||||
relaySub: subs,
|
||||
cancel: cancel,
|
||||
timesource: timesource,
|
||||
broadcaster: v2.NewBroadcaster(1024),
|
||||
pubsubTopic: topic,
|
||||
subscriptionChPerContentTopic: make(map[string][]contentTopicSubscription),
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
subs.Unsubscribe()
|
||||
wr.broadcaster.Close()
|
||||
return
|
||||
case envelope := <-subs.C:
|
||||
if envelope != nil {
|
||||
wr.broadcaster.Submit(envelope)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return wr, nil
|
||||
}
|
||||
|
||||
func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage {
|
||||
sub := contentTopicSubscription{
|
||||
envChan: make(chan *protocol.Envelope, 1024),
|
||||
msgChan: make(chan *pb.WakuMessage, 1024),
|
||||
}
|
||||
|
||||
r.broadcaster.Register(&r.pubsubTopic, sub.envChan)
|
||||
|
||||
subscriptionCh := r.subscriptionChPerContentTopic[contentTopic]
|
||||
subscriptionCh = append(subscriptionCh, sub)
|
||||
r.subscriptionChPerContentTopic[contentTopic] = subscriptionCh
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case env := <-sub.envChan:
|
||||
if env == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if env.Message().ContentTopic != contentTopic || env.Message().Version != 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Might make sense to create a ring buffer here, to drop messages if queue fills up
|
||||
sub.msgChan <- env.Message()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return sub.msgChan
|
||||
}
|
||||
|
||||
func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, payload PayloadV2) error {
|
||||
|
||||
message, err := EncodePayloadV2(&payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
message.ContentTopic = contentTopic
|
||||
message.Timestamp = r.timesource.Now().UnixNano()
|
||||
|
||||
_, err = r.relay.PublishToTopic(ctx, message, r.pubsubTopic)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *NoiseWakuRelay) Stop() {
|
||||
r.cancel()
|
||||
for _, contentTopicSubscriptions := range r.subscriptionChPerContentTopic {
|
||||
for _, c := range contentTopicSubscriptions {
|
||||
close(c.envChan)
|
||||
close(c.msgChan)
|
||||
}
|
||||
}
|
||||
}
|
||||
208
waku/v2/noise/pairing_test.go
Normal file
208
waku/v2/noise/pairing_test.go
Normal file
@ -0,0 +1,208 @@
|
||||
package noise
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
n "github.com/waku-org/noise"
|
||||
)
|
||||
|
||||
func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {
|
||||
port, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(1024), 0, timesource.NewDefaultClock(), utils.Logger())
|
||||
require.NoError(t, err)
|
||||
|
||||
return host, relay
|
||||
}
|
||||
|
||||
func TestPairingObj1Success(t *testing.T) {
|
||||
host1, relay1 := createRelayNode(t)
|
||||
host2, relay2 := createRelayNode(t)
|
||||
|
||||
defer host1.Close()
|
||||
defer host2.Close()
|
||||
defer relay1.Stop()
|
||||
defer relay2.Stop()
|
||||
|
||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||
err := host1.Peerstore().AddProtocols(host2.ID(), string(relay.WakuRelayID_v200))
|
||||
require.NoError(t, err)
|
||||
_, err = host1.Network().DialPeer(context.Background(), host2.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(2 * time.Second) // Wait for relay to form mesh
|
||||
|
||||
bobStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
bobEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
|
||||
bobMessenger, err := NewWakuRelayMessenger(context.Background(), relay1, nil, timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
bobPairingObj, err := NewPairing(bobStaticKey, bobEphemeralKey, WithDefaultResponderParameters(), bobMessenger, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
|
||||
authCodeCheckCh := make(chan string, 2)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Check that authcodes match
|
||||
authcode1 := <-authCodeCheckCh
|
||||
authcode2 := <-authCodeCheckCh
|
||||
require.Equal(t, authcode1, authcode2)
|
||||
}()
|
||||
|
||||
// Execute in separate go routine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
err := bobPairingObj.Execute(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Confirmation is done by manually
|
||||
go func() {
|
||||
authCode := <-bobPairingObj.AuthCode()
|
||||
authCodeCheckCh <- authCode
|
||||
err := bobPairingObj.ConfirmAuthCode(true)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
aliceStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
aliceEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
|
||||
aliceMessenger, err := NewWakuRelayMessenger(context.Background(), relay2, nil, timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
qrString, qrMessageNameTag := bobPairingObj.PairingInfo()
|
||||
alicePairingObj, err := NewPairing(aliceStaticKey, aliceEphemeralKey, WithInitiatorParameters(qrString, qrMessageNameTag), aliceMessenger, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Execute in separate go routine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
err := alicePairingObj.Execute(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Alice waits for authcode and confirms it
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
authCode := <-alicePairingObj.AuthCode()
|
||||
authCodeCheckCh <- authCode
|
||||
err := alicePairingObj.ConfirmAuthCode(true)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// We test read/write of random messages exchanged between Alice and Bob
|
||||
// Note that we exchange more than the number of messages contained in the nametag buffer to test if they are filled correctly as the communication proceeds
|
||||
// We assume messages are sent via one of waku protocols
|
||||
for i := 0; i < 10*MessageNametagBufferSize; i++ {
|
||||
// Alice writes to Bob
|
||||
message := generateRandomBytes(t, 32)
|
||||
msg, err := alicePairingObj.Encrypt(message)
|
||||
require.NoError(t, err)
|
||||
|
||||
readMessage, err := bobPairingObj.Decrypt(msg)
|
||||
require.NoError(t, err)
|
||||
require.True(t, bytes.Equal(message, readMessage))
|
||||
|
||||
// Bob writes to Alice
|
||||
message = generateRandomBytes(t, 32)
|
||||
msg, err = alicePairingObj.Encrypt(message)
|
||||
require.NoError(t, err)
|
||||
|
||||
readMessage, err = bobPairingObj.Decrypt(msg)
|
||||
require.NoError(t, err)
|
||||
require.True(t, bytes.Equal(message, readMessage))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPairingObj1ShouldTimeout(t *testing.T) {
|
||||
host1, relay1 := createRelayNode(t)
|
||||
host2, relay2 := createRelayNode(t)
|
||||
|
||||
defer host1.Close()
|
||||
defer host2.Close()
|
||||
defer relay1.Stop()
|
||||
defer relay2.Stop()
|
||||
|
||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||
err := host1.Peerstore().AddProtocols(host2.ID(), string(relay.WakuRelayID_v200))
|
||||
require.NoError(t, err)
|
||||
_, err = host1.Network().DialPeer(context.Background(), host2.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(2 * time.Second) // Wait for relay to form mesh
|
||||
|
||||
bobStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
bobEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
|
||||
bobMessenger, err := NewWakuRelayMessenger(context.Background(), relay1, nil, timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
bobPairingObj, err := NewPairing(bobStaticKey, bobEphemeralKey, WithDefaultResponderParameters(), bobMessenger, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
|
||||
aliceStaticKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
aliceEphemeralKey, _ := n.DH25519.GenerateKeypair(rand.Reader)
|
||||
|
||||
aliceMessenger, err := NewWakuRelayMessenger(context.Background(), relay2, nil, timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
qrString, qrMessageNameTag := bobPairingObj.PairingInfo()
|
||||
alicePairingObj, err := NewPairing(aliceStaticKey, aliceEphemeralKey, WithInitiatorParameters(qrString, qrMessageNameTag), aliceMessenger, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
err := bobPairingObj.Execute(ctx)
|
||||
require.ErrorIs(t, err, ErrPairingTimeout)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
err := alicePairingObj.Execute(ctx)
|
||||
require.ErrorIs(t, err, ErrPairingTimeout)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user