Apache Spark is an open-source data processing engine. It contains a number of libraries that you can use to process data on a cluster of computers. Its most important benefit is the ability to process data in parallel, which makes it a perfect fit for Big Data (add link). It supports multiple programming languages (Python, Java, Scala, and R), and you can solve problems in multiple ways using SQL, data streaming, and machine learning.
It is widely used in both engineering and science. One important user that comes to mind is CERN: they have petabytes of data and they crunch the numbers with Spark. You can run Spark on a local cluster, in the cloud, or on a distributed cluster, and it can manage enormous amounts of data.
Spark coordinates the execution of jobs on data across the cluster. To manage the cluster it needs an additional tool such as YARN or Mesos, which grants resources on the cluster to run jobs. Essentially, Spark is a set of tools that work together to provide an engine for processing your data.
You can take advantage of Spark by using its tools and libraries. Spark is composed of lower-level APIs and Structured APIs, and you can also access a series of libraries for additional functionality. You can perform virtually any data-related activity, be it graph analysis, machine learning, or data streaming.
Part of the Structured APIs are Datasets, which you can use to write statically typed code in Java or Scala. The Dataset API is not available in Python or R, since those languages are dynamically typed. It lets you assign a Java/Scala class to the records within a DataFrame and manipulate them as a collection of typed objects. This flexibility in the toolset makes it easy to use the higher-level APIs for simple data analysis, and when you have a more complex problem you can drop down to Datasets or the lower-level RDDs, which give you type-safe data manipulation.
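As a minimal sketch (assuming the spark-shell, where a SparkSession called spark already exists; the file path and the Flight case class are invented for illustration), here is how you might move between a DataFrame and a typed Dataset in Scala:
import spark.implicits._
// Hypothetical record type for the rows of the DataFrame
case class Flight(origin: String, destination: String, count: Long)
// Untyped DataFrame read from a (hypothetical) JSON file
val df = spark.read.json("/data/flights.json")
// Typed Dataset: each row is now a Flight object, checked at compile time
val flights = df.as[Flight]
flights.filter(f => f.count > 10).show()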
Structured Streaming is a high-level API for stream processing rather than batch processing. It helps reduce latency and allows for incremental processing. Its biggest advantage is that you can extract value from a stream quickly, with virtually no code changes: you can start with a batch job and then convert it into a streaming one. By processing data incrementally you keep full control over the complete end-to-end process.
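A minimal sketch of that batch-to-streaming switch (the directory, schema, and output settings are assumptions for illustration; spark is the spark-shell session):
import org.apache.spark.sql.types.{StructType, StringType}
val eventSchema = new StructType().add("eventType", StringType)
// Batch version: read everything that is already in the directory
val staticDf = spark.read.schema(eventSchema).json("/data/events/")
val counts = staticDf.groupBy("eventType").count()
// Streaming version: same logic, only the read and write change
val streamDf = spark.readStream.schema(eventSchema).json("/data/events/")
streamDf.groupBy("eventType").count()
  .writeStream
  .format("console")        // print each micro-batch result to the console
  .outputMode("complete")   // keep the full aggregated counts
  .start()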
These days machine learning is very important and helps automate difficult tasks. Spark ships with a library, MLlib, for preprocessing data, training models, and making predictions. It gives you access to a sophisticated API for tasks such as classification, regression, clustering, or deep learning.
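As a rough sketch of the API (the file path, column names, and data are invented), a simple classification pipeline might look like this:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
// Hypothetical input: a DataFrame with numeric columns "age", "income" and a 0/1 "label"
val training = spark.read.parquet("/data/customers.parquet")
// Collect the feature columns into the single vector column MLlib expects
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income"))
  .setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
val model = lr.fit(assembler.transform(training))
// Make predictions on the same data just to show the API
model.transform(assembler.transform(training)).select("prediction").show()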
Spark also has lower-level APIs that allow arbitrary Java and Python object manipulation via Resilient Distributed Datasets (RDDs). Almost everything in Spark is built on top of RDDs. For basic tasks it is advisable to use DataFrames, the higher-level API, because they are simpler to use. However, if you need access to physical execution details such as partitions, RDDs may be a better choice.
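A tiny sketch of the RDD API (again assuming the spark-shell session):
// Create an RDD from a local collection and work with it as plain Scala objects
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
val squares = rdd.map(x => x * x)   // transformation, evaluated lazily
println(squares.reduce(_ + _))      // action, triggers the computation: 55
println(rdd.getNumPartitions)       // inspect the physical partitioning: 2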
SparkR is a tool for running R on Spark. It follows the same principles as the other Spark language APIs. The good news for R fans is that almost everything available in Python is also available in R. The community has created a plethora of libraries; there are over 300 packages available at spark-packages.org.
Structured APIs are great for manipulating all types of data: unstructured files, semi-structured CSV files, and highly structured Parquet files. There are three types of structured APIs:
- Datasets
- DataFrames
- SQL tables and views
With structured APIs you can run both batch and streaming jobs, and it is very simple to switch between the two. A job begins with the execution of a graph of instructions, which is broken down into stages. The logical structures that you manipulate with transformations and actions are DataFrames and Datasets. To create a new DataFrame or Dataset, you call a transformation.
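For example (a sketch with an invented column name), a transformation produces a new DataFrame lazily, and an action such as count triggers the actual work:
val numbers = spark.range(1000).toDF("number")    // a DataFrame with one column, "number"
val evens = numbers.where("number % 2 = 0")       // transformation: nothing is executed yet
println(evens.count())                            // action: the graph of instructions runs, returns 500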
DataFrames and Datasets
They are structured collections representing table-like data with rows and columns (an Excel spreadsheet is a good analogy). They are immutable, which means you cannot change them after creation. They are the same thing as tables and views in Spark SQL. When talking about these collections it is good to know the concept of a schema: a schema defines the column names and data types, such as string or integer. Spark's types map directly to the different language APIs.
DataFrame types are verified by Spark at runtime, whereas Dataset types are checked at compile time. Datasets are only available in JVM-based languages, i.e. Java and Scala.
DataFrames are simply Datasets of type Row, where Row is Spark's internal representation of its optimized in-memory format for computation. A column represents a simple type such as string or integer.
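A small sketch of defining a schema explicitly and inspecting the resulting DataFrame (the file path and columns are invented):
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
// Schema: column names plus their data types
val peopleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val people = spark.read.schema(peopleSchema).csv("/data/people.csv")
people.printSchema()   // shows the column names and types defined above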
When you aggregate, you are ultimately collecting data into summaries, which is a cornerstone of Big Data analytics. In an aggregation you specify a key or grouping and an aggregation function; this is how you state how one or more columns should be transformed. In most circumstances you will aggregate to summarize numerical data by means of some grouping, but with Spark you can also aggregate any kind of values into an array, list, or map.
You will quite often perform some kind of calculation based on groups in the data. There are several grouping types available in Spark:
- Simple grouping: summarize an entire DataFrame by performing an aggregation in a select statement.
- Group by: specify one or more keys and one or more aggregations to transform the value columns.
- Window: it is similar to the group by but the rows input to the function are somehow related to the current row.
- Grouping set: can be used to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
- Rollup: specify one or more keys and one or more aggregations to transform the value columns, which will be summarized hierarchically.
- Cube: specify one or more keys and one or more aggregations to transform the value columns, which will be summarized across all combinations of columns.
There are several aggregation functions available, such as count, countDistinct, approx_count_distinct, first and last, min and max, sum, etc.
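A short sketch of a grouped aggregation using a few of these functions (the DataFrame, its path, and its column names are invented):
import org.apache.spark.sql.functions.{count, countDistinct, sum, max, lit}
// Hypothetical purchases DataFrame with columns "country", "customerId", "amount"
val purchases = spark.read.parquet("/data/purchases.parquet")
purchases
  .groupBy("country")
  .agg(
    count(lit(1)).as("purchases"),
    countDistinct("customerId").as("customers"),
    sum("amount").as("revenue"),
    max("amount").as("largest_purchase"))
  .show()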
If you want to perform an aggregation that is not available out of the box, you can define it yourself as a user-defined aggregation function (UDAF), encoding your own custom formulas or business rules.
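As a sketch (Spark 3.x), a custom aggregation in Scala can be written with the Aggregator class and turned into a usable function with functions.udaf; the "geometric mean" below is just an invented business rule, applied to the hypothetical purchases DataFrame from the previous sketch:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.{udaf, col}
// Custom aggregation: geometric mean of a Double column
object GeoMean extends Aggregator[Double, (Double, Long), Double] {
  def zero: (Double, Long) = (1.0, 0L)
  def reduce(buf: (Double, Long), x: Double): (Double, Long) = (buf._1 * x, buf._2 + 1)
  def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = (a._1 * b._1, a._2 + b._2)
  def finish(buf: (Double, Long)): Double = math.pow(buf._1, 1.0 / buf._2)
  def bufferEncoder: Encoder[(Double, Long)] = Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val geoMean = udaf(GeoMean)
purchases.groupBy("country").agg(geoMean(col("amount")).as("geo_mean_amount")).show()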
When working with data you will very often come across the problem of needing to merge different datasets together. That is what joins are all about: they bring two sets of data together. A join compares the left and right dataset on one or more keys and determines whether Spark should combine a row of the left set with a row of the right set.
The most common join is the equi-join, which compares whether the specified keys in your left and right datasets are equal. If they are equal, Spark combines the matching rows.
There are several join types available:
- Inner joins: keep rows with keys that exist in both the left and right dataset
- Outer joins: keep rows with keys that exist in either the left or the right dataset
- Left outer joins: keep rows with keys in the left dataset
- Right outer joins: keep rows with keys in the right dataset
- Left semi joins: keep rows in the left, and only the left, dataset where the key appears in the right dataset
- Left anti joins: keep rows in the left, and only the left, dataset where the key does not appear in the right dataset
- Natural joins: perform a join by implicitly matching the columns between the two datasets with the same names
- Cross (Cartesian) joins: matches every row in the left dataset with every row in the right dataset.
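A sketch of a couple of these join types (the DataFrames and column names are invented; spark is the spark-shell session):
import spark.implicits._
val employees = Seq((1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 99))
  .toDF("id", "name", "deptId")
val departments = Seq((10, "Engineering"), (20, "Sales"))
  .toDF("deptId", "deptName")
val joinExpr = employees("deptId") === departments("deptId")
employees.join(departments, joinExpr, "inner").show()        // only matching rows
employees.join(departments, joinExpr, "left_outer").show()   // Carol kept, with nulls on the right
employees.join(departments, joinExpr, "left_anti").show()    // only Carol, who has no match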
There are some difficulties you can run into when performing joins. Although they happen in rare circumstances, you should know about them.
One tricky thing is a duplicate column name. In a DataFrame, each column has a unique ID inside Spark's SQL engine, Catalyst. This ID is purely internal and cannot be referenced directly, so if the two datasets share a column name you can run into a small problem. There are three ways of solving it:
- If the two datasets share the same key column name, the easiest way is to change the join expression from a Boolean expression to a string (or sequence of column names). This automatically removes the duplicated column from the result.
- You can drop the column after the join.
- Rename the column before the join.
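Continuing the join sketch above, here is the same join written with a string key instead of a Boolean expression, so only one deptId column survives, followed by the drop-after-join alternative:
// Joining on a column name (or Seq of names) de-duplicates the key column
employees.join(departments, Seq("deptId"), "inner").show()
// Alternatively, keep the Boolean expression and drop one copy afterwards
employees.join(departments, joinExpr, "inner").drop(departments("deptId")).show()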
Spark provides out-of-the-box functionality for working with different data sources. There are six core data sources and hundreds of external sources written by the community. The core Spark data sources are:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain text files
The community gives you a lot more connectivity, for example Cassandra, HBase, AWS Redshift, XML, etc.
The read API looks like this:
DataFrameReader.format(...).option("key", "value").schema(...).load()
The core structure for writing data is as follows:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
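Put concretely (paths, options, and the "country" column are invented), reading a CSV file and writing it back out as Parquet might look like this:
val csvDf = spark.read
  .format("csv")
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // let Spark guess the column types
  .load("/data/retail/2020.csv")
csvDf.write
  .format("parquet")
  .mode("overwrite")               // replace any existing output
  .partitionBy("country")          // one output directory per country value
  .save("/data/retail-parquet/")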
One of the most powerful connectors is the one for SQL databases, and it is very popular. You can connect to SQL Server, MySQL, PostgreSQL, and others. Here is a tip: since Spark cannot translate all of its own functions into the functions available in a SQL database, it is often advisable to push the entire query down to the database. The result comes back in the form of a DataFrame.
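A sketch of pushing a whole query down over JDBC (the connection details, credentials, and query are placeholders):
// The subquery in "dbtable" is executed by the database itself;
// Spark only sees the result, as a DataFrame
val pushdownQuery = "(SELECT country, SUM(amount) AS revenue FROM sales GROUP BY country) AS s"
val salesByCountry = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/shop")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", pushdownQuery)
  .option("user", "reporting")
  .option("password", "secret")   // placeholder credentials
  .load()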
Spark SQL is a very important feature that will make your life easier. With it you can run SQL queries against views or tables, and Spark provides functions to analyse query plans in order to optimize workloads. It integrates directly with the DataFrame and Dataset APIs.
Since release 2.0, Spark supports both ANSI SQL and HiveQL queries. The power of Spark SQL lies in the ability to use it in any data flow: its API allows data to be extracted with SQL, manipulated as a DataFrame, passed to MLlib, and then used as yet another data source. You have to bear in mind that this language was designed for OLAP workloads, not for low-latency OLTP databases.
Spark SQL and Hive
More good news is that Spark SQL has a good relationship with Hive: it can connect to a Hive metastore. Hive maintains table information for use across sessions in its metastore, which is useful for users working on legacy Hadoop systems. Remember that there are several requirements you need to fulfil to access a Hive metastore.
Spark SQL CLI
Spark SQL CLI is a convenient tool with which you can run basic Spark SQL queries in local mode, all from the command line.
To start the Spark SQL CLI, run the following in the Spark directory: ./bin/spark-sql
Spark SQL Interface
You can also execute SQL queries via Spark SQL's programmatic interface, using the sql method on the SparkSession object. This returns a DataFrame and, just like other transformations, is executed lazily. This is a great benefit, because some transformations are much simpler to express in SQL than with DataFrame code.
spark.sql("SELECT 1 + 1").show()
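Because the result is just a DataFrame, you can freely mix the two styles. As a sketch, reusing the hypothetical purchases DataFrame from earlier and registering it as a temporary view:
purchases.createOrReplaceTempView("purchases")
// Express the grouping in SQL, then keep refining the result with DataFrame methods
spark.sql("SELECT country, COUNT(*) AS cnt FROM purchases GROUP BY country")
  .where("cnt > 100")
  .orderBy("cnt")
  .show()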
Spark also provides a JDBC interface through which you can connect to the Spark driver and execute SQL queries.
The Catalog stores metadata about the data in your tables as well as other useful things such as databases, functions, and views. The catalog is available in the org.apache.spark.sql.catalog.Catalog package and contains a number of helpful functions; you can also achieve the same things in plain SQL by wrapping the command in the spark.sql function.
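A quick sketch of both routes:
// Programmatic catalog access
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()
// The same kind of information via SQL
spark.sql("SHOW TABLES").show()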
To work effectively with Spark SQL you need to define tables. Tables are logically equivalent to DataFrames and have the same structure, and you can perform the same kinds of manipulations on them: aggregations, filtering, joins, and so on. The main difference between them is scope: DataFrames are defined within a programming language, whereas tables are defined within a database. There is also a notion of a default database, where your tables will be stored unless you specify where exactly they should go.
Remember that when you create a table from files on disk you are creating an unmanaged table, whereas when you create a table from a DataFrame (for example with saveAsTable) you are creating a managed table, for which Spark tracks all of the relevant information.
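A sketch of the two cases (table names and the file path are invented):
// Unmanaged table: Spark only tracks the metadata, the data stays where it is
spark.sql("""
  CREATE TABLE flights_unmanaged (origin STRING, destination STRING, count BIGINT)
  USING csv OPTIONS (path '/data/flights.csv', header 'true')
""")
// Managed table: Spark manages both the metadata and the data files
spark.range(100).toDF("id").write.saveAsTable("numbers_managed")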
Spark SQL is fully compatible with Hive SQL (HiveQL) statements.
Datasets are the foundational type of the Structured APIs. They are only available on the JVM, which means Scala or Java. By using Datasets, you define the object that each row of your Dataset will contain.
When you use the DataFrame API, by contrast, you do not create strings or integers yourself; Spark manipulates the data for you through the Row object. The concept you need to know here is the "Encoder": an encoder's job is to map your language's data types to Spark's internal types (the full list of Spark data types is in the Spark documentation). Datasets are mostly useful when you cannot express a manipulation with DataFrames and need to switch to typed objects. In addition, the Dataset API is type safe, which means an invalid operation fails at compilation time rather than at runtime.
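A small sketch of the encoder at work (the Sale case class and its values are invented); note that if you misspelled the field name in the typed filter, the Scala compiler would reject it instead of the job failing at runtime:
import spark.implicits._   // brings the encoders for case classes into scope
case class Sale(item: String, amount: Double)
// The implicit Encoder[Sale] maps the JVM objects to Spark's internal format
val sales = Seq(Sale("book", 12.5), Sale("pen", 1.2)).toDS()
// Typed operation: 'amount' is checked by the compiler
val big = sales.filter(s => s.amount > 10.0)
big.show()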
Partitioning is a very important concept in Spark, and in Big Data in general. You must understand this topic in depth to ensure your application runs in an optimal way. Spark processes data using partitions, which allow distributed data processing to be parallelized with minimal network traffic; there is a lot of overhead once executors start exchanging data with each other.
There are several questions you need answers to before making Spark a success story for your project.
Spark makes a best effort to read data into an RDD from the nodes that are close to it. Since an RDD can be huge, it must be partitioned across various nodes. This locality, together with holding data in partition-sized chunks, speeds up transformations. That has a direct impact on how long the application needs to run, and therefore a cost impact for the business: since we are working in the cloud, as developers we must be aware that more time means more money, and we should make a best effort to use company resources wisely.
Spark partitions RDDs automatically, without your intervention. However, there are times when you need to take charge and adjust the partitioning according to the needs of your particular process. There is a lot of tuning you can do for a good Spark experience; I will write a separate article dedicated to Spark tuning, since it is an important and complex problem to tackle.
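As a small sketch of taking charge of partitioning (the path, the "country" column, and the partition counts are arbitrary):
import org.apache.spark.sql.functions.col
val events = spark.read.parquet("/data/events/")
println(events.rdd.getNumPartitions)   // how the data is currently split
// Increase parallelism before a wide, expensive transformation
val repartitioned = events.repartition(200, col("country"))
// Reduce the number of partitions before writing a small output
repartitioned.coalesce(10).write.mode("overwrite").parquet("/data/events-out/")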