The dataset we will be using is the M4 Competition Dataset: https://www.m4.unic.ac.cy/the-dataset/. For more information about this competition, or about time series forecasting in general, I'd recommend Rob Hyndman's blog; he is the creator of the popular R forecast package. He also has a free online textbook about forecasting: https://www.otexts.org/fpp
Required packages in R:
- forecast - Rob Hyndman's forecasting package, which we call from Python via rpy2
Required packages in Python:
- Dask - native parallel processing in Python
- Dask Distributed - distributed processing in Python
- Pandas - pandas dataframes
- Numpy - numpy arrays
- Prophet - Facebook's forecasting library
- rpy2 - runs R code within Python
- plotly - interactive plotting library based on D3
- cufflinks - high-level wrapper around plotly to directly use pandas dataframes
- magi - high-level wrapper for parallel univariate time series forecasting
Required Dataset:
- M4DataSet.zip and M4-info.csv from the M4 competition site linked above
Getting the Data¶
First we'll import everything we need. Note that we import the ts object and the forecast package from R using the rpy2 interface, and activate pandas2ri, which makes dealing with the R-Python integration much easier in this case.
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects
#activate automatic pandas <-> R conversion
pandas2ri.activate()
#get ts object as python object
ts=robjects.r('ts')
#import forecast package
forecast=importr('forecast')
import pandas as pd
import numpy as np
from fbprophet import Prophet
from datetime import datetime,timedelta
from dateutil.relativedelta import relativedelta
import time
import zipfile
from dask.distributed import Client, LocalCluster
import dask
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
from plotly.graph_objs import Scatter, Figure, Layout
import plotly
import plotly.graph_objs as go
init_notebook_mode(connected=False)
Note: If you don't want to, or are having trouble, downloading the M4Dataset file, you can also create a dataframe of random time series using the following code for demonstration purposes. However, I designed the notebook to work with the M4 Dataset, so you will have to edit the cells to work with this dataframe instead if you want to run this yourself.
def gen_ts(freq='MS',ncols=5,nrows=24,num_range=[0,10000],end_date='2018-04-01'):
    colnames = []
    for i in range(ncols):
        colnames.append('ts'+str(i))
    df = pd.DataFrame(np.random.randint(num_range[0],num_range[1],size=(nrows, ncols)), columns=colnames)
    df_date_index = pd.date_range(end=end_date, periods=nrows, freq=freq)
    df.index = df_date_index
    return df
df=gen_ts()
df.head()
Here we'll start our Dask LocalCluster, which we'll use for our parallelized time series forecasting later on in the notebook.
cluster = LocalCluster()
client = Client(cluster)
client
We can see which files are included in this zip archive using .namelist()
zf = zipfile.ZipFile('M4DataSet.zip')
zf.namelist()
Let's read one file in and check out the format of these files.
monthly_df = pd.read_csv(zf.open('Monthly-train.csv'))
#need to transpose so that we have time series as columns and can index on V's as the dates
monthly_df = monthly_df.set_index('V1').T
monthly_df.head()
monthly_df.tail()
Let's read in the info file also. For this tutorial we'll only be looking at the monthly data.
M4_info = pd.read_csv('M4-info.csv')
M4_info.head()
So it looks like we have 48,000 monthly time series in this data, with up to 2,795 months of history per series. However, we do see that a lot of series do not line up with each other, so we'll have to keep this in mind.
We can see from the cell below that all the monthly series have a horizon of 18 months, so we can use this as our forecast horizon in the upcoming functions.
M4_info[M4_info['SP']=='Monthly'].describe()
Formatting the Data¶
So these dataframes seem to be all unnamed columns with the time index in V1; we'll just set this index explicitly to make manipulating this data easier.
monthly_df_indexed = monthly_df.copy()
monthly_df_indexed.index.name = 'Time'
monthly_df_indexed.head()
We'll stop here for a second and discuss the overall high level process we are going to follow and the format the data needs to be in for this process to work.
This dataframe format above is our data framework. That is, for everything we are going to do, we should structure our data as a pandas dataframe where the index is the time or date of the series and the columns are all the different time series for that particular dimension. If we wanted to forecast for, say, 1000 sensors that all have similar time components, we could put them in one dataframe. But if you want to aggregate all sensor data and do a forecast on that too, that should be a separate dataframe (see the sketch below). This framework is a very important concept because it enables everything else.
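To make that concrete, here's a quick sketch using the gen_ts helper from above (my own illustration, not part of the original workflow):
# sketch: per-sensor series live together in one wide frame...
sensor_df = gen_ts(ncols=3)  # columns ts0..ts2, one shared DatetimeIndex
# ...while the aggregate across sensors goes in its own separate frame
total_df = sensor_df.sum(axis=1).to_frame('all_sensors')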
If you think about it, this situation is easily parallelizable, because each column is independent of every other column in terms of forecasting. (These series could be related, and you could do analysis around that, but that's a topic for another day; in terms of computation, each series can be considered independent.)
Now that our dataframe is in the correct format, our general workflow will entail four steps:
- Clean the dataframe
- Compute model predictions on the dataframe
- Calculate error measures (only in-sample accuracy is shown in this notebook, but out-of-sample cross validation is recommended)
- Visualize the predictions to get a sense of what's going on
There's a lot more you may want to add, like cross validation and rolling origin accuracy measures for model selection, but for the purposes of this tutorial I just want to show how you can do the foundational forecasting workflows in this framework.
Side Note: This framework isn't the most optimized, especially for this particular use case of the competition. It is better suited for series that are all on the same time scale. However, I think it's the easiest way to understand what's going on, so that's why I'm doing it this way. You could also format your data in long format, with a multi-index for each series or no index at all, and make that work if you know what you're doing (a sketch of the long format follows below). That would even simplify your code in this instance, because you wouldn't have to keep indexing into just the part of each series you care about.
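For reference, a sketch of that long-format alternative (my own illustration; we stick with the wide framework in this notebook):
# sketch: one row per (date, series) pair instead of one column per series
long_df = monthly_df_indexed.stack().rename('value').reset_index()
long_df.columns = ['Time', 'Series', 'value']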
Cleaning the Data¶
In the cell below I show how to extract one time series from this dataframe. If all your series were on the same time scale you would not need these extra steps, but in this case a series can start and stop at any point in the dataframe, so here we find the beginning and end and extract the series between those indexes.
#find the start of the time series
start_ts = monthly_df_indexed['M600'][monthly_df_indexed['M600'].notna()].index[0]
#find the end of the time series
end_ts = monthly_df_indexed['M600'][monthly_df_indexed['M600'].notna()].index[-1]
#get full time series to be cleaned
ts_ex = monthly_df_indexed['M600'].loc[start_ts:end_ts]
ts_ex.head()
ts_ex.tail()
In practice, cleaning your data is heavily dependent on your domain, and your dataframe may require very little or a whole lot of business-specific data cleansing to get it into a state ready for forecasting. This should be the most customized step of the process; I'll just show some general things here.
Important:¶
Here we introduce another important concept that fits nicely in our parallel framework: both our cleaning and modeling functions should take a time series as input and return a time series as output (in Python, just a pandas Series with a DatetimeIndex).
def clean_ts(ts):
    """
    takes time series as input and returns cleaned time series
    *removes outliers
    *linearly interpolates missing values
    returns time series back
    """
    #find the start of the time series
    start_ts = ts[ts.notna()].index[0]
    #find the end of the time series
    end_ts = ts[ts.notna()].index[-1]
    ts = ts.loc[start_ts:end_ts]
    #set outliers to NaN, defined here as points more than 3 rolling standard deviations above the rolling mean
    ts = ts.where(~(abs(ts) > (ts.rolling(center=False,window=3).mean() + (ts.rolling(center=False,window=3,min_periods=3).std() * 3))),np.nan)
    #linearly interpolate any NaN values
    ts = ts.interpolate()
    #make sure any remaining null values are filled with 0's
    ts = ts.fillna(0)
    return ts
clean_ts(monthly_df_indexed['M600']).head()
For this example we're going to limit to just 20 series so the cells don't take too long to execute.
Let's give our first introduction to what you can do with Dask here: we'll time the execution of applying this function across all dataframe columns in a single-threaded manner, and then use dask.delayed to parallelize the operation.
monthly_df_small = monthly_df_indexed.iloc[:,:20]
monthly_df_small.head()
Single threaded / single worker for loop to clean the series¶
%%time
output = []
for series in monthly_df_small:
    cleaned_series = clean_ts(monthly_df_small[series])
    output.append(cleaned_series)
#turn list of series back into full dataframe
full_df = pd.concat(output,ignore_index=False,keys=monthly_df_small.columns,axis=1)
#sort by index to get back to format we want
sorted_index = full_df.index.to_series().str[1:].astype(int).sort_values()
cleaned_df = full_df.reindex(index=sorted_index.index)
cleaned_df.head()
Parallelized for loop using dask.delayed to clean the series (6 workers)¶
%%time
output = []
for series in monthly_df_small:
    cleaned_series = dask.delayed(clean_ts)(monthly_df_small[series])
    output.append(cleaned_series)
total = dask.delayed(output).compute()
#turn list of series back into full dataframe
full_df = pd.concat(total,ignore_index=False,keys=monthly_df_small.columns,axis=1)
sorted_index = full_df.index.to_series().str[1:].astype(int).sort_values()
cleaned_df = full_df.reindex(index=sorted_index.index)
cleaned_df.head()
You can see there is actually not a big difference here, and sometimes Dask may actually be slower. That's because the slowest part of this process is combining all the series back together with pd.concat and then sorting by the index, and that part is not parallelized. Also, it's only 20 columns; if we were to use the full dataframe, or bigger data, you would start to see more improvement. The rule of thumb I've noticed with Dask is that it's worth it over plain pandas once your computation times are on the order of minutes; for example, beyond 20 minutes you can start to see significant speedups.
The most exciting thing you should see from the above, though, is how easy it is to turn your normal code into parallel code using Dask. You can take any custom workload, change a couple lines of code, and have your for loop (or whatever it is) run in parallel using dask.delayed. What's more, in the cell where we defined our LocalCluster, all we have to do is replace that with the scheduler address of a remote cluster to have our computation run on a distributed cluster (a sketch follows below). It's that easy to go from testing locally to running remotely on your cluster. Of course, there are other considerations to keep in mind when deploying to a cluster, like communication time between the nodes: you'd want to design your workflow to minimize data transfer between nodes, and the way this is set up doesn't necessarily do that, but those are details for another time.
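For example, a minimal sketch of pointing the same code at a remote cluster (the address is a placeholder, not a real scheduler):
# sketch: connect to a remote scheduler instead of the LocalCluster
# client = Client('tcp://scheduler-host:8786')  # hypothetical address -- replace with your cluster's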
Forecasting¶
Now we'll get to the actual forecasting part. This part benefits the most from parallelism, as it's the most computationally expensive. I've created a couple functions below, for the Prophet library as well as for R forecasting functions, which take a time series object as input, fit and predict using a specified model, and return a time series object back. These functions are what we will use to iterate over our dataframe and fit the model to each series.
These functions require actual dates to work, which you would normally have for your series. This competition does not give us dates, so as a workaround we're going to make some up (using pd.date_range) and then reset to the V numbering afterwards.
dates = pd.date_range(end='4/1/2018', periods=len(cleaned_df),freq='MS')
dated_df = cleaned_df.set_index(dates)
dated_df.head()
Forecasting with Prophet¶
I've created this forecast_prophet function as a light wrapper around the Prophet forecasting library to do all the menial formatting the library requires, so you can just input a simple time series and get a time series back.
Forecasting with Prophet single series¶
def forecast_prophet(ts, forecast_periods=18,changepoint_prior_scale=.25,freq='MS',return_df=False):
    """forecasts a time series object using FB prophet model (https://facebook.github.io/prophet/)
    Note: forecast_ts returns fitted period values as well as forecasted period values
    if return_df is True, get results like this:
    tseries, prophet_model, forc_df = forecast_prophet(df['series1'],return_df=True)
    Args:
        ts: time series object
        forecast_periods: periods to forecast for
        changepoint_prior_scale: changepoint prior (lower value makes the trend less flexible, harder to change)
        freq: frequency of the data ('MS' is month start; could use 'M' for month end)
        return_df: if set to True, also returns the full forecast dataframe with upper and lower values
    Returns:
        forecast_ts: time series of fitted and forecasted values
        model: prophet fitted model
        forecast_df: full forecast dataframe, to be used for plotting or for upper/lower confidence interval values
    """
    #find the start of the time series
    start_ts = ts[ts.notna()].index[0]
    #find the end of the time series
    end_ts = ts[ts.notna()].index[-1]
    #extract actual time series
    ts = ts.loc[start_ts:end_ts]
    #format for prophet: two columns named ds (date) and y (value)
    model_ts = ts.reset_index()
    model_ts.columns = ['ds', 'y']
    model = Prophet(changepoint_prior_scale=changepoint_prior_scale)
    model.fit(model_ts)
    future = model.make_future_dataframe(periods=forecast_periods, freq=freq)
    forecast_df = model.predict(future)
    forecast_ts = forecast_df.set_index('ds')['yhat']
    if return_df==False:
        return forecast_ts
    return forecast_ts, model, forecast_df
#example of using results to plot
tseries, prophet_model, fc_df = forecast_prophet(dated_df['M2'],return_df=True,forecast_periods=18)
#prophet_model.plot(fc_df);
#example of normal tseries usage
#tseries = forecast_prophet(dated_df['M2'],forecast_periods=18)
#tseries.plot()
trace_fitted = go.Scatter(
    x=tseries.index,
    y=tseries.values,
    name='fitted+predicted')
trace_actual = go.Scatter(
    x=dated_df['M2'].index,
    y=dated_df['M2'].values,
    name='actual')
data = [trace_actual,trace_fitted]
layout = go.Layout(
    title='fitted+predicted vs actual values for M2 Series by Prophet Model',
    xaxis=dict(
        title='Date'))
fig = go.Figure(data=data, layout=layout)
iplot(fig)
So that's forecasting for a single series; now let's look at the time difference for forecasting all series with 1 worker vs in parallel with 6 workers.
Important Note: One detail with Prophet is that it is implemented with PyStan, which holds the GIL, so you won't get any benefit from having more threads than cores in this case (although for other operations that do release the GIL, you may get speedups from having more threads per worker).
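Given that, here's a sketch of how you might favor processes over threads when starting the cluster (n_workers and threads_per_worker are standard dask.distributed LocalCluster arguments; commented out since our cluster is already running):
# sketch: favor processes over threads for GIL-bound work like PyStan
# cluster = LocalCluster(n_workers=6, threads_per_worker=1)
# client = Client(cluster)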
Also, we're going to want to disable Prophet's annoying warnings, so we run the cell below.
For timing purposes, we're only running Prophet on 20 columns, because it can take a long time given how long these series are. For reference, in my experience running Prophet on around 1,000 monthly time series, each with 3 years of history and predicting 2 years out, takes around 15 minutes on 4 cores.
import logging
#disables fbprophet warning messages being printed to stdout
logging.getLogger('fbprophet').setLevel(logging.WARNING)
The Prophet model and some of the R models can be quite slow, so we are only using the first 20 columns to demonstrate the difference.
Forecasting with Prophet multiple series 1 worker¶
%%time
output_fc = []
for i in dated_df:
    forecasted_series = forecast_prophet(dated_df[i])
    output_fc.append(forecasted_series)
full_fc_df = pd.concat(output_fc,ignore_index=False,keys=dated_df.columns,axis=1)
full_fc_df.head()
Forecasting with Prophet multiple series 6 workers with Dask delayed¶
%%time
output_fc = []
for i in dated_df:
    forecasted_series = dask.delayed(forecast_prophet)(dated_df[i])
    output_fc.append(forecasted_series)
total = dask.delayed(output_fc).compute()
full_fc_df = pd.concat(total,ignore_index=False,keys=dated_df.columns,axis=1)
full_fc_df.head()
Forecasting using R forecast package models¶
Now we'll look at some forecasting models from the R forecast package. The reason we're using this package is that it's the most comprehensive package for forecasting time series that I've seen. Most Python libraries focus only on ARIMA models, which actually perform worse than other methods such as ETS and Theta on many practical business problems. These other models currently have little to no support in Python, and recreating all the work in this R package would take a long time.
Below I've created a model dictionary for some of the forecasting functions in the R forecast package. It works with most of them; I don't show tbats here because it can be slow and it's better suited to daily data anyway, and nnetar has some issues with concurrency. For a full list, reference: https://www.rdocumentation.org/packages/forecast/versions/8.3
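#note: the entries ending in an open parenthesis (e.g. 'thetaf(rdata') are deliberately
#left unclosed -- forecast_R below appends the horizon and confidence level and closes the call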
R_model_dict = {'ets_damped':'ets(rdata,damped=TRUE)',
'ets':'ets(rdata)',
'auto_arima_D_1':'auto.arima(rdata,D=1)',
'auto_arima':'auto.arima(rdata)',
'thetaf':'thetaf(rdata',
'splinef':'splinef(rdata',
'meanf':'meanf(rdata',
'rwf':'rwf(rdata'}
R_model_dict['ets_damped']
I've also created this forecast_R function, which takes in a Python series, forecasts it with a model in R, and returns the full fitted + predicted series back to you as a Python series. This function is not very robust and may not handle every case, but it works quite well for the monthly case, which is what I use the most.
You can pass the model strings from the dict above into forecast_R as the model parameter to specify your model.
Forecasting with R single series¶
def forecast_R(time_series,model=R_model_dict['thetaf'],forecast_periods=18, freq=12,confidence_level=80.0,return_interval=False):
    """forecasts a time series object using R models in forecast package (https://www.rdocumentation.org/packages/forecast/versions/8.1)
    Note: forecast_ts returns fitted period values as well as forecasted period values
    -the forecast call needs to happen inside R so dask delayed works correctly
    Args:
        time_series: time series object
        model: R forecast model to evaluate, passed in as a string; make sure you leave rdata as rdata in all calls
        forecast_periods: periods to forecast for
        freq: frequency of time series (12 is monthly)
        confidence_level: confidence level for the prediction intervals
        return_interval: if True, also return the lower and upper prediction intervals
    Returns:
        full_series: time series of fitted and forecasted values
    """
    #map R frequency to pandas frequency string (e.g. 12 -> month start)
    freq_dict = {12:'MS',1:'Y',365:'D',4:'QS'}
    freq_string = freq_dict[freq]
    #find the start of the time series
    start_ts = time_series[time_series.notna()].index[0]
    #find the end of the time series
    end_ts = time_series[time_series.notna()].index[-1]
    #extract actual time series
    time_series = time_series.loc[start_ts:end_ts]
    #convert to ts object in R
    time_series_R = robjects.IntVector(time_series)
    rdata=ts(time_series_R,frequency=freq)
    #if the model name ends in f, assume it's a direct forecasting function, so no separate fit step is needed
    if model.split('(')[0][-1] == 'f':
        rstring="""
         function(rdata){
         library(forecast)
         forecasted_data<-%s,h=%s,level=c(%s))
         return(list(fitted_series=forecasted_data$fitted, predicted_series=forecasted_data$mean,lowerpi=forecasted_data$lower,upperpi=forecasted_data$upper))
         }
         """ % (model,forecast_periods,confidence_level)
    else:
        rstring="""
         function(rdata){
         library(forecast)
         fitted_model<-%s
         forecasted_data<-forecast(fitted_model,h=%s,level=c(%s))
         return(list(fitted_series=forecasted_data$fitted, predicted_series=forecasted_data$mean,lowerpi=forecasted_data$lower,upperpi=forecasted_data$upper))
         }
         """ % (model,forecast_periods,confidence_level)
    rfunc=robjects.r(rstring)
    #get fitted and predicted series, and lower and upper prediction intervals, from the R model
    fitted_series,predicted_series,lowerpi,upperpi=rfunc(rdata)
    #convert R vector representation to pandas series
    fitted_series = pd.Series(pandas2ri.ri2py(fitted_series),index=pd.date_range(start=time_series[time_series.notnull()].index.min(),periods=len(time_series[time_series.notnull()]),freq=freq_string))
    predicted_series = pandas2ri.ri2py(predicted_series)
    #build date index for the forecast horizon
    index=pd.date_range(start=time_series.index.max(),periods=len(predicted_series)+1,freq=freq_string)[1:]
    try:
        predicted_series = pd.Series(predicted_series,index=index)
    except:
        #needed for splinef, which returns an array with an extra dimension
        predicted_series = pd.Series(predicted_series.ravel(),index=index)
    full_series = fitted_series.append(predicted_series)
    #if return_interval is True, also return the lower and upper prediction intervals
    if return_interval == False:
        return full_series
    return full_series, lowerpi, upperpi
full_series = forecast_R(dated_df['M2'])
full_series.head()
#full_series.plot()
#dated_df['M2'].plot()
trace_fitted = go.Scatter(
    x=full_series.index,
    y=full_series.values,
    name='fitted+predicted')
trace_actual = go.Scatter(
    x=dated_df['M2'].index,
    y=dated_df['M2'].values,
    name='actual')
data = [trace_actual,trace_fitted]
layout = go.Layout(
    title='fitted+predicted vs actual values for M2 Series by R Model',
    xaxis=dict(
        title='Date'))
fig = go.Figure(data=data, layout=layout)
iplot(fig)
Forecasting with R multiple series 1 worker¶
%%time
output_fc_R = []
for i in dated_df:
    forecasted_series = forecast_R(dated_df[i],model=R_model_dict['ets_damped'])
    output_fc_R.append(forecasted_series)
full_fc_R = pd.concat(output_fc_R,ignore_index=False,keys=dated_df.columns,axis=1)
full_fc_R.head()
Forecasting with R multiple series 6 workers¶
%%time
output_fc_R = []
for i in dated_df:
    forecasted_series = dask.delayed(forecast_R)(dated_df[i],model=R_model_dict['ets_damped'])
    output_fc_R.append(forecasted_series)
total = dask.delayed(output_fc_R).compute()
full_fc_R = pd.concat(total,ignore_index=False,keys=dated_df.columns,axis=1)
full_fc_R.head()
# we just shorten the R model dict to these 2 for demonstration purposes
R_model_dict = {'ets_damped':'ets(rdata,damped=TRUE)',
'thetaf':'thetaf(rdata'}
R_model_dict['ets_damped']
Forecasting with R multiple models and multiple series 1 worker¶
Note: for these next cells where we call forecast_R, we are going to get our dataframe back in long format instead of wide. This is where our framework sort of breaks down, because now we also want to see our data by model name. You could keep a dict of dataframes, or multi-index the dataframe, but personally I prefer the long format at this point, because that's how I usually insert the data into a database (a sketch of pivoting back to wide follows after these cells).
%%time
list_of_dfs = []
for model_name in sorted(R_model_dict):
    print(model_name)
    output = []
    for i in dated_df:
        forecasted_series = forecast_R(dated_df[i],model=R_model_dict[model_name])
        output.append(forecasted_series)
    full_df = pd.concat(output,ignore_index=False,keys=dated_df.columns)
    final_model_df = full_df.to_frame().reset_index()
    final_model_df.columns = ['Series','Date','value_variable']
    #add model name
    final_model_df['Model_name'] = model_name
    list_of_dfs.append(final_model_df)
final_df = pd.concat(list_of_dfs)
final_df.head()
final_df.tail()
Forecasting with R multiple models and multiple series 6 workers¶
%%time
list_of_dfs = []
for model_name in sorted(R_model_dict):
    print(model_name)
    output = []
    for i in dated_df:
        forecasted_series = dask.delayed(forecast_R)(dated_df[i],model=R_model_dict[model_name])
        output.append(forecasted_series)
    total = dask.delayed(output).compute()
    full_df = pd.concat(total,ignore_index=False,keys=dated_df.columns)
    final_model_df = full_df.to_frame().reset_index()
    final_model_df.columns = ['Series','Date','value_variable']
    #add model name
    final_model_df['Model_name'] = model_name
    list_of_dfs.append(final_model_df)
final_df = pd.concat(list_of_dfs)
final_df.head()
final_df.tail()
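As mentioned above, if you ever want to get from this long format back to the wide date-by-series framework, a one-line pivot sketch (for a single model) could look like:
# sketch: pivot the long results back to the wide framework for one model
wide_theta = final_df[final_df['Model_name']=='thetaf'].pivot(index='Date', columns='Series', values='value_variable')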
Calculating Error Measures in Python¶
For more info about forecasting accuracy measures, reference:
- textbook: https://www.otexts.org/fpp/2/5
- paper: https://robjhyndman.com/papers/mase.pdf
I'm going to make a small wrapper function, similar to the accuracy function call in R, that gives summary accuracy measures in Python.
Note: thanks to Pranav for the tsmetrics library
from tabulate import tabulate
import numpy as np
def accuracy(actual,predicted):
    """
    Args:
        actual: actual values as numpy array or time series
        predicted: predicted values as numpy array or time series
    Returns:
        accuracy_dict: dictionary of accuracy measures
    prints accuracy measures for forecasting and returns them as a dictionary
    """
    #convert to np array if series
    try:
        actual = actual.values
    except AttributeError:
        pass
    try:
        predicted = predicted.values
    except AttributeError:
        pass
    actual = actual.astype(float)
    predicted = predicted.astype(float)
    #if any nan values, convert to 0
    actual[np.isnan(actual)] = 0.0
    predicted[np.isnan(predicted)] = 0.0
    # ----------------------------Scale independent metrics------------------------
    # The mean absolute percentage error (MAPE) is a measure of prediction
    # accuracy of a forecasting method in statistics. The 'min_val' variable is
    # used to fill zeros in test data, if any; filling zeros with a constant
    # avoids division by zero errors
    def mean_absolute_percentage_error(y_true, y_pred, min_val=1):
        y_true = y_true + min_val
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    # SMAPE is an alternative to MAPE when there are zeros in the testing data; it
    # scales the absolute percentage by the mean of forecast and observed values
    def s_mean_absolute_percentage_error(y_true, y_pred):
        return np.mean(np.abs((y_true - y_pred) / ((y_true + y_pred)/2))) * 100
    #returns average error
    def mean_error(y_true,y_pred):
        return np.mean(y_true - y_pred)
    # MAE measures how close the forecast is to the observed values
    def mean_absolute_error(y_true, y_pred):
        return np.mean(abs(y_true - y_pred))
    # -----------------------------Scale dependent metrics-------------------------
    # The mean squared error (MSE) measures the average of the squares of the
    # errors. Squaring removes negative signs and gives more weight to larger differences
    def mean_squared_error(y_true, y_pred):
        return np.mean((y_true - y_pred)**2)
    # RMSE is the standard deviation of the residuals. It shares the same
    # properties as MSE; the square root dampens the magnitude of errors caused by squaring
    def root_mean_squared_error(y_true, y_pred):
        return np.sqrt(np.mean((y_true - y_pred)**2))
    # Measures the total squared deviation of forecasted observations from the observed values
    def sum_of_squared_error(y_true, y_pred):
        return np.sum((y_true - y_pred)**2)
    # Theil's U accuracy measure (closer to 0 is more accurate)
    def theil_u_statistic(y_true, y_pred):
        return np.sqrt(np.sum((y_pred - y_true)**2)/np.sum(y_true**2))
    # lag-1 autocorrelation of the forecast errors
    def autocorrelation_lag_1(y_true, y_pred, lag=1):
        error = y_true - y_pred
        # Slice the relevant subseries based on the lag
        y1 = error[:(len(error)-lag)]
        y2 = error[lag:]
        # Subtract the mean of the whole series to calculate the covariance
        sum_product = np.sum((y1-np.mean(error))*(y2-np.mean(error)))
        # Normalize by the variance of the whole series
        return sum_product / ((len(error) - lag) * np.var(error))
    MAPE = mean_absolute_percentage_error(actual,predicted)
    SMAPE = s_mean_absolute_percentage_error(actual,predicted)
    ME = mean_error(actual,predicted)
    MAE = mean_absolute_error(actual,predicted)
    MSE = mean_squared_error(actual,predicted)
    RMSE = root_mean_squared_error(actual,predicted)
    SSE = sum_of_squared_error(actual,predicted)
    ThielsU = theil_u_statistic(actual,predicted)
    ACF1 = autocorrelation_lag_1(actual,predicted)
    print(tabulate([[MAPE,SMAPE,ME,MAE,MSE,RMSE,ThielsU,ACF1]], headers=['MAPE','SMAPE','ME','MAE','MSE','RMSE','ThielsU','ACF1']))
    accuracy_dict = {'MAPE':MAPE,'SMAPE':SMAPE,'ME':ME,'MAE':MAE,'MSE':MSE,'RMSE':RMSE,'ThielsU':ThielsU,'ACF1':ACF1}
    return accuracy_dict
NOTE: THIS IS BAD PRACTICE. DON'T DO THIS.¶
I am only doing this for demonstration purposes: I am calculating the error measures on the fitted vs actual values, but THIS IS NOT OUT OF SAMPLE. If you are using these error measures for model selection, you should be looking at forecasted vs held-out test values, or doing cross validation with a holdout set. Again, I cannot stress this enough: if you choose your model based on in-sample error measures you are very likely to overfit your series, and it's already easy to overfit in forecasting scenarios, so this makes it doubly bad.
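For contrast, here's a minimal out-of-sample sketch (my own illustration, not part of the demonstration that follows): hold out the last 18 points, fit on the rest, and score only the true forecasts.
# sketch: score out-of-sample instead of in-sample
h = 18
series = dated_df['M2'].dropna()
train, test = series.iloc[:-h], series.iloc[-h:]
fc = forecast_R(train, model=R_model_dict['thetaf'], forecast_periods=h)
accuracy(test, fc.iloc[-h:])  # held-out actuals vs the 18 forecasted points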
mask = (final_df['Series']=='M2') & (final_df['Model_name']=='thetaf') & (final_df['Date']<'1980-12-01')
M2_full_df = final_df[mask]
M2_fitted_series = M2_full_df[['Date','value_variable']].set_index('Date')
M2_fitted_series.head()
actual_series = dated_df['M2'][dated_df['M2'].notna()]
actual_series.head()
Below we see the accuracy measures for our fitted vs actual values for the M2 series. We could also calculate the error measures in parallel with Dask (a sketch follows), although they usually compute so quickly that it's not worth it.
accuracy(actual_series.values,M2_fitted_series.values.ravel())
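That parallel version would just be the same dask.delayed pattern applied to the accuracy function above; a sketch (note the per-series alignment of the fitted values to the actuals):
# sketch: compute accuracy per series in parallel with dask.delayed
delayed_accs = [dask.delayed(accuracy)(dated_df[s].dropna(),
                                       full_fc_R[s].reindex(dated_df[s].dropna().index))
                for s in dated_df]
acc_list = dask.compute(*delayed_accs)  # one accuracy dict per series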
Plotting our predictions¶
Finally, let's plot the actual, fitted, and forecasted series in one graph. This is especially useful for univariate time series, because the error measures alone can sometimes be misleading.
# Create traces
#get predicted values
mask = (final_df['Series']=='M2') & (final_df['Model_name']=='thetaf') & (final_df['Date']>='1980-12-01')
M2_full_df = final_df[mask]
M2_predicted_series = M2_full_df[['Date','value_variable']].set_index('Date')
#call forecast_R function again to get prediction interval for the series
full_series,lowerpi,upperpi = forecast_R(dated_df['M2'],return_interval = True)
trace_actuals = go.Scatter(
    x=actual_series.index,
    y=actual_series.values,
    mode='lines',
    name='actual'
)
trace_fitted = go.Scatter(
    x=M2_fitted_series.index,
    y=M2_fitted_series.values.ravel(),
    mode='lines+markers',
    name='fitted',
    opacity=0.8
)
trace_predicted = go.Scatter(
    x=M2_predicted_series.index,
    y=M2_predicted_series.values.ravel(),
    mode='lines',
    name='predicted'
)
trace_lower = go.Scatter(
    x=M2_predicted_series.index,
    y=np.array(lowerpi).ravel(),
    fill=None,
    mode='lines',
    name='Lower PI (80%)',
    line=dict(color='lightgreen')
)
trace_upper = go.Scatter(
    x=M2_predicted_series.index,
    y=np.array(upperpi).ravel(),
    fill='tonexty',
    mode='lines',
    name='Upper PI (80%)',
    line=dict(color='lightgreen')
)
data = [trace_actuals,trace_fitted,trace_predicted,trace_lower,trace_upper]
layout = dict(
    title='Actual v Fitted Values for M2 Series with Thetaf Model',
    yaxis=dict(title='value variable'),
    xaxis=dict(
        title='Date',
        rangeselector=dict(
            buttons=list([
                dict(count=3, label='3y', step='year', stepmode='forward'),
                dict(count=5, label='5y', step='year', stepmode='forward'),
                dict(count=10, label='10y', step='year', stepmode='forward'),
                dict(step='all')
            ])
        ),
        rangeslider=dict(),
        type='date'
    )
)
fig = dict(data=data, layout=layout)
iplot(fig, show_link=False)
Presenting Magi¶
So those are basically the core components for parallelized univariate time series forecasting in Python, for any custom function, using dask.delayed. Of course there's more to add, like model selection using rolling origin cross validation, which can be slotted into this workflow (a sketch follows below).
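As a rough illustration (my own sketch, not part of the notebook; it assumes a fold step equal to the 18-month horizon and reuses forecast_R from above):
# sketch: rolling-origin evaluation of one model on one series
def rolling_origin_smape(ts, model, h=18, folds=3):
    """Average out-of-sample SMAPE over `folds` expanding-window origins."""
    ts = ts.dropna()
    smapes = []
    for k in range(folds, 0, -1):
        train = ts.iloc[:len(ts)-h*k]
        test = ts.iloc[len(ts)-h*k:len(ts)-h*(k-1)]
        #forecast_R returns fitted+forecast values; keep only the h forecasted points
        fc = forecast_R(train, model=model, forecast_periods=h).iloc[-h:]
        smapes.append(np.mean(np.abs((test.values - fc.values) / ((test.values + fc.values)/2))) * 100)
    return np.mean(smapes)
# e.g. rolling_origin_smape(dated_df['M2'], R_model_dict['thetaf'])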
I'm also working on an open source python library to wrap all these convenience functions into a more fully fledged open source library to do high level parallelized univariate time series forecasting. If you'd like to contribute please reach out and check it out on github: https://github.com/DavisTownsend/magi
Below I show how I've simplified the code using magi. In under 10 lines of code, we can clean and forecast the time series in parallel, using the auto.arima model from the R forecast package with custom parameters, and then plot the accuracy metrics for the fitted vs actual values for each series in an interactive plotly graph.
#import package functions
from magi.core import forecast
from magi.plotting import fc_plot, acc_plot
from magi.accuracy import accuracy
#connect to local dask cluster, already connected
#cluster = LocalCluster()
#client = Client(cluster)
#clean and forecasts dataframe of time series, then plot accuracy metrics by series
fc_obj = forecast(time_series=dated_df,forecast_periods=18,frequency=12)
forecast_df = fc_obj.tsclean().R(model='auto.arima(rdata,D=1,stationary=TRUE)',fit=True)
acc_df = accuracy(dated_df,forecast_df,separate_series=True)
acc_plot(acc_df,title='Accuracy Measures by Series',xTitle='Accuracy Measure',yTitle='Series')
And to forecast a single series and plot the prediction, it would just be:
fc_obj = forecast(time_series=dated_df['M2'],forecast_periods=18,frequency=12)
forecast_dic = fc_obj.tsclean().R(model='thetaf')
fc_plot(forecast_dic)