Messages coming into your Spark stream processor may not arrive in the order you expect. Learn how to handle the unexpected.
So you've decided to use Apache Spark Streaming to update your real-time analytics system. It lets you consume events published by upstream services and collate them into a database.
However, with any asynchronous system, it's possible that events arrive in a different order than the order they were published. This could be due to network outages, overloaded systems, or a variety of other causes.
Regardless of the reason, how do you ensure that asynchronous events with dependent data are processed in the order you expect? For instance, what if one event contained a foreign key to data in another event? Well, you would need to ensure that the referenced event was persisted first.
In this blog post, we'll explore how we can handle this scenario using Spark Streaming. Let’s take a look.
Apache Spark Streaming is a real-time data processing tool that seamlessly integrates with the core Spark API. Instead of dealing with static datasets, Spark Streaming focuses on analyzing and processing live data as it arrives.
But here's where it gets interesting. Spark Structured Streaming abstracts away the intricacies of streaming. This means that complex aspects like incremental processing, watermarks, and checkpointing are handled effortlessly. You can, therefore, focus on building sophisticated streaming applications without delving deep into new tools or concepts.
Moreover, one of the standout features of Spark Streaming is its unified batch and streaming APIs. You don't have to switch between different technology stacks for batch and streaming. This is not just a boon for developers, but it also ensures a smoother transition from batch Spark jobs to streaming ones.
When using Spark Streaming, incoming messages are divided into batches according to a configured batch duration. If you specify a batch duration of 10 seconds, all messages received by Spark within that 10-second window are grouped into a single batch and processed together by your streaming job.
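For example, a 10-second batch duration might be configured like this (the application name and master URL below are placeholders):

```java
SparkConf conf = new SparkConf()
        .setAppName("order-event-sequencer")   // placeholder application name
        .setMaster("local[*]");                // placeholder master URL

// Every 10 seconds of received messages becomes one batch for the streaming job.
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
```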
For this example, let's assume you are handling two types of events: a CreateOrder event and an AddItem event. To process an AddItem event, you'll first need to ensure that the corresponding CreateOrder event has been processed and persisted.
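For illustration, the two event types might look like the sketch below; the field names are assumptions for this example, and the classes are shown together for brevity.

```java
public class CreateOrder implements java.io.Serializable {
    public String orderNumber;   // e.g. "101"
    public String customerId;
}

public class AddItem implements java.io.Serializable {
    public String orderNumber;   // references the order created by a CreateOrder event
    public String sku;
    public int quantity;
}
```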
For a given order, a batch can be in one of three states: it contains only the CreateOrder event, only an AddItem event, or both. To handle all three of these states, you'll need to do a JavaPairDStream::fullOuterJoin on the two streams.
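A sketch of that join, assuming the two event streams (orderStream of CreateOrder events and itemStream of AddItem events) have already been consumed from your source and that Optional here is Spark's org.apache.spark.api.java.Optional:

```java
// Key each stream by order number so the two can be joined.
JavaPairDStream<String, CreateOrder> ordersByKey =
        orderStream.mapToPair(o -> new Tuple2<>(o.orderNumber, o));
JavaPairDStream<String, AddItem> itemsByKey =
        itemStream.mapToPair(i -> new Tuple2<>(i.orderNumber, i));

// Full outer join: each resulting pair may hold the order, the item, or both.
JavaPairDStream<String, Tuple2<Optional<CreateOrder>, Optional<AddItem>>> joined =
        ordersByKey.fullOuterJoin(itemsByKey);
```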
Additionally, I’m going to map the values to the following POJO class for clarity.
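A minimal sketch of such a class, with the name and fields assumed for this example:

```java
public class OrderItemPair implements java.io.Serializable {
    public final Optional<CreateOrder> createOrder;
    public final Optional<AddItem> addItem;

    public OrderItemPair(Optional<CreateOrder> createOrder, Optional<AddItem> addItem) {
        this.createOrder = createOrder;
        this.addItem = addItem;
    }
}
```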
The joined stream can be mapped to this class as follows:
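One way to do that, assuming the joined stream and the OrderItemPair class above:

```java
// Convert each Tuple2 of Optionals into the POJO so downstream code is easier to read.
JavaPairDStream<String, OrderItemPair> pairedEvents =
        joined.mapValues(t -> new OrderItemPair(t._1(), t._2()));
```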
If you were to process each stream of events independently, you could not guarantee that a CreateOrder event would be processed before an AddItem event arriving in the same batch. A race condition would exist: different Spark workers could handle the work of saving the individual streams to the database, and they could process them at different rates.
Now that you have the two streams joined, you can ensure that messages received within the same batch are processed together in the correct order. However, you still need to handle the scenario where an AddItem message arrives in a batch prior to its corresponding CreateOrder event.
For example, an AddItem event referencing order number 101 might arrive in a batch before the CreateOrder event for that order number.
To handle this, you'll use the JavaPairDStream::mapWithState function. The mapWithState function lets you process each pair in a JavaPairDStream and keep computed state in memory, keyed by the pair's key. You provide mapWithState with a StateSpec instance that defines how each pair adds, updates, or removes data from state, as well as what data to emit for downstream consumption. mapWithState also requires checkpointing so that state can periodically be persisted for fault tolerance.
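For example, checkpointing could be enabled on the streaming context created earlier (the directory below is a placeholder):

```java
// mapWithState requires a checkpoint directory so state can be recovered after a failure.
jssc.checkpoint("hdfs:///checkpoints/order-event-sequencer");
```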
To create a StateSpec instance, use the static StateSpec.function builder. It requires an implementation of one of Spark's function interfaces; in this case, you'll implement Function3<KeyType, Optional<ValueType>, State<StateType>, MappedType>.
This function will handle the following scenarios:
- A CreateOrder event arrives: persist it, then release any AddItem events for that order that were held in state.
- An AddItem event arrives and its CreateOrder has already been processed: persist it right away.
- An AddItem event arrives before its CreateOrder: hold it in state until the matching CreateOrder shows up.
Here is an abstract implementation of that interface:
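The sketch below is one way to implement it. The class names (OrderSequencingFunction, OrderState), the shape of the state, and the choice to emit a list of persist-ready events are all assumptions made for this example; the two classes are shown together for brevity.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;

// Per-order state held between batches: whether the CreateOrder has been persisted
// and any AddItem events that arrived before it.
public class OrderState implements Serializable {
    public boolean orderSeen = false;
    public List<AddItem> pendingItems = new ArrayList<>();
}

// Mapping function passed to StateSpec.function. For each order number, it emits the
// events that are now safe to persist, already in dependency order.
public class OrderSequencingFunction implements
        Function3<String, Optional<OrderItemPair>, State<OrderState>, List<Object>> {

    @Override
    public List<Object> call(String orderNumber,
                             Optional<OrderItemPair> value,
                             State<OrderState> state) {
        OrderState current = state.exists() ? state.get() : new OrderState();
        List<Object> readyToPersist = new ArrayList<>();

        if (value.isPresent()) {
            OrderItemPair pair = value.get();

            // CreateOrder arrived: emit it first, then release any AddItem events
            // that were held in state from earlier batches.
            if (pair.createOrder.isPresent()) {
                readyToPersist.add(pair.createOrder.get());
                current.orderSeen = true;
                readyToPersist.addAll(current.pendingItems);
                current.pendingItems.clear();
            }

            // AddItem arrived: emit it if its order is already persisted,
            // otherwise hold it in state until the CreateOrder shows up.
            if (pair.addItem.isPresent()) {
                if (current.orderSeen) {
                    readyToPersist.add(pair.addItem.get());
                } else {
                    current.pendingItems.add(pair.addItem.get());
                }
            }
        }

        state.update(current);
        return readyToPersist;
    }
}
```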
Now you’ll include it in your stream processing:
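A sketch of that wiring, where persistToDatabase is a placeholder for your own persistence logic:

```java
// Apply the stateful mapping to the joined, POJO-mapped stream from earlier.
JavaMapWithStateDStream<String, OrderItemPair, OrderState, List<Object>> sequenced =
        pairedEvents.mapWithState(StateSpec.function(new OrderSequencingFunction()));

// Persist the emitted events; within each list, the CreateOrder always precedes its items.
sequenced.foreachRDD(rdd ->
        rdd.foreach(events -> {
            for (Object event : events) {
                persistToDatabase(event);  // placeholder for your own persistence code
            }
        }));
```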
At this point, you've guaranteed that even if the events arrive out of order you can process them as though they came in serially.
Message sequencing in Spark isn't just a buzzword; it's essential to preserving data integrity in real-time analytics. By maintaining the correct order of messages, businesses can ensure accurate event tracking, which in turn affects the insights derived. If the sequence is wrong, the analysis can go awry, leading to potentially misinformed decisions.
Furthermore, consider the dependencies between different types of events. When message sequencing in Spark is enforced correctly, it guarantees that dependent events are processed in the right order.
For instance, an AddItem event must be processed only after its corresponding CreateOrder event. Sequencing ensures such dependencies are respected, eliminating potential errors or inconsistencies in the data.
Finally, message sequencing in Spark reduces the complexity of handling out-of-order events. Instead of scattering intricate ordering logic throughout the application, Spark's state management can take care of it, allowing developers to focus on the core functionality of the application.
In the realm of real-time analytics, ensuring data consistency and order is not a luxury; it's a necessity. Message sequencing in Spark offers a reliable way to handle asynchronous events, even when they arrive out of order.
With the formidable power of Apache Spark Streaming and its built-in features, businesses can now seamlessly handle, process, and analyze live streams of data with confidence.