diff --git a/.github/workflows/ogm-nightly-sync.yml b/.github/workflows/ogm-nightly-sync.yml new file mode 100644 index 00000000..a3e4dd04 --- /dev/null +++ b/.github/workflows/ogm-nightly-sync.yml @@ -0,0 +1,66 @@ +name: Nightly OGM Sync + +on: + schedule: + - cron: "15 7 * * *" + workflow_dispatch: + +permissions: + contents: read + +concurrency: + group: ogm-nightly-sync + cancel-in-progress: false + +jobs: + trigger-nightly-sync: + name: Trigger Production OGM Sync + runs-on: ubuntu-latest + + steps: + - name: Start SSH agent + uses: webfactory/ssh-agent@v0.9.0 + with: + ssh-private-key: ${{ secrets.OGM_KAMAL_SSH_PRIVATE_KEY }} + + - name: Trust production host key + env: + SSH_HOST: ${{ secrets.OGM_KAMAL_SSH_HOST }} + SSH_PORT: ${{ secrets.OGM_KAMAL_SSH_PORT || '22' }} + run: | + mkdir -p "$HOME/.ssh" + ssh-keyscan -p "$SSH_PORT" -H "$SSH_HOST" >> "$HOME/.ssh/known_hosts" + + - name: Refresh OGM repos and enqueue nightly harvest + env: + SSH_HOST: ${{ secrets.OGM_KAMAL_SSH_HOST }} + SSH_PORT: ${{ secrets.OGM_KAMAL_SSH_PORT || '22' }} + SSH_USER: ${{ secrets.OGM_KAMAL_SSH_USER }} + run: | + ssh -o BatchMode=yes -p "$SSH_PORT" "$SSH_USER@$SSH_HOST" ' + set -eu + + container="$(docker ps \ + --filter label=service=ogm-api \ + --filter label=role=worker \ + --filter status=running \ + --format "{{.Names}}" \ + | head -n1)" + + if [ -z "$container" ]; then + container="$(docker ps \ + --filter label=service=ogm-api \ + --filter label=role=web \ + --filter status=running \ + --format "{{.Names}}" \ + | head -n1)" + fi + + if [ -z "$container" ]; then + echo "No running ogm-api web or worker container found." 
>&2 + exit 1 + fi + + echo "Using container: $container" + docker exec "$container" python /app/backend/scripts/trigger_ogm_nightly_sync.py + ' diff --git a/README.md b/README.md index 18dd8f21..df915f84 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ Then open: - **Website**: `http://localhost:3000` - **API docs (for technical staff)**: `http://localhost:8000/api/docs` +- **OGM repository dashboard**: `http://localhost:8000/api/v1/ogm/repos/dashboard` To stop everything later: @@ -149,6 +150,7 @@ All documentation is now in the top-level `docs/` folder: - **Codebase overview / executive architecture summary**: `docs/backend/codebase_overview.md` - **Caching**: `docs/backend/caching.md` - **Search**: `docs/backend/search.md` +- **OpenGeoMetadata harvesting**: `docs/backend/ogm_harvesting.md` - **Service tiers / API keys / rate limiting**: `docs/backend/service_tiers_runbook.md` - **Scripts (Python utilities)**: `docs/backend/scripts.md` - **MCP / Claude Desktop**: `docs/mcp/` diff --git a/backend/app/api/v1/endpoint_modules/admin.py b/backend/app/api/v1/endpoint_modules/admin.py index f22262f6..66ba2a64 100644 --- a/backend/app/api/v1/endpoint_modules/admin.py +++ b/backend/app/api/v1/endpoint_modules/admin.py @@ -85,7 +85,7 @@ class UpdateAPIKeyRequest(BaseModel): class UpdateOGMRepoRequest(BaseModel): ogm_enabled: Optional[bool] = None - ogm_watch_mode: Optional[str] = None # weekly|webhook|both|manual + ogm_watch_mode: Optional[str] = None # nightly|weekly|webhook|both|manual ogm_notes: Optional[str] = None ogm_tags: Optional[dict] = None @@ -393,12 +393,12 @@ async def update_ogm_repo(repo_name: str, body: UpdateOGMRepoRequest): """Create or update a repo watch entry.""" if body.ogm_watch_mode is not None: mode = body.ogm_watch_mode.lower().strip() - if mode not in {"weekly", "webhook", "both", "manual"}: + if mode not in {"nightly", "weekly", "webhook", "both", "manual"}: raise HTTPException(status_code=400, detail="Invalid ogm_watch_mode") await 
ogm_repo.upsert_repo( ogm_repo_name=repo_name, ogm_enabled=body.ogm_enabled if body.ogm_enabled is not None else True, - ogm_watch_mode=body.ogm_watch_mode or "weekly", + ogm_watch_mode=body.ogm_watch_mode or "nightly", ogm_notes=body.ogm_notes, ogm_tags=body.ogm_tags, ) diff --git a/backend/app/api/v1/endpoint_modules/ogm.py b/backend/app/api/v1/endpoint_modules/ogm.py index ed7788c7..496ac676 100644 --- a/backend/app/api/v1/endpoint_modules/ogm.py +++ b/backend/app/api/v1/endpoint_modules/ogm.py @@ -1,12 +1,35 @@ -from typing import Optional +from __future__ import annotations -from fastapi import APIRouter, Query +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +from fastapi import APIRouter, Query, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates from app.api.v1.utils import create_response from app.services.ogm_harvest.repository import OGMHarvestRepository router = APIRouter() ogm_repo = OGMHarvestRepository() +TEMPLATES_DIR = Path(__file__).resolve().parents[4] / "templates" +templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) if TEMPLATES_DIR.exists() else None + + +def _format_timestamp(value: Any) -> Optional[str]: + if value in (None, ""): + return None + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M UTC") + if isinstance(value, str): + try: + normalized = value.replace("Z", "+00:00") + parsed = datetime.fromisoformat(normalized) + return parsed.strftime("%Y-%m-%d %H:%M UTC") + except ValueError: + return value + return str(value) @router.get("/ogm/repos") @@ -18,6 +41,95 @@ async def list_public_ogm_repos(): return create_response({"repos": repos}) +@router.get( + "/ogm/repos/dashboard", + include_in_schema=False, + response_class=HTMLResponse, +) +async def ogm_repo_dashboard(request: Request): + repos = await ogm_repo.list_public_repo_summaries() + + dashboard_repos = [] + total_harvested = 0 + total_available = 0 + 
repos_with_aardvark = 0 + enabled_repos = 0 + never_harvested = 0 + + for repo in repos: + harvested_count = int(repo.get("harvested_record_count") or 0) + available_count = int(repo.get("available_record_count") or 0) + has_aardvark = bool(repo.get("ogm_has_aardvark")) + enabled = bool(repo.get("ogm_enabled")) + last_harvest_completed = repo.get("last_crawl_completed_at") + + total_harvested += harvested_count + total_available += available_count + repos_with_aardvark += int(has_aardvark) + enabled_repos += int(enabled) + never_harvested += int(not bool(last_harvest_completed)) + + dashboard_repos.append( + { + **repo, + "display_last_commit_at": _format_timestamp(repo.get("last_commit_at")), + "display_last_harvest_at": _format_timestamp(last_harvest_completed), + "display_last_harvest_started_at": _format_timestamp( + repo.get("last_crawl_started_at") + ), + "harvest_gap_count": max(harvested_count - available_count, 0), + } + ) + + summary = { + "repo_count": len(dashboard_repos), + "enabled_repo_count": enabled_repos, + "repos_with_aardvark_count": repos_with_aardvark, + "never_harvested_count": never_harvested, + "harvested_record_count": total_harvested, + "available_record_count": total_available, + } + + if templates is None: + rows = "".join( + ( + "" + f"{repo.get('ogm_repo_name') or ''}" + f"{repo.get('display_last_commit_at') or '-'}" + f"{repo.get('display_last_harvest_at') or '-'}" + f"{'yes' if repo.get('ogm_has_aardvark') else 'no'}" + f"{repo.get('harvested_record_count') or 0}" + f"{repo.get('available_record_count') or 0}" + "" + ) + for repo in dashboard_repos + ) + return HTMLResponse( + ( + "" + "OpenGeoMetadata Repository Dashboard" + "" + "

OpenGeoMetadata Repository Dashboard

" + "

Templates are unavailable, showing a minimal fallback view.

" + "" + "" + "" + f"{rows}
RepositoryLast commitLast harvestAardvarkHarvestedAvailable
" + ) + ) + + return templates.TemplateResponse( + "ogm_repo_dashboard.html", + { + "request": request, + "title": "OpenGeoMetadata Repository Dashboard", + "summary": summary, + "repos": dashboard_repos, + "generated_at": _format_timestamp(datetime.utcnow()), + }, + ) + + @router.get("/ogm/harvest/failures") async def list_public_ogm_harvest_failures( repo_name: Optional[str] = Query(None, description="Filter by a single ogm_repo_name"), diff --git a/backend/app/middleware/rate_limit_middleware.py b/backend/app/middleware/rate_limit_middleware.py index c525937f..8469ad1b 100644 --- a/backend/app/middleware/rate_limit_middleware.py +++ b/backend/app/middleware/rate_limit_middleware.py @@ -32,6 +32,13 @@ def _bypass_rate_limit_for_tests() -> bool: r"^/api/v1/static-map-assets/[0-9a-f]{64}$", re.IGNORECASE, ) +HEALTHCHECK_BYPASS_PATHS = { + "/api/docs", + "/api/openapi.json", + "/api/redoc", +} +ANALYTICS_EVENTS_PATH = "/api/v1/analytics/events" +DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE = 120 def _is_immutable_asset_route(path: str) -> bool: @@ -41,6 +48,43 @@ def _is_immutable_asset_route(path: str) -> bool: ) +def _is_healthcheck_route(path: str) -> bool: + """Return True for lightweight health/documentation routes used by deploy tooling.""" + return path in HEALTHCHECK_BYPASS_PATHS + + +def _is_analytics_events_route(path: str) -> bool: + """Return True for the lightweight frontend analytics ingestion endpoint.""" + return path == ANALYTICS_EVENTS_PATH + + +def _analytics_events_requests_per_minute() -> Optional[int]: + """Return the analytics-event throttle, independent from API service tiers.""" + raw_value = os.getenv( + "ANALYTICS_EVENTS_REQUESTS_PER_MINUTE", + str(DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE), + ).strip() + if raw_value.lower() in {"", "none", "unlimited", "false", "off"}: + return None + try: + limit = int(raw_value) + except ValueError: + logger.warning( + "Invalid ANALYTICS_EVENTS_REQUESTS_PER_MINUTE=%r; using default %s", + raw_value, 
+ DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE, + ) + return DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE + if limit < 0: + logger.warning( + "Invalid negative ANALYTICS_EVENTS_REQUESTS_PER_MINUTE=%r; using default %s", + raw_value, + DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE, + ) + return DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE + return limit + + class RateLimitMiddleware(BaseHTTPMiddleware): """Middleware to enforce rate limiting based on API keys and service tiers.""" @@ -58,6 +102,13 @@ async def dispatch(self, request: Request, call_next): logger.debug("Rate limiting bypassed for tests (DISABLE_RATE_LIMIT_FOR_TESTS=true)") return await call_next(request) + # CORS preflights do not carry Authorization headers, so charging them + # against anonymous quota causes authenticated browser clients to 429 + # before their real request can be sent. + if request.method == "OPTIONS": + logger.debug("Skipping rate limiting for %s (CORS preflight)", request.url.path) + return await call_next(request) + # Extract API key from header or query parameter api_key = self._extract_api_key(request) @@ -67,6 +118,8 @@ async def dispatch(self, request: Request, call_next): skip_rate_limit_reason = None if not _rate_limit_enabled(): skip_rate_limit_reason = "rate limiting disabled" + elif _is_healthcheck_route(request.url.path): + skip_rate_limit_reason = "healthcheck route" elif request.url.path.startswith("/api/v1/admin"): skip_rate_limit_reason = "admin endpoint" elif _is_immutable_asset_route(request.url.path): @@ -93,12 +146,23 @@ async def dispatch(self, request: Request, call_next): # Get identifier (key hash for authenticated, IP for anonymous) identifier = self._get_identifier(request, api_key, tier_info) + rate_limit_tier_name = tier_name + rate_limit_identifier = identifier + rate_limit_requests_per_minute = requests_per_minute + if skip_rate_limit_reason is None and _is_analytics_events_route(request.url.path): + analytics_limit = 
_analytics_events_requests_per_minute() + rate_limit_tier_name = "analytics_events" + rate_limit_identifier = self._get_analytics_identifier(request) + rate_limit_requests_per_minute = analytics_limit + # Check rate limit (skip check for unlimited tiers) remaining = None reset_time = None - if skip_rate_limit_reason is None and requests_per_minute is not None: + if skip_rate_limit_reason is None and rate_limit_requests_per_minute is not None: allowed, remaining, reset_time = await self.rate_limit_service.check_rate_limit( - tier_name, identifier, requests_per_minute + rate_limit_tier_name, + rate_limit_identifier, + rate_limit_requests_per_minute, ) if not allowed: @@ -121,13 +185,14 @@ async def dispatch(self, request: Request, call_next): content={ "error": "Rate limit exceeded", "message": ( - f"Rate limit of {requests_per_minute} requests per minute exceeded" + f"Rate limit of {rate_limit_requests_per_minute} " + "requests per minute exceeded" ), "retry_after": reset_time, }, ) # Add rate limit headers - response.headers["X-RateLimit-Limit"] = str(requests_per_minute) + response.headers["X-RateLimit-Limit"] = str(rate_limit_requests_per_minute) response.headers["X-RateLimit-Remaining"] = "0" response.headers["X-RateLimit-Reset"] = str(reset_time) response.headers["Retry-After"] = str(reset_time) @@ -144,21 +209,26 @@ async def dispatch(self, request: Request, call_next): # Add rate limit headers only when throttling is active for this request. 
if skip_rate_limit_reason is None: - if requests_per_minute is not None: + if rate_limit_requests_per_minute is not None: headers = await self.rate_limit_service.get_rate_limit_headers( - tier_name, - identifier, - requests_per_minute, + rate_limit_tier_name, + rate_limit_identifier, + rate_limit_requests_per_minute, remaining=remaining, reset_time=reset_time, ) else: headers = await self.rate_limit_service.get_rate_limit_headers( - tier_name, identifier, requests_per_minute + rate_limit_tier_name, + rate_limit_identifier, + rate_limit_requests_per_minute, ) for header_name, header_value in headers.items(): response.headers[header_name] = header_value + if skip_rate_limit_reason == "healthcheck route": + return response + # Log API usage (fire-and-forget, won't block response). try: api_key_id = tier_info.get("api_key_id") @@ -256,3 +326,13 @@ def _get_identifier(self, request: Request, api_key: Optional[str], tier_info: d # Last resort: use a default identifier return "unknown" + + def _get_analytics_identifier(self, request: Request) -> str: + """Use a per-client identifier for analytics event throttling.""" + ip_address = self._extract_ip_address(request) + if ip_address: + return f"ip:{ip_address}" + origin = request.headers.get("Origin") + if origin: + return f"origin:{origin}" + return "unknown" diff --git a/backend/app/services/ogm_harvest/repository.py b/backend/app/services/ogm_harvest/repository.py index 0e4407b3..25234f35 100644 --- a/backend/app/services/ogm_harvest/repository.py +++ b/backend/app/services/ogm_harvest/repository.py @@ -7,7 +7,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert from db.database import database -from db.models import ogm_harvest_runs, ogm_repos, ogm_resource_state +from db.models import ogm_harvest_runs, ogm_repos, ogm_resource_state, resources class OGMHarvestRepository: @@ -26,8 +26,11 @@ async def list_public_repo_summaries(self) -> List[Dict[str, Any]]: """ Public-facing OGM repo summaries. 
- For each configured OGM repo, return the latest crawl status and the - latest run's imported/error counts (when available). + For each configured OGM repo, return: + - repo metadata discovered from GitHub + - latest crawl status in this app + - latest run's imported/error counts (when available) + - current harvested resource counts and API-available record counts """ latest_run_ids = ( select( @@ -37,6 +40,35 @@ async def list_public_repo_summaries(self) -> List[Dict[str, Any]]: .group_by(ogm_harvest_runs.c.ogm_repo_name) .subquery() ) + harvested_record_count = ( + select(func.count()) + .select_from(ogm_resource_state) + .where(ogm_resource_state.c.ogm_repo_name == ogm_repos.c.ogm_repo_name) + .where(ogm_resource_state.c.ogm_missing_since.is_(None)) + .scalar_subquery() + ) + published_available_record_count = ( + select(func.count()) + .select_from(resources) + .where(resources.c.b1g_adminTags_sm.is_not(None)) + .where( + resources.c.b1g_adminTags_sm.any( + func.concat("ogm_repo:", ogm_repos.c.ogm_repo_name) + ) + ) + .where(func.coalesce(resources.c.gbl_suppressed_b, False).is_(False)) + .where( + func.lower( + func.coalesce( + func.nullif(resources.c.b1g_publication_state_s, ""), + func.nullif(resources.c.publication_state, ""), + "published", + ) + ) + == "published" + ) + .scalar_subquery() + ) q = ( select( @@ -46,11 +78,15 @@ async def list_public_repo_summaries(self) -> List[Dict[str, Any]]: ogm_repos.c.ogm_last_harvest_started_at, ogm_repos.c.ogm_last_harvest_completed_at, ogm_repos.c.ogm_last_harvest_status, + ogm_repos.c.ogm_last_commit_sha, + ogm_repos.c.ogm_tags, ogm_harvest_runs.c.ogm_id.label("last_run_id"), ogm_harvest_runs.c.ogm_started_at.label("last_run_started_at"), ogm_harvest_runs.c.ogm_completed_at.label("last_run_completed_at"), ogm_harvest_runs.c.ogm_status.label("last_run_status"), ogm_harvest_runs.c.ogm_stats_json.label("last_run_stats_json"), + harvested_record_count.label("harvested_record_count"), + 
published_available_record_count.label("available_record_count"), ) .select_from( ogm_repos.outerjoin( @@ -73,6 +109,7 @@ def _to_int(value: Any) -> int: for row in rows: item = dict(row) stats = item.get("last_run_stats_json") or {} + tags = item.get("ogm_tags") or {} latest_status = item.get("last_run_status") or item.get("ogm_last_harvest_status") latest_started_at = item.get("last_run_started_at") or item.get( "ogm_last_harvest_started_at" @@ -80,17 +117,32 @@ def _to_int(value: Any) -> int: latest_completed_at = item.get("last_run_completed_at") or item.get( "ogm_last_harvest_completed_at" ) + repo_full_name = tags.get("ogm_repo_full_name") or item.get("ogm_repo_name") + has_aardvark = tags.get("ogm_has_aardvark") + if has_aardvark is None: + has_aardvark = not bool(tags.get("ogm_missing_aardvark")) summaries.append( { "ogm_repo_name": item.get("ogm_repo_name"), + "ogm_repo_full_name": repo_full_name, + "ogm_github_url": ( + f"https://github.com/{repo_full_name}" if repo_full_name else None + ), "ogm_enabled": item.get("ogm_enabled"), "ogm_watch_mode": item.get("ogm_watch_mode"), + "ogm_has_aardvark": bool(has_aardvark), + "ogm_default_branch": tags.get("ogm_default_branch"), + "ogm_archived": bool(tags.get("ogm_archived", False)), + "last_commit_at": tags.get("ogm_pushed_at"), + "last_commit_sha": item.get("ogm_last_commit_sha"), "last_crawl_started_at": latest_started_at, "last_crawl_completed_at": latest_completed_at, "last_crawl_status": latest_status, "last_run_id": item.get("last_run_id"), "harvested_success_count": _to_int(stats.get("imported")), "harvested_failure_count": _to_int(stats.get("errors")), + "harvested_record_count": _to_int(item.get("harvested_record_count")), + "available_record_count": _to_int(item.get("available_record_count")), "harvest_failure_samples": list(stats.get("error_samples") or [])[:5], } ) @@ -107,7 +159,7 @@ async def upsert_repo( self, ogm_repo_name: str, ogm_enabled: bool = True, - ogm_watch_mode: str = "weekly", + 
ogm_watch_mode: str = "nightly", ogm_notes: Optional[str] = None, ogm_tags: Optional[Dict[str, Any]] = None, ) -> None: diff --git a/backend/app/tasks/ogm_harvest.py b/backend/app/tasks/ogm_harvest.py index aef81eab..4f5ed336 100644 --- a/backend/app/tasks/ogm_harvest.py +++ b/backend/app/tasks/ogm_harvest.py @@ -10,6 +10,8 @@ logger = logging.getLogger(__name__) _loop: Optional[asyncio.AbstractEventLoop] = None +SCHEDULED_OGM_TRIGGERS = {"nightly", "weekly", "scheduled"} +SCHEDULED_OGM_WATCH_MODES = {"nightly", "weekly", "scheduled", "both"} def _get_loop() -> asyncio.AbstractEventLoop: @@ -56,8 +58,8 @@ async def _ogm_harvest_repo_async(repo_name: str, trigger: str) -> Dict[str, Any soft_time_limit=10 * 60, # 10 minutes time_limit=15 * 60, # 15 minutes ) -def ogm_harvest_all(self, trigger: str = "weekly") -> Dict[str, Any]: - """Enqueue harvest jobs for all enabled repos (watch_mode weekly/both).""" +def ogm_harvest_all(self, trigger: str = "nightly") -> Dict[str, Any]: + """Enqueue harvest jobs for all enabled repos in the scheduled watch set.""" return _run(_ogm_harvest_all_async(trigger=trigger)) @@ -72,7 +74,7 @@ async def _ogm_harvest_all_async(trigger: str) -> Dict[str, Any]: if not r.get("ogm_enabled", True): continue mode = (r.get("ogm_watch_mode") or "").lower() - if trigger == "weekly" and mode not in {"weekly", "both"}: + if trigger.lower() in SCHEDULED_OGM_TRIGGERS and mode not in SCHEDULED_OGM_WATCH_MODES: continue selected.append(r["ogm_repo_name"]) diff --git a/backend/scripts/populate_ogm_repos.py b/backend/scripts/populate_ogm_repos.py index b5d98c66..260c486d 100644 --- a/backend/scripts/populate_ogm_repos.py +++ b/backend/scripts/populate_ogm_repos.py @@ -126,9 +126,9 @@ def build_repo_row(repo: Dict[str, Any], *, has_aardvark: bool) -> Dict[str, Any # Policy: # - if no aardvark directory, disable by default (so it won't be harvested) - # - otherwise enable and default to weekly (can be edited via admin endpoint) + # - otherwise enable and 
default to nightly (can be edited via admin endpoint) ogm_enabled = bool(has_aardvark and not archived) - ogm_watch_mode = "weekly" if ogm_enabled else "manual" + ogm_watch_mode = "nightly" if ogm_enabled else "manual" return { "ogm_repo_name": name, diff --git a/backend/scripts/prime_thumbnail_cache.py b/backend/scripts/prime_thumbnail_cache.py index 530d0853..48f77bf6 100644 --- a/backend/scripts/prime_thumbnail_cache.py +++ b/backend/scripts/prime_thumbnail_cache.py @@ -14,6 +14,7 @@ Examples: python scripts/prime_thumbnail_cache.py python scripts/prime_thumbnail_cache.py --limit 250 --concurrency 4 + python scripts/prime_thumbnail_cache.py --limit 250 --hydrate-assets python scripts/prime_thumbnail_cache.py --force b1g_PJxxfKgpqpUT b1g_abc123 """ @@ -125,11 +126,13 @@ def _store_image_bytes( content_type: str, *, resource_id: str | None = None, + hydrate_assets: bool = True, ) -> bool: - """Store image bytes and MIME metadata in Redis.""" + """Store image bytes in durable storage and optionally hydrate Redis.""" try: - cache_visual_asset(redis_client, f"image:{image_hash}", image_bytes) - cache_visual_asset(redis_client, f"image_type:{image_hash}", content_type) + if hydrate_assets: + cache_visual_asset(redis_client, f"image:{image_hash}", image_bytes) + cache_visual_asset(redis_client, f"image_type:{image_hash}", content_type) store_durable_visual_asset( image_hash, asset_kind="thumbnail", @@ -164,6 +167,7 @@ def _prime_cog_thumbnail( image_hash: str, *, resource_id: str | None = None, + hydrate_assets: bool = True, ) -> bool: with provider_request_slot(source_url, action="thumbnail prime (COG)"): image_bytes = _generate_cog_thumbnail_bytes(source_url) @@ -172,7 +176,13 @@ def _prime_cog_thumbnail( is_valid, _ = _validate_image_content(image_bytes, "image/png") if not is_valid: return False - return _store_image_bytes(image_hash, image_bytes, "image/png", resource_id=resource_id) + return _store_image_bytes( + image_hash, + image_bytes, + "image/png", + 
resource_id=resource_id, + hydrate_assets=hydrate_assets, + ) def _prime_pmtiles_thumbnail( @@ -180,6 +190,7 @@ def _prime_pmtiles_thumbnail( image_hash: str, *, resource_id: str | None = None, + hydrate_assets: bool = True, ) -> tuple[bool, bool]: """ Prime PMTiles thumbnail cache. @@ -202,6 +213,7 @@ def _prime_pmtiles_thumbnail( image_bytes, content_type or "image/png", resource_id=resource_id, + hydrate_assets=hydrate_assets, ), False, ) @@ -212,6 +224,7 @@ def _prime_remote_thumbnail( source_url: str, *, resource_id: str | None = None, + hydrate_assets: bool = True, ) -> tuple[str, str]: resolved_url = _resolve_image_url(source_url) cooldown_remaining = provider_origin_cooldown_remaining(resolved_url) @@ -304,6 +317,7 @@ def _prime_remote_thumbnail( response.content, detected_type or "image/jpeg", resource_id=resource_id, + hydrate_assets=hydrate_assets, ): record_provider_success(resolved_url) return ("generated", "remote") @@ -386,6 +400,7 @@ async def _prime_thumbnail_for_resource( retry_failures: bool = False, retry_placeheld: bool = False, existing_state: dict[str, Any] | None = None, + hydrate_assets: bool = True, ) -> tuple[str, str, str]: resource_id = str(resource_dict["id"]) @@ -439,17 +454,12 @@ async def _prime_thumbnail_for_resource( cached_image = await image_service.get_cached_image(image_hash) if cached_image: _valid, cached_content_type = _validate_image_content(cached_image, None) - store_durable_visual_asset( + _store_image_bytes( image_hash, - asset_kind="thumbnail", - content_type=cached_content_type or "application/octet-stream", - body=cached_image, - ) - store_durable_visual_asset_link( - resource_id, - asset_hash=image_hash, - asset_kind="thumbnail", - source_signature=image_hash, + cached_image, + cached_content_type or "application/octet-stream", + resource_id=resource_id, + hydrate_assets=hydrate_assets, ) await safe_record_thumbnail_state( ThumbnailStatePayload( @@ -483,6 +493,7 @@ async def _prime_thumbnail_for_resource( 
source_url, image_hash, resource_id=resource_id, + hydrate_assets=hydrate_assets, ) await safe_record_thumbnail_state( ThumbnailStatePayload( @@ -503,6 +514,7 @@ async def _prime_thumbnail_for_resource( source_url, image_hash, resource_id=resource_id, + hydrate_assets=hydrate_assets, ) if ok: await safe_record_thumbnail_state( @@ -546,6 +558,7 @@ async def _prime_thumbnail_for_resource( image_hash, source_url, resource_id=resource_id, + hydrate_assets=hydrate_assets, ) if remote_status == "deprioritized": return ("deprioritized", resource_id, remote_detail) @@ -586,6 +599,7 @@ async def _process_batch( force: bool, retry_failures: bool, retry_placeheld: bool, + hydrate_assets: bool, counters: Counter[str], progress: tqdm, failures: list[str], @@ -601,6 +615,7 @@ async def _run(resource_dict: dict[str, Any]) -> tuple[str, str, str]: retry_failures=retry_failures, retry_placeheld=retry_placeheld, existing_state=state_map.get(str(resource_dict["id"])), + hydrate_assets=hydrate_assets, ) tasks = [asyncio.create_task(_run(resource_dict)) for resource_dict in batch] @@ -634,6 +649,28 @@ async def _run(args: argparse.Namespace) -> int: logger.info("No resources matched the request.") return 0 + if ( + args.hydrate_assets + and not args.allow_full_hydration + and args.limit is None + and not resource_ids + ): + logger.error( + "Refusing full-corpus Redis asset-body hydration. Use --limit or explicit " + "resource IDs for hotsets, or pass --allow-full-hydration if the host is " + "sized for a full Redis DB 1 image-body cache." + ) + return 2 + + if args.hydrate_assets: + logger.info("Redis thumbnail-body hydration is enabled for this priming run.") + else: + logger.info( + "Redis thumbnail-body hydration is disabled; priming durable assets, links, " + "and thumbnail state. Use --hydrate-assets for small hotset runs that " + "should load image bodies into Redis." 
+ ) + counters: Counter[str] = Counter() failures: list[str] = [] @@ -657,6 +694,7 @@ async def _run(args: argparse.Namespace) -> int: force=args.force, retry_failures=args.retry_failures, retry_placeheld=args.retry_placeheld, + hydrate_assets=args.hydrate_assets, counters=counters, progress=progress, failures=failures, @@ -675,6 +713,7 @@ async def _run(args: argparse.Namespace) -> int: force=args.force, retry_failures=args.retry_failures, retry_placeheld=args.retry_placeheld, + hydrate_assets=args.hydrate_assets, counters=counters, progress=progress, failures=failures, @@ -736,6 +775,23 @@ def _parse_args() -> argparse.Namespace: action="store_true", help="Exit nonzero when any thumbnail fails; default logs failures and continues", ) + parser.add_argument( + "--hydrate-assets", + action="store_true", + help=( + "Load/reload immutable thumbnail asset bodies into Redis DB 1. Default " + "warms durable assets, links, and thumbnail state only to avoid exhausting " + "Redis memory on full-corpus runs." + ), + ) + parser.add_argument( + "--allow-full-hydration", + action="store_true", + help=( + "Allow --hydrate-assets without --limit or explicit resource IDs. Use only " + "on hosts sized for a full Redis DB 1 thumbnail image-body cache." + ), + ) return parser.parse_args() diff --git a/backend/scripts/trigger_ogm_nightly_sync.py b/backend/scripts/trigger_ogm_nightly_sync.py new file mode 100644 index 00000000..9ba2a35c --- /dev/null +++ b/backend/scripts/trigger_ogm_nightly_sync.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# ruff: noqa: E402 +""" +Nightly OpenGeoMetadata repo discovery + harvest enqueue. + +This script is intended for cron or another scheduler. It does two things: + +1. Refreshes the local `ogm_repos` watch list from the GitHub org. +2. Enqueues `ogm_harvest_all(trigger="nightly")` so enabled repos are harvested. 
+ +Run from the backend root, for example: + + cd backend + python scripts/trigger_ogm_nightly_sync.py +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path +from typing import Any, Dict, List + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from app.tasks.ogm_harvest import ogm_harvest_all +from scripts.populate_ogm_repos import ( + _sync_database_url, + build_repo_row, + list_org_repos, + repo_has_metadata_aardvark, + upsert_rows, +) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Refresh ogm_repos from GitHub and enqueue a harvest-all task." + ) + parser.add_argument( + "--org", default="OpenGeoMetadata", help="GitHub org name (default: OpenGeoMetadata)" + ) + parser.add_argument( + "--github-token", default=None, help="GitHub token (or set GITHUB_TOKEN env var)" + ) + parser.add_argument("--per-page", type=int, default=100, help="GitHub API per_page (max 100)") + parser.add_argument("--limit", type=int, default=None, help="Limit repos processed") + parser.add_argument( + "--include-archived", + action="store_true", + help="Include archived repos when refreshing ogm_repos", + ) + parser.add_argument( + "--skip-harvest", + action="store_true", + help="Refresh the repo catalog but do not enqueue ogm_harvest_all", + ) + parser.add_argument("--dry-run", action="store_true", help="Do not write or enqueue anything") + args = parser.parse_args() + + token = args.github_token or os.getenv("GITHUB_TOKEN") + database_url = os.getenv("DATABASE_URL") + if not database_url: + raise SystemExit("DATABASE_URL is required in the environment for this script.") + + sync_db_url = _sync_database_url(database_url) + repos = list_org_repos(args.org, token, per_page=args.per_page) + if not args.include_archived: + repos = [repo for repo in repos if not repo.get("archived", False)] + if args.limit is not None: + repos = repos[: 
args.limit] + + rows: List[Dict[str, Any]] = [] + processed = 0 + enabled = 0 + missing_aardvark = 0 + + for repo in repos: + name = repo.get("name") + if not isinstance(name, str) or not name: + continue + processed += 1 + default_branch = repo.get("default_branch") + + try: + has_aardvark = repo_has_metadata_aardvark(args.org, name, default_branch, token) + except Exception as exc: + has_aardvark = False + repo.setdefault("notes", str(exc)) + + if not has_aardvark: + missing_aardvark += 1 + + row = build_repo_row(repo, has_aardvark=has_aardvark) + if row["ogm_enabled"]: + enabled += 1 + rows.append(row) + + upserted, _updated = upsert_rows(sync_db_url, rows, dry_run=args.dry_run) + + harvest_task_id = None + if not args.dry_run and not args.skip_harvest: + task = ogm_harvest_all.delay(trigger="nightly") + harvest_task_id = task.id + + print( + json.dumps( + { + "org": args.org, + "processed": processed, + "enabled": enabled, + "missing_aardvark": missing_aardvark, + "upserted": 0 if args.dry_run else upserted, + "harvest_enqueued": bool(harvest_task_id), + "harvest_task_id": harvest_task_id, + "dry_run": bool(args.dry_run), + }, + indent=2, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/backend/static/brand.css b/backend/static/brand.css index d0847750..7635f665 100644 --- a/backend/static/brand.css +++ b/backend/static/brand.css @@ -815,6 +815,215 @@ section#footer-app #footer-title a:hover { border-top-right-radius: 0.1rem; } +/* ========================================================================== + OGM Repository Monitor + ========================================================================== */ + +.monitor-hero, +.monitor-summary, +.monitor-table-wrap { + width: var(--docs-page-inline); + max-width: var(--docs-page-width); + margin-right: auto; + margin-left: auto; +} + +.monitor-hero { + display: grid; + gap: 12px; + padding-bottom: calc(var(--docs-unit) * 2); + border-bottom: var(--docs-rule); +} + +.eyebrow, +.summary-label, 
+.meta-note, +.repo-subtitle { + margin: 0; + color: var(--docs-muted); + font-family: var(--docs-mono); + font-size: 12px; + line-height: 1.4; + text-transform: uppercase; +} + +.monitor-hero h2 { + max-width: 16ch; + font-size: clamp(40px, 5vw, 72px); + line-height: 0.95; + text-transform: uppercase; +} + +.lede { + max-width: 70ch; + margin: 0; + font-size: 18px; +} + +.monitor-summary { + display: grid; + grid-template-columns: repeat(6, minmax(0, 1fr)); + gap: 0; + margin-top: calc(var(--docs-unit) * 2); + border: var(--docs-rule); +} + +.summary-card { + min-height: calc(var(--docs-unit) * 5); + padding: 18px; + border-right: var(--docs-rule); + background: rgba(255, 255, 255, 0.92); +} + +.summary-card:last-child { + border-right: 0; +} + +.summary-value { + margin: 10px 0 0; + font-size: clamp(28px, 4vw, 42px); + font-weight: 900; + line-height: 1; +} + +.monitor-table-wrap { + margin-top: calc(var(--docs-unit) * 2); + border: var(--docs-rule); + background: rgba(255, 255, 255, 0.96); +} + +.monitor-table-head { + display: flex; + align-items: end; + justify-content: space-between; + gap: var(--docs-unit); + padding: 18px 18px 0; +} + +.monitor-table-head h3 { + font-size: 28px; + text-transform: uppercase; +} + +.monitor-table-scroll { + overflow-x: auto; + padding: 18px; +} + +.monitor-table { + width: 100%; + min-width: 980px; + border-collapse: collapse; +} + +.monitor-table th, +.monitor-table td { + padding: 14px 12px; + border-top: var(--docs-rule); + vertical-align: top; + text-align: left; +} + +.monitor-table th { + font-family: var(--docs-mono); + font-size: 12px; + letter-spacing: 0; + text-transform: uppercase; +} + +.monitor-table tbody tr:hover { + background: rgba(215, 25, 32, 0.04); +} + +.repo-cell { + display: grid; + gap: 6px; +} + +.repo-cell a { + font-size: 18px; + font-weight: 800; + text-decoration: none; +} + +.status-pill { + display: inline-flex; + align-items: center; + justify-content: center; + min-height: 32px; + padding: 0 
12px; + border: var(--docs-rule); + font-family: var(--docs-mono); + font-size: 12px; + font-weight: 900; + letter-spacing: 0; + text-transform: uppercase; +} + +.status-ok { + color: var(--docs-white); + background: var(--docs-black); +} + +.status-run { + color: var(--docs-black); + background: #ffe27a; +} + +.status-bad { + color: var(--docs-white); + background: var(--docs-red); +} + +.status-muted { + color: var(--docs-black); + background: #ecece7; +} + +@media (max-width: 1200px) { + .monitor-summary { + grid-template-columns: repeat(3, minmax(0, 1fr)); + } + + .summary-card:nth-child(3n) { + border-right: 0; + } + + .summary-card:nth-child(n + 4) { + border-top: var(--docs-rule); + } +} + +@media (max-width: 767px) { + .monitor-hero h2 { + max-width: none; + font-size: 36px; + } + + .lede { + font-size: 16px; + } + + .monitor-summary { + grid-template-columns: 1fr; + } + + .summary-card, + .summary-card:last-child { + border-right: 0; + border-top: var(--docs-rule); + } + + .summary-card:first-child { + border-top: 0; + } + + .monitor-table-head { + align-items: start; + padding-bottom: 0; + } +} + /* ========================================================================== End of BTAA Geoportal Brand CSS - ========================================================================== */ \ No newline at end of file + ========================================================================== */ diff --git a/backend/templates/docs.html b/backend/templates/docs.html index 020753ec..63ae9b44 100644 --- a/backend/templates/docs.html +++ b/backend/templates/docs.html @@ -22,6 +22,7 @@

BTAA Geospatial API

@@ -181,4 +182,3 @@

BTAA Member Libraries

- diff --git a/backend/templates/ogm_repo_dashboard.html b/backend/templates/ogm_repo_dashboard.html new file mode 100644 index 00000000..c4138699 --- /dev/null +++ b/backend/templates/ogm_repo_dashboard.html @@ -0,0 +1,202 @@ + + + + + + {{ title }} + + + + +
+
+ +
+
+ +
+
+

OpenGeoMetadata Harvest Monitor

+

Repository health, harvest freshness, and API availability in one place.

+

+ This page tracks every repository discovered in the OpenGeoMetadata GitHub organization, + whether it exposes a metadata-aardvark/ directory, when it last changed on GitHub, + when this app last harvested it, and how many records are currently harvested versus published + through this API. +

+

Generated {{ generated_at }}. Times shown in UTC.

+
+ +
+
+

Tracked Repositories

+

{{ summary.repo_count }}

+
+
+

With Aardvark Metadata

+

{{ summary.repos_with_aardvark_count }}

+
+
+

Nightly Enabled

+

{{ summary.enabled_repo_count }}

+
+
+

Current Harvested Records

+

{{ "{:,}".format(summary.harvested_record_count) }}

+
+
+

Published in This API

+

{{ "{:,}".format(summary.available_record_count) }}

+
+
+

Never Harvested Here

+

{{ summary.never_harvested_count }}

+
+
+ +
+
+
+

Repositories

+

JSON source: /api/v1/ogm/repos

+
+
+ +
+ + + + + + + + + + + + + + {% for repo in repos %} + + + + + + + + + + {% endfor %} + +
RepositoryLast CommitLast HarvestAardvarkHarvest StatusHarvested RecordsAvailable in API
+
+ {{ repo.ogm_repo_name }} +

{{ repo.ogm_repo_full_name or repo.ogm_repo_name }}

+

+ mode: {{ repo.ogm_watch_mode or "manual" }} + {% if repo.last_commit_sha %} + &middot; sha {{ repo.last_commit_sha[:12] }} + {% endif %} +

+
+
+ {{ repo.display_last_commit_at or "Never observed" }} + + {% if repo.display_last_harvest_at %} +
{{ repo.display_last_harvest_at }}
+ {% if repo.display_last_harvest_started_at and repo.display_last_harvest_started_at != repo.display_last_harvest_at %} +

started {{ repo.display_last_harvest_started_at }}

+ {% endif %} + {% else %} + Never harvested + {% endif %} +
+ {% if repo.ogm_has_aardvark %} + Present + {% else %} + Missing + {% endif %} + + {% set crawl_status = (repo.last_crawl_status or "unknown")|lower %} + {% if crawl_status == "success" %} + Success + {% elif crawl_status == "running" %} + Running + {% elif crawl_status == "failed" %} + Failed + {% else %} + {{ repo.last_crawl_status or "Unknown" }} + {% endif %} + {% if repo.harvested_failure_count %} +

{{ "{:,}".format(repo.harvested_failure_count) }} recent import errors

+ {% endif %} +
+
{{ "{:,}".format(repo.harvested_record_count) }}
+ {% if repo.harvested_success_count %} +

last run imported {{ "{:,}".format(repo.harvested_success_count) }}

+ {% endif %} +
+
{{ "{:,}".format(repo.available_record_count) }}
+ {% if repo.harvest_gap_count %} +

{{ "{:,}".format(repo.harvest_gap_count) }} not yet published

+ {% endif %} +
+
+
+
@contextmanager
def client():
    """Yield a ``TestClient`` for ``app`` with its service hooks mocked out.

    While the client is open, ``app.main``'s database connect/disconnect,
    Elasticsearch init/close, and store close callables are replaced by
    ``AsyncMock`` objects (presumably so app startup/shutdown needs no live
    services -- confirm against ``app.main``).
    """
    db_connect = patch("app.main.database.connect", new_callable=AsyncMock)
    db_disconnect = patch("app.main.database.disconnect", new_callable=AsyncMock)
    es_init = patch("app.main.init_elasticsearch", new_callable=AsyncMock)
    es_close = patch("app.main.close_elasticsearch", new_callable=AsyncMock)
    store_close = patch("app.main.close_store", new_callable=AsyncMock)

    # Patches are entered in the order listed and undone in reverse, with the
    # TestClient opened last and therefore closed first.
    with db_connect, db_disconnect, es_init, es_close, store_close:
        with TestClient(app) as http_client:
            yield http_client
def test_public_ogm_repo_dashboard_renders_html_monitor():
    """The dashboard route serves an HTML page built from the repo summaries."""
    utexas = {
        "ogm_repo_name": "edu.utexas",
        "ogm_repo_full_name": "OpenGeoMetadata/edu.utexas",
        "ogm_github_url": "https://github.com/OpenGeoMetadata/edu.utexas",
        "ogm_enabled": True,
        "ogm_watch_mode": "nightly",
        "ogm_has_aardvark": True,
        "last_commit_at": "2026-02-26T03:15:16Z",
        "last_commit_sha": "abc123def4567890",
        "last_crawl_started_at": "2026-02-25T14:57:16.327086",
        "last_crawl_completed_at": "2026-02-25T14:58:11.946192",
        "last_crawl_status": "success",
        "last_run_id": 142,
        "harvested_success_count": 876,
        "harvested_failure_count": 0,
        "harvested_record_count": 904,
        "available_record_count": 901,
    }
    summaries_target = "app.api.v1.endpoint_modules.ogm.ogm_repo.list_public_repo_summaries"

    # Replace the summary lookup so the route renders the fixture above.
    with patch(summaries_target, new_callable=AsyncMock) as mock_summaries:
        mock_summaries.return_value = [utexas]
        with client() as test_client:
            response = test_client.get("/api/v1/ogm/repos/dashboard")

    assert response.status_code == 200
    assert response.headers["content-type"] == "text/html; charset=utf-8"

    page = response.text
    expected_fragments = (
        "OGM Repository Monitor",
        "edu.utexas",
        "OpenGeoMetadata/edu.utexas",
        "/api/v1/ogm/repos",
    )
    for fragment in expected_fragments:
        assert fragment in page
def _ensure_async_database_url() -> None:
    """Point ``DATABASE_URL`` at the asyncpg driver for the test session.

    * Unset, empty, or whitespace-only: set to a local ``testdb`` default.
    * Starts with ``postgresql://`` or its alias ``postgres://``: the scheme is
      rewritten to ``postgresql+asyncpg://`` and the rest of the URL is kept.
    * Anything else (including URLs that already name a driver) is left as is.

    The result is written back to ``os.environ``; nothing is returned.
    """
    async_scheme = "postgresql+asyncpg://"
    database_url = os.getenv("DATABASE_URL", "").strip()

    if not database_url:
        os.environ["DATABASE_URL"] = f"{async_scheme}postgres:postgres@localhost/testdb"
        return

    # Both spellings are valid PostgreSQL URI schemes; "postgres://" is the
    # short alias and needs the same rewrite as "postgresql://".
    for sync_scheme in ("postgresql://", "postgres://"):
        if database_url.startswith(sync_scheme):
            os.environ["DATABASE_URL"] = async_scheme + database_url[len(sync_scheme):]
            return
@app.post("/api/v1/analytics/events") + async def analytics_events(): + return JSONResponse({"status": "accepted"}, status_code=202) + app.add_middleware(RateLimitMiddleware) return app @@ -200,3 +204,26 @@ def test_unlimited_key_never_returns_429(self, unlimited_key_client): assert second.status_code == 200 assert second.headers["X-RateLimit-Limit"] == "unlimited" assert second.headers["X-RateLimit-Remaining"] == "unlimited" + + def test_options_preflight_bypasses_rate_limit(self, rate_limited_client): + """Preflight requests should not consume the following real request's quota.""" + + options = rate_limited_client.options("/api/v1/test-endpoint") + first_get = rate_limited_client.get("/api/v1/test-endpoint") + second_get = rate_limited_client.get("/api/v1/test-endpoint") + + assert options.status_code != 429 + assert first_get.status_code == 200 + assert second_get.status_code == 429 + + def test_analytics_events_use_separate_rate_limit_bucket(self, rate_limited_client): + """Analytics event ingestion should not consume the normal API request bucket.""" + + analytics = rate_limited_client.post("/api/v1/analytics/events", json={"events": []}) + first_get = rate_limited_client.get("/api/v1/test-endpoint") + second_get = rate_limited_client.get("/api/v1/test-endpoint") + + assert analytics.status_code == 202 + assert analytics.headers["X-RateLimit-Limit"] == "120" + assert first_get.status_code == 200 + assert second_get.status_code == 429 diff --git a/backend/tests/middleware/test_rate_limit_middleware.py b/backend/tests/middleware/test_rate_limit_middleware.py index 6c25e40b..991fe54d 100644 --- a/backend/tests/middleware/test_rate_limit_middleware.py +++ b/backend/tests/middleware/test_rate_limit_middleware.py @@ -9,7 +9,11 @@ from fastapi import Request from fastapi.responses import JSONResponse -from app.middleware.rate_limit_middleware import RateLimitMiddleware, _is_immutable_asset_route +from app.middleware.rate_limit_middleware import ( + 
DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE, + RateLimitMiddleware, + _is_immutable_asset_route, +) class TestRateLimitMiddleware: @@ -209,6 +213,7 @@ async def test_get_tier_info_with_ip_whitelist_rejected(self, middleware): async def test_dispatch_rate_limit_disabled(self, middleware): """Requests should still log even when throttling is disabled.""" request = MagicMock(spec=Request) + request.method = "GET" request.url = MagicMock() request.url.path = "/api/v1/search" request.headers.get = MagicMock(return_value=None) @@ -249,6 +254,7 @@ async def test_dispatch_rate_limit_disabled(self, middleware): async def test_dispatch_admin_endpoint_skipped(self, middleware): """Admin requests should bypass throttling but still log usage.""" request = MagicMock(spec=Request) + request.method = "POST" request.url = MagicMock() request.url.path = "/api/v1/admin/cache/clear" request.headers.get = MagicMock(return_value=None) @@ -285,6 +291,7 @@ async def test_dispatch_admin_endpoint_skipped(self, middleware): async def test_dispatch_immutable_thumbnail_asset_skipped(self, middleware): """Immutable cached thumbnail assets should bypass throttling but still log.""" request = MagicMock(spec=Request) + request.method = "GET" request.url = MagicMock() request.url.path = ( "/api/v1/thumbnails/e7810cca426f65fa9e5e25124ca1b213b6c54deec0901c88805558faa7e25639" @@ -319,10 +326,90 @@ async def test_dispatch_immutable_thumbnail_asset_skipped(self, middleware): middleware.usage_log_service.log_request.assert_called_once() assert response.status_code == 200 + @pytest.mark.asyncio + async def test_dispatch_options_preflight_skipped_without_logging(self, middleware): + """CORS preflights should not consume quota or create usage-log writes.""" + request = MagicMock(spec=Request) + request.method = "OPTIONS" + request.url = MagicMock() + request.url.path = "/api/v1/search" + request.headers.get = MagicMock(return_value=None) + request.query_params.get = MagicMock(return_value=None) + 
request.client = MagicMock() + request.client.host = "192.168.1.1" + + middleware.usage_log_service.log_request = AsyncMock() + middleware.rate_limit_service.check_rate_limit = AsyncMock() + middleware.api_key_service.get_anonymous_tier = AsyncMock() + call_next = AsyncMock(return_value=JSONResponse(content={}, status_code=200)) + + response = await middleware.dispatch(request, call_next) + + call_next.assert_called_once_with(request) + middleware.api_key_service.get_anonymous_tier.assert_not_called() + middleware.rate_limit_service.check_rate_limit.assert_not_called() + middleware.usage_log_service.log_request.assert_not_called() + assert response.status_code == 200 + + @pytest.mark.asyncio + async def test_dispatch_analytics_uses_separate_ip_bucket(self, middleware): + """Analytics events should not spend the normal anonymous/API-key quota.""" + request = MagicMock(spec=Request) + request.method = "POST" + request.url = MagicMock() + request.url.path = "/api/v1/analytics/events" + request.headers.get = MagicMock(return_value=None) + request.query_params.get = MagicMock(return_value=None) + request.client = MagicMock() + request.client.host = "192.168.1.1" + + tier_info = { + "tier_id": 6, + "tier_name": "anonymous", + "display_name": "Anonymous", + "requests_per_minute": 10, + } + middleware.api_key_service.get_anonymous_tier = AsyncMock(return_value=tier_info) + middleware.rate_limit_service.check_rate_limit = AsyncMock( + return_value=(True, DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE - 1, 1234567890) + ) + middleware.rate_limit_service.get_rate_limit_headers = AsyncMock( + return_value={ + "X-RateLimit-Limit": str(DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE), + "X-RateLimit-Remaining": str(DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE - 1), + "X-RateLimit-Reset": "1234567890", + } + ) + middleware.usage_log_service.log_request = AsyncMock() + call_next = AsyncMock(return_value=JSONResponse(content={}, status_code=202)) + + with patch.dict( + os.environ, + { + 
"RATE_LIMIT_ENABLED": "true", + "DISABLE_RATE_LIMIT_FOR_TESTS": "false", + }, + clear=False, + ): + response = await middleware.dispatch(request, call_next) + + middleware.rate_limit_service.check_rate_limit.assert_called_once_with( + "analytics_events", + "ip:192.168.1.1", + DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE, + ) + call_next.assert_called_once_with(request) + middleware.usage_log_service.log_request.assert_called_once() + assert response.status_code == 202 + assert response.headers["X-RateLimit-Limit"] == str( + DEFAULT_ANALYTICS_EVENTS_REQUESTS_PER_MINUTE + ) + @pytest.mark.asyncio async def test_dispatch_rate_limit_exceeded(self, middleware): """Test that middleware returns 429 when rate limit exceeded.""" request = MagicMock(spec=Request) + request.method = "GET" request.url = MagicMock() request.url.path = "/api/v1/search" request.headers.get = MagicMock(return_value=None) @@ -366,6 +453,7 @@ async def test_dispatch_rate_limit_exceeded(self, middleware): async def test_dispatch_unlimited_tier(self, middleware): """Test that middleware allows unlimited tier requests.""" request = MagicMock(spec=Request) + request.method = "GET" request.url = MagicMock() request.url.path = "/api/v1/search" request.headers.get = MagicMock(return_value="test-api-key") diff --git a/backend/tests/scripts/test_prime_thumbnail_cache.py b/backend/tests/scripts/test_prime_thumbnail_cache.py index 3a3775f2..437115a3 100644 --- a/backend/tests/scripts/test_prime_thumbnail_cache.py +++ b/backend/tests/scripts/test_prime_thumbnail_cache.py @@ -42,6 +42,7 @@ async def test_prime_thumbnail_cached_remote_records_success(): prime_thumbnail_cache, "safe_record_thumbnail_state", new=AsyncMock() ) as mock_state, patch.object(prime_thumbnail_cache, "ImageService") as mock_service_cls, + patch.object(prime_thumbnail_cache, "_store_image_bytes", return_value=True) as mock_store, ): service = MagicMock() service._get_thumbnail_source_url.return_value = source_url @@ -61,6 +62,13 @@ async 
@pytest.mark.asyncio
async def test_run_refuses_full_corpus_redis_asset_hydration():
    """``_run`` returns 2 when asset hydration is on for an unrestricted run.

    The namespace asks for ``hydrate_assets`` with no resource ids, no limit,
    and ``allow_full_hydration`` off, while the resource count is mocked to 1000.
    """
    options = {
        "resource_ids": [],
        "limit": None,
        "batch_size": 100,
        "concurrency": 4,
        "force": False,
        "retry_failures": False,
        "retry_placeheld": False,
        "strict_failures": False,
        "hydrate_assets": True,
        "allow_full_hydration": False,
    }
    args = prime_thumbnail_cache.argparse.Namespace(**options)

    with patch.object(prime_thumbnail_cache, "_count_resources", return_value=1000):
        exit_code = await prime_thumbnail_cache._run(args)
        assert exit_code == 2
@pytest.mark.asyncio
async def test_ogm_harvest_all_nightly_trigger_selects_scheduled_modes(monkeypatch):
    """A nightly harvest-all enqueues enabled nightly/weekly/scheduled/both repos.

    The manual-mode repo and the disabled nightly repo must not be enqueued.
    """
    # (repo name, enabled flag, watch mode) for every row the fake repository returns.
    catalog = [
        ("nightly-repo", True, "nightly"),
        ("weekly-repo", True, "weekly"),
        ("scheduled-repo", True, "scheduled"),
        ("both-repo", True, "both"),
        ("manual-repo", True, "manual"),
        ("disabled-repo", False, "nightly"),
    ]
    repo_rows = [
        {"ogm_repo_name": name, "ogm_enabled": is_enabled, "ogm_watch_mode": mode}
        for name, is_enabled, mode in catalog
    ]
    expected_names = ["nightly-repo", "weekly-repo", "scheduled-repo", "both-repo"]

    delay_calls = []

    def fake_delay(*, repo_name: str, trigger: str):
        # Record the call instead of enqueueing a real task.
        delay_calls.append((repo_name, trigger))
        return SimpleNamespace(id=f"task-{repo_name}")

    monkeypatch.setattr(ogm_harvest.database, "is_connected", True)
    monkeypatch.setattr(
        ogm_harvest.OGMHarvestRepository,
        "list_repos",
        AsyncMock(return_value=repo_rows),
    )
    monkeypatch.setattr(ogm_harvest.ogm_harvest_repo, "delay", fake_delay)

    result = await ogm_harvest._ogm_harvest_all_async(trigger="nightly")

    assert result["enqueued"] == 4
    assert result["repo_names"] == expected_names
    assert delay_calls == [(name, "nightly") for name in expected_names]
The workflow SSHes to production, finds a running +app container, and runs: + +```bash +python /app/backend/scripts/trigger_ogm_nightly_sync.py +``` + +Configure `OGM_KAMAL_SSH_HOST`, `OGM_KAMAL_SSH_PORT`, `OGM_KAMAL_SSH_USER`, and +`OGM_KAMAL_SSH_PRIVATE_KEY` as GitHub Actions secrets before relying on the workflow. + Cron now sets `CRON_TZ=America/Chicago` in the crontab, and bridge delta windows are computed from the previous America/Chicago day before converting to UTC for the bridge API. Each cron shell also loads `/app/scripts/cron_env.sh` via `BASH_ENV`, which restores diff --git a/docs/backend/ogm_harvesting.md b/docs/backend/ogm_harvesting.md index ca394b6b..493768d2 100644 --- a/docs/backend/ogm_harvesting.md +++ b/docs/backend/ogm_harvesting.md @@ -22,6 +22,11 @@ During harvest, imported records now update three local DB surfaces automaticall - `ogm_harvest_repo(repo_name, trigger)` - `ogm_harvest_all(trigger)` +- **Public monitor**: + - `GET /api/v1/ogm/repos` + - `GET /api/v1/ogm/repos/dashboard` + - `GET /api/v1/ogm/harvest/failures` + - **Admin endpoints** (Basic Auth protected under `/api/v1/admin/*`): - `GET /api/v1/admin/ogm/repos` - `PATCH /api/v1/admin/ogm/repos/{repo_name}` @@ -90,11 +95,11 @@ Ensure the backend image includes `git` (the project `Dockerfile` installs it). 
curl -u admin:changeme -X PATCH \ "http://localhost:8000/api/v1/admin/ogm/repos/edu.stanford.purl" \ -H "Content-Type: application/json" \ - -d '{"ogm_enabled":true,"ogm_watch_mode":"weekly"}' + -d '{"ogm_enabled":true,"ogm_watch_mode":"nightly"}' ``` Supported `ogm_watch_mode` values: -- `weekly`, `webhook`, `both`, `manual` +- `nightly`, `weekly`, `webhook`, `both`, `manual` ### 1a) Populate `ogm_repos` from GitHub (bulk) @@ -105,7 +110,42 @@ docker compose exec api bash -lc "cd /app/backend && python scripts/populate_ogm ``` Notes: -- This script checks whether each repo has a `metadata-aardvark/` directory.\n+ - If **missing**, it sets `ogm_enabled=false` and flags it via `ogm_tags.ogm_missing_aardvark=true`.\n+ - If **present**, it sets `ogm_enabled=true` and `ogm_watch_mode=weekly`.\n+- For better GitHub rate limits, pass a token:\n+ - `GITHUB_TOKEN=... python scripts/populate_ogm_repos.py` +- This script checks whether each repo has a `metadata-aardvark/` directory. +- If metadata is missing, it sets `ogm_enabled=false` and flags the repo with `ogm_tags.ogm_missing_aardvark=true`. +- If metadata is present, it sets `ogm_enabled=true` and `ogm_watch_mode=nightly`. +- For better GitHub rate limits, pass a token: + - `GITHUB_TOKEN=... python scripts/populate_ogm_repos.py` + +### 1b) Nightly discovery and harvest enqueue + +The helper script `scripts/trigger_ogm_nightly_sync.py` refreshes the repository watch list +from the `OpenGeoMetadata` GitHub organization and then enqueues `ogm_harvest_all` with +`trigger="nightly"`. + +```bash +docker compose exec api bash -lc "cd /app/backend && python scripts/trigger_ogm_nightly_sync.py" +``` + +Use `--dry-run` to inspect discovery without writing repo rows or enqueueing harvest work, +and `--skip-harvest` to refresh `ogm_repos` without enqueueing the harvest task. + +The repository also includes `.github/workflows/ogm-nightly-sync.yml`, which can run the +same script on production through SSH. 
Configure these GitHub Actions secrets before +enabling the workflow: + +- `OGM_KAMAL_SSH_HOST` +- `OGM_KAMAL_SSH_PORT` (optional, defaults to `22`) +- `OGM_KAMAL_SSH_USER` +- `OGM_KAMAL_SSH_PRIVATE_KEY` + +### 1c) Monitor repo health + +Use the dashboard to inspect discovered repositories, Aardvark availability, harvest +freshness, and published API counts: + +```text +GET /api/v1/ogm/repos/dashboard +``` ### 2) Trigger a harvest @@ -121,13 +161,13 @@ curl -u admin:changeme -X POST \ This updates Postgres immediately, including distributions and relationship rows for the touched records. If you need local Elasticsearch/search results to reflect the new or changed records right away, run `make reindex` after the harvest completes. -All enabled weekly repos: +All enabled scheduled repos: ```bash curl -u admin:changeme -X POST \ "http://localhost:8000/api/v1/admin/ogm/harvest" \ -H "Content-Type: application/json" \ - -d '{"ogm_all":true,"ogm_trigger":"weekly"}' + -d '{"ogm_all":true,"ogm_trigger":"nightly"}' ``` ### 3) Query by repo (multi-select)