# import libraries
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_selection import mutual_info_regression
# read data: example time-series of revenue by country/product
data_raw = pd.read_csv("../posts/2024-10-02-ts-fundamentals-whats-a-time-series/example_ts_data.csv")
data_raw = (
    # select columns
    data_raw[["Country", "Product", "Date", "Revenue"]]
    # change data types; lambdas receive the intermediate frame in the
    # chain, so the conversions stay correct even if the selection step
    # above ever filters rows (referencing the outer `data_raw` would not).
    .assign(
        Date=lambda df: pd.to_datetime(df["Date"]),
        Revenue=lambda df: pd.to_numeric(df["Revenue"]),
    )
)
# print the first few rows
print(data_raw.head())
# filter on specific series
# Keep only the United States / Cookies series.  `.copy()` makes an
# independent frame so the in-place set_index below cannot raise a
# SettingWithCopyWarning on a view of `data_raw`.
us_ck_raw = data_raw[(data_raw["Country"] == "United States") & (data_raw["Product"] == "Cookies")].copy()
us_ck_raw.set_index("Date", inplace=True)
print(us_ck_raw.head())
# plot the data
plt.figure(figsize=(10, 6))
plt.plot(us_ck_raw.index, us_ck_raw["Revenue"], label="Cookies Revenue")
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Cookies Revenue in the United States")
plt.legend()
# Set a random seed for reproducibility
random.seed(15)
# `random.seed` seeds only the stdlib generator; NumPy has its own state,
# so seed it too — otherwise the np.random.randn draw below is NOT
# reproducible, defeating the stated purpose of seeding.
np.random.seed(15)
us_ck_corr = us_ck_raw.copy()
# Generate a new variable with strong correlation
correlation_target = 0.9
n = us_ck_corr.shape[0]
noise = np.random.randn(n)
# Orthogonalize noise to the original variable to ensure independence
noise = noise - np.dot(noise, us_ck_corr["Revenue"]) / np.dot(us_ck_corr["Revenue"], us_ck_corr["Revenue"]) * us_ck_corr["Revenue"]
# Scale the orthogonalized noise to match the desired correlation
# NOTE(review): this projects against the *uncentered* Revenue vector, so the
# realized Pearson correlation only approximates `correlation_target` unless
# Revenue has near-zero mean — the verification printout below shows how close.
noise = noise / np.linalg.norm(noise) * np.sqrt(1 - correlation_target**2) * np.linalg.norm(us_ck_corr["Revenue"])
# Create the new variable
us_ck_corr["xreg1"] = correlation_target * us_ck_corr["Revenue"] + noise
# Verify the correlation
correlation = us_ck_corr["Revenue"].corr(us_ck_corr["xreg1"])
print(f"Correlation between Original and New_Var: {correlation:.4f}")
# create a variable that has a weak correlation to the revenue column:
# distinct random values between 1 and 100 inclusive — range(1, 101),
# since range(1, 100) would stop at 99 and contradict the stated intent
us_ck_corr["xreg2"] = random.sample(range(1, 101), us_ck_corr.shape[0])
# print the first few rows, formatting numbers to 2 decimal places
print(us_ck_corr.head(10).round(2))
# Plot revenue together with both exogenous regressors (dotted).
plt.figure(figsize=(10, 6))
series_specs = [
    ("Revenue", "Cookies Revenue", "solid"),
    ("xreg1", "xreg1", "dotted"),
    ("xreg2", "xreg2", "dotted"),
]
for column, label, style in series_specs:
    plt.plot(us_ck_corr.index, us_ck_corr[column], label=label, linestyle=style)
plt.xlabel("Date")
plt.ylabel("Revenue")
plt.title("Cookies Revenue in the United States")
plt.legend()
# save the plot
# plt.savefig("chart1", dpi = 300, bbox_inches = "tight")
# calculate the correlation between the target variable and the new variables, dropping the date, country, and product columns
correlation = us_ck_corr.drop(columns=["Country", "Product"]).corr()
# Reshape the correlation matrix into a long-form (pair, value) table.
correlation_table = correlation.stack().reset_index()
correlation_table.columns = ["Variable 1", "Variable 2", "Correlation"]
# Keep only rows pairing Revenue with the other variables (drop Revenue-Revenue).
keep_rows = (correlation_table["Variable 1"] == "Revenue") & (correlation_table["Variable 2"] != "Revenue")
correlation_table = correlation_table[keep_rows]
# Display, rounded to 2 decimal places.
print(correlation_table.round(2))
# calculate lags of the xreg1 and xreg2 columns. Create 1, 2, 3, 6, 9, 12 lags
# Build lagged copies of the exogenous regressors at the offsets of interest.
# A loop replaces twelve copy-pasted assignments; column names and insertion
# order (all xreg1 lags, then all xreg2 lags) match the original exactly.
for lag_col in ("xreg1", "xreg2"):
    for lag in (1, 2, 3, 6, 9, 12):
        us_ck_corr[f"{lag_col}_lag_{lag}"] = us_ck_corr[lag_col].shift(lag)
# calculate the correlation between the target variable and the new variables, dropping the date, country, and product columns
relation = us_ck_corr.drop(columns=["Country", "Product"]).corr()
# create a simple table to display the correlation values
lag_table = relation.stack().reset_index()
lag_table.columns = ["Variable 1", "Variable 2", "Correlation"]
lag_table = lag_table[lag_table["Variable 1"] == "Revenue"]
lag_table = lag_table[lag_table["Variable 2"] != "Revenue"]
# print the correlation table, rounding the values to 2 decimal places
print(lag_table.round(2))# create mutual information function
def calculate_mutual_information(x, y, n_neighbors=3, discrete_features=False, random_state=123):
    """
    Calculate the Mutual Information (MI) score between two variables
    using sklearn's mutual_info_regression.

    Parameters:
        x (array-like): First variable (time series or feature).
        y (array-like): Second variable (time series or feature).
        n_neighbors (int): Number of neighbors for density estimation (default 3).
        discrete_features (bool): Whether `x` is discrete (default False).
        random_state (int): Seed passed to the estimator so repeated calls
            give reproducible scores (default 123).

    Returns:
        float: Mutual Information (MI) score.
    """
    # mutual_info_regression expects X shaped (n_samples, n_features) and a
    # flat 1-D target; ravel() makes y robust to column-vector input.
    x = np.asarray(x).reshape(-1, 1)
    y = np.asarray(y).ravel()
    mi_score = mutual_info_regression(
        x, y, n_neighbors=n_neighbors, discrete_features=discrete_features, random_state=random_state
    )
    # x contributes a single feature, so the result array holds one score.
    return float(mi_score[0])
# copy the data
# Copy the data and drop rows left incomplete by the lag shifts.
us_ck_mi = us_ck_corr.copy()
us_ck_mi.dropna(inplace=True)
# Variables and lag offsets to score (0 = the contemporaneous column).
variables = ["xreg1", "xreg2"]
lags = [0, 1, 2, 3, 6, 9, 12]
# Score MI between Revenue and every available regressor/lag column.
mi_results = []
for var in variables:
    for lag in lags:
        col_name = var if lag == 0 else f"{var}_lag_{lag}"
        if col_name not in us_ck_mi.columns:
            continue
        score = calculate_mutual_information(us_ck_mi[col_name], us_ck_mi["Revenue"])
        mi_results.append(
            {"Variable 1": "Revenue", "Variable 2": col_name, "Mutual Information": score}
        )
# Collect the scores into a table and print, rounded to 2 decimal places.
mi_table = pd.DataFrame(mi_results)
print(mi_table.round(2))
# create a binary variable that is 1 in the year 2020 and 0 otherwise
us_ck_binary = us_ck_corr.copy()
# Flag observations from calendar year 2020 with 1, all other years 0
# (index is the Date column set earlier, so .year is available).
us_ck_binary["COVID_Flag"] = (us_ck_binary.index.year == 2020).astype(int)
print(us_ck_binary[["Country", "Product", "Revenue", "COVID_Flag"]].head())
# Score MI between the binary flag and revenue.
mi_score = calculate_mutual_information(us_ck_binary["COVID_Flag"], us_ck_binary["Revenue"])
print(f"Mutual Information between COVID Flag and Revenue: {mi_score:.4f}")
# create a randrom variable that is 1 with a probability of 0.5 and 0 otherwise
us_ck_random = us_ck_corr.copy()
us_ck_random["Random_Flag"] = np.random.choice([0, 1], us_ck_random.shape[0], p=[0.5, 0.5])
print(us_ck_random[["Country", "Product", "Revenue", "Random_Flag"]].head())
# calculate the mutual information between the random variable and the revenue column
mi_score = calculate_mutual_information(us_ck_random["Random_Flag"], us_ck_random["Revenue"])
print(f"Mutual Information between Random Flag and Revenue: {mi_score:.4f}")