Created
May 6, 2023 19:01
-
-
Save benpturner/697ee6009db10c5fbcdd07f0c420edfd to your computer and use it in GitHub Desktop.
PoshC2 Simple Flask API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Install | |
# ======== | |
# cd /opt/PoshC2/ | |
# pipenv install pefile | |
# pipenv install flask | |
# pipenv install flask-httpauth | |
# pipenv install pysqlite3 | |
# Run | |
# === | |
# cd /opt/PoshC2/ | |
# pipenv run python3 poshc2_api.py | |
import re | |
import sys | |
import os | |
from flask import Flask, request, jsonify, make_response, send_from_directory, render_template | |
from flask_httpauth import HTTPBasicAuth | |
from poshc2 import Colours | |
from poshc2.server.Core import decrypt | |
from poshc2.server.database.Helpers import select_first | |
from poshc2.server.database.Model import C2Server | |
from poshc2.server.Config import DownloadsDirectory | |
from poshc2.server.database.Helpers import get_alive_implants, insert_object | |
from poshc2.server.database.Model import Implant | |
app = Flask(__name__) | |
auth = HTTPBasicAuth() | |
# Define your API users and passwords | |
API_USERS = { | |
"user": "pass", | |
} | |
@auth.verify_password | |
def verify_password(username, password): | |
if username in API_USERS and API_USERS[username] == password: | |
return username | |
@auth.error_handler | |
def unauthorized(): | |
return make_response(jsonify({'error': 'Unauthorized access'}), 401) | |
@app.route('/api/implants', methods=['GET']) | |
@auth.login_required | |
def get_implants(): | |
implants = get_alive_implants() | |
implants_data = [] | |
for implant in implants: | |
implant_attributes = implant.__repr__() | |
#implants_data.append(implant_attributes) | |
implants_data.append({ | |
'numeric_id': str(implant.numeric_id), | |
'id': str(implant.id), | |
'url_id': str(implant.url_id), | |
'user': implant.user, | |
'hostname': implant.hostname, | |
'ip_address': implant.ip_address, | |
'encryption_key': implant.encryption_key, | |
'first_seen': implant.first_seen, | |
'last_seen': implant.last_seen, | |
'process_id': str(implant.process_id), | |
'process_name': implant.process_name, | |
'architecture': implant.architecture, | |
'domain': implant.domain, | |
'alive': implant.alive, | |
'sleep': implant.sleep, | |
'loaded_modules': implant.loaded_modules, | |
'type': implant.type, | |
'label': implant.label | |
}) | |
return jsonify(implants_data) | |
@app.route('/api/tasks', methods=['POST']) | |
@auth.login_required | |
def add_task(): | |
implant_id = request.json['implant_id'] | |
task = request.json['task'] | |
task_id = insert_object("Tasks", ImplantID=implant_id, Task=task) | |
return jsonify({"task_id": task_id}), 201 | |
# Set the directory you want to serve | |
DOWNLOADS_DIR = os.path.dirname(DownloadsDirectory) | |
@app.route('/api/list', methods=['GET']) | |
def list_files(): | |
files = os.listdir(DOWNLOADS_DIR) | |
images = [f for f in files] | |
return render_template('thumbnails.html', images=images) | |
@app.route('/api/files/<path:filename>', methods=['GET']) | |
def serve_file(filename): | |
return send_from_directory(DOWNLOADS_DIR, filename) | |
if __name__ == '__main__': | |
app.run(debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment