Skip to content

Commit ad810b7

Browse files
committed
Fully featured job queue implementation
1 parent 30d22f9 commit ad810b7

1 file changed

Lines changed: 39 additions & 9 deletions

File tree

bugout/jobs.py

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,14 @@ def update_cursor(self, created_at: datetime):
9595
"context_type": self.cursor_context_type,
9696
"created_at": created_at.isoformat(),
9797
}
98-
requests.post(
99-
self.client.spire_api_url,
98+
request_url = f'{self.client.spire_api_url.rstrip("/")}/journals/{self.journal_id}/entries'
99+
r = requests.post(
100+
request_url,
100101
headers=headers,
101102
json=body,
102103
timeout=self.write_timeout,
103104
)
105+
r.raise_for_status()
104106

105107
def list_jobs(
106108
self,
@@ -178,6 +180,9 @@ def list_jobs(
178180
return results
179181

180182
def job_complete(self, job_id: str) -> None:
183+
"""
184+
Mark a job as successfully completed.
185+
"""
181186
self.client.update_tags(
182187
self.bugout_token,
183188
self.journal_id,
@@ -188,6 +193,9 @@ def job_complete(self, job_id: str) -> None:
188193
)
189194

190195
def job_failed(self, job_id: str) -> None:
196+
"""
197+
Mark a job as failed.
198+
"""
191199
self.client.update_tags(
192200
self.bugout_token,
193201
self.journal_id,
@@ -305,9 +313,7 @@ def queue_from_args(args: argparse.Namespace) -> BugoutJobQueue:
305313

306314
def handle_create_job(args: argparse.Namespace) -> None:
307315
queue = queue_from_args(args)
308-
if args.title is None:
309-
args.title = f"Job ID: {args.id}"
310-
queue.create_job(args.id, args.title, args.content)
316+
queue.create_job(args.context_id, args.title, args.content)
311317

312318

313319
def handle_list_jobs(args: argparse.Namespace) -> None:
@@ -326,6 +332,13 @@ def handle_fail_job(args: argparse.Namespace) -> None:
326332
queue.job_failed(args.job_id)
327333

328334

335+
def handle_update_cursor(args: argparse.Namespace) -> None:
336+
queue = queue_from_args(args)
337+
if args.time is None:
338+
args.time = datetime.utcnow()
339+
queue.update_cursor(created_at=args.time)
340+
341+
329342
def generate_cli() -> argparse.ArgumentParser:
330343
"""
331344
Generates the "bugout-py jobs" CLI.
@@ -339,12 +352,16 @@ def generate_cli() -> argparse.ArgumentParser:
339352

340353
create_job_parser = subparsers.add_parser("create-job", help="Create a job")
341354
add_queue_args(create_job_parser)
342-
create_job_parser.add_argument("--id", required=True, help="ID for job.")
343355
create_job_parser.add_argument(
344-
"--title",
356+
"--context-id",
345357
required=False,
346-
default=None,
347-
help="Title for job. If not provided, title is constructed from job ID.",
358+
default="",
359+
help="Context ID for job. This is not used by Bugout, but can provide useful context to queue consumers.",
360+
)
361+
create_job_parser.add_argument(
362+
"--title",
363+
required=True,
364+
help="Title for job.",
348365
)
349366
create_job_parser.add_argument(
350367
"--content", required=True, help="Job specification."
@@ -396,6 +413,19 @@ def generate_cli() -> argparse.ArgumentParser:
396413
)
397414
fail_job_parser.set_defaults(func=handle_fail_job)
398415

416+
update_cursor_parser = subparsers.add_parser(
417+
"update-cursor", help="Update the cursor in the job queue"
418+
)
419+
add_queue_args(update_cursor_parser)
420+
update_cursor_parser.add_argument(
421+
"--time",
422+
type=datetime.fromisoformat,
423+
required=False,
424+
default=None,
425+
help="Time to update the cursor to. If not provided, uses the current system time on the client.",
426+
)
427+
update_cursor_parser.set_defaults(func=handle_update_cursor)
428+
399429
return parser
400430

401431

0 commit comments

Comments
 (0)