Last active
October 30, 2019 02:12
-
-
Save Nani-o/48570fd5e2bda3e3b7869d19fb918ae1 to your computer and use it in GitHub Desktop.
Quick and dirty python script i'm using for backuping my Github repositories
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import os | |
| import sys | |
| import yaml | |
| from github import Github, Gist | |
| from github.GithubException import GithubException | |
| from datetime import datetime | |
| from termcolor import colored | |
| from subprocess import Popen, PIPE, STDOUT | |
| from threading import Thread, Semaphore | |
| config_example = """ | |
| example : | |
| --- | |
| github_token: XXXXXXXXXXXXXXXXXXXXXX | |
| dest_folder: /path/to/backup/or/workspace/ | |
| keywords: | |
| - python- | |
| - ansible-role- | |
| ... | |
| """ | |
| screen_lock = Semaphore(value=1) | |
| mkdir_lock = Semaphore(value=1) | |
| class GitCloner(Thread): | |
| def __init__(self, repo, dest_folder, keywords): | |
| Thread.__init__(self) | |
| self.repo = repo | |
| if isinstance(repo, Gist.Gist): | |
| self.isgist = True | |
| else: | |
| self.isgist = False | |
| if self.isgist: | |
| self.repo_name = next(iter(self.repo.files.keys())) | |
| generic_url = repo.git_pull_url.split("://")[1] | |
| self.ssh_url = "git@{}".format(generic_url.replace("/", ":", 1)) | |
| else: | |
| self.repo_name = self.repo.name | |
| self.ssh_url = self.repo.ssh_url | |
| self.repo_path = self.get_path(self.repo, dest_folder, keywords) | |
| self.parent_repo_path = os.path.abspath(os.path.join(self.repo_path, os.pardir)) | |
| def get_path(self, repo, dest_folder, keywords): | |
| if self.isgist: | |
| name = next(iter(repo.files.keys())) | |
| return os.path.join(dest_folder, 'gists', name) | |
| if repo.fork: | |
| return os.path.join(dest_folder, 'forks', repo.name) | |
| if repo.private: | |
| return os.path.join(dest_folder, 'privates', repo.name) | |
| for keyword in keywords: | |
| if repo.name.startswith(keyword): | |
| git_folder = keyword.replace('-', '/') | |
| local_repo_name = repo.name[len(keyword):] | |
| return os.path.join(dest_folder, git_folder, local_repo_name) | |
| return os.path.join(dest_folder, "others", repo.name) | |
| def is_repo_empty(self): | |
| if self.isgist: | |
| return False | |
| try: | |
| self.repo.get_commits().totalCount | |
| return False | |
| except GithubException as e: | |
| status = e.status | |
| return True | |
| def execute_command(self, command): | |
| proc = Popen(command, | |
| stdout=PIPE, | |
| stderr=STDOUT, | |
| universal_newlines=True) | |
| exit_code = proc.wait() | |
| result = "" | |
| for line in proc.stdout: | |
| result += line | |
| return exit_code, result | |
| def logger(self, action, repo, message): | |
| now = datetime.now().strftime('%d/%b/%Y:%H:%M:%S') | |
| print('[{}] [{}] [{}] {}'.format(now, repo, action, message)) | |
| def run(self): | |
| mkdir_lock.acquire() | |
| if not os.path.exists(self.parent_repo_path): | |
| os.makedirs(self.parent_repo_path) | |
| mkdir_lock.release() | |
| if os.path.exists(self.repo_path): | |
| if self.is_repo_empty(): | |
| action = "EMPTY" | |
| else: | |
| command = [ | |
| "git", "-C", self.repo_path, | |
| "-c", "rebase.autoStash=true", | |
| "pull", "--rebase" | |
| ] | |
| action = "PULL" | |
| else: | |
| command = ["git", "clone", self.ssh_url, self.repo_path] | |
| action = "CLONE" | |
| if action != 'EMPTY': | |
| rc, out = self.execute_command(command) | |
| else: | |
| rc = 0 | |
| screen_lock.acquire() | |
| if rc != 0: | |
| self.logger(colored(action, "red"), self.repo_name, self.repo_path) | |
| self.logger(colored("ERROR", "red"), | |
| self.repo_name, | |
| "Failure, here the error message :\n{}".format(out)) | |
| else: | |
| self.logger(colored(action, "green"), self.repo_name, self.repo_path) | |
| screen_lock.release() | |
| filename, extension = os.path.splitext(sys.argv[0]) | |
| config_file = filename + '.yml' | |
| def load_config(config_file): | |
| if os.path.exists(config_file): | |
| with open(config_file, 'r') as f: | |
| config = yaml.load(f) | |
| return config | |
| else: | |
| print("You must create a config file " + config_file) | |
| print(config_example) | |
| sys.exit(1) | |
| config = load_config(config_file) | |
| github_token = config['github_token'] | |
| keywords = config['keywords'] | |
| dest_folder = config['dest_folder'] | |
| gh = Github(github_token) | |
| thread_list = [] | |
| for repo in gh.get_user().get_repos(): | |
| thread_list.append(GitCloner(repo, dest_folder, keywords)) | |
| for gist in gh.get_user().get_gists(): | |
| thread_list.append(GitCloner(gist, dest_folder, keywords)) | |
| for thread in thread_list: | |
| thread.start() | |
| for thread in thread_list: | |
| thread.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment