Last active
July 27, 2021 17:40
-
-
Save cloudnull/96ceaa3f2f13db534083c69e099a420c to your computer and use it in GitHub Desktop.
quick and dirty tool to scan an ansible archive and return a complete list of modules used throughout the archive.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import collections | |
import json | |
import os | |
import subprocess | |
import yaml | |
from yaml import scanner | |
MODULES = collections.defaultdict(int) | |
def parse_opts(): | |
"""Parse options.""" | |
parser = argparse.ArgumentParser( | |
description="Scan an ansible archive and return all used modules." | |
) | |
parser.add_argument( | |
"path", help="ansible archive to scan" | |
) | |
return parser.parse_args() | |
def parse_content(content, core_plugins): | |
for item in content: | |
if isinstance(item, (dict, list)) and 'hosts' in item: | |
for task_key in ["pre_tasks", "tasks", "post_tasks"]: | |
task_values = item.get(task_key) | |
if task_values: | |
for task_value in task_values: | |
parse_task(task=task_value, core_plugins=core_plugins) | |
else: | |
parse_task(task=item, core_plugins=core_plugins) | |
def parse_task(task, core_plugins): | |
"""Loop through all items in a task and check if it is a core plugin.""" | |
if isinstance(task, dict): | |
for k in task.keys(): | |
if k == "block": | |
for b in task[k]: | |
parse_task(task=b, core_plugins=core_plugins) | |
if k in core_plugins: | |
MODULES[k] += 1 | |
def yield_yaml_files(path): | |
"""Walk a given path and yield the full path. | |
yield: String - full file path. | |
""" | |
for root, _, files in os.walk(path): | |
for f in files: | |
file_path = os.path.join(root, f) | |
if f.endswith(('yml', 'yaml')) and os.path.exists(file_path): | |
yield os.path.abspath(os.path.expanduser(file_path)) | |
def find_content(content, ansible_core_plugins): | |
if isinstance(content, list): | |
parse_content(content=content, core_plugins=ansible_core_plugins) | |
elif isinstance(content, dict): | |
for value in content.values(): | |
find_content(content=value, ansible_core_plugins=ansible_core_plugins) | |
def main(): | |
"""Scan an Ansible archive and index all module usage.""" | |
# Uses built-in ansible commands to get the core module information. | |
outs = subprocess.check_output("ansible-doc --list --json", shell=True) | |
ansible_core_plugins = json.loads(outs) | |
opts = parse_opts() | |
for file_path in yield_yaml_files(path=opts.path): | |
with open(file_path) as y: | |
try: | |
content = yaml.safe_load(y) | |
find_content(content=content, ansible_core_plugins=ansible_core_plugins) | |
except scanner.ScannerError as exp: | |
print("skipped {} due to {}".format(file_path, str(exp))) | |
print(yaml.safe_dump(dict(MODULES), default_flow_style=False)) | |
print("Total module core calls: {}".format(sum(MODULES.values()))) | |
print("Total unique core modules: {}".format(len(MODULES.keys()))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment