# Source code for owtf.managers.mapping
"""
owtf.managers.mapping
~~~~~~~~~~~~~~~~~~~~~
Manages the mapping between different plugin groups and codes
"""
import json
from owtf.db import models
from owtf.lib.exceptions import InvalidMappingReference
from owtf.managers.config import load_config_file
from owtf.utils.pycompat import iteritems
# In-memory list of mapping types: returned by get_mapping_types() and used by
# get_mappings() to validate the requested mapping type.
# NOTE(review): nothing visible in this module populates this list -
# presumably it is filled elsewhere; confirm.
mapping_types = []
def get_mapping_types():
    """Return the in-memory list of mapping types.

    The very same module-level list object is returned (not a copy), so
    changes made by the caller are visible to every other user of it.

    :return: The module-level ``mapping_types`` list
    :rtype: `list`
    """
    return mapping_types
def derive_mapping_dict(obj):
    """Build a plain dict from a mapping object

    The object's ``__dict__`` is copied, so the object itself is left untouched.

    :param obj: The mapping object (any object exposing its fields via ``__dict__``)
    :type obj: `object`
    :return: Mappings dict, or None when ``obj`` is falsy
    :rtype: `dict`
    """
    if not obj:
        return None
    pdict = dict(obj.__dict__)
    # Drop the SQLAlchemy bookkeeping attribute, it is not part of the data
    pdict.pop("_sa_instance_state", None)
    # If mappings are present (non-empty), json decode them
    if pdict.get("mappings", None):
        pdict["mappings"] = json.loads(pdict["mappings"])
    return pdict
def derive_mapping_dicts(obj_list):
    """Build the mapping dicts for a list of mapping objects

    Each object is converted with ``derive_mapping_dict`` and the input order
    is preserved.

    :param obj_list: The mapping object list
    :type obj_list: `list`
    :return: Mapping dicts as a list
    :rtype: `list`
    """
    return [derive_mapping_dict(obj) for obj in obj_list]
def get_all_mappings(session):
    """Create a mapping between OWTF plugins code and their stored mappings.

    All ``models.Mapping`` rows are fetched and keyed by their ``owtf_code``.

    :param session: SQLAlchemy database session
    :type session: `object`
    :return: Mapping dictionary {code: mappings, code2: mappings, ...}
    :rtype: `dict`
    """
    mapping_objs = session.query(models.Mapping).all()
    return {
        mapping['owtf_code']: mapping['mappings']
        for mapping in derive_mapping_dicts(mapping_objs)
    }
def get_mappings(session, mapping_type):
    """Fetches mappings from DB based on mapping type

    Only the codes that have a non-empty entry for ``mapping_type`` are
    included in the result.

    :param session: SQLAlchemy database session
    :type session: `object`
    :param mapping_type: Mapping type like OWTF, OWASP (v3, v4, Top 10), NIST, CWE
    :type mapping_type: `str`
    :return: Dict of OWTF code to its mapping for the requested type
    :rtype: `dict`
    :raises InvalidMappingReference: If ``mapping_type`` is not in ``mapping_types``
    """
    if mapping_type not in mapping_types:
        raise InvalidMappingReference("InvalidMappingReference %s requested" % mapping_type)
    mapping_objs = session.query(models.Mapping).all()
    mappings = {}
    for mapping_dict in derive_mapping_dicts(mapping_objs):
        # An empty/None "mappings" value is left undecoded by
        # derive_mapping_dict, so fall back to an empty dict before .get()
        mapped = (mapping_dict["mappings"] or {}).get(mapping_type, None)
        if mapped:
            mappings[mapping_dict["owtf_code"]] = mapped
    return mappings
def get_mapping_category(session, plugin_code):
    """Get the category for a plugin code

    :param session: SQLAlchemy database session
    :type session: `object`
    :param plugin_code: The code for the specific plugin, used as the primary
        key of the mapping row (NOTE(review): presumably the OWTF code string
        stored by ``load_mappings`` - confirm against the model)
    :return: category for the plugin code, or None when no mapping row exists
    :rtype: `str`
    """
    # Query.get() can only be used on a query for a single mapped class, not
    # on a column query, so load the row by primary key and read the
    # attribute from it
    mapping = session.query(models.Mapping).get(plugin_code)
    if mapping is None:
        return None
    return mapping.category
def load_mappings(session, default, fallback):
    """Loads the mappings from the config file into the database

    Every top-level entry of the loaded config becomes one ``models.Mapping``
    row, merged into the session; everything is committed once at the end.

    :param session: SQLAlchemy database session
    :type session: `object`
    :param default: Path handed to ``load_config_file`` as its first argument
        (NOTE(review): presumably the primary config file path - confirm)
    :type default: `str`
    :param fallback: Path handed to ``load_config_file`` as its second argument
        (NOTE(review): presumably used when the first path is unavailable - confirm)
    :type fallback: `str`
    :return: None
    :rtype: None
    """
    config_dump = load_config_file(default, fallback)
    for owtf_code, mappings in iteritems(config_dump):
        # The category is passed to the model separately, so take it out
        # before the remaining mappings are serialised to JSON
        category = mappings.pop('category', None)
        session.merge(models.Mapping(owtf_code=owtf_code, mappings=json.dumps(mappings), category=category))
    session.commit()