Update proto for change to stream framing events

This commit is contained in:
Daniel Nephin 2020-10-01 14:41:47 -04:00
parent a5df5d17b4
commit b88f817bc5
2 changed files with 106 additions and 88 deletions

View File

@ -200,7 +200,7 @@ type Event struct {
//
// Types that are valid to be assigned to Payload:
// *Event_EndOfSnapshot
// *Event_EndOfEmptySnapshot
// *Event_NewSnapshotToFollow
// *Event_EventBatch
// *Event_ServiceHealth
Payload isEvent_Payload `protobuf_oneof:"Payload"`
@ -251,8 +251,8 @@ type isEvent_Payload interface {
type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"`
}
type Event_EndOfEmptySnapshot struct {
EndOfEmptySnapshot bool `protobuf:"varint,6,opt,name=EndOfEmptySnapshot,proto3,oneof"`
type Event_NewSnapshotToFollow struct {
NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"`
}
type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"`
@ -261,10 +261,10 @@ type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
}
func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_EndOfEmptySnapshot) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {}
func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_NewSnapshotToFollow) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {}
func (m *Event) GetPayload() isEvent_Payload {
if m != nil {
@ -301,9 +301,9 @@ func (m *Event) GetEndOfSnapshot() bool {
return false
}
func (m *Event) GetEndOfEmptySnapshot() bool {
if x, ok := m.GetPayload().(*Event_EndOfEmptySnapshot); ok {
return x.EndOfEmptySnapshot
func (m *Event) GetNewSnapshotToFollow() bool {
if x, ok := m.GetPayload().(*Event_NewSnapshotToFollow); ok {
return x.NewSnapshotToFollow
}
return false
}
@ -326,7 +326,7 @@ func (m *Event) GetServiceHealth() *ServiceHealthUpdate {
func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{
(*Event_EndOfSnapshot)(nil),
(*Event_EndOfEmptySnapshot)(nil),
(*Event_NewSnapshotToFollow)(nil),
(*Event_EventBatch)(nil),
(*Event_ServiceHealth)(nil),
}
@ -343,9 +343,9 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
}
_ = b.EncodeVarint(5<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_EndOfEmptySnapshot:
case *Event_NewSnapshotToFollow:
t := uint64(0)
if x.EndOfEmptySnapshot {
if x.NewSnapshotToFollow {
t = 1
}
_ = b.EncodeVarint(6<<3 | proto.WireVarint)
@ -377,12 +377,12 @@ func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer)
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err
case 6: // Payload.EndOfEmptySnapshot
case 6: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfEmptySnapshot{x != 0}
m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err
case 7: // Payload.EventBatch
if wire != proto.WireBytes {
@ -412,7 +412,7 @@ func _Event_OneofSizer(msg proto.Message) (n int) {
case *Event_EndOfSnapshot:
n += 1 // tag and wire
n += 1
case *Event_EndOfEmptySnapshot:
case *Event_NewSnapshotToFollow:
n += 1 // tag and wire
n += 1
case *Event_EventBatch:
@ -546,40 +546,40 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{
// 521 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x4f, 0x8f, 0xd2, 0x40,
0x14, 0xef, 0xc0, 0x02, 0xcb, 0xc3, 0xdd, 0xd4, 0x11, 0x63, 0xc3, 0x26, 0x0d, 0x12, 0xb3, 0xa9,
0x9b, 0x48, 0x37, 0x98, 0xe8, 0x4d, 0x23, 0x2c, 0x8a, 0x31, 0x11, 0x53, 0xdc, 0x83, 0xde, 0x86,
0xf6, 0x49, 0x1b, 0xd8, 0x99, 0xb1, 0x1d, 0x56, 0xb9, 0xfb, 0x21, 0xf6, 0xcb, 0x78, 0xf7, 0xe8,
0x47, 0x30, 0xf8, 0x45, 0x0c, 0x43, 0xb7, 0x5b, 0x60, 0x6f, 0xde, 0xfa, 0x7e, 0x7f, 0xe6, 0xfd,
0xf2, 0x5e, 0x1f, 0x3c, 0x94, 0xb1, 0x50, 0xc2, 0x95, 0xe3, 0x64, 0x3e, 0x4e, 0xfc, 0x38, 0x1a,
0xa3, 0x9b, 0x7d, 0xb5, 0x35, 0x47, 0xab, 0x19, 0xd0, 0x68, 0x64, 0x6a, 0x8c, 0x2f, 0x23, 0x1f,
0x5d, 0x2e, 0x82, 0x54, 0xd6, 0xba, 0x22, 0x60, 0x8e, 0xae, 0x95, 0x1e, 0x7e, 0x9d, 0x63, 0xa2,
0xe8, 0x31, 0x94, 0x3e, 0x0a, 0x19, 0xf9, 0x16, 0x69, 0x12, 0xe7, 0xb0, 0x63, 0xb6, 0x6f, 0x1e,
0xd7, 0xb8, 0xb7, 0xa6, 0xa9, 0x09, 0xc5, 0x77, 0xb8, 0xb0, 0x0a, 0x4d, 0xe2, 0x54, 0xbd, 0xd5,
0x27, 0xad, 0xaf, 0x9c, 0x53, 0xe4, 0x56, 0x51, 0x63, 0xeb, 0x62, 0x85, 0xbe, 0xe5, 0x01, 0x7e,
0xb7, 0xf6, 0x9a, 0xc4, 0xd9, 0xf3, 0xd6, 0x05, 0xb5, 0x01, 0xce, 0x98, 0x62, 0x3e, 0x72, 0x85,
0xb1, 0x55, 0xd2, 0x86, 0x1c, 0xd2, 0xfa, 0x59, 0x80, 0x52, 0xff, 0x12, 0xf9, 0x7f, 0xe6, 0x59,
0x77, 0x2e, 0xe6, 0x3b, 0x1f, 0xc3, 0x41, 0x9f, 0x07, 0xc3, 0x2f, 0x23, 0xce, 0x64, 0x12, 0x0a,
0xa5, 0x9b, 0xef, 0x0f, 0x0c, 0x6f, 0x13, 0xa6, 0xa7, 0x40, 0x35, 0xd0, 0xbf, 0x90, 0x6a, 0x91,
0x89, 0xcb, 0xa9, 0xf8, 0x16, 0x8e, 0x3e, 0x07, 0xd0, 0x91, 0xbb, 0x4c, 0xf9, 0xa1, 0x55, 0x69,
0x12, 0xa7, 0xd6, 0xb9, 0x9f, 0x8b, 0x7b, 0x43, 0x0e, 0x0c, 0x2f, 0x27, 0xa5, 0xaf, 0xe1, 0x60,
0xb4, 0xde, 0xce, 0x00, 0xd9, 0x4c, 0x85, 0x16, 0x68, 0xaf, 0x9d, 0xf3, 0x6e, 0xf0, 0xe7, 0x32,
0x60, 0x0a, 0x57, 0x91, 0x37, 0xe0, 0x6e, 0x15, 0x2a, 0x1f, 0xd8, 0x62, 0x26, 0x58, 0xd0, 0x7a,
0x96, 0xcf, 0x42, 0x1d, 0x28, 0xeb, 0x2a, 0xb1, 0x48, 0xb3, 0xe8, 0xd4, 0x36, 0x86, 0xa8, 0x09,
0x2f, 0xe5, 0x5b, 0x3f, 0x08, 0xdc, 0xbb, 0xa5, 0x17, 0x7d, 0x04, 0x85, 0xa1, 0x4c, 0x57, 0x50,
0xcf, 0xb9, 0x7b, 0x4c, 0xb1, 0x99, 0x98, 0x0c, 0xa5, 0x57, 0x18, 0x4a, 0xfa, 0x06, 0xcc, 0x5e,
0x88, 0xfe, 0x34, 0x7d, 0xe1, 0xbd, 0x08, 0x50, 0x2f, 0xa4, 0xd6, 0x39, 0x6a, 0x67, 0x7f, 0x60,
0x7b, 0x5b, 0xe2, 0xed, 0x98, 0x4e, 0x5e, 0xa5, 0x4b, 0xa7, 0x35, 0xa8, 0x9c, 0xf3, 0x29, 0x17,
0xdf, 0xb8, 0x69, 0xd0, 0xbb, 0x5b, 0x73, 0x32, 0x09, 0xb5, 0xa0, 0xbe, 0x01, 0xf5, 0x04, 0xe7,
0xe8, 0x2b, 0xb3, 0x70, 0xf2, 0x18, 0xaa, 0x59, 0x38, 0x7a, 0x07, 0xf6, 0x3d, 0x9c, 0x44, 0x89,
0xc2, 0xd8, 0x34, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xd7, 0x35, 0xe9, 0x7c, 0x82, 0x07, 0x23, 0xc5,
0x14, 0xf6, 0x42, 0xc6, 0x27, 0x98, 0x5e, 0x84, 0x54, 0x91, 0xe0, 0xf4, 0x05, 0x54, 0xb3, 0x0b,
0xa1, 0x47, 0xf9, 0x85, 0x6c, 0xdd, 0x4d, 0x63, 0x67, 0xa6, 0x2d, 0xe3, 0x94, 0x74, 0x5f, 0xfe,
0x5a, 0xda, 0xe4, 0xf7, 0xd2, 0x26, 0x7f, 0x96, 0x36, 0xb9, 0xfa, 0x6b, 0x1b, 0x9f, 0x9f, 0x4c,
0x22, 0x15, 0xce, 0xc7, 0x6d, 0x5f, 0x5c, 0xb8, 0x21, 0x4b, 0xc2, 0xc8, 0x17, 0xb1, 0x74, 0x7d,
0xc1, 0x93, 0xf9, 0xcc, 0xdd, 0x39, 0xed, 0x71, 0x59, 0x43, 0x4f, 0xff, 0x05, 0x00, 0x00, 0xff,
0xff, 0x7d, 0xf7, 0xca, 0x01, 0xf6, 0x03, 0x00, 0x00,
// 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75,
0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47,
0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf,
0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e,
0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0,
0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c,
0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed,
0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67,
0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87,
0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c,
0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79,
0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2,
0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc,
0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee,
0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c,
0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99,
0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1,
0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1,
0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a,
0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30,
0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad,
0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b,
0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50,
0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00,
0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a,
0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70,
0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96,
0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9,
0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8,
0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e,
0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -595,18 +595,24 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StateChangeSubscriptionClient interface {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error)
}
@ -653,18 +659,24 @@ func (x *stateChangeSubscriptionSubscribeClient) Recv() (*Event, error) {
// StateChangeSubscriptionServer is the server API for StateChangeSubscription service.
type StateChangeSubscriptionServer interface {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error
}
@ -842,14 +854,14 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0x28
return len(dAtA) - i, nil
}
func (m *Event_EndOfEmptySnapshot) MarshalTo(dAtA []byte) (int, error) {
func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
return m.MarshalToSizedBuffer(dAtA[:m.Size()])
}
func (m *Event_EndOfEmptySnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
i--
if m.EndOfEmptySnapshot {
if m.NewSnapshotToFollow {
dAtA[i] = 1
} else {
dAtA[i] = 0
@ -1058,7 +1070,7 @@ func (m *Event_EndOfSnapshot) Size() (n int) {
n += 2
return n
}
func (m *Event_EndOfEmptySnapshot) Size() (n int) {
func (m *Event_NewSnapshotToFollow) Size() (n int) {
if m == nil {
return 0
}
@ -1444,7 +1456,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
m.Payload = &Event_EndOfSnapshot{b}
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfEmptySnapshot", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
@ -1462,7 +1474,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
}
b := bool(v != 0)
m.Payload = &Event_EndOfEmptySnapshot{b}
m.Payload = &Event_NewSnapshotToFollow{b}
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)

View File

@ -13,18 +13,24 @@ import "proto/pbservice/node.proto";
// state change events. Events are streamed as they happen.
service StateChangeSubscription {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
rpc Subscribe(SubscribeRequest) returns (stream Event) {}
}
@ -92,11 +98,11 @@ message Event {
// ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5;
// EndOfEmptySnapshot indicates that the client is still up-to-date.
// The snapshot has ended, and was empty. The rest of the stream will be
// individual update events. It distinguishes between "up to date, no snapshot"
// and "snapshot contains zero events but you should reset any old state to be blank".
bool EndOfEmptySnapshot = 6;
// NewSnapshotToFollow indicates that the client view is stale. The client
// must reset its view before handing any more events. Subsequent events
// in the stream will be for a new snapshot until an EndOfSnapshot event
// is received.
bool NewSnapshotToFollow = 6;
// EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft