Fast Data Processing Pipeline for Predicting Flight Delays Using Apache Spark, Apache Drill, and MapR-DB (Part 3)

According to Bob Renner, CEO of MapR partner Liaison Technologies, in Forbes' machine learning predictions for 2018, the ability to blend machine learning with real-time transactional data flowing through a single platform opens up a world of new possibilities, such as enabling organizations to take advantage of opportunities as they arise. And according to Gartner, over the next few years, virtually every app, application, and service will incorporate some level of machine learning. Leveraging these opportunities requires fast and scalable data processing pipelines.

This is the third in a series of blogs that discuss the architecture of a data pipeline that combines streaming data with machine learning and fast storage. The first post discussed creating a machine learning model to predict flight delays. The second post discussed using the saved model with streaming data to do real-time analysis of flight delays. This third post will discuss fast storage and analysis with MapR-DB, Apache Spark, Apache Drill, and OJAI.

Machine Learning Logistics and Data Pipelines

Machine learning usually refers to the model training piece of an ML workflow. But as Ted Dunning says, 90% of the effort around machine learning is data logistics, which includes all of the aspects that occur before and after this training. When you combine event streams with microservices, you can greatly enhance the agility with which you build, deploy, and maintain complex data pipelines. Pipelines are constructed by chaining together microservices, each of which listens for the arrival of some data, performs its designated task, and optionally publishes its own messages to another topic. Combining event-driven data pipelines with machine learning can handle the logistics of machine learning in a flexible way by:

  • Making input and output data available to independent consumers.
  • Managing and evaluating multiple models and easily deploying new models.
  • Monitoring and analyzing models, with historical and real-time data.

Architectures for these types of applications are discussed in more detail in the e-books Machine Learning Logistics, Streaming Architecture, and Microservices and Containers.

The following figure depicts the (simplified) data pipeline for this tutorial:

  • Flight trip data is published to a MapR Event Streams (MapR-ES) topic using the Kafka API. (Note that this data contains the actual delay label; in a real-world architecture, the actual delay label would arrive later on a different topic, but to keep the tutorial code simple, it is combined with the input data.)
  • A Spark Streaming application subscribed to the first topic enriches the event with the flight predictions and publishes the results in JSON format to another topic. (In the real world architecture, there would be multiple consumers publishing model predictions, but to keep the tutorial code simple, there is only one here.)
  • A Spark Streaming application subscribed to the second topic stores the flight trip data and predictions in MapR-DB using the Spark MapR-DB Connector.
  • Apache Spark SQL, Apache Drill, and OJAI (Open JSON Application Interface) applications query MapR-DB to analyze flight data and prediction performance.

How to Store the Data

One of the challenges when you are processing lots of streaming data is deciding where to store it. With a relational database and a normalized schema, related data is stored in different tables, and queries that join this data back together can become a bottleneck as data volumes grow. For this application, MapR-DB JSON, a high-performance NoSQL database, was chosen for its scalability and its flexible, easy-to-use JSON document model: MapR-DB with a denormalized schema scales because data that is read together is stored together.

With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing scalable and fast reads and writes by row key.

The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR-DB tablets.

JSON Schema Flexibility

MapR-DB supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents. The Spark connector makes it easy to build real-time or batch pipelines between your JSON data and MapR-DB and leverage Spark within the pipeline.

JSON facilitates the natural evolution of your data schema during the life of your application. For example, suppose at first we have the following schema, where each JSON message has the predicted flight delay using a decision tree:

{
    "_id": "UA_2017-03-28_DEN_SFO_721",
    "dofW": 2,
    "carrier": "UA",
    "origin": "DEN",
    "dest": "SFO",
    "crsdephour": 11,
    "crsdeptime": 1120.0,
    "crsarrtime": 1308.0,
    "crselapsedtime": 168.0,
    "pred_dtree": 1.0
}

Later, you can easily and quickly capture additional prediction values without changing the architecture of your application and without updating a database schema: you simply add attributes. In the example below, we have added predictions from other machine learning models. These can be added dynamically to the same document instance in MapR-DB without any database schema changes:

{
    "_id": "UA_2017-03-28_DEN_SFO_721",
    "dofW": 2,
    "carrier": "UA",
    "origin": "DEN",
    "dest": "SFO",
    "crsdephour": 11,
    "crsdeptime": 1120.0,
    "crsarrtime": 1308.0,
    "crselapsedtime": 168.0,
    "pred_dtree": 1.0,
    "pred_randforest": 1.0,
    "pred_svm": 1.0,
    "actual_delay": 1.0
}

MapR Event Streams allows the same messages to be processed by different consumers, which makes it easy to add new consumers for the same message. With this type of architecture and flexible schema, you can easily add and deploy new microservices with new machine learning models.

Spark Streaming Writing to MapR-DB

The MapR-DB OJAI Connector for Apache Spark enables you to use MapR-DB as a sink for Apache Spark DStreams.

You can read about the MapR Event Streams Spark Streaming code in Part 2 of this series. Here, we will focus on Spark Streaming writing to MapR-DB. The messages from the MapR-ES topic are in JSON format and contain the following for each flight: the flight ID, day of the week, carrier, origin, destination, scheduled departure hour, scheduled departure time, scheduled arrival time, scheduled travel time, delay prediction, and actual delay label. (Note: In a real-world architecture, the actual delay label would come later in a different topic, but to keep the tutorial code simple, it is combined here.) An example is shown below:

{
    "_id": "UA_2017-03-28_DEN_SFO_721",
    "dofW": 2,
    "carrier": "UA",
    "origin": "DEN",
    "dest": "SFO",
    "crsdephour": 11,
    "crsdeptime": 1120.0,
    "crsarrtime": 1308.0,
    "crselapsedtime": 168.0,
    "label": 0.0,
    "pred_dtree": 1.0
}

Below, we use a Scala case class and StructType to define the schema corresponding to the input data.
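The exact definitions are in the GitHub code linked at the end; a minimal sketch, using the field names from the JSON messages above, might look like this:

import org.apache.spark.sql.types._

case class FlightwPred(_id: String, dofW: Int, carrier: String,
                       origin: String, dest: String, crsdephour: Int,
                       crsdeptime: Double, crsarrtime: Double,
                       crselapsedtime: Double, label: Double,
                       pred_dtree: Double) extends Serializable

// Schema used when parsing the JSON message values
val schema = StructType(Array(
  StructField("_id", StringType, true),
  StructField("dofW", IntegerType, true),
  StructField("carrier", StringType, true),
  StructField("origin", StringType, true),
  StructField("dest", StringType, true),
  StructField("crsdephour", IntegerType, true),
  StructField("crsdeptime", DoubleType, true),
  StructField("crsarrtime", DoubleType, true),
  StructField("crselapsedtime", DoubleType, true),
  StructField("label", DoubleType, true),
  StructField("pred_dtree", DoubleType, true)
))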

We use the KafkaUtils createDirectStream method with Kafka configuration parameters to create an input stream from a MapR-ES topic. This creates a DStream that represents the stream of incoming data, where each message is a key-value pair. We use the DStream map transformation to create a DStream with the message values.
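A condensed sketch of that setup is below; the topic path, consumer group, and batch interval are placeholders, and the kafka09 package is the MapR-provided Spark/Kafka integration used with MapR-ES (with stock Apache Spark you would use the kafka010 package instead):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(new SparkConf().setAppName("FlightStream"), Seconds(5))

// Hypothetical MapR-ES topic path; use the topic the prediction consumer publishes to
val topics = Set("/apps/flightstream:flightpred")
val kafkaParams = Map[String, String](
  ConsumerConfig.GROUP_ID_CONFIG -> "flight_db_consumer",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)

// DStream of (key, value) records from the topic
val messagesDStream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Keep only the JSON message values
val valuesDStream = messagesDStream.map(_.value())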

In the code below, each RDD in the valuesDStream is transformed into a Spark Dataset. Then, the Spark MapR-DB Connector's saveToMapRDB method performs a parallel, partitioned bulk insert of the JSON FlightwPred objects into MapR-DB.
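A sketch of that write path is shown here, assuming the FlightwPred case class and schema defined above, a hypothetical table path of /apps/flighttable, and the saveToMapRDB method that the connector adds via the com.mapr.db.spark.sql import:

import com.mapr.db.spark.sql._   // adds saveToMapRDB to DataFrames/Datasets
import org.apache.spark.sql.SparkSession

valuesDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    // Parse each micro-batch of JSON strings into a typed Dataset of FlightwPred objects
    val ds = spark.read.schema(schema).json(rdd.toDS()).as[FlightwPred]
    // Parallel, partitioned insert into the MapR-DB JSON table, keyed by the _id field
    ds.toDF().saveToMapRDB("/apps/flighttable")
  }
}

ssc.start()
ssc.awaitTermination()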

Querying MapR-DB JSON With Spark SQL

The Spark MapR-DB Connector enables users to perform complex SQL queries and updates on top of MapR-DB using a Spark dataset while applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality.

A Spark dataset is a distributed collection of data. Dataset is a newer interface, which provides the benefits of strong typing, the ability to use powerful lambda functions, and efficient object serialization/deserialization, combined with the benefits of Spark SQL's optimized execution engine.

A DataFrame is a Dataset organized into named columns: Dataset[Row]. (In Spark 2.0, the DataFrame API was merged with the Dataset API.)

Loading Data From MapR-DB Into a Spark Dataset

To load data from a MapR-DB JSON table into an Apache Spark dataset, we invoke the loadFromMapRDB method on a SparkSession object, providing the tableName, schema, and case class. This will return a Dataset of FlightwPred objects:
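A sketch under the same assumptions as before (the FlightwPred case class, the schema, and the hypothetical /apps/flighttable path used earlier):

import com.mapr.db.spark.sql._
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().appName("FlightAnalysis").getOrCreate()
import spark.implicits._

// Load the MapR-DB JSON table into a typed Dataset of FlightwPred objects
val ds: Dataset[FlightwPred] = spark
  .loadFromMapRDB[FlightwPred]("/apps/flighttable", schema)
  .as[FlightwPred]

ds.cache()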

Explore and Query the Flight Data With Spark SQL

Datasets provide a domain-specific language for structured data manipulation in Scala and Java. Below are some examples in Scala. The Dataset show() action displays the top 20 rows in tabular form.
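For instance, on the Dataset loaded above:

// Display the first 20 FlightwPred rows in tabular form
ds.show()

// A typed, domain-specific query: predicted-late flights out of Denver
ds.filter(flight => flight.origin == "DEN" && flight.pred_dtree == 1.0)
  .select("_id", "carrier", "dest", "pred_dtree")
  .show(5)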

In the code below, a filter is used to count the predicted delays and the actual delays. These counts are then used to calculate the ratios of correct predictions, wrong predictions, and false positives. These types of calculations are useful for the continued analysis of models in production.
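A sketch of those calculations, using the column names from the schema above (the exact counts and ratios depend on your data):

// Counts based on the decision tree prediction and the actual delay label
val total          = ds.count().toDouble
val predictedDelay = ds.filter($"pred_dtree" === 1.0).count().toDouble
val actualDelay    = ds.filter($"label" === 1.0).count().toDouble

// Predictions that matched the label, and delays predicted that did not happen
val correct        = ds.filter($"pred_dtree" === $"label").count().toDouble
val falsePositive  = ds.filter($"pred_dtree" === 1.0 && $"label" === 0.0).count().toDouble

println(s"predicted delays: $predictedDelay  actual delays: $actualDelay  total flights: $total")
println(s"ratio correct:        ${correct / total}")
println(s"ratio wrong:          ${(total - correct) / total}")
println(s"ratio false positive: ${falsePositive / total}")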

The output is shown below.

What is the count of predicted delay/not delay for this DStream Dataset?
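One way to answer this with the Dataset API:

// Count of flights by predicted value: 1.0 = predicted delayed, 0.0 = predicted on time
ds.groupBy("pred_dtree").count().show()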

You can register a dataset as a temporary table using a given name and then run Spark SQL. Here are some example Spark SQL queries on the Dataset of FlightwPred objects.
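These queries assume the Dataset has been registered as a temporary view named flight, along these lines:

// Register the Dataset as a temporary view named "flight" for the SQL queries below
ds.createOrReplaceTempView("flight")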

What is the count of predicted delay/not delay by day of the week?

scala> spark.sql("select dofW, pred_dtree, count(pred_dtree) from flight group by dofW, pred_dtree order by dofW").show

What is the count of predicted delay/not delay by destination?

scala> spark.sql("select dest, pred_dtree, count(pred_dtree) from flight group by dest, pred_dtree order by dest").show

(The complete code, instructions, and more example queries are in the GitHub code link at the end.)

Querying the Data With Apache Drill

Apache Drill is an open-source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine built to perform distributed query processing across the various nodes in a cluster.

With Drill, you can use SQL to interactively query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores including HBase, MapR-DB, and Mongo without defining schemas. MapR provides a Drill JDBC driver that you can use to connect Java applications and BI tools such as SquirreL and Spotfire to Drill. Below is a snippet of Java code for querying MapR-DB using Drill and JDBC:

The output for this query, "What is the count of predicted delay/not delay by origin?", is shown below:

Below are some example SQL queries using the Drill shell.

What is the count of predicted delay/not delay by origin? What is the count of predicted delay/not delay by origin and destination?

Follow the instructions in the GitHub code readme to add a secondary index to MapR-DB and try more queries using the index.

Querying With the Open JSON API (OJAI)

Below is a Java example of using the OJAI Query interface to query documents in a MapR-DB JSON table:

Partial output for this query to "find predicted late flights for AA" is shown below:

Below are some example OJAI queries using the MapR-DB shell.

What are the SFO to DEN flights that were predicted late?

{"_id":"%SFO_DEN%"} }]}' --f _id,pred_dtree