"""
owtf.api.handlers.auth
~~~~~~~~~~~~~~~~~~~~~~~~
"""
from sqlalchemy.sql.functions import user
from owtf.models.user_login_token import UserLoginToken
from owtf.api.handlers.base import APIRequestHandler
from owtf.lib.exceptions import APIError
from owtf.models.user import User
from datetime import datetime, timedelta
import bcrypt
import json
import jwt
import re
from owtf.settings import (
JWT_SECRET_KEY,
JWT_ALGORITHM,
JWT_EXP_DELTA_SECONDS,
is_password_valid_regex,
is_email_valid_regex,
EMAIL_FROM,
SMTP_HOST,
SMTP_LOGIN,
SMTP_PASS,
SMTP_PORT,
SERVER_ADDR,
SERVER_PORT,
FRONTEND_SERVER_PORT,
)
from owtf.db.session import Session
from uuid import uuid4
from owtf.models.email_confirmation import EmailConfirmation
from email.mime.text import MIMEText
import smtplib
from email.mime.multipart import MIMEMultipart
import logging
from bs4 import BeautifulSoup
from owtf.utils.logger import OWTFLogger
from owtf.api.handlers.jwtauth import jwtauth
import pyotp
[docs]
class LogInHandler(APIRequestHandler):
"""LogIn using the correct credentials (email, password). After successfull login a JWT Token is generated."""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""Post login data and return jwt token based on user credentials.
**Example request**:
.. sourcecode:: http
POST /api/v1/login/ HTTP/1.1
Content-Type: application/json; charset=UTF-8
{
"emailOrUsername": "test@test.com",
"password": "Test@34335",
}
**Example successful login response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": {
"jwt-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjozNSwiZXhwIjoxNjIzMjUyMjQwfQ.FjTpJySn3wprlaS26dC9LGBOMrtHJeJsTDJnyCKNmBk"
}
}
**Example failed login response**;
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "fail",
"message": "Invalid login credentials"
}
"""
email_or_username = self.get_argument("emailOrUsername", None)
password = self.get_argument("password", None)
if not email_or_username:
err = {"status": "fail", "message": "Missing email or username value"}
self.success(err)
if not password:
err = {"status": "fail", "message": "Missing password value"}
self.success(err)
user = User.find_by_email(self.session, email_or_username)
if user is None:
user = User.find_by_name(self.session, email_or_username)
if (
user
and user.password
and bcrypt.hashpw(password.encode("utf-8"), user.password.encode("utf-8")) == user.password.encode("utf-8")
and user.is_active
):
payload = {
"user_id": user.id,
"exp": datetime.utcnow() + timedelta(seconds=JWT_EXP_DELTA_SECONDS),
"username": user.name,
}
jwt_token = jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM)
data = {"jwt-token": jwt_token}
UserLoginToken.add_user_login_token(self.session, jwt_token, user.id)
self.success({"status": "success", "message": data})
elif user and not user.is_active:
err = {"status": "fail", "message": "Your account is not active"}
self.success(err)
else:
err = {"status": "fail", "message": "Invalid login credentials"}
self.success(err)
[docs]
class RegisterHandler(APIRequestHandler):
"""Registers a new user when he provides email, name, password and confirm password."""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""Post data for creating a new user as per the data given by user.
**Example request**:
.. sourcecode:: http
POST /api/v1/register/ HTTP/1.1
Content-Type: application/json; charset=UTF-8
{
"email": "test@test.com",
"password": "Test@34335",
"confirm_password": "Test@34335",
"name": "test"
}
**Example Successful registration response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "User created successfully"
}
**Example Failed registration response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "fail",
"message": "Email already exists"
}
"""
username = self.get_argument("username", None)
email = self.get_argument("email", None)
password = self.get_argument("password", None)
confirm_password = self.get_argument("confirm_password", None)
if not username:
err = {"status": "fail", "message": "Missing username value"}
self.success(err)
if not email:
err = {"status": "fail", "message": "Missing email value"}
self.success(err)
if not password:
err = {"status": "fail", "message": "Missing password value"}
self.success(err)
if not confirm_password:
err = {"status": "fail", "message": "Missing confirm password value"}
self.success(err)
email_already_taken = User.find_by_email(self.session, email)
name_already_taken = User.find_by_name(self.session, username)
match_password = re.search(is_password_valid_regex, password)
match_email = re.search(is_email_valid_regex, email)
if password != confirm_password:
err = {"status": "fail", "message": "Password doesn't match"}
self.success(err)
elif not match_email:
err = {"status": "fail", "message": "Choose a valid email"}
self.success(err)
elif not match_password:
err = {"status": "fail", "message": "Choose a strong password"}
self.success(err)
elif name_already_taken:
err = {"status": "fail", "message": "Username already exists"}
self.success(err)
elif email_already_taken:
err = {"status": "fail", "message": "Email already exists"}
self.success(err)
else:
hashed_pass = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
user = {}
user["email"] = email
user["password"] = hashed_pass
user["name"] = username # need to be chaned to username
user["otp_secret_key"] = pyotp.random_base32()
User.add_user(self.session, user)
data = "User created successfully"
self.success({"status": "success", "message": data})
[docs]
@jwtauth
class LogOutHandler(APIRequestHandler):
"""Logs out the current user and clears the cookie."""
[docs]
def get(self):
"""Get user log out of the system.
**Example request**:
.. sourcecode:: http
GET /api/v1/logout/ HTTP/1.1
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "Logged out"
}
"""
auth = self.request.headers.get("Authorization")
if auth:
parts = auth.split()
token = parts[1]
UserLoginToken.delete_user_login_token(self.session, token)
response = {"status": "success", "message": "Logged out"}
self.success(response)
else:
raise APIError(400, "Invalid Token")
[docs]
def send_email_using_smtp(email_to, html, subject, logging_info):
"""Used for sending the email to the specified email with the given html and subject"""
if SMTP_HOST is not None:
msg = MIMEMultipart("alternative")
part = MIMEText(html, "html")
msg["From"] = EMAIL_FROM
msg["To"] = email_to
msg["Subject"] = subject
msg.attach(part)
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.login(SMTP_LOGIN, SMTP_PASS)
server.sendmail(EMAIL_FROM, email_to, msg.as_string())
del msg
else:
logger = OWTFLogger()
logger.enable_logging()
logging.info("")
logging.info(logging_info)
logger.disable_console_logging()
html = BeautifulSoup(html, "html.parser").get_text()
print(html)
[docs]
class AccountActivationGenerateHandler(APIRequestHandler):
"""Creates an email confirmation mail and sends it to the user for account confirmation."""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""Post an email verification link to the specified email.
**Example request**:
.. sourcecode:: http
POST /api/v1/generate/confirm_email/ HTTP/1.1
{
"email": "test@test.com",
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "Email send successful"
}
"""
email_to = self.get_argument("email", None)
email_confirmation_dict = {}
email_confirmation_dict["key_value"] = str(uuid4())
email_confirmation_dict["expiration_time"] = datetime.now() + timedelta(hours=1)
user_obj = User.find_by_email(self.session, email_to)
email_confirmation_dict["user_id"] = user_obj.id
EmailConfirmation.remove_previous_all(self.session, user_obj.id)
EmailConfirmation.add_confirm_password(self.session, email_confirmation_dict)
html = (
"""\
<html>
<body>
Welcome """
+ user_obj.name
+ ", <br/><br/>"
"""
Click here """
+ "http://{}:{}".format(SERVER_ADDR, str(FRONTEND_SERVER_PORT))
+ "/email-verify/"
+ email_confirmation_dict["key_value"]
+ """ to activate your account (Link will expire in 1 hour).
</body>
</html>
"""
)
send_email_using_smtp(
email_to,
html,
"Account Activation",
"------> Showing the confirmation mail here, Since SMTP server is not set:",
)
response = {"status": "success", "message": "Email send successful"}
self.success(response)
[docs]
class AccountActivationValidateHandler(APIRequestHandler):
"""Validates an email confirmation mail which was sent to the user."""
SUPPORTED_METHODS = ["GET"]
[docs]
def get(self, key_value):
"""Get the email link to verify and activate user account.
**Example request**:
.. sourcecode:: http
GET /api/v1/verify/confirm_email/<link> HTTP/1.1
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "Email Verified"
}
"""
email_conf_obj = EmailConfirmation.find_by_key_value(self.session, key_value)
if email_conf_obj is not None and email_conf_obj.expiration_time >= datetime.now():
User.activate_user(self.session, email_conf_obj.user_id)
response = {"status": "success", "message": "Email Verified"}
self.success(response)
elif email_conf_obj is not None and email_conf_obj.expiration_time < datetime.now():
user_id = email_conf_obj.user_id
user_email = User.find_by_id(self.session, user_id).email
if user_email is not None:
response = {"status": "success", "message": "Link Expired", "email": user_email}
self.success(response)
else:
response = {"status": "success", "message": "Invalid Link"}
self.success(response)
[docs]
class OtpGenerateHandler(APIRequestHandler):
"""Creates an otp and sends it to the user for password change"""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""
**Example request**:
.. sourcecode:: http
POST /api/v1/generate/otp/ HTTP/1.1
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "Otp Send Successful"
}
"""
email_or_username = self.get_argument("emailOrUsername", None)
user_obj = User.find_by_email(self.session, email_or_username)
if user_obj is None:
user_obj = User.find_by_name(self.session, email_or_username)
if user_obj is not None:
secret_key = user_obj.otp_secret_key
totp = pyotp.TOTP(secret_key, interval=300) # 5 minutes interval
OTP = totp.now()
html = (
"""\
<html>
<body>
Welcome """
+ user_obj.name
+ ", <br/><br/>"
"""
Your OTP for changing password is: """
+ OTP
+ " (OTP will expire in 5 mins)"
+ """
</body>
</html>
"""
)
send_email_using_smtp(
user_obj.email,
html,
"OTP for Password Change",
"------> Showing the OTP here, Since SMTP server is not set:",
)
response = {"status": "success", "message": "Otp Send Successful"}
self.success(response)
else:
err = {"status": "fail", "message": "Username / Email doesn't exist"}
self.success(err)
[docs]
class OtpVerifyHandler(APIRequestHandler):
"""Validates an otp which was sent to the user"""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""
**Example request**:
.. sourcecode:: http
POST /api/v1/verify/otp/ HTTP/1.1
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "OTP Verified"
}
"""
email_or_username = self.get_argument("emailOrUsername", None)
otp = self.get_argument("otp", None)
user_obj = User.find_by_email(self.session, email_or_username)
if user_obj is None:
user_obj = User.find_by_name(self.session, email_or_username)
if user_obj is not None and otp is not None:
secret_key = user_obj.otp_secret_key
totp = pyotp.TOTP(secret_key, interval=300)
verify = totp.verify(otp)
if verify:
self.success({"status": "success", "message": "OTP Verified"})
else:
self.success({"status": "fail", "message": "Invalid OTP"})
else:
err = {"status": "fail", "message": "Username / Email doesn't exist"}
self.success(err)
[docs]
class PasswordChangeHandler(APIRequestHandler):
"""Handles setting a new password for the verified user"""
SUPPORTED_METHODS = ["POST"]
[docs]
def post(self):
"""
**Example request**:
.. sourcecode:: http
POST /api/v1/new-password/ HTTP/1.1
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Encoding: gzip
Vary: Accept-Encoding
Content-Type: application/json; charset=UTF-8
{
"status": "success",
"message": "Password Change Successful"
}
"""
password = self.get_argument("password", None)
email_or_username = self.get_argument("emailOrUsername", None)
otp = self.get_argument("otp", None)
user_obj = User.find_by_email(self.session, email_or_username)
if user_obj is None:
user_obj = User.find_by_name(self.session, email_or_username)
match_password = re.search(is_password_valid_regex, password)
if not match_password:
err = {"status": "fail", "message": "Choose a strong password"}
self.success(err)
elif email_or_username is not None and password is not None and user_obj is not None and otp is not None:
secret_key = user_obj.otp_secret_key
totp = pyotp.TOTP(secret_key, interval=300)
verify = totp.verify(otp)
if verify:
hashed_pass = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
User.change_password(self.session, user_obj.email, hashed_pass)
data = {"status": "success", "message": "Password Change Successful"}
self.success(data)
else:
self.success({"status": "fail", "message": "Invalid OTP"})
else:
err = {"status": "fail", "message": "Password Change Unsuccessful"}
self.success(err)