2
2
import imaplib
3
3
import datetime
4
4
from collections import UserString
5
- from typing import AnyStr , Optional , List , Iterable , Sequence , Union , Tuple , Iterator
5
+ from typing import Optional , List , Iterable , Sequence , Union , Tuple , Iterator
6
6
7
7
from .message import MailMessage
8
8
from .folder import MailBoxFolderManager
9
9
from .idle import IdleManager
10
10
from .consts import UID_PATTERN , PYTHON_VERSION_MINOR
11
11
from .utils import clean_uids , check_command_status , chunks , encode_folder , clean_flags , check_timeout_arg_support , \
12
- chunks_crop
12
+ chunks_crop , StrOrBytes
13
13
from .errors import MailboxStarttlsError , MailboxLoginError , MailboxLogoutError , MailboxNumbersError , \
14
14
MailboxFetchError , MailboxExpungeError , MailboxDeleteError , MailboxCopyError , MailboxFlagError , \
15
15
MailboxAppendError , MailboxUidsError , MailboxTaggedResponseError
18
18
# 20Mb is enough for search response with about 2 000 000 message numbers
19
19
imaplib ._MAXLINE = 20 * 1024 * 1024 # 20Mb
20
20
21
- Criteria = Union [AnyStr , UserString ]
21
+ Criteria = Union [StrOrBytes , UserString ]
22
22
23
23
24
24
class BaseMailBox :
@@ -80,7 +80,7 @@ def login_utf8(self, username: str, password: str, initial_folder: Optional[str]
80
80
81
81
def xoauth2 (self , username : str , access_token : str , initial_folder : Optional [str ] = 'INBOX' ) -> 'BaseMailBox' :
82
82
"""Authenticate to account using OAuth 2.0 mechanism"""
83
- auth_string = 'user={}\1 auth=Bearer {}\1 \1 ' . format ( username , access_token )
83
+ auth_string = f 'user={ username } \1 auth=Bearer { access_token } \1 \1 '
84
84
result = self .client .authenticate ('XOAUTH2' , lambda x : auth_string ) # noqa
85
85
check_command_status (result , MailboxLoginError )
86
86
if initial_folder is not None :
@@ -133,7 +133,7 @@ def uids(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII',
133
133
encoded_criteria = criteria if type (criteria ) is bytes else str (criteria ).encode (charset )
134
134
if sort :
135
135
sort = (sort ,) if isinstance (sort , str ) else sort
136
- uid_result = self .client .uid ('SORT' , '({})' . format ( ' ' .join (sort )) , charset , encoded_criteria )
136
+ uid_result = self .client .uid ('SORT' , f '({ " " .join (sort )} )' , charset , encoded_criteria )
137
137
else :
138
138
uid_result = self .client .uid ('SEARCH' , 'CHARSET' , charset , encoded_criteria ) # *charset are opt here
139
139
check_command_status (uid_result , MailboxUidsError )
@@ -187,8 +187,8 @@ def fetch(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', limit: Op
187
187
:param sort: criteria for sort messages on server, use SortCriteria constants. Charset arg is important for sort
188
188
:return generator: MailMessage
189
189
"""
190
- message_parts = "(BODY{}[{}] UID FLAGS RFC822.SIZE)" . format (
191
- '' if mark_seen else '.PEEK' , 'HEADER' if headers_only else '' )
190
+ message_parts = \
191
+ f"(BODY { '' if mark_seen else '.PEEK' } [ { 'HEADER' if headers_only else '' } ] UID FLAGS RFC822.SIZE)"
192
192
limit_range = slice (0 , limit ) if type (limit ) is int else limit or slice (None )
193
193
assert type (limit_range ) is slice
194
194
uids = tuple ((reversed if reverse else iter )(self .uids (criteria , charset , sort )))[limit_range ]
@@ -218,7 +218,7 @@ def delete(self, uid_list: Union[str, Iterable[str]]) -> Optional[Tuple[tuple, t
218
218
expunge_result = self .expunge ()
219
219
return store_result , expunge_result
220
220
221
- def copy (self , uid_list : Union [str , Iterable [str ]], destination_folder : AnyStr ) -> Optional [tuple ]:
221
+ def copy (self , uid_list : Union [str , Iterable [str ]], destination_folder : StrOrBytes ) -> Optional [tuple ]:
222
222
"""
223
223
Copy email messages into the specified folder
224
224
Do nothing on empty uid_list
@@ -231,7 +231,7 @@ def copy(self, uid_list: Union[str, Iterable[str]], destination_folder: AnyStr)
231
231
check_command_status (copy_result , MailboxCopyError )
232
232
return copy_result
233
233
234
- def move (self , uid_list : Union [str , Iterable [str ]], destination_folder : AnyStr ) -> Optional [Tuple [tuple , tuple ]]:
234
+ def move (self , uid_list : Union [str , Iterable [str ]], destination_folder : StrOrBytes ) -> Optional [Tuple [tuple , tuple ]]:
235
235
"""
236
236
Move email messages into the specified folder
237
237
Do nothing on empty uid_list
@@ -256,14 +256,13 @@ def flag(self, uid_list: Union[str, Iterable[str]], flag_set: Union[str, Iterabl
256
256
if not uid_str :
257
257
return None
258
258
store_result = self .client .uid (
259
- 'STORE' , uid_str , ('+' if value else '-' ) + 'FLAGS' ,
260
- '({})' .format (' ' .join (clean_flags (flag_set ))))
259
+ 'STORE' , uid_str , ('+' if value else '-' ) + 'FLAGS' , f'({ " " .join (clean_flags (flag_set ))} )' )
261
260
check_command_status (store_result , MailboxFlagError )
262
261
expunge_result = self .expunge ()
263
262
return store_result , expunge_result
264
263
265
264
def append (self , message : Union [MailMessage , bytes ],
266
- folder : AnyStr = 'INBOX' ,
265
+ folder : StrOrBytes = 'INBOX' ,
267
266
dt : Optional [datetime .datetime ] = None ,
268
267
flag_set : Optional [Union [str , Iterable [str ]]] = None ) -> tuple :
269
268
"""
@@ -281,7 +280,7 @@ def append(self, message: Union[MailMessage, bytes],
281
280
cleaned_flags = clean_flags (flag_set or [])
282
281
typ , dat = self .client .append (
283
282
encode_folder (folder ), # noqa
284
- '({})' . format ( ' ' .join (cleaned_flags )) if cleaned_flags else None ,
283
+ f '({ " " .join (cleaned_flags )} )' if cleaned_flags else None ,
285
284
dt or datetime .datetime .now (timezone ), # noqa
286
285
message if type (message ) is bytes else message .obj .as_bytes ()
287
286
)
@@ -339,10 +338,10 @@ def __init__(self, host='', port=993, timeout=None, keyfile=None, certfile=None,
339
338
340
339
def _get_mailbox_client (self ) -> imaplib .IMAP4 :
341
340
if PYTHON_VERSION_MINOR < 9 :
342
- return imaplib .IMAP4_SSL (self ._host , self ._port , self ._keyfile , self ._certfile , self ._ssl_context )
341
+ return imaplib .IMAP4_SSL (self ._host , self ._port , self ._keyfile , self ._certfile , self ._ssl_context ) # noqa
343
342
elif PYTHON_VERSION_MINOR < 12 :
344
343
return imaplib .IMAP4_SSL (
345
- self ._host , self ._port , self ._keyfile , self ._certfile , self ._ssl_context , self ._timeout )
344
+ self ._host , self ._port , self ._keyfile , self ._certfile , self ._ssl_context , self ._timeout ) # noqa
346
345
else :
347
346
return imaplib .IMAP4_SSL (self ._host , self ._port , ssl_context = self ._ssl_context , timeout = self ._timeout )
348
347
0 commit comments