-
Notifications
You must be signed in to change notification settings - Fork 344
Expand file tree
/
Copy pathchains.py
More file actions
153 lines (119 loc) · 4.25 KB
/
chains.py
File metadata and controls
153 lines (119 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license. # noqa: E501
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
from collections import deque
import numpy as np
def atsc(IL, IR, j):
    """
    Compute the anchored time series chain (ATSC)

    Because the matrix profile indices, ``IL`` and ``IR``, are supplied already
    computed, this function does not depend on how (or whether) the subsequences
    were normalized.

    Parameters
    ----------
    IL : numpy.ndarray
        Left matrix profile indices.

    IR : numpy.ndarray
        Right matrix profile indices.

    j : int
        The index value at which the chain is anchored.

    Returns
    -------
    out : numpy.ndarray
        Anchored time series chain for index, ``j``, as an ``int64`` array whose
        first element is ``j``.

    See Also
    --------
    stumpy.allc : Compute the all-chain set (ALLC)

    Notes
    -----
    `DOI: 10.1109/ICDM.2017.79 <https://www.cs.ucr.edu/~eamonn/chains_ICDM.pdf>`__

    See Table I

    This is the implementation for the anchored time series chains (ATSC).

    Unlike the original paper, the while-loop has been replaced with a more stable
    for-loop that performs at most ``IL.size`` steps.

    Examples
    --------
    >>> import stumpy
    >>> import numpy as np
    >>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3)
    >>> stumpy.atsc(mp[:, 2], mp[:, 3], 1)
    array([1, 3])

    >>> # Alternative example using named attributes
    >>>
    >>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3)
    >>> stumpy.atsc(mp.left_I_, mp.right_I_, 1)
    array([1, 3])
    """
    chain = [j]
    current = j

    # The number of steps is capped at ``IL.size`` so the walk always terminates
    for _ in range(IL.size):
        successor = IR[current]

        # ``-1`` marks "no right neighbor": the chain ends here
        if successor == -1:
            break

        # Only follow the link when it is mutual, i.e., the left neighbor of the
        # right neighbor points back to the current index
        if IL[successor] != current:
            break

        chain.append(successor)
        current = successor

    return np.array(chain, dtype=np.int64)
def allc(IL, IR):
    """
    Compute the all-chain set (ALLC)

    Note that since the matrix profile indices, ``IL`` and ``IR``, are pre-computed,
    this function is agnostic to subsequence normalization.

    Parameters
    ----------
    IL : numpy.ndarray
        Left matrix profile indices.

    IR : numpy.ndarray
        Right matrix profile indices.

    Returns
    -------
    S : list(numpy.ndarray)
        All-chain set. Each chain is an ``int64`` array. The list is empty when
        ``IL`` is empty.

    C : numpy.ndarray
        Anchored time series chain for the longest chain (also known as the
        unanchored chain). Note that when there are multiple different chains with
        length equal to ``len(C)``, then only one chain from this set is returned.
        You may iterate over the all-chain set, ``S``, to find all other possible
        chains with length ``len(C)``. An empty ``int64`` array is returned when
        ``IL`` is empty.

    See Also
    --------
    stumpy.atsc : Compute the anchored time series chain (ATSC)

    Notes
    -----
    `DOI: 10.1109/ICDM.2017.79 <https://www.cs.ucr.edu/~eamonn/chains_ICDM.pdf>`__

    See Table II

    Unlike the original paper, we've replaced the while-loop with a more stable
    for-loop.

    This is the implementation for the all-chain set (ALLC) and the unanchored
    chain is simply the longest one among the all-chain set. Both the
    all-chain set and unanchored chain are returned.

    The all-chain set, ``S``, is returned as a list of unique numpy arrays.

    Examples
    --------
    >>> import stumpy
    >>> import numpy as np
    >>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3)
    >>> stumpy.allc(mp[:, 2], mp[:, 3])
    ([array([1, 3]), array([2]), array([0, 4])], array([0, 4]))

    >>> # Alternative example using named attributes
    >>>
    >>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3)
    >>> stumpy.allc(mp.left_I_, mp.right_I_)
    ([array([1, 3]), array([2]), array([0, 4])], array([0, 4]))
    """
    n = IL.size
    if n == 0:
        # ``argmax`` raises a ValueError on an empty array; with no indices there
        # are no chains to report
        return [], np.array([], dtype=np.int64)

    # ``L[i]`` holds the chain length for every anchor ``i``, and ``-1`` for every
    # index that was absorbed into a chain started at another anchor
    L = np.ones(n, dtype=np.int64)
    chain_set = set()
    chain_by_anchor = {}  # anchor index -> chain (tuple) that starts at that anchor

    for i in range(n):
        if L[i] != 1:
            # Already part of another chain, so it cannot anchor its own
            continue

        j = i
        chain = [j]
        # The walk is capped at ``n`` steps so that it always terminates
        for _ in range(n):
            successor = IR[j]
            # Stop when there is no right neighbor or the link is not mutual
            if successor == -1 or IL[successor] != j:
                break
            j = successor
            L[j] = -1
            L[i] = L[i] + 1
            chain.append(j)

        chain = tuple(chain)
        chain_set.add(chain)
        chain_by_anchor[i] = chain

    # The first anchor with the largest recorded length owns the unanchored chain.
    # Its chain was already built above, so it is looked up rather than re-walked.
    longest_anchor = int(L.argmax())
    C = np.array(chain_by_anchor[longest_anchor], dtype=np.int64)
    S = [np.array(chain, dtype=np.int64) for chain in chain_set]

    return S, C