# import libraries
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA
import matplotlib.pyplot as pltPython Code
# read data
data_raw = pd.read_csv("../posts/2024-10-02-ts-fundamentals-whats-a-time-series/example_ts_data.csv")
data_raw = (
# select columns
data_raw[["Country", "Product", "Date", "Revenue"]]
# change data types
.assign(
Date = pd.to_datetime(data_raw["Date"]),
Revenue = pd.to_numeric(data_raw["Revenue"])
)
)
# print the first few rows
print(data_raw.head())# filter on specific series
us_ic_raw = data_raw[(data_raw["Country"] == "United States") & (data_raw["Product"] == "Ice Cream")]
# create unique id
us_ic_raw["unique_id"] = us_ic_raw["Country"] + "_" + us_ic_raw["Product"]
# convert date to datetime
us_ic_raw["Date"] = pd.to_datetime(us_ic_raw["Date"])
# plot the data
plt.figure(figsize=(10, 6))
plt.plot(us_ic_raw.index, us_ic_raw["Revenue"], label="Ice Cream Revenue")
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Ice Cream Revenue in United States")
plt.legend()# get final data for forecasting
us_ic_clean = us_ic_raw[["unique_id", "Date", "Revenue"]].copy()
# set up models to train
sf = StatsForecast(
models=[AutoARIMA(season_length=12)],
freq='ME',
)
# fit the model and forecast for 12 months ahead
Y_hat_df = sf.forecast(df = us_ic_clean,
time_col = "Date",
target_col = "Revenue",
id_col = "unique_id",
h=12,
level=[95],
fitted=True)
print(Y_hat_df.head())
# convert date to be first of the month
Y_hat_df["Date"] = Y_hat_df["Date"].dt.to_period("M").dt.to_timestamp()# concat both df together
future_fcst_df = pd.concat([us_ic_clean, Y_hat_df], axis=0)
# make date the index
future_fcst_df.set_index("Date", inplace=True)
print(future_fcst_df.tail())# plot the future fcst data
plt.figure(figsize=(10, 6))
# plot the original revenue data as line and forecast as dotted line
plt.plot(future_fcst_df.index, future_fcst_df["Revenue"], label="Actual Revenue")
plt.plot(future_fcst_df.index, future_fcst_df["AutoARIMA"], label="ARIMA Forecast", linestyle='dotted')
# plot the prediction intervals as shaded areas
plt.fill_between(future_fcst_df.index,
future_fcst_df["AutoARIMA-lo-95"],
future_fcst_df["AutoARIMA-hi-95"],
color='gray', alpha=0.2, label='95% Prediction Interval')
# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Forecast Results for US Ice Cream Revenue")
plt.legend()
# save the plot
# plt.savefig("chart1", dpi = 300, bbox_inches = "tight")# get fitted values of the historical data
# Note: The fitted values are the predicted values for the training data
residual_values = sf.forecast_fitted_values()
# make date the index
residual_values.set_index("Date", inplace=True)
print(residual_values.head())# plot the historical fitted values
plt.figure(figsize=(10, 6))
# plot the original revenue data as line and forecast as dotted line
plt.plot(residual_values.index, residual_values["Revenue"], label="Actual Revenue")
plt.plot(residual_values.index, residual_values["AutoARIMA"], label="Forecasted Revenue", linestyle='dotted')
# plot the prediction intervals as shaded areas
plt.fill_between(residual_values.index,
residual_values["AutoARIMA-lo-95"],
residual_values["AutoARIMA-hi-95"],
color='gray', alpha=0.2, label='95% Prediction Interval')
# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("ARIMA Residuals for US Ice Cream Revenue")
plt.legend()
# save the plot
# plt.savefig("chart2", dpi = 300, bbox_inches = "tight")# calculate residuals and plot the residuals directly as a line chart
residuals = residual_values["Revenue"] - residual_values["AutoARIMA"]
plt.figure(figsize=(10, 6))
plt.plot(residuals.index, residuals, label="Residuals", color='orange')
plt.axhline(y=0, color='red', linestyle='--')
plt.xlabel("Date")
plt.ylabel("Residuals")
plt.title("Residuals Over Time")
# save the plot
# plt.savefig("chart3", dpi = 300, bbox_inches = "tight")# create histogram of residuals
plt.figure(figsize=(10, 6))
plt.hist(residuals, bins=6, color='blue', alpha=0.7)
plt.axvline(x=0, color='red', linestyle='--')
plt.xlabel("Residuals")
plt.ylabel("Frequency")
plt.title("Histogram of Residuals")
# save the plot
# plt.savefig("chart4", dpi = 300, bbox_inches = "tight")# create chart that puts the actual values on one side and the fitted values on the other as a scatter plot
plt.figure(figsize=(10, 6))
# plot the actual values
plt.scatter(residual_values["Revenue"], residual_values["AutoARIMA"], label="Fitted Values", alpha=0.5)
# plot the 45 degree line
plt.plot([residual_values["Revenue"].min(), residual_values["Revenue"].max()],
[residual_values["Revenue"].min(), residual_values["Revenue"].max()],
color='red', linestyle='--', label="45 Degree Line")
# chart formatting
plt.xlabel("Actual Revenue")
plt.ylabel("Fitted Revenue")
plt.title("Q-Q Plot of Actual vs Fitted Values")
# save the plot
# plt.savefig("chart5", dpi = 300, bbox_inches = "tight")# create ACF plot on the residuals
from statsmodels.graphics.tsaplots import plot_acf
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=24)
plt.title("ACF of Residuals")
plt.xlabel("Lags")
plt.ylabel("Autocorrelation")
plt.legend()
# save the plot
# plt.savefig("chart6", dpi = 300, bbox_inches = "tight")# create a chart that shows incorrect train test split by including future data in the training set
import numpy as np
# set random seed for reproducibility
np.random.seed(42)
# create a copy of the residual values data
incorrect_split_df = residual_values.copy().reset_index()
# randomly select 20% of the data as test set (INCORRECT for time series!)
test_size = int(len(incorrect_split_df) * 0.2)
test_indices = np.random.choice(incorrect_split_df.index, size=test_size, replace=False)
# create train/test labels
incorrect_split_df['split'] = 'Train'
incorrect_split_df.loc[test_indices, 'split'] = 'Test'
# plot the incorrect split
plt.figure(figsize=(12, 6))
# plot train data
train_data = incorrect_split_df[incorrect_split_df['split'] == 'Train']
plt.scatter(train_data['Date'], train_data['Revenue'],
color='blue', label='Training Data', alpha=0.7, s=50)
# plot test data
test_data = incorrect_split_df[incorrect_split_df['split'] == 'Test']
plt.scatter(test_data['Date'], test_data['Revenue'],
color='red', label='Test Data (Randomly Selected)', alpha=0.7, s=50, marker='x')
# connect all points with a line to show the time series
plt.plot(incorrect_split_df['Date'], incorrect_split_df['Revenue'],
color='gray', alpha=0.3, linewidth=1, zorder=0)
# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("INCORRECT: Random Train-Test Split for Time Series Data")
plt.legend()
plt.grid(True, alpha=0.3)
# add annotation explaining the problem
plt.text(0.5, 0.95, 'Problem: Test data (red X) is scattered throughout time,\nallowing the model to "peek" at future data during training!',
transform=plt.gca().transAxes,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
verticalalignment='top', horizontalalignment='center', fontsize=9)
# save the plot
# plt.savefig("chart7.png", dpi = 300, bbox_inches = "tight")# now create a chart showing the CORRECT time-based split
correct_split_df = residual_values.copy().reset_index()
# split by time: last 20% for testing (CORRECT for time series!)
split_point = int(len(correct_split_df) * 0.8)
correct_split_df['split'] = 'Train'
correct_split_df.loc[split_point:, 'split'] = 'Test'
# plot the correct split
plt.figure(figsize=(12, 6))
# plot train data
train_data = correct_split_df[correct_split_df['split'] == 'Train']
plt.scatter(train_data['Date'], train_data['Revenue'],
color='blue', label='Training Data', alpha=0.7, s=50)
# plot test data
test_data = correct_split_df[correct_split_df['split'] == 'Test']
plt.scatter(test_data['Date'], test_data['Revenue'],
color='green', label='Test Data (Held-Out Period)', alpha=0.7, s=50, marker='s')
# connect all points with a line
plt.plot(correct_split_df['Date'], correct_split_df['Revenue'],
color='gray', alpha=0.3, linewidth=1, zorder=0)
# add vertical line at the split point
plt.axvline(x=correct_split_df.loc[split_point, 'Date'],
color='red', linestyle='--', linewidth=2, label='Train/Test Split')
# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("CORRECT: Time-Based Train-Test Split for Time Series Data")
plt.legend()
plt.grid(True, alpha=0.3)
# add annotation explaining why this is correct
plt.text(0.5, 0.95, 'Correct: Test data (green squares) comes AFTER all training data,\nno future information leakage!',
transform=plt.gca().transAxes,
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5),
verticalalignment='top', horizontalalignment='center', fontsize=9)
# save the plot
# plt.savefig("chart8.png", dpi = 300, bbox_inches = "tight")# create a forecast showing historical data for the train split, historical and forecast for the test split, and forecast for the future
# split the data: 80% train, 20% test
split_point = int(len(us_ic_clean) * 0.8)
train_data = us_ic_clean.iloc[:split_point].copy()
test_data = us_ic_clean.iloc[split_point:].copy()
# get the number of periods in test set
test_periods = len(test_data)
# MODEL 1: Train on training data only for test evaluation
sf_train = StatsForecast(
models=[AutoARIMA(season_length=12)],
freq='ME',
)
# forecast for test period only
test_forecast = sf_train.forecast(
df=train_data,
time_col="Date",
target_col="Revenue",
id_col="unique_id",
h=test_periods,
level=[95]
)
# convert date to first of month
test_forecast["Date"] = test_forecast["Date"].dt.to_period("M").dt.to_timestamp()
# MODEL 2: Refit on entire dataset for future predictions
sf_full = StatsForecast(
models=[AutoARIMA(season_length=12)],
freq='ME',
)
# forecast 12 months into the future using all available data
future_forecast = sf_full.forecast(
df=us_ic_clean,
time_col="Date",
target_col="Revenue",
id_col="unique_id",
h=12,
level=[95]
)
# convert date to first of month
future_forecast["Date"] = future_forecast["Date"].dt.to_period("M").dt.to_timestamp()
# create the visualization
plt.figure(figsize=(14, 7))
# plot training data
plt.plot(train_data["Date"], train_data["Revenue"],
color='blue', label='Training Data', linewidth=2)
# plot test data (actual)
plt.plot(test_data["Date"], test_data["Revenue"],
color='green', label='Test Data (Actual)', linewidth=2)
# plot test forecast (from model trained on train data only)
plt.plot(test_forecast["Date"], test_forecast["AutoARIMA"],
color='orange', label='Test Forecast (Model 1)', linestyle='--', linewidth=2)
# plot future forecast (from model refitted on all data)
plt.plot(future_forecast["Date"], future_forecast["AutoARIMA"],
color='red', label='Future Forecast (Model 2 - Refitted)', linestyle='--', linewidth=2)
# add prediction intervals for test period
plt.fill_between(test_forecast["Date"],
test_forecast["AutoARIMA-lo-95"],
test_forecast["AutoARIMA-hi-95"],
color='orange', alpha=0.2, label='Test 95% PI')
# add prediction intervals for future period
plt.fill_between(future_forecast["Date"],
future_forecast["AutoARIMA-lo-95"],
future_forecast["AutoARIMA-hi-95"],
color='red', alpha=0.2, label='Future 95% PI')
# add vertical line at train/test split
plt.axvline(x=train_data["Date"].iloc[-1],
color='gray', linestyle=':', linewidth=2, label='Train/Test Split')
# add vertical line at test/future split
plt.axvline(x=test_data["Date"].iloc[-1],
color='purple', linestyle=':', linewidth=2, label='Test/Future Split')
# chart formatting
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Train-Test-Future Split with Two Separate Models")
plt.legend(loc='upper left', fontsize=8)
plt.grid(True, alpha=0.3)
# add annotation explaining the two models
plt.text(0.5, 0.02, 'Model 1: Trained on train data only (blue) to forecast test period (orange)\nModel 2: Refitted on ALL data (blue + green) to forecast future (red)',
transform=plt.gca().transAxes,
bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.7),
verticalalignment='bottom', horizontalalignment='center', fontsize=8)
# save the plot
plt.savefig("chart9.png", dpi=300, bbox_inches="tight")
plt.show()
# print summary statistics
print(f"\nModel 1 (Train data only):")
print(f" Train period: {train_data['Date'].min()} to {train_data['Date'].max()}")
print(f" Test forecast period: {test_forecast['Date'].min()} to {test_forecast['Date'].max()}")
print(f"\nModel 2 (Refitted on all data):")
print(f" Training period: {us_ic_clean['Date'].min()} to {us_ic_clean['Date'].max()}")
print(f" Future forecast period: {future_forecast['Date'].min()} to {future_forecast['Date'].max()}")