Source code for owtf.managers.plugin

"""
owtf.managers.plugin
~~~~~~~~~~~~~~~~~~~~
This module manages the plugins and their dependencies
"""
import imp
import json
import os

from owtf.models.plugin import Plugin
from owtf.models.test_group import TestGroup
from owtf.settings import PLUGINS_DIR
from owtf.utils.error import abort_framework
from owtf.utils.file import FileOperations

TEST_GROUPS = ["web", "network", "auxiliary"]


[docs] def get_test_groups_config(file_path): """Reads the test groups from a config file .. note:: This needs to be a list instead of a dictionary to preserve order in python < 2.7 :param file_path: The path to the config file :type file_path: `str` :return: List of test groups :rtype: `list` """ test_groups = [] config_file = FileOperations.open(file_path, "r").read().splitlines() for line in config_file: if "#" == line[0]: continue # Skip comments try: code, priority, descrip, hint, url = line.strip().split(" | ") except ValueError: abort_framework( "Problem in Test Groups file: '{!s}' -> Cannot parse line: {!s}".format( file_path, line ) ) if len(descrip) < 2: descrip = hint if len(hint) < 2: hint = "" test_groups.append( { "code": code, "priority": priority, "descrip": descrip, "hint": hint, "url": url, } ) return test_groups
[docs] def load_test_groups(session, file_default, file_fallback, plugin_group): """Load test groups into the DB. :param test_groups_file: The path to the test groups config :type test_groups_file: `str` :param plugin_group: Plugin group to load :type plugin_group: `str` :return: None :rtype: None """ file_path = file_default if not os.path.isfile(file_default): file_path = file_fallback test_groups = get_test_groups_config(file_path) for group in test_groups: session.merge( TestGroup( code=group["code"], priority=group["priority"], descrip=group["descrip"], hint=group["hint"], url=group["url"], group=plugin_group, ) ) session.commit()
[docs] def load_plugins(session): """Loads the plugins from the filesystem and updates their info. .. note:: Walks through each sub-directory of `PLUGINS_DIR`. For each file, loads it thanks to the imp module. Updates the database with the information for each plugin: + 'title': the title of the plugin + 'name': the name of the plugin + 'code': the internal code of the plugin + 'group': the group of the plugin (ex: web) + 'type': the type of the plugin (ex: active, passive, ...) + 'descrip': the description of the plugin + 'file': the filename of the plugin + 'internet_res': does the plugin use internet resources? :return: None :rtype: None """ # TODO: When the -t, -e or -o is given to OWTF command line, only load # the specific plugins (and not all of them like below). # Retrieve the list of the plugins (sorted) from the directory given by # 'PLUGIN_DIR'. plugins = [] for root, _, files in os.walk(PLUGINS_DIR): plugins.extend( [ os.path.join(root, filename) for filename in files if filename.endswith("py") ] ) plugins = sorted(plugins) # Retrieve the information of the plugin. for plugin_path in plugins: # Only keep the relative path to the plugin plugin = plugin_path.replace(PLUGINS_DIR, "") # TODO: Using os.path.sep might not be portable especially on # Windows platform since it allows '/' and '\' in the path. # Retrieve the group, the type and the file of the plugin. # Ensure all empty strings are removed from the list chunks = list(filter(None, plugin.split(os.path.sep))) # TODO: Ensure that the variables group, type and file exist when # the length of chunks is less than 3. if len(chunks) == 3: group, type, file = chunks # Retrieve the internal name and code of the plugin. name, code = os.path.splitext(file)[0].split("@") # Only load the plugin if in XXX_TEST_GROUPS configuration (e.g. web_testgroups.cfg) if session.query(TestGroup).get(code) is None: continue # Load the plugin as a module. filename, pathname, desc = imp.find_module( os.path.splitext(os.path.basename(plugin_path))[0], [os.path.dirname(plugin_path)], ) plugin_module = imp.load_module( os.path.splitext(file)[0], filename, pathname, desc ) # Try te retrieve the `attr` dictionary from the module and convert # it to json in order to save it into the database. attr = None try: attr = json.dumps(plugin_module.ATTR) except AttributeError: # The plugin didn't define an attr dict. pass # Save the plugin into the database. session.merge( Plugin( key="{!s}@{!s}".format(type, code), group=group, type=type, title=name.title().replace("_", " "), name=name, code=code, file=file, descrip=plugin_module.DESCRIPTION, attr=attr, ) ) session.commit()
[docs] def get_types_for_plugin_group(session, plugin_group): """Get available plugin types for a plugin group :param plugin_group: Plugin group :type plugin_group: `str` :return: List of available plugin types :rtype: `list` """ plugin_types = session.query(Plugin.type).filter_by( group=plugin_group ).distinct().all() plugin_types = [i[0] for i in plugin_types] return plugin_types
[docs] def plugin_gen_query(session, criteria): """Generate a SQLAlchemy query based on the filter criteria :param criteria: Filter criteria :type criteria: `dict` :return: :rtype: """ query = session.query(Plugin).join(TestGroup) if criteria.get("type", None): if isinstance(criteria["type"], str): query = query.filter(Plugin.type == criteria["type"]) if isinstance(criteria["type"], list): query = query.filter(Plugin.type.in_(criteria["type"])) if criteria.get("group", None): if isinstance(criteria["group"], str): query = query.filter(Plugin.group == criteria["group"]) if isinstance(criteria["group"], list): query = query.filter(Plugin.group.in_(criteria["group"])) if criteria.get("code", None): if isinstance(criteria["code"], str): query = query.filter(Plugin.code == criteria["code"]) if isinstance(criteria["code"], list): query = query.filter(Plugin.code.in_(criteria["code"])) if criteria.get("name", None): if isinstance(criteria["name"], str): query = query.filter(Plugin.name == criteria["name"]) if isinstance(criteria["name"], list): query = query.filter(Plugin.name.in_(criteria["name"])) return query.order_by(TestGroup.priority.desc())
[docs] def get_all_plugin_dicts(session, criteria=None): """Get plugin dicts based on filter criteria :param criteria: Filter criteria :type criteria: `dict` :return: List of plugin dicts :rtype: `list` """ if criteria is None: criteria = {} if "code" in criteria: criteria["code"] = Plugin.name_to_code(session, criteria["code"]) query = plugin_gen_query(session, criteria) plugin_obj_list = query.all() plugin_dicts = [] for obj in plugin_obj_list: plugin_dicts.append(obj.to_dict()) return plugin_dicts
[docs] def get_plugins_by_type(session, plugin_type): """Get plugins based on type argument :param plugin_type: Plugin type :type plugin_type: `str` :return: List of plugin dicts :rtype: `list` """ return get_all_plugin_dicts(session, {"type": plugin_type})
[docs] def get_plugins_by_group(session, plugin_group): """Get plugins by plugin group :param plugin_group: Plugin group :type plugin_group: `str` :return: List of plugin dicts :rtype: `list` """ return get_all_plugin_dicts(session, {"group": plugin_group})
[docs] def get_plugins_by_group_type(session, plugin_group, plugin_type): """Get plugins by group and plugin type :param plugin_group: Plugin group :type plugin_group: `str` :param plugin_type: plugin type :type plugin_type: `str` :return: List of plugin dicts :rtype: `list` """ return get_all_plugin_dicts(session, {"type": plugin_type, "group": plugin_group})