-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathshared_connection.py
More file actions
52 lines (40 loc) · 1.77 KB
/
Copy pathshared_connection.py
File metadata and controls
52 lines (40 loc) · 1.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Easy integration: one shared, thread-safe link with espbridge.connect().
A board's link can't be opened twice, and a Bridge is already safe to share
across threads. So instead of threading a Bridge object through your code, call
espbridge.connect() anywhere — every call with the same settings returns the
SAME auto-reconnecting connection.
uv run basics/shared_connection.py
This runs several worker threads that all grab the shared link and hammer it
concurrently; their requests pipeline on the wire and stay correctly correlated.
"""
import threading
import time
import espbridge
def worker(n: int, stop: threading.Event) -> int:
# No Bridge passed in — just ask for the shared one. Same link as everyone.
esp = espbridge.connect(ble=False)
reads = 0
while not stop.is_set():
esp.adc.read(34) # read-only; safe to spam from many threads
esp.ping()
reads += 1
print(f"[worker {n}] {reads} round-trips")
return reads
def main() -> None:
esp = espbridge.connect(ble=False) # opens the link on first call
print(f"connected to {esp.info.chip.name} ({esp.info.name or esp.info.mac})")
esp.adc.config(34, atten=11)
stop = threading.Event()
threads = [threading.Thread(target=worker, args=(i, stop)) for i in range(4)]
for t in threads:
t.start()
time.sleep(3)
stop.set()
for t in threads:
t.join()
print("\nAll four threads shared ONE connection — no locks in your code, no\n"
"second link. In a web app you'd call espbridge.connect() in each route;\n"
"for asyncio: esp = espbridge.AsyncBridge.wrap(espbridge.connect(ble=False)).")
espbridge.disconnect_all() # close shared links at shutdown
if __name__ == "__main__":
main()