Sharing a large tensor between steps (e.g. audio data with a duration of approx. 1 h at 16 kHz: 16000 * 60 * 60 = 57,600,000 values) leads to a "The process X for MyHandler handler is dead" error, followed by a "Flow was stopped" message.
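For scale, a quick back-of-the-envelope check of the payload being passed between processes (a sketch; it assumes float32 mono audio, which is what torchaudio.load typically returns):

import torch

# 1 h of mono audio at 16 kHz, as in the report above
wav = torch.zeros(16000 * 60 * 60)
print(wav.numel())                               # 57600000 values
print(wav.numel() * wav.element_size() / 2**20)  # ~219.7 MiB as float32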
Code sample to reproduce:
class AudioLoaderHandler(BaseTaskHandler):
    def __init__(self):
        self.loader = None

    def on_start(self):
        # Heavy initialization happens in the worker process, not the parent.
        self.loader = AudioLoader()
        aq_logger.info(f"{self.__class__.__name__} initialized.")

    def handle(self, *tasks: BaseTask):
        try:
            for task in tasks:
                wav_tensor, sr = self.loader(task.audio_bytes)  # something like torchaudio.load(...)
                task.audio_tensor, task.sample_rate = wav_tensor, sr
        except Exception as e:
            aq_logger.exception(e)
            msg = f"Error during {self.__class__.__name__} execution: {type(e).__name__} - {e}"
            for task in tasks:
                task.error_msg = msg
        # Drop the raw bytes in all cases so they are not shipped downstream.
        for task in tasks:
            self.dispose(task)

    def dispose(self, task: PrepareChunksTask):
        del task.audio_bytes

Because serialization behavior is better defined for numpy arrays, it is better to use them to pass data between steps. So the final solution is to convert every tensor that will be shared to a numpy array.
Fixed code for handle() method:
    def handle(self, *tasks: BaseTask):
        try:
            for task in tasks:
                wav_tensor, sr = self.loader(task.audio_bytes)
                # Convert to numpy here: numpy arrays serialize reliably between steps.
                task.audio_tensor, task.sample_rate = wav_tensor.numpy(), sr
        except Exception as e:
            aq_logger.exception(e)
            msg = f"Error during {self.__class__.__name__} execution: {type(e).__name__} - {e}"
            for task in tasks:
                task.error_msg = msg
        for task in tasks:
            self.dispose(task)