-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
108 lines (98 loc) · 4.12 KB
/
test.py
File metadata and controls
108 lines (98 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from Util.ParseData import SignalData
from Util.feature_extract import reconstruct_signal,filter_signal
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from warnings import filterwarnings
filterwarnings('ignore')
from pymongo import MongoClient
import urllib.parse
# Connect to the local MongoDB instance (no credentials).
# Example of an authenticated URI:
# "mongodb://myUserAdmin:password123@localhost:27017/?authSource=example_database"
client = MongoClient("mongodb://localhost:27017/")
# Select the database (created automatically if it does not exist).
db = client["raw_data"]
# Select the collection (similar to a table; created automatically if it does not exist).
collection = db["raw_collection"]
# Sampling rate handed to every processing step below (presumably in Hz — confirm).
fs = 1e9
# Pull every document of the collection into a DataFrame indexed by "_id".
all_data_df = pd.DataFrame(list(collection.find({})))
all_data_df.set_index("_id", inplace=True)
# Decode each stored byte buffer into a float32 sample array.
all_data_df["signal"] = all_data_df["signal"].apply(
    lambda raw_bytes: np.frombuffer(raw_bytes, dtype=np.float32)
)
# NOTE(review): the first document is discarded; the reason is not visible here — confirm.
all_data_df = all_data_df[1:]
def process_signal(signal, fs, threshold):
    """Filter, reconstruct and peak-normalise one trace.

    The input is copied before any processing, so the caller's array is
    left untouched.

    Args:
        signal: Array of samples.
        fs: Sampling rate, forwarded to ``filter_signal``.
        threshold: Forwarded to ``reconstruct_signal`` as ``threshold``.

    Returns:
        The processed trace divided by its largest absolute sample. When
        that largest value is zero (or NaN) the trace is returned unscaled
        instead of being turned into NaN/inf by a division by zero.
    """
    signal = signal.copy()
    signal = filter_signal(signal, fs)
    signal = reconstruct_signal(signal, threshold=threshold)
    peak = np.max(np.abs(signal))
    # An all-zero trace would give 0/0 = NaN; warnings are globally
    # suppressed in this script, so that failure would go unnoticed.
    if peak > 0:
        signal /= peak
    return signal
# Run the filter/reconstruct/normalise chain on every raw trace (threshold 0.1).
all_data_df['processed_signal'] = all_data_df.apply(
    lambda row: process_signal(row['signal'], fs, threshold=0.1),
    axis=1,
)
from scipy.signal import find_peaks
def get_peaks(signal, height=0.25, distance=1000, max_peaks=7):
    """Return the indices of the leading peaks of a trace.

    Args:
        signal: 1-D array searched with ``scipy.signal.find_peaks``.
        height: Minimum peak height, forwarded to ``find_peaks``.
        distance: Minimum spacing between peaks in samples, forwarded to
            ``find_peaks``.
        max_peaks: Maximum number of peaks to keep, counted from the start
            of the trace. ``None`` keeps every detected peak. Defaults to
            7, the limit that used to be hard-coded.

    Returns:
        Array of sample indices, in ascending order, with at most
        ``max_peaks`` entries.
    """
    peaks, _ = find_peaks(signal, height=height, distance=distance)
    # Slicing with None as the stop keeps the whole array.
    return peaks[:max_peaks]
# Detect the leading peaks of every processed trace using get_peaks' defaults.
all_data_df['peaks'] = all_data_df['processed_signal'].apply(get_peaks)
def segmant_signal(signal, peaks, window_size=1024, pre_peak=490):
    """Cut one fixed-length, zero-padded window around every peak.

    Each window starts up to ``pre_peak`` samples before the peak and ends
    ``window_size // 2`` samples after it. The extracted samples are
    written at the start of the window and the remainder stays zero, so a
    peak close to the start of ``signal`` sits earlier in its window than
    the others.

    Args:
        signal: 1-D array of samples.
        peaks: Iterable of peak indices into ``signal``.
        window_size: Length of every returned window.
        pre_peak: Number of samples to keep before each peak. It is capped
            so that the extracted slice always fits in ``window_size``;
            with the default ``window_size`` the cap has no effect.

    Returns:
        List with one float64 array of length ``window_size`` per peak.

    Raises:
        Exception: Any error raised while slicing is re-raised after the
            offending ``peaks`` value has been printed.
    """
    half_window = int(window_size // 2)
    # The slice spans lead + half_window samples; capping the lead keeps it
    # within window_size so the assignment below cannot overflow the window.
    lead = max(0, min(pre_peak, window_size - half_window))
    signal_list = []
    try:
        for peak in peaks:
            left = max(0, peak - lead)
            right = peak + half_window
            peak_signal_slice = signal[left:right]
            peak_signal = np.zeros(window_size)
            peak_signal[:len(peak_signal_slice)] = peak_signal_slice
            signal_list.append(peak_signal)
    except Exception:
        # Show which peaks triggered the failure, then propagate unchanged.
        print(peaks)
        raise
    return signal_list
# Cut a zero-padded window around each detected peak, row by row.
all_data_df['segmented_signal_list'] = all_data_df.apply(
    lambda row: segmant_signal(row['processed_signal'], row['peaks']),
    axis=1,
)
# Number of windows obtained for each row.
all_data_df['segmented_signal_num'] = all_data_df['segmented_signal_list'].apply(len)
def cacl_fft(signal, fs, threshold=0.3, max_freq=2e7):
    """Compute the low-frequency amplitude spectrum of one segment.

    The segment is first passed through ``process_signal`` (which works on
    its own copy, so ``signal`` is not modified), then transformed with a
    full FFT. Only bins with a frequency in ``[0, max_freq]`` are kept.

    Args:
        signal: 1-D array of samples.
        fs: Sampling rate; bin frequencies are computed with a sample
            spacing of ``1 / fs``.
        threshold: Threshold forwarded to ``process_signal``. Defaults to
            0.3, the value that used to be hard-coded.
        max_freq: Highest frequency to keep, in the same unit as ``fs``.
            Defaults to 2e7, the value that used to be hard-coded.

    Returns:
        Tuple ``(freq_axis, amplitude_spectrum)`` of equal-length arrays
        restricted to the selected band.
    """
    processed = process_signal(signal, fs, threshold)
    amplitude_spectrum = np.abs(np.fft.fft(processed))
    freq_axis = np.fft.fftfreq(len(processed), d=1 / fs)
    # Keep non-negative frequencies up to the cut-off.
    band = (freq_axis >= 0) & (freq_axis <= max_freq)
    return freq_axis[band], amplitude_spectrum[band]
def get_fft_list(signal_list, fs):
    """Return the amplitude spectrum of every segment in ``signal_list``.

    Each segment is passed to ``cacl_fft`` together with ``fs``; only the
    amplitude part of its ``(frequency, amplitude)`` result is kept.
    """
    return [cacl_fft(segment, fs)[1] for segment in signal_list]
# Amplitude spectra of every segment, one list per row.
all_data_df['fft_list'] = all_data_df['segmented_signal_list'].apply(
    lambda segments: get_fft_list(segments, fs)
)
from Util.feature_extract import caclulate_cwt
from PIL import Image
# from torchvision.transforms import Resize
import torch
def get_cwt_array(signal_list, fs, size=(256, 256)):
    """Compute a resized, normalised CWT magnitude image for each segment.

    Args:
        signal_list: Sequence of equal-length 1-D segments (they are
            stacked into one 2-D array).
        fs: Sampling rate, forwarded to ``caclulate_cwt``.
        size: ``(width, height)`` handed to ``PIL.Image.resize``.

    Returns:
        float32 array of shape ``(len(signal_list), size[1], size[0])``,
        divided by its global maximum (plus 1e-8 to avoid a division by
        zero). An empty ``signal_list`` yields an empty array of shape
        ``(0, size[1], size[0])`` rather than raising.
    """
    # Rows without any detected peak produce an empty list, and np.stack
    # raises ValueError when given nothing to stack.
    if len(signal_list) == 0:
        return np.zeros((0, size[1], size[0]), dtype=np.float32)
    signal_array = np.stack(signal_list)
    # NOTE(review): 1e6, 2e7 and 1024 look like the frequency range and the
    # number of scales — confirm against caclulate_cwt.
    cwt, _freqs = caclulate_cwt(signal_array, fs, 1e6, 2e7, 1024)
    # Move the per-signal axis to the front; presumably the helper returns
    # (frequencies, signals, samples) — confirm.
    cwt = np.abs(cwt).transpose(1, 0, 2)
    # Rescale the last two dimensions of every image to ``size``.
    cwt = np.stack([np.array(Image.fromarray(image).resize(size)) for image in cwt])
    cwt = cwt / (np.max(cwt) + 1e-8)
    # NOTE: torchvision.transforms.Resize on a float32 tensor was a
    # previously considered alternative to the PIL-based resize above.
    return cwt.astype(np.float32)
def compute_cwt_array(signal_list):
    """Call ``get_cwt_array`` with the module-level sampling rate ``fs``."""
    cwt_images = get_cwt_array(signal_list, fs)
    return cwt_images
# Build the CWT image stack for each row's list of segments.
all_data_df['cwt_array'] = all_data_df['segmented_signal_list'].apply(compute_cwt_array)