diff --git a/.github/workflows/build-docs.yml b/.github/workflows/build-docs.yml
index 5d47f29..ec464ab 100644
--- a/.github/workflows/build-docs.yml
+++ b/.github/workflows/build-docs.yml
@@ -1,6 +1,12 @@
name: Build Docs
-on: [push, pull_request]
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+ branches:
+ - master
jobs:
deploy:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fd7cd5..04a87a4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.5.0
+* Support for WebSocket authorization *(Thanks to @SelfhostedPro for make issues)*
+* Function **get_raw_jwt()** can pass parameter encoded_token
+
## 0.4.0
* Support set and unset cookies when returning a **Response** directly
@@ -22,7 +26,7 @@
* Custom error message key and status code
* JWT in cookies *(Thanks to @m4nuC for make issues)*
* Add Additional claims
-* Add Documentation *(#9 by @paulussimanjuntak)*
+* Add Documentation PR #9 by @paulussimanjuntak
## 0.2.0
diff --git a/README.md b/README.md
index 2f6a4b6..a7735e6 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ FastAPI extension that provides JWT Auth support (secure, easy to use and lightw
- Access tokens and refresh tokens
- Freshness Tokens
- Revoking Tokens
+- Support for WebSocket authorization
- Support for adding custom claims to JSON Web Tokens
- Storing tokens in cookies and CSRF protection
diff --git a/docs/advanced-usage/websocket.md b/docs/advanced-usage/websocket.md
new file mode 100644
index 0000000..8008fe6
--- /dev/null
+++ b/docs/advanced-usage/websocket.md
@@ -0,0 +1,52 @@
+The WebSocket protocol doesn’t handle authorization or authentication. Practically, this means that a WebSocket opened from a page behind auth doesn’t "automatically" receive any sort of auth. You need to take steps to also secure the WebSocket connection.
+
+Since you cannot customize WebSocket headers from JavaScript, you’re limited to the "implicit" auth (i.e. Basic or cookies) that’s sent from the browser. The more common approach to generates a token from your normal HTTP server and then have the client send the token (either as a query string in the WebSocket path or as the first WebSocket message). The WebSocket server then validates that the token is valid.
+
+**Note**: *Change all IP address to your localhost*
+
+Here is an example of how you authorize from query URL:
+```python hl_lines="42-52 65-66 71 73"
+{!../examples/websocket.py!}
+```
+You will see a simple page like this:
+
+
+
+
+
+You can copy the token from endpoint **/login** and then send them:
+
+
+
+
+
+And your WebSocket route will respond back if the token is valid or not:
+
+
+
+
+
+
+Here is an example of how you authorize from cookie:
+```python hl_lines="30-47 60-61 66 68"
+{!../examples/websocket_cookie.py!}
+```
+
+You will see a simple page like this:
+
+
+
+
+
+You can get the token from URL **/get-cookie**:
+
+
+
+
+
+And click button send then your WebSocket route will respond back if the
+cookie and csrf token is match or cookie is valid or not:
+
+
+
+
diff --git a/docs/api-doc.md b/docs/api-doc.md
index fd0a60d..2f2acd6 100644
--- a/docs/api-doc.md
+++ b/docs/api-doc.md
@@ -18,11 +18,20 @@ In here you will find the API for everything exposed in this extension.
### Protected Endpoint
-**jwt_required**()
+**jwt_required**(auth_from="request", token=None, websocket=None, csrf_token=None)
: *If you call this function, it will ensure that the requester has a valid access token before
executing the code below your router. This does not check the freshness of the access token.*
-**jwt_optional**()
+ * Parameters:
+ * **auth_from**: For identity get token from HTTP or WebSocket
+ * **token**: The encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ * **websocket**: An instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ * **csrf_token**: The CSRF double submit token. Since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ * Returns: None
+
+**jwt_optional**(auth_from="request", token=None, websocket=None, csrf_token=None)
: *If an access token present in the request, this will call the endpoint with `get_jwt_identity()`
having the identity of the access token. If no access token is present in the request, this endpoint
will still be called, but `get_jwt_identity()` will return None instead.*
@@ -30,14 +39,41 @@ In here you will find the API for everything exposed in this extension.
*If there is an invalid access token in the request (expired, tampered with, etc),
this will still call the appropriate error handler.*
-**jwt_refresh_token_required**()
+ * Parameters:
+ * **auth_from**: For identity get token from HTTP or WebSocket
+ * **token**: The encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ * **websocket**: An instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ * **csrf_token**: The CSRF double submit token. Since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ * Returns: None
+
+**jwt_refresh_token_required**(auth_from="request", token=None, websocket=None, csrf_token=None)
: *If you call this function, it will ensure that the requester has a valid refresh token before
executing the code below your router.*
-**fresh_jwt_required**()
+ * Parameters:
+ * **auth_from**: For identity get token from HTTP or WebSocket
+ * **token**: The encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ * **websocket**: An instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ * **csrf_token**: The CSRF double submit token. Since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ * Returns: None
+
+**fresh_jwt_required**(auth_from="request", token=None, websocket=None, csrf_token=None)
: *If you call this function, it will ensure that the requester has a valid and fresh access token before
executing the code below your router.*
+ * Parameters:
+ * **auth_from**: For identity get token from HTTP or WebSocket
+ * **token**: The encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ * **websocket**: An instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ * **csrf_token**: The CSRF double submit token. Since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ * Returns: None
+
### Utilities
**create_access_token**(subject, fresh=False, algorithm=None, headers=None, expires_time=None, audience=None, user_claims={})
@@ -106,10 +142,14 @@ In here you will find the API for everything exposed in this extension.
* **response**: The FastAPI response object to delete the refresh cookies in
* Returns: None
-**get_raw_jwt**()
+**get_raw_jwt**(encoded_token=None)
: *This will return the python dictionary which has all of the claims of the JWT that is accessing the endpoint.
If no JWT is currently present, return `None` instead.*
+ * Parameters:
+ * **encoded_token**: The encoded JWT from parameter
+ * Returns: Claims of JWT
+
**get_jti**(encoded_token)
: *Returns the JTI (unique identifier) of an encoded JWT*
diff --git a/examples/asymmetric.py b/examples/asymmetric.py
index e90ce4c..30fb930 100644
--- a/examples/asymmetric.py
+++ b/examples/asymmetric.py
@@ -5,7 +5,7 @@
from pydantic import BaseModel
# In the real case, you can put the
-# public key and private key in *.txt then you can read that file
+# public key and private key in *.pem, *.key then you can read that file
private_key = """
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgGBoQhqHdMU65aSBQVC/u9a6HMfKA927aZOk7HA/kXuA5UU4Sl+U
diff --git a/examples/websocket.py b/examples/websocket.py
new file mode 100644
index 0000000..85ed25f
--- /dev/null
+++ b/examples/websocket.py
@@ -0,0 +1,84 @@
+from fastapi import FastAPI, WebSocket, Depends, Request, HTTPException, Query
+from fastapi.responses import HTMLResponse, JSONResponse
+from fastapi_jwt_auth import AuthJWT
+from fastapi_jwt_auth.exceptions import AuthJWTException
+from pydantic import BaseModel
+
+app = FastAPI()
+
+class User(BaseModel):
+ username: str
+ password: str
+
+class Settings(BaseModel):
+ authjwt_secret_key: str = "secret"
+
+@AuthJWT.load_config
+def get_config():
+ return Settings()
+
+@app.exception_handler(AuthJWTException)
+def authjwt_exception_handler(request: Request, exc: AuthJWTException):
+ return JSONResponse(
+ status_code=exc.status_code,
+ content={"detail": exc.message}
+ )
+
+
+html = """
+
+
+
+ Authorize
+
+
+ WebSocket Authorize
+ Token:
+
+ Send
+
+
+
+
+"""
+
+@app.get("/")
+async def get():
+ return HTMLResponse(html)
+
+@app.websocket('/ws')
+async def websocket(websocket: WebSocket, token: str = Query(...), Authorize: AuthJWT = Depends()):
+ await websocket.accept()
+ try:
+ Authorize.jwt_required("websocket",token=token)
+ # Authorize.jwt_optional("websocket",token=token)
+ # Authorize.jwt_refresh_token_required("websocket",token=token)
+ # Authorize.fresh_jwt_required("websocket",token=token)
+ await websocket.send_text("Successfully Login!")
+ decoded_token = Authorize.get_raw_jwt(token)
+ await websocket.send_text(f"Here your decoded token: {decoded_token}")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+@app.post('/login')
+def login(user: User, Authorize: AuthJWT = Depends()):
+ if user.username != "test" or user.password != "test":
+ raise HTTPException(status_code=401,detail="Bad username or password")
+
+ access_token = Authorize.create_access_token(subject=user.username,fresh=True)
+ refresh_token = Authorize.create_refresh_token(subject=user.username)
+ return {"access_token": access_token, "refresh_token": refresh_token}
diff --git a/examples/websocket_cookie.py b/examples/websocket_cookie.py
new file mode 100644
index 0000000..386dba8
--- /dev/null
+++ b/examples/websocket_cookie.py
@@ -0,0 +1,79 @@
+from fastapi import FastAPI, WebSocket, Depends, Query
+from fastapi.responses import HTMLResponse
+from fastapi_jwt_auth import AuthJWT
+from fastapi_jwt_auth.exceptions import AuthJWTException
+from pydantic import BaseModel
+
+app = FastAPI()
+
+class Settings(BaseModel):
+ authjwt_secret_key: str = "secret"
+ authjwt_token_location: set = {"cookies"}
+
+@AuthJWT.load_config
+def get_config():
+ return Settings()
+
+
+html = """
+
+
+
+ Authorize
+
+
+ WebSocket Authorize
+ Send
+
+
+
+
+"""
+
+@app.get("/")
+async def get():
+ return HTMLResponse(html)
+
+@app.websocket('/ws')
+async def websocket(websocket: WebSocket, csrf_token: str = Query(...), Authorize: AuthJWT = Depends()):
+ await websocket.accept()
+ try:
+ Authorize.jwt_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ # Authorize.jwt_optional("websocket",websocket=websocket,csrf_token=csrf_token)
+ # Authorize.jwt_refresh_token_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ # Authorize.fresh_jwt_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ await websocket.send_text("Successfully Login!")
+ decoded_token = Authorize.get_raw_jwt()
+ await websocket.send_text(f"Here your decoded token: {decoded_token}")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+@app.get('/get-cookie')
+def get_cookie(Authorize: AuthJWT = Depends()):
+ access_token = Authorize.create_access_token(subject='test',fresh=True)
+ refresh_token = Authorize.create_refresh_token(subject='test')
+
+ Authorize.set_access_cookies(access_token)
+ Authorize.set_refresh_cookies(refresh_token)
+ return {"msg":"Successfully login"}
diff --git a/fastapi_jwt_auth/auth_config.py b/fastapi_jwt_auth/auth_config.py
index 48877eb..b259f2e 100644
--- a/fastapi_jwt_auth/auth_config.py
+++ b/fastapi_jwt_auth/auth_config.py
@@ -6,8 +6,6 @@
class AuthConfig:
_token = None
_token_location = {'headers'}
- _response = None
- _request = None
_secret_key = None
_public_key = None
diff --git a/fastapi_jwt_auth/auth_jwt.py b/fastapi_jwt_auth/auth_jwt.py
index e21a28d..4110bdb 100644
--- a/fastapi_jwt_auth/auth_jwt.py
+++ b/fastapi_jwt_auth/auth_jwt.py
@@ -2,16 +2,14 @@
from jwt.algorithms import requires_cryptography, has_crypto
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Union, Sequence
-from types import GeneratorType
-from fastapi import Request, Response
+from fastapi import Request, Response, WebSocket
from fastapi_jwt_auth.auth_config import AuthConfig
from fastapi_jwt_auth.exceptions import (
InvalidHeaderError,
CSRFError,
JWTDecodeError,
RevokedTokenError,
- MissingHeaderError,
- MissingCookieError,
+ MissingTokenError,
AccessTokenRequired,
RefreshTokenRequired,
FreshTokenRequired
@@ -148,9 +146,9 @@ def _create_token(
# Validation type data
if not isinstance(subject, (str,int)):
raise TypeError("subject must be a string or integer")
- if not isinstance(fresh, (bool)):
+ if not isinstance(fresh, bool):
raise TypeError("fresh must be a boolean")
- if audience and not isinstance(audience, (str, list, tuple, set, frozenset, GeneratorType)):
+ if audience and not isinstance(audience, (str, list, tuple, set, frozenset)):
raise TypeError("audience must be a string or sequence")
if algorithm and not isinstance(algorithm, str):
raise TypeError("algorithm must be a string")
@@ -224,7 +222,7 @@ def _get_expired_time(
expires_time: Optional[Union[timedelta,int,bool]] = None
) -> Union[None,int]:
"""
- Dynamic token expired if expires_time is False exp claim not created
+ Dynamic token expired, if expires_time is False exp claim not created
:param type_token: indicate token is access_token or refresh_token
:param expires_time: duration expired jwt
@@ -484,96 +482,106 @@ def unset_refresh_cookies(self,response: Optional[Response] = None) -> None:
domain=self._cookie_domain
)
- def _verify_and_get_jwt_optional_in_cookies(self,issuer: Optional[str] = None) -> "AuthJWT":
+ def _verify_and_get_jwt_optional_in_cookies(
+ self,
+ request: Union[Request,WebSocket],
+ csrf_token: Optional[str] = None,
+ ) -> "AuthJWT":
"""
Optionally check if cookies have a valid access token. if an access token present in
- cookies property _token will set. raises exception error when an access token is invalid
- and doesn't match with CSRF token double submit
+ cookies, self._token will set. raises exception error when an access token is invalid
+ or doesn't match with CSRF token double submit
- :param issuer: expected issuer in the JWT
+ :param request: for identity get cookies from HTTP or WebSocket
+ :param csrf_token: the CSRF double submit token
"""
+ if not isinstance(request,(Request,WebSocket)):
+ raise TypeError("request must be an instance of 'Request' or 'WebSocket'")
+
cookie_key = self._access_cookie_key
- cookie = self._request.cookies.get(cookie_key)
- csrf_cookie = self._request.headers.get(self._access_csrf_header_name)
+ cookie = request.cookies.get(cookie_key)
+ if not isinstance(request, WebSocket):
+ csrf_token = request.headers.get(self._access_csrf_header_name)
- if (
- cookie and
- self._cookie_csrf_protect and
- self._request.method in self._csrf_methods and
- not csrf_cookie
- ):
- raise CSRFError(status_code=401,message="Missing CSRF Token")
+ if cookie and self._cookie_csrf_protect and not csrf_token:
+ if isinstance(request, WebSocket) or request.method in self._csrf_methods:
+ raise CSRFError(status_code=401,message="Missing CSRF Token")
# set token from cookie and verify jwt
self._token = cookie
- self._verify_jwt_optional_in_request(self._token,issuer)
+ self._verify_jwt_optional_in_request(self._token)
decoded_token = self.get_raw_jwt()
- if (
- self._cookie_csrf_protect and
- self._request.method in self._csrf_methods and
- csrf_cookie and
- decoded_token
- ):
- if 'csrf' not in decoded_token:
- raise JWTDecodeError(status_code=422,message="Missing claim: csrf")
- if not hmac.compare_digest(csrf_cookie,decoded_token['csrf']):
- raise CSRFError(status_code=401,message="CSRF double submit tokens do not match")
+ if decoded_token and self._cookie_csrf_protect and csrf_token:
+ if isinstance(request, WebSocket) or request.method in self._csrf_methods:
+ if 'csrf' not in decoded_token:
+ raise JWTDecodeError(status_code=422,message="Missing claim: csrf")
+ if not hmac.compare_digest(csrf_token,decoded_token['csrf']):
+ raise CSRFError(status_code=401,message="CSRF double submit tokens do not match")
def _verify_and_get_jwt_in_cookies(
self,
type_token: str,
- issuer: Optional[str] = None,
- fresh: Optional[bool] = False
+ request: Union[Request,WebSocket],
+ csrf_token: Optional[str] = None,
+ fresh: Optional[bool] = False,
) -> "AuthJWT":
"""
Check if cookies have a valid access or refresh token. if an token present in
- cookies property _token will set. raises exception error when an access or refresh token
- is invalid and doesn't match with CSRF token double submit
+ cookies, self._token will set. raises exception error when an access or refresh token
+ is invalid or doesn't match with CSRF token double submit
:param type_token: indicate token is access or refresh token
- :param issuer: expected issuer in the JWT
+ :param request: for identity get cookies from HTTP or WebSocket
+ :param csrf_token: the CSRF double submit token
:param fresh: check freshness token if True
"""
+ if type_token not in ['access','refresh']:
+ raise ValueError("type_token must be between 'access' or 'refresh'")
+ if not isinstance(request,(Request,WebSocket)):
+ raise TypeError("request must be an instance of 'Request' or 'WebSocket'")
+
if type_token == 'access':
cookie_key = self._access_cookie_key
- cookie = self._request.cookies.get(cookie_key)
- csrf_cookie = self._request.headers.get(self._access_csrf_header_name)
+ cookie = request.cookies.get(cookie_key)
+ if not isinstance(request, WebSocket):
+ csrf_token = request.headers.get(self._access_csrf_header_name)
if type_token == 'refresh':
cookie_key = self._refresh_cookie_key
- cookie = self._request.cookies.get(cookie_key)
- csrf_cookie = self._request.headers.get(self._refresh_csrf_header_name)
+ cookie = request.cookies.get(cookie_key)
+ if not isinstance(request, WebSocket):
+ csrf_token = request.headers.get(self._refresh_csrf_header_name)
if not cookie:
- raise MissingCookieError(status_code=401,message="Missing cookie {}".format(cookie_key))
+ raise MissingTokenError(status_code=401,message="Missing cookie {}".format(cookie_key))
- if self._cookie_csrf_protect and self._request.method in self._csrf_methods and not csrf_cookie:
- raise CSRFError(status_code=401,message="Missing CSRF Token")
+ if self._cookie_csrf_protect and not csrf_token:
+ if isinstance(request, WebSocket) or request.method in self._csrf_methods:
+ raise CSRFError(status_code=401,message="Missing CSRF Token")
# set token from cookie and verify jwt
self._token = cookie
- self._verify_jwt_in_request(self._token,type_token,'cookies',issuer,fresh)
+ self._verify_jwt_in_request(self._token,type_token,'cookies',fresh)
decoded_token = self.get_raw_jwt()
- if self._cookie_csrf_protect and self._request.method in self._csrf_methods and csrf_cookie:
- if 'csrf' not in decoded_token:
- raise JWTDecodeError(status_code=422,message="Missing claim: csrf")
- if not hmac.compare_digest(csrf_cookie,decoded_token['csrf']):
- raise CSRFError(status_code=401,message="CSRF double submit tokens do not match")
+ if self._cookie_csrf_protect and csrf_token:
+ if isinstance(request, WebSocket) or request.method in self._csrf_methods:
+ if 'csrf' not in decoded_token:
+ raise JWTDecodeError(status_code=422,message="Missing claim: csrf")
+ if not hmac.compare_digest(csrf_token,decoded_token['csrf']):
+ raise CSRFError(status_code=401,message="CSRF double submit tokens do not match")
- def _verify_jwt_optional_in_request(self,token: str, issuer: Optional[str] = None) -> None:
+ def _verify_jwt_optional_in_request(self,token: str) -> None:
"""
Optionally check if this request has a valid access token
:param token: The encoded JWT
- :param issuer: expected issuer in the JWT
"""
- if token:
- self._verifying_token(token,issuer)
+ if token: self._verifying_token(token)
- if token and self._get_type_token() != 'access':
+ if token and self.get_raw_jwt(token)['type'] != 'access':
raise AccessTokenRequired(status_code=422,message="Only access tokens are allowed")
def _verify_jwt_in_request(
@@ -581,7 +589,6 @@ def _verify_jwt_in_request(
token: str,
type_token: str,
token_from: str,
- issuer: Optional[str] = None,
fresh: Optional[bool] = False
) -> None:
"""
@@ -589,24 +596,32 @@ def _verify_jwt_in_request(
:param token: The encoded JWT
:param type_token: indicate token is access or refresh token
- :param token_from: indicate token from headers or cookies
- :param issuer: expected issuer in the JWT
+ :param token_from: indicate token from headers cookies, websocket
:param fresh: check freshness token if True
"""
- if token:
- self._verifying_token(token,issuer)
+ if type_token not in ['access','refresh']:
+ raise ValueError("type_token must be between 'access' or 'refresh'")
+ if token_from not in ['headers','cookies','websocket']:
+ raise ValueError("token_from must be between 'headers', 'cookies', 'websocket'")
- if not token and token_from == 'headers':
- raise MissingHeaderError(status_code=401,message="Missing {} Header".format(self._header_name))
+ if not token:
+ if token_from == 'headers':
+ raise MissingTokenError(status_code=401,message="Missing {} Header".format(self._header_name))
+ if token_from == 'websocket':
+ raise MissingTokenError(status_code=1008,message="Missing {} token from Query or Path".format(type_token))
- if self._get_type_token() != type_token:
+ # verify jwt
+ issuer = self._decode_issuer if type_token == 'access' else None
+ self._verifying_token(token,issuer)
+
+ if self.get_raw_jwt(token)['type'] != type_token:
msg = "Only {} tokens are allowed".format(type_token)
if type_token == 'access':
raise AccessTokenRequired(status_code=422,message=msg)
if type_token == 'refresh':
raise RefreshTokenRequired(status_code=422,message=msg)
- if fresh and not self._get_fresh_token():
+ if fresh and not self.get_raw_jwt(token)['fresh']:
raise FreshTokenRequired(status_code=401,message="Fresh token required")
def _verifying_token(self,encoded_token: str, issuer: Optional[str] = None) -> None:
@@ -653,89 +668,159 @@ def _verified_token(self,encoded_token: str, issuer: Optional[str] = None) -> Di
except Exception as err:
raise JWTDecodeError(status_code=422,message=str(err))
- def _get_type_token(self) -> str:
- return self.get_raw_jwt()['type']
-
- def _get_fresh_token(self) -> bool:
- return self.get_raw_jwt()['fresh']
-
- def jwt_required(self) -> None:
+ def jwt_required(
+ self,
+ auth_from: str = "request",
+ token: Optional[str] = None,
+ websocket: Optional[WebSocket] = None,
+ csrf_token: Optional[str] = None,
+ ) -> None:
"""
Only access token can access this function
- """
- if len(self._token_location) == 2:
- if self._token and self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'access','headers',self._decode_issuer)
- if not self._token and self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('access',self._decode_issuer)
- else:
- if self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'access','headers',self._decode_issuer)
- if self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('access',self._decode_issuer)
- def jwt_optional(self) -> None:
+ :param auth_from: for identity get token from HTTP or WebSocket
+ :param token: the encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ :param websocket: an instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ """
+ if auth_from == "websocket":
+ if websocket: self._verify_and_get_jwt_in_cookies('access',websocket,csrf_token)
+ else: self._verify_jwt_in_request(token,'access','websocket')
+
+ if auth_from == "request":
+ if len(self._token_location) == 2:
+ if self._token and self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'access','headers')
+ if not self._token and self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('access',self._request)
+ else:
+ if self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'access','headers')
+ if self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('access',self._request)
+
+ def jwt_optional(
+ self,
+ auth_from: str = "request",
+ token: Optional[str] = None,
+ websocket: Optional[WebSocket] = None,
+ csrf_token: Optional[str] = None,
+ ) -> None:
"""
If an access token in present in the request you can get data from get_raw_jwt() or get_jwt_subject(),
If no access token is present in the request, this endpoint will still be called, but
get_raw_jwt() or get_jwt_subject() will return None
- """
- if len(self._token_location) == 2:
- if self._token and self.jwt_in_headers:
- self._verify_jwt_optional_in_request(self._token,self._decode_issuer)
- if not self._token and self.jwt_in_cookies:
- self._verify_and_get_jwt_optional_in_cookies(self._decode_issuer)
- else:
- if self.jwt_in_headers:
- self._verify_jwt_optional_in_request(self._token,self._decode_issuer)
- if self.jwt_in_cookies:
- self._verify_and_get_jwt_optional_in_cookies(self._decode_issuer)
- def jwt_refresh_token_required(self) -> None:
+ :param auth_from: for identity get token from HTTP or WebSocket
+ :param token: the encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ :param websocket: an instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ """
+ if auth_from == "websocket":
+ if websocket: self._verify_and_get_jwt_optional_in_cookies(websocket,csrf_token)
+ else: self._verify_jwt_optional_in_request(token)
+
+ if auth_from == "request":
+ if len(self._token_location) == 2:
+ if self._token and self.jwt_in_headers:
+ self._verify_jwt_optional_in_request(self._token)
+ if not self._token and self.jwt_in_cookies:
+ self._verify_and_get_jwt_optional_in_cookies(self._request)
+ else:
+ if self.jwt_in_headers:
+ self._verify_jwt_optional_in_request(self._token)
+ if self.jwt_in_cookies:
+ self._verify_and_get_jwt_optional_in_cookies(self._request)
+
+ def jwt_refresh_token_required(
+ self,
+ auth_from: str = "request",
+ token: Optional[str] = None,
+ websocket: Optional[WebSocket] = None,
+ csrf_token: Optional[str] = None,
+ ) -> None:
"""
This function will ensure that the requester has a valid refresh token
- """
- if len(self._token_location) == 2:
- if self._token and self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'refresh','headers')
- if not self._token and self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('refresh')
- else:
- if self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'refresh','headers')
- if self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('refresh')
- def fresh_jwt_required(self) -> None:
+ :param auth_from: for identity get token from HTTP or WebSocket
+ :param token: the encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ :param websocket: an instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ """
+ if auth_from == "websocket":
+ if websocket: self._verify_and_get_jwt_in_cookies('refresh',websocket,csrf_token)
+ else: self._verify_jwt_in_request(token,'refresh','websocket')
+
+ if auth_from == "request":
+ if len(self._token_location) == 2:
+ if self._token and self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'refresh','headers')
+ if not self._token and self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('refresh',self._request)
+ else:
+ if self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'refresh','headers')
+ if self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('refresh',self._request)
+
+ def fresh_jwt_required(
+ self,
+ auth_from: str = "request",
+ token: Optional[str] = None,
+ websocket: Optional[WebSocket] = None,
+ csrf_token: Optional[str] = None,
+ ) -> None:
"""
This function will ensure that the requester has a valid access token and fresh token
- """
- if len(self._token_location) == 2:
- if self._token and self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'access','headers',self._decode_issuer,True)
- if not self._token and self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('access',self._decode_issuer,True)
- else:
- if self.jwt_in_headers:
- self._verify_jwt_in_request(self._token,'access','headers',self._decode_issuer,True)
- if self.jwt_in_cookies:
- self._verify_and_get_jwt_in_cookies('access',self._decode_issuer,True)
- def get_raw_jwt(self) -> Optional[Dict[str,Union[str,int,bool]]]:
+ :param auth_from: for identity get token from HTTP or WebSocket
+ :param token: the encoded JWT, it's required if the protected endpoint use WebSocket to
+ authorization and get token from Query Url or Path
+ :param websocket: an instance of WebSocket, it's required if protected endpoint use a cookie to authorization
+ :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers
+ its must be passing csrf_token manually and can achieve by Query Url or Path
+ """
+ if auth_from == "websocket":
+ if websocket: self._verify_and_get_jwt_in_cookies('access',websocket,csrf_token,True)
+ else: self._verify_jwt_in_request(token,'access','websocket',True)
+
+ if auth_from == "request":
+ if len(self._token_location) == 2:
+ if self._token and self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'access','headers',True)
+ if not self._token and self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('access',self._request,fresh=True)
+ else:
+ if self.jwt_in_headers:
+ self._verify_jwt_in_request(self._token,'access','headers',True)
+ if self.jwt_in_cookies:
+ self._verify_and_get_jwt_in_cookies('access',self._request,fresh=True)
+
+ def get_raw_jwt(self,encoded_token: Optional[str] = None) -> Optional[Dict[str,Union[str,int,bool]]]:
"""
this will return the python dictionary which has all of the claims of the JWT that is accessing the endpoint.
If no JWT is currently present, return None instead
+ :param encoded_token: The encoded JWT from parameter
:return: claims of JWT
"""
- if self._token:
- return self._verified_token(self._token)
+ token = encoded_token or self._token
+
+ if token:
+ return self._verified_token(token)
return None
def get_jti(self,encoded_token: str) -> str:
"""
Returns the JTI (unique identifier) of an encoded JWT
+ :param encoded_token: The encoded JWT from parameter
:return: string of JTI
"""
return self._verified_token(encoded_token)['jti']
diff --git a/fastapi_jwt_auth/exceptions.py b/fastapi_jwt_auth/exceptions.py
index b85aff9..590423c 100644
--- a/fastapi_jwt_auth/exceptions.py
+++ b/fastapi_jwt_auth/exceptions.py
@@ -6,39 +6,31 @@ class AuthJWTException(Exception):
class InvalidHeaderError(AuthJWTException):
"""
- An error getting header information from a request
+ An error getting jwt in header or jwt header information from a request
"""
def __init__(self,status_code: int, message: str):
self.status_code = status_code
self.message = message
-class MissingHeaderError(AuthJWTException):
- """
- Error raised when not found in the header
- """
- def __init__(self,status_code: int, message: str):
- self.status_code = status_code
- self.message = message
-
-class MissingCookieError(AuthJWTException):
+class JWTDecodeError(AuthJWTException):
"""
- Error raised when token not found in cookie
+ An error decoding a JWT
"""
def __init__(self,status_code: int, message: str):
self.status_code = status_code
self.message = message
-class JWTDecodeError(AuthJWTException):
+class CSRFError(AuthJWTException):
"""
- An error decoding a JWT
+ An error with CSRF protection
"""
def __init__(self,status_code: int, message: str):
self.status_code = status_code
self.message = message
-class CSRFError(AuthJWTException):
+class MissingTokenError(AuthJWTException):
"""
- An error with CSRF protection
+ Error raised when token not found
"""
def __init__(self,status_code: int, message: str):
self.status_code = status_code
diff --git a/mkdocs.yml b/mkdocs.yml
index ef75b33..a89a921 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -8,10 +8,6 @@ theme:
repo_name: IndominusByte/fastapi-jwt-auth
repo_url: https://github.com/IndominusByte/fastapi-jwt-auth
-google_analytics:
- - G-P08KBZV1K6
- - auto
-
markdown_extensions:
- markdown_include.include:
base_path: docs
@@ -47,6 +43,7 @@ nav:
- Asymmetric Algorithm: advanced-usage/asymmetric.md
- Dynamic Token Expires: advanced-usage/dynamic-expires.md
- Dynamic Token Algorithm: advanced-usage/dynamic-algorithm.md
+ - WebSocket Protecting: advanced-usage/websocket.md
- Bigger Applications: advanced-usage/bigger-app.md
- Generate Documentation: advanced-usage/generate-docs.md
- Configuration Options:
diff --git a/tests/test_config.py b/tests/test_config.py
index 57a572a..05d2a85 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -20,8 +20,6 @@ def protected(Authorize: AuthJWT = Depends()):
def test_default_config():
assert AuthJWT._token is None
assert AuthJWT._token_location == {'headers'}
- assert AuthJWT._response is None
- assert AuthJWT._request is None
assert AuthJWT._secret_key is None
assert AuthJWT._public_key is None
assert AuthJWT._private_key is None
diff --git a/tests/test_decode_token.py b/tests/test_decode_token.py
index 9ca4395..5344d48 100644
--- a/tests/test_decode_token.py
+++ b/tests/test_decode_token.py
@@ -116,6 +116,9 @@ def test_get_raw_token(client,default_access_token,encoded_token):
assert response.status_code == 200
assert response.json() == default_access_token
+def test_get_raw_jwt(default_access_token,encoded_token,Authorize):
+ assert Authorize.get_raw_jwt(encoded_token) == default_access_token
+
def test_get_jwt_jti(client,default_access_token,encoded_token,Authorize):
assert Authorize.get_jti(encoded_token=encoded_token) == default_access_token['jti']
diff --git a/tests/test_websocket.py b/tests/test_websocket.py
new file mode 100644
index 0000000..be5f28c
--- /dev/null
+++ b/tests/test_websocket.py
@@ -0,0 +1,355 @@
+import pytest
+from fastapi_jwt_auth import AuthJWT
+from fastapi_jwt_auth.exceptions import AuthJWTException
+from fastapi import FastAPI, Depends, WebSocket, Query
+from fastapi.testclient import TestClient
+
+@pytest.fixture(scope='function')
+def client():
+ app = FastAPI()
+
+ @app.get('/all-token')
+ def all_token(Authorize: AuthJWT = Depends()):
+ access_token = Authorize.create_access_token(subject=1,fresh=True)
+ refresh_token = Authorize.create_refresh_token(subject=1)
+ Authorize.set_access_cookies(access_token)
+ Authorize.set_refresh_cookies(refresh_token)
+ return {"msg":"all token"}
+
+ @app.get('/unset-all-token')
+ def unset_all_token(Authorize: AuthJWT = Depends()):
+ Authorize.unset_jwt_cookies()
+ return {"msg":"unset all token"}
+
+ @app.websocket('/jwt-required')
+ async def websocket_jwt_required(
+ websocket: WebSocket,
+ token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_required("websocket",token=token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/jwt-required-cookies')
+ async def websocket_jwt_required_cookies(
+ websocket: WebSocket,
+ csrf_token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/jwt-optional')
+ async def websocket_jwt_optional(
+ websocket: WebSocket,
+ token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_optional("websocket",token=token)
+ decoded_token = Authorize.get_raw_jwt(token)
+ if decoded_token:
+ await websocket.send_text("hello world")
+ await websocket.send_text("hello anonym")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/jwt-optional-cookies')
+ async def websocket_jwt_optional_cookies(
+ websocket: WebSocket,
+ csrf_token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_optional("websocket",websocket=websocket,csrf_token=csrf_token)
+ decoded_token = Authorize.get_raw_jwt()
+ if decoded_token:
+ await websocket.send_text("hello world")
+ await websocket.send_text("hello anonym")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/jwt-refresh-required')
+ async def websocket_jwt_refresh_required(
+ websocket: WebSocket,
+ token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_refresh_token_required("websocket",token=token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/jwt-refresh-required-cookies')
+ async def websocket_jwt_refresh_required_cookies(
+ websocket: WebSocket,
+ csrf_token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.jwt_refresh_token_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/fresh-jwt-required')
+ async def websocket_fresh_jwt_required(
+ websocket: WebSocket,
+ token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.fresh_jwt_required("websocket",token=token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ @app.websocket('/fresh-jwt-required-cookies')
+ async def websocket_fresh_jwt_required_cookies(
+ websocket: WebSocket,
+ csrf_token: str = Query(...),
+ Authorize: AuthJWT = Depends()
+ ):
+ await websocket.accept()
+ try:
+ Authorize.fresh_jwt_required("websocket",websocket=websocket,csrf_token=csrf_token)
+ await websocket.send_text("Successfully Login!")
+ except AuthJWTException as err:
+ await websocket.send_text(err.message)
+ await websocket.close()
+
+ client = TestClient(app)
+ return client
+
+@pytest.mark.parametrize("url",["/jwt-required","/jwt-refresh-required","/fresh-jwt-required"])
+def test_missing_token_websocket(client,url):
+ token_type = "access" if url != "/jwt-refresh-required" else "refresh"
+ with client.websocket_connect(url + "?token=") as websocket:
+ data = websocket.receive_text()
+ assert data == f"Missing {token_type} token from Query or Path"
+
+@pytest.mark.parametrize("url",["/jwt-required","/jwt-optional","/fresh-jwt-required"])
+def test_only_access_token_allowed_websocket(client,url,Authorize):
+ token = Authorize.create_refresh_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == 'Only access tokens are allowed'
+
+def test_jwt_required_websocket(client,Authorize):
+ url = '/jwt-required'
+ token = Authorize.create_access_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == 'Successfully Login!'
+
+def test_jwt_optional_websocket(client,Authorize):
+ url = '/jwt-optional'
+ # if token not define return anonym user
+ with client.websocket_connect(url + "?token=") as websocket:
+ data = websocket.receive_text()
+ assert data == "hello anonym"
+
+ token = Authorize.create_access_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == "hello world"
+
+def test_refresh_required_websocket(client,Authorize):
+ url = '/jwt-refresh-required'
+ # only refresh token allowed
+ token = Authorize.create_access_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == "Only refresh tokens are allowed"
+
+ token = Authorize.create_refresh_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == "Successfully Login!"
+
+def test_fresh_jwt_required_websocket(client,Authorize):
+ url = '/fresh-jwt-required'
+ # only fresh token allowed
+ token = Authorize.create_access_token(subject='test')
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == "Fresh token required"
+
+ token = Authorize.create_access_token(subject='test',fresh=True)
+ with client.websocket_connect(url + f"?token={token}") as websocket:
+ data = websocket.receive_text()
+ assert data == "Successfully Login!"
+
+# ========= COOKIES ========
+
+def test_invalid_instance_websocket(Authorize):
+ with pytest.raises(TypeError,match=r"request"):
+ Authorize.jwt_required("websocket",websocket="test")
+ with pytest.raises(TypeError,match=r"request"):
+ Authorize.jwt_optional("websocket",websocket="test")
+ with pytest.raises(TypeError,match=r"request"):
+ Authorize.jwt_refresh_token_required("websocket",websocket="test")
+ with pytest.raises(TypeError,match=r"request"):
+ Authorize.fresh_jwt_required("websocket",websocket="test")
+
+@pytest.mark.parametrize("url",["/jwt-required-cookies","/jwt-refresh-required-cookies","/fresh-jwt-required-cookies"])
+def test_missing_cookie(url,client):
+ cookie_key = "access_token_cookie" if url != "/jwt-refresh-required-cookies" else "refresh_token_cookie"
+ with client.websocket_connect(url + "?csrf_token=") as websocket:
+ data = websocket.receive_text()
+ assert data == f"Missing cookie {cookie_key}"
+
+@pytest.mark.parametrize("url",[
+ "/jwt-required-cookies",
+ "/jwt-refresh-required-cookies",
+ "/fresh-jwt-required-cookies",
+ "/jwt-optional-cookies"
+])
+def test_missing_csrf_token(url,client):
+ @AuthJWT.load_config
+ def get_cookie_location():
+ return [("authjwt_token_location",{'cookies'}),("authjwt_secret_key","secret")]
+
+ # required and optional
+ client.get('/all-token')
+
+ with client.websocket_connect(url + "?csrf_token=") as websocket:
+ data = websocket.receive_text()
+ assert data == "Missing CSRF Token"
+
+ client.get('/unset-all-token')
+
+ # disable csrf protection
+ @AuthJWT.load_config
+ def change_request_csrf_protect_to_false():
+ return [
+ ("authjwt_token_location",{'cookies'}),
+ ("authjwt_secret_key","secret"),
+ ("authjwt_cookie_csrf_protect",False)
+ ]
+
+ client.get('/all-token')
+
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
+ with client.websocket_connect(url + "?csrf_token=") as websocket:
+ data = websocket.receive_text()
+ assert data == msg
+
+@pytest.mark.parametrize("url",[
+ "/jwt-required-cookies",
+ "/jwt-refresh-required-cookies",
+ "/fresh-jwt-required-cookies",
+ "/jwt-optional-cookies"
+])
+def test_missing_claim_csrf_in_token(url,client):
+ # required and optional
+ @AuthJWT.load_config
+ def change_request_csrf_protect_to_false():
+ return [
+ ("authjwt_token_location",{'cookies'}),
+ ("authjwt_secret_key","secret"),
+ ("authjwt_cookie_csrf_protect",False)
+ ]
+
+ client.get('/all-token')
+
+ @AuthJWT.load_config
+ def change_request_csrf_protect_to_true():
+ return [("authjwt_token_location",{'cookies'}),("authjwt_secret_key","secret")]
+
+ with client.websocket_connect(url + "?csrf_token=test") as websocket:
+ data = websocket.receive_text()
+ assert data == "Missing claim: csrf"
+
+ # disable csrf protection
+ @AuthJWT.load_config
+ def change_request_csrf_protect_to_false_again():
+ return [
+ ("authjwt_token_location",{'cookies'}),
+ ("authjwt_secret_key","secret"),
+ ("authjwt_cookie_csrf_protect",False)
+ ]
+
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
+ with client.websocket_connect(url + "?csrf_token=test") as websocket:
+ data = websocket.receive_text()
+ assert data == msg
+
+@pytest.mark.parametrize("url",[
+ "/jwt-required-cookies",
+ "/jwt-refresh-required-cookies",
+ "/fresh-jwt-required-cookies",
+ "/jwt-optional-cookies"
+])
+def test_invalid_csrf_double_submit(url,client):
+ # required and optional
+ @AuthJWT.load_config
+ def get_cookie_location():
+ return [("authjwt_token_location",{'cookies'}),("authjwt_secret_key","secret")]
+
+ client.get('/all-token')
+
+ with client.websocket_connect(url + "?csrf_token=test") as websocket:
+ data = websocket.receive_text()
+ assert data == "CSRF double submit tokens do not match"
+
+ # disable csrf protection
+ @AuthJWT.load_config
+ def change_request_csrf_protect_to_false():
+ return [
+ ("authjwt_token_location",{'cookies'}),
+ ("authjwt_secret_key","secret"),
+ ("authjwt_cookie_csrf_protect",False)
+ ]
+
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
+ with client.websocket_connect(url + "?csrf_token=test") as websocket:
+ data = websocket.receive_text()
+ assert data == msg
+
+@pytest.mark.parametrize("url",[
+ "/jwt-required-cookies",
+ "/jwt-refresh-required-cookies",
+ "/fresh-jwt-required-cookies",
+ "/jwt-optional-cookies"
+])
+def test_valid_access_endpoint_with_csrf(url,client):
+ # required and optional
+ @AuthJWT.load_config
+ def get_cookie_location():
+ return [("authjwt_token_location",{'cookies'}),("authjwt_secret_key","secret")]
+
+ res = client.get('/all-token')
+ csrf_access = res.cookies.get("csrf_access_token")
+ csrf_refresh = res.cookies.get("csrf_refresh_token")
+
+ if url == "/jwt-refresh-required-cookies":
+ with client.websocket_connect(url + f"?csrf_token={csrf_refresh}") as websocket:
+ data = websocket.receive_text()
+ assert data == "Successfully Login!"
+ else:
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
+ with client.websocket_connect(url + f"?csrf_token={csrf_access}") as websocket:
+ data = websocket.receive_text()
+ assert data == msg