diff --git a/app.py b/app.py index ff2ae55..18c3295 100644 --- a/app.py +++ b/app.py @@ -963,6 +963,22 @@ def configure_pat(): logger.warning(f"Rejected configure-pat from non-owner {get_request_user()} (owner: {app_owner})") return jsonify({"error": "Forbidden"}), 403 + # Idempotency / defence-in-depth: bootstrap is single-shot. Once a PAT + # is configured and the rotator is alive, refuse re-submission. Without + # this, an XSS or session-hijack vector inside the owner's browser could + # drive a swap to an attacker-controlled PAT — the owner-gate above + # would let it through because the request truly does come from the + # owner's session. The expired-token escape hatch preserves the legitimate + # re-bootstrap path (rotator timed out while idle, owner needs to refresh). + if pat_rotator.token and not pat_rotator.is_token_expired: + logger.warning( + f"Rejected configure-pat: PAT already active " + f"(user={get_request_user()}, source={request.remote_addr})" + ) + return jsonify({ + "error": "PAT already configured. Restart the app to reconfigure." + }), 409 + data = request.json token = data.get("token", "").strip() if not token: diff --git a/tests/test_auth_enforcement.py b/tests/test_auth_enforcement.py index 42030c3..f4889b6 100644 --- a/tests/test_auth_enforcement.py +++ b/tests/test_auth_enforcement.py @@ -212,6 +212,63 @@ def test_configure_pat_case_insensitive_for_owner(self): finally: app_module.app_owner = original_owner + # ---- Idempotency: defence-in-depth against XSS/session-hijack ---- + + def test_configure_pat_rejected_when_already_configured(self): + """When the rotator is alive with a fresh token, re-submission must + be refused with 409 — even from the legitimate owner. Defence-in-depth + against an attacker driving the owner's browser to swap PATs.""" + app_module = _get_app_module() + original_owner = app_module.app_owner + try: + app_module.app_owner = "owner@databricks.com" + client = _make_client(app_module) + with mock.patch.object(app_module, "_is_databricks_apps", return_value=True), \ + mock.patch.object(type(app_module.pat_rotator), "token", + new_callable=mock.PropertyMock, return_value="dapi-active"), \ + mock.patch.object(type(app_module.pat_rotator), "is_token_expired", + new_callable=mock.PropertyMock, return_value=False): + resp = client.post( + "/api/configure-pat", + json={"token": "dapi-attempt-swap"}, + headers={"X-Forwarded-Email": "owner@databricks.com"}, + ) + assert resp.status_code == 409, ( + f"Re-configure when rotator active should 409, got {resp.status_code}" + ) + body = resp.get_json() + assert "already configured" in body.get("error", "").lower(), ( + f"Error message should mention 'already configured', got: {body}" + ) + finally: + app_module.app_owner = original_owner + + def test_configure_pat_allowed_when_token_expired(self): + """Expired-token escape hatch: if the rotator timed out (long idle), + the owner must be able to re-bootstrap without restarting the app.""" + app_module = _get_app_module() + original_owner = app_module.app_owner + try: + app_module.app_owner = "owner@databricks.com" + client = _make_client(app_module) + with mock.patch.object(app_module, "_is_databricks_apps", return_value=True), \ + mock.patch.object(type(app_module.pat_rotator), "token", + new_callable=mock.PropertyMock, return_value="dapi-stale"), \ + mock.patch.object(type(app_module.pat_rotator), "is_token_expired", + new_callable=mock.PropertyMock, return_value=True): + # Empty body → should reach "Token required" (400), proving + # we passed BOTH the owner gate and the idempotency check. + resp = client.post( + "/api/configure-pat", + json={}, + headers={"X-Forwarded-Email": "owner@databricks.com"}, + ) + assert resp.status_code == 400, ( + f"Expired-PAT path should reach token-required check (400), got {resp.status_code}" + ) + finally: + app_module.app_owner = original_owner + # --------------------------------------------------------------------------- # 2. Case-insensitive email matching