Let us take a deep dive and see how Spark can tackle some of the ETL pipeline challenges that a data engineer faces in daily work.
Various types of data sources
Source data may exist in various formats, such as CSV, JSON, database tables, text, etc. Along with the variety of source formats, the data can be structured (Parquet, ORC, database tables, etc.), semi-structured (CSV, JSON, XML) or even unstructured (server logs).
The Spark DataFrame interface provides operations on a wide variety of data sources. The supported built-in data sources include CSV, JSON, Parquet, ORC, text, JDBC, Hive tables, and Avro. Spark also supports stream processing, such as reading data directly from Kafka.
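As a quick illustration, here is a minimal PySpark sketch of reading a few of these built-in sources; the file paths, JDBC connection details and Kafka settings are hypothetical placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("etl-sources").getOrCreate()

# Built-in file-based sources (paths are hypothetical placeholders)
csv_df     = spark.read.option("header", "true").csv("/data/landing/sales.csv")
json_df    = spark.read.json("/data/landing/events.json")
parquet_df = spark.read.parquet("/data/landing/facts.parquet")

# JDBC source (connection details are hypothetical)
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://db-host:5432/warehouse")
           .option("dbtable", "public.customers")
           .option("user", "etl_user")
           .option("password", "secret")
           .load())

# Streaming read from Kafka (requires the spark-sql-kafka connector package)
kafka_df = (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "events")
            .load())
```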
There are also third-party packages available as data source connectors to get data into Spark. Check out the Spark Packages website.
We can even write customised code to read a data source; for example, I have a post on processing XML files with Spark.
Schema mismatch
It is not uncommon for data types to be inconsistent among records, especially as NoSQL becomes more popular. Spark supports inferring data types and the schema if we don't provide a pre-defined schema when reading the data source.
However, this is not good practice, especially in a production environment. If we get a wrong data type, it may impact downstream analysis, insights and reporting. We would prefer schema enforcement instead: find any data type inconsistencies during data exploration, and fix them when transforming the data.
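As a minimal sketch of schema enforcement (reusing the `spark` session from the earlier sketch; the column names and path are hypothetical), we can define an explicit schema and pass it to the reader instead of relying on inference:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema instead of relying on inferSchema (column names are hypothetical)
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("quantity", IntegerType(), nullable=True),
    StructField("unit_price", DoubleType(), nullable=True),
])

# Enforce the schema at read time rather than letting Spark guess the types
orders_df = (spark.read
             .option("header", "true")
             .schema(order_schema)
             .csv("/data/landing/orders.csv"))
```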
If it is really required, we can add an intermediate layer by saving the raw data in JSON format; then we can reprocess the JSON data if the schema changes.
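Continuing the sketch above, such a raw JSON layer could look like this; the paths and the revised schema are hypothetical:

```python
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Persist the raw data as JSON so it can be reprocessed later if the schema changes
orders_df.write.mode("overwrite").json("/data/raw/orders")

# Later, re-read the raw JSON with a revised schema (here quantity widened to LongType)
revised_schema = StructType([
    StructField("order_id", StringType()),
    StructField("quantity", LongType()),
    StructField("unit_price", DoubleType()),
])
reprocessed_df = spark.read.schema(revised_schema).json("/data/raw/orders")
```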
Corrupted files or data records
In the big data era, hundreds of thousands of files need to be ingested through ETL pipelines around the clock. Files may be corrupted, or one or more records in a file may be malformed, when something goes wrong at the data source.
Spark provides error handling mechanisms to suit different needs when reading data sources in CSV or JSON format.
Method | Arguments | Description |
---|---|---|
option() | ("mode", "PERMISSIVE") | The default mode. When Spark meets a corrupted record, it puts the malformed string into a field configured by columnNameOfCorruptRecord and sets the other fields to null. To keep corrupt records, a user can set a string-type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have that field, corrupt records are dropped during parsing. |
option() | ("mode", "DROPMALFORMED") | Ignores whole corrupted records. |
option() | ("mode", "FAILFAST") | Throws an exception when it meets a corrupted record. |
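Here is a hedged sketch of the three modes in PySpark, assuming a JSON source at a hypothetical path; note that to keep corrupt rows in PERMISSIVE mode, the corrupt-record column has to be present in the user-defined schema:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Schema that keeps malformed rows in a dedicated string column (PERMISSIVE mode)
events_schema = StructType([
    StructField("event_id", StringType()),
    StructField("value", IntegerType()),
    StructField("_corrupt_record", StringType()),  # default columnNameOfCorruptRecord
])

permissive_df = (spark.read
                 .schema(events_schema)
                 .option("mode", "PERMISSIVE")
                 .option("columnNameOfCorruptRecord", "_corrupt_record")
                 .json("/data/landing/events.json"))

# Silently drop malformed records
dropped_df = spark.read.option("mode", "DROPMALFORMED").json("/data/landing/events.json")

# Fail the job as soon as a malformed record is found
strict_df = spark.read.option("mode", "FAILFAST").json("/data/landing/events.json")
```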
Schema evolution
Parquet also supports schema evolution. Users can start with a simple schema and gradually add more columns to it as needed. The Parquet data source is able to automatically detect this case and merge the schemas of all these files. We can enable it in either of the ways below:
- setting the data source option `mergeSchema` to `true` when reading Parquet files (as shown in the example below), or
- setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.
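A minimal sketch along the lines of the Spark documentation example, with hypothetical paths: two Parquet datasets are written with compatible but different schemas, then read back with schema merging enabled.

```python
from pyspark.sql import Row

# Two Parquet datasets written with different but compatible schemas
spark.createDataFrame([Row(id=1, square=1), Row(id=2, square=4)]) \
     .write.parquet("/data/test_table/key=1")
spark.createDataFrame([Row(id=1, cube=1), Row(id=2, cube=8)]) \
     .write.parquet("/data/test_table/key=2")

# Option 1: enable schema merging for this read only
merged_df = spark.read.option("mergeSchema", "true").parquet("/data/test_table")

# Option 2: enable schema merging globally for the session
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
merged_df2 = spark.read.parquet("/data/test_table")

# The merged schema contains id, square, cube and the partition column key
merged_df.printSchema()
```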
Scalability and parallelism
One of the most important aspects of Spark is its scalability. It can utilise the computational power of hundreds or thousands of workers in a cluster to process data in parallel.
To fully harness this parallelism, the source data should ideally be split into a reasonable number of files that can be read by all the executors at the same time. If there is only one big gzip file, we end up with one executor reading the file while the rest of the executors sit idle, because gzip is not a splittable format. As a last resort, we can look into this solution: Making gzip splittable for Hadoop.
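As a rough sketch (the path and partition count are hypothetical), we can check how many partitions a read produced and repartition so that at least the downstream transformations run in parallel; the initial read of a single gzip file is still done by one task.

```python
# A single gzip file is not splittable, so it is read into a single partition
logs_df = spark.read.text("/data/landing/big_log.gz")
print(logs_df.rdd.getNumPartitions())   # typically 1 for a single gzip file

# Repartition after reading so downstream transformations run in parallel;
# the read itself is still performed by a single executor
logs_df = logs_df.repartition(64)
print(logs_df.rdd.getNumPartitions())   # 64
```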