Skip to content

Commit 87c0597

Browse files
Refactor / all_as_schedule crontab query optimization (#879)
* refactor crontab optimization logic and tests
* add tests
* remove prints
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* run pre commit
* all_as_schedule test
* test import problem
* fix long lines
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* pre commit
* test
* remove pytz import
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* remove zoneinfo import
* refactor flaky test
* pre commit
* use Zoneinfo in _get_timezone_offset
* refactor use Zoneinfo in _get_timezone_offset
* use now utils function as server time
* use timezone.localtime with an aware time as server time
* fix import
* fix comment
* better include hours implementation
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* pre commit
* add SCHEDULE_SYNC_MAX_INTERVAL for force updating schedule every 5 minutes at most
* aware_now utility function
* refactor the test to use aware_now utility function
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* add the test by @Azurency
* pre commit test file
* add tests for utils
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* remove extra import
* remove extra import
* use SCHEDULE_SYNC_MAX_INTERVAL in clocked query
* refactor mock aware now

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 676a5b7 commit 87c0597

File tree

4 files changed

+551
-86
lines changed

4 files changed

+551
-86
lines changed

django_celery_beat/schedulers.py

Lines changed: 142 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@
44
import math
55
from multiprocessing.util import Finalize
66

7+
try:
8+
from zoneinfo import ZoneInfo # Python 3.9+
9+
except ImportError:
10+
from backports.zoneinfo import ZoneInfo # Python 3.8
11+
712
from celery import current_app, schedules
813
from celery.beat import ScheduleEntry, Scheduler
914
from celery.utils.log import get_logger
1015
from celery.utils.time import maybe_make_aware
1116
from django.conf import settings
1217
from django.core.exceptions import ObjectDoesNotExist
1318
from django.db import close_old_connections, transaction
14-
from django.db.models import Q
19+
from django.db.models import Case, F, IntegerField, Q, When
20+
from django.db.models.functions import Cast
1521
from django.db.utils import DatabaseError, InterfaceError
1622
from django.utils import timezone
1723
from kombu.utils.encoding import safe_repr, safe_str
@@ -20,12 +26,13 @@
2026
from .clockedschedule import clocked
2127
from .models import (ClockedSchedule, CrontabSchedule, IntervalSchedule,
2228
PeriodicTask, PeriodicTasks, SolarSchedule)
23-
from .utils import NEVER_CHECK_TIMEOUT, now
29+
from .utils import NEVER_CHECK_TIMEOUT, aware_now, now
2430

2531
# This scheduler must wake up more frequently than the
2632
# regular interval of 5 minutes because it needs to take external
2733
# changes to the schedule into account.
2834
DEFAULT_MAX_INTERVAL = 5 # seconds
35+
SCHEDULE_SYNC_MAX_INTERVAL = 300 # 5 minutes
2936

3037
ADD_ENTRY_ERROR = """\
3138
Cannot add entry %r to database schedule: %r. Contents: %r
@@ -238,6 +245,7 @@ class DatabaseScheduler(Scheduler):
238245
_last_timestamp = None
239246
_initial_read = True
240247
_heap_invalidated = False
248+
_last_full_sync = None
241249

242250
def __init__(self, *args, **kwargs):
243251
"""Initialize the database scheduler."""
@@ -256,23 +264,128 @@ def setup_schedule(self):
256264
def all_as_schedule(self):
    """Build the schedule dict from the enabled tasks in the database.

    Two groups of tasks are filtered out in the database query:

    * clocked tasks whose ``clocked_time`` lies more than
      ``SCHEDULE_SYNC_MAX_INTERVAL`` seconds in the future, and
    * crontab tasks matched by ``_get_crontab_exclude_query()``.

    Returns:
        dict: ``self.Entry`` instances keyed by task name. Rows for which
        ``self.Entry(...)`` raises ``ValueError`` are left out.
    """
    debug('DatabaseScheduler: Fetching database schedule')

    # Clocked tasks beyond this horizon are skipped for now.
    sync_horizon = now() + datetime.timedelta(
        seconds=SCHEDULE_SYNC_MAX_INTERVAL
    )
    distant_clocked = Q(
        clocked__isnull=False,
        clocked__clocked_time__gt=sync_horizon,
    )

    # One combined exclusion so the filtering happens in the database.
    skipped = distant_clocked | self._get_crontab_exclude_query()

    entries = {}
    for task in self.Model.objects.enabled().exclude(skipped):
        try:
            entry = self.Entry(task, app=self.app)
        except ValueError:
            # Best effort: a row that cannot be turned into an entry is
            # skipped without failing the whole fetch.
            continue
        else:
            entries[task.name] = entry
    return entries
275287

288+
def _get_crontab_exclude_query(self):
    """
    Build a query to exclude crontab tasks based on their hour value,
    adjusted for timezone differences relative to the server.

    Every crontab whose ``hour`` is a plain number is annotated with the
    server-equivalent hour; those whose server hour falls outside the
    window around the current server hour are excluded. Crontabs with a
    non-numeric hour expression are never selected by the regex filter
    below and are therefore never excluded.

    Returns:
        Q: a condition suitable for ``PeriodicTask`` ``.exclude(...)``.
    """
    # Current hour in the server's reference frame.
    server_hour = aware_now().hour

    # Window of +/- 2 hours around the current hour in server tz.
    hours_to_include = [
        (server_hour + offset) % 24 for offset in range(-2, 3)
    ]
    hours_to_include += [4]  # celery's default cleanup task

    # Regex pattern to match only numbers, so that the integer cast
    # below is only ever applied to simple numeric hour values.
    numeric_hour_pattern = r'^\d+$'
    numeric_hour_tasks = CrontabSchedule.objects.filter(
        hour__regex=numeric_hour_pattern
    )

    # One WHEN branch per timezone present in the table. The offset is
    # reduced to 0..23 in Python first: the outer ``% 24`` runs in SQL,
    # where the result takes the sign of the dividend, so a raw offset
    # below -24 could otherwise yield a negative "hour" that never
    # matches ``hours_to_include``.
    timezone_cases = [
        When(
            timezone=timezone_name,
            then=(
                F('hour_int')
                + self._get_timezone_offset(timezone_name) % 24
            ) % 24,
        )
        for timezone_name in self._get_unique_timezone_names()
    ]

    annotated_tasks = numeric_hour_tasks.annotate(
        # Cast hour string to integer
        hour_int=Cast('hour', IntegerField()),
        # Server-equivalent hour; falls back to the raw hour when no
        # timezone branch matches.
        server_hour=Case(*timezone_cases, default=F('hour_int')),
    )

    excluded_hour_task_ids = annotated_tasks.exclude(
        server_hour__in=hours_to_include
    ).values_list('id', flat=True)

    # Exclude crontab tasks whose schedule is outside the include window.
    exclude_query = Q(crontab__isnull=False) & Q(
        crontab__id__in=excluded_hour_task_ids
    )

    return exclude_query
351+
352+
def _get_unique_timezone_names(self):
    """Return the distinct timezone values used in CrontabSchedule.

    Returns:
        QuerySet: a flat ``values_list`` of the ``timezone`` column with
        duplicates removed.
    """
    # ``order_by()`` clears any default ordering first. If the model
    # declares one (not visible from here), its columns would be added
    # to the SELECT and DISTINCT would no longer collapse rows sharing a
    # timezone.
    return (
        CrontabSchedule.objects
        .order_by()
        .values_list('timezone', flat=True)
        .distinct()
    )
357+
358+
def _get_timezone_offset(self, timezone_name):
    """
    Return how many hours the server clock is ahead of ``timezone_name``.

    The server side of the comparison is taken from ``aware_now()``, the
    same source ``_get_crontab_exclude_query`` reads the current server
    hour from, so both values share one reference frame. Offsets are
    evaluated at the current instant so that daylight-saving time in
    either zone is reflected.

    Args:
        timezone_name: The name of the timezone or a ZoneInfo object

    Returns:
        int: The hour offset (server UTC offset minus target UTC offset),
        truncated toward zero for zones with a fractional-hour offset.
    """
    server_time = aware_now()

    if isinstance(timezone_name, ZoneInfo):
        target_tz = timezone_name
    else:
        # str() also accepts other tz objects whose string form is the
        # zone key.
        target_tz = ZoneInfo(str(timezone_name))

    # Same instant expressed in both zones; only the UTC offsets differ.
    server_offset = server_time.utcoffset()
    target_offset = server_time.astimezone(target_tz).utcoffset()

    offset_seconds = (server_offset - target_offset).total_seconds()
    offset_hours = int(offset_seconds / 3600)

    return offset_hours
388+
276389
def schedule_changed(self):
277390
try:
278391
close_old_connections()
@@ -372,13 +485,31 @@ def schedules_equal(self, *args, **kwargs):
372485
@property
373486
def schedule(self):
374487
initial = update = False
488+
current_time = datetime.datetime.now()
489+
375490
if self._initial_read:
376491
debug('DatabaseScheduler: initial read')
377492
initial = update = True
378493
self._initial_read = False
494+
self._last_full_sync = current_time
379495
elif self.schedule_changed():
380496
info('DatabaseScheduler: Schedule changed.')
381497
update = True
498+
self._last_full_sync = current_time
499+
500+
# Force update the schedule if it's been more than 5 minutes
501+
if not update:
502+
time_since_last_sync = (
503+
current_time - self._last_full_sync
504+
).total_seconds()
505+
if (
506+
time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL
507+
):
508+
debug(
509+
'DatabaseScheduler: Forcing full sync after 5 minutes'
510+
)
511+
update = True
512+
self._last_full_sync = current_time
382513

383514
if update:
384515
self.sync()
@@ -392,32 +523,3 @@ def schedule(self):
392523
repr(entry) for entry in self._schedule.values()),
393524
)
394525
return self._schedule
395-
396-
@staticmethod
397-
def get_excluded_hours_for_crontab_tasks():
398-
# Generate the full list of allowed hours for crontabs
399-
allowed_crontab_hours = [
400-
f"{hour:02}" for hour in range(24)
401-
] + [
402-
str(hour) for hour in range(10)
403-
]
404-
405-
# Get current, next, and previous hours
406-
current_time = timezone.localtime(now())
407-
current_hour = current_time.hour
408-
next_hour = (current_hour + 1) % 24
409-
previous_hour = (current_hour - 1) % 24
410-
411-
# Create a set of hours to remove (both padded and non-padded versions)
412-
hours_to_remove = {
413-
f"{current_hour:02}", str(current_hour),
414-
f"{next_hour:02}", str(next_hour),
415-
f"{previous_hour:02}", str(previous_hour),
416-
str(4), "04", # celery's default cleanup task
417-
}
418-
419-
# Filter out 'should be considered' hours
420-
return [
421-
hour for hour in allowed_crontab_hours
422-
if hour not in hours_to_remove
423-
]

django_celery_beat/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
"""Utilities."""
2+
import datetime
23
# -- XXX This module must not use translation as that causes
34
# -- a recursive loader import!
45
from datetime import timezone as datetime_timezone
56

7+
try:
8+
from zoneinfo import ZoneInfo # Python 3.9+
9+
except ImportError:
10+
from backports.zoneinfo import ZoneInfo # Python 3.8
11+
612
from django.conf import settings
713
from django.utils import timezone
814

@@ -37,6 +43,16 @@ def now():
3743
return timezone.now()
3844

3945

46+
def aware_now():
    """Return the current time as a timezone-aware datetime.

    With ``USE_TZ`` enabled this delegates to ``timezone.now()``.
    Otherwise the current time is built in the zone named by the
    ``TIME_ZONE`` setting, falling back to ``'UTC'`` when the setting is
    absent.
    """
    if not getattr(settings, 'USE_TZ', True):
        # USE_TZ is off: attach the project's own timezone explicitly.
        zone_name = getattr(settings, 'TIME_ZONE', 'UTC')
        return datetime.datetime.now(ZoneInfo(zone_name))

    return timezone.now()
54+
55+
4056
def is_database_scheduler(scheduler):
4157
"""Return true if Celery is configured to use the db scheduler."""
4258
if not scheduler:

0 commit comments

Comments
 (0)