import errno
import json
import time

import rados
import rbd

from .utils import GatewayLogger


class CephUtils:
    """Miscellaneous functions which connect to Ceph"""

    def __init__(self, config):
        self.logger = GatewayLogger(config).logger
        self.ceph_conf = config.get_with_default(
            "ceph", "config_file", "/etc/ceph/ceph.conf"
        )
        self.rados_id = config.get_with_default("ceph", "id", "")
        self.anagroup_list = []
        self.last_sent = time.time()

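    # Note: librados' mon_command() returns a (ret, outbuf, outs) tuple, so
    # callers below read rply[1] (outbuf) to get the reply payload.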
    def execute_ceph_monitor_command(self, cmd):
        self.logger.debug(f"Execute monitor command: {cmd}")
        with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
            rply = cluster.mon_command(cmd, b"")
            self.logger.debug(f"Monitor reply: {rply}")
            return rply

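    # Finds which gateway is ACTIVE for the given ANA group by parsing the
    # "nvme-gw show" monitor reply; returns None when no gateway matches.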
    def get_gw_id_owner_ana_group(self, pool, group, anagrp):
        str = "{" + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"' + "}"
        self.logger.debug(f"nvme-show string: {str}")
        rply = self.execute_ceph_monitor_command(str)
        self.logger.debug(f' reply "{rply}"')
        conv_str = rply[1].decode()
        data = json.loads(conv_str)

        gw_id = None
        comp_str = f"{anagrp}: ACTIVE"
        for gateway in data["Created Gateways:"]:
            if comp_str in gateway["ana states"]:
                gw_id = gateway["gw-id"]
                self.logger.debug(f"found gw owner of anagrp {anagrp}: gw {gw_id}")
                break
        return gw_id

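    # Returns the list of ANA group ids of the created gateways. Monitor
    # replies are cached for 10 seconds so repeated calls don't hammer the
    # monitor with "nvme-gw show" commands.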
    def get_number_created_gateways(self, pool, group):
        now = time.time()
        if (now - self.last_sent) < 10 and self.anagroup_list:
            self.logger.info(f"Caching response of the monitor: {self.anagroup_list}")
            return self.anagroup_list
        else:
            try:
                self.anagroup_list = []
                self.last_sent = now
                str = (
                    "{"
                    + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"'
                    + "}"
                )
                self.logger.debug(f"nvme-show string: {str}")
                rply = self.execute_ceph_monitor_command(str)
                self.logger.debug(f' reply "{rply}"')
                conv_str = rply[1].decode()
                pos = conv_str.find("[")
                if pos != -1:
                    new_str = conv_str[pos + len("[") :]
                    pos = new_str.find("]")
                    new_str = new_str[:pos].strip()
                    int_str_list = new_str.split(" ")
                    self.logger.debug(f"new_str : {new_str}")
                    for x in int_str_list:
                        self.anagroup_list.append(int(x))
                else:
                    self.logger.warning("GWs not found")
            except Exception:
                self.logger.exception("Failure getting the created gateways:")
                self.anagroup_list = []

            return self.anagroup_list

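    # Logs the cluster version reported by the "mon versions" monitor command.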
    def fetch_and_display_ceph_version(self):
        try:
            rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}')
            ceph_ver = (
                rply[1]
                .decode()
                .removeprefix("{")
                .strip()
                .split(":")[0]
                .removeprefix('"')
                .removesuffix('"')
            )
            ceph_ver = ceph_ver.removeprefix("ceph version ")
            self.logger.info(f'Connected to Ceph with version "{ceph_ver}"')
        except Exception:
            self.logger.exception("Failure fetching Ceph version:")
            pass

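    # Returns the cluster fsid as a string, or None if the cluster couldn't
    # be reached.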
    def fetch_ceph_fsid(self) -> str:
        fsid = None
        try:
            with rados.Rados(
                conffile=self.ceph_conf, rados_id=self.rados_id
            ) as cluster:
                fsid = cluster.get_fsid()
        except Exception:
            self.logger.exception("Failure fetching Ceph fsid:")

        return fsid

    def pool_exists(self, pool) -> bool:
        try:
            with rados.Rados(
                conffile=self.ceph_conf, rados_id=self.rados_id
            ) as cluster:
                if cluster.pool_exists(pool):
                    return True
        except Exception:
            self.logger.exception(f"Failure checking existence of pool {pool}:")

        return False

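    # The two calls below publish this gateway instance into the Ceph service
    # map so it shows up in "ceph status" output.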
    def service_daemon_register(self, cluster, metadata):
        try:
            if cluster:  # rados client
                daemon_name = metadata["id"]
                cluster.service_daemon_register("nvmeof", daemon_name, metadata)
                self.logger.info(f"Registered {daemon_name} to service_map!")
        except Exception:
            self.logger.exception("Can't register daemon to service_map!")

    def service_daemon_update(self, cluster, status_buffer):
        try:
            if cluster and status_buffer:
                cluster.service_daemon_update(status_buffer)
        except Exception:
            self.logger.exception("Can't update daemon status to service_map!")

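    # Creates an RBD image, treating an already-existing image of the same
    # size as a no-op (returns False); a size mismatch or a creation race
    # raises rbd.ImageExists.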
    def create_image(self, pool_name, image_name, size) -> bool:
        # Check for pool existence in advance as we don't create it if it's not there
        if not self.pool_exists(pool_name):
            raise rbd.ImageNotFound(
                f"Pool {pool_name} doesn't exist", errno=errno.ENODEV
            )

        image_exists = False
        try:
            image_size = self.get_image_size(pool_name, image_name)
            image_exists = True
        except rbd.ImageNotFound:
            self.logger.debug(
                f"Image {pool_name}/{image_name} doesn't exist, will create it using size {size}"
            )
            pass

        if image_exists:
            if image_size != size:
                raise rbd.ImageExists(
                    f"Image {pool_name}/{image_name} already exists with a size of {image_size} bytes which differs from the requested size of {size} bytes",
                    errno=errno.EEXIST,
                )
            return (
                False  # Image exists with an identical size, there is nothing to do here
            )

        with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
            with cluster.open_ioctx(pool_name) as ioctx:
                rbd_inst = rbd.RBD()
                try:
                    rbd_inst.create(ioctx, image_name, size)
                except rbd.ImageExists as ex:
                    self.logger.exception(
                        f"Image {pool_name}/{image_name} was created just now"
                    )
                    raise rbd.ImageExists(
                        f"Image {pool_name}/{image_name} was just created by someone else, please retry",
                        errno=errno.EAGAIN,
                    )
                except Exception as ex:
                    self.logger.exception(
                        f"Can't create image {pool_name}/{image_name}"
                    )
                    raise ex

        return True

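    # Returns the size of an existing image in bytes; raises rbd.ImageNotFound
    # if the pool or the image doesn't exist.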
    def get_image_size(self, pool_name, image_name) -> int:
        image_size = 0
        if not self.pool_exists(pool_name):
            raise rbd.ImageNotFound(
                f"Pool {pool_name} doesn't exist", errno=errno.ENODEV
            )

        with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
            with cluster.open_ioctx(pool_name) as ioctx:
                try:
                    with rbd.Image(ioctx, image_name) as img:
                        image_size = img.size()
                except rbd.ImageNotFound:
                    raise rbd.ImageNotFound(
                        f"Image {pool_name}/{image_name} doesn't exist",
                        errno=errno.ENODEV,
                    )
                except Exception as ex:
                    self.logger.exception(
                        f"Error while trying to get the size of image {pool_name}/{image_name}"
                    )
                    raise ex

        return image_size
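

# --- Usage sketch (illustrative, not part of the original module) ---
# DummyConfig is a hypothetical stand-in for the project's config object: it
# implements only get_with_default(), which covers every call CephUtils itself
# makes, though the real GatewayLogger may expect more, so treat this purely
# as a sketch. A reachable cluster described by /etc/ceph/ceph.conf is assumed.
if __name__ == "__main__":
    class DummyConfig:
        def get_with_default(self, section, key, default):
            return default  # fall back to every built-in default

    ceph_utils = CephUtils(DummyConfig())
    ceph_utils.fetch_and_display_ceph_version()
    if ceph_utils.pool_exists("rbd"):
        # Create a 1 GiB image; returns False if it already exists at this size.
        ceph_utils.create_image("rbd", "demo-image", 1024 * 1024 * 1024)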