sryza/spark-timeseries

Python API's DateTimeIndex keys do not match Java's ZonedDateTimes

Opened this issue · 5 comments

Consider the following test data:

>>> sightings = [{"_id" : "1", "location": "Alderaan",   "minutes":  1, "date": "1977-05-25", "subject":  "Leia Organa"},
                     {"_id" : "2", "location": "Alderaan",   "minutes": 32, "date": "1982-01-01", "subject":  "Leia Organa"},
                     {"_id" : "3", "location": "Rebel Base", "minutes": 25, "date": "1982-10-02", "subject":  "Leia Organa"},
                     {"_id" : "4", "location": "Rebel Base", "minutes": 79, "date": "1982-10-05", "subject":  "Leia Organa"},
                     {"_id" : "5", "location": "Hoth",       "minutes":  2, "date": "1985-06-01", "subject":  "Leia Organa"},
                     {"_id" : "a", "location": "Tatooine",   "minutes": 92, "date": "1977-05-25", "subject":  "Luke Skywalker"},
                     {"_id" : "b", "location": "Tatooine",   "minutes": 17, "date": "1983-12-01", "subject":  "Luke Skywalker"}, 
                     {"_id" : "c", "location": "Tatooine",   "minutes": 26, "date": "1983-12-25", "subject":  "Luke Skywalker"},
                     {"_id" : "d", "location": "Tatooine",   "minutes":  4, "date": "1983-12-25", "subject":  "Luke Skywalker"},
                     {"_id" : "e", "location": "Hoth",       "minutes":  7, "date": "1985-06-01", "subject":  "Luke Skywalker"}]


>>> df = sqlContext.createDataFrame(sightings).select("subject", "date", "minutes")

>>> df = df.withColumn("minutes", df.minutes.cast("double"))\
...                .withColumn("date", df.date.cast("timestamp"))
>>> end_date = Timestamp("1985-06-01")
>>> start_date = Timestamp("1977-05-25")

>>> df.show()

+--------------+--------------------+-------+
|       subject|                date|minutes|
+--------------+--------------------+-------+
|   Leia Organa|1977-05-25 00:00:...|    1.0|
|   Leia Organa|1982-01-01 00:00:...|   32.0|
|   Leia Organa|1982-10-02 00:00:...|   25.0|
|   Leia Organa|1982-10-05 00:00:...|   79.0|
|   Leia Organa|1985-06-01 00:00:...|    2.0|
|Luke Skywalker|1977-05-25 00:00:...|   92.0|
|Luke Skywalker|1983-12-01 00:00:...|   17.0|
|Luke Skywalker|1983-12-25 00:00:...|   26.0|
|Luke Skywalker|1983-12-25 00:00:...|    4.0|
|Luke Skywalker|1985-06-01 00:00:...|    7.0|
+--------------+--------------------+-------+

>>> df.printSchema()
root
 |-- subject: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- minutes: double (nullable = true)

>>> print end_date
1985-06-01 00:00:00
>>> print start_date
1977-05-25 00:00:00
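
(For reference: these snippets assume pandas's Timestamp plus the spark-timeseries Python bindings. I believe the imports look roughly like the following, though the exact module paths may differ by version:)

from pandas import Timestamp
from sparkts.datetimeindex import uniform, HourFrequency
from sparkts.timeseriesrdd import time_series_rdd_from_observations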

Creating a 24-hour-frequency index works just fine. A quick collect() shows that, at least at the Python level, every timestamp in the DataFrame has a corresponding location on the DateTimeIndex.
(Side note: I couldn't get this to work with a DayFrequency because of a separate daylight-savings issue.)

>>> index = uniform(start_date, end_date, freq=HourFrequency(24,sc), sc=sc)
>>> print index
uniform,America/Los_Angeles,1977-05-24T17:00-07:00[America/Los_Angeles],2930,hours 24

>>> for row in df.collect(): print "Date:", row["date"], "index_location:", index[Timestamp(row["date"])]

Date: 1977-05-25 00:00:00 index_location: 0
Date: 1982-01-01 00:00:00 index_location: 1682
Date: 1982-10-02 00:00:00 index_location: 1956
Date: 1982-10-05 00:00:00 index_location: 1959
Date: 1985-06-01 00:00:00 index_location: 2929
Date: 1977-05-25 00:00:00 index_location: 0
Date: 1983-12-01 00:00:00 index_location: 2381
Date: 1983-12-25 00:00:00 index_location: 2405
Date: 1983-12-25 00:00:00 index_location: 2405
Date: 1985-06-01 00:00:00 index_location: 2929

However, trying to generate the TimeSeriesRDD from this DataFrame using time_series_rdd_from_observations results in none of the values actually being inserted at their appropriate locations (they're all NaN):

>>> tsrdd = time_series_rdd_from_observations(index, df, "date", "subject", "minutes")
>>> for ts in tsrdd.collect(): print ts
(u'Luke Skywalker', array([ nan,  nan,  nan, ...,  nan,  nan,  nan]))
(u'Leia Organa', array([ nan,  nan,  nan, ...,  nan,  nan,  nan]))

>>> for ts in tsrdd.collect(): print [num for num in ts[1].tolist() if num > 0]
[]
[]

Trying a different index, or initializing the index with strings/nanos, is also unsuccessful. My guess at what is happening: the Python serializer is passing datetime objects, rather than Timestamp objects, to whatever fills in the RDD values, and the DateTimeIndex does not accept datetimes as keys.
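
To make the suspected distinction concrete, here's a minimal pandas-only sketch (no Spark involved; the claim that the index keys on epoch nanoseconds is my assumption): a pandas Timestamp exposes epoch nanoseconds, while a plain datetime does not.

from datetime import datetime
from pandas import Timestamp

ts = Timestamp("1977-05-25")
dt = datetime(1977, 5, 25)

# A pandas Timestamp exposes its epoch time in nanoseconds via .value;
# a plain datetime object has no such field.
print ts.value               # 233366400000000000 (nanoseconds since epoch)
print hasattr(dt, "value")   # False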

I've found the source of the error; I'll upload an explanation soon.

sryza commented

Thanks for looking into this, @diegorep.

OK, so it seems the problem is not exactly in the time_series_rdd_from_observations() function but rather in the DateTimeIndex objects.

There are three main issues we're looking at:

1) The Python version of the DateTimeIndex behaves differently from the Java one

As the collect() example in the issue description shows, the DateTimeIndex correctly matches each Timestamp to an index location, yet it somehow fails to perform that matching when used inside time_series_rdd_from_observations():

>>> df.show()
+--------------+--------------------+-------+
|       subject|                date|minutes|
+--------------+--------------------+-------+
|   Leia Organa|1977-05-25 00:00:...|    1.0|
|   Leia Organa|1982-01-01 00:00:...|   32.0|
|   Leia Organa|1982-10-02 00:00:...|   25.0|
|   Leia Organa|1982-10-05 00:00:...|   79.0|
|   Leia Organa|1985-06-01 00:00:...|    2.0|
|Luke Skywalker|1977-05-25 00:00:...|   92.0|
|Luke Skywalker|1983-12-01 00:00:...|   17.0|
|Luke Skywalker|1983-12-25 00:00:...|   26.0|
|Luke Skywalker|1983-12-25 00:00:...|    4.0|
|Luke Skywalker|1985-06-01 00:00:...|    7.0|
+--------------+--------------------+-------+

...
...
...

>>> index = uniform(start_date, end_date, freq=HourFrequency(24,sc), sc=sc)
>>> print index
uniform,America/Los_Angeles,1977-05-24T17:00-07:00[America/Los_Angeles],2930,hours 24

>>> for row in df.collect(): print "Date:", row["date"], "index_location:", index[Timestamp(row["date"])]

Date: 1977-05-25 00:00:00 index_location: 0
Date: 1982-01-01 00:00:00 index_location: 1682
Date: 1982-10-02 00:00:00 index_location: 1956
Date: 1982-10-05 00:00:00 index_location: 1959
Date: 1985-06-01 00:00:00 index_location: 2929
Date: 1977-05-25 00:00:00 index_location: 0
Date: 1983-12-01 00:00:00 index_location: 2381
Date: 1983-12-25 00:00:00 index_location: 2405
Date: 1983-12-25 00:00:00 index_location: 2405
Date: 1985-06-01 00:00:00 index_location: 2929

>>> tsrdd = time_series_rdd_from_observations(index, df, "date", "subject", "minutes")
>>> for ts in tsrdd.collect(): print ts
(u'Luke Skywalker', array([ nan,  nan,  nan, ...,  nan,  nan,  nan]))
(u'Leia Organa', array([ nan,  nan,  nan, ...,  nan,  nan,  nan]))

>>> for ts in tsrdd.collect(): print [num for num in ts[1].tolist() if num > 0]
[]
[]

After looking at some of the project's older issues, I suspected the problem might occur when converting the Timestamp objects into Java ZonedDateTime objects. Sure enough, I found that some timezone-localization judo yielded a version where the Timestamp objects were not valid keys of the Python DateTimeIndex, yet they were correctly matched to the Java index inside time_series_rdd_from_observations():

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import TimestampType

>>> df = df.withColumn("date", F.udf(lambda ts: Timestamp(ts).tz_localize("UTC")\
                                                             .astimezone('America/Los_Angeles')\
                                                             .to_datetime(), TimestampType())("date"))

>>> end_date = Timestamp("1985-06-01").tz_localize('UTC')
>>> start_date = Timestamp("1977-05-25").tz_localize('UTC')

>>> df.show()
+--------------+--------------------+-------+
|       subject|                date|minutes|
+--------------+--------------------+-------+
|   Leia Organa|1977-05-24 17:00:...|    1.0|
|   Leia Organa|1981-12-31 16:00:...|   32.0|
|   Leia Organa|1982-10-01 17:00:...|   25.0|
|   Leia Organa|1982-10-04 17:00:...|   79.0|
|   Leia Organa|1985-05-31 17:00:...|    2.0|
|Luke Skywalker|1977-05-24 17:00:...|   92.0|
|Luke Skywalker|1983-11-30 16:00:...|   17.0|
|Luke Skywalker|1983-12-24 16:00:...|   26.0|
|Luke Skywalker|1983-12-24 16:00:...|    4.0|
|Luke Skywalker|1985-05-31 17:00:...|    7.0|
+--------------+--------------------+-------+

>>> index = uniform(start_date, end_date, freq=HourFrequency(24,sc), sc=sc)
>>> print index
uniform,America/Los_Angeles,1977-05-24T17:00-07:00[America/Los_Angeles],2930,hours 24

>>> for row in df.collect(): print "Date:", row["date"], "index_location:", index[Timestamp(row["date"])]

Date: 1977-05-24 17:00:00 index_location: -1
Date: 1981-12-31 16:00:00 index_location: -1
Date: 1982-10-01 17:00:00 index_location: -1
Date: 1982-10-04 17:00:00 index_location: -1
Date: 1985-05-31 17:00:00 index_location: -1
Date: 1977-05-24 17:00:00 index_location: -1
Date: 1983-11-30 16:00:00 index_location: -1
Date: 1983-12-24 16:00:00 index_location: -1
Date: 1983-12-24 16:00:00 index_location: -1
Date: 1985-05-31 17:00:00 index_location: -1

>>> tsrdd = time_series_rdd_from_observations(index, df, "date", "subject", "minutes")
>>> for ts in tsrdd.collect(): print ts
(u'Luke Skywalker', array([ 92.,  nan,  nan, ...,  nan,  nan,   7.]))
(u'Leia Organa', array([  1.,  nan,  nan, ...,  nan,  nan,   2.]))

>>> for ts in tsrdd.collect(): print [num for num in ts[1].tolist() if num > 0]
[92.0, 17.0, 4.0, 7.0]
[1.0, 32.0, 25.0, 79.0, 2.0]

Clearly, there's some dissonance between what the Python DateTimeIndex claims to have as keys and what the keys turn out to be in reality. I'm not yet among the people most familiar with this project, but I'll go out on a limb and say that this discrepancy is a bug rather than a deliberate design choice.

2) There seems to be no way to adjust the DateTimeIndex's timezone information

As you can see above, explicitly "casting" my Timestamps into the America/Los_Angeles timezone got the RDD going. However, this method artificially skews my data (which I need in UTC). Ideally, I'd be able to create a DateTimeIndex with the timezone information I want (or, even better, a timezone-agnostic one), but I can't figure out any way to do this: localizing the start/end dates fed into the uniform() function to a timezone other than America/Los_Angeles seems to have no effect. Maybe there is a way and I just haven't found it; if so, I'll at least say it's darn hard to find.
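
To be explicit about what I'm asking for, here's a sketch of the usage I'd want. Note that the tz keyword below is hypothetical (it does not exist in the current Python API); the idea is just to surface the zone that the index string above shows it already carries:

# HYPOTHETICAL API: the tz keyword does not exist today. The idea is to
# let callers pin the index's zone instead of always getting
# America/Los_Angeles.
index = uniform(start_date, end_date, freq=HourFrequency(24, sc),
                tz="UTC", sc=sc)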

3) There is no easy index solution for users who want a time series with lower resolution (e.g., daily timestamps)

I've noticed the DayFrequency and BusinessDayFrequency objects in the project. Initially I tried using them, since the problems I deal with need a time resolution of no more than one day. In a day-by-day setting, the most intuitive behavior is one where Timestamp 2016-01-01 00:00:00 and Timestamp 2016-01-01 01:00:00 both match the same day in the DateTimeIndex; after all, they occurred on the same day. However, I've noticed that indices built with the DayFrequency object only accept timestamps separated by precisely 24-hour intervals (plus or minus an hour when the timezone offset changes), meaning there is effectively no difference between a 24-hour HourFrequency index and a 1-day DayFrequency index. I'm not sure whether this is an intentional design choice. Either way, the current DayFrequency behavior is hardly a robust solution for me, and I suspect it may be hard for others to use as well. (A possible workaround sketch follows below.)
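
In the meantime, here is the workaround I'm leaning toward (my own sketch, not part of the library; df_daily is just an illustrative name): truncate every timestamp to midnight before handing the DataFrame over, so that any two observations from the same calendar day land on the same index key.

# Truncate timestamps to midnight by round-tripping through the date type,
# then build the TSRDD from the truncated column as usual. Note that the
# cast back to timestamp lands on local midnight, so the timezone caveats
# from issue 2 still apply.
df_daily = df.withColumn("date", df.date.cast("date").cast("timestamp"))
tsrdd = time_series_rdd_from_observations(index, df_daily,
                                          "date", "subject", "minutes")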

@sryza, do you want to make a decision on this? I could try to implement a solution, but I don't want to work on anything without agreeing on it beforehand. We should at least allow tz info to be specified in the Python DateTimeIndex objects.

sryza commented

@diegorep sorry for the long delay in getting to this. Your assessment seems spot on. Is there a solution you'd prefer? Allowing tz info to be specified in Python DateTimeIndex objects seems like a good idea to me. If you'd be willing to submit a PR, I'd be happy to review it.