Data quality monitoring made easy
Jul 16, 2021 • 11 min read
This Blueprint aims to address a number of architectural goals we’ve heard repeatedly from our customers, who are CTOs and Chief Architects of Big Data systems:
In the current version of the Blueprint, we pre-integrated a number of technologies (outlined in this post) that, together, make up an In-Stream Processing stack:
The Blueprint also addresses a number of operational concerns:
We start by describing a distributed, fault-tolerant computing system architecture that is highly available with respect to a failure of any node or functional component of the Blueprint. The diagram below shows the minimum cluster configuration that can be scaled up to meet actual business demands:
If any single node fails, the system in the diagram above will still be fully functional. The diagram below shows which nodes could fail simultaneously without a negative impact on our In-Stream Processing service's functionality. There won’t be any performance degradation if the virtual machines are sized correctly to handle the complete workload in an emergency situation.
One point worth noting is that the absence of a single point of failure makes the system tolerant of many — but not all — kinds of failures. For example, systems distributed across several data centers indirectly include communication subsystems that are not under the direct control of a Data Center or a Platform as a Service (PaaS) provider. The number of possible deployment configurations is so large that no single recipe can be universal enough to address all non-specific fault tolerance problems. That said, the presented architecture is able to ensure 99.999% reliability in most cases.
Another important aspect of reliability is delivery semantics. In the strictest case, every event must be processed by the streaming service once, and only once. An end-to-end ability to process all the events exactly once requires:
The idempotency requirement means the downstream system has to tolerate data duplication. See the diagram below:
Reliable data transmission to our In-Stream Processing Service (communication point #1) and from it (communication point #3) requires certain behaviors from components external to the Blueprint. Specifically, the source and downstream systems should deliver and consume events in a synchronous way.
Apache Kafka and Spark Streaming ensure that data will be processed without losses once it is accepted for processing. Simply put, failures inside the Spark Streaming Application are automatically handled by re-processing the recent data. This is made possible by Apache Kafka’s ability to retain data for a configurable period of time, up to dozens of days (communication point #2).
Getting an optimal configuration for Apache Kafka requires several important design decisions and careful trade-offs between architectural concerns such as durability and latency, ease of scaling, and cost of overhead.
First, let’s start with durability concerns. Apache Kafka is run as a cluster of brokers. One cluster can contain many logical channels called queues or topics, each split into multiple partitions. Each partition is replicated across brokers according to a replication factor configured for durability. As a result, each broker is responsible for a number of assigned partitions and replicas. Coordination of re-assignments in case of failures is provided with Apache ZooKeeper.
Data replication and persistence together deliver “pretty good” immunity from data loss. But as with all distributed systems, there is a trade-off between latency and the system’s level of durability. Apache Kafka provides many configuration options, ranging from asynchronous delivery of events from producers in batches to synchronous publishing of every event, blocking until the event is committed, i.e. when all in-sync replicas for the partition have applied it to their logs.
Asynchronous delivery clearly will be the fastest. The producer will receive an immediate acknowledgment when data reaches the Kafka broker without waiting for replication and persistence. If something happens to the Kafka broker at that moment, the data will be lost. The other extreme is full persistence. The producer will receive an acknowledgment only after the submitted events are successfully replicated and physically stored. If something happens during these operations, the producer will re-submit the events. This will be the safest configuration option from a data loss standpoint, but it is the slowest mode.
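The two extremes can be sketched as producer settings. This is a minimal illustration, not the Blueprint's actual configuration: `acks`, `retries`, `linger.ms` and `batch.size` are standard Kafka producer settings, while the preset names, the values, and the helper function are assumptions made for this example. Note that `acks=all` waits for the in-sync replicas but does not by itself force a disk flush.

```python
# Hypothetical presets illustrating the latency/durability extremes.
# Keys are standard Kafka producer settings; values are illustrative only.
FASTEST = {
    "acks": "1",          # leader acknowledges without waiting for replicas
    "linger.ms": 50,      # wait briefly so events are sent in batches
    "batch.size": 65536,  # upper bound on a batch, in bytes
}

SAFEST = {
    "acks": "all",        # acknowledge only after all in-sync replicas have the event
    "retries": 5,         # re-submit events if the acknowledgment does not arrive
    "linger.ms": 0,       # send each event as soon as possible
}


def waits_for_replication(config: dict) -> bool:
    """Return True if the producer blocks until in-sync replicas confirm."""
    return str(config.get("acks")) in ("all", "-1")


for name, preset in (("fastest", FASTEST), ("safest", SAFEST)):
    print(name, "waits for replication:", waits_for_replication(preset))
```

Most real deployments land somewhere between these two presets, trading a few milliseconds of latency for a lower chance of data loss.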
The next important aspect of Apache Kafka configuration is choosing the optimal number of partitions. Repartitioning the message queue is a manual process that requires downtime, so it is highly disruptive to service operations. Ideally, we shouldn’t need to do this more than once a year; however, keeping partitioning stable for that long requires solid forecasting skills.
In a nutshell, getting partitioning right is a combination of art and science directed at finding a balance between ease of scaling and the overhead cost of over-partitioning. More partitions bring higher overhead and impact memory requirements, ZooKeeper performance, and partition transitioning time during failover. On the other hand, since a partition is a unit of parallelization, the number of partitions should be sufficient to efficiently parallelize event handling by the message queue. Since a single Kafka partition must be consumed by a single sourcing container in a Spark Streaming application, the number of partitions also heavily affects overall throughput of event consumption by the sourcing containers. The advantages and drawbacks of over-partitioning are described in this blog post from the Apache Kafka designers.
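As a starting point for the "science" part, a commonly cited rule of thumb is to take the larger of two ratios: target throughput over per-partition producer throughput, and target throughput over per-partition consumer throughput. The sketch below assumes this rule; the function name and the sample throughput figures are hypothetical and would have to be measured on real hardware.

```python
import math


def min_partitions(target_mb_s: float,
                   producer_mb_s_per_partition: float,
                   consumer_mb_s_per_partition: float) -> int:
    """Smallest partition count that satisfies both producers and consumers."""
    by_producer = target_mb_s / producer_mb_s_per_partition
    by_consumer = target_mb_s / consumer_mb_s_per_partition
    return math.ceil(max(by_producer, by_consumer))


# Illustrative numbers only: 45 MB/s target, 10 MB/s per partition on the
# producer side, 5 MB/s per partition on the consumer side.
print(min_partitions(45, 10, 5))  # 9
```

The result is a lower bound; headroom for growth is then added on top, keeping in mind the overhead of over-partitioning.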
The illustration above shows how, for efficient topic partitioning, we should estimate the number of partitions. It should be the minimum number sufficient to achieve:
The process of finding an optimal parallelization level for a Spark Streaming application is, again, a mixture of art and science. It is an iterative process of application tuning with many forks and dependencies that is highly dependent on the nature of the application. Sadly, no “one-size-fits-all” methodology has emerged that can substitute for hands-on experience.
Finally, we will provide a few thoughts on hardware configuration for Kafka. Kafka is a highly optimized message queue system designed to efficiently parallelize computing and I/O operations, so it’s crucial to parallelize storage I/O, which is the slowest part of the system, especially with random data access. An important Apache Kafka advantage is its ability to keep the raw event stream for several days. For example, at 300K events per second and 150 bytes per event, the data size for seven days will be about 27 TB (roughly 25 TiB). Kafka provides per-event compression which can cut storage requirements, but compression impacts performance and, in any case, the actual compression ratio depends on the event format and content.
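The arithmetic behind the retention estimate can be checked with a few lines. This is a back-of-the-envelope sketch: it counts uncompressed payload bytes only and ignores replication, indexes, and per-record overhead, all of which would increase the real footprint. The function name is ours.

```python
SECONDS_PER_DAY = 86_400


def retained_bytes(events_per_second: int,
                   bytes_per_event: int,
                   retention_days: int) -> int:
    """Raw payload bytes accumulated over the retention window."""
    return events_per_second * bytes_per_event * SECONDS_PER_DAY * retention_days


raw = retained_bytes(300_000, 150, 7)
print(f"{raw / 1e12:.1f} TB (decimal)")   # 27.2 TB
print(f"{raw / 2**40:.2f} TiB (binary)")  # 24.75 TiB
```

Multiplying the result by the replication factor gives a first approximation of the cluster-wide disk requirement.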
ZooKeeper is a distributed coordination service used to achieve high availability for Apache Kafka partitions and brokers, as well as a resource manager for the Spark Streaming cluster. Apache ZooKeeper lets distributed systems follow services, which facilitates overall high availability: it provides guaranteed consistent storage for the state of a distributed system, client monitoring, and leader election.
Previous versions of the Spark API for Apache Kafka integration used ZooKeeper for managing consumer offsets, which almost always turned ZooKeeper into a performance bottleneck. Nowadays, with the introduction of a Direct Apache Kafka API in Spark, consumer offsets are best managed in checkpoints. In this case, one ZooKeeper ensemble can be shared between the Message Queue and the Stream Processing clusters with no compromise in performance or availability, while substantially reducing administration efforts and cloud resources.
A ZooKeeper ensemble consists of several, typically three to seven, identical nodes that each have dedicated high speed storage for transaction persistence. ZooKeeper works with small and simple data structures reflecting the cluster state. This data is very limited in size. Therefore RAM and consumed disk space are almost independent of the event flow rate and may be considered constant for all practical purposes.
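The ensemble size determines how many node failures can be survived, because ZooKeeper needs a majority of its nodes to be available. A minimal sketch of that relationship (function names are ours):

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of nodes that forms a majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """Nodes that may fail while a majority is still available."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 4, 5, 7):
    print(n, "nodes tolerate", tolerated_failures(n), "failure(s)")
```

An even-sized ensemble tolerates no more failures than the odd size below it, which is why odd sizes are the usual choice.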
ZooKeeper performance is sensitive to storage I/O performance. Considering that ZooKeeper is a crucial part of the Kafka ecosystem, it’s a good idea to run it on dedicated instances or at least to set up a dedicated storage mount point for its persistent data.
Here is a fault-tolerant Spark Streaming cluster design. It addresses three reliability aspects: zero data loss, no duplicates, and automatic failover for all component-level failures. The next diagram shows a five-component sub-system where each component must be highly available:
The next aspect of reliability is the assurance of zero data loss and no duplications. Typical message delivery semantics recognize three types of requirements:
In-Stream Processing applications are often required to support Exactly Once delivery semantics. While it is possible to achieve an end-to-end Exactly Once semantic for most In-Stream Processing applications, comprehensive discussion of this topic deserves a separate article. Here, we’ll briefly mention the main approaches, starting with the diagram below:
As previously mentioned, Kafka has the ability to replay the stream, which is important if there is a failure. A replay of recent events is automatically triggered in case of a failure inside the Stream Processing Application, whether it is caused by hardware or software. “Recent events” in this case means micro-batches that were emitted but not successfully processed. At Least Once delivery semantics are achieved this way. Exactly Once requires additional effort: either In-Stream event deduplication (e.g. looking up processed event identifiers in an external database) or an idempotent method of insight consumption by downstream systems. The latter option simply means the downstream system is tolerant of duplicates; e.g. two identical REST calls, one of which is a duplicate, will not break consistency.
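The deduplication approach can be sketched as follows. This is an illustration under stated assumptions, not the Blueprint's implementation: the class and method names are hypothetical, and an in-memory set stands in for the external database of processed event identifiers.

```python
class DeduplicatingSink:
    """Turns At Least Once delivery into effectively-once processing."""

    def __init__(self):
        self._seen_ids = set()  # stand-in for an external store of processed IDs
        self.delivered = []     # payloads passed downstream exactly once

    def handle(self, event_id: str, payload: str) -> bool:
        """Process the event unless its ID was already seen. Returns True if processed."""
        if event_id in self._seen_ids:
            return False        # duplicate caused by a replay: skip it
        self._seen_ids.add(event_id)
        self.delivered.append(payload)
        return True


sink = DeduplicatingSink()
micro_batch = [("e1", "click"), ("e2", "view"), ("e3", "purchase")]

# First attempt, then a replay of the same micro-batch after a simulated failure.
for attempt in range(2):
    for event_id, payload in micro_batch:
        sink.handle(event_id, payload)

print(sink.delivered)  # ['click', 'view', 'purchase']
```

In a real deployment, recording the identifier and emitting the result would need to be atomic, otherwise a failure between the two steps could still lose or duplicate an event.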
Cassandra is a massively scalable, highly available NoSQL database. A Cassandra cluster consists of peer nodes with no dedicated cluster manager function. That means any client may connect to any node and request any data, as shown on this diagram.
The minimum highly available Cassandra cluster consists of 3 (three) nodes, where replication factor is 3 (three), with 2 (two) read and write replicas. With fewer nodes, a node failure will block read or write operations if the database was configured to ensure data consistency.
Cassandra allows you to tune its durability and consistency levels by configuring the replication factor (RF) and the number of nodes to write (W) and read (R). In order for the data to be consistent, the following rule has to be fulfilled: W + R > RF.
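The rule is easy to check for a given configuration. The sketch below is only the inequality from the text wrapped in a function; the names are ours and it does not call any Cassandra API.

```python
def is_consistent(write_replicas: int, read_replicas: int, replication_factor: int) -> bool:
    """Check the W + R > RF rule: every read overlaps at least one up-to-date replica."""
    for count in (write_replicas, read_replicas):
        if not 1 <= count <= replication_factor:
            raise ValueError("W and R must be between 1 and RF")
    return write_replicas + read_replicas > replication_factor


def quorum(replication_factor: int) -> int:
    """Majority of replicas, the usual choice for both W and R."""
    return replication_factor // 2 + 1


rf = 3
print(is_consistent(quorum(rf), quorum(rf), rf))  # True: 2 + 2 > 3
print(is_consistent(1, 1, rf))                    # False: 1 + 1 <= 3
```

With RF = 3 and W = R = 2, one replica can be down while reads and writes remain both available and consistent, which matches the minimum cluster described above.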
Durability is achieved via data replication. The Virtual Node (VN) concept is essential for understanding how data replication works. As we have said, a regular Cassandra cluster consists of several homogeneous nodes. The dataset is split into many segments, called Virtual Nodes. Each segment is replicated several times according to the configured replication factor, as seen in the diagram below:
Hardware failures, generally speaking, result in data inconsistency in replicas. Once a failed node comes back online, a repair process is started that restores replica consistency. The repair process has automatic and manual options. If the failed node is considered permanently down, the administrator has to remove it manually from the cluster.
Another aspect of durability is data persistence. Cassandra employs commit logs. Essentially, all writes to a commit log are cached like any other storage I/O operation. The data is truly persisted only after it is physically written to storage. Since such operations are time-consuming, the commit log is synced periodically — every so many milliseconds — based on parameters specified in the configuration. Theoretically, the data not synced with physical storage might be lost due to problems such as a sudden power outage. In practice, data replication and power supply redundancy make this situation very unlikely.
Redis was originally written in 2009 as a simple, key-value, in-memory database. This made Redis very efficient for read operations with simple data structures. Since then, Redis has acquired many additional features including sharding, persistence, replication, and automatic failover. This has made it an ideal lookup datastore. Since lookup data volumes are usually rather small, we rarely see a necessity for data partitioning for scalability with Redis.
The typical HA Redis deployment consists of at least two nodes. Availability is achieved with Master-Slave data replication. Redis includes an agent called Sentinel that monitors data nodes and performs the failover procedure when the Master node fails. Sentinel can be deployed in various ways; the documentation provides information about the advantages and pitfalls of different approaches. We suggest deploying Sentinel to ZooKeeper nodes, as shown in the following diagram:
The number of running Sentinel instances must be greater than two since each Sentinel agent can start a failover procedure after coordination with a majority of agents.
Redis is a very performant database, and for a lookup database it’s very unlikely that either CPU or memory will become a bottleneck. However, if the CPU does become a bottleneck, scalability is achieved via data sharding. Redis is a single-threaded application, so multiple CPU cores can be utilized with data sharding and by running several Redis instances on the same VM. In sharding mode, Redis creates 16384 hash slots and distributes them between shards. Clients may still request data from any node. If a request comes to a node that doesn’t have the requested data, it will be redirected to the proper node.
Durability is achieved with data persistence. Here, Redis provides two main options: a snapshot of all user data as an RDB file, or a log of all write operations in an AOF (Append-Only File). Both options have their own advantages and drawbacks, which are described in detail in the official documentation. (In our experience, the AOF method provides higher durability in exchange for slightly more maintenance effort.)
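To illustrate how a key maps to one of the 16384 hash slots, here is a minimal sketch. It assumes the scheme described in the Redis Cluster specification (CRC16, XMODEM variant, modulo 16384), which is not spelled out in this post, and it deliberately ignores hash tags (the `{...}` key syntax). The function name is ours.

```python
import binascii

HASH_SLOTS = 16384


def hash_slot(key: bytes) -> int:
    """Map a key to a hash slot, ignoring hash tags.

    binascii.crc_hqx with an initial value of 0 computes CRC-CCITT with
    polynomial 0x1021, i.e. the XMODEM variant of CRC16.
    """
    return binascii.crc_hqx(key, 0) % HASH_SLOTS


for key in (b"user:1000", b"user:1001", b"session:42"):
    print(key.decode(), "->", hash_slot(key))
```

Each shard owns a range of slots, so a node can tell from the slot number alone whether it holds the key or must redirect the client.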
Now that we have analyzed each component separately, it is time to display a single “big picture” diagram that shows the complete, end-to-end topology of a scalable, durable and fault-tolerant In-Stream Processing service design:
To summarize the capabilities of our In-Stream Processing service, these are the SLAs targeted by the design we have presented here:
Several important architectural and operational concerns are not addressed by this version of our Blueprint, such as:
The usual reasons for exclusion include:
If you have a question you’d like to ask, we’d be happy to share what we’ve learned. Please leave a question in the comments or send us a note.
Sergey Tryuber, Anton Ovchinnikov, Victoria Livschitz