"""
JSON Web Token auth for Tornado
"""
from sqlalchemy.sql.functions import user
from owtf.models.user_login_token import UserLoginToken
import jwt
from owtf.settings import JWT_SECRET_KEY, JWT_OPTIONS, JWT_ALGORITHM
from owtf.db.session import Session
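def jwtauth(handler_class):
    """Decorator to handle Tornado JWT Authentication.

    Replaces ``handler_class._execute`` with a wrapper that first requires an
    ``Authorization: Bearer <token>`` header whose JWT decodes under the
    configured secret and pinned algorithm *and* is still recorded in
    ``UserLoginToken`` for the ``user_id`` claim (so a revoked token is
    rejected even if its signature is valid). Any failure is answered with a
    401 JSON body and the response is finished before the handler's own
    ``_execute`` runs.

    Args:
        handler_class: a Tornado ``RequestHandler`` subclass.

    Returns:
        The same class, with ``_execute`` swapped for the auth-checking wrapper.
    """

    def _reject(handler, message):
        """Finish the response with a 401 and a JSON error body."""
        # Tornado's own _execute (which sets ``_transforms``) has not run yet,
        # and finish() reads that attribute, so it must exist here.
        handler._transforms = []
        handler.set_status(401)
        handler.write({"success": False, "message": message})
        handler.finish()

    def _token_is_valid(token):
        """Return True if *token* decodes and is still stored for its user.

        Raises whatever ``jwt.decode`` or the DB lookup raises; the caller
        treats any exception as "not authenticated".
        """
        payload = jwt.decode(token, JWT_SECRET_KEY, options=JWT_OPTIONS, algorithms=[JWT_ALGORITHM])
        user_id = payload.get("user_id", None)
        if user_id is None:
            # No point querying the DB for a token without a user claim.
            return False
        session = Session()
        try:
            return UserLoginToken.find_by_userid_and_token(session, user_id, token) is not None
        finally:
            # The original never closed the session, leaking a connection per request.
            session.close()

    def wrap_execute(handler_execute):
        def require_auth(handler, kwargs):
            """Reject the request with 401 unless it carries a valid bearer token.

            Returns True when the request is authenticated, False when the
            response has already been finished with an error body. Every
            failure path returns immediately so nothing is written after
            ``finish()`` and a one-part header can no longer raise IndexError.
            """
            auth = handler.request.headers.get("Authorization")
            if not auth:
                _reject(handler, "Missing authorization")
                return False
            parts = auth.split()
            # Exactly "<scheme> <token>", scheme compared case-insensitively;
            # this also covers a whitespace-only header (empty ``parts``).
            if len(parts) != 2 or parts[0].lower() != "bearer":
                _reject(handler, "Invalid header authorization")
                return False
            token = parts[1]
            try:
                valid = _token_is_valid(token)
            except Exception:
                # Expired, forged or malformed token (jwt raises) and DB errors
                # all collapse to the same 401 so the client learns nothing
                # about which check failed.
                valid = False
            if not valid:
                _reject(handler, "Unauthorized")
                return False
            return True

        def _execute(self, transforms, *args, **kwargs):
            try:
                require_auth(self, kwargs)
            except Exception:
                # Preserved from the original: an unexpected error in the auth
                # check yields False rather than propagating.
                return False
            # Delegated unconditionally, as before: after a rejection the
            # response is already finished.
            # NOTE(review): relies on Tornado's RequestHandler._execute not
            # running the handler method once the response is finished —
            # confirm for the pinned Tornado version.
            return handler_execute(self, transforms, *args, **kwargs)

        return _execute

    handler_class._execute = wrap_execute(handler_class._execute)
    return handler_class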