Skip to content

Instantly share code, notes, and snippets.

@vslala
Created June 13, 2021 08:06
Show Gist options
  • Save vslala/937683cbcfadd743a587c82008be70f2 to your computer and use it in GitHub Desktop.
Save vslala/937683cbcfadd743a587c82008be70f2 to your computer and use it in GitHub Desktop.
Add/Delete Webhooks in Bitbucket Server Using REST API 1.0
import os
import requests
import json
from app.report import Report
BITBUCKET_BASE_URL = os.environ['BITBUCKET_URL'] # bitbucket base url
ACCESS_TOKEN = os.environ['ACCESS_TOKEN'] # get this token from bitbucket account
JENKINS_BASE_URL = os.environ['JENKINS_URL'] # host url for Jenkins
headers = {
'Authorization': "Bearer {access_token}".format(access_token=ACCESS_TOKEN),
'Content-Type': 'application/json'
}
hooks_endpoint = "/rest/api/1.0/projects/{projectKey}/repos/{repositorySlug}/webhooks"
class BitBucketApi:
def all_repos(self, project, limit=1000):
url = BITBUCKET_BASE_URL + "/rest/api/1.0/projects/{projectKey}/repos?limit={limit}".format(projectKey=project, limit=limit)
response = requests.get(url, headers=headers)
return response.json()
def all_webhooks(self, project, repo):
url = BITBUCKET_BASE_URL + hooks_endpoint.format(projectKey=project, repositorySlug=repo)
response = requests.get(url, headers=headers)
return response.json()
def create_webhook(self, project, repo, payload):
url = BITBUCKET_BASE_URL + hooks_endpoint.format(projectKey=project, repositorySlug=repo)
response = requests.post(
url=url,
headers=headers,
data=json.dumps(payload)
)
return response.json()
def delete_webhook(self, project, repo, webhook_id):
url = BITBUCKET_BASE_URL + "/rest/api/1.0/projects/{projectKey}/repos/{repositorySlug}/webhooks/{webhookId}"\
.format(projectKey=project, repositorySlug=repo, webhookId=webhook_id)
response = requests.delete(url, headers=headers)
print(response)
# Create WebHook
webhook_create_payload = {
"name": "Test Webhook",
"events": [
"repo:refs_changed"
],
"url": "http://example.com",
"active": "true"
}
def createWebhooks():
bb = BitBucketApi()
response = bb.all_repos("SIPP")
notify_url = 'https://{jenkinsBaseUrl}/git/notifyCommit?url={repoUrl}'.format(jenkinsBaseUrl=JENKINS_BASE_URL)
for repo in response['values']:
repo_slug = repo['slug']
repo_url = ''
for clone_option in repo['links']['clone']:
if 'ssh' == clone_option['name']:
repo_url = clone_option['href']
break
repo_project = repo['project']['key']
payload = {
'name': repo_slug + "-webhook",
'events': [
"repo:refs_changed"
],
'url': notify_url.format(repoUrl=repo_url),
'active': True
}
response = bb.create_webhook(repo_project, repo_slug, payload)
print("webhook created for repo " + repo['slug'])
def deleteWebhooks():
project_key = "SIPP"
print("Deleting webhooks...")
bb = BitBucketApi()
all_repos = bb.all_repos(project_key)['values']
for repo in all_repos:
all_hooks = bb.all_webhooks(project_key, repo['slug'])['values']
for hook in all_hooks:
print(hook)
bb.delete_webhook(project_key, repo['slug'], hook['id'])
print("deleted hook with id: " + str(hook['id']) + " in repo " + repo['slug'])
# deleteWebhooks()
createWebhooks()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment