-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdiebold_mariano_test.py
131 lines (118 loc) · 4.72 KB
/
diebold_mariano_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def dm_test(actual_lst, pred1_lst, pred2_lst, h=1, crit="MSE", power=2):
# Routine for checking errors
def error_check():
rt = 0
msg = ""
# Check if h is an integer
if not isinstance(h, int):
rt = -1
msg = "The type of the number of steps ahead (h) is not an integer."
return rt, msg
# Check the range of h
if h < 1:
rt = -1
msg = "The number of steps ahead (h) is not large enough."
return rt, msg
len_act = len(actual_lst)
len_p1 = len(pred1_lst)
len_p2 = len(pred2_lst)
# Check if lengths of actual values and predicted values are equal
if len_act != len_p1 or len_p1 != len_p2 or len_act != len_p2:
rt = -1
msg = "Lengths of actual_lst, pred1_lst and pred2_lst do not match."
return (rt, msg)
# Check range of h
if h >= len_act:
rt = -1
msg = "The number of steps ahead is too large."
return (rt, msg)
# Check if criterion supported
if crit != "MSE" and crit != "MAPE" and crit != "MAD" and crit != "poly":
rt = -1
msg = "The criterion is not supported."
return rt, msg
# Check if every value of the input lists are numerical values
from re import compile as re_compile
comp = re_compile("^\d+?\.\d+?$")
def compiled_regex(s):
""" Returns True is string is a number. """
if comp.match(s) is None:
return s.isdigit()
return True
for actual, pred1, pred2 in zip(actual_lst, pred1_lst, pred2_lst):
is_actual_ok = compiled_regex(str(abs(actual)))
is_pred1_ok = compiled_regex(str(abs(pred1)))
is_pred2_ok = compiled_regex(str(abs(pred2)))
if not (is_actual_ok and is_pred1_ok and is_pred2_ok):
msg = "An element in the actual_lst, pred1_lst or pred2_lst is not numeric."
rt = -1
return rt, msg
return rt, msg
# Error check
error_code = error_check()
# Raise error if cannot pass error check
if error_code[0] == -1:
raise SyntaxError(error_code[1])
return
# Import libraries
from scipy.stats import t
import collections
import pandas as pd
import numpy as np
# Initialise lists
e1_lst = []
e2_lst = []
d_lst = []
# convert every value of the lists into real values
actual_lst = pd.Series(actual_lst).apply(lambda x: float(x)).tolist()
pred1_lst = pd.Series(pred1_lst).apply(lambda x: float(x)).tolist()
pred2_lst = pd.Series(pred2_lst).apply(lambda x: float(x)).tolist()
# Length of lists (as real numbers)
T = float(len(actual_lst))
# construct d according to crit
if crit == "MSE":
for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
e1_lst.append((actual - p1) ** 2)
e2_lst.append((actual - p2) ** 2)
for e1, e2 in zip(e1_lst, e2_lst):
d_lst.append(e1 - e2)
elif crit == "MAD":
for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
e1_lst.append(abs(actual - p1))
e2_lst.append(abs(actual - p2))
for e1, e2 in zip(e1_lst, e2_lst):
d_lst.append(e1 - e2)
elif crit == "MAP":
for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
e1_lst.append(abs((actual - p1) / actual))
e2_lst.append(abs((actual - p2) / actual))
for e1, e2 in zip(e1_lst, e2_lst):
d_lst.append(e1 - e2)
elif crit == "poly":
for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
e1_lst.append(((actual - p1)) ** (power))
e2_lst.append(((actual - p2)) ** (power))
for e1, e2 in zip(e1_lst, e2_lst):
d_lst.append(e1 - e2)
# Mean of d
mean_d = pd.Series(d_lst).mean()
# Find autocovariance and construct DM test statistics
def autocovariance(Xi, N, k, Xs):
autoCov = 0
T = float(N)
for i in np.arange(0, N - k):
autoCov += ((Xi[i + k]) - Xs) * (Xi[i] - Xs)
return (1 / (T)) * autoCov
gamma = []
for lag in range(0, h):
gamma.append(autocovariance(d_lst, len(d_lst), lag, mean_d)) # 0, 1, 2
V_d = (gamma[0] + 2 * sum(gamma[1:])) / T
DM_stat = V_d ** (-0.5) * mean_d
harvey_adj = ((T + 1 - 2 * h + h * (h - 1) / T) / T) ** (0.5)
DM_stat = harvey_adj * DM_stat
# Find p-value
p_value = 2 * t.cdf(-abs(DM_stat), df=T - 1)
# Construct named tuple for return
dm_return = collections.namedtuple('dm_return', 'DM p_value')
rt = dm_return(DM=DM_stat, p_value=p_value)
return rt