Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,18 @@ def pat_status():
@app.route("/api/configure-pat", methods=["POST"])
def configure_pat():
"""Accept a user-provided PAT, validate it, and start rotation."""
# Hotfix: only the resolved owner may (re-)configure the PAT. Without this,
# any workspace-SSO'd user who reaches the app can submit their own valid
# PAT and persistently impersonate the owner — every CLI call would then
# run under the submitter's identity. app_owner is set in initialize_app()
# before gunicorn binds, so it's reliably populated by request time on
# Databricks Apps; this guard short-circuits to "allow" only when owner
# resolution failed (matches the rest of the auth surface's behaviour).
if _is_databricks_apps() and app_owner:
if get_request_user() != app_owner:
logger.warning(f"Rejected configure-pat from non-owner {get_request_user()} (owner: {app_owner})")
return jsonify({"error": "Forbidden"}), 403

data = request.json
token = data.get("token", "").strip()
if not token:
Expand Down
99 changes: 99 additions & 0 deletions tests/test_auth_enforcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,105 @@ def test_resize_allowed_for_owner(self):
self._assert_not_denied("POST", "/api/resize", {"session_id": "fake", "cols": 80, "rows": 24})


# ---------------------------------------------------------------------------
# 1b. /api/configure-pat MUST enforce owner check (hotfix)
# ---------------------------------------------------------------------------

class TestConfigurePatAuth:
"""The PAT bootstrap endpoint is auth-exempt at the before_request gate
(intentionally — needed before the owner has set up). It MUST still gate
on owner once app_owner is resolved, otherwise any workspace-SSO'd user
can submit their own PAT and persistently impersonate the owner.
"""

def test_configure_pat_denied_for_non_owner(self):
app_module = _get_app_module()
original_owner = app_module.app_owner
try:
app_module.app_owner = "owner@databricks.com"
client = _make_client(app_module)
with mock.patch.object(app_module, "_is_databricks_apps", return_value=True):
resp = client.post(
"/api/configure-pat",
json={"token": "dapi-attacker"},
headers={"X-Forwarded-Email": "intruder@evil.com"},
)
assert resp.status_code == 403, (
f"POST /api/configure-pat should return 403 for non-owner, got {resp.status_code}"
)
finally:
app_module.app_owner = original_owner

def test_configure_pat_allowed_for_owner(self):
"""Owner can still bootstrap. We don't run the rotator, just confirm
the auth guard doesn't return 403 — actual token validation is mocked
out separately and not in scope here."""
app_module = _get_app_module()
original_owner = app_module.app_owner
try:
app_module.app_owner = "owner@databricks.com"
client = _make_client(app_module)
with mock.patch.object(app_module, "_is_databricks_apps", return_value=True):
# Don't pass a token — we expect 400 ("Token required"), which
# proves the request got past the owner check.
resp = client.post(
"/api/configure-pat",
json={},
headers={"X-Forwarded-Email": "owner@databricks.com"},
)
assert resp.status_code != 403, (
f"POST /api/configure-pat should not return 403 for owner, got {resp.status_code}"
)
assert resp.status_code == 400, (
f"Owner with empty body should get 400 'Token required', got {resp.status_code}"
)
finally:
app_module.app_owner = original_owner

def test_configure_pat_bootstrap_window_allowed(self):
"""During the brief window where app_owner hasn't been resolved yet
(e.g., Apps API hiccup at startup), the endpoint must still accept
the request — otherwise the owner can never finish bootstrap.
This is intentional and documented in the in-handler comment."""
app_module = _get_app_module()
original_owner = app_module.app_owner
try:
app_module.app_owner = None # unresolved
client = _make_client(app_module)
with mock.patch.object(app_module, "_is_databricks_apps", return_value=True):
resp = client.post(
"/api/configure-pat",
json={},
headers={"X-Forwarded-Email": "anyone@databricks.com"},
)
# Should NOT be 403 — should fall through to "Token required" (400)
assert resp.status_code != 403, (
f"During bootstrap (app_owner unresolved), configure-pat should not 403, got {resp.status_code}"
)
finally:
app_module.app_owner = original_owner

def test_configure_pat_case_insensitive_for_owner(self):
"""Owner email casing from the SSO header must match the lowercased
app_owner — same case-insensitive contract as the rest of auth."""
app_module = _get_app_module()
original_owner = app_module.app_owner
try:
app_module.app_owner = "owner@databricks.com"
client = _make_client(app_module)
with mock.patch.object(app_module, "_is_databricks_apps", return_value=True):
resp = client.post(
"/api/configure-pat",
json={},
headers={"X-Forwarded-Email": "Owner@Databricks.COM"},
)
assert resp.status_code != 403, (
f"Mixed-case owner header should be accepted, got {resp.status_code}"
)
finally:
app_module.app_owner = original_owner


# ---------------------------------------------------------------------------
# 2. Case-insensitive email matching
# ---------------------------------------------------------------------------
Expand Down