mirror of https://github.com/status-im/consul.git
Merge pull request #8357 from hashicorp/streaming/add-service-health-events
streaming: add ServiceHealth events
This commit is contained in:
commit
ed4b51f1ae
|
@ -0,0 +1,475 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
type changeOp int
|
||||
|
||||
const (
|
||||
OpDelete changeOp = iota
|
||||
OpCreate
|
||||
OpUpdate
|
||||
)
|
||||
|
||||
type eventPayload struct {
|
||||
Op changeOp
|
||||
Obj interface{}
|
||||
}
|
||||
|
||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||
// of stream.Events that describe the current state of a service health query.
|
||||
//
|
||||
// TODO: no tests for this yet
|
||||
func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc {
|
||||
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
connect := topic == TopicServiceHealthConnect
|
||||
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
|
||||
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, n := range nodes {
|
||||
event := stream.Event{
|
||||
Index: idx,
|
||||
Topic: topic,
|
||||
Payload: eventPayload{
|
||||
Op: OpCreate,
|
||||
Obj: &n,
|
||||
},
|
||||
}
|
||||
|
||||
if n.Service != nil {
|
||||
event.Key = n.Service.Service
|
||||
}
|
||||
|
||||
// append each event as a separate item so that they can be serialized
|
||||
// separately, to prevent the encoding of one massive message.
|
||||
buf.Append([]stream.Event{event})
|
||||
}
|
||||
|
||||
return idx, err
|
||||
}
|
||||
}
|
||||
|
||||
type nodeServiceTuple struct {
|
||||
Node string
|
||||
ServiceID string
|
||||
EntMeta structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func newNodeServiceTupleFromServiceNode(sn *structs.ServiceNode) nodeServiceTuple {
|
||||
return nodeServiceTuple{
|
||||
Node: sn.Node,
|
||||
ServiceID: sn.ServiceID,
|
||||
EntMeta: sn.EnterpriseMeta,
|
||||
}
|
||||
}
|
||||
|
||||
func newNodeServiceTupleFromServiceHealthCheck(hc *structs.HealthCheck) nodeServiceTuple {
|
||||
return nodeServiceTuple{
|
||||
Node: hc.Node,
|
||||
ServiceID: hc.ServiceID,
|
||||
EntMeta: hc.EnterpriseMeta,
|
||||
}
|
||||
}
|
||||
|
||||
type serviceChange struct {
|
||||
changeType changeType
|
||||
change memdb.Change
|
||||
}
|
||||
|
||||
var serviceChangeIndirect = serviceChange{changeType: changeIndirect}
|
||||
|
||||
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
||||
// events that should be emitted given a set of changes to the state store.
|
||||
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||
var events []stream.Event
|
||||
|
||||
var nodeChanges map[string]changeType
|
||||
var serviceChanges map[nodeServiceTuple]serviceChange
|
||||
|
||||
markNode := func(node string, typ changeType) {
|
||||
if nodeChanges == nil {
|
||||
nodeChanges = make(map[string]changeType)
|
||||
}
|
||||
// If the caller has an actual node mutation ensure we store it even if the
|
||||
// node is already marked. If the caller is just marking the node dirty
|
||||
// without a node change, don't overwrite any existing node change we know
|
||||
// about.
|
||||
if nodeChanges[node] == changeIndirect {
|
||||
nodeChanges[node] = typ
|
||||
}
|
||||
}
|
||||
markService := func(key nodeServiceTuple, svcChange serviceChange) {
|
||||
if serviceChanges == nil {
|
||||
serviceChanges = make(map[nodeServiceTuple]serviceChange)
|
||||
}
|
||||
// If the caller has an actual service mutation ensure we store it even if
|
||||
// the service is already marked. If the caller is just marking the service
|
||||
// dirty without a service change, don't overwrite any existing service change we
|
||||
// know about.
|
||||
if serviceChanges[key].changeType == changeIndirect {
|
||||
serviceChanges[key] = svcChange
|
||||
}
|
||||
}
|
||||
|
||||
for _, change := range changes.Changes {
|
||||
switch change.Table {
|
||||
case "nodes":
|
||||
// Node changed in some way, if it's not a delete, we'll need to
|
||||
// re-deliver CheckServiceNode results for all services on that node but
|
||||
// we mark it anyway because if it _is_ a delete then we need to know that
|
||||
// later to avoid trying to deliver events when node level checks mark the
|
||||
// node as "changed".
|
||||
n := changeObject(change).(*structs.Node)
|
||||
markNode(n.Node, changeTypeFromChange(change))
|
||||
|
||||
case "services":
|
||||
sn := changeObject(change).(*structs.ServiceNode)
|
||||
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
||||
markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
|
||||
|
||||
case "checks":
|
||||
// For health we only care about the scope for now to know if it's just
|
||||
// affecting a single service or every service on a node. There is a
|
||||
// subtle edge case where the check with same ID changes from being node
|
||||
// scoped to service scoped or vice versa, in either case we need to treat
|
||||
// it as affecting all services on the node.
|
||||
switch {
|
||||
case change.Updated():
|
||||
before := change.Before.(*structs.HealthCheck)
|
||||
after := change.After.(*structs.HealthCheck)
|
||||
if after.ServiceID == "" || before.ServiceID == "" {
|
||||
// check before and/or after is node-scoped
|
||||
markNode(after.Node, changeIndirect)
|
||||
} else {
|
||||
// Check changed which means we just need to emit for the linked
|
||||
// service.
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(after), serviceChangeIndirect)
|
||||
|
||||
// Edge case - if the check with same ID was updated to link to a
|
||||
// different service ID but the old service with old ID still exists,
|
||||
// then the old service instance needs updating too as it has one
|
||||
// fewer checks now.
|
||||
if before.ServiceID != after.ServiceID {
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(before), serviceChangeIndirect)
|
||||
}
|
||||
}
|
||||
|
||||
case change.Deleted(), change.Created():
|
||||
obj := changeObject(change).(*structs.HealthCheck)
|
||||
if obj.ServiceID == "" {
|
||||
// Node level check
|
||||
markNode(obj.Node, changeIndirect)
|
||||
} else {
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now act on those marked nodes/services
|
||||
for node, changeType := range nodeChanges {
|
||||
if changeType == changeDelete {
|
||||
// Node deletions are a no-op here since the state store transaction will
|
||||
// have also removed all the service instances which will be handled in
|
||||
// the loop below.
|
||||
continue
|
||||
}
|
||||
// Rebuild events for all services on this node
|
||||
es, err := newServiceHealthEventsForNode(tx, changes.Index, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, es...)
|
||||
}
|
||||
|
||||
for tuple, srvChange := range serviceChanges {
|
||||
// change may be nil if there was a change that _affected_ the service
|
||||
// like a change to checks but it didn't actually change the service
|
||||
// record itself.
|
||||
if srvChange.changeType == changeDelete {
|
||||
sn := srvChange.change.Before.(*structs.ServiceNode)
|
||||
e := newServiceHealthEventDeregister(changes.Index, sn)
|
||||
events = append(events, e)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this was a service mutation that changed it's name which
|
||||
// requires special handling even if node changed and new events were
|
||||
// already published.
|
||||
if srvChange.changeType == changeUpdate {
|
||||
before := srvChange.change.Before.(*structs.ServiceNode)
|
||||
after := srvChange.change.After.(*structs.ServiceNode)
|
||||
|
||||
if before.ServiceName != after.ServiceName {
|
||||
// Service was renamed, the code below will ensure the new registrations
|
||||
// go out to subscribers to the new service name topic key, but we need
|
||||
// to fix up subscribers that were watching the old name by sending
|
||||
// deregistrations.
|
||||
e := newServiceHealthEventDeregister(changes.Index, before)
|
||||
events = append(events, e)
|
||||
}
|
||||
|
||||
if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok {
|
||||
events = append(events, e)
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := nodeChanges[tuple.Node]; ok {
|
||||
// We already rebuilt events for everything on this node, no need to send
|
||||
// a duplicate.
|
||||
continue
|
||||
}
|
||||
// Build service event and append it
|
||||
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, e)
|
||||
}
|
||||
|
||||
// Duplicate any events that affected connect-enabled instances (proxies or
|
||||
// native apps) to the relevant Connect topic.
|
||||
events = append(events, serviceHealthToConnectEvents(events...)...)
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed
|
||||
// the service it is proxying. We need to issue a de-registration for the old
|
||||
// service on the Connect topic. We don't actually need to deregister this sidecar
|
||||
// service though as it still exists and didn't change its name.
|
||||
func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) {
|
||||
if before.ServiceKind != structs.ServiceKindConnectProxy ||
|
||||
before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName {
|
||||
return stream.Event{}, false
|
||||
}
|
||||
|
||||
e := newServiceHealthEventDeregister(idx, before)
|
||||
e.Topic = TopicServiceHealthConnect
|
||||
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
|
||||
return e, true
|
||||
}
|
||||
|
||||
type changeType uint8
|
||||
|
||||
const (
|
||||
// changeIndirect indicates some other object changed which has implications
|
||||
// for the target object.
|
||||
changeIndirect changeType = iota
|
||||
changeDelete
|
||||
changeCreate
|
||||
changeUpdate
|
||||
)
|
||||
|
||||
func changeTypeFromChange(change memdb.Change) changeType {
|
||||
switch {
|
||||
case change.Deleted():
|
||||
return changeDelete
|
||||
case change.Created():
|
||||
return changeCreate
|
||||
default:
|
||||
return changeUpdate
|
||||
}
|
||||
}
|
||||
|
||||
// serviceHealthToConnectEvents converts already formatted service health
|
||||
// registration events into the ones needed to publish to the Connect topic.
|
||||
// This essentially means filtering out any instances that are not Connect
|
||||
// enabled and so of no interest to those subscribers but also involves
|
||||
// switching connection details to be the proxy instead of the actual instance
|
||||
// in case of a sidecar.
|
||||
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
|
||||
var result []stream.Event
|
||||
for _, event := range events {
|
||||
if event.Topic != TopicServiceHealth {
|
||||
// Skip non-health or any events already emitted to Connect topic
|
||||
continue
|
||||
}
|
||||
node := getPayloadCheckServiceNode(event.Payload)
|
||||
if node.Service == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
connectEvent := event
|
||||
connectEvent.Topic = TopicServiceHealthConnect
|
||||
|
||||
switch {
|
||||
case node.Service.Connect.Native:
|
||||
result = append(result, connectEvent)
|
||||
|
||||
case node.Service.Kind == structs.ServiceKindConnectProxy:
|
||||
connectEvent.Key = node.Service.Proxy.DestinationServiceName
|
||||
result = append(result, connectEvent)
|
||||
|
||||
default:
|
||||
// ServiceKindTerminatingGateway changes are handled separately.
|
||||
// All other cases are not relevant to the connect topic
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
|
||||
ep, ok := payload.(eventPayload)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
csn, ok := ep.Obj.(*structs.CheckServiceNode)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return csn
|
||||
}
|
||||
|
||||
// newServiceHealthEventsForNode returns health events for all services on the
|
||||
// given node. This mirrors some of the the logic in the oddly-named
|
||||
// parseCheckServiceNodes but is more efficient since we know they are all on
|
||||
// the same node.
|
||||
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
|
||||
// TODO(namespace-streaming): figure out the right EntMeta and mystery arg.
|
||||
services, err := catalogServiceListByNode(tx, node, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n, checksFunc, err := getNodeAndChecks(tx, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var events []stream.Event
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
sn := service.(*structs.ServiceNode)
|
||||
|
||||
event := newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID))
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// getNodeAndNodeChecks returns a the node structure and a function that returns
|
||||
// the full list of checks for a specific service on that node.
|
||||
func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc, error) {
|
||||
// Fetch the node
|
||||
nodeRaw, err := tx.First("nodes", "id", node)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if nodeRaw == nil {
|
||||
return nil, nil, ErrMissingNode
|
||||
}
|
||||
n := nodeRaw.(*structs.Node)
|
||||
|
||||
// TODO(namespace-streaming): work out what EntMeta is needed here, wildcard?
|
||||
iter, err := catalogListChecksByNode(tx, node, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var nodeChecks structs.HealthChecks
|
||||
var svcChecks map[string]structs.HealthChecks
|
||||
|
||||
for check := iter.Next(); check != nil; check = iter.Next() {
|
||||
check := check.(*structs.HealthCheck)
|
||||
if check.ServiceID == "" {
|
||||
nodeChecks = append(nodeChecks, check)
|
||||
} else {
|
||||
if svcChecks == nil {
|
||||
svcChecks = make(map[string]structs.HealthChecks)
|
||||
}
|
||||
svcChecks[check.ServiceID] = append(svcChecks[check.ServiceID], check)
|
||||
}
|
||||
}
|
||||
serviceChecks := func(serviceID string) structs.HealthChecks {
|
||||
// Create a new slice so that append does not modify the array backing nodeChecks.
|
||||
result := make(structs.HealthChecks, 0, len(nodeChecks))
|
||||
result = append(result, nodeChecks...)
|
||||
for _, check := range svcChecks[serviceID] {
|
||||
result = append(result, check)
|
||||
}
|
||||
return result
|
||||
}
|
||||
return n, serviceChecks, nil
|
||||
}
|
||||
|
||||
type serviceChecksFunc func(serviceID string) structs.HealthChecks
|
||||
|
||||
func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTuple) (stream.Event, error) {
|
||||
n, checksFunc, err := getNodeAndChecks(tx, tuple.Node)
|
||||
if err != nil {
|
||||
return stream.Event{}, err
|
||||
}
|
||||
|
||||
svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID)
|
||||
if err != nil {
|
||||
return stream.Event{}, err
|
||||
}
|
||||
|
||||
raw := svc.Next()
|
||||
if raw == nil {
|
||||
return stream.Event{}, ErrMissingService
|
||||
}
|
||||
|
||||
sn := raw.(*structs.ServiceNode)
|
||||
return newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID)), nil
|
||||
}
|
||||
|
||||
func newServiceHealthEventRegister(
|
||||
idx uint64,
|
||||
node *structs.Node,
|
||||
sn *structs.ServiceNode,
|
||||
checks structs.HealthChecks,
|
||||
) stream.Event {
|
||||
csn := &structs.CheckServiceNode{
|
||||
Node: node,
|
||||
Service: sn.ToNodeService(),
|
||||
Checks: checks,
|
||||
}
|
||||
return stream.Event{
|
||||
Topic: TopicServiceHealth,
|
||||
Key: sn.ServiceName,
|
||||
Index: idx,
|
||||
Payload: eventPayload{
|
||||
Op: OpCreate,
|
||||
Obj: csn,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream.Event {
|
||||
// We actually only need the node name populated in the node part as it's only
|
||||
// used as a key to know which service was deregistered so don't bother looking
|
||||
// up the node in the DB. Note that while the ServiceNode does have NodeID
|
||||
// etc. fields, they are never populated in memdb per the comment on that
|
||||
// struct and only filled in when we return copies of the result to users.
|
||||
// This is also important because if the service was deleted as part of a
|
||||
// whole node deregistering then the node record won't actually exist now
|
||||
// anyway and we'd have to plumb it through from the changeset above.
|
||||
csn := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Node: sn.Node,
|
||||
},
|
||||
Service: sn.ToNodeService(),
|
||||
}
|
||||
|
||||
return stream.Event{
|
||||
Topic: TopicServiceHealth,
|
||||
Key: sn.ServiceName,
|
||||
Index: idx,
|
||||
Payload: eventPayload{
|
||||
Op: OpDelete,
|
||||
Obj: csn,
|
||||
},
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -85,11 +85,8 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
|
|||
return t
|
||||
}
|
||||
|
||||
func (c *changeTrackerDB) publish(changes Changes) error {
|
||||
readOnlyTx := c.db.Txn(false)
|
||||
defer readOnlyTx.Abort()
|
||||
|
||||
events, err := c.processChanges(readOnlyTx, changes)
|
||||
func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
|
||||
events, err := c.processChanges(tx, changes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed generating events from changes: %v", err)
|
||||
}
|
||||
|
@ -127,7 +124,7 @@ type txn struct {
|
|||
// Index is stored so that it may be passed along to any subscribers as part
|
||||
// of a change event.
|
||||
Index uint64
|
||||
publish func(changes Changes) error
|
||||
publish func(tx ReadTxn, changes Changes) error
|
||||
}
|
||||
|
||||
// Commit first pushes changes to EventPublisher, then calls Commit on the
|
||||
|
@ -152,7 +149,7 @@ func (tx *txn) Commit() error {
|
|||
// In those cases changes should also be empty, and there will be nothing
|
||||
// to publish.
|
||||
if tx.publish != nil {
|
||||
if err := tx.publish(changes); err != nil {
|
||||
if err := tx.publish(tx.Txn, changes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -168,11 +165,33 @@ func (t topic) String() string {
|
|||
return string(t)
|
||||
}
|
||||
|
||||
var (
|
||||
// TopicServiceHealth contains events for all registered service instances.
|
||||
TopicServiceHealth topic = "topic-service-health"
|
||||
// TopicServiceHealthConnect contains events for connect-enabled service instances.
|
||||
TopicServiceHealthConnect topic = "topic-service-health-connect"
|
||||
)
|
||||
|
||||
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||
// TODO: add other table handlers here.
|
||||
return aclChangeUnsubscribeEvent(tx, changes)
|
||||
var events []stream.Event
|
||||
fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
|
||||
aclChangeUnsubscribeEvent,
|
||||
ServiceHealthEventsFromChanges,
|
||||
// TODO: add other table handlers here.
|
||||
}
|
||||
for _, fn := range fns {
|
||||
e, err := fn(tx, changes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, e...)
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func newSnapshotHandlers() stream.SnapshotHandlers {
|
||||
return stream.SnapshotHandlers{}
|
||||
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
||||
return stream.SnapshotHandlers{
|
||||
TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth),
|
||||
TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,17 +162,17 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
|||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
s := &Store{
|
||||
schema: schema,
|
||||
abandonCh: make(chan struct{}),
|
||||
kvsGraveyard: NewGraveyard(gc),
|
||||
lockDelay: NewDelay(),
|
||||
db: &changeTrackerDB{
|
||||
db: db,
|
||||
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
|
||||
processChanges: processDBChanges,
|
||||
},
|
||||
schema: schema,
|
||||
abandonCh: make(chan struct{}),
|
||||
kvsGraveyard: NewGraveyard(gc),
|
||||
lockDelay: NewDelay(),
|
||||
stopEventPublisher: cancel,
|
||||
}
|
||||
s.db = &changeTrackerDB{
|
||||
db: db,
|
||||
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second),
|
||||
processChanges: processDBChanges,
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -376,7 +376,7 @@ var topicService stream.Topic = topic("test-topic-service")
|
|||
|
||||
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
||||
return stream.SnapshotHandlers{
|
||||
topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
|
||||
topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
|
||||
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
|
||||
if err != nil {
|
||||
return idx, err
|
||||
|
|
|
@ -61,7 +61,11 @@ type changeEvents struct {
|
|||
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
||||
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
||||
// The nil Topic is reserved and should not be used.
|
||||
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
||||
type SnapshotHandlers map[Topic]SnapshotFunc
|
||||
|
||||
// SnapshotFunc builds a snapshot for the subscription request, and appends the
|
||||
// events to the Snapshot using SnapshotAppender.
|
||||
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
||||
|
||||
// SnapshotAppender appends groups of events to create a Snapshot of state.
|
||||
type SnapshotAppender interface {
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
|||
|
||||
func newTestSnapshotHandlers() SnapshotHandlers {
|
||||
return SnapshotHandlers{
|
||||
testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
if req.Topic != testTopic {
|
||||
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
|
|||
t.Cleanup(cancel)
|
||||
|
||||
handlers := newTestSnapshotHandlers()
|
||||
fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
fn := func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
handlers[intTopic(22)] = fn
|
||||
|
|
|
@ -18,8 +18,6 @@ type eventSnapshot struct {
|
|||
snapBuffer *eventBuffer
|
||||
}
|
||||
|
||||
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
|
||||
|
||||
// newEventSnapshot creates a snapshot buffer based on the subscription request.
|
||||
// The current buffer head for the topic requested is passed so that once the
|
||||
// snapshot is complete and has been delivered into the buffer, any events
|
||||
|
@ -27,7 +25,7 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
|
|||
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
|
||||
// snapshot buffer so that subscribers will naturally follow from the snapshot
|
||||
// to wait for any subsequent updates.
|
||||
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
|
||||
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
|
||||
buf := newEventBuffer()
|
||||
s := &eventSnapshot{
|
||||
Head: buf.Head(),
|
||||
|
@ -35,7 +33,7 @@ func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn sna
|
|||
}
|
||||
|
||||
go func() {
|
||||
idx, err := fn(req, s.snapBuffer)
|
||||
idx, err := fn(*req, s.snapBuffer)
|
||||
if err != nil {
|
||||
s.snapBuffer.AppendItem(&bufferItem{Err: err})
|
||||
return
|
||||
|
|
|
@ -161,8 +161,8 @@ func genSequentialIDs(start, end int) []string {
|
|||
return ids
|
||||
}
|
||||
|
||||
func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
|
||||
return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc {
|
||||
return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
for i := 0; i < size; i++ {
|
||||
// Event content is arbitrary we are just using Health because it's the
|
||||
// first type defined. We just want a set of things with consecutive
|
||||
|
|
Loading…
Reference in New Issue