|
| 1 | +import json |
| 2 | +import os |
| 3 | +from typing import Any, Dict, List, Optional |
| 4 | + |
| 5 | +import requests |
| 6 | +from requests_toolbelt.multipart.encoder import MultipartEncoder |
| 7 | + |
| 8 | +from roboflow.adapters.rfapi import RoboflowError |
| 9 | +from roboflow.config import API_URL |
| 10 | + |
| 11 | +_BASE = f"{API_URL}/vision-events" |
| 12 | + |
| 13 | + |
| 14 | +def _auth_headers(api_key: str) -> Dict[str, str]: |
| 15 | + return {"Authorization": f"Bearer {api_key}"} |
| 16 | + |
| 17 | + |
| 18 | +def write_event(api_key: str, event: Dict[str, Any]) -> dict: |
| 19 | + """Create a single vision event. |
| 20 | +
|
| 21 | + Args: |
| 22 | + api_key: Roboflow API key. |
| 23 | + event: Event payload dict (eventId, eventType, useCaseId, timestamp, etc.). |
| 24 | +
|
| 25 | + Returns: |
| 26 | + Parsed JSON response with ``eventId`` and ``created``. |
| 27 | +
|
| 28 | + Raises: |
| 29 | + RoboflowError: On non-201 response status codes. |
| 30 | + """ |
| 31 | + response = requests.post(_BASE, json=event, headers=_auth_headers(api_key)) |
| 32 | + if response.status_code != 201: |
| 33 | + raise RoboflowError(response.text) |
| 34 | + return response.json() |
| 35 | + |
| 36 | + |
| 37 | +def write_batch(api_key: str, events: List[Dict[str, Any]]) -> dict: |
| 38 | + """Create multiple vision events in a single request. |
| 39 | +
|
| 40 | + Args: |
| 41 | + api_key: Roboflow API key. |
| 42 | + events: List of event payload dicts (max 100 per the server). |
| 43 | +
|
| 44 | + Returns: |
| 45 | + Parsed JSON response with ``created`` count and ``eventIds``. |
| 46 | +
|
| 47 | + Raises: |
| 48 | + RoboflowError: On non-201 response status codes. |
| 49 | + """ |
| 50 | + response = requests.post( |
| 51 | + f"{_BASE}/batch", |
| 52 | + json={"events": events}, |
| 53 | + headers=_auth_headers(api_key), |
| 54 | + ) |
| 55 | + if response.status_code != 201: |
| 56 | + raise RoboflowError(response.text) |
| 57 | + return response.json() |
| 58 | + |
| 59 | + |
| 60 | +def query(api_key: str, query_params: Dict[str, Any]) -> dict: |
| 61 | + """Query vision events with filters and pagination. |
| 62 | +
|
| 63 | + Args: |
| 64 | + api_key: Roboflow API key. |
| 65 | + query_params: Query payload (useCaseId, eventType, startTime, endTime, |
| 66 | + cursor, limit, customMetadataFilters, etc.). |
| 67 | +
|
| 68 | + Returns: |
| 69 | + Parsed JSON response with ``events``, ``nextCursor``, ``hasMore``, |
| 70 | + and ``lookbackDays``. |
| 71 | +
|
| 72 | + Raises: |
| 73 | + RoboflowError: On non-200 response status codes. |
| 74 | + """ |
| 75 | + response = requests.post( |
| 76 | + f"{_BASE}/query", |
| 77 | + json=query_params, |
| 78 | + headers=_auth_headers(api_key), |
| 79 | + ) |
| 80 | + if response.status_code != 200: |
| 81 | + raise RoboflowError(response.text) |
| 82 | + return response.json() |
| 83 | + |
| 84 | + |
| 85 | +def list_use_cases(api_key: str, status: Optional[str] = None) -> dict: |
| 86 | + """List all use cases for a workspace. |
| 87 | +
|
| 88 | + Args: |
| 89 | + api_key: Roboflow API key. |
| 90 | + status: Optional status filter (default server-side: "active"). |
| 91 | +
|
| 92 | + Returns: |
| 93 | + Parsed JSON response with ``useCases`` list and ``lookbackDays``. |
| 94 | +
|
| 95 | + Raises: |
| 96 | + RoboflowError: On non-200 response status codes. |
| 97 | + """ |
| 98 | + params: Dict[str, str] = {} |
| 99 | + if status is not None: |
| 100 | + params["status"] = status |
| 101 | + response = requests.get( |
| 102 | + f"{_BASE}/use-cases", |
| 103 | + params=params, |
| 104 | + headers=_auth_headers(api_key), |
| 105 | + ) |
| 106 | + if response.status_code != 200: |
| 107 | + raise RoboflowError(response.text) |
| 108 | + return response.json() |
| 109 | + |
| 110 | + |
| 111 | +def upload_image( |
| 112 | + api_key: str, |
| 113 | + image_path: str, |
| 114 | + name: Optional[str] = None, |
| 115 | + metadata: Optional[Dict[str, Any]] = None, |
| 116 | +) -> dict: |
| 117 | + """Upload an image for use in vision events. |
| 118 | +
|
| 119 | + Args: |
| 120 | + api_key: Roboflow API key. |
| 121 | + image_path: Local filesystem path to the image file. |
| 122 | + name: Optional custom image name. |
| 123 | + metadata: Optional flat dict of metadata to attach. |
| 124 | +
|
| 125 | + Returns: |
| 126 | + Parsed JSON response with ``sourceId`` (and optionally ``url``). |
| 127 | +
|
| 128 | + Raises: |
| 129 | + RoboflowError: On non-201 response status codes. |
| 130 | + """ |
| 131 | + filename = name or os.path.basename(image_path) |
| 132 | + with open(image_path, "rb") as f: |
| 133 | + fields: Dict[str, Any] = { |
| 134 | + "file": (filename, f, "application/octet-stream"), |
| 135 | + } |
| 136 | + if name is not None: |
| 137 | + fields["name"] = name |
| 138 | + if metadata is not None: |
| 139 | + fields["metadata"] = json.dumps(metadata) |
| 140 | + m = MultipartEncoder(fields=fields) |
| 141 | + headers = _auth_headers(api_key) |
| 142 | + headers["Content-Type"] = m.content_type |
| 143 | + response = requests.post(f"{_BASE}/upload", data=m, headers=headers) |
| 144 | + |
| 145 | + if response.status_code != 201: |
| 146 | + raise RoboflowError(response.text) |
| 147 | + return response.json() |
0 commit comments