Parallel Time Series Forecasting with Dask

The dataset we will be using is the M4 Competition dataset: https://www.m4.unic.ac.cy/the-dataset/. For more information about this competition, or about time series forecasting in general, I'd recommend Rob Hyndman's blog; he is the creator of the popular R forecast package. He also has a free online textbook about forecasting: https://www.otexts.org/fpp

Required packages in R:

  • forecast Rob Hyndman's comprehensive forecasting library (ets, auto.arima, thetaf, and more)

Required packages in Python:

  • Dask native parallel processing in Python
  • Prophet Facebook's forecasting library
  • rpy2 runs R code within Python
  • plotly interactive plotting library based on D3
  • cufflinks high-level wrapper around plotly to directly use pandas dataframes
  • magi high-level wrapper for parallel univariate time series forecasting

Required Dataset:

  • M4DataSet.zip and M4-info.csv, downloaded from the M4 competition site linked above

Getting the Data

First we'll import everything we need. Note that we import the ts object and the forecast package from R using the rpy2 interface, and activate pandas2ri, which makes the R-Python integration much easier to deal with in this case.

In [54]:
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects
#activate automatic pandas <-> R data conversion
pandas2ri.activate()
#get ts object as python object
ts=robjects.r('ts')
#import forecast package
forecast=importr('forecast')

import pandas as pd
import numpy as np
from fbprophet import Prophet

from datetime import datetime,timedelta
from dateutil.relativedelta import relativedelta
import time
import zipfile

from dask.distributed import Client, LocalCluster
import dask

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
from plotly.graph_objs import Scatter, Figure, Layout
import plotly
import plotly.graph_objs as go
init_notebook_mode(connected=False)

Note: If you don't want to download the M4Dataset file, or are having trouble doing so, you can also create a dataframe of random time series for demonstration purposes using the code below. However, I designed the notebook to work with the M4 Dataset, so you will have to edit the cells to work with this dataframe instead if you want to run it that way.

In [2]:
def gen_ts(freq='MS',ncols=5,nrows=24,num_range=[0,10000],end_date='2018-04-01'):
    colnames = []
    for i in range(ncols):
        colnames.append('ts'+str(i))
    df = pd.DataFrame(np.random.randint(num_range[0],num_range[1],size=(nrows, ncols)), columns=colnames)
    df_date_index = pd.date_range(end=end_date, periods=nrows, freq=freq)
    df.index = df_date_index
    return df
df=gen_ts()
df.head()
Out[2]:
ts0 ts1 ts2 ts3 ts4
2016-05-01 8461 5136 6508 1819 2403
2016-06-01 8525 7987 7564 3597 338
2016-07-01 9054 6645 8945 6639 6099
2016-08-01 6665 76 943 1312 1918
2016-09-01 3729 6544 7943 5504 5146

Here we'll start our Dask LocalCluster, which we'll use for our parallelized time series forecasting later on in the notebook.

In [3]:
cluster = LocalCluster()
client = Client(cluster)
client
Out[3]:

Client

  • Scheduler: tcp://127.0.0.1:32990

Cluster

  • Workers: 6
  • Cores: 6
  • Memory: 33.57 GB

We can see which files are included in this zip archive using .namelist()

In [4]:
zf = zipfile.ZipFile('M4DataSet.zip')
zf.namelist()
Out[4]:
['Daily-train.csv',
 'Hourly-train.csv',
 'Monthly-train.csv',
 'Quarterly-train.csv',
 'Weekly-train.csv',
 'Yearly-train.csv']

Let's read one file in and check out the format of these files.

In [5]:
monthly_df = pd.read_csv(zf.open('Monthly-train.csv'))
#need to transpose so that we have time series as columns and can index on V's as the dates
monthly_df = monthly_df.set_index('V1').T
monthly_df.head()
Out[5]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 ... M47991 M47992 M47993 M47994 M47995 M47996 M47997 M47998 M47999 M48000
V2 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 ... 1500.0 90.0 804.71 1361.3 136.13 9964.0 1696.6 5900.0 4660.0 6070.0
V3 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 ... 3300.0 190.0 1019.47 1454.0 281.52 7212.0 1627.2 6000.0 4980.0 6150.0
V4 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 ... 4700.0 290.0 765.91 1514.4 434.67 7858.0 1465.4 6280.0 5060.0 6350.0
V5 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 ... 6200.0 400.0 735.20 1636.2 601.34 8493.0 1672.8 6100.0 5090.0 6130.0
V6 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 ... 8000.0 520.0 782.96 1631.3 764.48 8203.0 1760.1 5410.0 4790.0 5560.0

5 rows × 48000 columns

In [6]:
monthly_df.tail()
Out[6]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 ... M47991 M47992 M47993 M47994 M47995 M47996 M47997 M47998 M47999 M48000
V2791 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
V2792 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
V2793 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
V2794 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
V2795 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN

5 rows × 48000 columns

Let's read in the info file as well. For this tutorial we'll only be looking at the monthly data.

In [7]:
M4_info = pd.read_csv('M4-info.csv')
M4_info.head()
Out[7]:
M4id category Frequency Horizon SP
0 Y1 Macro 1 6 Yearly
1 Y2 Macro 1 6 Yearly
2 Y3 Macro 1 6 Yearly
3 Y4 Macro 1 6 Yearly
4 Y5 Macro 1 6 Yearly

So it looks like we have 48000 monthly time series in this data, with up to 2794 months for each series. However, we do see that a lot of series do not line up with each other, so we'll have to keep this in mind.

We can see from the cell below that all the monthly series have a horizon of 18 months so we can use this as our forecast horizon in the upcoming functions

In [8]:
M4_info[M4_info['SP']=='Monthly'].describe()
Out[8]:
Frequency Horizon
count 48000.0 48000.0
mean 12.0 18.0
std 0.0 0.0
min 12.0 18.0
25% 12.0 18.0
50% 12.0 18.0
75% 12.0 18.0
max 12.0 18.0

Formatting the Data

So after the transpose these dataframes have generic V-numbered row labels for the time periods and no index name. We'll just name the index explicitly to make manipulating this data easier.

In [9]:
monthly_df_indexed = monthly_df.copy()
monthly_df_indexed.index.name = 'Time'
monthly_df_indexed.head()
Out[9]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 ... M47991 M47992 M47993 M47994 M47995 M47996 M47997 M47998 M47999 M48000
Time
V2 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 ... 1500.0 90.0 804.71 1361.3 136.13 9964.0 1696.6 5900.0 4660.0 6070.0
V3 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 ... 3300.0 190.0 1019.47 1454.0 281.52 7212.0 1627.2 6000.0 4980.0 6150.0
V4 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 ... 4700.0 290.0 765.91 1514.4 434.67 7858.0 1465.4 6280.0 5060.0 6350.0
V5 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 ... 6200.0 400.0 735.20 1636.2 601.34 8493.0 1672.8 6100.0 5090.0 6130.0
V6 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 ... 8000.0 520.0 782.96 1631.3 764.48 8203.0 1760.1 5410.0 4790.0 5560.0

5 rows × 48000 columns

We'll stop here for a second and discuss the overall high-level process we are going to follow, and the format the data needs to be in for this process to work.

The dataframe format above is our data framework. That is, for everything we are going to do, we should structure our data as a pandas dataframe where the index is the time or date of the series and the columns are all the different time series for that particular dimension. If we wanted to forecast for, say, 1000 sensors that all have similar time components, we could put them in one dataframe. But if you want to aggregate all sensor data and forecast on that too, that should be a separate dataframe. This framework is a very important concept because it enables everything else.

If you think about it, this situation is easily parallelizable, because each column is independent of every other column in terms of forecasting. (These series could be related, and you could do analysis around that, but that's a topic for another day; in terms of computation, each series can be considered independent.)

Now that our dataframe is in the correct format, our general workflow will entail four steps:

  • Clean the dataframe
  • Compute model predictions on the dataframe
  • Calculate error measures (only in-sample accuracy is shown in this notebook, but out-of-sample cross validation is recommended)
  • Visualize the predictions to get a sense of what's going on

There's a lot more you may want to add, like cross validation and rolling origin accuracy measures for model selection (see the rolling origin sketch near the end of the notebook), but for the purposes of this tutorial I just want to show how you can do the foundational forecasting workflows in this framework.

Side note: this framework isn't the most optimized, especially for this particular use case of the competition; it is better suited to series that are all on the same time scale. However, I think it's the easiest way to understand what's going on, so that's why I'm doing it this way. You could also format your data in long format, with a multi-index for each series or with no index at all, and make that work if you know what you're doing. That would even simplify the code in this instance, because you wouldn't have to keep indexing into just the part of each series you care about.

Cleaning the Data

In the cell below I show how to extract one time series from this dataframe. If all your series were on the same time scale you would not need these extra steps, but in this case a series can stop and start at any point in the dataframe, so here we find the beginning and end and extract the series between those indexes.

In [10]:
#find the start of the time series
start_ts = monthly_df_indexed['M600'][monthly_df_indexed['M600'].notna()].index[0]
#find the end of the time series
end_ts = monthly_df_indexed['M600'][monthly_df_indexed['M600'].notna()].index[-1]
#get full time series to be cleaned
ts_ex = monthly_df_indexed['M600'].loc[start_ts:end_ts]
ts_ex.head()
Out[10]:
Time
V2    351.8
V3    346.4
V4    345.9
V5    346.8
V6    351.4
Name: M600, dtype: float64
In [11]:
ts_ex.tail()
Out[11]:
Time
V622    25163.2
V623    25050.8
V624    25121.7
V625    25294.4
V626    26175.4
Name: M600, dtype: float64

In practice, cleaning your data is heavily dependent on your domain, and your dataframe may require very little or a whole lot of business-specific data cleansing to get it into a state ready for forecasting. This should be the most customized step of the process; I'll just show some general things here.

Important:

Here we introduce another important concept that fits nicely into our parallel framework: both our cleaning and modeling functions should take a time series as input and return a time series as output (in python, just a pandas Series with a datetime index).

In [12]:
def clean_ts(ts):
    
    """
    takes time series as input and returns cleaned time series
    
    *removes outliers
    *linearly interpolates missing values
    
    returns time series back
    """
    #find the start of the time series
    start_ts = ts[ts.notna()].index[0]
    #find the end of the time series
    end_ts = ts[ts.notna()].index[-1]
    ts = ts.loc[start_ts:end_ts]
    
    #set outliers to NaN, where an outlier is any point whose absolute value exceeds the rolling mean plus 3 rolling standard deviations
    ts = ts.where(~(abs(ts) > (ts.rolling(center=False,window=3).mean() + (ts.rolling(center=False,window=3,min_periods = 3).std() * 3))),np.nan)
    #linearly interpolate any NaN values
    ts = ts.interpolate()
    #make sure any remaining null values are filled with 0's
    ts = ts.fillna(0)
            
    return ts
In [13]:
clean_ts(monthly_df_indexed['M600']).head()
Out[13]:
Time
V2    351.8
V3    346.4
V4    345.9
V5    346.8
V6    351.4
Name: M600, dtype: float64

For this example we're going to limit to just 20 series so the cells don't take too long to execute.

Let's give a first introduction to what you can do with Dask here: we'll time the execution of applying this function across all dataframe columns in a single-threaded manner, and then use dask.delayed to parallelize the operation.

In [14]:
monthly_df_small = monthly_df_indexed.iloc[:,:20]
monthly_df_small.head()
Out[14]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
Time
V2 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 3727.0 900.0 584.9 1159.0 843.4 3537.0 1769.4 1767.6 2250.0 2600.0
V3 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 3776.0 909.0 596.3 1156.0 837.1 3532.0 1786.6 1745.4 3220.0 5800.0
V4 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 3799.0 913.0 601.5 1156.0 834.9 3556.0 1799.7 1756.3 2710.0 2500.0
V5 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 3815.0 919.0 605.8 1164.0 843.9 3564.0 1797.1 1766.9 2460.0 2600.0
V6 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 3806.0 937.0 624.6 1174.0 849.5 3585.0 1794.7 1790.3 2500.0 2600.0

Single-threaded / single-worker for loop to clean the series

In [15]:
%%time
output = []
for series in monthly_df_small:
    cleaned_series = clean_ts(monthly_df_small[series])
    output.append(cleaned_series)
    
#turn list of series back into full dataframe
full_df = pd.concat(output,ignore_index=False,keys=monthly_df_small.columns,axis=1)
#sort by index to get back to format we want
sorted_index = full_df.index.to_series().str[1:].astype(int).sort_values()
cleaned_df = full_df.reindex(index=sorted_index.index)
CPU times: user 80 ms, sys: 7.09 ms, total: 87.1 ms
Wall time: 83.6 ms
/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py:7: FutureWarning: Sorting because non-concatenation axis is not aligned. A future version
of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=True'.

To retain the current behavior and silence the warning, pass sort=False

In [16]:
cleaned_df.head()
Out[16]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
V2 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 3727.0 900.0 584.9 1159.0 843.4 3537.0 1769.4 1767.6 2250.0 2600.0
V3 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 3776.0 909.0 596.3 1156.0 837.1 3532.0 1786.6 1745.4 3220.0 5800.0
V4 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 3799.0 913.0 601.5 1156.0 834.9 3556.0 1799.7 1756.3 2710.0 2500.0
V5 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 3815.0 919.0 605.8 1164.0 843.9 3564.0 1797.1 1766.9 2460.0 2600.0
V6 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 3806.0 937.0 624.6 1174.0 849.5 3585.0 1794.7 1790.3 2500.0 2600.0

Parallelized for loop using dask.delayed to clean the series (6 workers)

In [17]:
%%time
output = []
for series in monthly_df_small:
    cleaned_series = dask.delayed(clean_ts)(monthly_df_small[series])
    output.append(cleaned_series)
    
total = dask.delayed(output).compute()
#turn list of series back into full dataframe
full_df = pd.concat(total,ignore_index=False,keys=monthly_df_small.columns,axis=1)

sorted_index = full_df.index.to_series().str[1:].astype(int).sort_values()
cleaned_df = full_df.reindex(index=sorted_index.index)
CPU times: user 102 ms, sys: 13.5 ms, total: 115 ms
Wall time: 584 ms
/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py:8: FutureWarning: Sorting because non-concatenation axis is not aligned. A future version
of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=True'.

To retain the current behavior and silence the warning, pass sort=False

In [18]:
cleaned_df.head()
Out[18]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
V2 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 3727.0 900.0 584.9 1159.0 843.4 3537.0 1769.4 1767.6 2250.0 2600.0
V3 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 3776.0 909.0 596.3 1156.0 837.1 3532.0 1786.6 1745.4 3220.0 5800.0
V4 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 3799.0 913.0 601.5 1156.0 834.9 3556.0 1799.7 1756.3 2710.0 2500.0
V5 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 3815.0 919.0 605.8 1164.0 843.9 3564.0 1797.1 1766.9 2460.0 2600.0
V6 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 3806.0 937.0 624.6 1174.0 849.5 3585.0 1794.7 1790.3 2500.0 2600.0

You can see there is actually not a big difference here, and sometimes Dask may even be slower. That's because the slowest part of this process is combining all the series back together with pd.concat and then sorting by the index, and that part is not parallelized. Also, it's only 20 columns; if we were to use the full dataframe, or bigger data generally, you would start to see more improvement. The rule of thumb I've noticed is that Dask is worth it over plain pandas once your computation time gets into the order of minutes; beyond 20 minutes or so, you can start to see significant speedups.

The most exciting thing you should see above, though, is how easy it is to turn your normal code into parallel code using Dask. You can take any custom workload, change a couple lines of code, and have your for loop (or whatever it is) run in parallel using dask.delayed. What's more, in the cell where we defined our LocalCluster, all we have to do is replace that with the scheduler address of a remote cluster to have our computation run on a distributed cluster. It's that easy to go from testing locally to running remotely. Of course, there are other considerations to keep in mind when deploying to a cluster, like communication time between the nodes: you'd want to design your workflow to minimize data transfer between nodes, and the way this is set up doesn't necessarily do that, but those are details for another time.
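As a minimal sketch of that swap (the scheduler address below is made up; substitute your own cluster's):

from dask.distributed import Client

#point the client at a running remote scheduler instead of a LocalCluster
client = Client('tcp://10.0.0.5:8786')  #hypothetical scheduler address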

Forecasting

We will now get to the actual forecasting part. This part can benefit the most from parallelism, as it's computationally expensive. I have created a couple of functions below, for the Prophet library as well as for R forecasting functions, which take a time series object as input, fit and predict using a specified model, and return a time series object back. These functions are what we will use to iterate over our dataframe and fit a model to each series.

These functions require actual dates to work, which you would normally have for your series. This competition does not give us dates, so we're going to make some up as a workaround and then reset to the V numbering afterwards. We will use pd.date_range to generate them.

In [19]:
dates = pd.date_range(end='4/1/2018', periods=len(cleaned_df),freq='MS')
dated_df = cleaned_df.iloc[:len(cleaned_df),:].set_index(dates)
dated_df.head()
Out[19]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
1941-11-01 8000.0 2440.0 2670.0 7264.0 4210.0 5600.0 865.0 796.0 8980.0 16960.0 3727.0 900.0 584.9 1159.0 843.4 3537.0 1769.4 1767.6 2250.0 2600.0
1941-12-01 8350.0 2490.0 2590.0 7215.0 4290.0 5700.0 902.0 856.0 8970.0 17170.0 3776.0 909.0 596.3 1156.0 837.1 3532.0 1786.6 1745.4 3220.0 5800.0
1942-01-01 8570.0 2710.0 2784.0 7022.0 4250.0 5700.0 909.0 848.0 9120.0 17420.0 3799.0 913.0 601.5 1156.0 834.9 3556.0 1799.7 1756.3 2710.0 2500.0
1942-02-01 7700.0 2290.0 2816.0 7040.0 4320.0 5700.0 945.0 890.0 9080.0 17680.0 3815.0 919.0 605.8 1164.0 843.9 3564.0 1797.1 1766.9 2460.0 2600.0
1942-03-01 7080.0 1960.0 2888.0 6966.0 4320.0 5600.0 953.0 909.0 9600.0 17670.0 3806.0 937.0 624.6 1174.0 849.5 3585.0 1794.7 1790.3 2500.0 2600.0

Forecasting with Prophet

I've created this forecast_prophet function as a light wrapper around the Prophet forecasting library to do all the menial formatting the library requires, so you can just input a simple time series and get a time series back.

Forecasting with Prophet single series

In [55]:
def forecast_prophet(ts, forecast_periods=18,changepoint_prior_scale=.25,freq='MS',return_df=False):
    
    """forecasts a time series object using FB prophet model (https://facebook.github.io/prophet/)
    Note: forecast_ts returns fitted period values as well as forecasted period values
    
    if return_df is True need to get results like this: 
    
        tseries, prophet_model, forc_df = forecast_prophet(df['series1'],return_df=True)
    
        Args:
            ts: time series object
            forecast_periods: periods to forecast for
            changepoint_prior_scale: changepoint value (lower value means trend is more flexible, easier to change)
            freq: frequency of the data ('MS' is Monthly Start could use M for month end)
            return_df: if set to true, returns full forecast dataframe with upper and lower values

        Returns:
             forecast_ts: time series of fitted and forecasted values
             model: prophet fitted model (only when return_df=True)
             forecast_df: full forecast dataframe for plotting or for upper/lower conf interval values (only when return_df=True)

    """
    
    #find the start of the time series
    start_ts = ts[ts.notna()].index[0]
    #find the end of the time series
    end_ts = ts[ts.notna()].index[-1]
    #extract actual time series
    ts = ts.loc[start_ts:end_ts]
    
    model_ts = ts.reset_index()
    model_ts.columns = ['ds', 'y']
    
    model = Prophet(changepoint_prior_scale=changepoint_prior_scale)
    model.fit(model_ts)

    future = model.make_future_dataframe(periods=forecast_periods, freq=freq)
    forecast_df = model.predict(future)
    
    forecast_ts = forecast_df.set_index('ds')['yhat']


    if return_df==False:
        return forecast_ts
    
    return forecast_ts, model, forecast_df

#example of using results to plot
tseries, prophet_model, fc_df = forecast_prophet(dated_df['M2'],return_df=True,forecast_periods=18)
#prophet_model.plot(fc_df);

#example of normal usage returning just the time series
#tseries = forecast_prophet(dated_df['M2'],forecast_periods=18)
#tseries.plot()
/opt/conda/lib/python3.5/site-packages/pystan/misc.py:399: FutureWarning:

Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.

/opt/conda/lib/python3.5/site-packages/fbprophet/forecaster.py:1010: FutureWarning:

Method .as_matrix will be removed in a future version. Use .values instead.

/opt/conda/lib/python3.5/site-packages/fbprophet/forecaster.py:1134: FutureWarning:

Method .as_matrix will be removed in a future version. Use .values instead.

In [71]:
trace_fitted = go.Scatter(
          x=tseries.index,
          y=tseries.values,
          name = 'fitted+predicted')

trace_actual = go.Scatter(
          x=dated_df['M2'].index,
          y=dated_df['M2'].values,
          name = 'actual')

data = [trace_actual,trace_fitted]

layout = go.Layout(
    title='fitted+predicted vs actual values for M2 Series by Prophet Model',
    xaxis=dict(
        title='Date'))
fig = go.Figure(data=data, layout=layout)
    
iplot(fig)

So that's forecasting for a single series. Now let's look at the time difference for forecasting all series with 1 worker vs. in parallel with 6 workers.

Important Note: One detail with the Prophet function is that it is implemented with pystan, which holds the GIL, so you won't get any benefit from extra threads here; the parallelism has to come from worker processes. For other operations that do not hold the GIL, you may get speedups from having more threads per worker.
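If you want to be explicit about that trade-off, LocalCluster lets you set the process/thread split yourself. A sketch matching this notebook's 6-core machine:

from dask.distributed import Client, LocalCluster

#one thread per worker process sidesteps the GIL for pystan-bound work;
#6 processes matches the 6 cores reported by the cluster above
cluster = LocalCluster(n_workers=6, threads_per_worker=1, processes=True)
client = Client(cluster)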

Also we're gonna want to disable these annoying prophet warnings so we run this cell below

For timing purposes we're just gonna run Prophet on the 20 columns, because it can take a long time given how long these series are. In my experience, running Prophet on around 1000 monthly time series, each with 3 years of history and predicting 2 years out, takes around 15 minutes using 4 cores.

In [21]:
import logging
#disables fbprophet warning messages being printed to stdout
logging.getLogger('fbprophet').setLevel(logging.WARNING)

The Prophet model and some of the R models can be quite slow, so we are only going to use the first 20 columns to demonstrate the difference.

Forecasting with Prophet multiple series 1 worker

In [22]:
%%time
output_fc = []
for i in dated_df:
    forecasted_series = forecast_prophet(dated_df[i])
    output_fc.append(forecasted_series)
    
full_fc_df = pd.concat(output_fc,ignore_index=False,keys=dated_df.columns,axis=1)
#full_fc_df.head()
/opt/conda/lib/python3.5/site-packages/pystan/misc.py:399: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  elif np.issubdtype(np.asarray(v).dtype, float):
/opt/conda/lib/python3.5/site-packages/fbprophet/forecaster.py:1010: FutureWarning: Method .as_matrix will be removed in a future version. Use .values instead.
  X = seasonal_features.as_matrix()
/opt/conda/lib/python3.5/site-packages/fbprophet/forecaster.py:1134: FutureWarning: Method .as_matrix will be removed in a future version. Use .values instead.
  seasonal = np.matmul(seasonal_features.as_matrix(), beta) * self.y_scale
CPU times: user 1min 14s, sys: 2min 5s, total: 3min 19s
Wall time: 1min 2s
In [23]:
full_fc_df.head()
Out[23]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
ds
1941-11-01 7151.117178 2299.541298 2703.376974 7237.477919 4235.635781 5424.325205 870.295576 822.616703 8172.040339 17815.264651 3634.592612 697.422031 380.851551 1097.989343 819.628171 3501.959267 1840.213866 1677.670046 2274.452298 2621.011123
1941-12-01 7715.664272 2227.930023 2669.574481 7216.908706 4261.552041 5409.555657 885.721104 835.562548 8182.462371 17709.267388 3828.620835 725.863443 411.818227 1217.144655 826.373106 3650.730860 1970.457037 1695.210061 3068.412085 4008.572296
1942-01-01 7716.000012 2014.627094 2740.489705 7063.803930 4256.661384 5425.700955 900.190796 844.253806 8044.381186 17638.899078 3898.833792 765.410843 451.501421 1240.424659 833.663323 3713.198168 2009.379707 1717.520264 2512.177489 2137.470946
1942-02-01 6726.903133 1988.816299 2836.474972 6996.024069 4304.721952 5402.512350 934.599105 879.321849 8036.547253 17558.010367 3920.155422 804.558662 488.412813 1245.797556 841.084085 3717.631044 1994.408320 1735.647987 2549.339469 2202.639077
1942-03-01 6382.020985 2014.233452 2800.110352 7023.647198 4271.201283 5458.211704 948.196979 901.957793 8492.779041 17499.059084 3940.989535 837.505134 523.240462 1203.559731 851.552696 3780.396347 2005.913434 1785.959845 2714.990265 2361.687380

Forecasting with Prophet multiple series 6 workers with Dask delayed

In [24]:
%%time
output_fc = []
for i in dated_df:
    forecasted_series = dask.delayed(forecast_prophet)(dated_df[i])
    output_fc.append(forecasted_series)
    
total = dask.delayed(output_fc).compute()

full_fc_df = pd.concat(total,ignore_index=False,keys=dated_df.columns,axis=1)
#full_fc_df.head()
CPU times: user 1.62 s, sys: 265 ms, total: 1.89 s
Wall time: 34.9 s
In [25]:
full_fc_df.head()
Out[25]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
ds
1941-11-01 7151.117178 2299.541298 2703.376974 7237.477919 4235.635781 5424.325205 870.295576 822.616703 8172.040339 17815.264651 3634.592612 697.422031 380.851551 1097.989343 819.628171 3501.959267 1840.213866 1677.670046 2274.452298 2621.011123
1941-12-01 7715.664272 2227.930023 2669.574481 7216.908706 4261.552041 5409.555657 885.721104 835.562548 8182.462371 17709.267388 3828.620835 725.863443 411.818227 1217.144655 826.373106 3650.730860 1970.457037 1695.210061 3068.412085 4008.572296
1942-01-01 7716.000012 2014.627094 2740.489705 7063.803930 4256.661384 5425.700955 900.190796 844.253806 8044.381186 17638.899078 3898.833792 765.410843 451.501421 1240.424659 833.663323 3713.198168 2009.379707 1717.520264 2512.177489 2137.470946
1942-02-01 6726.903133 1988.816299 2836.474972 6996.024069 4304.721952 5402.512350 934.599105 879.321849 8036.547253 17558.010367 3920.155422 804.558662 488.412813 1245.797556 841.084085 3717.631044 1994.408320 1735.647987 2549.339469 2202.639077
1942-03-01 6382.020985 2014.233452 2800.110352 7023.647198 4271.201283 5458.211704 948.196979 901.957793 8492.779041 17499.059084 3940.989535 837.505134 523.240462 1203.559731 851.552696 3780.396347 2005.913434 1785.959845 2714.990265 2361.687380

Forecasting using R forecast package models

Now we'll look at some forecasting models from the R forecast package. The reason we're using this package is that it's the most comprehensive package for forecasting time series that I've seen. Most of the Python libraries focus only on ARIMA models, which actually perform worse than other methods such as ets and theta on many practical business problems. These other models have little to no support in Python currently, and recreating all the work of this R package would take a long time.

Below I've created a model dictionary for some of the forecasting functions in the R forecast package. It works with most of them; I don't show tbats here because it can be slow (and it's better suited to daily data anyway), and nnetar has some issues with concurrency. For a full list, reference: https://www.rdocumentation.org/packages/forecast/versions/8.3

Note that the entries ending in f (thetaf, splinef, meanf, rwf) are direct forecasting functions and are deliberately left without a closing parenthesis; the forecast_R function below completes each call by appending the horizon and confidence level arguments.

In [26]:
#note: the calls ending in f are left unclosed on purpose;
#forecast_R below completes them by appending ',h=...,level=...)'
R_model_dict = {'ets_damped':'ets(rdata,damped=TRUE)',
                'ets':'ets(rdata)',
                'auto_arima_D_1':'auto.arima(rdata,D=1)',
                'auto_arima':'auto.arima(rdata)',
                'thetaf':'thetaf(rdata',
                'splinef':'splinef(rdata',
                'meanf':'meanf(rdata',
                'rwf':'rwf(rdata'}

R_model_dict['ets_damped']
Out[26]:
'ets(rdata,damped=TRUE)'

I've also created this forecast_R function, which takes in a python series, fits a model in R, and returns the full fitted + predicted series back to you as a python series. This function is not very robust and may not handle every case, but it works quite well for the monthly case I use the most.

You can pass the model strings from the above dict into the forecast_R function as the model parameter to specify your model

Forecasting with R single series

In [68]:
def forecast_R(time_series,model=R_model_dict['thetaf'],forecast_periods=18, freq=12,confidence_level=80.0,return_interval=False):
    
    """forecasts a time series object using R models in forecast package (https://www.rdocumentation.org/packages/forecast/versions/8.1)
    Note: forecast_ts returns fitted period values as well as forecasted period values
    -Need to make sure forecast function happens in R so dask delayed works correctly
    
        
        Args:
            time_series: time series object
            model: pass in R forecast model as string that you want to evaluate, make sure you leave rdata as rdata in all calls
            forecast_periods: periods to forecast for
            freq: frequency of time series (12 is monthly)

        Returns:
             full_series: time series of fitted and forecasted values

    """
    #set frequency string to monthly start if frequency is 12
    freq_dict = {12:'MS',1:'Y',365:'D',4:'QS'}
    freq_string = freq_dict[freq]
    
    #find the start of the time series
    start_ts = time_series[time_series.notna()].index[0]
    #find the end of the time series
    end_ts = time_series[time_series.notna()].index[-1]
    #extract actual time series
    time_series = time_series.loc[start_ts:end_ts]
    #converts to ts object in R
    time_series_R = robjects.IntVector(time_series)
    rdata=ts(time_series_R,frequency=freq)

    
    #if the forecast model name ends in f, it's a direct forecasting function,
    #so handle it differently: the call is completed with h and level, no separate fit
    if model.split('(')[0][-1] == 'f':
        rstring="""
         function(rdata){
         library(forecast)
         forecasted_data<-%s,h=%s,level=c(%s))
         return(list(fitted_series=forecasted_data$fitted, predicted_series=forecasted_data$mean,lower_PI=forecasted_data$lower,upperPI=forecasted_data$upper))
         }
        """ % (model,forecast_periods,confidence_level)
        
    else:
        rstring="""
         function(rdata){
         library(forecast)
         fitted_model<-%s
         forecasted_data<-forecast(fitted_model,h=%s,level=c(%s))
         return(list(fitted_series=forecasted_data$fitted, predicted_series=forecasted_data$mean,lowerpi=forecasted_data$lower,upperpi=forecasted_data$upper))
         }
        """ % (model,forecast_periods,confidence_level)
        
    rfunc=robjects.r(rstring)
    #gets fitted and predicted series, and lower and upper prediction intervals from R model
    fitted_series,predicted_series,lowerpi,upperpi=rfunc(rdata)
    #convert IntVector representation to pandas series
    fitted_series = pd.Series(pandas2ri.ri2py(fitted_series),index=pd.date_range(start=time_series[time_series.notnull()].index.min(),periods=len(time_series[time_series.notnull()]),freq=freq_string))
    predicted_series = pandas2ri.ri2py(predicted_series)
    #get index for series
    index=pd.date_range(start=time_series.index.max(),periods=len(predicted_series)+1,freq=freq_string)[1:]

    try:
        predicted_series = pd.Series(predicted_series,index=index)
    except:
        #need for splinef because returns array with 2 brackets
        predicted_series = pd.Series(predicted_series.ravel(),index=index)
        
    full_series = fitted_series.append(predicted_series)
    
    #if return interval set to True, then you can get the lower and upper prediction intervals back also                       
    if return_interval == False:
        return full_series
    return full_series, lowerpi, upperpi

full_series = forecast_R(dated_df['M2'])
full_series.head()
#full_series.plot()
#dated_df['M2'].plot()
Out[68]:
1941-11-01    2295.098585
1941-12-01    2238.945391
1942-01-01    2265.844066
1942-02-01    2478.312581
1942-03-01    2422.019009
dtype: float64
In [70]:
trace_fitted = go.Scatter(
          x=full_series.index,
          y=full_series.values,
          name = 'fitted+predicted')

trace_actual = go.Scatter(
          x=dated_df['M2'].index,
          y=dated_df['M2'].values,
          name = 'actual')

data = [trace_actual,trace_fitted]

layout = go.Layout(
    title='fitted+predicted vs actual values for M2 Series by R Model',
    xaxis=dict(
        title='Date'))
fig = go.Figure(data=data, layout=layout)
    
iplot(fig)

Forecasting with R multiple series 1 worker

In [35]:
%%time
output_fc_R = []
for i in dated_df:
    forecasted_series = forecast_R(dated_df[i],model=R_model_dict['ets_damped'])
    output_fc_R.append(forecasted_series)
    
full_fc_R = pd.concat(output_fc_R,ignore_index=False,keys=dated_df.columns,axis=1)
CPU times: user 10.9 s, sys: 26.7 ms, total: 10.9 s
Wall time: 10.9 s
In [36]:
full_fc_R.head()
Out[36]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
1941-11-01 8244.243086 2576.956081 2684.473761 7172.063927 4284.811259 5622.042314 869.136880 828.727280 7942.893765 17701.426213 3706.922931 949.688458 616.057551 1130.135500 815.493416 3528.689816 1749.145424 1750.016082 2128.666066 2795.683760
1941-12-01 8499.322282 2463.241897 2637.813995 7249.267798 4246.297562 5581.003180 883.677958 810.433958 8778.302286 16989.277931 3815.900119 941.012880 633.288966 1164.631007 850.525568 3556.854162 1775.290736 1759.916391 3223.660161 3774.285115
1942-01-01 8294.961608 2215.902037 2688.356758 7206.040499 4279.325170 5677.855141 913.039928 846.635697 9403.799069 17181.007276 3840.156538 971.124391 640.740229 1172.351041 848.007280 3549.068663 1792.857209 1754.918537 2434.820900 2992.548893
1942-02-01 7400.543575 2406.784952 2866.752115 7019.833102 4267.470940 5707.087998 942.487763 859.624308 9529.987203 17427.309199 3848.832520 947.003941 637.936165 1177.862761 844.389307 3533.132323 1804.749207 1773.523876 2650.622049 2440.056481
1942-03-01 7089.691170 2359.974636 2800.815718 7029.040178 4304.944287 5735.166065 949.230949 895.191353 10269.473845 17684.880512 3840.338829 970.882784 635.965648 1178.359003 853.987545 3581.259991 1794.456126 1808.208658 2848.403140 2561.289625

Forecasting with R multiple series 6 workers

In [37]:
%%time
output_fc_R = []
for i in dated_df:
    forecasted_series = dask.delayed(forecast_R)(dated_df[i],model=R_model_dict['ets_damped'])
    output_fc_R.append(forecasted_series)
    
total = dask.delayed(output_fc_R).compute()

full_fc_R = pd.concat(total,ignore_index=False,keys=dated_df.columns,axis=1)
CPU times: user 315 ms, sys: 91.2 ms, total: 407 ms
Wall time: 4.16 s
In [38]:
full_fc_R.head()
Out[38]:
V1 M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20
1941-11-01 8244.243086 2576.956081 2684.473761 7172.063927 4284.811259 5622.042314 869.136880 828.727280 7942.893765 17701.426213 3706.922931 949.688458 616.057551 1130.135500 815.493416 3528.689816 1749.145424 1750.016082 2128.666066 2795.683760
1941-12-01 8499.322282 2463.241897 2637.813995 7249.267798 4246.297562 5581.003180 883.677958 810.433958 8778.302286 16989.277931 3815.900119 941.012880 633.288966 1164.631007 850.525568 3556.854162 1775.290736 1759.916391 3223.660161 3774.285115
1942-01-01 8294.961608 2215.902037 2688.356758 7206.040499 4279.325170 5677.855141 913.039928 846.635697 9403.799069 17181.007276 3840.156538 971.124391 640.740229 1172.351041 848.007280 3549.068663 1792.857209 1754.918537 2434.820900 2992.548893
1942-02-01 7400.543575 2406.784952 2866.752115 7019.833102 4267.470940 5707.087998 942.487763 859.624308 9529.987203 17427.309199 3848.832520 947.003941 637.936165 1177.862761 844.389307 3533.132323 1804.749207 1773.523876 2650.622049 2440.056481
1942-03-01 7089.691170 2359.974636 2800.815718 7029.040178 4304.944287 5735.166065 949.230949 895.191353 10269.473845 17684.880512 3840.338829 970.882784 635.965648 1178.359003 853.987545 3581.259991 1794.456126 1808.208658 2848.403140 2561.289625
In [39]:
# we just shorten the R model dict to these 2 for demonstration purposes
R_model_dict = {'ets_damped':'ets(rdata,damped=TRUE)',
                'thetaf':'thetaf(rdata'}

R_model_dict['ets_damped']
Out[39]:
'ets(rdata,damped=TRUE)'

Forecasting with R multiple models and multiple series 1 worker

Note: for these two cells where we call forecast_R, we are going to get back our dataframe in long format instead of wide. This is where our framework sort of breaks down, because now we also want to see our data by model name. You could keep a dict of dataframes, or multi-index one dataframe, but personally I just prefer the long format at this point because that's how I usually insert the data into a database. (A sketch of pivoting the long results back to wide format follows these cells.)

In [40]:
%%time
list_of_dfs = []
for model in enumerate(sorted(R_model_dict)):
    print(model)
    output = []
    for i in dated_df:
        #print(i)
        forecasted_series = forecast_R(dated_df[i],model=R_model_dict[model[1]])
        output.append(forecasted_series)

    full_df = pd.concat(output,ignore_index=False,keys=dated_df.columns)
    
    final_model_df = full_df.to_frame().reset_index()
    final_model_df.columns = ['Series','Date','value_variable']
    #add model Name
    final_model_df['Model_name'] = model[1]

    list_of_dfs.append(final_model_df)
final_df = pd.concat(list_of_dfs)
(0, 'ets_damped')
(1, 'thetaf')
CPU times: user 10.9 s, sys: 21.5 ms, total: 11 s
Wall time: 10.9 s
In [41]:
final_df.head()
Out[41]:
Series Date value_variable Model_name
0 M1 1941-11-01 8244.243086 ets_damped
1 M1 1941-12-01 8499.322282 ets_damped
2 M1 1942-01-01 8294.961608 ets_damped
3 M1 1942-02-01 7400.543575 ets_damped
4 M1 1942-03-01 7089.691170 ets_damped
In [42]:
final_df.tail()
Out[42]:
Series Date value_variable Model_name
10466 M20 1957-06-01 3832.063481 thetaf
10467 M20 1957-07-01 3831.406002 thetaf
10468 M20 1957-08-01 3830.748523 thetaf
10469 M20 1957-09-01 3830.091044 thetaf
10470 M20 1957-10-01 3829.433565 thetaf

Forecasting with R multiple models and multiple series 6 workers

In [43]:
%%time
list_of_dfs = []
for model in enumerate(sorted(R_model_dict)):
    print(model)
    output = []
    for i in dated_df:
        #print(i)
        forecasted_series = dask.delayed(forecast_R)(dated_df[i],model=R_model_dict[model[1]])
        output.append(forecasted_series)

    total = dask.delayed(output).compute()

    full_df = pd.concat(total,ignore_index=False,keys=dated_df.columns)
    
    final_model_df = full_df.to_frame().reset_index()
    final_model_df.columns = ['Series','Date','value_variable']
    #add model Name
    final_model_df['Model_name'] = model[1]

    list_of_dfs.append(final_model_df)
final_df = pd.concat(list_of_dfs)
(0, 'ets_damped')
(1, 'thetaf')
CPU times: user 385 ms, sys: 48.7 ms, total: 434 ms
Wall time: 2.85 s
In [44]:
final_df.head()
Out[44]:
Series Date value_variable Model_name
0 M1 1941-11-01 8244.243086 ets_damped
1 M1 1941-12-01 8499.322282 ets_damped
2 M1 1942-01-01 8294.961608 ets_damped
3 M1 1942-02-01 7400.543575 ets_damped
4 M1 1942-03-01 7089.691170 ets_damped
In [45]:
final_df.tail()
Out[45]:
Series Date value_variable Model_name
10466 M20 1957-06-01 3832.063481 thetaf
10467 M20 1957-07-01 3831.406002 thetaf
10468 M20 1957-08-01 3830.748523 thetaf
10469 M20 1957-09-01 3830.091044 thetaf
10470 M20 1957-10-01 3829.433565 thetaf
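As promised above, here's a minimal sketch of pivoting the long final_df back into the wide series-per-column format for a single model (thetaf here):

#pivot the long results back to wide format for one model
wide_thetaf = (final_df[final_df['Model_name']=='thetaf']
               .pivot(index='Date', columns='Series', values='value_variable'))
wide_thetaf.head()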

Calculating Error Measures in Python

For more info about forecasting accuracy measures, see this reference.

I'm gonna make a small wrapper function, similar to the accuracy function call in R, to give summary accuracy measures in Python.

Note: Thanks to Pranav for the tsmetrics library

In [46]:
from tabulate import tabulate
import numpy as np

def accuracy(actual,predicted):
    
    """
    Args:
        actual: actual values as numpy array or time series
        predicted: predicted values as numpy array or time series
        
    Returns:
        None by default: prints a table of accuracy measures. Uncomment the
        final return statement below to get them back as a dictionary.
    """
    
    #convert to np array if series
    try:
        actual = actual.values
    except:
        pass
    try:
        predicted = predicted.values
    except:
        pass
    actual = actual.astype(float)
    predicted = predicted.astype(float)
    
    #if any nan values, convert to 0
    actual[np.isnan(actual)] = 0.0
    predicted[np.isnan(predicted)] = 0.0
    # ----------------------------Scale independent metrics------------------------

    # The mean absolute percentage error (MAPE), is a measure of prediction
    # accuracy of a forecasting method in statistics. The 'min_val' variable is
    # used to fill zeros in test data, if any. Filling zero with a constant helps
    # to avoid division by zero error
    def mean_absolute_percentage_error(y_true, y_pred, min_val=1):
        y_true = y_true + min_val
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100


    # SMAPE is an alternative for MAPE when there are zeros in the testing data. It
    # scales the absolute percentage by the sum of forecast and observed values
    def s_mean_absolute_percentage_error(y_true, y_pred):
        return np.mean(np.abs((y_true - y_pred) / ((y_true + y_pred)/2))) * 100

    #returns average error
    def mean_error(y_true,y_pred):
        return np.mean(y_true - y_pred)
    
    # MAE is used to measure how close the forecast is to observed value
    def mean_absolute_error(y_true, y_pred):
        return np.mean(abs(y_true - y_pred))
    
    # -----------------------------Scale dependent metrics-------------------------


    # In statistics, the mean squared error (MSE) of an estimator measures the
    # average of the squares of the errors. The squaring is necessary to remove any
    # negative signs. It also gives more weight to larger differences
    def mean_squared_error(y_true, y_pred):
        return np.mean((y_true - y_pred)**2)


    # RMSE is the standard deviation of the residuals. It shares the same
    # properties as MSE. The square root is used to dampen the magnitude of errors
    # caused by squaring them.
    def root_mean_squared_error(y_true, y_pred):
        return np.sqrt(np.mean((y_true - y_pred)**2))


    # It measures the total squared deviation of forecasted observations, from the
    # observed values
    def sum_of_squared_error(y_true, y_pred):
        return np.sum((y_true - y_pred)**2)


    # Theil's U accuracy measure (lies between 0 and 1; closer to 0 is more accurate)
    def theil_u_statistic(y_true, y_pred):
        return np.sqrt(np.sum((y_pred - y_true)**2)/np.sum(y_true**2))
    
    def autocorrelation_lag_1(y_true, y_pred, lag=1):
        error = y_true - y_pred
        # Slice the relevant subseries based on the lag
        y1 = error[:(len(error)-lag)]
        y2 = error[lag:]
        # Subtract the mean of the whole series x to calculate Cov
        sum_product = np.sum((y1-np.mean(error))*(y2-np.mean(error)))
        # Normalize with var of whole series
        return sum_product / ((len(error) - lag) * np.var(error))

    MAPE = mean_absolute_percentage_error(actual,predicted)
    SMAPE = s_mean_absolute_percentage_error(actual,predicted)
    ME = mean_error(actual,predicted)
    MAE = mean_absolute_error(actual,predicted)
    MSE = mean_squared_error(actual,predicted)
    RMSE = root_mean_squared_error(actual,predicted)
    SSE = sum_of_squared_error(actual,predicted)
    ThielsU = theil_u_statistic(actual,predicted)
    ACF1 = autocorrelation_lag_1(actual,predicted)
    
    print(tabulate([[MAPE,SMAPE,ME,MAE,MSE,RMSE,ThielsU,ACF1]], headers=['MAPE','SMAPE','ME','MAE','MSE','RMSE','ThielsU','ACF1']))
    
    accuracy_dict = {'MAPE':MAPE,'SMAPE':SMAPE,'ME':ME,'MAE':MAE,'MSE':MSE,'RMSE':RMSE,'ThielsU':ThielsU,'ACF1':ACF1}
    
    #return accuracy_dict

NOTE: THIS IS BAD PRACTICE. DON'T DO THIS.

I am only doing this for demonstration purposes: I am calculating the error measures on the fitted vs actual values, but THIS IS NOT OUT OF SAMPLE. If you are using these error measures for model selection, you should be looking at forecasted vs held-out test values, or doing cross validation with a holdout set. Again, I cannot stress this enough: do not do this. You are very likely to overfit your series if you choose your model based on in-sample error measures, and it's already easy to overfit in forecasting scenarios, so this makes it doubly bad.
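For contrast, here is a minimal out-of-sample sketch using the functions already defined above: hold out the last 18 months (the M4 monthly horizon), fit on the rest, and score only the forecasted points. M2 is just this notebook's running example series:

#hold out the last 18 months of M2, fit on the rest, and score out of sample
series = dated_df['M2'].dropna()
train, test = series[:-18], series[-18:]
#forecast_R returns fitted + forecasted values, so keep just the last 18 points
fc = forecast_R(train, model=R_model_dict['thetaf'], forecast_periods=18)
accuracy(test, fc[-18:])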

In [47]:
mask = (final_df['Series']=='M2') & (final_df['Model_name']=='thetaf') & (final_df['Date']<'1980-12-01')
M2_full_df = final_df[mask]
M2_fitted_series = M2_full_df[['Date','value_variable']].set_index('Date')
M2_fitted_series.head()
Out[47]:
value_variable
Date
1941-11-01 2295.098585
1941-12-01 2238.945391
1942-01-01 2265.844066
1942-02-01 2478.312581
1942-03-01 2422.019009
In [48]:
actual_series = dated_df['M2'][dated_df['M2'].notna()]
actual_series.head()
Out[48]:
1941-11-01    2440.0
1941-12-01    2490.0
1942-01-01    2710.0
1942-02-01    2290.0
1942-03-01    1960.0
Freq: MS, Name: M2, dtype: float64

Below we see the accuracy measures for our fitted vs actual values for the M2 series. We could also calculate the error measures in parallel with Dask; they usually compute so quickly that it's not worth it, but it's easy to do, as sketched below.

In [49]:
accuracy(actual_series.values,M2_fitted_series.values.ravel())
   MAPE    SMAPE        ME      MAE      MSE     RMSE    ThielsU      ACF1
-------  -------  --------  -------  -------  -------  ---------  --------
10.9805  10.8048  -1.24349  243.599  93130.2  305.172   0.130076  0.077302
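The same dask.delayed pattern works for the error measures too. A minimal sketch, scoring every series' fitted values in full_fc_R against its actuals (note that accuracy just prints by default, and on a distributed cluster those prints land in the worker logs; uncomment the return statement in accuracy above if you want the dictionaries back):

#one delayed accuracy call per series, computed in a single pass
delayed_scores = []
for col in dated_df:
    actual = dated_df[col][dated_df[col].notna()]
    fitted = full_fc_R[col].loc[actual.index]
    delayed_scores.append(dask.delayed(accuracy)(actual, fitted))
scores = dask.delayed(delayed_scores).compute()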

Plotting our predictions

Finally, let's plot the actual, fitted, and forecasted series in a graph. This is especially useful for univariate time series, because sometimes the error measures can be misleading.

In [51]:
# Create traces
#get predicted values
mask = (final_df['Series']=='M2') & (final_df['Model_name']=='thetaf') & (final_df['Date']>='1980-12-01')
M2_full_df = final_df[mask]
M2_predicted_series = M2_full_df[['Date','value_variable']].set_index('Date')

#call forecast_R function again to get prediction interval for the series
full_series,lowerpi,upperpi = forecast_R(dated_df['M2'],return_interval = True)

trace_actuals = go.Scatter(
    x = actual_series.index,
    y = actual_series.values,
    mode = 'lines',
    name = 'actual'
)
trace_fitted = go.Scatter(
    x = M2_fitted_series.index,
    y = M2_fitted_series.values.ravel(),
    mode = 'lines+markers',
    name = 'fitted',
    opacity=0.8
)

trace_predicted = go.Scatter(
    x = M2_predicted_series.index,
    y = M2_predicted_series.values.ravel(),
    mode='lines',
    name = 'predicted'
)

trace_lower = go.Scatter(
    x=M2_predicted_series.index,
    y=np.array(lowerpi).ravel(),
    fill= None,
    mode='lines',
    name='Lower PI (80%)',
    line=dict(
        color='lightgreen',
    )
)
trace_upper = go.Scatter(
    x=M2_predicted_series.index,
    y=np.array(upperpi).ravel(),
    fill='tonexty',
    mode='lines',
    name='Upper PI (80%)',
    line=dict(
        color='lightgreen',
    )
)

data = [trace_actuals, trace_fitted,trace_predicted,trace_lower,trace_upper]

# layout = dict(title = 'Actual v Fitted Values for M2 Series with Thetaf Model',
#               xaxis = dict(title = 'Date'),
#               yaxis = dict(title = 'value variable'),
#               )

layout = dict(
        title='Actual v Fitted Values for M2 Series with Thetaf Model',
        yaxis = dict(title = 'value variable'),
        xaxis=dict(
            title = 'Date',
            rangeselector=dict(
                buttons=list([
                    dict(count=3,
                         label='3y',
                         step='year',
                         stepmode='forward'),
                    dict(count=5,
                         label='5y',
                         step='year',
                         stepmode='forward'),
                    dict(count=10,
                         label='10y',
                         step='year',
                         stepmode='forward'),
                    dict(step='all')
                ])
            ),
            rangeslider=dict(),
            type='date'
        )
    )
fig = dict(data=data, layout=layout)
iplot(fig, show_link=False)

Presenting Magi

So those are basically the core components of parallelized univariate time series forecasting in python for any custom function using dask.delayed. Of course there's more you could add, like model selection using rolling origin cross validation, which can be slotted into the workflow; a sketch of what that might look like follows.
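As a rough illustration, a rolling origin evaluation in this framework could look like the hypothetical helper below, built on the forecast_R function from earlier (refit at a few origins, score each held-out horizon with SMAPE, and average):

def rolling_origin_smape(ts, model='thetaf(rdata', horizon=18, n_origins=3):
    #refit at several origins, score each held-out horizon by SMAPE, average
    smapes = []
    for k in range(n_origins, 0, -1):
        train = ts[:-(horizon * k)]
        test = ts[len(train):len(train) + horizon]
        #keep only the forecasted points from the fitted+forecasted series
        fc = forecast_R(train, model=model, forecast_periods=horizon)[-horizon:]
        smapes.append(np.mean(np.abs(test.values - fc.values) /
                              ((np.abs(test.values) + np.abs(fc.values)) / 2)) * 100)
    return np.mean(smapes)

#e.g. rolling_origin_smape(dated_df['M2'].dropna())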

I'm also working on an open source python library to wrap all these convenience functions into a more fully fledged open source library to do high level parallelized univariate time series forecasting. If you'd like to contribute please reach out and check it out on github: https://github.com/DavisTownsend/magi

Below I show how I've simplified the code using magi. In under 10 lines of code, we can clean and forecast the dataframe of time series in parallel using the auto.arima model from the R forecast package with custom parameters, and then plot the accuracy metrics for the fitted vs actual values for each series in an interactive plotly graph.

In [52]:
#import package functions
from magi.core import forecast
from magi.plotting import fc_plot, acc_plot
from magi.accuracy import accuracy

#connect to local dask cluster, already connected
#cluster = LocalCluster()
#client = Client(cluster)

#clean and forecasts dataframe of time series, then plot accuracy metrics by series
fc_obj = forecast(time_series=dated_df,forecast_periods=18,frequency=12)
forecast_df = fc_obj.tsclean().R(model='auto.arima(rdata,D=1,stationary=TRUE)',fit=True)
acc_df = accuracy(dated_df,forecast_df,separate_series=True)
acc_plot(acc_df,title='Accuracy Measures by Series',xTitle='Accuracy Measure',yTitle='Series')

And to forecast a single series and plot the prediction, it would just be:

In [53]:
fc_obj = forecast(time_series=dated_df['M2'],forecast_periods=18,frequency=12)
forecast_dic = fc_obj.tsclean().R(model='thetaf')
fc_plot(forecast_dic)