Python Code

# import libraries
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA
import matplotlib.pyplot as plt
# read data
data_raw = pd.read_csv("../posts/2024-10-02-ts-fundamentals-whats-a-time-series/example_ts_data.csv")

data_raw = (
    # select columns
    data_raw[["Country", "Product", "Date", "Revenue"]]
    # change data types
    .assign(
        Date = pd.to_datetime(data_raw["Date"]), 
        Revenue = pd.to_numeric(data_raw["Revenue"])
    )
)

# print the first few rows
print(data_raw.head())
# filter on specific series
us_ic_raw = data_raw[(data_raw["Country"] == "United States") & (data_raw["Product"] == "Ice Cream")]

# create unique id
us_ic_raw["unique_id"] = us_ic_raw["Country"] + "_" + us_ic_raw["Product"]

# convert date to datetime
us_ic_raw["Date"] = pd.to_datetime(us_ic_raw["Date"])

# plot the data
plt.figure(figsize=(10, 6))
plt.plot(us_ic_raw.index, us_ic_raw["Revenue"], label="Ice Cream Revenue")
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Ice Cream Revenue in United States")
plt.legend()
# get final data for forecasting
us_ic_clean = us_ic_raw[["unique_id", "Date", "Revenue"]].copy()

# set up models to train
sf = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='ME',
)

# fit the model and forecast for 12 months ahead
Y_hat_df = sf.forecast(df = us_ic_clean, 
                       time_col = "Date", 
                       target_col = "Revenue", 
                       id_col = "unique_id", 
                       h=12, 
                       level=[95], 
                       fitted=True)

print(Y_hat_df.head())

# convert date to be first of the month
Y_hat_df["Date"] = Y_hat_df["Date"].dt.to_period("M").dt.to_timestamp()
# concat both df together
future_fcst_df = pd.concat([us_ic_clean, Y_hat_df], axis=0)

# make date the index
future_fcst_df.set_index("Date", inplace=True)

print(future_fcst_df.tail())
# plot the future fcst data
plt.figure(figsize=(10, 6))

# plot the original revenue data as line and forecast as dotted line
plt.plot(future_fcst_df.index, future_fcst_df["Revenue"], label="Actual Revenue")
plt.plot(future_fcst_df.index, future_fcst_df["AutoARIMA"], label="ARIMA Forecast", linestyle='dotted')

# plot the prediction intervals as shaded areas
plt.fill_between(future_fcst_df.index, 
                 future_fcst_df["AutoARIMA-lo-95"], 
                 future_fcst_df["AutoARIMA-hi-95"], 
                 color='gray', alpha=0.2, label='95% Prediction Interval')

# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Forecast Results for US Ice Cream Revenue")
plt.legend()

# save the plot
# plt.savefig("chart1", dpi = 300, bbox_inches = "tight")
# get fitted values of the historical data
# Note: The fitted values are the predicted values for the training data
residual_values = sf.forecast_fitted_values()

# make date the index
residual_values.set_index("Date", inplace=True)

print(residual_values.head())
# plot the historical fitted values
plt.figure(figsize=(10, 6))

# plot the original revenue data as line and forecast as dotted line
plt.plot(residual_values.index, residual_values["Revenue"], label="Actual Revenue")
plt.plot(residual_values.index, residual_values["AutoARIMA"], label="Forecasted Revenue", linestyle='dotted')

# plot the prediction intervals as shaded areas
plt.fill_between(residual_values.index, 
                 residual_values["AutoARIMA-lo-95"], 
                 residual_values["AutoARIMA-hi-95"], 
                 color='gray', alpha=0.2, label='95% Prediction Interval')

# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("ARIMA Residuals for US Ice Cream Revenue")
plt.legend()

# save the plot
# plt.savefig("chart2", dpi = 300, bbox_inches = "tight")
# calculate residuals and plot the residuals directly as a line chart 
residuals = residual_values["Revenue"] - residual_values["AutoARIMA"]

plt.figure(figsize=(10, 6))
plt.plot(residuals.index, residuals, label="Residuals", color='orange')
plt.axhline(y=0, color='red', linestyle='--')
plt.xlabel("Date")
plt.ylabel("Residuals")
plt.title("Residuals Over Time")

# save the plot
# plt.savefig("chart3", dpi = 300, bbox_inches = "tight")
# create histogram of residuals
plt.figure(figsize=(10, 6))
plt.hist(residuals, bins=6, color='blue', alpha=0.7)
plt.axvline(x=0, color='red', linestyle='--')
plt.xlabel("Residuals")
plt.ylabel("Frequency")
plt.title("Histogram of Residuals")

# save the plot
# plt.savefig("chart4", dpi = 300, bbox_inches = "tight")
# create chart that puts the actual values on one side and the fitted values on the other as a scatter plot
plt.figure(figsize=(10, 6))
# plot the actual values
plt.scatter(residual_values["Revenue"], residual_values["AutoARIMA"], label="Fitted Values", alpha=0.5)
# plot the 45 degree line
plt.plot([residual_values["Revenue"].min(), residual_values["Revenue"].max()], 
         [residual_values["Revenue"].min(), residual_values["Revenue"].max()], 
         color='red', linestyle='--', label="45 Degree Line")
# chart formatting
plt.xlabel("Actual Revenue")
plt.ylabel("Fitted Revenue")
plt.title("Q-Q Plot of Actual vs Fitted Values")

# save the plot
# plt.savefig("chart5", dpi = 300, bbox_inches = "tight")
# create ACF plot on the residuals
from statsmodels.graphics.tsaplots import plot_acf
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=24)
plt.title("ACF of Residuals")
plt.xlabel("Lags")
plt.ylabel("Autocorrelation")
plt.legend()

# save the plot
# plt.savefig("chart6", dpi = 300, bbox_inches = "tight")
# create a chart that shows incorrect train test split by including future data in the training set
import numpy as np

# set random seed for reproducibility
np.random.seed(42)

# create a copy of the residual values data
incorrect_split_df = residual_values.copy().reset_index()

# randomly select 20% of the data as test set (INCORRECT for time series!)
test_size = int(len(incorrect_split_df) * 0.2)
test_indices = np.random.choice(incorrect_split_df.index, size=test_size, replace=False)

# create train/test labels
incorrect_split_df['split'] = 'Train'
incorrect_split_df.loc[test_indices, 'split'] = 'Test'

# plot the incorrect split
plt.figure(figsize=(12, 6))

# plot train data
train_data = incorrect_split_df[incorrect_split_df['split'] == 'Train']
plt.scatter(train_data['Date'], train_data['Revenue'], 
           color='blue', label='Training Data', alpha=0.7, s=50)

# plot test data
test_data = incorrect_split_df[incorrect_split_df['split'] == 'Test']
plt.scatter(test_data['Date'], test_data['Revenue'], 
           color='red', label='Test Data (Randomly Selected)', alpha=0.7, s=50, marker='x')

# connect all points with a line to show the time series
plt.plot(incorrect_split_df['Date'], incorrect_split_df['Revenue'], 
        color='gray', alpha=0.3, linewidth=1, zorder=0)

# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("INCORRECT: Random Train-Test Split for Time Series Data")
plt.legend()
plt.grid(True, alpha=0.3)

# add annotation explaining the problem
plt.text(0.5, 0.95, 'Problem: Test data (red X) is scattered throughout time,\nallowing the model to "peek" at future data during training!', 
         transform=plt.gca().transAxes, 
         bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
         verticalalignment='top', horizontalalignment='center', fontsize=9)

# save the plot
# plt.savefig("chart7.png", dpi = 300, bbox_inches = "tight")
# now create a chart showing the CORRECT time-based split
correct_split_df = residual_values.copy().reset_index()

# split by time: last 20% for testing (CORRECT for time series!)
split_point = int(len(correct_split_df) * 0.8)

correct_split_df['split'] = 'Train'
correct_split_df.loc[split_point:, 'split'] = 'Test'

# plot the correct split
plt.figure(figsize=(12, 6))

# plot train data
train_data = correct_split_df[correct_split_df['split'] == 'Train']
plt.scatter(train_data['Date'], train_data['Revenue'], 
           color='blue', label='Training Data', alpha=0.7, s=50)

# plot test data
test_data = correct_split_df[correct_split_df['split'] == 'Test']
plt.scatter(test_data['Date'], test_data['Revenue'], 
           color='green', label='Test Data (Held-Out Period)', alpha=0.7, s=50, marker='s')

# connect all points with a line
plt.plot(correct_split_df['Date'], correct_split_df['Revenue'], 
        color='gray', alpha=0.3, linewidth=1, zorder=0)

# add vertical line at the split point
plt.axvline(x=correct_split_df.loc[split_point, 'Date'], 
           color='red', linestyle='--', linewidth=2, label='Train/Test Split')

# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("CORRECT: Time-Based Train-Test Split for Time Series Data")
plt.legend()
plt.grid(True, alpha=0.3)

# add annotation explaining why this is correct
plt.text(0.5, 0.95, 'Correct: Test data (green squares) comes AFTER all training data,\nno future information leakage!', 
         transform=plt.gca().transAxes, 
         bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5),
         verticalalignment='top', horizontalalignment='center', fontsize=9)

# save the plot
# plt.savefig("chart8.png", dpi = 300, bbox_inches = "tight")
# create a forecast showing historical data for the train split, historical and forecast for the test split, and forecast for the future

# split the data: 80% train, 20% test
split_point = int(len(us_ic_clean) * 0.8)
train_data = us_ic_clean.iloc[:split_point].copy()
test_data = us_ic_clean.iloc[split_point:].copy()

# get the number of periods in test set
test_periods = len(test_data)

# MODEL 1: Train on training data only for test evaluation
sf_train = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='ME',
)

# forecast for test period only
test_forecast = sf_train.forecast(
    df=train_data,
    time_col="Date",
    target_col="Revenue",
    id_col="unique_id",
    h=test_periods,
    level=[95]
)

# convert date to first of month
test_forecast["Date"] = test_forecast["Date"].dt.to_period("M").dt.to_timestamp()

# MODEL 2: Refit on entire dataset for future predictions
sf_full = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='ME',
)

# forecast 12 months into the future using all available data
future_forecast = sf_full.forecast(
    df=us_ic_clean,
    time_col="Date",
    target_col="Revenue",
    id_col="unique_id",
    h=12,
    level=[95]
)

# convert date to first of month
future_forecast["Date"] = future_forecast["Date"].dt.to_period("M").dt.to_timestamp()

# create the visualization
plt.figure(figsize=(14, 7))

# plot training data
plt.plot(train_data["Date"], train_data["Revenue"], 
         color='blue', label='Training Data', linewidth=2)

# plot test data (actual)
plt.plot(test_data["Date"], test_data["Revenue"], 
         color='green', label='Test Data (Actual)', linewidth=2)

# plot test forecast (from model trained on train data only)
plt.plot(test_forecast["Date"], test_forecast["AutoARIMA"], 
         color='orange', label='Test Forecast (Model 1)', linestyle='--', linewidth=2)

# plot future forecast (from model refitted on all data)
plt.plot(future_forecast["Date"], future_forecast["AutoARIMA"], 
         color='red', label='Future Forecast (Model 2 - Refitted)', linestyle='--', linewidth=2)

# add prediction intervals for test period
plt.fill_between(test_forecast["Date"], 
                 test_forecast["AutoARIMA-lo-95"], 
                 test_forecast["AutoARIMA-hi-95"], 
                 color='orange', alpha=0.2, label='Test 95% PI')

# add prediction intervals for future period
plt.fill_between(future_forecast["Date"], 
                 future_forecast["AutoARIMA-lo-95"], 
                 future_forecast["AutoARIMA-hi-95"], 
                 color='red', alpha=0.2, label='Future 95% PI')

# add vertical line at train/test split
plt.axvline(x=train_data["Date"].iloc[-1], 
           color='gray', linestyle=':', linewidth=2, label='Train/Test Split')

# add vertical line at test/future split
plt.axvline(x=test_data["Date"].iloc[-1], 
           color='purple', linestyle=':', linewidth=2, label='Test/Future Split')

# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Train-Test-Future Split with Two Separate Models")
plt.legend(loc='upper left', fontsize=8)
plt.grid(True, alpha=0.3)

# add annotation explaining the two models
plt.text(0.5, 0.02, 'Model 1: Trained on train data only (blue) to forecast test period (orange)\nModel 2: Refitted on ALL data (blue + green) to forecast future (red)', 
         transform=plt.gca().transAxes, 
         bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.7),
         verticalalignment='bottom', horizontalalignment='center', fontsize=8)

# save the plot
plt.savefig("chart9.png", dpi=300, bbox_inches="tight")
plt.show()

# print summary statistics
print(f"\nModel 1 (Train data only):")
print(f"  Train period: {train_data['Date'].min()} to {train_data['Date'].max()}")
print(f"  Test forecast period: {test_forecast['Date'].min()} to {test_forecast['Date'].max()}")
print(f"\nModel 2 (Refitted on all data):")
print(f"  Training period: {us_ic_clean['Date'].min()} to {us_ic_clean['Date'].max()}")
print(f"  Future forecast period: {future_forecast['Date'].min()} to {future_forecast['Date'].max()}")