@@ -35,6 +35,7 @@ def __init__(
35
35
36
36
self ._control_callback = on_control_callback
37
37
self ._termination_callback = None
38
+ self ._drain_callback = None
38
39
39
40
async def explicit_ack (self , qualified_offset ):
40
41
"""
@@ -51,6 +52,16 @@ async def explicit_ack(self, qualified_offset):
51
52
"Cannot send explicit ack since control callback was not initialized"
52
53
)
53
54
55
+ def set_drain_callback (self , callback ):
56
+ """
57
+ Register a callback to be called when the platform is draining (rebalance happening in stream).
58
+ If already registered, the callback will be replaced.
59
+ When called, the callback will be called with zero arguments.
60
+
61
+ :param callback: the callback to call when terminating
62
+ """
63
+ self ._drain_callback = callback
64
+
54
65
def set_termination_callback (self , callback ):
55
66
"""
56
67
Register a callback to be called when the platform is terminating.
@@ -64,7 +75,6 @@ def set_termination_callback(self, callback):
64
75
def call_function (
65
76
self , function_name , event , node = None , timeout = None , service_name_override = None
66
77
):
67
-
68
78
# get connection from provider
69
79
connection = self ._connection_provider (
70
80
self ._get_function_url (function_name , service_name_override ),
@@ -106,7 +116,7 @@ def call_function(
106
116
response_headers = {}
107
117
108
118
# get response headers as lowercase
109
- for ( name , value ) in response .getheaders ():
119
+ for name , value in response .getheaders ():
110
120
response_headers [name .lower ()] = value
111
121
112
122
# if content type exists, use it
@@ -124,7 +134,6 @@ def call_function(
124
134
)
125
135
126
136
def _get_function_url (self , function_name , service_name_override = None ):
127
-
128
137
# local envs prefix namespace
129
138
if self .kind == "local" :
130
139
service_name = service_name_override or "nuclio-{0}-{1}" .format (
@@ -134,10 +143,14 @@ def _get_function_url(self, function_name, service_name_override=None):
134
143
service_name = service_name_override or "nuclio-{0}" .format (function_name )
135
144
return "{0}:8080" .format (service_name )
136
145
137
- def _on_signal (self ):
146
+ def _on_signal (self , callback_type = "termination" ):
138
147
"""
139
- When a signal is received, call the termination callback as a hook before exiting
148
+ When a signal is received, call the termination/drain callback as a hook before exiting
140
149
If not set, the callback will be a no-op
150
+
151
+ :arg callback_type:str - callback type, can be "termination" or "drain"
141
152
"""
142
- if self ._termination_callback :
153
+ if callback_type == "termination" and self ._termination_callback :
143
154
self ._termination_callback ()
155
+ elif callback_type == "drain" and self ._drain_callback :
156
+ self ._drain_callback ()
0 commit comments