Manipulate Datetime and Timestamp in PySpark

Posted by Jason Feng on September 10, 2019

Apache Spark is a general-purpose computation framework well known for big data processing. This post shows how to manipulate time-series related data with PySpark.

The Data

Let us make some synthetic stock market data and put them into a DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Synthetic GOOG closing prices with ISO 8601 timestamp strings
df = spark.createDataFrame([
    (18, "2017-03-09T10:27:18+00:00", "GOOG"),
    (17, "2017-03-10T15:27:18+00:00", "GOOG"),
    (22, "2017-03-13T12:27:18+00:00", "GOOG"),
    (13, "2017-03-15T12:27:18+00:00", "GOOG"),
    (19, "2017-03-15T02:27:18+00:00", "GOOG"),
    (25, "2017-03-18T11:27:18+00:00", "GOOG")],
    ["Close", "timestampGMT", "Symbol"])

Datetime functions in PySpark

The pyspark.sql.functions module provides a rich set of functions for handling and manipulating datetime/timestamp data; it is imported as F in the examples below.

Convert timestamp string to Unix time

Unix epoch time is widely used, especially for internal storage and computation.

The format argument follows the pattern letters of the Java class java.text.SimpleDateFormat.

df = df.withColumn('unix_time', F.unix_timestamp(F.col('timestampGMT'), "yyyy-MM-dd'T'HH:mm:ssX"))
df.show(truncate=False)                                                           
+-----+-------------------------+------+----------+
|Close|timestampGMT             |Symbol|unix_time |
+-----+-------------------------+------+----------+
|18   |2017-03-09T10:27:18+00:00|GOOG  |1489055238|
|17   |2017-03-10T15:27:18+00:00|GOOG  |1489159638|
|22   |2017-03-13T12:27:18+00:00|GOOG  |1489408038|
|13   |2017-03-15T12:27:18+00:00|GOOG  |1489580838|
|19   |2017-03-15T02:27:18+00:00|GOOG  |1489544838|
|25   |2017-03-18T11:27:18+00:00|GOOG  |1489836438|
+-----+-------------------------+------+----------+
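
For reference, the reverse conversion, from Unix seconds back to a readable string, can be done with from_unixtime. A minimal sketch using the column just created (human_ts is a hypothetical column name):

# Format Unix seconds as a string in the session time zone
df.withColumn('human_ts', F.from_unixtime('unix_time', 'yyyy-MM-dd HH:mm:ss')).show(truncate=False)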

Get proper timestamp format

By default, Spark uses the time zone setting of the system. You can change it to your desired time zone by setting the spark.sql.session.timeZone configuration, and then render the timestamp in that zone with date_format:

df = df.withColumn('local_ts', F.date_format('timestampGMT', 'dd/MM/yyyy HH:mm:ss Z'))
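
The offsets produced by date_format depend on the session time zone, so it is worth pinning it explicitly rather than relying on the host system. The +1100 offsets in the output later in this post are consistent with an Australian zone; the sketch below assumes Australia/Sydney purely for illustration:

# Pin the session time zone so formatted output does not depend on the host system
# ("Australia/Sydney" is an assumed zone, chosen to match the +1100 offsets shown later)
spark.conf.set("spark.sql.session.timeZone", "Australia/Sydney")
print(spark.conf.get("spark.sql.session.timeZone"))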

Truncate to specific unit of time

If we want to aggregate timestamps to a higher level, for example to get the average value for each day, we can truncate each timestamp to its date and then group by that date to calculate the average.

df = df.withColumn('local_date', F.date_trunc('day', 'timestampGMT'))
df_by_date = df.groupBy('Symbol', 'local_date').agg(F.avg('Close').alias('avg_close'))
df_by_date.show(truncate=False)                                                                                        
+------+-------------------+---------+
|Symbol|local_date         |avg_close|
+------+-------------------+---------+
|GOOG  |2017-03-09 00:00:00|18.0     |
|GOOG  |2017-03-11 00:00:00|17.0     |
|GOOG  |2017-03-13 00:00:00|22.0     |
|GOOG  |2017-03-15 00:00:00|16.0     |
|GOOG  |2017-03-18 00:00:00|25.0     |
+------+-------------------+---------+
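
Note that date_trunc keeps the column as a timestamp truncated to local midnight, as local_date above shows. If a plain DateType column is preferred, one option is to apply to_date on top of the truncated value; a small sketch assuming the DataFrame above (local_day is a hypothetical column name):

# Derive a DateType column from the truncated midnight timestamp
df.select('timestampGMT', 'local_date', F.to_date('local_date').alias('local_day')).show(truncate=False)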

Compute moving average

We can use window functions in PySpark to calculate the moving average for each symbol. If we want to get the results based on a date range, the trick is that the window frame clause only supports numeric expressions. So we need to use the unix_time column, which is of long type, together with a little helper lambda function that converts days to seconds, before applying the window function.

days = lambda d: d * 24 * 60 * 60  # helper: convert a number of days to seconds

# Average Close over the range from 7 days before the current row's unix_time
# up to and including the current row, per symbol
win_spec = Window.partitionBy('Symbol') \
    .orderBy('unix_time') \
    .rangeBetween(-days(7), 0)
col_mean_7d = F.avg('Close').over(win_spec)
df_mean = df.withColumn('mean_7d', col_mean_7d)
df_mean.show(truncate=False)                                                                                 
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+
|Close|timestampGMT             |Symbol|unix_time |local_ts                 |local_date         |mean_7d|
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+
|18   |2017-03-09T10:27:18+00:00|GOOG  |1489055238|09/03/2017 21:27:18 +1100|2017-03-09 00:00:00|18.0   |
|17   |2017-03-10T15:27:18+00:00|GOOG  |1489159638|11/03/2017 02:27:18 +1100|2017-03-11 00:00:00|17.5   |
|22   |2017-03-13T12:27:18+00:00|GOOG  |1489408038|13/03/2017 23:27:18 +1100|2017-03-13 00:00:00|19.0   |
|19   |2017-03-15T02:27:18+00:00|GOOG  |1489544838|15/03/2017 13:27:18 +1100|2017-03-15 00:00:00|19.0   |
|13   |2017-03-15T12:27:18+00:00|GOOG  |1489580838|15/03/2017 23:27:18 +1100|2017-03-15 00:00:00|17.8   |
|25   |2017-03-18T11:27:18+00:00|GOOG  |1489836438|18/03/2017 22:27:18 +1100|2017-03-18 00:00:00|19.75  |
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+
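
As a sanity check, the 19.75 in the last row can be reproduced by filtering the raw rows that fall inside that row's 7-day range and averaging them, a small sketch reusing the days helper defined above:

# Rows whose unix_time lies within the 7 days ending at the 2017-03-18 record
last_ts = 1489836438  # unix_time of the last row
df.filter((F.col('unix_time') >= last_ts - days(7)) & (F.col('unix_time') <= last_ts)) \
    .agg(F.avg('Close').alias('mean_7d_check')).show()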

If we just want to compute the moving average over the preceding N records up to the current record, we can use rowsBetween instead of rangeBetween.

# Average Close over the current row and the 4 preceding rows (5 records in total), per symbol
win_spec = Window.partitionBy('Symbol') \
    .orderBy('unix_time') \
    .rowsBetween(-4, 0)
col_mean_5r = F.avg('Close').over(win_spec)
df_mean = df.withColumn('mean_5r', col_mean_5r)
df_mean.show(truncate=False)
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+
|Close|timestampGMT             |Symbol|unix_time |local_ts                 |local_date         |mean_5r|
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+
|18   |2017-03-09T10:27:18+00:00|GOOG  |1489055238|09/03/2017 21:27:18 +1100|2017-03-09 00:00:00|18.0   |
|17   |2017-03-10T15:27:18+00:00|GOOG  |1489159638|11/03/2017 02:27:18 +1100|2017-03-11 00:00:00|17.5   |
|22   |2017-03-13T12:27:18+00:00|GOOG  |1489408038|13/03/2017 23:27:18 +1100|2017-03-13 00:00:00|19.0   |
|19   |2017-03-15T02:27:18+00:00|GOOG  |1489544838|15/03/2017 13:27:18 +1100|2017-03-15 00:00:00|19.0   |
|13   |2017-03-15T12:27:18+00:00|GOOG  |1489580838|15/03/2017 23:27:18 +1100|2017-03-15 00:00:00|17.8   |
|25   |2017-03-18T11:27:18+00:00|GOOG  |1489836438|18/03/2017 22:27:18 +1100|2017-03-18 00:00:00|19.2   |
+-----+-------------------------+------+----------+-------------------------+-------------------+-------+

Source code can be found here.

Image by Peter H from Pixabay