Skip to content

Commit 647197a

Browse files
committed
Basic functionality of job queue
(and availability from command line for non-cursor-related functionality).
1 parent d31d8bc commit 647197a

3 files changed

Lines changed: 263 additions & 11 deletions

File tree

bugout/__main__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import textwrap
33

44
from .app import Bugout
5+
from .jobs import generate_cli as generate_jobs_cli
56

67

78
def get_methods_list(args: argparse.Namespace) -> None:
@@ -27,6 +28,13 @@ def main() -> None:
2728
)
2829
parser_common.set_defaults(func=get_methods_list)
2930

31+
parser_jobs = generate_jobs_cli()
32+
subcommands.add_parser(
33+
"jobs",
34+
parents=[parser_jobs],
35+
add_help=False,
36+
)
37+
3038
args = parser.parse_args()
3139
args.func(args)
3240

bugout/data.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,7 @@ class BugoutHumbugIntegration(BaseModel):
245245

246246
class BugoutHumbugIntegrationsList(BaseModel):
247247
integrations: List[BugoutHumbugIntegration] = Field(default_factory=list)
248+
249+
250+
class BugoutSearchResultWithEntryID(BugoutSearchResult):
251+
id: str

bugout/jobs.py

Lines changed: 251 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,26 @@
33
integration: https://github.com/bugout-dev/thorax
44
"""
55

6+
import argparse
67
from datetime import datetime
8+
import json
9+
import os
710
import requests
11+
from typing import Callable, Optional
812
from uuid import UUID
913

1014
from .app import Bugout
11-
from .data import AuthType, BugoutSearchResult
15+
from .data import AuthType, BugoutSearchResultWithEntryID
1216
from .journal import SearchOrder
1317
from .settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, REQUESTS_TIMEOUT
1418

1519

20+
DEFAULT_CONTEXT_TYPE = "job"
21+
DEFAULT_SUCCESS_TAG = "job:success"
22+
DEFAULT_FAILURE_TAG = "job:failure"
23+
DEFAULT_CURSOR_CONTEXT_TYPE = "job_cursor"
24+
25+
1626
class BugoutJobQueue:
1727
"""
1828
This class implements a job queue in a Bugout journal.
@@ -29,15 +39,15 @@ class BugoutJobQueue:
2939
def __init__(
3040
self,
3141
bugout_token: str,
32-
journal_id: UUID,
33-
context_type: str = "job",
34-
success_tag: str = "job:success",
35-
failure_tag: str = "job:failure",
36-
cursor_context_type: str = "job_cursor",
42+
journal_id: str,
43+
context_type: str = DEFAULT_CONTEXT_TYPE,
44+
success_tag: str = DEFAULT_SUCCESS_TAG,
45+
failure_tag: str = DEFAULT_FAILURE_TAG,
46+
cursor_context_type: str = DEFAULT_CURSOR_CONTEXT_TYPE,
3747
brood_api_url: str = BUGOUT_BROOD_URL,
3848
spire_api_url: str = BUGOUT_SPIRE_URL,
3949
write_timeout: float = REQUESTS_TIMEOUT,
40-
auth_type: AuthType = AuthType.bearer,
50+
auth_type: str = AuthType.bearer.name,
4151
) -> None:
4252
self.bugout_token = bugout_token
4353
self.journal_id = journal_id
@@ -49,7 +59,7 @@ def __init__(
4959
self.write_timeout = write_timeout
5060
self.auth_type = auth_type
5161

52-
def create_job(self, job_id: str, job_title: str, job_content: str) -> None:
62+
def create_job(self, context_id: str, job_title: str, job_content: str) -> None:
5363
"""
5464
Create a job in the jobs journal.
5565
"""
@@ -58,7 +68,8 @@ def create_job(self, job_id: str, job_title: str, job_content: str) -> None:
5868
self.journal_id,
5969
job_title,
6070
job_content,
61-
context_id=job_id,
71+
tags=[self.context_type, f"{self.context_type}:{context_id}"],
72+
context_id=context_id,
6273
context_type=self.context_type,
6374
timeout=self.write_timeout,
6475
)
@@ -90,7 +101,7 @@ def remaining_jobs(
90101
use_cursor: bool = True,
91102
limit: int = 10,
92103
offset: int = 0,
93-
) -> list[BugoutSearchResult]:
104+
) -> list[BugoutSearchResultWithEntryID]:
94105
"""
95106
List all remaining jobs. These are jobs that have neither been marked as complete nor as failed.
96107
If the use_cursor argument is True, this only returns jobs since the most recent cursor. If it is
@@ -131,4 +142,233 @@ def remaining_jobs(
131142
auth_type=self.auth_type,
132143
)
133144

134-
return job_results.results
145+
results = [
146+
BugoutSearchResultWithEntryID(
147+
**dict(raw_result), id=raw_result.entry_url.split("/")[-1]
148+
)
149+
for raw_result in job_results.results
150+
]
151+
return results
152+
153+
def job_complete(self, job_id: str) -> None:
154+
self.client.update_tags(
155+
self.bugout_token,
156+
self.journal_id,
157+
job_id,
158+
tags=[self.success_tag],
159+
timeout=self.write_timeout,
160+
auth_type=self.auth_type,
161+
)
162+
163+
def job_failed(self, job_id: str) -> None:
164+
self.client.update_tags(
165+
self.bugout_token,
166+
self.journal_id,
167+
job_id,
168+
tags=[self.failure_tag],
169+
timeout=self.write_timeout,
170+
auth_type=self.auth_type,
171+
)
172+
173+
174+
def value_or_environment_variable(
175+
environment_variable: str, error_if_none: bool
176+
) -> Callable[[Optional[str]], Optional[str]]:
177+
def type_fn(raw: Optional[str]) -> Optional[str]:
178+
final = raw
179+
if not raw:
180+
final = os.environ.get(environment_variable)
181+
182+
if not final and error_if_none:
183+
raise ValueError(f"{environment_variable} not set")
184+
185+
return final
186+
187+
return type_fn
188+
189+
190+
def add_queue_args(parser: argparse.ArgumentParser) -> None:
191+
"""
192+
Mutates the given argument parser by adding common arguments needed to instantiate a job queue for
193+
all commands in the jobs CLI.
194+
"""
195+
parser.add_argument(
196+
"-t",
197+
"--token",
198+
required=False,
199+
default="",
200+
type=value_or_environment_variable("BUGOUT_JOBS_ACCESS_TOKEN", True),
201+
help="An access token for the Bugout API. If this is not provided, the BUGOUT_JOBS_ACCESS_TOKEN environment variable is used.",
202+
)
203+
parser.add_argument(
204+
"-j",
205+
"--journal",
206+
required=False,
207+
default="",
208+
type=value_or_environment_variable("BUGOUT_JOBS_JOURNAL_ID", True),
209+
help="An access token for the Bugout API. If this is not provided, the BUGOUT_JOBS_JOURNAL_ID environment variable is used.",
210+
)
211+
parser.add_argument(
212+
"--context-type",
213+
required=False,
214+
default=DEFAULT_CONTEXT_TYPE,
215+
help="Context type by which to identify jobs in the journal.",
216+
)
217+
parser.add_argument(
218+
"--success-tag",
219+
required=False,
220+
default=DEFAULT_SUCCESS_TAG,
221+
help="Success tag to attach to successfully completed jobs in the journal.",
222+
)
223+
parser.add_argument(
224+
"--failure-tag",
225+
required=False,
226+
default=DEFAULT_FAILURE_TAG,
227+
help="Failure tag to attach to unsuccessful jobs in the journal.",
228+
)
229+
parser.add_argument(
230+
"--cursor-context-type",
231+
required=False,
232+
default=DEFAULT_CURSOR_CONTEXT_TYPE,
233+
help="Context type by which to identify the cursor in the journal.",
234+
)
235+
parser.add_argument(
236+
"--brood-api-url",
237+
required=False,
238+
default=BUGOUT_BROOD_URL,
239+
help="Brood API URI.",
240+
)
241+
parser.add_argument(
242+
"--spire-api-url",
243+
required=False,
244+
default=BUGOUT_SPIRE_URL,
245+
help="Spire API URI.",
246+
)
247+
parser.add_argument(
248+
"--write-timeout",
249+
required=False,
250+
type=float,
251+
default=30.0,
252+
help="Timeout for writing jobs and cursors to the job journal.",
253+
)
254+
parser.add_argument(
255+
"--auth-type",
256+
required=False,
257+
type=str,
258+
default=AuthType.bearer.name,
259+
choices=[AuthType.bearer.name, AuthType.web3.name],
260+
help="Type of token that you are using to authenticate to the job journal.",
261+
)
262+
263+
264+
def queue_from_args(args: argparse.Namespace) -> BugoutJobQueue:
265+
return BugoutJobQueue(
266+
args.token,
267+
args.journal,
268+
args.context_type,
269+
args.success_tag,
270+
args.failure_tag,
271+
args.cursor_context_type,
272+
args.brood_api_url,
273+
args.spire_api_url,
274+
args.write_timeout,
275+
args.auth_type,
276+
)
277+
278+
279+
def handle_create_job(args: argparse.Namespace) -> None:
280+
queue = queue_from_args(args)
281+
if args.title is None:
282+
args.title = f"Job ID: {args.id}"
283+
queue.create_job(args.id, args.title, args.content)
284+
285+
286+
def handle_remaining_jobs(args: argparse.Namespace) -> None:
287+
queue = queue_from_args(args)
288+
remaining_jobs = queue.remaining_jobs(args.use_cursor, args.limit, args.offset)
289+
print(json.dumps([json.loads(job.json()) for job in remaining_jobs]))
290+
291+
292+
def handle_complete_job(args: argparse.Namespace) -> None:
293+
queue = queue_from_args(args)
294+
queue.job_complete(args.job_id)
295+
296+
297+
def handle_fail_job(args: argparse.Namespace) -> None:
298+
queue = queue_from_args(args)
299+
queue.job_failed(args.job_id)
300+
301+
302+
def generate_cli() -> argparse.ArgumentParser:
303+
"""
304+
Generates the "bugout-py jobs" CLI.
305+
"""
306+
parser = argparse.ArgumentParser(
307+
description="bugout-py jobs: A command-line tool to manage jobs using a Bugout journal"
308+
)
309+
parser.set_defaults(func=lambda _: parser.print_help())
310+
311+
subparsers = parser.add_subparsers()
312+
313+
create_job_parser = subparsers.add_parser("create-job", help="Create a job")
314+
add_queue_args(create_job_parser)
315+
create_job_parser.add_argument("--id", required=True, help="ID for job.")
316+
create_job_parser.add_argument(
317+
"--title",
318+
required=False,
319+
default=None,
320+
help="Title for job. If not provided, title is constructed from job ID.",
321+
)
322+
create_job_parser.add_argument(
323+
"--content", required=True, help="Job specification."
324+
)
325+
create_job_parser.set_defaults(func=handle_create_job)
326+
327+
remaining_jobs_parser = subparsers.add_parser(
328+
"remaining-jobs", help="View remaining jobs in queue (FIFO order)"
329+
)
330+
add_queue_args(remaining_jobs_parser)
331+
remaining_jobs_parser.add_argument(
332+
"-c",
333+
"--use-cursor",
334+
action="store_true",
335+
help="Set this flag if you want to only view remaining jobs created after the most recent cursor position",
336+
)
337+
remaining_jobs_parser.add_argument(
338+
"--limit", type=int, default=10, help="Number of remaining jobs to view"
339+
)
340+
remaining_jobs_parser.add_argument(
341+
"--offset",
342+
type=int,
343+
default=0,
344+
help="Offset from which to page through remaining jobs",
345+
)
346+
remaining_jobs_parser.set_defaults(func=handle_remaining_jobs)
347+
348+
complete_job_parser = subparsers.add_parser(
349+
"complete-job", help="Mark a job as complete"
350+
)
351+
add_queue_args(complete_job_parser)
352+
complete_job_parser.add_argument(
353+
"-i", "--job-id", required=True, help="ID of job to mark as complete."
354+
)
355+
complete_job_parser.set_defaults(func=handle_complete_job)
356+
357+
fail_job_parser = subparsers.add_parser("fail-job", help="Mark a job as failed")
358+
add_queue_args(fail_job_parser)
359+
fail_job_parser.add_argument(
360+
"-i", "--job-id", required=True, help="ID of job to mark as failed."
361+
)
362+
fail_job_parser.set_defaults(func=handle_fail_job)
363+
364+
return parser
365+
366+
367+
def main() -> None:
368+
parser = generate_cli()
369+
args = parser.parse_args()
370+
args.func(args)
371+
372+
373+
if __name__ == "__main__":
374+
main()

0 commit comments

Comments
 (0)