Python Code

# import libraries
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from statsmodels.tsa.seasonal import STL
from statsmodels.tsa.deterministic import Fourier
from scipy.interpolate import interp1d
# read data
data_raw = pd.read_csv("../posts/2024-10-02-ts-fundamentals-whats-a-time-series/example_ts_data.csv")

data_raw = (
    # select columns
    data_raw[["Country", "Product", "Date", "Revenue"]]
    # change data types
    .assign(
        Date = pd.to_datetime(data_raw["Date"]), 
        Revenue = pd.to_numeric(data_raw["Revenue"])
    )
)

# print the first few rows
print(data_raw.head())
# filter on specific series
us_ck_raw = data_raw[(data_raw["Country"] == "United States") & (data_raw["Product"] == "Cookies")]

us_ck_raw.set_index("Date", inplace=True)

print(us_ck_raw.head())
# randomly replace some revenue values with NaN
np.random.seed(123)

us_ck_mv = us_ck_raw.copy()

us_ck_mv.loc[us_ck_mv.sample(frac=0.03).index, "Revenue"] = np.nan

print(us_ck_mv.head())
# plot the series
plt.figure(figsize=(10, 6))
plt.plot(us_ck_mv.index, us_ck_raw["Revenue"], label = "Cookie Revenue", color = "blue")
plt.title("US Cookie Revenue - with Missing Values")
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.grid(True)

# save the plot
# plt.savefig("chart1", dpi = 300, bbox_inches = "tight")
# Function to interpolate missing values
def na_interp(ts, freq=None, lambda_=None, use_linear=None):
    # Ensure ts is a pandas Series
    if not isinstance(ts, pd.Series):
        raise ValueError("Input `ts` must be a pandas Series.")

    # Handle cases where there are no missing values
    if ts.isna().sum() == 0:
        return ts

    # Frequency determination
    if freq is None:
        freq = pd.infer_freq(ts.index) or 1

    # Automatic linear interpolation decision
    if use_linear is None:
        use_linear = freq <= 1 or ts.notna().sum() <= 2 * freq

    # Apply Box-Cox transformation if lambda_ is provided
    def boxcox_transform(series, lmbda):
        if lmbda is None:
            return series
        if lmbda == 0:
            return np.log(series)
        return (series ** lmbda - 1) / lmbda

    def inv_boxcox_transform(series, lmbda):
        if lmbda is None:
            return series
        if lmbda == 0:
            return np.exp(series)
        return (series * lmbda + 1) ** (1 / lmbda)

    ts_original = ts.copy()
    ts = ts.astype(float)
    if lambda_ is not None:
        ts = boxcox_transform(ts, lambda_)

    if use_linear:
        # Linear interpolation
        ts_interpolated = ts.interpolate(method='linear', limit_direction='both')
    else:
        # Seasonal interpolation
        ts_filled = ts.interpolate(method='linear', limit_direction='both')  # Pre-fill gaps for STL
        stl = STL(ts_filled, seasonal=freq, robust=True).fit()
        
        # Interpolate seasonally adjusted series
        sa = stl.trend + stl.resid  # Seasonally adjusted component
        
        # Convert DatetimeIndex to numeric values for interpolation
        idx_numeric = ts.index.astype(int) / 10**9  # Convert to seconds since epoch
        sa_interp_func = interp1d(
            idx_numeric[~ts.isna()], 
            sa[~ts.isna()], 
            bounds_error=False, 
            fill_value="extrapolate"
        )
        sa_interp = sa_interp_func(idx_numeric)

        # Add back the seasonal component
        ts_interpolated = pd.Series(sa_interp + stl.seasonal, index=ts.index)

    # Back-transform if Box-Cox was applied
    if lambda_ is not None:
        ts_interpolated = inv_boxcox_transform(ts_interpolated, lambda_)

    # Check for stability (fallback to linear if unstable)
    if not use_linear:
        ts_range = ts_original.dropna().max() - ts_original.dropna().min()
        if (ts_interpolated.max() > ts_original.max() + 0.5 * ts_range or 
            ts_interpolated.min() < ts_original.min() - 0.5 * ts_range):
            return na_interp(ts_original, freq=freq, lambda_=lambda_, use_linear=True)

    return ts_interpolated
# get the series with missing values and convert to float
us_ck_missing = us_ck_mv["Revenue"]
us_ck_missing = np.array(us_ck_missing, dtype=float)

# get the monthly date range, using first day of month
date_range = pd.date_range(start="2019-01-01", periods=60, freq="M")  # Monthly frequency

# Create a pandas Series
ts = pd.Series(us_ck_missing, index=date_range)

# Interpolating missing values
ts_interpolated = na_interp(ts, freq=11)

# Plot the results, with origin on top
plt.figure(figsize=(10, 6))
plt.plot(ts_interpolated, label="Interpolated", color="red", linestyle="--")
plt.plot(ts, label="Original", color="blue")
plt.legend()
plt.grid(True)
plt.title("US Cookie Revenue - with Missing Values Interpolated")
plt.xlabel("Date")
plt.ylabel("Revenue")

# save the plot
# plt.savefig("chart2", dpi = 300, bbox_inches = "tight")
# STL decomposition
stl = STL(us_ck_raw["Revenue"], seasonal = 13, period = 12)
res = stl.fit()

# plot the decomposition in a stacked chart
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize = (10, 8))

# original data
ax1.plot(res.observed, label = "Original", color = "blue")
ax1.set_title("Original Data")
ax1.set_ylabel("Revenue")
ax1.grid(True)
ax1.legend()

# trend
ax2.plot(res.trend, label = "Trend", color = "red")
ax2.set_title("Trend")
ax2.set_ylabel("Revenue")
ax2.grid(True)
ax2.legend()

# seasonal
ax3.plot(res.seasonal, label = "Seasonal", color = "green")
ax3.set_title("Seasonal")
ax3.set_ylabel("Revenue")
ax3.grid(True)
ax3.legend()

# residual
ax4.plot(res.resid, label = "Residual", color = "orange")
ax4.set_title("Residual")
ax4.set_xlabel("Date")
ax4.set_ylabel("Revenue")
ax4.grid(True)
ax4.legend()

# Formatting and final touches
plt.xlabel("Date")
plt.tight_layout()

# save the plot
# plt.savefig("chart3", dpi = 300, bbox_inches = "tight")
# calculate outliers based on residuals
residuals = res.resid
residuals_mean = residuals.mean()
residuals_std = residuals.std()

# calculate the z-score
z_score = (residuals - residuals_mean) / residuals_std

# identify the outliers
outliers = z_score[abs(z_score) > 3]

# plot the residuals
plt.figure(figsize=(10, 6))
plt.plot(residuals, label = "Residuals", color = "orange")
# plt.axhline(y = 0, color = "black", linestyle = "--")
plt.axhline(y = residuals_mean + 3 * residuals_std, color = "red", linestyle = "--")
plt.axhline(y = residuals_mean - 3 * residuals_std, color = "red", linestyle = "--")
# plt.scatter(outliers.index, outliers, color = "red", label = "Outliers")
plt.title("Residuals with Outliers")
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.legend()
plt.grid(True)

# save the plot
# plt.savefig("chart4", dpi = 300, bbox_inches = "tight")