-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconcurrent.py
More file actions
70 lines (63 loc) · 2.42 KB
/
concurrent.py
File metadata and controls
70 lines (63 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import queue
import threading
from typing import Any


class Channel:
    """A Clojure-like channel for passing items between threads.

    https://clojure.org/guides/async_walkthrough
    https://clojure.org/news/2013/06/28/clojure-clore-async-channels

    Built on top of ``queue.Queue``; the ``mode`` decides what ``put`` does
    when the buffer is full.

    Args:
        maxsize: Capacity handed to ``queue.Queue``.
            - 0 (or less) is an unbounded buffer.
            - 1 is a single-slot buffer. Note this is NOT a true rendezvous:
              ``put`` returns as soon as the slot is free, without waiting
              for a consumer to take the item.
            - >1 is an n-item buffer.
        mode: Behaviour of ``put`` when the buffer is full.
            - "default": block until space is available.
            - "dropping": discard the new (newest) item.
            - "sliding": discard the oldest buffered item to make room.

    Raises:
        ValueError: If ``mode`` is not one of the modes listed above.

    Methods:
        put(item) -> None: add an item, honouring the channel's mode.
        get() -> Any: remove and return the oldest item (blocking).
        drain() -> list[Any]: remove and return everything currently queued.
    """

    # TODO: add a close sentinel?

    # Accepted values for ``mode``.
    _MODES = ("default", "dropping", "sliding")

    def __init__(self, maxsize: int = 10, mode: str = "default"):
        if mode not in self._MODES:
            raise ValueError(f"Invalid mode: {mode}")
        self._maxsize = maxsize
        self._mode = mode
        self._chan = queue.Queue(maxsize=maxsize)
        # Only taken by "sliding" puts, so that evict-then-insert is not
        # interleaved with another producer's put.
        self._lock = threading.Lock()

    def put(self, item: Any) -> None:
        """Put ``item`` into the channel according to the channel's mode.

        Blocks only in "default" mode; "dropping" and "sliding" never block.
        """
        if self._mode == "default":
            # Blocking put: waits while the buffer is full.
            self._chan.put(item, block=True)
        elif self._mode == "dropping":
            try:
                self._chan.put(item, block=False)
            except queue.Full:
                # Deliberate: in dropping mode the newest value is discarded.
                pass
        elif self._mode == "sliding":
            with self._lock:
                try:
                    self._chan.put(item, block=False)
                except queue.Full:
                    try:
                        # Evict the oldest value to make room.
                        self._chan.get(block=False)
                    except queue.Empty:
                        # A consumer emptied the queue in the meantime;
                        # there is room now, nothing to evict.
                        pass
                    # Producers are serialised by the lock and consumers only
                    # remove items, so a slot is free at this point.
                    self._chan.put(item, block=False)

    def get(self) -> Any:
        """Remove and return the oldest item, blocking until one is available."""
        return self._chan.get(block=True)

    def drain(self) -> list[Any]:
        """Remove and return all items currently queued, oldest first.

        Never blocks; returns an empty list when the channel is empty.
        """
        items: list[Any] = []
        while True:
            try:
                items.append(self._chan.get(block=False))
            except queue.Empty:
                return items