Skip to content

Commit d31d8bc

Browse files
author
Neeraj Kashyap
committed
Started working on Bugout job queue implementation
1 parent ceb752a commit d31d8bc

1 file changed

Lines changed: 134 additions & 0 deletions

File tree

bugout/jobs.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""
2+
This module makes it easy to use a Bugout journal as a job queue. It is inspired by the Bugout Segment
3+
integration: https://github.com/bugout-dev/thorax
4+
"""
5+
6+
from datetime import datetime
7+
import requests
8+
from uuid import UUID
9+
10+
from .app import Bugout
11+
from .data import AuthType, BugoutSearchResult
12+
from .journal import SearchOrder
13+
from .settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, REQUESTS_TIMEOUT
14+
15+
16+
class BugoutJobQueue:
17+
"""
18+
This class implements a job queue in a Bugout journal.
19+
20+
This job queue uses:
21+
1. context_type to represent jobs in the queue.
22+
2. context_id to represent the id of each job. Bugout journals do not allow duplication of (context_type, context_id),
23+
so this allows for deduplcation of jobs by context_id.
24+
3. A special tag that is used to denote whether a job has been completed successfully.
25+
4. A special tag that is used to denote whether a job failed.
26+
5. A context_type for entries that represent cursor entries for the job queue.
27+
"""
28+
29+
def __init__(
30+
self,
31+
bugout_token: str,
32+
journal_id: UUID,
33+
context_type: str = "job",
34+
success_tag: str = "job:success",
35+
failure_tag: str = "job:failure",
36+
cursor_context_type: str = "job_cursor",
37+
brood_api_url: str = BUGOUT_BROOD_URL,
38+
spire_api_url: str = BUGOUT_SPIRE_URL,
39+
write_timeout: float = REQUESTS_TIMEOUT,
40+
auth_type: AuthType = AuthType.bearer,
41+
) -> None:
42+
self.bugout_token = bugout_token
43+
self.journal_id = journal_id
44+
self.context_type = context_type
45+
self.success_tag = success_tag
46+
self.failure_tag = failure_tag
47+
self.cursor_context_type = cursor_context_type
48+
self.client = Bugout(brood_api_url, spire_api_url)
49+
self.write_timeout = write_timeout
50+
self.auth_type = auth_type
51+
52+
def create_job(self, job_id: str, job_title: str, job_content: str) -> None:
53+
"""
54+
Create a job in the jobs journal.
55+
"""
56+
self.client.create_entry(
57+
self.bugout_token,
58+
self.journal_id,
59+
job_title,
60+
job_content,
61+
context_id=job_id,
62+
context_type=self.context_type,
63+
timeout=self.write_timeout,
64+
)
65+
66+
def update_cursor(self, created_at: datetime):
67+
"""
68+
Update the position of the cursor in the journal to the given "created_at" time.
69+
70+
This is done by simply creating a new entry representing the cursor position.
71+
"""
72+
cursor_tag = f"cursor:{self.cursor_context_type}"
73+
headers = {"Authorization": f"{self.auth_type.value} {self.bugout_token}"}
74+
body = {
75+
"title": cursor_tag,
76+
"content": "",
77+
"tags": [cursor_tag],
78+
"context_type": self.cursor_context_type,
79+
"created_at": created_at.isoformat(),
80+
}
81+
requests.post(
82+
self.client.spire_api_url,
83+
headers=headers,
84+
json=body,
85+
timeout=self.write_timeout,
86+
)
87+
88+
def remaining_jobs(
89+
self,
90+
use_cursor: bool = True,
91+
limit: int = 10,
92+
offset: int = 0,
93+
) -> list[BugoutSearchResult]:
94+
"""
95+
List all remaining jobs. These are jobs that have neither been marked as complete nor as failed.
96+
If the use_cursor argument is True, this only returns jobs since the most recent cursor. If it is
97+
False, remaining_jobs returns all incomplete and unfailed jobs since the beginning of time.
98+
99+
Jobs are returned in chronological order.
100+
"""
101+
query_components: list[str] = [
102+
f"context_type:{self.context_type}",
103+
f"!tag:{self.success_tag}",
104+
f"!tag:{self.failure_tag}",
105+
]
106+
if use_cursor:
107+
cursor_results = self.client.search(
108+
self.bugout_token,
109+
self.journal_id,
110+
f"context_type:{self.cursor_context_type}",
111+
limit=1,
112+
content=False,
113+
order=SearchOrder.DESCENDING,
114+
auth_type=self.auth_type,
115+
)
116+
if cursor_results.results:
117+
cursor = cursor_results.results[0]
118+
119+
created_at = cursor.created_at.replace(" ", "T")
120+
query_components.append(f"created_at:>{created_at}")
121+
122+
query = " ".join(query_components)
123+
job_results = self.client.search(
124+
self.bugout_token,
125+
self.journal_id,
126+
query,
127+
limit=limit,
128+
offset=offset,
129+
content=True,
130+
order=SearchOrder.ASCENDING,
131+
auth_type=self.auth_type,
132+
)
133+
134+
return job_results.results

0 commit comments

Comments
 (0)