So you've decided to use Apache Spark Streaming to update your real-time analytics system. It helps you consume events published by upstream services and collate them into a database.

However, as with any asynchronous system, events may arrive in a different order than they were published. This could be due to network outages, overloaded systems, or a variety of other causes.

Regardless of the reason, how do you ensure that asynchronous events with dependent data are processed in the order you expect? For instance, what if one event contained a foreign key to data in another event? Well, you would need to ensure that the referenced event was persisted first. 

In this blog post, we'll explore how we can handle this scenario using Spark Streaming. Let’s take a look.

Meet Apache Spark Streaming

Apache Spark Streaming is a real-time data processing tool that seamlessly integrates with the core Spark API. Instead of dealing with static datasets, Spark Streaming focuses on analyzing and processing live data as it arrives. 

But here's where it gets interesting: Spark Structured Streaming abstracts away many of the intricacies of streaming, so complex aspects like incremental processing, watermarks, and checkpointing are handled for you. You can therefore focus on building sophisticated streaming applications without delving deep into new tools or concepts.

Moreover, one of the standout features of Spark Streaming is its unified batch and streaming APIs. You don't have to switch between different technology stacks for batch and streaming. This is a boon for developers, and it makes the transition from batch Spark jobs to streaming ones much smoother.

Handling Out Of Order Events and Message Sequencing With Spark Streaming

When using Spark Streaming, incoming messages are divided into batches according to a configured batch duration. If you specify a batch duration of 10 seconds, all messages received by Spark within that 10-second window are grouped into a single batch to be processed by your streaming job.
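For reference, a minimal sketch of creating a streaming context with a 10-second batch duration might look like this (the application name and master URL are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Every message received during each 10-second window is grouped into one batch
// for the streaming job to process. The app name and master are placeholders.
SparkConf conf = new SparkConf().setAppName("order-events").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));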

For this example, let's assume you are handling two types of events: a CreateOrder event and an AddItem event. Before you can process an AddItem event, you need to ensure that its corresponding CreateOrder event has been processed and persisted.
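For reference, here is a rough sketch of what those two event classes might look like; the fields shown are assumptions, and all this example really requires is that both events carry the order number used as the join key:

import java.io.Serializable;

// Hypothetical event shapes (in the real project these would live in
// com.experoinc.demo.event as separate files); the fields are illustrative.
class CreateOrder implements Serializable {
    private String orderNumber;  // the key both streams are paired on
    private String customerId;
    // constructors, getters, and setters omitted for brevity
}

class AddOrderItem implements Serializable {
    private String orderNumber;  // references the order created by a CreateOrder event
    private String sku;
    private int quantity;
    // constructors, getters, and setters omitted for brevity
}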

For any given order, a batch of events will be in one of three states:

  • A CreateOrder event arrives with no corresponding AddItem events in the batch
  • An AddItem event arrives with no corresponding CreateOrder event in the batch
  • Both a CreateOrder and corresponding AddItem events arrive in the batch

To handle all three of these states, you'll need to perform a JavaPairDStream::fullOuterJoin on the two streams.

JavaPairDStream<String, CreateOrder> createOrderStream = getCreateOrderPairStreamFromContext();
JavaPairDStream<String, List<AddOrderItem>> addItemStream = getAddItemPairStreamFromContext();

JavaPairDStream<String, Tuple2<Optional<CreateOrder>, Optional<List<AddOrderItem>>>> joinedStream =
        createOrderStream.fullOuterJoin(addItemStream);

Additionally, I’m going to map the values to the following POJO class for clarity.

// JoinedOrderEvent.java
package com.experoinc.demo;

import com.experoinc.demo.event.AddOrderItem;
import com.experoinc.demo.event.CreateOrder;
import lombok.NonNull;
import org.apache.spark.api.java.Optional;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class JoinedOrderEvent implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Optional<CreateOrder> order;
    private final Optional<List<AddOrderItem>> items;

    public JoinedOrderEvent(CreateOrder order, List<AddOrderItem> items) {
        this.order = Optional.ofNullable(order);
        this.items = Optional.ofNullable(items);
    }

    public JoinedOrderEvent addItems(@NonNull Collection<AddOrderItem> additionalItems) {
        List<AddOrderItem> items = this.items.orElse(new ArrayList<>(additionalItems.size()));
        items.addAll(additionalItems);

        return new JoinedOrderEvent(this.order.orElse(null), items);
    }

    public Optional<CreateOrder> getOrder() {
        return order;
    }

    public Optional<List<AddOrderItem>> getItems() {
        return items;
    }

    public boolean hasOrder() {
        return order.isPresent();
    }

    public boolean hasItems() {
        return items.isPresent();
    }
}

The joined stream can be mapped to this class as follows:

JavaPairDStream<String, JoinedOrderEvent> joinedOrderStream = joinedStream
        .mapToPair((pair) -> {
            String orderNumber = pair._1;
            CreateOrder order = pair._2._1.orNull();
            List<AddOrderItem> items = pair._2._2.orNull();

            return Tuple2.apply(orderNumber, new JoinedOrderEvent(order, items));
        });

If you were to process each stream of events independently, you could not guarantee that a CreateOrder event would be processed before an AddItem event that arrived in the same batch. A race condition would exist: different Spark workers could handle the work of saving the individual streams to the database, and they could process them at different rates.

Now that you have the two streams joined, you can ensure that messages received within the same batch are processed together in the correct order. However, you'll still need to handle the scenario where an AddItem message arrives in a batch prior to its corresponding CreateOrder event.

For example, suppose an AddItem event referencing order number 101 arrives in a batch before the CreateOrder event for that order number.

To handle this, you'll use the JavaPairDStream::mapWithState function. The mapWithState function allows you to process each tuple in a JavaPairDStream and store calculated values in memory, keyed by the pair's key.

mapWithState also requires checkpointing, so that the state can be periodically persisted for fault tolerance. You'll need to provide a StateSpec instance to mapWithState to define how each pair adds, updates, or removes data from state, as well as what data to emit for downstream consumption.
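Enabling checkpointing is a single call on the streaming context. A sketch, reusing the hypothetical jssc context from the earlier example and a placeholder directory:

// mapWithState needs a checkpoint directory so Spark can periodically persist
// the keyed state for fault tolerance. The path below is a placeholder.
jssc.checkpoint("hdfs:///tmp/order-stream-checkpoints");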

You create a StateSpec instance with the static StateSpec.function factory method, which takes an implementation of one of Spark's function interfaces. In our case, we'll implement Function3, whose type parameters correspond to a KeyType, a ValueType, a StateType, and a MappedType (Spark passes the value wrapped in an Optional and the state wrapped in a State object):

  • KeyType represents the type of key, in our case the order number String
  • ValueType represents the type of Value to be mapped, in our case the JoinedOrderEvent
  • StateType represents the type of object to be stored in state, in our case it will be a List
  • MappedType represents the type of object to be emitted each time this function is called; in our case we will use an Optional to represent the value to be persisted after each iteration. If the output is Optional.absent(), then nothing should be persisted.

This function will handle the following scenarios:

  • If a CreateOrder event exists in this batch, join it against any AddOrderItem entries in the state, clear the state for that order number, and emit the combined events.
  • If an AddItem event exists in this batch with no corresponding CreateOrder event, add its items to the state and emit Optional.absent()

Here is a sketch of what an abstract implementation of that interface might look like; the exact merging details are illustrative and will vary with your implementation:
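// Illustrative sketch of AbstractJoinedOrderStateFunc; the original implementation may differ.
// It is declared abstract to mirror the usage below, where a concrete subclass is
// created by a factory method.
package com.experoinc.demo;

import com.experoinc.demo.event.AddOrderItem;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;

import java.util.ArrayList;
import java.util.List;

public abstract class AbstractJoinedOrderStateFunc implements
        Function3<String, Optional<JoinedOrderEvent>, State<List<AddOrderItem>>, Optional<JoinedOrderEvent>> {

    @Override
    public Optional<JoinedOrderEvent> call(String orderNumber,
                                           Optional<JoinedOrderEvent> value,
                                           State<List<AddOrderItem>> state) {
        if (!value.isPresent()) {
            // Nothing arrived for this order in this batch; emit nothing.
            return Optional.absent();
        }

        JoinedOrderEvent event = value.get();

        if (event.hasOrder()) {
            // The CreateOrder event is present: merge any AddOrderItems buffered from
            // earlier batches, clear the state for this order number, and emit the
            // combined event so it can be persisted downstream.
            JoinedOrderEvent combined = state.exists() ? event.addItems(state.get()) : event;
            if (state.exists()) {
                state.remove();
            }
            return Optional.of(combined);
        }

        // Only AddItem events arrived, ahead of their CreateOrder: buffer the items in
        // state and emit Optional.absent() so nothing is persisted for this batch.
        List<AddOrderItem> buffered = state.exists() ? new ArrayList<>(state.get()) : new ArrayList<>();
        if (event.hasItems()) {
            buffered.addAll(event.getItems().get());
        }
        state.update(buffered);

        return Optional.absent();
    }
}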

Now you’ll include it in your stream processing:

AbstractJoinedOrderStateFunc stateFunc = createJoinedOrderStateFuncImpl();
JavaMapWithStateDStream<String, JoinedOrderEvent, List<AddOrderItem>, Optional<JoinedOrderEvent>> stateStream =
        joinedOrderStream.mapWithState(StateSpec.function(stateFunc));

stateStream.filter(Optional::isPresent)
        .map(Optional::get)
        .foreachRDD((rdd) -> { /* persistence processing */ });


At this point, you've guaranteed that even if the events arrive out of order you can process them as though they came in serially.

Advantages of Message Sequencing in Spark

Message sequencing in Spark isn't just a buzzword; it's essential for ensuring data integrity in real-time analytics. Firstly, by maintaining the correct order of messages, businesses can ensure accurate event tracking, which in turn shapes the insights derived from it. If the sequence is wrong, the analysis can go awry, leading to misinformed decisions.

Furthermore, consider the dependencies between different types of events. When message sequencing in Spark is enforced correctly, it guarantees that dependent events are processed in the right order. 

For instance, an AddItem event must be processed only after its corresponding CreateOrder event. Sequencing ensures such dependencies are respected, eliminating potential errors or inconsistencies in the data.

Finally, message sequencing in Spark reduces the complexity of handling out-of-order events. Instead of building intricate downstream logic to handle these scenarios, you can let Spark's joins and state management take care of it, allowing developers to focus on the core functionality of the application.

Conclusion

In the realm of real-time analytics, ensuring data consistency and order is not a luxury; it's a necessity. Message sequencing in Spark offers a reliable way to handle asynchronous events, even when they arrive out of order. 

With the formidable power of Apache Spark Streaming and its built-in features, businesses can now seamlessly handle, process, and analyze live streams of data with confidence.
