This document gives a high-level conceptual definition of a platform for low-latency stateful event processing.
- Client connections
  - Optimised for low latency
  - Client/server latency measured and proactively monitored at all levels, including on the client for better UX
  - Connections on edge servers (smooth rollouts of updates, geo-expansion)
  - Resilient two-way communication between end-user devices and the entities over a persistent socket
  - Users perceive state changes as immediate: in-system message round-trip p99 delay <100 ms
- Stateful entities
  - Large entities shared by 1000+ users, with 1000+ updates per second
  - Small entities (e.g. user states) for 1M+ active users
  - Local platform states - strong consistency, CQRS/event sourcing for transactional integrity
  - Remote state proxies - eventual consistency, strong consistency via sync/remote system call
  - Durability: an entity (including all per-user states) is guaranteed to be recovered in <1 second, even in case of severe failure
  - Scalability: entities are independent and do not lock each other’s resources; smooth horizontal upscaling by adding nodes
  - Caching: read models with tunable consistency, downstream aggregation
- Message transport
  - ‘At least once’ delivery for messages between state nodes and edge nodes
  - Guarantees uniqueness of the stateful entity receiving the message
  - Parallelisation and scaling of traffic
- Development and delivery
  - Sandbox model for functionality development and testing (pure domain-level APIs for events, client messages)
  - Continuous delivery with minimal friction
  - Rolling updates: applications are updated without any noticeable downtime
- Extensible architecture
  - User identity via JWT tokens
  - Clearly defined and well-maintained APIs
  - Downstream events for live views, reporting, back offices…
Edge and state nodes
- Edge nodes are the nodes to which end user devices (phones, browsers) are connected.
- State nodes keep the actual state, as identifiable entities, in memory and as a durable copy.
Edge nodes can be public or internal:
- Public edge nodes can be located geographically close to the end users.
- Internal edge nodes are located on premises, connecting special interfaces and devices with the system. They can use pull or push, or any kind of API that the device needs. Authentication on the internal nodes is done by IP address or some other hard-wired way.
The location of state nodes may be subject to regulations, since this is where all processing happens.
The transport layer connects edge nodes with state nodes. Any edge or state node can be brought down gracefully and transparently for the users, without disrupting any real-time process.
For development purposes, a local/minimal transport is supported, which allows development and functional tests to run in the same process.
Event sourced states
Event sourced, dynamically updated states are the core business of the platform.
- confidentiality - only authorised users can access states
- integrity - the state is consistent at all times
- availability - the state is either available or being recovered after being moved off a node that is leaving
Categories and IDs
By the nature of the objects they represent, stateful entities fall into several categories. Categories differ in the size of their states (hundreds of bytes to some megabytes), the number of entities within the category (1 for a singleton, fewer than 1000, or 1M+) and how often the entities are updated or queried.
Each entity has an ID unique within its category.
Each state node can hold entities from multiple categories. The entity distribution layout (which node holds which share of which categories) is configurable.
There may be dedicated ‘lanes’ in the transport layer, to allow tuning for QoS and similar properties per class of communication or category.
State entities are implemented using the CQRS/event sourcing paradigm. An entity processes one message at a time and decides whether or not the message changes the state. If it does, an event is generated and persisted to durable storage. Then the change is applied to the state. Finally, messages can be sent back to the clients.
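The processing cycle described above can be sketched as follows. This is a minimal illustration rather than the platform API: the `CounterEntity` class, the `Incremented` event, the in-memory `journal` list standing in for durable storage, and the `rejected` reply are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Incremented:
    """Hypothetical domain event: the counter grew by `amount`."""
    amount: int


@dataclass
class CounterEntity:
    """Toy stateful entity; `journal` stands in for durable event storage."""
    entity_id: str
    value: int = 0
    journal: list = field(default_factory=list)

    def decide(self, message: dict) -> Optional[Incremented]:
        # Decide whether the message changes the state; nothing is mutated here.
        amount = message.get("amount", 0)
        return Incremented(amount) if amount > 0 else None

    def apply(self, event: Incremented) -> None:
        # Apply an already persisted event to the in-memory state.
        self.value += event.amount

    def handle(self, message: dict) -> list:
        """Process one message: decide, persist, apply, then reply."""
        event = self.decide(message)
        if event is None:
            return [{"type": "rejected", "entity": self.entity_id}]
        self.journal.append(event)  # 1. persist the event first
        self.apply(event)           # 2. only then change the state
        return [{"type": "state", "entity": self.entity_id, "value": self.value}]


entity = CounterEntity("counter-1")
replies = entity.handle({"amount": 5})   # changes the state
ignored = entity.handle({"amount": 0})   # does not change the state
```

Keeping `decide` free of side effects means the same `apply` function can be reused unchanged when events are replayed during recovery.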
The API for a state entity includes logic for standard client messages like terminate(d), the persistence journal, and outbound communication.
All events that change the state are streamed down to the events topic as part of the state entity API, for secondary aggregation/display or offline reporting needs.
The stream of events of an entity is also published for downstream consumers and for building read models (caches) across the platform.
The platform API also supports creation of mirror entities, which feed on the stream of events from the events topic and recover the exact state.
Durability and recovery
Event-sourced states use event storage, a durable database persisting state on physical media:
- To save the stream of events
- For eventual state recovery (snapshots + events)
It is of vital importance to ensure instant recovery of the entities.
If recovery from the primary persistence is not fast enough, an optimisation can be applied: at the cost of extra memory, each state node assigned to the domain listens to the events topic and keeps the mirrored state in memory.
When the state needs to be recovered, the readily available state can be picked up from memory immediately, without recovering it from persistent storage.
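The two recovery paths can be illustrated with a small sketch. It assumes a toy counter state where every event is an integer increment; the `StateNode` class, the `storage` layout (snapshot plus the events recorded after it) and the `on_topic_event` hook are hypothetical names, not part of the platform.

```python
def replay(snapshot: int, events: list) -> int:
    """Rebuild a counter state from a snapshot plus the events recorded after it."""
    state = snapshot
    for amount in events:
        state += amount
    return state


class StateNode:
    """Hypothetical state node that can recover entities from a mirror or from storage."""

    def __init__(self, storage: dict):
        # entity ID -> (snapshot, events since the snapshot); stands in for the event store
        self.storage = storage
        # entity ID -> state mirrored in memory from the events topic
        self.mirrors = {}

    def on_topic_event(self, entity_id: str, amount: int) -> None:
        # Assumes the mirror has followed the topic since the entity's first event.
        self.mirrors[entity_id] = self.mirrors.get(entity_id, 0) + amount

    def recover(self, entity_id: str):
        # Fast path: the mirrored state is already in memory.
        if entity_id in self.mirrors:
            return self.mirrors[entity_id], "mirror"
        # Slow path: load the snapshot and replay the events recorded after it.
        snapshot, events = self.storage[entity_id]
        return replay(snapshot, events), "storage"


# Events 1, 2, 3, 4 were emitted; a snapshot (value 3) was taken after the first two.
node = StateNode(storage={"counter-1": (3, [3, 4])})
from_storage = node.recover("counter-1")
for amount in (1, 2, 3, 4):
    node.on_topic_event("counter-1", amount)
from_mirror = node.recover("counter-1")
```

Both paths must yield the same state; the mirror only trades memory for recovery time.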
To query and update state that is external to the platform, state proxies are used. By definition, such state is not strongly consistent unless it is explicitly requested from the external primary source. For many uses, the ‘last known good’ state may be sufficient, so the state proxy holds the last known remote state and its timestamp.
The state proxies support the following operations:
- push state from primary (incoming update)
- force pull state from primary
- send state update command + force pull updated state (one atomic operation)
- read cached ‘last known’ state
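The four operations listed above could look roughly like this. The sketch assumes the primary system is reachable through two callables, `fetch` and `send_command`; these names, the `Cached` record and the use of a lock to make the combined update-and-pull step atomic within one process are illustrative choices, not the platform API.

```python
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Cached:
    """Last known remote state together with the time it was observed."""
    value: Any
    timestamp: float


class StateProxy:
    def __init__(self, fetch: Callable[[], Any],
                 send_command: Callable[[Any], None],
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch                # reads the state from the primary
        self._send_command = send_command  # sends an update command to the primary
        self._clock = clock
        self._lock = threading.RLock()
        self._cached: Optional[Cached] = None

    def push(self, value: Any) -> None:
        """Incoming update pushed by the primary."""
        with self._lock:
            self._cached = Cached(value, self._clock())

    def force_pull(self) -> Cached:
        """Fetch the current state from the primary and cache it."""
        with self._lock:
            self._cached = Cached(self._fetch(), self._clock())
            return self._cached

    def update_and_pull(self, command: Any) -> Cached:
        """Send an update command and pull the result under a single lock."""
        with self._lock:
            self._send_command(command)
            return self.force_pull()

    def last_known(self) -> Optional[Cached]:
        """Read the cached state without contacting the primary."""
        return self._cached


# A dict plays the role of the remote primary system in this example.
remote = {"balance": 10}
proxy = StateProxy(fetch=lambda: dict(remote), send_command=remote.update,
                   clock=lambda: 123.0)
before = proxy.last_known()
pulled = proxy.force_pull()
updated = proxy.update_and_pull({"balance": 25})
```

Only `last_known` is free of remote calls, so it is the one operation that stays cheap when the primary is slow or unreachable.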
Message transport delivers messages from/to clients on the edges, and between stateful entities.
It guarantees that messages are delivered to an entity on a given node one at a time, and that there is just one such instance per entity ID.
Per-message delivery guarantee: at least once, so duplicates are possible. The entities are responsible for idempotent message processing, assuming each message has a unique ID generated by its sender.
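One common way for an entity to process messages idempotently is to remember recently seen message IDs and skip repeats. The sketch below assumes a message is a dict with hypothetical `id` and `amount` fields and that a bounded memory of IDs is acceptable; none of these names come from the platform.

```python
import uuid
from collections import OrderedDict


class IdempotentEntity:
    """Toy entity that applies each message at most once, keyed by message ID."""

    def __init__(self, max_remembered: int = 10_000):
        self.total = 0
        self._seen = OrderedDict()  # message ID -> None, oldest first
        self._max = max_remembered

    def handle(self, message: dict) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        message_id = message["id"]
        if message_id in self._seen:
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._max:
            self._seen.popitem(last=False)  # forget the oldest remembered ID
        self.total += message["amount"]
        return True


entity = IdempotentEntity()
message = {"id": str(uuid.uuid4()), "amount": 5}  # ID generated by the sender
first = entity.handle(message)
second = entity.handle(message)  # redelivery of the same message
```

In an event-sourced entity the set of processed IDs would itself have to be part of the recovered state, otherwise duplicates arriving after a recovery would be applied again.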
It delivers messages from the clients to the state entities (inbound traffic) and from entities to the clients (outbound). It also delivers messages between the states.
The edge node uses the entity ID as a key to guarantee that all messages for the same ID go through the same partition, preserving their order.
To save on latency, edge nodes do not parse the payload of the actual messages. They pass it through unchanged inside an envelope, using the entity ID as the key and adding the connection ID and the edge node’s own timestamp as attributes.
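A possible shape for the envelope and the key-based partitioning is shown below. The field names, the `wrap` and `partition_for` helpers and the choice of CRC32 as a stable hash are assumptions for illustration; a real transport would normally apply its own partitioner to the key.

```python
import time
import zlib
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Envelope:
    entity_id: str          # used as the partitioning key
    connection_id: str      # attribute added by the edge node
    edge_timestamp_ms: int  # edge node's own clock, in milliseconds
    payload: bytes          # opaque to the edge node, never parsed


def wrap(entity_id: str, connection_id: str, payload: bytes,
         now_ms: Optional[int] = None) -> Envelope:
    """Wrap a client payload without inspecting it."""
    stamp = int(time.time() * 1000) if now_ms is None else now_ms
    return Envelope(entity_id, connection_id, stamp, payload)


def partition_for(entity_id: str, partitions: int) -> int:
    """Map an entity ID to a partition with a hash that is stable across processes."""
    return zlib.crc32(entity_id.encode("utf-8")) % partitions


envelope = wrap("table-42", "conn-1", b'{"bet": 10}', now_ms=1000)
partition = partition_for(envelope.entity_id, 8)
```

Python's built-in `hash()` is deliberately avoided here because it is randomised per process for strings, which would send the same entity ID to different partitions on different edge nodes.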
Message transport guarantees delivery of messages addressed to an entity instance with a certain ID, ensuring that there is one and only one such instance across all available state nodes. This prevents split-brain scenarios in which different states exist under the same ID.
It is up to the state entity to process the message and ensure that it is valid and authorised according to the connection ID (JWT claims were sent in connect when the connection was established), and to use the edge node timestamp to apply time-related logic.
Outbound messages (from stateful entities to the clients) contain a target lookup query that targets either one client connection by ID, all clients connected to a certain entity, or possibly some other set of clients.
This saves on fan-out outbound traffic when a message needs to be delivered to many clients of the same entity.
All edge nodes listen to all messages in the outbound channel and use the lookups to send them to locally connected clients.
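The lookup-based fan-out can be sketched as follows. The `Target` record, the `EdgeNode` class and its `sent` list (which stands in for actual socket writes) are hypothetical; only the two lookup kinds named in the text are modelled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Target:
    """Lookup query: either one connection by ID or all clients of an entity."""
    connection_id: Optional[str] = None
    entity_id: Optional[str] = None


class EdgeNode:
    def __init__(self):
        self.connections = {}  # connection ID -> entity ID, local connections only
        self.sent = []         # (connection ID, payload) pairs, stand-in for socket writes

    def connect(self, connection_id: str, entity_id: str) -> None:
        self.connections[connection_id] = entity_id

    def on_outbound(self, target: Target, payload: bytes) -> int:
        """Resolve the lookup against local connections and deliver to the matches."""
        if target.connection_id is not None:
            matches = [target.connection_id] if target.connection_id in self.connections else []
        else:
            matches = [c for c, e in self.connections.items() if e == target.entity_id]
        for connection_id in matches:
            self.sent.append((connection_id, payload))
        return len(matches)


edge_a, edge_b = EdgeNode(), EdgeNode()
edge_a.connect("c-1", "table-1")
edge_a.connect("c-2", "table-1")
edge_b.connect("c-3", "table-1")
edge_b.connect("c-4", "table-2")

# Every edge node sees every outbound message; each delivers only to its own clients.
broadcast = [edge.on_outbound(Target(entity_id="table-1"), b"update") for edge in (edge_a, edge_b)]
direct = [edge.on_outbound(Target(connection_id="c-4"), b"hello") for edge in (edge_a, edge_b)]
```

The state entity emits one message per update regardless of how many clients are connected; the multiplication happens on the edge nodes.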
An end-user device establishes a websocket connection to a public edge node, using a token issued by the identity service for authorisation.
The same device can establish and operate multiple connections at the same time, possibly via different edge nodes.
Internal devices may connect to internal edges by various means; authentication is by IP address or by some per-device-class mechanism. Internal edges may implement any kind of interface, e.g. HTTP push, HTTP pull, websocket or UART/serial port.
The websocket HTTP upgrade request carries an Authorization: Bearer xxx JWT token to authorise the given client for the connection.
The connection, in its URL, always addresses some stateful entity, and there is a protocol in place for joining and leaving the state.
The public edge node validates the JWT token signature and passes the claims of the token to the state entity in the initial connect message.
The entity either accepts the client and sends a state update message, or sends terminate if the claims are not accepted.
When the connection is dropped by the client, a disconnect message is sent to the entity, which can schedule handling of the disconnect or wait till eventual
The claims in the token may contain specific end-user roles, so the stateful entity knows which of the connected users is entitled to what.
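The accept-or-terminate decision on connect might be implemented along these lines. The sketch assumes the edge node has already verified the token signature and hands over the claims as a dict; the `roles` claim name, the role values and the shape of the reply messages are hypothetical (`sub` is the registered JWT subject claim).

```python
def on_connect(claims: dict, allowed_roles: set, state: dict) -> dict:
    """Accept the client with a state update, or terminate if no role is accepted."""
    granted = set(claims.get("roles", [])) & allowed_roles
    if not granted:
        return {"type": "terminate", "reason": "claims not accepted"}
    return {
        "type": "state",
        "user": claims.get("sub"),
        "roles": sorted(granted),  # what this user is entitled to on this entity
        "state": state,
    }


allowed = {"player", "dealer"}
accepted = on_connect({"sub": "user-1", "roles": ["player"]}, allowed, {"round": 3})
rejected = on_connect({"sub": "user-2", "roles": ["guest"]}, allowed, {"round": 3})
```

Because the claims arrive once, in the connect step, the entity would need to keep the granted roles per connection ID to authorise later messages on that connection.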
Each connection has the following attributes:
- connection ID - to track zombie connections/refreshes
- entity domain (uses same communication channel)
- domain entity ID (aggregate root within a domain)
The connection ID is generated as a UUID by the client. The node does not care about the content of the ID and assumes it is just a unique string, long enough to prevent an “insecure direct object reference” kind of guessing attack. When a connection is dropped, the client is required to reconnect with the same connection ID.
The edge node, on request of the state entity or otherwise, can send a terminate message requesting the end-user device to close the connection and either reconnect or go away.
In particular, a client connection is terminated upon session timeout or a forced logout.
When the client just goes away and closes the connection (in fact, when any kind of socket exception kills the connection), a terminated message is sent to the state entity. It is up to the state entity to wait for the client to come back, ‘remembering’ it for some grace period in the hope that it returns after a while with the same connection ID, and to apply any kind of custom ‘cleanup’ logic.
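A grace period of this kind can be tracked with a small amount of state per connection. The sketch below is one possible approach, with time passed in explicitly to keep it deterministic; the `PresenceTracker` class and its method names are hypothetical, and the cleanup itself is left to the caller of `sweep`.

```python
class PresenceTracker:
    """Remembers dropped connections for a grace period before cleanup."""

    def __init__(self, grace_seconds: float):
        self.grace = grace_seconds
        self.connected = set()
        self.pending = {}  # connection ID -> deadline after which cleanup applies

    def on_connect(self, connection_id: str) -> None:
        # A reconnect with the same connection ID cancels the pending cleanup.
        self.pending.pop(connection_id, None)
        self.connected.add(connection_id)

    def on_terminated(self, connection_id: str, now: float) -> None:
        # The socket is gone; start the grace period instead of cleaning up at once.
        if connection_id in self.connected:
            self.connected.discard(connection_id)
            self.pending[connection_id] = now + self.grace

    def sweep(self, now: float) -> list:
        """Return connection IDs whose grace period has expired and forget them."""
        expired = [c for c, deadline in self.pending.items() if deadline <= now]
        for connection_id in expired:
            del self.pending[connection_id]
        return expired


tracker = PresenceTracker(grace_seconds=30)
tracker.on_connect("c-1")
tracker.on_terminated("c-1", now=10)    # grace period runs until t=40
tracker.on_connect("c-1")               # client came back in time
after_return = tracker.sweep(now=100)
tracker.on_terminated("c-1", now=110)   # grace period runs until t=140
still_waiting = tracker.sweep(now=139)
cleaned_up = tracker.sweep(now=140)
```

An entity would call `sweep` from a timer or on each processed message and run its custom cleanup for every returned connection ID.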