Data Applications for Large Scale Data Transformations Made Easy - Part 3

CASE STUDY

Introduction

Previous blogs in this series:

If you have not had a chance to read the first two parts of this series, I highly recommend them. To recap, Xcalar is the long-needed data application tier that makes it easy to build powerful data applications on top of Snowflake, S3 and other data lake technologies. Xcalar pushes SQL predicates and sub-queries down to connected data warehouses and helps you build complex data applications on the rows and columns of interest. Today I want to showcase the power of the Xcalar programming paradigm and the Xcalar IDE by zooming into one of the side data applications we developed while working with a top-5 investment bank customer.

As mentioned in the previous blog, the bank’s streaming platform integrates dozens of regional, compute-heavy Xcalar data applications that continuously process over two terabytes of data from HDFS, plus several gigabytes of live updates per day arriving via multi-partitioned Kafka streams. Each of these data applications performs complex computations over more than 30 source tables and micro-batch increments in near real time and writes the computed results back into Kafka. You can imagine the complexity.

A system this complex requires a high level of transparency: the ability to analyze its internals, identify volume and performance trends and correlations, discover opportunities for improvement, and catch bottlenecks before they become problems.

Streaming Data Applications on Xcalar Platform - Review

To provide that transparency, we used Xcalar to develop a monitoring solution for the bank’s data applications running on Xcalar. The project meant pulling together a variety of logs - application logs, Linux logs and Xcalar logs - and processing them. So what were my next steps? I love SQL, but I am not very familiar with dedicated log-processing tools. In this blog I will show how I leveraged Xcalar to extract rich semantics and operational metrics from these complex streaming data applications and publish them as a data mart with just SQL+Python, woven together using Xcalar’s low-code visual programming.

As mentioned, the main input into the data mart is the streaming platform’s application logs, which include information about:

  • Initial sources
  • Kafka streams (topics, partitions, offsets, etc.)
  • Application configurations and metadata
  • Application regions, sources and dependencies
  • Batches and their versions
  • Processing stages & microservices (load, ingest, IMD, execute, export, snapshot, recovery)
  • Live table lifecycles (microbatch row counts, sizes, versions)
  • Transactions and their metadata
  • Errors and warnings, if any

Image 1. Xcalar streaming data application platform workflow

Getting Started

I decided to use the SaaS version of Xcalar to develop the streaming application operational monitoring data mart. You can register for a free Xcalar Cloud Developer account at my.xcalar.cloud and get started in just a few easy steps outlined in the Creating a Xcalar account help page.

Let’s talk through how one would build such an application. As with every data compute application - we need to start with data.

First I navigated to my S3 bucket, where some data was already present. As shown in the image below, I uploaded my log file and selected it, then chose the empty string as the column delimiter so the file would load as a single-column CSV. Next, I reviewed the auto-detected schema (see step 2 in the screenshot below), loaded the table and navigated to my notebook project.

Image 2. Steps to load data with Xcalar Load 

See the Xcalar YouTube channel for more on how to register for an Xcalar Cloud Developer account and how to upload your data. Data loading is also described in this how-to blog.

In this case study I would like to highlight a few awesome capabilities of Xcalar that made my data engineering experience pleasant and efficient.

Scalar Function Example

The first awesome capability that I want to highlight is that I did not have to create and deploy a predefined log parser. Rather than parsing the log files on ingest, I loaded them into Xcalar as a single-column CSV (see the first step in the diagram below). Then I developed a simple custom scalar function in Python that applies a basic regex to extract the syntactic components of a log record into columns. Xcalar handles the mapping over table rows, the column population and all the other boring details.

Custom scalar functions in Xcalar provide a powerful way to extend the built-in SQL operators using Python (with additional languages such as Java and Scala coming soon). Scalar functions execute in their own process space, so user errors are isolated and have no way to affect or crash core Xcalar Engine execution.

Traditionally, scalar functions take one or more parameters and return a scalar value, and that is exactly what our scalar functions do. You can point them at columns from one or more tables, and they return a value that you can place in any table column. A scalar function executes on one row at a time, but Xcalar parallelizes it across all cores in the cluster, so throughput scales roughly with the core count: a 32-core cluster gives roughly a 32x boost, 144 cores roughly 144x, 1,000 cores roughly 1,000x. With Xcalar I could develop and test against large datasets iteratively.

Of course, a similar performance boost would apply if I ran a scalar function that applies a machine learning or NLP algorithm in Xcalar - but that topic deserves a whole other blog post. Stay tuned.

You can see the application of my scalar function in the second step of the diagram. The function is very basic: just nine lines of code, aided by a few imported external Python modules.
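To make this concrete, here is a minimal sketch of what such a function could look like. It is not the exact nine-line function from the diagram; the log layout, field names and the way it is mapped over the table are assumptions made for illustration only.

```python
import re

# Hypothetical log layout: "2021-06-01 09:15:02,113 INFO [LoadService] started batch 42"
LOG_PATTERN = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<level>[A-Z]+)\s+\[(?P<component>[^\]]+)\]\s+(?P<message>.*)$"
)

def parse_log_field(raw_line, field):
    """Return one named component of a raw log line, or None for
    continuation lines (e.g., stack trace frames) that do not match."""
    match = LOG_PATTERN.match(raw_line or "")
    return match.group(field) if match else None

# Conceptually, the function is then mapped over the single-column table:
#   SELECT parse_log_field(raw_line, 'ts')    AS log_ts,
#          parse_log_field(raw_line, 'level') AS log_level,
#          ...
#   FROM raw_log_lines
```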

As a next step, I extract the syntactic attributes, then the semantics, e.g., parent-child relationships between records, and group records by application. Lastly, I publish the temporary table interface that other modules can reuse. Look at what we have done: we built a complete parser using just ANSI SQL enhanced with very simple scalar functions applied in parallel, leveraging the full power of all cores. We avoided the need to create custom ingest parsers and instead developed the parsing logic interactively as part of the overall business logic of the application.

Image 3. Loading and extracting log semantics with Xcalar 

For more on Scalar Functions refer to these resources:

Complete Implementation

The second incredible capability of Xcalar that I want to highlight is the power it gave me to combine the declarative and imperative programming paradigms. The thing is, when you start developing a data mart you may not yet know what the final data model will look like until you have loaded and profiled the source data and analyzed its patterns - a kind of catch-22. Xcalar lets you break this vicious circle. It empowers result-driven data engineers who understand the business goal of what they are building: the Xcalar programming paradigm allows them to work iteratively, fine-tuning the solution and the data model, developing and testing continuously, one piece at a time.

As you see in the diagram below (the “Analyze Profile Test” block in the middle), once I finish parsing the data, I can start analyzing the data attributes using table profiling and SQL (the two blocks above). Then I build the first set of business logic and interpolations using SQL, visual programming, Python scalar functions and table functions. Then comes more analysis and testing, then the next level of development, and so on, until I arrive at the final data mart, with a data model ready to share with BI developers.

Image 4. Xcalar Data Application programming paradigm

The diagram below illustrates how I eventually organized my Xcalar data application and refactored it into four application modules.

Module 1: Log Parsing

As discussed in the previous section, here we load the source files, parse out the log syntax, collate stack traces (remember that a stack trace spans multiple lines, which need to be associated with the first complete log entry) and, most importantly, extract workflow semantics related to the streaming applications and their component state changes, micro-batch identifiers and processing stages.
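One hedged way to express that collation step in SQL: a running sum over an “is this a complete log entry” flag gives every continuation line the id of the entry that precedes it. The table and column names below (parsed_log_lines, log_ts, line_no) are illustrative rather than the production schema, and they assume a line number was captured at load time.

```python
# Group raw lines into logical records: a complete log entry (non-null timestamp)
# starts a new record id; stack trace lines inherit the id of the preceding entry.
collate_sql = """
SELECT raw_line,
       SUM(CASE WHEN log_ts IS NOT NULL THEN 1 ELSE 0 END)
           OVER (ORDER BY line_no
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS record_id
FROM parsed_log_lines
"""
# A later GROUP BY record_id (using a string-aggregation function or another
# scalar function) folds each stack trace back into its parent log entry.
```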

Module 2: Data Preparation 

Here we extrapolate application state changes and micro-batch identifiers and enrich the corresponding data records that were logged by the same Xcalar application. Thankfully, Xcalar allowed me to combine the declarative power of SQL with imperative programming in Python to accomplish this task easily.
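As a rough illustration of that enrichment (the column names are made up, and the IGNORE NULLS option on LAST_VALUE is assumed to be available in the SQL dialect):

```python
# Attribute every record to the most recently logged micro-batch of the same
# application, so downstream metrics can be grouped per application and batch.
enrich_sql = """
SELECT r.*,
       LAST_VALUE(r.batch_id) IGNORE NULLS
           OVER (PARTITION BY r.app_id
                 ORDER BY r.log_ts
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS effective_batch_id
FROM parsed_records r
"""
```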

Module 3: Compute and Build Normalized Tables 

With the main measures and dimensions computed, I then focused on the final steps. I exploded the objects written into the log files as JSON blobs into their own data structures, to enable tracking the size, row counts and other properties of each data application’s live tables over its lifetime.

Module 4: Visualization Data Cube

Lastly I prepared the final tables of my data mart and finalized their data models. The last computation in this tiny data application creates a data cube that will be used for visualizations. The snippet in the bottom right exemplifies a SQL statement used in this operator.
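The exact statement from that snippet is not reproduced here, but a grouped aggregate of this general shape (with hypothetical table and column names) conveys the idea behind the cube:

```python
# Aggregate the enriched records along the dimensions the dashboards slice by.
cube_sql = """
SELECT application,
       region,
       processing_stage,
       log_date,
       COUNT(*)            AS events,
       SUM(increment_rows) AS rows_processed,
       AVG(stage_ms)       AS avg_stage_ms
FROM enriched_records
GROUP BY application, region, processing_stage, log_date
"""
```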

Image 5. Complete data application broken into modules

Zooming into Module 1 - notice how I can track intermediate table row counts, skews and time taken after each step while developing my data application. This lets me preview and adjust my dataflows interactively and iteratively to optimize execution.

Image 6. Zoom into interactive Logical Plan showing time taken, rows processed and skew after each operation

Table Function Example

In Module 3 I mentioned exploding the JSON objects that contain micro-batch sizes and row counts for each of the sub-streams into a separate normalized table.

For this I developed a reusable table function, which I will explain here.  

A table function is a phenomenal Xcalar construct that lets me package a group of operations that manipulate and retrieve data from one or more input tables and return the results as another table. I can then use this function in the FROM clause of subsequent SQL statements.

Look at the SQL node in the middle. It selects from the get_increments() table function, which consists of three operations: first it explodes an array of JSON dictionaries into rows, then it extracts and normalizes attributes (such as the increment sizes and row counts of micro-batches) into Xcalar table fields, and in the last step it filters the new result set to discard extraneous details and keep only information about user table increments.

This table function is then invoked from within SQL statements. Because table functions are reusable, I was able to leverage it in another dataflow.
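The internals of get_increments() are not shown here, but the core “explode” step can be sketched in plain Python: each input row carrying a JSON array is fanned out into one output row per array element. The field names below are hypothetical.

```python
import json

def explode_increments(rows):
    """Yield one normalized record per element of each row's JSON array.

    `rows` is an iterable of dicts with a 'batch_id' key and an 'increments'
    key holding a JSON-encoded list such as
    '[{"table": "trades", "rows": 1200, "bytes": 48000}, ...]'.
    """
    for row in rows:
        for item in json.loads(row.get("increments") or "[]"):
            yield {
                "batch_id": row["batch_id"],
                "table_name": item.get("table"),
                "row_count": item.get("rows"),
                "size_bytes": item.get("bytes"),
            }

# The table function's final step then filters this result set so that only
# user-table increments remain, and SQL consumes it via something like
#   SELECT * FROM get_increments(...)
```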

Image 8. Zooming into Table Function example

For more on Table Functions refer to these resources:

Operationalization

The next diagram shows my operationalized data application. Here you see how dozens of SQL operators that I developed interactively in the previous step got compiled down to hundreds of execution primitives. The callouts show zoomed-in sets of operators.

Using this view of the execution plan you can track how data was transformed along the execution path, as well as the row counts, skew and time taken by each execution primitive. These primitives, such as filters, maps and joins, were compiled down from the high-level SQL statements. This lets you troubleshoot your application and identify opportunities for improvement.

This operationalized application can now be either scheduled or invoked as a microservice via the Xcalar Shell (distributed along with Xcalar).

Image 9. Operationalized Data Application

Data Model and Visualization

Finally - the data model is exposed to BI developers. 

Xcalar supports Tableau, Qlik, Looker and other BI tools. Here we decided to use a Jupyter notebook to execute SQL queries against the Xcalar data mart we created and to build visualizations with Matplotlib and Seaborn. In this case, the Jupyter notebook extracted data from Xcalar using pyodbc and the Spark SQL connector.
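A minimal sketch of that notebook pattern, assuming an ODBC DSN named xcalar and a mart table called batch_metrics (both of which are placeholders, not the actual bank setup):

```python
import pyodbc
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Connect to the Xcalar data mart over ODBC (DSN and credentials are placeholders).
conn = pyodbc.connect("DSN=xcalar;UID=analyst;PWD=secret")

# Pull a slice of the cube: daily micro-batch volume per application.
df = pd.read_sql(
    "SELECT log_date, application, SUM(rows_processed) AS rows_processed "
    "FROM batch_metrics GROUP BY log_date, application",
    conn,
)

# Chart volume trends per application over time.
sns.lineplot(data=df, x="log_date", y="rows_processed", hue="application")
plt.title("Micro-batch volume per application")
plt.show()
```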

For more on the connectivity from BI tools please check out this blog.

Image 10. BI Integration with Jupyter Notebook example

Summary

The real value for me as a data engineer was that with Xcalar I was able to build this in-memory data mart, weaving its data model interactively and iteratively while analyzing the data and developing the business logic at the same time. No advanced DDL was required whatsoever. Xcalar provides a serverless IDE to create tables conforming to an E-R diagram.

  • Xcalar is low code. I was empowered to work on this in a highly parallel fashion without any knowledge of iterators. No niche expertise (such as Java, Scala, multithreading) was required - just SQL and basic Python. The Xcalar runtime engine took care of the rest.
  • I can utilize all resources available on the backend without even worrying about it, because I work serverlessly within the confines of my IDE. Xcalar ensures maximum resource utilization for all my queries and data applications.
  • Xcalar scalar functions work on all rows in parallel, enabling superior performance on full data sets. I can write Python code, and I can do all of this myself, here and now. We just built a whole parser in SQL because I had regular expressions and the ability to invoke functions in parallel, utilizing all CPUs in my cluster.
  • I can take a very complex data application and step through the whole code down to execution primitives on a production-like data backend. That means that in production it will work exactly as it worked during development. This level of zero-change production deployment is a game changer.

Blog Series Conclusion

Folks - thank you for reading my three-part blog series. We reviewed the role of data applications in a fast-changing, data-driven world where time-to-insight and the time to operationalize ideas into data applications define the winner. We also reviewed why Xcalar is the champion in this space. I hope that this blog series (Part 1. Xcalar - The real “Spark“ behind Data Applications, Part 2. Accelerating Complex, Real-time Insights at a Top Global Investment Bank and the case study you are now reading) helped convey why I am so excited about the Xcalar technology and why I think it is a game changer.

For those of you who went through the mill and got disillusioned by technologies that demand steep learning curves, sophisticated skills and a significant time investment before you can code business logic over big data, I recommend trying out Xcalar to build business logic and create a real-time, converged layer above data warehouses, cloud storage and Kafka. Xcalar lowers the technical barrier to entry and empowers SQL+Python developers to build distributed data apps that outperform over-engineered data apps built with Apache Spark.

Xcalar provides a very clean way to write code using both styles, declarative SQL and imperative Python (or other languages of your choice), with open standards and without vendor lock-in. Xcalar gives me a pure compute layer: I mainly deal with code, and I am happy to have the power of SQL available to me. We use ANSI SQL (SQL:2011) and can extend it beyond its native capabilities with regular expressions or any other Python library, such as TensorFlow or NLTK.

Many of our customers use Xcalar as a compute layer on top of Kafka, S3, Snowflake and other data lake technologies and cloud databases, replacing Spark in this space. In coming blogs we will show how Xcalar can run complex data applications on top of multiple Snowflake databases and S3 data lakes, pushing down filter predicates and subqueries to the underlying data storage and running complex compute on the rows and columns of interest.

Please visit xcalar.com to learn more.