From e5aed90e96ed3952c4425890755222262f57bcfd Mon Sep 17 00:00:00 2001 From: Kim Shepherd Date: Fri, 13 Mar 2026 05:04:43 +0100 Subject: [PATCH] Support personal API token authentication (read from file) If no token is found, fall back to standard password auth --- dspace_rest_client/client.py | 68 ++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/dspace_rest_client/client.py b/dspace_rest_client/client.py index 724bef6..a381516 100644 --- a/dspace_rest_client/client.py +++ b/dspace_rest_client/client.py @@ -20,6 +20,7 @@ import logging import functools import os +from pathlib import Path from uuid import UUID import requests @@ -81,6 +82,23 @@ def parse_params(params=None, embeds=None): return params +def read_personal_api_token_secret(): + """ + Read either file path referenced in PERSONAL_API_TOKEN_FILE + or current working dir .dspace-personal-api-token.secret file + or user home .dspace-personal-api-token.secret file + """ + candidates = [ + os.environ.get("PERSONAL_API_TOKEN_FILE"), + Path.cwd() / ".dspace-personal-api-token.secret", + Path.home() / ".dspace-personal-api-token.secret", + ] + for path in candidates: + if path and (p := Path(path)).exists(): + return p.read_text(encoding='utf8').strip() + return None + + class DSpaceClient: """ Main class of the API client itself. This client uses request sessions to connect and @@ -92,7 +110,6 @@ class DSpaceClient: """ # Set up basic environment, variables - session = None API_ENDPOINT = "http://localhost:8080/server/api" SOLR_ENDPOINT = "http://localhost:8983/solr" SOLR_AUTH = None @@ -213,6 +230,7 @@ def __init__( :param password: password for the above username """ self.session = requests.Session() + self.api_token = read_personal_api_token_secret() self.API_ENDPOINT = api_endpoint self.LOGIN_URL = f"{self.API_ENDPOINT}/authn/login" self.USERNAME = username @@ -244,6 +262,52 @@ def __init__( } def authenticate(self, retry=False): + if self.api_token is not None: + return self.authenticate_with_token(retry, self.api_token) + else: + return self.authenticate_with_credentials(retry) + + + def authenticate_with_token(self, retry=False, token=None): + if token is None: + logging.error("Personal API Token is required for authentication") + return None + self.session.headers.update({"Authorization": token}) + + # Get and check authentication status + r = self.session.get( + f"{self.API_ENDPOINT}/authn/status", headers=self.request_headers, + proxies=self.proxies + ) + + if r.status_code == 200: + r_json = parse_json(r) + if r_json is not None and "authenticated" in r_json and r_json["authenticated"] is True: + logging.info("Authenticated successfully using personal API token as %s", self.USERNAME) + return r_json["authenticated"] + + if r.status_code == 401: + # 401 Unauthorized + # If we get a 401, this means a general authentication failure + logging.error( + "Authentication failure: invalid API token", token + ) + return False + + if r.status_code == 403: + if retry: + logging.error( + "Too many retries updating token: %s: %s", r.status_code, r.text + ) + return False + else: + logging.debug("Retrying request with updated CSRF token") + return self.authenticate_with_credentials(retry=True) + + return False + + + def authenticate_with_credentials(self, retry=False): """ Authenticate with the DSpace REST API. As with other operations, perform XSRF refreshes when necessary. After POST, check /authn/status and log success if the authenticated json property is true @@ -271,7 +335,7 @@ def authenticate(self, retry=False): return False else: logging.debug("Retrying request with updated CSRF token") - return self.authenticate(retry=True) + return self.authenticate_with_credentials(retry=True) if r.status_code == 401: # 401 Unauthorized