@SemenMartynov
Created April 4, 2024 19:13
Sometimes a huge number of job logs from the runners accumulates on the GitLab server (dozens of gigabytes in my case), and it is impossible to delete them manually because the old pipelines are no longer displayed in the WebUI. In that case you have to go through all the jobs via the API, take the pipeline ID from each job, and use it for the cleanup.
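The idea boils down to two REST calls: GET /projects/:id/jobs lists the jobs of a project (each job object embeds the pipeline it belongs to), and DELETE /projects/:id/pipelines/:pipeline_id removes that pipeline together with its jobs, logs, and artifacts. A minimal single-project sketch of that idea follows; the instance URL, token, and project ID here are placeholders, not values taken from the script:

import requests

API_URL = "https://my.gitlab.com/api/v4"                    # placeholder instance
HEADERS = {"PRIVATE-TOKEN": "glpat-XXXZZZXXXZZZn_XXXZZZ"}    # placeholder token
PROJECT_ID = 42                                              # hypothetical project ID

# Each job object carries the pipeline it belongs to
# (no pagination here, only the first page of jobs)
jobs = requests.get(f"{API_URL}/projects/{PROJECT_ID}/jobs", headers=HEADERS).json()
for job in jobs:
    pipeline_id = job["pipeline"]["id"]
    # Deleting a pipeline also deletes its jobs, logs, and artifacts
    requests.delete(f"{API_URL}/projects/{PROJECT_ID}/pipelines/{pipeline_id}", headers=HEADERS)

The full script below does the same thing for every project visible to the token, with pagination, a cutoff date, and a thread pool.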
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from datetime import datetime
import requests
import json  # only used by the commented-out debug prints

# Config
MAX_THREADS = 24
CUTOFF_DATE = datetime.fromisoformat("2024-02-01T00:00:00.000+03:00")
GITLAB_TOKEN = "glpat-XXXZZZXXXZZZn_XXXZZZ"

# Static
GITLAB_URL = "https://my.gitlab.com/api/v4/projects"
GITLAB_HEADERS = {
    "Content-Type": "application/json;charset=UTF-8",
    "PRIVATE-TOKEN": GITLAB_TOKEN,
}
GITLAB_PAGINATION = {"pagination": "keyset", "per_page": "100", "order_by": "id"}


# Functions
def get_all_repos():
    """Return every project visible to the token, following keyset pagination."""
    # Send the first request to get the list of projects
    response = requests.get(
        url=GITLAB_URL, headers=GITLAB_HEADERS, params=GITLAB_PAGINATION
    )
    if response.status_code == 401:
        raise requests.HTTPError("Invalid token")
    # Get all repositories
    repositories = response.json()
    # print(f"{json.dumps(dict(response.headers), indent=4)}")
    next_url = response.headers.get("Link")
    while next_url is not None:
        print("Gathering list of projects...")
        # The Link header looks like '<https://...>; rel="next"'
        next_url = next_url.removeprefix("<").removesuffix('>; rel="next"')
        response = requests.get(url=next_url, headers=GITLAB_HEADERS)
        repositories.extend(response.json())
        next_url = response.headers.get("Link")
    return repositories


def get_all_jobs(repo_id: int):
    """Return every job of the project, following keyset pagination."""
    jobs_url = f"{GITLAB_URL}/{repo_id}/jobs"
    # Get the first page of jobs for the repository
    response = requests.get(
        url=jobs_url, headers=GITLAB_HEADERS, params=GITLAB_PAGINATION
    )
    if response.status_code == 403:
        # The token has no access to this project's jobs
        return []
    # Get all jobs
    jobs = response.json()
    # print(f"{json.dumps(dict(response.headers), indent=4)}")
    next_url = response.headers.get("Link")
    while next_url is not None:
        print("  Retrieving additional data...")
        next_url = next_url.removeprefix("<").removesuffix('>; rel="next"')
        response = requests.get(url=next_url, headers=GITLAB_HEADERS)
        jobs.extend(response.json())
        next_url = response.headers.get("Link")
    return jobs


def delete_job_with_pipeline(job, repo_id: int):
    """
    https://docs.gitlab.com/ee/api/pipelines.html#delete-a-pipeline
    """
    # Several jobs usually share one pipeline, so repeated deletes of the
    # same ID simply return 404 and are harmless.
    pipeline_url = f'{GITLAB_URL}/{repo_id}/pipelines/{job["pipeline"]["id"]}'
    try:
        print(f"  Deleting {pipeline_url}")
        x = requests.delete(url=pipeline_url, headers=GITLAB_HEADERS)
        # print(f"{x.status_code}")
        # print(f"{json.dumps(dict(x.headers), indent=4)}")
        # print(x.text)
    except Exception as e:
        print(f"Unable to delete {pipeline_url} due to {e.__class__}.")


# def erase_job(job, repo_id: int):
#     """
#     https://docs.gitlab.com/ee/api/jobs.html#erase-a-job
#     """
#     try:
#         job_url = f'{GITLAB_URL}/{repo_id}/jobs/{job["id"]}/erase'
#         x = requests.post(url=job_url, headers=GITLAB_HEADERS)
#         print(f"{json.dumps(dict(x.headers), indent=4)}")
#     except Exception as e:
#         print(f'Unable to get url {job_url} due to {e.__class__}.')


# def delete_artifact(job, repo_id: int):
#     """
#     https://docs.gitlab.com/ee/api/job_artifacts.html#delete-job-artifacts
#     """
#     try:
#         artifact_url = f'{GITLAB_URL}/{repo_id}/jobs/{job["id"]}/artifacts'
#         x = requests.delete(url=artifact_url, headers=GITLAB_HEADERS)
#         print(f"{json.dumps(dict(x.headers), indent=4)}")
#     except Exception as e:
#         print(f'Unable to get url {artifact_url} due to {e.__class__}.')


if __name__ == "__main__":
    repos = get_all_repos()
    for repo in repos:
        print(f'{repo["name_with_namespace"]} ({repo["path_with_namespace"]}) get jobs...')
        all_jobs = get_all_jobs(repo["id"])
        if len(all_jobs) == 0:
            print(f'{repo["name_with_namespace"]} ({repo["path_with_namespace"]}) has no jobs.')
            continue
        # We could collect the jobs of all repositories first; instead each
        # repository is processed as soon as its jobs are fetched.
        # Skip jobs newer than the cutoff date.
        # Note: datetime.fromisoformat() only accepts a trailing 'Z' on Python 3.11+.
        outdated_jobs = [
            job
            for job in all_jobs
            if datetime.fromisoformat(job["created_at"]) < CUTOFF_DATE
        ]
        outdated_jobs_n = len(outdated_jobs)
        if outdated_jobs_n == 0:
            print(f'{repo["name_with_namespace"]} ({repo["path_with_namespace"]}) has no outdated jobs.')
            continue
        # Delete the pipelines of outdated jobs in parallel
        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            executor.map(partial(delete_job_with_pipeline, repo_id=repo["id"]), outdated_jobs)
        print(f'{repo["name_with_namespace"]} ({repo["path_with_namespace"]}) had {outdated_jobs_n} outdated jobs.')
    print("All done!")
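To confirm that the cleanup actually freed space, project storage statistics can be compared before and after the run. A small sketch, assuming the same placeholder URL, token, and project ID as above; the statistics=true flag on GET /projects/:id is part of the GitLab API and normally requires at least the Reporter role:

import requests

API_URL = "https://my.gitlab.com/api/v4"                    # placeholder instance
HEADERS = {"PRIVATE-TOKEN": "glpat-XXXZZZXXXZZZn_XXXZZZ"}    # placeholder token
PROJECT_ID = 42                                              # hypothetical project ID

# statistics=true adds a "statistics" block with sizes in bytes
project = requests.get(
    f"{API_URL}/projects/{PROJECT_ID}",
    headers=HEADERS,
    params={"statistics": "true"},
).json()
stats = project.get("statistics", {})
print(f'Job artifacts: {stats.get("job_artifacts_size", 0) / 2**30:.2f} GiB')
print(f'Total storage: {stats.get("storage_size", 0) / 2**30:.2f} GiB')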