Skip to content

Commit 7f04b5d

Browse files
authored
✨ v3.1.3 Fix integer pipes, add self syntax for symlinking. (#224)
# v3.1.3 - **Fix syncing pipes with integer datetimes.** An issue with dtype detection has been fixed, improving performance for pipes with integer datetimes. - **Fix recursive parameters references.** A single pipe may now reference itself when symlinking parameters. - **Add `self` syntax for parameters symlinking.** Pipes may now use `self` in parameters rather than requiring the fully qualified name: ```python import meerschaum as mrsm pipe = mrsm.Pipe( 'sql:memory', 'symlink', 'self', instance='sql:memory', parameters={ "custom": { "value": 123, }, "sql": "SELECT * FROM foo WHERE val = {{ self.parameters['custom']['value'] }}", }, ) print(pipe.parameters['sql']) # SELECT * FROM foo WHERE val = 123 ```
1 parent b90f3e2 commit 7f04b5d

File tree

8 files changed

+101
-12
lines changed

8 files changed

+101
-12
lines changed

CHANGELOG.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,35 @@
44

55
This is the current release cycle, so stay tuned for future releases!
66

7+
### v3.1.3
8+
9+
- **Fix syncing pipes with integer datetimes.**
10+
An issue with dtype detection has been fixed, improving performance for pipes with integer datetimes.
11+
12+
- **Fix recursive parameters references.**
13+
A single pipe may now reference itself when symlinking parameters.
14+
15+
- **Add `self` syntax for parameters symlinking.**
16+
Pipes may now use `self` in parameters rather than requiring the fully qualified name:
17+
18+
```python
19+
import meerschaum as mrsm
20+
21+
pipe = mrsm.Pipe(
22+
'sql:memory', 'symlink', 'self',
23+
instance='sql:memory',
24+
parameters={
25+
"custom": {
26+
"value": 123,
27+
},
28+
"sql": "SELECT * FROM foo WHERE val = {{ self.parameters['custom']['value'] }}",
29+
},
30+
)
31+
32+
print(pipe.parameters['sql'])
33+
# SELECT * FROM foo WHERE val = 123
34+
```
35+
736
### v3.1.2
837

938
- **Improve filtering logic for non-datetime pipes.**

docs/mkdocs/news/changelog.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,35 @@
44

55
This is the current release cycle, so stay tuned for future releases!
66

7+
### v3.1.3
8+
9+
- **Fix syncing pipes with integer datetimes.**
10+
An issue with dtype detection has been fixed, improving performance for pipes with integer datetimes.
11+
12+
- **Fix recursive parameters references.**
13+
A single pipe may now reference itself when symlinking parameters.
14+
15+
- **Add `self` syntax for parameters symlinking.**
16+
Pipes may now use `self` in parameters rather than requiring the fully qualified name:
17+
18+
```python
19+
import meerschaum as mrsm
20+
21+
pipe = mrsm.Pipe(
22+
'sql:memory', 'symlink', 'self',
23+
instance='sql:memory',
24+
parameters={
25+
"custom": {
26+
"value": 123,
27+
},
28+
"sql": "SELECT * FROM foo WHERE val = {{ self.parameters['custom']['value'] }}",
29+
},
30+
)
31+
32+
print(pipe.parameters['sql'])
33+
# SELECT * FROM foo WHERE val = 123
34+
```
35+
736
### v3.1.2
837

938
- **Improve filtering logic for non-datetime pipes.**

docs/mkdocs/reference/pipes/parameters.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ This page catalogs useful keys in the `parameters` dictionary.
4848
'upsert': "{{ Pipe('demo', 'symlink', instance='sql:memory').upsert }}",
4949
'custom': "{{ Pipe('demo', 'symlink', instance='sql:memory').parameters['custom'] }}",
5050
'parent_pipe_id': "{{ Pipe('demo', 'symlink', instance='sql:memory').id }}",
51+
'value': 123,
52+
'self_value': "{{ self.parameters['value'] }}",
5153
},
5254
)
5355

@@ -67,6 +69,8 @@ This page catalogs useful keys in the `parameters` dictionary.
6769
# ]
6870
# },
6971
# "parent_pipe_id": 1
72+
# "value": 123,
73+
# "self_value": 123,
7074
# }
7175
```
7276

meerschaum/config/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
Specify the Meerschaum release version.
33
"""
44

5-
__version__ = "3.1.2"
5+
__version__ = "3.1.3"

meerschaum/connectors/sql/_sql.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,17 @@ def exec(
624624
warn(str(e), stacklevel=3)
625625
result = None
626626
if _commit:
627+
if debug:
628+
dprint(f"[{self}] Rolling back failed transaction...")
627629
transaction.rollback()
628630
connection = self.get_connection(rebuild=True)
629631
finally:
630632
if _close:
631633
connection.close()
632634

635+
if debug:
636+
dprint(f"[{self}] Done executing.")
637+
633638
if with_connection:
634639
return result, connection
635640

@@ -709,8 +714,14 @@ def exec_queries(
709714
elif debug:
710715
dprint(f"[{self}]\n" + str(msg))
711716
result = None
717+
718+
if debug:
719+
dprint(f"[{self}] Finished executing.")
720+
712721
if result is None and break_on_error:
713722
if rollback:
723+
if debug:
724+
dprint(f"[{self}] Rolling back...")
714725
session.rollback()
715726
results.append(result)
716727
break

meerschaum/core/Pipe/_attributes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def recursive_replace(obj: Any, path: tuple) -> Any:
118118
if isinstance(obj, list):
119119
return [recursive_replace(elem, path + (i,)) for i, elem in enumerate(obj)]
120120
if isinstance(obj, str):
121-
substituted_val = replace_pipes_syntax(obj)
121+
substituted_val = replace_pipes_syntax(obj, _pipe=self)
122122
if substituted_val != obj:
123123
self._symlinks[path] = {
124124
'original': obj,

meerschaum/utils/dtypes/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,12 @@ def are_dtypes_equal(
222222
'int', 'int64', 'int32', 'int16', 'int8',
223223
'uint', 'uint64', 'uint32', 'uint16', 'uint8',
224224
)
225+
int_substrings = ('int',)
225226
if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
226227
return True
228+
for substring in int_substrings:
229+
if substring in ldtype.lower() and substring in rdtype.lower():
230+
return True
227231

228232
float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
229233
if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:

meerschaum/utils/pipes.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import meerschaum as mrsm
1919

2020

21-
def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe):
21+
def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe, _pipe=None):
2222
"""
2323
Safely evaluate the access chain on a Pipe.
2424
"""
2525
expr = f"pipe{access_chain}"
26+
if pipe == _pipe and access_chain.lstrip('.').startswith('parameters'):
27+
parameters_access_chain = access_chain.lstrip('.').split('parameters', maxsplit=1)[-1]
28+
expr = f"pipe.attributes['parameters']{parameters_access_chain}"
2629
tree = ast.parse(expr, mode='eval')
2730

2831
def _eval(node, context):
@@ -59,18 +62,22 @@ def _eval(node, context):
5962

6063

6164

62-
def _evaluate_pipe_access_chain_from_match(pipe_match: re.Match) -> Any:
65+
def _evaluate_pipe_access_chain_from_match(pipe_match: re.Match, _pipe=None) -> Any:
6366
"""
6467
Helper function to evaluate a pipe from a regex match object.
6568
"""
6669
from meerschaum.utils.warnings import warn
6770
from meerschaum.utils.misc import parse_arguments_str
6871
from meerschaum.utils.sql import sql_item_name
6972
try:
70-
args_str = pipe_match.group(1)
71-
access_chain = pipe_match.group(2)
72-
args, kwargs = parse_arguments_str(args_str)
73-
pipe = mrsm.Pipe(*args, **kwargs)
73+
if 'self' in pipe_match.group(0) and _pipe is not None:
74+
pipe = _pipe
75+
access_chain = pipe_match.group(1)
76+
else:
77+
args_str = pipe_match.group(1)
78+
access_chain = pipe_match.group(2)
79+
args, kwargs = parse_arguments_str(args_str)
80+
pipe = mrsm.Pipe(*args, **kwargs)
7481
except Exception as e:
7582
warn(f"Failed to parse pipe from template string:\n{e}")
7683
raise e
@@ -88,18 +95,22 @@ def _evaluate_pipe_access_chain_from_match(pipe_match: re.Match) -> Any:
8895
else pipe.target
8996
)
9097

91-
return evaluate_pipe_access_chain(access_chain, pipe)
98+
return evaluate_pipe_access_chain(access_chain, pipe, _pipe=_pipe)
9299

93100

94-
def replace_pipes_syntax(text: str) -> Any:
101+
def replace_pipes_syntax(text: str, _pipe=None) -> Any:
95102
"""
96103
Parse a string containing the `{{ Pipe() }}` syntax.
97104
"""
98105
from meerschaum.utils.warnings import warn
99106
from meerschaum.utils.dtypes import json_serialize_value
100107
pattern = r'\{\{\s*(?:mrsm\.)?Pipe\((.*?)\)((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
108+
self_pattern = r'\{\{\s*self((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
101109

102110
matches = list(re.finditer(pattern, text))
111+
if _pipe is not None:
112+
self_matches = list(re.finditer(self_pattern, text))
113+
matches.extend(self_matches)
103114
if not matches:
104115
return text
105116

@@ -115,9 +126,10 @@ def replace_pipes_syntax(text: str) -> Any:
115126
resolved_values = {}
116127
for placeholder, match in placeholders.items():
117128
try:
118-
resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match(match)
129+
resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match(match, _pipe=_pipe)
119130
except Exception as e:
120-
warn(f"Failed to resolve pipe syntax '{match.group(0)}': {e}")
131+
import traceback
132+
warn(f"Failed to resolve pipe syntax '{match.group(0)}': {traceback.format_exc()}")
121133
resolved_values[placeholder] = match.group(0)
122134

123135
if len(matches) == 1:

0 commit comments

Comments
 (0)