docs: Add developer documentation and diagrams for streaming

This commit is contained in:
Daniel Nephin 2020-11-19 12:16:54 -05:00
parent 0d35351e2d
commit afec53190e
9 changed files with 204 additions and 0 deletions

100
docs/streaming/README.md Normal file
View File

@ -0,0 +1,100 @@
# Event Streaming
Event streaming is a new asynchronous RPC mechanism that is being added to Consul. Instead
of synchronous blocking RPC calls (long polling) to fetch data when it changes, streaming
sends events as they occur, and the client maintains a materialized view of the events.
At the time of writing only the service health endpoint uses streaming, but more endpoints
will be added in the future.
## Overview
The diagram below shows the components that are used in streaming, and how they fit into
the rest of Consul.
![Streaming Overview](./overview.svg)
<sup>[source](./overview.mmd)</sup>
Read requests are received either from the HTTP API or from a DNS request. They use
[rpcclient/health.Health]
to query the cache. The [StreamingHealthServices cache-type] uses a [materialized view]
to manage subscriptions and store the aggregated events. On the server, the
[SubscribeEndpoint] subscribes and receives events from [EventPublisher].
Writes will likely enter the system through the client as well, but to make the diagram
less complicated the write flow starts when it is received by the RPC endpoint. The
endpoint calls raft.Apply, which if successful will save the new data in the state.Store.
When the [state.Store commits] it produces an event which is managed by the [EventPublisher]
and sent to any active subscriptions.
[rpcclient/health.Health]: https://github.com/hashicorp/consul/blob/master/agent/rpcclient/health/health.go
[StreamingHealthServices cache-type]: https://github.com/hashicorp/consul/blob/master/agent/cache-types/streaming_health_services.go
[materialized view]: https://github.com/hashicorp/consul/blob/master/agent/submatview/materializer.go
[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/master/agent/rpc/subscribe/subscribe.go
[EventPublisher]: https://github.com/hashicorp/consul/blob/master/agent/consul/stream/event_publisher.go
[state.Store commits]: https://github.com/hashicorp/consul/blob/master/agent/consul/state/memdb.go
## Event Publisher
The [EventPublisher] in at the core of streaming. It receives published events, and
subscription requests, and forwards events to the appropriate subscriptions. The diagram
below illustrates how events are stored by the [EventPublisher].
![Event Publisher layout](./event-publisher-layout.svg)
<sup>[source](./event-publisher-layout.mmd)</sup>
When a new subscription is created it will create a snapshot of the events required to
reflect the current state. This snapshot is cached by the [EventPublisher] so that other
subscriptions can re-use the snapshot without having to recreate it.
The snapshot always points at the first item in the linked list of events. A subscription
will initially point at the first item, but the pointer advances each time
`Subscribe.Next` is called. The topic buffers in the EventPublisher always point at the
latest item in the linked list, so that new events can be appended to the buffer.
When a snapshot cache TTL expires, the snapshot is removed. If there are no other
subscriptions holding a reference to those items, the items will be garbage collected by
the Go runtime. This setup allows EventPublisher to keep some events around for a short
period of time, without any hard coded limit on the number of events to cache.
## Subscription events
A subscription provides a stream of events on a single topic. Most of the events contain
data for a change in state, but there are a few special "framing" events that are used to
communicate something to the client. The diagram below helps illustrate the logic in
`EventPublusher.Subscribe` and the [materialized view].
![Framing events](./framing-events.svg)
<sup>[source](./framing-events.mmd)</sup>
Events in the `Snapshot` contain the same data as those in the `EventStream`, the only
difference is that events in the `Snapshot` indicate the current state not a change in
state.
`NewSnapshotToFollow` is a framing event that indicates to the client that their existing
view is out of date. They must reset their view and prepare to receive a new snapshot.
`EndOfSnapshot` indicates to the client that the snapshot is complete. Any future events
will be changes in state.
## Event filtering
As events pass through the system from the `state.Store` to the client they are grouped
and filtered along the way. The diagram below helps illustrate where each of the grouping
and filtering happens.
![event filtering](./event-filtering.svg)
<sup>[source](./event-filtering.mmd)</sup>

View File

@ -0,0 +1,11 @@
graph TD
state.Store -->|events in different topics| EventPublisher.Publish
EventPublisher.Publish -->|group by topic| EventPublisher.topicBuffer
EventPublisher.topicBuffer --> Subscription
Subscription -->|filter by key and namespace| SubscribeEndpoint
SubscribeEndpoint -->|"filter by auth (acl token)"| ProtobufEvents[/ grpc /]
ProtobufEvents -->|filter with bexpr| MaterializedView
MaterializedView --> HTTPEndpoint

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 13 KiB

View File

@ -0,0 +1,36 @@
graph TB
subgraph ep[ ]
EventPublisher
subscriptions
snapshots
topicBuffers
end
EventPublisher --> snapshots & subscriptions & topicBuffers
Subscription
Snapshot
Item0
Item1
Item2
Item3
Item4
topicBuffers ----->|head| Item4
subscriptions --> Subscription
snapshots --> Snapshot
Subscription -->|next| Item0
Item0 --> Item1
Item1 --> Item2
Item2 --> Item3
Item3 --> Item4
Snapshot -->|first| Item0
Subscription -..->|next| Item1
Subscription -..->|next| Item2
Subscription -..->|next| Item3
Subscription -..->|next| Item4

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 23 KiB

View File

@ -0,0 +1,17 @@
graph TD
SubscribeIndex0[Subscribe, index = 0, no snapshot]
SubscribeIndexNot0[Subscribe, index > 0, with snapshot]
SubscribeIndex0 --->|if events in topic| Snapshot
Snapshot --> EndOfSnapshot
SubscribeIndex0 ------->|no events in topic| EndOfSnapshot
EndOfSnapshot --> EventStream
SubscribeIndexNot0 -->|if index != TopicBuffer.Head| NewSnapshotToFollow
NewSnapshotToFollow ---> Snapshot
SubscribeIndexNot0 -->|if index == TopicBuffer.Head| EventStream
class EndOfSnapshot,NewSnapshotToFollow framing
classDef framing fill:#FFD700,stroke:#333

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 13 KiB

View File

@ -0,0 +1,36 @@
graph TD
subgraph ClientAgent[Client Agent]
HTTPEndpoint
DNSEndpoint
rpcClient.Health
AgentCache
MaterializedView
end
subgraph ServerAgent[Server Agent]
RPCEndpoint
raft.Apply
FSM.applyRegistration
state.Store.Register
SubscribeEndpoint
EventPublisher
end
Read --> HTTPEndpoint & DNSEndpoint
HTTPEndpoint & DNSEndpoint --> rpcClient.Health
rpcClient.Health --> AgentCache
AgentCache --> MaterializedView
MaterializedView --> SubscribeEndpoint
SubscribeEndpoint -->|Subscribe to topic| EventPublisher
Write --> RPCEndpoint
RPCEndpoint --> raft.Apply
raft.Apply --> FSM.applyRegistration
FSM.applyRegistration --> state.Store.Register
state.Store.Register -->|Publish event| EventPublisher
class Read,Write start
classDef start fill:transparent,stroke:transparent

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 22 KiB