Last active
May 14, 2017 09:59
-
-
Save steinfletcher/30994cbae0a7d1487973d6b1c8619c20 to your computer and use it in GitHub Desktop.
Clone all repositories owned by a GitHub organisation team
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import optparse | |
import os | |
import urllib2 | |
from subprocess import call | |
from threading import Thread | |
"""Script to clone all GitHub repositories owned by an organisation team.

Example:
    $ export GITHUB_USERNAME=<your github user>
    $ export GITHUB_API_KEY=<your github api key>
    $ python git_clone.py --org=MyOrg --team=MyTeam [--dir=~/projects/piano]
"""
def main():
    """Parse the command line and clone the requested team's repositories.

    Options:
        -d / --dir / --directory: target folder (defaults to ``src``).
        -o / --org: GitHub organisation (required).
        -t / --team: GitHub team within the organisation (required).

    Raises:
        SystemExit: with status 1, after printing the help text, when either
            the organisation or the team option is missing.
    """
    parser = optparse.OptionParser('usage %prog --directory <folder to clone into> -o <github org> -t <github team>')
    parser.add_option('-d', '--dir', '--directory', dest='directory', type='string',
                      default='src', help='Target folder, default=src')
    parser.add_option('-o', '--org', dest='org', type='string', help='GitHub organisation')
    parser.add_option('-t', '--team', dest='team', type='string', help='GitHub team')

    # Positional arguments are accepted by optparse but not used by this script.
    options, _ = parser.parse_args()

    # Logical `or` (not bitwise `|`) is the right operator for a boolean test.
    if options.org is None or options.team is None:
        parser.print_help()
        # `exit()` is an interactive helper injected by the `site` module and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)

    clone_repos(options.directory, options.org, options.team)
def clone_repo(ssh_url, path):
    """Run ``git clone`` for *ssh_url*, checking the repository out into *path*."""
    git_command = ["git", "clone", ssh_url, path]
    call(git_command)
def clone_repos(directory, org, team):
    """Clone all repos that the given organisation team manages.

    The repository list comes from ``fetch_team_repos``. One thread is
    started per repository, each running ``clone_repo``, and the function
    returns once every thread has finished.

    Args:
        directory: Folder to clone into; a leading ``~`` is expanded. A falsy
            value makes the clone paths relative to the current directory.
        org: GitHub organisation name.
        team: Name of the team within the organisation.
    """
    repos = fetch_team_repos(org_name=org, team_name=team)

    # The target folder is identical for every repo, so resolve it once
    # instead of on each loop iteration.
    base_dir = os.path.expanduser(directory) if directory else ''

    clone_jobs = [
        Thread(target=clone_repo,
               args=(repo['ssh_url'], os.path.join(base_dir, repo['name'])))
        for repo in repos
    ]

    # Start everything before joining so the clones run concurrently.
    for job in clone_jobs:
        job.start()
    for job in clone_jobs:
        job.join()
def fetch_team_repos(org_name, team_name):
    """Fetch the repos for a given team and organisation.

    Args:
        org_name: GitHub organisation whose teams are searched.
        team_name: Exact ``name`` of the team to look up.

    Returns:
        A list of repository dicts as decoded from the GitHub API (all pages
        concatenated), or an empty list when no team with that name exists.
    """
    # The team listing is paginated as well, so walk every page; otherwise a
    # team that is not on the first page would never be found.
    teams = _fetch_all_pages('https://api.github.com/orgs/{}/teams'.format(org_name))
    team_id = next((team['id'] for team in teams if team['name'] == team_name), None)
    if team_id is None:
        return []
    return _fetch_all_pages('https://api.github.com/teams/{}/repos'.format(team_id))


def _fetch_all_pages(url):
    """Return the concatenated JSON lists from every page of a paginated *url*."""
    items = []
    page = 1
    while True:
        response = _request('{}?page={}'.format(url, page))
        try:
            items += json.load(response)
            # A response that fits on one page carries no Link header, in
            # which case .get() yields None; normalise that to an empty string
            # so the next-page check never receives None.
            link_header = response.headers.get('link') or ''
        finally:
            # Always release the HTTP connection, even if decoding fails.
            response.close()
        if not has_next_page(link_header):
            return items
        page += 1
def has_next_page(link_header):
    """Report whether a ``Link`` response header advertises a next page.

    Args:
        link_header: Raw value of the ``Link`` header, or ``None`` / an empty
            string when the response carried no such header.

    Returns:
        True if any comma-separated entry of the header contains
        ``rel="next"``, otherwise False.
    """
    if not link_header:
        # No header at all (None or '') means there is nothing to paginate;
        # calling .split() on None would raise AttributeError.
        return False
    return any('rel="next"' in link for link in link_header.split(","))
def _request_json(url):
    """Request *url* via ``_request`` and return its body decoded from JSON.

    The response is closed before returning, whether or not decoding
    succeeds.
    """
    response = _request(url)
    try:
        return json.load(response)
    finally:
        # Release the underlying connection even when the body is not valid JSON.
        response.close()
def _request(url):
    """Open *url* with HTTP Basic authentication and return the response.

    The credentials come from ``_credentials`` and are sent base64-encoded in
    an ``Authorization`` header.
    """
    user, api_key = _credentials()
    token = base64.b64encode(b'{}:{}'.format(user, api_key))
    auth_headers = {"Authorization": "Basic %s" % token}
    return urllib2.urlopen(urllib2.Request(url, headers=auth_headers))
def _credentials():
    """Return the ``(username, api_key)`` pair read from the environment.

    Reads ``GITHUB_USERNAME`` and ``GITHUB_API_KEY`` through ``_env``.
    """
    username = _env('GITHUB_USERNAME')
    api_key = _env('GITHUB_API_KEY')
    return username, api_key
def _env(name): | |
env_var = os.environ.get(name) | |
if env_var is None: | |
raise Exception('%s is not defined' % name) | |
return env_var | |
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment