-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery_task.py
More file actions
101 lines (82 loc) · 2.77 KB
/
celery_task.py
File metadata and controls
101 lines (82 loc) · 2.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import time
from datetime import timedelta
import redis
from billiard.exceptions import SoftTimeLimitExceeded
from celery import Celery
from celery.task import periodic_task
from celery.schedules import crontab, solar
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
app = Celery('celery_task', broker='redis://localhost:6379/0')
# My locations.
LATITUDE = 43.765204
LONGITUDE = -79.502520
# app.conf.beat_schedule = {
# # Executes at sunset in Melbourne
# 'add-at-toronto-sunset': {
# 'task': 'celery_tasks.add',
# 'schedule': solar('sunrise', LATITUDE, LONGITUDE),
# 'args': (16, 16),
# },
# }
@app.task(name='celery_tasks.add')
def add(x, y):
total = x + y
print(f'{x} + {y} = {total}')
time.sleep(10)
@app.task(name='celery_tasks.test_post')
def test_post(x, y):
total = x + y
print(f'{x} + {y} = {total}')
time.sleep(10)
return total
def backoff(attempts):
"""
1, 2, 4, 8, 16, 32, ...
:param attempts: Current number of attempts
:return: 2 ^ (n) where n is the number of attempts.
"""
return 2 ** attempts
#TODO: Test soft time limit.
# @app.task(bind=True, max_retries=20, soft_time_limit=1)
# def data_extractor(self):
# log.info(f"Running {self.name}.")
# try:
# for i in range(1, 11):
# print('Crawling HTML DOM!')
# if i == 5:
# raise ValueError('Crawling Index Error.')
# except SoftTimeLimitExceeded:
# print(f"Soft time limit exceeded for task: {self.name}.")
# except Exception as exc:
# print('There was an error let try after 5 seconds.')
# log.exception(exc)
# raise self.retry(exc=exc, countdown=backoff(self.request.retries))
# @periodic_task(bind=True, run_every=(crontab(minute='*/1')), ignore_result=True)
# def send_mail_queue(self):
# try:
# messages_sent = "example.email"
# print(f"Task: {self.name}")
# print(f"Total email message successfully sent {messages_sent}.")
# finally:
# print("release resources")
#
# key = '151361115230283ACB4F556778CBE87789100212620510281'
#
# @periodic_task(bind=True, run_every=timedelta(seconds=5), name='celery_task.send_mail_from_queue')
# def send_mail_from_queue(self):
# REDIS_CLIENT = redis.Redis()
# timeout = 60 * 5 # lock expires in 5 minutes
# have_lock = False
# my_lock = REDIS_CLIENT.lock(key, timeout=timeout)
# try:
# have_lock = my_lock.acquire(blocking=False)
# if have_lock:
# message = 'example.email'
# print(f'{self.request.hostname}: Email Sent successfully, [{message}]')
# time.sleep(10)
# finally:
# print('release resources')
# if have_lock:
# my_lock.release()