Imagine you are given the task of parsing thousands of XML files to extract their information and write the records into a table format with proper data types, and the task must be done in a timely manner and repeated every hour. What are you going to do? With Apache Spark, the embarrassingly parallel processing framework, it can be done with much less effort.
Introduction
In this post, we are going to use PySpark to process XML files, extract the required records, transform them into a DataFrame, then write the result as CSV files (or any other format) to the destination. The input and the output of this task look like below.
XML files
XML is designed to store and transport data. XML is self-descriptive, which makes it flexible and extensible enough to store different kinds of data.
On the other hand, its semi-structured nature makes XML difficult to convert into tabular data. For example, in the XML excerpt below, the description element can span multiple lines, and the price element can be omitted because it is yet to be determined.
<book id="bk119">
<author>Feng, Jason</author>
<title>Playground</title>
<description>This is the place where Jason puts his fun stuff
mainly related with Python, R and GCP.</description>
</book>
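The same issue is easy to see with Python's standard ElementTree library. The snippet below is a standalone illustration (not part of the pipeline): looking up an element that is present returns its text, while looking up a missing element such as price simply returns None.

import xml.etree.ElementTree as ET

book = ET.fromstring('<book id="bk119"><title>Playground</title></book>')
print(book.find('title').text)  # Playground
print(book.find('price'))       # None - the price element is absent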
Solution
This is my scribble of the solution.
Step 1: Read XML files into RDD
We use spark.read.text to read all the XML files into a DataFrame. The DataFrame has a single column, and the value of each row is the whole content of one XML file. Then we convert it to an RDD so that we can use some of the low-level API to perform the transformation.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # entry point for the DataFrame API
# read each xml file as one row, then convert to RDD
file_rdd = spark.read.text("./data/*.xml", wholetext=True).rdd
Here is the output of one row in the DataFrame.
[Row(value='<?xml version="1.0"?>\r\n<catalog>\r\n <book id="bk119">\r\n <author>Feng, Jason</author>\r\n <title>Playground</title>\r\n <description>This is the place where Jason puts his fun stuff\r\n mainly related with Python, R and GCP.</description>\r\n </book>\r\n</catalog>')]
Step 2: Parse XML files, extract the records, and expand into multiple rows
Now comes the key part of the entire process: we need to parse the XML content of each file into records according to the pre-defined schema.
First, we define a function using the Python standard library xml.etree.ElementTree to parse the XML and extract the elements into a list of records. In this function, we cater for the scenario where some elements are missing, in which case None is returned. It also casts price to float and publish_date to a date.
import xml.etree.ElementTree as ET
from datetime import datetime


def parse_xml(rdd):
    """
    Read the xml string from rdd, parse and extract the elements,
    then return a list of records (one list per book).
    """
    results = []
    root = ET.fromstring(rdd[0])

    for b in root.findall('book'):
        rec = []
        rec.append(b.attrib['id'])
        for e in ELEMENTS_TO_EXTRACT:
            # missing element -> keep the column but store None
            if b.find(e) is None:
                rec.append(None)
                continue
            value = b.find(e).text
            # cast to the proper data types
            if e == 'price':
                value = float(value)
            elif e == 'publish_date':
                value = datetime.strptime(value, '%Y-%m-%d')
            rec.append(value)
        results.append(rec)

    return results
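The constant ELEMENTS_TO_EXTRACT referenced above is not shown in the post. Judging from the columns of the final DataFrame, it is presumably a list of the child element names to pull out of each book, along the lines of this assumed definition:

# assumed definition, inferred from the output columns in Step 3
ELEMENTS_TO_EXTRACT = ['author', 'title', 'genre', 'price',
                       'publish_date', 'description']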
Then we use the flatMap function, in which each input item (the content of one XML file) can be mapped to multiple output items through the function parse_xml. flatMap is one of the functions that made me go “WoW” when I first used Spark a few years ago.
# parse xml tree, extract the records and transform to new RDD
records_rdd = file_rdd.flatMap(parse_xml)
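To see why flatMap (rather than map) is the right fit here, consider a toy example outside the pipeline: each input element produces a list, and flatMap flattens those lists into a single RDD of items, just as each XML file above is flattened into its individual book records.

# illustrative only: map keeps one output per input, flatMap flattens them
lines = spark.sparkContext.parallelize(["a b", "c d e"])
lines.map(lambda s: s.split()).collect()      # [['a', 'b'], ['c', 'd', 'e']]
lines.flatMap(lambda s: s.split()).collect()  # ['a', 'b', 'c', 'd', 'e']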
Step 3: Convert the RDD into a DataFrame
We then convert the transformed RDD to a DataFrame with the pre-defined schema.
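The pre-defined schema my_schema is not shown in the post either. Based on the column names in the output below and the casts done in parse_xml, a plausible sketch is the following (field names, types and nullability are assumptions):

from pyspark.sql.types import (StructType, StructField, StringType,
                               FloatType, DateType)

# assumed schema, inferred from the output columns and the casts in parse_xml;
# TimestampType would also work for publish_date, since parse_xml returns datetimes
my_schema = StructType([
    StructField('book_id', StringType(), nullable=False),
    StructField('author', StringType(), nullable=True),
    StructField('title', StringType(), nullable=True),
    StructField('genre', StringType(), nullable=True),
    StructField('price', FloatType(), nullable=True),
    StructField('publish_date', DateType(), nullable=True),
    StructField('description', StringType(), nullable=True),
])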
# convert RDDs to DataFrame with the pre-defined schema
book_df = records_rdd.toDF(my_schema)
The DataFrame looks like below.
+-------+--------------------+--------------------+---------------+-----+------------+--------------------+
|book_id| author| title| genre|price|publish_date| description|
+-------+--------------------+--------------------+---------------+-----+------------+--------------------+
| bk101|Gambardella, Matthew|XML Developer's G...| Computer|44.95| 2000-10-01|An in-depth look ...|
| bk102| Ralls, Kim| Midnight Rain| Fantasy| 5.95| 2000-12-16|A former architec...|
| bk103| Corets, Eva| Maeve Ascendant| Fantasy| 5.95| 2000-11-17|After the collaps...|
| bk104| Corets, Eva| Oberon's Legacy| Fantasy| 5.95| 2001-03-10|In post-apocalyps...|
| bk105| Corets, Eva| The Sundered Grail| Fantasy| 5.95| 2001-09-10|The two daughters...|
| bk106| Randall, Cynthia| Lover Birds| Romance| 4.95| 2000-09-02|When Carla meets ...|
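To double-check that the columns carry the intended data types rather than plain strings, we could inspect the schema (the exact output depends on the schema actually used):

# with the schema sketched above, price is a float and publish_date a date
book_df.printSchema()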
Step 4: Save DataFrame as csv files
Finally, we can save the results as CSV files. Spark supports a rich set of destination formats; for example, we can write to JSON, Parquet, Avro, or even to a table in a database.
# write to csv
book_df.write.format("csv").mode("overwrite")\
.save("./output")
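As a quick illustration of those other options, the same DataFrame could be written as CSV with a header row, or as Parquet, which stores the column types alongside the data (paths here are just examples):

# csv with a header row
book_df.write.format("csv").option("header", True)\
    .mode("overwrite").save("./output_csv")

# parquet keeps the schema (column types) with the data
book_df.write.format("parquet").mode("overwrite")\
    .save("./output_parquet")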
Conclusion
This is just one showcase of how Spark can help simplify data processing, especially when dealing with a large amount of data. Spark provides both a high-level API (DataFrame / Dataset) and a low-level API (RDD), which gives us the flexibility to handle various data formats. Spark also abstracts away the physical parallel computation on the cluster, so we just need to focus our code on the implementation of the business logic.
The source code is here.