Data quality monitoring made easy
Jul 16, 2021 • 11 min read
In early 2016 one of our online media customers came to us with a problem. Our customer, a media giant, hosts articles from its newspapers and magazines on its websites. Each article's web page has three ad blocks, and the customer buys paid redirects to the article pages. It then analyzes how much is spent on the redirects and how much is earned from the ads. If an ad campaign proves to be profitable, the company may spend more money on it in order to earn more. It also uses analytics to determine its most popular journalists, most popular articles, and trends in what people are reading.
The problem was that its analytics were too slow. Its existing batch processing system was taking 50 to 60 minutes to deliver user data, but the company wanted that data output to its AWS Redshift data warehouse in five minutes. The longer it had to wait, the greater the chance that it would miss out on revenue opportunities, or spend too long promoting poorly performing campaigns. The customer wanted a new platform that was faster, handled big data analytics better, had true streaming capability, was easy and fast to scale up, could be hosted on AWS, and used fewer corporate resources. Together, we decided to replace the existing batch processing platform with stream processing and to host the platform on the cloud.
To meet the strict five-minute SLA, we needed to dramatically decrease complexity and focus on In-Stream Processing. After a series of discussions we produced the following solution architecture:
In the process of moving to the streaming platform, we encountered a number of challenges we had to overcome before going into production. We needed to:
In this application, an event was an action initiated by a user from a web browser.
The Spark streaming application loses three minutes’ worth of data if Spark Streaming fails. At first, Spark checkpointing looked like the best way to deal with the problem of Spark Streaming failures, but when we tested checkpointing under production load we ran into a surprising issue: from time to time, Spark wasn’t able to read its own checkpointed data. This finding forced us to build a custom solution to audit the data for consistency.
For the audit process we decided to use Hive over the data in S3. (If we were architecting the application today we might use AWS Athena, but at the time we were architecting this system it hadn’t been released.) We configured Hive tables on top of the data in S3 so we could easily query data in S3. A Hive query would return all missing or uncompleted batch IDs, which would be reprocessed — in other words, written to Kinesis — where they were read and processed by the Spark Streaming application. The audit flow checked that all data was consistent, and if any data was missing, it would be sent to reprocessing.
The audit process looked like this:
At the REST endpoint all data was packed into batches (files), and each batch was given a unique ID, the batch_id. The batch_id and the number of lines in the batch were stored as metadata in AWS SimpleDB, so the audit process could easily verify whether the processed data had the correct number of records for a given batch_id. If part of the data was missing, the audit process would load the whole batch to Kinesis again. This would cause data duplication, so we implemented deduplication to handle this case and any others where we might also have duplicates.
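The count check described above can be sketched in a few lines of Python. This is only an illustration, not the Hive query we ran: the function name, the `(batch_id, record_id)` input shape, and the sample IDs are assumptions made for the example.

```python
def find_batches_to_reprocess(expected_counts, processed_records):
    """Return the batch IDs whose processed output is missing or incomplete.

    expected_counts: dict of batch_id -> line count recorded at ingestion
                     (stands in for the metadata store).
    processed_records: iterable of (batch_id, record_id) pairs found in the
                       processed output (stands in for the audited data).
    """
    # Count distinct record IDs per batch so that duplicates cannot hide
    # a missing record.
    seen = {}
    for batch_id, record_id in processed_records:
        seen.setdefault(batch_id, set()).add(record_id)

    # A batch needs reprocessing when its distinct record count does not
    # match the count recorded at ingestion (including batches never seen).
    return sorted(
        batch_id
        for batch_id, expected in expected_counts.items()
        if len(seen.get(batch_id, ())) != expected
    )


expected = {"b1": 3, "b2": 2, "b3": 1}
processed = [("b1", "r1"), ("b1", "r2"), ("b1", "r3"), ("b2", "r4")]
print(find_batches_to_reprocess(expected, processed))  # ['b2', 'b3']
```

Here `b2` is incomplete and `b3` never arrived, so both would be sent back for reprocessing as whole batches.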
To implement deduplication, we store all IDs of processed records and simply filter incoming data against the IDs we’ve already processed, as we’ve discussed in detail elsewhere.
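As a rough illustration of that filter, here is a minimal Python sketch. It assumes each record carries an `id` field and keeps the processed IDs in an in-memory set; both are simplifications for the example, and a production store for processed IDs would need to be shared and bounded.

```python
def deduplicate(records, seen_ids):
    """Yield only records whose 'id' has not been processed before.

    seen_ids is updated in place, so it carries state across batches.
    """
    for record in records:
        record_id = record["id"]
        if record_id in seen_ids:
            continue  # already processed, e.g. a replayed batch
        seen_ids.add(record_id)
        yield record


seen = set()
first_pass = list(deduplicate([{"id": "r1"}, {"id": "r2"}], seen))
# The audit replays the whole batch plus one record that was missing.
replay = list(deduplicate([{"id": "r1"}, {"id": "r2"}, {"id": "r3"}], seen))
print([r["id"] for r in first_pass])  # ['r1', 'r2']
print([r["id"] for r in replay])      # ['r3']
```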
One of our key requirements called for a complex join strategy. After a web page fires an event on load, it will fire the next event in 90 seconds. We need to join the first event with the second using a hash, then take all the fields from the second event and add them to the first.
The Spark application reads data continuously and packs it into 90-second batches. When the application starts we read the first batch and wait for the second. Once the second batch arrives, we start processing the first batch and take events from the second batch for the join step. The join itself is done by Spark.
The join logic looks like:
After the join is completed successfully, we take all the extra data from the joined events and write it to the first event from the batch that has time stamp 10:00:00. In general, all data goes through deduplication, extraction, join, filtering (bot filtering), and data enrichment stages, after which the data is formatted the way we need and split by type — we have several data types — and ready to be stored in S3.
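To make the two-batch join concrete, here is a simplified sketch in plain Python rather than Spark. The field names (`hash`, `type`, `url`, `time_on_page`) and the rule that the first event's own fields win on conflict are assumptions made for the example.

```python
def join_batches(first_batch, second_batch, key="hash"):
    """Enrich page-load events in first_batch with their follow-up events.

    The follow-up fires 90 seconds after the load, so it is looked up in
    the next batch. Load events without a follow-up pass through unchanged.
    """
    # Index the follow-up events of the second batch by join key.
    followups = {
        event[key]: event
        for event in second_batch
        if event.get("type") == "followup"
    }

    joined = []
    for event in first_batch:
        if event.get("type") != "load":
            # Follow-ups in the first batch belong to loads from the
            # previous batch and were consumed when that batch was joined.
            continue
        merged = dict(event)
        followup = followups.get(event[key])
        if followup is not None:
            # Add the follow-up's fields without overwriting the
            # first event's own fields.
            for field, value in followup.items():
                merged.setdefault(field, value)
        joined.append(merged)
    return joined


first = [
    {"hash": "a", "type": "load", "url": "/x"},
    {"hash": "b", "type": "load", "url": "/y"},
]
second = [
    {"hash": "a", "type": "followup", "time_on_page": 90},
    {"hash": "c", "type": "load", "url": "/z"},
]
for event in join_batches(first, second):
    print(event)
```

In this run, event `a` gains `time_on_page` from its follow-up, event `b` is emitted as it arrived, and event `c` waits for the next batch.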
Sometimes Spark Streaming fails due to S3 connectivity issues, which happen regularly. If the flow fails while we’re storing to S3, we can run into a case where one data type is stored while five other types are not. To avoid such behavior, we implemented a transaction mechanism.
The streaming application has six data subflows which are stored separately in different formats (avro or tsv for further loading to AWS Redshift) with different compression codecs. To consistently store all data or none, we coded a JobDataTransaction step in our ETL workflow.
To build in transactions, we first store all data to a temporary (jobdata) directory, with all six flows in the application stored separately. Once all flows are stored, we trigger creation of a transaction, and move all data from the temporary directory to the final one. We use several tricky techniques to make Spark wait until a transaction completes before starting to process the next batch of data. In the end, we get all the data into S3.
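The stage-then-publish idea can be sketched as follows. This is not our JobDataTransaction code: it uses the local filesystem as a stand-in for S3 (where a "move" is really a copy and delete rather than an atomic rename), and the function, directory, and file names are hypothetical.

```python
import os
import shutil
import tempfile


def commit_batch(flows, staging_root, final_root, batch_id):
    """Write every flow to a staging directory, then publish all at once.

    flows: dict of file name -> iterable of text lines.
    If any flow fails to write, nothing appears under final_root.
    """
    staging_dir = os.path.join(staging_root, batch_id)
    final_dir = os.path.join(final_root, batch_id)
    os.makedirs(staging_dir)
    try:
        for flow_name, lines in flows.items():
            with open(os.path.join(staging_dir, flow_name), "w") as out:
                for line in lines:
                    out.write(line + "\n")
    except Exception:
        # Discard the partial batch so it can be retried from scratch.
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise
    # All flows are stored: publish them with a single move.
    os.makedirs(final_root, exist_ok=True)
    os.rename(staging_dir, final_dir)
    return final_dir


root = tempfile.mkdtemp()
published = commit_batch(
    {"clicks.tsv": ["a\t1"], "views.tsv": ["b\t2"]},
    os.path.join(root, "jobdata"),
    os.path.join(root, "final"),
    "batch-1",
)
print(sorted(os.listdir(published)))  # ['clicks.tsv', 'views.tsv']
```

Readers of the final directory therefore see either all flows of a batch or none of them, which is the property the transaction step is meant to provide.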
Once data is in the final directory, it's ready to be loaded to Redshift. Our Delivery Driver application is responsible for this step. The application constantly polls S3 for new manifests. When one is ready, the driver triggers a data load from S3 to Redshift. If Redshift runs into any kind of issue (a vacuum is running, or there's no free disk space because of a complex Data Analyst query), the application will load the batch later.
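The "load now or retry later" behavior can be sketched like this. The `WarehouseBusy` exception, the `load_manifest` callable, and the manifest names are hypothetical; in a real deployment the callable would presumably issue a Redshift `COPY` against the manifest.

```python
class WarehouseBusy(Exception):
    """Raised by the loader when the warehouse cannot accept a load now."""


def deliver_pending(manifests, load_manifest):
    """Try to load each manifest; return those that must be retried later."""
    deferred = []
    for manifest in manifests:
        try:
            load_manifest(manifest)
        except WarehouseBusy:
            # Keep the manifest for the next polling cycle instead of failing.
            deferred.append(manifest)
    return deferred


loaded = []


def fake_load(manifest):
    # Simulate the warehouse rejecting one load, e.g. during a vacuum.
    if manifest == "batch-2.manifest":
        raise WarehouseBusy("vacuum is running")
    loaded.append(manifest)


pending = ["batch-1.manifest", "batch-2.manifest", "batch-3.manifest"]
print(deliver_pending(pending, fake_load))  # ['batch-2.manifest']
print(loaded)  # ['batch-1.manifest', 'batch-3.manifest']
```

The returned list would simply be fed into the next polling cycle together with any newly discovered manifests.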
You may wonder: Why don’t we stream data from an application directly to Redshift? The answer is simple: Most data warehouses, including Redshift, don’t support streaming at all. The only data warehouse that does, to my knowledge, is Google BigQuery, but it only started supporting streaming, in a beta version, a year after we implemented and released our streaming platform.
Spark provides for both vertical and horizontal scaling. Our streaming application can scale vertically by increasing the number of Spark executor cores, though we're already using high-performance nodes. For horizontal scaling, we can increase the number of executors when we see a higher number of events.
Our cloud-hosted application had one more advantage. It can be hard or even impossible to add several new servers to an existing cluster due to domain controller limits, and monitoring and supporting 65 nodes of an on-premises cluster takes a lot of resources. In the old system the client had to run lots of monitoring processes to check hardware health. Cloud hosting avoids all those support, upgrade, and capacity planning issues; if we face a hardware problem, we simply terminate the affected instance and replace it. It’s also worth mentioning that, with the new system, the client was able to reduce the size of its DevOps team.
Migrating the client's batch processing system to a stream processing application was a hard task, but well worth the time and effort it took, given the benefits we derived (see table). We cut our client's data delivery time from 50 to 60 minutes down to the desired five minutes. The client ended up with a simple cloud-based application stack that makes upgrades and additions simple, and cuts its potential downtime dramatically. We also reduced the resources needed to run the application, from 65 nodes running on the customer's hardware to 18 running on AWS.
Benefits of the new streaming application
| | Old Flow | New Flow |
|---|---|---|
| Data delivery latency | 50 to 60 minutes | 5 minutes |
| Cluster | 65 nodes in DC | 18 nodes in AWS (AWS installation was cheaper than bare metal) |
| Monitoring | Hard to monitor 65 nodes | Monitoring is done mostly by cloud provider |
| When software/hardware upgrade is needed | Stop current cluster, install new Hadoop, deploy new application. Upgrade takes all weekend. | Easy Spark/Hadoop version upgrade: spin up new cluster, move processing there. Upgrade done with 10 minutes downtime. |
It’s no easy task to deliver data in near real-time, with guaranteed delivery and without duplicates, but similar requirements can crop up in almost any cloud or big data conversion. Implementing an architecture like the Grid Dynamics In-Stream Processing Service Blueprint can lessen the pain of a major stream processing conversion. Another thing that helps is to “expect the unexpected.” Who would guess that Spark would be unable to read its own checkpointing output? We didn’t expect that, but we handled it, just as you will handle similar problems when they crop up in your projects.