1
1
from typing import (
2
+ Callable ,
2
3
Optional ,
3
4
Sequence ,
4
5
Tuple ,
5
6
Union
6
7
)
7
8
from datetime import date , timedelta
8
9
from epiweeks import Week , Year
9
-
10
+ import logging
10
11
11
12
def time_value_to_date (value : int ) -> date :
12
13
year , month , day = value // 10000 , (value % 10000 ) // 100 , value % 100
@@ -26,7 +27,7 @@ def week_value_to_week(value: int) -> Week:
26
27
27
28
def guess_time_value_is_day (value : int ) -> bool :
28
29
# YYYYMMDD type and not YYYYMM
29
- return len (str (value )) > 6
30
+ return len (str (value )) == 8
30
31
31
32
def guess_time_value_is_week (value : int ) -> bool :
32
33
# YYYYWW type and not YYYYMMDD
@@ -77,7 +78,7 @@ def weeks_in_range(week_range: Tuple[int, int]) -> int:
77
78
acc += year .totalweeks ()
78
79
return acc + 1 # same week should lead to 1 week that will be queried
79
80
80
- def dates_to_ranges (values : Optional [Sequence [Union [Tuple [int , int ], int ]]]) -> Optional [Sequence [Union [Tuple [int , int ], int ]]]:
81
+ def time_values_to_ranges (values : Optional [Sequence [Union [Tuple [int , int ], int ]]]) -> Optional [Sequence [Union [Tuple [int , int ], int ]]]:
81
82
"""
82
83
Converts a mixed list of dates and date ranges to an optimized list where dates are merged into ranges where possible.
83
84
e.g. [20200101, 20200102, (20200101, 20200104), 20200106] -> [(20200101, 20200104), 20200106]
@@ -87,84 +88,55 @@ def dates_to_ranges(values: Optional[Sequence[Union[Tuple[int, int], int]]]) ->
87
88
return values
88
89
89
90
# determine whether the list is of days (YYYYMMDD) or weeks (YYYYWW) based on first element
90
- try :
91
- if (isinstance (values [0 ], tuple ) and guess_time_value_is_day (values [0 ][0 ]))\
92
- or (isinstance (values [0 ], int ) and guess_time_value_is_day (values [0 ])):
93
- return days_to_ranges (values )
94
- elif (isinstance (values [0 ], tuple ) and guess_time_value_is_week (values [0 ][0 ]))\
95
- or (isinstance (values [0 ], int ) and guess_time_value_is_week (values [0 ])):
96
- return weeks_to_ranges (values )
97
- else :
98
- return values
99
- except :
91
+ first_element = values [0 ][0 ] if isinstance (values [0 ], tuple ) else values [0 ]
92
+ if guess_time_value_is_day (first_element ):
93
+ return days_to_ranges (values )
94
+ elif guess_time_value_is_week (first_element ):
95
+ return weeks_to_ranges (values )
96
+ else :
100
97
return values
101
98
102
99
def days_to_ranges (values : Sequence [Union [Tuple [int , int ], int ]]) -> Sequence [Union [Tuple [int , int ], int ]]:
103
- intervals = []
104
-
105
- # populate list of intervals based on original values
106
- for v in values :
107
- if isinstance (v , int ):
108
- # 20200101 -> [20200101, 20200101]
109
- intervals .append ([time_value_to_date (v ), time_value_to_date (v )])
110
- else : # tuple
111
- # (20200101, 20200102) -> [20200101, 20200102]
112
- intervals .append ([time_value_to_date (v [0 ]), time_value_to_date (v [1 ])])
113
-
114
- intervals .sort (key = lambda x : x [0 ])
115
-
116
- # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
117
- merged = []
118
- for interval in intervals :
119
- # no overlap, append the interval
120
- # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
121
- if not merged or merged [- 1 ][1 ] < interval [0 ] - timedelta (days = 1 ):
122
- merged .append (interval )
123
- # overlap, merge the current and previous intervals
124
- else :
125
- merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
126
-
127
- # convert intervals from dates back to integers
128
- ranges = []
129
- for m in merged :
130
- if m [0 ] == m [1 ]:
131
- ranges .append (date_to_time_value (m [0 ]))
132
- else :
133
- ranges .append ((date_to_time_value (m [0 ]), date_to_time_value (m [1 ])))
134
-
135
- return ranges
100
+ return _to_ranges (values , time_value_to_date , date_to_time_value , timedelta (days = 1 ))
136
101
137
102
def weeks_to_ranges (values : Sequence [Union [Tuple [int , int ], int ]]) -> Sequence [Union [Tuple [int , int ], int ]]:
138
- intervals = []
139
-
140
- # populate list of intervals based on original values
141
- for v in values :
142
- if isinstance (v , int ):
143
- # 202001 -> [202001, 202001]
144
- intervals .append ([week_value_to_week (v ), week_value_to_week (v )])
145
- else : # tuple
146
- # (202001, 202002) -> [202001, 202002]
147
- intervals .append ([week_value_to_week (v [0 ]), week_value_to_week (v [1 ])])
148
-
149
- intervals .sort (key = lambda x : x [0 ])
150
-
151
- # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
152
- merged = []
153
- for interval in intervals :
154
- # no overlap, append the interval
155
- # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
156
- if not merged or merged [- 1 ][1 ] < interval [0 ] - 1 :
157
- merged .append (interval )
158
- # overlap, merge the current and previous intervals
159
- else :
160
- merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
161
-
162
- # convert intervals from weeks back to integers
163
- ranges = []
164
- for m in merged :
165
- if m [0 ] == m [1 ]:
166
- ranges .append (week_to_time_value (m [0 ]))
167
- else :
168
- ranges .append ((week_to_time_value (m [0 ]), week_to_time_value (m [1 ])))
169
-
170
- return ranges
103
+ return _to_ranges (values , week_value_to_week , week_to_time_value , 1 )
104
+
105
+ def _to_ranges (values : Sequence [Union [Tuple [int , int ], int ]], value_to_date : Callable , date_to_value : Callable , time_unit : Union [int , timedelta ]) -> Sequence [Union [Tuple [int , int ], int ]]:
106
+ try :
107
+ intervals = []
108
+
109
+ # populate list of intervals based on original date/week values
110
+ for v in values :
111
+ if isinstance (v , int ):
112
+ # 20200101 -> [20200101, 20200101]
113
+ intervals .append ([value_to_date (v ), value_to_date (v )])
114
+ else : # tuple
115
+ # (20200101, 20200102) -> [20200101, 20200102]
116
+ intervals .append ([value_to_date (v [0 ]), value_to_date (v [1 ])])
117
+
118
+ intervals .sort ()
119
+
120
+ # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
121
+ merged = []
122
+ for interval in intervals :
123
+ # no overlap, append the interval
124
+ # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
125
+ if not merged or merged [- 1 ][1 ] < interval [0 ] - time_unit :
126
+ merged .append (interval )
127
+ # overlap, merge the current and previous intervals
128
+ else :
129
+ merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
130
+
131
+ # convert intervals from dates/weeks back to integers
132
+ ranges = []
133
+ for m in merged :
134
+ if m [0 ] == m [1 ]:
135
+ ranges .append (date_to_value (m [0 ]))
136
+ else :
137
+ ranges .append ((date_to_value (m [0 ]), date_to_value (m [1 ])))
138
+
139
+ return ranges
140
+ except Exception as e :
141
+ logging .info ('bad input to date ranges' , input = values , exception = e )
142
+ return values
0 commit comments