Skip to content

Commit 82c7d38

Browse files
committed
✨ feat: celery 异步框架测试运行。
celery 不支持在 windows 上调用 async 函数。所有要部署在 linux 上,部署待测试。
1 parent c823cc2 commit 82c7d38

File tree

6 files changed

+130
-0
lines changed

6 files changed

+130
-0
lines changed

celery-client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from celery_server.tasks import asyncRun
2+
3+
# r = test.delay(1,2)
4+
# r2 = test.delay(1,2)
5+
6+
r = asyncRun.delay("13809213237")
7+
print(r.get())

celery_server/README.MD

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Celery 异步服务器后端模块
2+
3+
## 部署
4+
```shell
5+
pip install celery gevent -i https://pypi.doubanio.com/simple/
6+
celery -A celery_server worker -l info --pool=eventlet
7+
```
8+
需要在 celery 5.0 中才能使用 async

celery_server/__init__.py

Whitespace-only changes.

celery_server/celery.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from celery import Celery
2+
from celery.utils.log import get_task_logger
3+
4+
app = Celery(
5+
'celery_server',
6+
include=[
7+
'celery_server.tasks'
8+
]
9+
)
10+
app.config_from_object(
11+
'celery_server.config',
12+
)
13+
14+
logger = get_task_logger(__name__)
15+
16+
if __name__ == '__main__':
17+
app.start()

celery_server/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#broker(消息中间件来接收和发送任务消息)
2+
BROKER_URL = 'redis://localhost:6379/1'
3+
#backend(存储worker执行的结果)
4+
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
5+
6+
#设置时间参照,不设置默认使用的UTC时间
7+
CELERY_TIMEZONE = 'Asia/Shanghai'
8+
#指定任务的序列化
9+
CELERY_TASK_SERIALIZER='json'
10+
#指定执行结果的序列化
11+
CELERY_RESULT_SERIALIZER='json'

celery_server/utils.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# encoding=utf8
2+
# 请求的方法
3+
from smsboom import load_getapi, load_json
4+
from utils.log import logger
5+
from utils.models import API
6+
from utils import default_header
7+
import httpx
8+
from httpx import Limits
9+
from typing import Union, List
10+
import asyncio
11+
12+
import sys
13+
sys.path.append("E:\coding\SMSBoom")
14+
15+
16+
def reqAPI(api: API, client: httpx.AsyncClient):
17+
if isinstance(api.data, dict):
18+
resp = client.request(method=api.method, json=api.data,
19+
headers=api.header, url=api.url, timeout=10)
20+
else:
21+
resp = client.request(method=api.method, data=api.data,
22+
headers=api.header, url=api.url, timeout=10)
23+
return resp
24+
25+
26+
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
27+
"""异步请求方法
28+
:param:
29+
:return:
30+
"""
31+
# 多手机号支持
32+
if isinstance(phone, tuple):
33+
phone_lst = [_ for _ in phone]
34+
else:
35+
phone_lst = [phone]
36+
async with semaphore:
37+
async with httpx.AsyncClient(
38+
limits=Limits(max_connections=1000,
39+
max_keepalive_connections=2000),
40+
headers=default_header,
41+
verify=False,
42+
timeout=99999
43+
) as c:
44+
45+
for ph in phone_lst:
46+
try:
47+
if isinstance(src, API):
48+
src = src.handle_API(ph)
49+
r = await reqAPI(src, c)
50+
else:
51+
# 利用元组传参安全因为元组不可修改
52+
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
53+
"&", "").replace('\n', '').replace('\r', ''),)
54+
r = await c.get(*s)
55+
return r
56+
except httpx.HTTPError as why:
57+
# logger.error(f"异步请求失败{type(why)}")
58+
pass
59+
except TypeError:
60+
# logger.error("类型错误")
61+
pass
62+
except Exception as wy:
63+
# logger.exception(f"异步失败{wy}")
64+
pass
65+
66+
def callback(result):
67+
"""异步回调函数"""
68+
log = result.result()
69+
if log is not None:
70+
# logger.info(f"请求结果:{log.text[:30]}")
71+
print(log.text[:30])
72+
pass
73+
74+
75+
async def runAsync(apis: List[Union[API, str]], phone: Union[tuple, str]):
76+
77+
tasks = []
78+
79+
for api in apis:
80+
semaphore = asyncio.Semaphore(999999)
81+
task = asyncio.create_task(asyncReqs(api, phone, semaphore))
82+
task.add_done_callback(callback)
83+
tasks.append(task)
84+
85+
await asyncio.gather(
86+
*tasks
87+
)

0 commit comments

Comments
 (0)