Recursively git clone (or git pull if they already exist) Gitlab projects
The snippet can be accessed without any authentication.
Authored by
Alexandre FEBLOT
Usual usage (using pip):
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install requests
$ export GITLAB_TOKEN=<your token created with read_api, write_repository scopes>
$ ./cloner.py docs json-schemas it-operations-public ...
Usual usage (using uv):
$ export GITLAB_TOKEN=<your token created with read_api, write_repository scopes>
$ uv run cloner.py docs json-schemas it-operations-public ...
Will:
- create or update directories docs, json-schemas, it-operations-public, and their expected contents
- list all local repositories which don't exist anymore in Gitlab and propose to delete them locally
cloner.py 6.92 KiB
#!/usr/bin/env python
# Recursively git clone (or git pull if they already exist) Gitlab projects
# Usual usage (using pip):
# $ python3 -m venv .venv
# $ source .venv/bin/activate
# $ pip install requests
# $ export GITLAB_TOKEN=<your token created with read_api, write_repository scopes>
# $ ./cloner.py docs json-schemas it-operations-public ...
#
# Usual usage (using uv):
# $ export GITLAB_TOKEN=<your token created with read_api, write_repository scopes>
# $ uv run cloner.py docs json-schemas it-operations-public ...
#
# Will:
# - create or update directories docs, json-schemas, it-operations-public, and their expected contents
# - list all local repositories which don't exist anymore in Gitlab and propose to delete them locally
# /// script
# dependencies = [
# "requests<3",
# ]
# ///
import sys
import multiprocessing
import subprocess, shlex
import argparse
import os
import logging
import threading
from concurrent.futures import ProcessPoolExecutor
import requests
import urllib3
import re
# Regexes of local repository paths (relative, as path_with_namespace) that
# must never be cloned or pulled.
skipDirsRegExp = [
    '^docs/operations-develop/.*',
]
# ANSI escape sequences used to colorize log output.
RED = '\033[1;31m'
YELLOW = '\033[1;33m'
PURPLE = '\033[1;35m'
CYAN = '\033[1;36m'
NC = '\033[0m'  # "no color": resets terminal attributes
# Silence TLS warnings from requests/urllib3 (presumably the Gitlab instance
# uses a certificate the default bundle rejects — TODO confirm).
urllib3.disable_warnings()
class LogPipe(threading.Thread):
    """A writable pipe endpoint that forwards every line written into it to
    the ``logging`` module at a fixed level, tagged and colorized.

    Intended to be passed as ``stdout``/``stderr`` to ``subprocess.Popen``
    (Popen calls ``fileno()`` to get the write end); this thread reads lines
    from the read end and logs them until EOF.
    """
    def __init__(self, level, tag):
        # level: logging level (e.g. logging.INFO) applied to every forwarded line.
        # tag: label (here: the repo path) shown in brackets before each line.
        threading.Thread.__init__(self)
        self.daemon = False  # non-daemon: drain all remaining output before process exit
        self.level = level
        self.fdRead, self.fdWrite = os.pipe()
        self.pipeReader = os.fdopen(self.fdRead)
        self.tag = tag
        self.start()  # start forwarding immediately

    def fileno(self):
        # Lets Popen treat this object as a file: it writes to the pipe's write end.
        return self.fdWrite

    def run(self):
        # Pick the ANSI color matching the log level (warnings yellow, errors red).
        color = {
            logging.NOTSET: NC,
            logging.DEBUG: NC,
            logging.INFO: NC,
            logging.WARNING: YELLOW,
            logging.ERROR: RED,
            logging.CRITICAL: RED,
        }[self.level]
        # readline returns '' only at EOF, i.e. once every write end is closed.
        for line in iter(self.pipeReader.readline, ''):
            logging.log(self.level, f"{color}[{self.tag}] {line.strip()}{NC}")
        self.pipeReader.close()  # also closes fdRead

    def close(self):
        # Close our write end; once the subprocess exits (closing its copy),
        # the reader thread sees EOF and terminates.
        os.close(self.fdWrite)

    # def flush(self):
    #     pass
def getGroups(gitlab_url, token):
    """Return the list of top-level Gitlab groups visible with this token.

    The token is sent in the PRIVATE-TOKEN header rather than in the URL
    query string, so it cannot leak into logs, proxies or shell history.

    Raises requests.HTTPError on a non-2xx response and requests.Timeout
    if Gitlab does not answer within 30 seconds.
    """
    response = requests.get(
        f"{gitlab_url}/api/v4/groups/",
        headers={"PRIVATE-TOKEN": token},
        params={"per_page": 100, "top_level_only": "true"},
        timeout=30,  # don't hang forever on an unreachable instance
    )
    # Fail loudly instead of silently returning an error payload as JSON.
    response.raise_for_status()
    return response.json()
def main():
    """Entry point: parse the command line, then either list the available
    top-level groups (no group given) or clone/update the requested ones."""
    logging.basicConfig(level="INFO", format='[%(asctime)s] [%(levelname)s] %(message)s', datefmt='%H:%M:%S')

    parser = argparse.ArgumentParser(description='Required args for recursive clone')
    parser.add_argument('--branch', '-b', metavar='branch', help='Branch to clone in all repos (default: the projects default_branch)')
    parser.add_argument('--ssh', help='Clone via ssh instead of http', action='store_true')
    parser.add_argument('--gitlab-url', metavar='gitlab', default=os.environ.get('GITLAB_URL', "https://gitlab.forterro.com"), help='Gitlab address (default: %(default)s)')
    parser.add_argument('--token', '-t', metavar='token', default=os.environ.get('GITLAB_TOKEN'), help='Gitlab Token with read_api and write_repository scopes (default: $GITLAB_TOKEN)')
    parser.add_argument('groups', metavar='group_names', nargs='*', help='Top level group names')
    args = parser.parse_args()

    # A token is mandatory for every API call.
    if not args.token:
        sys.exit('The gitlab token is not defined. Abort')

    # Without any group argument, just show what exists and stop.
    if not args.groups:
        entries = sorted(f' - {group["full_path"]}' for group in getGroups(args.gitlab_url, args.token))
        print('Group names:')
        print('\n'.join(entries))
        sys.exit(0)

    clone(**vars(args))
def worker(command, message, path):
    """Run ``command`` in a subprocess, forwarding its stdout to INFO and its
    stderr to ERROR, each line tagged with the repo path.

    command: argv list (as produced by shlex.split).
    message: colorized action description logged before the run.
    path: repository path, used as the log tag.
    """
    logging.info(f'[{path}] {message}')
    lpipe = epipe = None
    try:
        lpipe = LogPipe(logging.INFO, path)
        epipe = LogPipe(logging.ERROR, path)
        with subprocess.Popen(command, stdout=lpipe, stderr=epipe) as proc:
            # Close our write ends right away so the reader threads see EOF
            # as soon as the child exits (prevents deadlocks).
            lpipe.close()
            epipe.close()
            lpipe = epipe = None  # mark as closed for the finally clause
        # Popen.__exit__ waited for the process: report failures instead of
        # unconditionally claiming success.
        if proc.returncode:
            logging.error(f"Error on {command}: exit status {proc.returncode}")
        else:
            logging.info(f'[{path}] DONE')
    except Exception as e:
        logging.error(f"Error on {command}: {e}")
    finally:
        # If Popen (or a LogPipe) failed, close any still-open write end so
        # the non-daemon reader threads can terminate instead of hanging.
        for pipe in (lpipe, epipe):
            if pipe is not None:
                pipe.close()
def clone(groups, branch, token, gitlab_url, ssh):
    """Clone or update every project of the given top-level groups, then
    offer to delete local repositories that no longer exist in Gitlab.

    groups: top-level group names (each item may itself be comma-separated).
    branch: branch to check out everywhere, or None for each project's default.
    token: Gitlab private token (read_api, write_repository scopes).
    gitlab_url: base URL of the Gitlab instance.
    ssh: clone over ssh instead of http.
    """
    group_name_to_id = {group['full_path']: group['id'] for group in getGroups(gitlab_url, token)}
    groups = ','.join(groups).split(',')  # Also support group1,group2,...
    try:
        group_ids = [group_name_to_id[group] for group in groups]
    except KeyError as e:
        sys.exit(f"Can't find group id for group {e}")

    updatedPaths = []
    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        for group_id in group_ids:
            # Walk Gitlab's paginated project list; the real page count is
            # only known from the headers after the first response.
            total_pages = 1
            page = 0
            while page < total_pages:
                page += 1
                response = requests.get(
                    f"{gitlab_url}/api/v4/groups/{group_id}/projects",
                    headers={"PRIVATE-TOKEN": token},  # keep the token out of URLs/logs
                    params={"include_subgroups": "True", "per_page": 100, "page": page, "with_shared": "False"},
                    timeout=30,
                )
                for project in response.json():
                    path = project['path_with_namespace']
                    updatedPaths.append(path)
                    for regexp in skipDirsRegExp:
                        if re.match(regexp, path):
                            logging.info(f"[{path}] SKIP")
                            break
                    else:  # no skip pattern matched
                        current_branch = project['default_branch'] if branch is None else branch
                        url_to_repo = project['ssh_url_to_repo'] if ssh else project['http_url_to_repo']
                        if not os.path.exists(path):
                            pool.submit(worker,
                                shlex.split(f"git clone --quiet --branch {current_branch} {url_to_repo} {path}"),
                                f"{PURPLE}Clone {current_branch}{NC}",
                                path
                            )
                        else:
                            pool.submit(worker,
                                shlex.split(f"git -C {path} pull --quiet"),
                                f"{CYAN}Pull{NC}",
                                path
                            )
                # The header can be missing (e.g. on an error response);
                # default to the current page so the loop still terminates.
                total_pages = int(response.headers.get('X-Total-Pages', page))

    # Search for existing git repos which don't exist anymore in gitlab,
    # and propose to remove them.
    toRemove = []
    for group in groups:
        for root, dirs, files in os.walk(group):
            for name in dirs:
                path = os.path.join(root, name)
                if name in ['.git']:
                    # Don't descend below a repository's own .git internals.
                    dirs.clear()
                if os.path.exists(os.path.join(path, ".git")) and path not in updatedPaths:
                    toRemove.append(path)
    if toRemove:
        print()
        print(f"{YELLOW}The following local repositories don't exist anymore in Gitlab:{NC}")
        print(' - ' + '\n - '.join(toRemove))
        if input(f"{YELLOW}Do you want to remove them? (y/N)?{NC} ").lower() == 'y':
            with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
                for path in toRemove:
                    pool.submit(worker,
                        shlex.split(f"rm -rf {path}"),
                        f"{YELLOW}Remove{NC}",
                        path
                    )
# Standard script entry-point guard: run only when executed, not when imported.
if __name__ == '__main__':
    main()
Please register or sign in to comment