Source code for owtf.managers.session
"""
owtf.managers.session
~~~~~~~~~~~~~~~~~~~~~
Manager functions for sessions
"""
from owtf.db.session import get_scoped_session
from owtf.lib import exceptions
from owtf.models.session import Session
from owtf.models.target import Target
from owtf.utils.strings import str2bool
[docs]
def session_required(func):
"""
In order to use this decorator on a `method` there is one requirements
, target_id must be a kwarg of the function
All this decorator does is check if a valid value is passed for target_id
if not get the target_id from target manager and pass it
"""
def wrapped_function(*args, **kwargs):
# True if target_id doesnt exist
if (
kwargs.get("session_id", "None") == "None"
or kwargs.get("session_id", True) is None
):
kwargs["session_id"] = Session.get_active(get_scoped_session())
return func(*args, **kwargs)
return wrapped_function
def _ensure_default_session(session):
"""If there are no sessions, it will be deadly, so if number of sessions is zero then add a default session
:return: None
:rtype: None
"""
if session.query(Session).count() == 0:
add_session(session, "default session")
[docs]
def add_session(session, session_name):
"""Adds a new session to the DB
:param session_name: Name of the new session
:type session_name: `str`
:return: None
:rtype: None
"""
existing_obj = session.query(Session).filter_by(name=session_name).first()
if existing_obj is None:
session_obj = Session(name=session_name[:50])
session.add(session_obj)
session.commit()
Session.set_by_id(session, session_obj.id)
else:
raise exceptions.DBIntegrityException(
"Session already exists with session name: {!s}".format(session_name)
)
[docs]
@session_required
def add_target_to_session(session, target_id, session_id=None):
"""Adds the target to the session
:param target_id: ID of the target to add
:type target_id: `int`
:param session_id: ID of the session
:type session_id: `int`
:return: None
:rtype: None
"""
session_obj = session.query(Session).get(session_id)
target_obj = session.query(Target).get(target_id)
if session_obj is None:
raise exceptions.InvalidSessionReference(
"No session with id: {!s}".format(session_id)
)
if target_obj is None:
raise exceptions.InvalidTargetReference(
"No target with id: {!s}".format(target_id)
)
if session_obj not in target_obj.sessions:
session_obj.targets.append(target_obj)
session.commit()
[docs]
@session_required
def remove_target_from_session(session, target_id, session_id=None):
"""Remove target from a session
:param target_id: ID of the target
:type target_id: `int`
:param session_id: ID of the session
:type session_id: `int`
:return: None
:rtype: None
"""
session_obj = session.query(Session).get(session_id)
target_obj = session.query(Target).get(target_id)
if session_obj is None:
raise exceptions.InvalidSessionReference(
"No session with id: {!s}".format(session_id)
)
if target_obj is None:
raise exceptions.InvalidTargetReference(
"No target with id: {!s}".format(target_id)
)
session_obj.targets.remove(target_obj)
# Delete target whole together if present in this session alone
if len(target_obj.sessions) == 0:
from owtf.managers.target import delete_target
delete_target(session, id=target_obj.id)
session.commit()
[docs]
def delete_session(session, session_id):
"""Deletes a session from the DB
:param session_id: ID of the session to delete
:type session_id: `int`
:return: None
:rtype: None
"""
session_obj = session.query(Session).get(session_id)
if session_obj is None:
raise exceptions.InvalidSessionReference(
"No session with id: {!s}".format(session_id)
)
for target in session_obj.targets:
# Means attached to only this session obj
if len(target.sessions) == 1:
from owtf.managers.target import delete_target
delete_target(session, id=target.id)
session.delete(session_obj)
_ensure_default_session(session) # i.e if there are no sessions, add one
session.commit()
[docs]
def session_generate_query(session, filter_data=None):
"""Generate query based on filter data
:param filter_data: Filter data
:type filter_data: `dict`
:return:
:rtype:
"""
if filter_data is None:
filter_data = {}
query = session.query(Session)
# it doesn't make sense to search in a boolean column :P
if filter_data.get("active", None):
if isinstance(filter_data.get("active"), list):
filter_data["active"] = filter_data["active"][0]
query = query.filter_by(active=str2bool(filter_data["active"]))
return query.order_by(Session.id)
[docs]
def get_all_session_dicts(session, filter_data):
"""Get session dicts based on filter criteria
:param filter_data: Filter data
:type filter_data: `dict`
:return: List of session dicts
:rtype: `dict`
"""
session_objs = session_generate_query(session, filter_data).all()
results = []
for session_obj in session_objs:
if session_obj:
results.append(session_obj.to_dict())
return results