Parsing XML files made simple by PySpark

Posted by Jason Feng on July 14, 2019

Imagine you are given a task to parse thousands of xml files to extract the information, write the records into table format with proper data types, the task must be done in a timely manner and is repeated every hour. What are you going to do? With Apache Spark, the embarrassingly parallel processing framework, it can be done with much less effort.

Introduction

In this post, we are going to use PySpark to process xml files to extract the required records, transform them into DataFrame, then write as csv files (or any other format) to the destination. The input and the output of this task looks like below.

XML files

XML is designed to store and transport data. XML is self-descriptive which makes it flexibile and extensible to store different kinds of data.

On the other hand, it makes difficult to convert into tabular data because of its nature of semi-structured. For example, in the below XML excerption, the description element can be expanded to multiple lines. The price element can be omitted because it is yet to be determined.

1
2
3
4
5
6
   <book id="bk119">
      <author>Feng, Jason</author>
      <title>Playground</title>
      <description>This is the place where Jason puts his fun stuff
      mainly related with Python, R and GCP.</description>
   </book>

Solution

This is my scribble of the solution.

Step 1: Read XML files into RDD

We use spark.read.text to read all the xml files into a DataFrame. The DataFrame is with one column, and the value of each row is the whole content of each xml file. Then we convert it to RDD which we can utilise some low level API to perform the transformation.

1
2
# read each xml file as one row, then convert to RDD
file_rdd = spark.read.text("./data/*.xml", wholetext=True).rdd

Here is the output of one row in the DataFrame.

1
2
[Row(value='<?xml version="1.0"?>\r\n<catalog>\r\n   <book id="bk119">\r\n      <author>Feng, Jason</author>\r\n      <title>Playground</title>\r\n      <description>This is the place where Jason puts his fun stuff\r\n      mainly related with Python, R and GCP.</description>\r\n   </book>\r\n</catalog>')]

Step 2: Parse XML files, extract the records, and expand into multiple RDDs

Now it comes to the key part of the entire process. We need to parse each xml content into records according the pre-defined schema.

First, we define a function using Python standard library xml.etree.ElementTree to parse and extract the xml elements into a list of records. In this function, we cater for the scenario that some elements are missing which None is returned. It also casts price to float type and publish_date to date type.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def parse_xml(rdd):
    """
    Read the xml string from rdd, parse and extract the elements,
    then return a list of list.
    """
    results = []
    root = ET.fromstring(rdd[0])

    for b in root.findall('book'):
        rec = []
        rec.append(b.attrib['id'])
        for e in ELEMENTS_TO_EXTRAT:
            if b.find(e) is None:
                rec.append(None)
                continue
            value = b.find(e).text
            if e == 'price':
                value = float(value)
            elif e == 'publish_date':
                value = datetime.strptime(value, '%Y-%m-%d')
            rec.append(value)
        results.append(rec)

    return results

Then we use flatMap function which each input item as the content of an XML file can be mapped to multiple items through the function parse_xml. flatMap is one of the functions made me “WoW” when I first used Spark a few years ago.

1
2
# parse xml tree, extract the records and transform to new RDD
records_rdd = file_rdd.flatMap(parse_xml)

Step 3: Convert RDDs into DataFrame

We then convert the transformed RDDs to DataFrame with the pre-defined schema.

1
2
# convert RDDs to DataFrame with the pre-defined schema
book_df = records_rdd.toDF(my_schema)

The DataFrame looks like below.

1
2
3
4
5
6
7
8
9
+-------+--------------------+--------------------+---------------+-----+------------+--------------------+
|book_id|              author|               title|          genre|price|publish_date|         description|
+-------+--------------------+--------------------+---------------+-----+------------+--------------------+
|  bk101|Gambardella, Matthew|XML Developer's G...|       Computer|44.95|  2000-10-01|An in-depth look ...|
|  bk102|          Ralls, Kim|       Midnight Rain|        Fantasy| 5.95|  2000-12-16|A former architec...|
|  bk103|         Corets, Eva|     Maeve Ascendant|        Fantasy| 5.95|  2000-11-17|After the collaps...|
|  bk104|         Corets, Eva|     Oberon's Legacy|        Fantasy| 5.95|  2001-03-10|In post-apocalyps...|
|  bk105|         Corets, Eva|  The Sundered Grail|        Fantasy| 5.95|  2001-09-10|The two daughters...|
|  bk106|    Randall, Cynthia|         Lover Birds|        Romance| 4.95|  2000-09-02|When Carla meets ...|

Step 4: Save DataFrame as csv files

Finally we can save the results as csv files. Spark provides rich set of destination formats, i.e. we can write to JSON, parquet, avro, or even to a table in a database.

1
2
3
# write to csv
book_df.write.format("csv").mode("overwrite")\
    .save("./output")

Conclusion

This is just one of the showcases of what Spark can help to simplify the data processing especially when dealing with large amount of data. Spark provides both high-level API (DataFrame / DataSet), and low-level API (RDD) which enables us with the flexibility to handle various types of data format. Spark also abstracts the physical parallel computation on the cluster. We just need to focus our codes on the implementation of business logic.

Source codes are here.