-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathfrequency_shift.py
More file actions
executable file
·90 lines (72 loc) · 2.25 KB
/
frequency_shift.py
File metadata and controls
executable file
·90 lines (72 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
"""Realtime best-effort frequency scaling for mono streamed data."""
import argparse
import sys
from pathlib import Path
# Directory that holds this script, and the "lib" directory right beside it.
ROOT = Path(__file__).resolve().parent
LIB_ROOT = ROOT.joinpath("lib")

# Prepend both directories to sys.path, skipping any that are already listed.
# ROOT is handled first so that LIB_ROOT, inserted afterwards, ends up ahead
# of it in the search order.
for _p in (ROOT, LIB_ROOT):
    if str(_p) in sys.path:
        continue
    sys.path.insert(0, str(_p))

# This import has to come after the sys.path adjustments above.
from lib.bootstrap import maybe_reexec_venv

# NOTE(review): presumably re-executes the script inside the project's virtual
# environment when needed -- confirm against lib/bootstrap.py.
maybe_reexec_venv(__file__)
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the frequency scaling tool.

    The parser takes one positional ``factor`` (float) and three options:
    ``--chunk-bytes`` (int, default 16384), ``--raw`` (boolean flag) and
    ``--rate`` (float, default ``None``).

    Returns:
        A configured ``argparse.ArgumentParser``.
    """
    summary = (
        "Scale frequencies in an MSIG1 float32 mono stream by a factor "
        "(best-effort chunk-local algorithm)."
    )
    parser = argparse.ArgumentParser(description=summary)
    parser.add_argument("factor", type=float, help="Frequency multiplier (> 0).")

    # Optional flags, registered in the order they should appear in --help.
    option_specs = (
        (
            "--chunk-bytes",
            {
                "type": int,
                "default": 16384,
                "help": "Read size in bytes (default: 16384).",
            },
        ),
        (
            "--raw",
            {
                "action": "store_true",
                "help": "Read raw float32 input instead of MSIG1 header.",
            },
        ),
        (
            "--rate",
            {
                "type": float,
                "default": None,
                "help": "Input sample rate Hz (required with --raw).",
            },
        ),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    return parser
def main() -> int:
    """Run the frequency scaler as a stdin-to-stdout filter.

    Parses the command line, validates the numeric arguments, then reads
    chunks from standard input, passes each one through a
    ``ChunkFrequencyScaler`` and writes the result to standard output.
    Output is always written with ``raw=False``, using the reader's sample
    rate, regardless of whether ``--raw`` was given for the input.

    Returns:
        The process exit status: 0 on success, when building the reader
        raises ``EOFError``, or when the output pipe is closed
        (``BrokenPipeError``); 2 for an invalid argument or a
        ``StreamFormatError`` while opening the input.
    """
    import math

    args = build_parser().parse_args()

    # Project modules are imported only after argument parsing.
    from lib.dsp import ChunkFrequencyScaler
    from lib.signal_stream import (
        FloatSignalReader,
        FloatSignalWriter,
        StreamFormatError,
        install_sigpipe_default,
    )

    install_sigpipe_default()

    # float() accepts "nan" and "inf". NaN compares False against everything,
    # so "factor <= 0" would let it through; "not factor > 0" rejects it.
    if not args.factor > 0:
        print("error: factor must be > 0", file=sys.stderr)
        return 2
    # Only +inf can reach this point; an infinite multiplier is meaningless.
    if math.isinf(args.factor):
        print("error: factor must be finite", file=sys.stderr)
        return 2
    if args.chunk_bytes < 8:
        print("error: --chunk-bytes must be >= 8", file=sys.stderr)
        return 2

    try:
        reader = FloatSignalReader.from_stdin(raw=args.raw, sample_rate=args.rate)
    except EOFError:
        # The reader could not be built because input ended; not an error.
        return 0
    except StreamFormatError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    scaler = ChunkFrequencyScaler(factor=args.factor)
    writer = FloatSignalWriter.to_stdout(sample_rate=reader.sample_rate, raw=False)

    try:
        for chunk in reader.iter_chunks(chunk_bytes=args.chunk_bytes):
            writer.write(scaler.process(chunk))
            # Flush per chunk so downstream consumers see output promptly.
            writer.flush()
    except BrokenPipeError:
        # The consumer went away; stop quietly.
        return 0
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())