feat: ephemeral messages (#305)

This commit is contained in:
Richard Ramos 2022-09-12 12:10:34 -04:00 committed by GitHub
parent cd79be4812
commit 83250be0fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 21 deletions

View File

@ -115,6 +115,7 @@ type WakuMessage struct {
Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
RateLimitProof *RateLimitProof `protobuf:"bytes,21,opt,name=rate_limit_proof,json=rateLimitProof,proto3" json:"rate_limit_proof,omitempty"` RateLimitProof *RateLimitProof `protobuf:"bytes,21,opt,name=rate_limit_proof,json=rateLimitProof,proto3" json:"rate_limit_proof,omitempty"`
Ephemeral bool `protobuf:"varint,31,opt,name=ephemeral,proto3" json:"ephemeral,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -188,6 +189,13 @@ func (m *WakuMessage) GetRateLimitProof() *RateLimitProof {
return nil return nil
} }
func (m *WakuMessage) GetEphemeral() bool {
if m != nil {
return m.Ephemeral
}
return false
}
func init() { func init() {
proto.RegisterType((*RateLimitProof)(nil), "pb.RateLimitProof") proto.RegisterType((*RateLimitProof)(nil), "pb.RateLimitProof")
proto.RegisterType((*WakuMessage)(nil), "pb.WakuMessage") proto.RegisterType((*WakuMessage)(nil), "pb.WakuMessage")
@ -196,26 +204,27 @@ func init() {
func init() { proto.RegisterFile("waku_message.proto", fileDescriptor_6f0a20862b3bf714) } func init() { proto.RegisterFile("waku_message.proto", fileDescriptor_6f0a20862b3bf714) }
var fileDescriptor_6f0a20862b3bf714 = []byte{ var fileDescriptor_6f0a20862b3bf714 = []byte{
// 299 bytes of a gzipped FileDescriptorProto // 313 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0xbf, 0x4e, 0xf3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x41, 0x4e, 0x02, 0x31,
0x14, 0xc5, 0x3f, 0xf7, 0xa3, 0xad, 0x7a, 0x5b, 0xaa, 0xca, 0x02, 0xe1, 0x01, 0x85, 0xaa, 0x53, 0x18, 0x85, 0x2d, 0x0a, 0xc8, 0x0f, 0x12, 0xd2, 0x68, 0xec, 0x82, 0x0c, 0x13, 0x56, 0xb3, 0x62,
0xa7, 0x0e, 0xb0, 0x32, 0x31, 0x83, 0x84, 0x2c, 0x24, 0x60, 0x8a, 0x9c, 0x70, 0x4b, 0xad, 0xfc, 0xa1, 0x5b, 0x57, 0xae, 0x35, 0x31, 0x8d, 0x89, 0xba, 0x9a, 0x14, 0xfc, 0x91, 0x86, 0xe9, 0xfc,
0xb9, 0x96, 0xe3, 0x00, 0x7d, 0x13, 0x5e, 0x82, 0x87, 0x60, 0x63, 0xe4, 0x11, 0x50, 0x78, 0x11, 0x4d, 0xa7, 0xa8, 0xdc, 0xc4, 0x4b, 0x78, 0x0f, 0x97, 0xde, 0x40, 0x83, 0x17, 0x31, 0xd3, 0x91,
0x14, 0x87, 0x2a, 0x64, 0xf3, 0xef, 0x1c, 0x1d, 0xe9, 0xf8, 0x5c, 0xe0, 0xcf, 0x2a, 0x29, 0xc3, 0x8c, 0xec, 0xfa, 0xbe, 0x97, 0x97, 0xbc, 0xbe, 0x1f, 0xf8, 0x8b, 0x5a, 0xae, 0x52, 0x83, 0x45,
0x0c, 0x8b, 0x42, 0x3d, 0xe2, 0xca, 0x58, 0x72, 0xc4, 0x7b, 0x26, 0x5a, 0xbc, 0x31, 0x98, 0x4a, 0xa1, 0x9e, 0x70, 0x62, 0x1d, 0x79, 0xe2, 0x0d, 0x3b, 0x1d, 0xbf, 0x33, 0xe8, 0x4b, 0xe5, 0xf1,
0xe5, 0xf0, 0x52, 0x67, 0xda, 0x5d, 0x5b, 0xa2, 0x35, 0x3f, 0x80, 0xbe, 0xa9, 0x1f, 0x82, 0xcd, 0x4a, 0x1b, 0xed, 0x6f, 0x1c, 0xd1, 0x9c, 0x1f, 0x43, 0xd3, 0x96, 0x0f, 0xc1, 0x62, 0x96, 0xf4,
0xd9, 0x72, 0x22, 0x1b, 0xe0, 0x27, 0x30, 0xce, 0xd0, 0x26, 0x29, 0x86, 0x96, 0xc8, 0x89, 0x9e, 0x64, 0x25, 0xf8, 0x08, 0xba, 0x06, 0xdd, 0x32, 0xc3, 0xd4, 0x11, 0x79, 0xd1, 0x08, 0x1e, 0x54,
0xf7, 0xa0, 0x91, 0x24, 0x91, 0xab, 0x63, 0x68, 0x28, 0xde, 0x88, 0xff, 0x4d, 0xcc, 0x03, 0x3f, 0x48, 0x12, 0xf9, 0x32, 0x86, 0x96, 0x66, 0x0b, 0xb1, 0x5f, 0xc5, 0x82, 0xe0, 0xa7, 0xd0, 0x2e,
0x82, 0x61, 0xb1, 0x51, 0x16, 0xc3, 0x17, 0xb1, 0xe7, 0xf5, 0x81, 0xc7, 0xbb, 0xd6, 0xd8, 0x8a, 0x16, 0xca, 0x61, 0xfa, 0x2a, 0x0e, 0x02, 0x6f, 0x05, 0x79, 0x5f, 0x1b, 0x6b, 0xd1, 0xfc, 0x67,
0xfe, 0x1f, 0xe3, 0x9e, 0x1f, 0xc3, 0x28, 0x2f, 0xd3, 0x54, 0xaf, 0x35, 0x5a, 0x31, 0xf0, 0x56, 0x3c, 0xf0, 0x21, 0x74, 0xf2, 0x55, 0x96, 0xe9, 0xb9, 0x46, 0x27, 0x5a, 0xc1, 0xaa, 0xc1, 0xf8,
0x2b, 0x2c, 0xde, 0x19, 0x8c, 0x6f, 0x55, 0x52, 0x5e, 0x35, 0x3f, 0xe1, 0x02, 0x86, 0x46, 0x6d, 0x8b, 0x41, 0xf7, 0x4e, 0x2d, 0x57, 0xd7, 0xd5, 0x4f, 0xb8, 0x80, 0xb6, 0x55, 0xeb, 0x8c, 0xd4,
0x53, 0x52, 0x0f, 0xbf, 0x75, 0x77, 0xc8, 0x17, 0x30, 0x89, 0x29, 0x77, 0x98, 0xbb, 0x1b, 0x32, 0xe3, 0x5f, 0xdd, 0xad, 0xe4, 0x63, 0xe8, 0xcd, 0x28, 0xf7, 0x98, 0xfb, 0x5b, 0xb2, 0x7a, 0x16,
0x3a, 0xf6, 0x8d, 0x47, 0xb2, 0xa3, 0xd5, 0xe9, 0x27, 0xb4, 0x85, 0xa6, 0xdc, 0xb7, 0xde, 0x97, 0x1a, 0x77, 0xe4, 0x0e, 0x2b, 0xd3, 0xcf, 0xe8, 0x0a, 0x4d, 0x79, 0x68, 0x7d, 0x24, 0xb7, 0xb2,
0x3b, 0xac, 0x5b, 0x38, 0x9d, 0x61, 0xe1, 0x54, 0x66, 0x04, 0xcc, 0xd9, 0x92, 0xcb, 0x56, 0xe0, 0x6c, 0xe1, 0xb5, 0xc1, 0xc2, 0x2b, 0x63, 0x05, 0xc4, 0x2c, 0xe1, 0xb2, 0x06, 0xfc, 0x02, 0x06,
0xe7, 0x30, 0xb3, 0xca, 0x61, 0x98, 0xd6, 0xab, 0x85, 0xcd, 0x5a, 0x87, 0x73, 0xb6, 0x1c, 0x9f, 0x4e, 0x79, 0x4c, 0xb3, 0x72, 0xb5, 0xb4, 0x5a, 0xeb, 0x24, 0x66, 0x49, 0xf7, 0x8c, 0x4f, 0xec,
0xf2, 0x95, 0x89, 0x56, 0xdd, 0x41, 0xe5, 0xd4, 0x76, 0xf8, 0x62, 0xf6, 0x51, 0x05, 0xec, 0xb3, 0x74, 0xb2, 0x3b, 0xa8, 0xec, 0xbb, 0xdd, 0x81, 0x87, 0xd0, 0x41, 0xbb, 0x40, 0x83, 0x4e, 0x65,
0x0a, 0xd8, 0x57, 0x15, 0xb0, 0xd7, 0xef, 0xe0, 0x5f, 0x34, 0xf0, 0x07, 0x39, 0xfb, 0x09, 0x00, 0x62, 0x14, 0xb3, 0xe4, 0x50, 0xd6, 0xe0, 0x72, 0xf0, 0xb1, 0x89, 0xd8, 0xe7, 0x26, 0x62, 0xdf,
0x00, 0xff, 0xff, 0x4a, 0x19, 0x2a, 0x86, 0xa6, 0x01, 0x00, 0x00, 0x9b, 0x88, 0xbd, 0xfd, 0x44, 0x7b, 0xd3, 0x56, 0x38, 0xd7, 0xf9, 0x6f, 0x00, 0x00, 0x00, 0xff,
0xff, 0xe6, 0x01, 0x38, 0xc8, 0xc4, 0x01, 0x00, 0x00,
} }
func (m *RateLimitProof) Marshal() (dAtA []byte, err error) { func (m *RateLimitProof) Marshal() (dAtA []byte, err error) {
@ -311,6 +320,18 @@ func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if m.Ephemeral {
i--
if m.Ephemeral {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x1
i--
dAtA[i] = 0xf8
}
if m.RateLimitProof != nil { if m.RateLimitProof != nil {
{ {
size, err := m.RateLimitProof.MarshalToSizedBuffer(dAtA[:i]) size, err := m.RateLimitProof.MarshalToSizedBuffer(dAtA[:i])
@ -423,6 +444,9 @@ func (m *WakuMessage) Size() (n int) {
l = m.RateLimitProof.Size() l = m.RateLimitProof.Size()
n += 2 + l + sovWakuMessage(uint64(l)) n += 2 + l + sovWakuMessage(uint64(l))
} }
if m.Ephemeral {
n += 3
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -861,6 +885,26 @@ func (m *WakuMessage) Unmarshal(dAtA []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 31:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ephemeral", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Ephemeral = bool(v != 0)
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipWakuMessage(dAtA[iNdEx:]) skippy, err := skipWakuMessage(dAtA[iNdEx:])

View File

@ -17,4 +17,5 @@ message WakuMessage {
uint32 version = 3; uint32 version = 3;
sint64 timestamp = 10; sint64 timestamp = 10;
RateLimitProof rate_limit_proof = 21; RateLimitProof rate_limit_proof = 21;
bool ephemeral = 31;
} }

View File

@ -214,6 +214,10 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
return ErrFutureMessage return ErrFutureMessage
} }
if env.Message().Ephemeral {
return nil
}
err := store.msgProvider.Put(env) err := store.msgProvider.Put(env)
if err != nil { if err != nil {
store.log.Error("storing message", zap.Error(err)) store.log.Error("storing message", zap.Error(err))

View File

@ -22,8 +22,18 @@ func TestStorePersistence(t *testing.T) {
Version: 0, Version: 0,
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
} }
err := s1.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), defaultPubSubTopic))
require.NoError(t, err)
_ = s1.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), defaultPubSubTopic)) msg2 := &pb.WakuMessage{
Payload: []byte{4, 5, 6},
ContentTopic: defaultContentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
Ephemeral: true, // Should not insert this message
}
err = s1.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
require.NoError(t, err)
allMsgs, err := db.GetAll() allMsgs, err := db.GetAll()
require.NoError(t, err) require.NoError(t, err)