【emergency!!!】occur error(TypeError: cannot serialize '_io.TextIOWrapper' object) in KubernetesExecutor #12979
Unanswered
zhangxiao696
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
when set dag use KubernetesExecutor with:
dag = DAG(
"example_using_k8s_executor_new",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "zhangxiao",
"depends_on_past": False,
"start_date": datetime(2020, 12, 5),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
'executor_config': {
'KubernetesExecutor': {
'request_cpu': "200m",
'limit_cpu': "200m",
'request_memory': "500Mi",
'limit_memory': "500Mi"
}
}
},
)
the program raise exception:
[2020-12-10 10:02:08,142] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
self._execute_helper()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1453, in _execute_helper
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1515, in _validate_and_run_task_instances
self.executor.heartbeat()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
self.trigger_tasks(open_slots)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 155, in trigger_tasks
executor_config=simple_ti.executor_config)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 803, in execute_async
self.task_queue.put((key, command, kube_executor_config))
File "", line 2, in put
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
But, if I remove executor_config in default_args of Dag, the program is working!!!
dag = DAG(
"example_using_k8s_executor_new",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "zhangxiao",
"depends_on_past": False,
"start_date": datetime(2020, 12, 5),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
)
why???????
please help me!!!!
Beta Was this translation helpful? Give feedback.
All reactions