-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscriptions.py
143 lines (109 loc) · 4.43 KB
/
subscriptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import asyncio
from .janus import Queue
from threading import Lock
class Subscription:
    """
    Base class for defining the relationship between streams.

    For this subscription type the subscriber simply gets a reference to the other (producer) stream via
    get_stream(); posting data through it does nothing. Subscription objects are shared between a channel
    and a subscriber.

    A subscriber stream (or just subscriber) is a stream that is subscribed to another stream.
    A channel stream (or just channel) is a stream that posts content to its subscribers if it has any.
    """

    def __init__(self, tag, producer_stream, service="default", enabled=True):
        """
        :param tag: Name that this subscription will be under (must be a string)
        :param producer_stream: An instance of DataStream
        :param service: Service label stored on the subscription; it is not interpreted by this class
        :param enabled: Disable or enable this subscription
        :raises AssertionError: if tag is not a string
        """
        # isinstance also admits str subclasses. The tag is wrapped in a one-element tuple so that a
        # tuple-valued tag cannot break the %-formatting of the error message itself.
        assert isinstance(tag, str), "tag must be a string: %s" % (tag,)

        self.tag = tag
        self.producer_stream = producer_stream
        self.enabled = enabled
        self.consumer_stream = None  # filled in later by set_consumer
        self.service = service
        self.description = "subscribing to"  # for debug printing
        self.is_async = False  # never changed here; Feed.get_feed uses it to pick the async or sync side

    def set_consumer(self, subscriber):
        """
        Record the stream that consumes this subscription.

        :param subscriber: The subscribing stream
        """
        self.consumer_stream = subscriber

    def get_stream(self):
        """
        :return: The producer stream given at construction
        """
        return self.producer_stream

    def get_feed(self):
        """
        :return: None. The base subscription carries no data feed
        """
        return None

    async def async_post(self, data):
        """
        Asynchronous post. The base subscription discards the data and only yields to the event loop once.

        :param data: Data to post to subscribers. When posting lists, make sure to copy them if you intend to modify
            its contents in the subscriber's stream
        """
        await asyncio.sleep(0.0)

    def sync_post(self, data):
        """
        Synchronous post. The base subscription discards the data.

        :param data: Data to post to subscribers
        """
        pass

    def __repr__(self):
        return "%s(tag='%s', producer_stream=%s, consumer_stream=%s)" % (
            self.__class__.__name__, self.tag, self.producer_stream, self.consumer_stream)
class Feed(Subscription):
    """
    Queue-backed subscription. Every piece of data posted is queued up in the feed until it is taken out.
    """

    def __init__(self, tag, producer_stream, service="default", enabled=True):
        """
        :param tag: Name that this subscription will be under
        :param producer_stream: An instance of DataStream
        :param service: Service label passed through to Subscription
        :param enabled: Disable or enable this subscription
        """
        super().__init__(tag, producer_stream, service, enabled)
        self.description = "receiving feed from"  # for debug printing
        self.queue = None  # created in set_consumer, from the consumer's event loop

    def set_consumer(self, consumer_stream):
        """
        Record the consuming stream and build the queue on that stream's asyncio loop.

        :param consumer_stream: The subscribing stream; its asyncio_loop attribute is handed to the Queue
        """
        self.consumer_stream = consumer_stream
        consumer_loop = consumer_stream.asyncio_loop
        self.queue = Queue(loop=consumer_loop)

    def get_feed(self):
        """
        :return: The queue's async side when is_async is set, otherwise its sync side
        """
        return self.queue.async_q if self.is_async else self.queue.sync_q

    async def async_post(self, data):
        """
        Queue data through the async side of the queue.

        :param data: Data to post to the subscriber
        """
        await self.queue.async_q.put(data)

    def sync_post(self, data):
        """
        Queue data through the sync side of the queue.

        :param data: Data to post to the subscriber
        """
        self.queue.sync_q.put(data)
class Update(Subscription):
    """
    Mailbox-style subscription. The subscriber gets the latest data and only the latest data; each post
    replaces whatever was posted before it.
    """

    def __init__(self, tag, producer_stream, service="default", enabled=True):
        """
        :param tag: Name that this subscription will be under
        :param producer_stream: An instance of DataStream
        :param service: Service label passed through to Subscription
        :param enabled: Disable or enable this subscription
        """
        super().__init__(tag, producer_stream, service, enabled)
        self.description = "receiving updates from"  # for debug printing
        self.mailbox = _SingletonQueue()

    def get_feed(self):
        """
        :return: The single-slot mailbox holding the most recent post
        """
        return self.mailbox

    def sync_post(self, data):
        """
        Put data in the mailbox, replacing the previous content.

        :param data: Data to post to the subscriber
        """
        self.mailbox.put(data)

    async def async_post(self, data):
        """
        Put data in the mailbox, then yield to the event loop once.

        :param data: Data to post to subscribers. When posting lists, make sure to copy them if you intend to modify
            its contents in the subscriber's stream
        """
        self.mailbox.put(data)
        await asyncio.sleep(0.0)
class Callback(Subscription):
    """
    Subscription that passes every posted piece of data straight to a callable registered with set_callback.
    """

    def __init__(self, tag, producer_stream, service="default", enabled=True):
        """
        :param tag: Name that this subscription will be under
        :param producer_stream: An instance of DataStream
        :param service: Service label passed through to Subscription
        :param enabled: Disable or enable this subscription
        """
        super(Callback, self).__init__(tag, producer_stream, service, enabled)
        self.callback_fn = None  # registered later through set_callback

    def set_callback(self, callback_fn):
        """
        Register the function that receives posted data.

        :param callback_fn: Callable taking the posted data as its only argument. async_post awaits the value
            it returns, so it has to return an awaitable when used that way
        :raises AssertionError: if callback_fn is not callable
        """
        # Check before storing, so a rejected value never replaces a callback that was already registered.
        assert callable(callback_fn), "callback_fn must be callable: %s" % (callback_fn,)
        self.callback_fn = callback_fn

    async def async_post(self, data):
        """
        Call the callback with data and await what it returns.

        :param data: Data to hand to the callback
        """
        await self.callback_fn(data)

    def sync_post(self, data):
        """
        Call the callback with data.

        :param data: Data to hand to the callback
        """
        self.callback_fn(data)
class _SingletonQueue:
    """
    A queue that has a size of one. This acts as a supplement to the Update class.

    None marks the empty slot, so putting None is indistinguishable from having put nothing.
    """

    def __init__(self):
        self.queue = [None]  # the single slot; None means empty
        self.lock = Lock()

    def empty(self):
        """
        :return: True if the slot currently holds None
        """
        return self.queue[0] is None

    def put(self, data):
        """
        Store data in the slot, overwriting whatever was there.

        :param data: The new content of the slot
        """
        with self.lock:
            self.queue[0] = data

    def get(self):
        """
        Take the content out of the slot, leaving it empty.

        :return: The stored data, or None if the slot was empty
        """
        # Read and clear under the same lock put() uses; otherwise a put() landing between the two
        # statements would be wiped out by the clear and never delivered.
        with self.lock:
            data = self.queue[0]
            self.queue[0] = None
        return data