In order to be able to query the Gitlab Group API we need to be
authenticated. CI_JOB_TOKEN for public jobs has a non-meaningfull
value which does not actually authenticate the Runner to the
parent 02b57af2
......@@ -40,29 +40,36 @@ def preserve_ci_vars(func):
"""Preserve the original CI Variable values"""
def wrapper():
token = os.environ["CI_JOB_TOKEN"]
url = os.environ["CI_PROJECT_URL"]
user = os.environ["GITLAB_USER_LOGIN"]
except KeyError:
token = "invalid"
url = "invalid"
user = ""
private = os.getenv("READ_PROJECTS_TOKEN", default=None)
if not private:
os.environ["READ_PROJECTS_TOKEN"] = "FOO"
os.environ["CI_JOB_TOKEN"] = token
os.environ["CI_PROJECT_URL"] = url
os.environ["GITLAB_USER_LOGIN"] = user
if private:
os.environ["READ_PROJECTS_TOKEN"] = private
# if it was set after
elif os.getenv("READ_PROJECTS_TOKEN", default=None):
del os.environ["READ_PROJECTS_TOKEN"]
return wrapper
def request_raw(path: str, token: str, project_url: str) -> List[Dict[str, str]]:
gitlab_header: Dict[str, str] = {'JOB_TOKEN': token }
def request_raw(path: str, headers: Dict[str, str], project_url: str) -> List[Dict[str, str]]:
base_url: str = get_hostname(project_url)
url: str = f"https://{base_url}/api/v4/{path}"
print(f"GET {url}")
resp = requests.get(url, headers=gitlab_header)
print(f"Headers: {headers}")
resp = requests.get(url, headers=headers)
print(f"Request returned: {resp.status_code}")
if not resp.ok:
......@@ -72,11 +79,24 @@ def request_raw(path: str, token: str, project_url: str) -> List[Dict[str, str]]
def request(path: str) -> List[Dict[str, str]]:
# mock: "xxxxxxxxxxxxxxxxxxxx"
token: str = os.environ["CI_JOB_TOKEN"]
# Check if there is a custom token set
# API calls to Group namespaces need to be authenticated
# regardless if the group/projects are public or not.
# CI_JOB_TOKEN has an actuall value only for private jobs
# and that's also an Gitlab EE feature.
# Which means no matter what we need to give the runner
# an actuall token if we want to query even the public
# gitlab.fd.o/gstreamer group
headers: Dict[str, str] = {'Private-Token': os.environ["READ_PROJECTS_TOKEN"] }
except KeyError:
print("Custom token was not set, group api querries will fail")
# JOB_TOKEN is the default placeholder of CI_JOB_TOKEN
headers: Dict[str, str] = {'JOB_TOKEN': "xxxxxxxxxxxxxxxxxxxx" }
# mock: ""
project_url: str = os.environ['CI_PROJECT_URL']
return request_raw(path, token, project_url)
return request_raw(path, headers, project_url)
def request_wrap(path: str) -> List[Dict[str, str]]:
......@@ -102,8 +122,8 @@ def get_project_branch(project_id: int, name: str) -> Dict[str, str]:
def test_get_project_branch():
id = 1353
os.environ["CI_JOB_TOKEN"] = "xxxxxxxxxxxxxxxxxxxx"
os.environ["CI_PROJECT_URL"] = ""
del os.environ["READ_PROJECTS_TOKEN"]
twelve = get_project_branch(id, '1.12')
assert twelve is not None
......@@ -129,8 +149,8 @@ def search_user_namespace(user: str, project: str) -> Dict[str, str]:
def test_search_user_namespace():
os.environ["CI_JOB_TOKEN"] = "xxxxxxxxxxxxxxxxxxxx"
os.environ["CI_PROJECT_URL"] = ""
del os.environ["READ_PROJECTS_TOKEN"]
user = "alatiera"
gst = search_user_namespace("alatiera", "gstreamer")
......@@ -232,9 +252,9 @@ def find_repository_sha(module: str, branchname: str) -> Tuple[str, str]:
def test_find_repository_sha():
os.environ["CI_JOB_TOKEN"] = "xxxxxxxxxxxxxxxxxxxx"
os.environ["CI_PROJECT_URL"] = ""
os.environ["GITLAB_USER_LOGIN"] = "alatiera"
del os.environ["READ_PROJECTS_TOKEN"]
# This should find the repository in the user namespace
remote, git_ref = find_repository_sha("gst-plugins-good", "1.2")
