From 63896905a5f0f7d4f4786852d36e9ed5040c593b Mon Sep 17 00:00:00 2001 From: Miriad Date: Tue, 3 Mar 2026 17:43:14 +0000 Subject: [PATCH 1/3] feat: redesign YouTube stats tracking with Supabase pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace Sanity-based youtubeUpdateTask queue with Supabase youtube_stats table - New 3-phase pipeline: discover → fetch → sync (lib/youtube-stats.ts) - Route reduced from 349 to 54 lines (85% reduction) - Sanity only written to when stats actually change - Remove youtubeUpdateTask schema, types, and config references - Add Supabase migration for youtube_stats table with indexes and RPC - Update cron route to forward ?action= param for phase-specific runs - Add cleanup route to delete existing youtubeUpdateTask documents - Add architecture documentation Co-authored-by: youtubeviews --- app/api/cron/route.tsx | 5 +- app/api/youtube/cleanup/route.tsx | 70 ++++ app/api/youtube/views/route.tsx | 391 +++--------------- docs/youtube-stats-architecture.md | 237 +++++++++++ lib/youtube-stats.ts | 363 ++++++++++++++++ sanity.config.ts | 2 - sanity/schemas/documents/youtubeUpdateTask.ts | 56 --- sanity/types.ts | 13 - supabase/migrations/001_youtube_stats.sql | 50 +++ 9 files changed, 772 insertions(+), 415 deletions(-) create mode 100644 app/api/youtube/cleanup/route.tsx create mode 100644 docs/youtube-stats-architecture.md create mode 100644 lib/youtube-stats.ts delete mode 100644 sanity/schemas/documents/youtubeUpdateTask.ts create mode 100644 supabase/migrations/001_youtube_stats.sql diff --git a/app/api/cron/route.tsx b/app/api/cron/route.tsx index ffe8863a..88534b31 100644 --- a/app/api/cron/route.tsx +++ b/app/api/cron/route.tsx @@ -12,7 +12,10 @@ export function GET(request: NextRequest) { }); } try { - const url = `${publicURL()}/api/youtube/views`; + // Forward the action param if present (discover, fetch, sync) + const action = request.nextUrl.searchParams.get("action"); + 
const params = action ? `?action=${action}` : ""; + const url = `${publicURL()}/api/youtube/views${params}`; console.log("[CRON] Triggering YouTube views update:", url); fetch(url, { method: "POST", diff --git a/app/api/youtube/cleanup/route.tsx b/app/api/youtube/cleanup/route.tsx new file mode 100644 index 00000000..a9765627 --- /dev/null +++ b/app/api/youtube/cleanup/route.tsx @@ -0,0 +1,70 @@ +/** + * One-time cleanup script: Delete all youtubeUpdateTask documents from Sanity. + * + * Run via: POST /api/youtube/cleanup + * Auth: Bearer CRON_SECRET + * + * DELETE THIS FILE after running it once. + */ +export const fetchCache = "force-no-store"; + +import { createClient } from "next-sanity"; +import type { NextRequest } from "next/server"; +import { apiVersion, dataset, projectId } from "@/sanity/lib/api"; + +const sanityWriteClient = createClient({ + projectId, + dataset, + apiVersion, + token: process.env.SANITY_API_TOKEN || process.env.SANITY_API_WRITE_TOKEN, + perspective: "published", + useCdn: false, +}); + +export async function POST(request: NextRequest) { + const authHeader = request.headers.get("authorization"); + if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { + return new Response("Unauthorized", { status: 401 }); + } + + try { + // Count existing tasks + const count = await sanityWriteClient.fetch( + `count(*[_type == "youtubeUpdateTask"])` + ); + console.log(`[CLEANUP] Found ${count} youtubeUpdateTask documents to delete`); + + if (count === 0) { + return Response.json({ success: true, deleted: 0, message: "No documents to delete" }); + } + + // Delete in batches of 100 + let totalDeleted = 0; + while (true) { + const tasks: { _id: string }[] = await sanityWriteClient.fetch( + `*[_type == "youtubeUpdateTask"][0...100]{ _id }` + ); + + if (!tasks || tasks.length === 0) break; + + // Use transaction for batch delete + const tx = sanityWriteClient.transaction(); + for (const task of tasks) { + tx.delete(task._id); + } + await tx.commit({ 
visibility: "async" }); + + totalDeleted += tasks.length; + console.log(`[CLEANUP] Deleted batch of ${tasks.length} (total: ${totalDeleted})`); + } + + console.log(`[CLEANUP] Done. Deleted ${totalDeleted} youtubeUpdateTask documents.`); + return Response.json({ success: true, deleted: totalDeleted }); + } catch (error) { + console.error("[CLEANUP] Error:", error); + return Response.json( + { success: false, error: String(error) }, + { status: 500 } + ); + } +} diff --git a/app/api/youtube/views/route.tsx b/app/api/youtube/views/route.tsx index dd66e412..520c7579 100644 --- a/app/api/youtube/views/route.tsx +++ b/app/api/youtube/views/route.tsx @@ -1,348 +1,53 @@ export const fetchCache = "force-no-store"; -import { publicURL, youtubeParser } from "@/lib/utils"; -import { createClient } from "next-sanity"; -import type { NextRequest } from "next/server"; -import { apiVersion, dataset, projectId, studioUrl } from "@/sanity/lib/api"; - -const sanityWriteClient = createClient({ - projectId, - dataset, - apiVersion, - token: process.env.SANITY_API_WRITE_TOKEN, - perspective: "published", - useCdn: false, -}); - -async function processBatchTasks() { - console.log( - "[YOUTUBE] Fetching up to 10 pending youtubeUpdateTask tasks from Sanity", - ); - let tasks = await sanityWriteClient.fetch( - `*[_type == "youtubeUpdateTask" && status == "pending"]| order(lastChecked asc)[0...10]{ _id, targetDoc->{_id, _type, youtube}, status }`, - ); - console.log(`[YOUTUBE] Fetched ${tasks?.length || 0} tasks`); - if (!tasks || tasks.length === 0) { - console.log("[YOUTUBE] No pending tasks found"); - return { processed: 0 }; - } - - // Prepare video IDs and map taskId to docId - const validTasks = []; - const errorTasks = []; - for (const task of tasks) { - const { _id: taskId, targetDoc } = task; - if (!targetDoc || !targetDoc.youtube) { - errorTasks.push({ taskId, error: "Missing YouTube field on targetDoc" }); - continue; - } - const id = youtubeParser(targetDoc.youtube); - if (!id) { 
- errorTasks.push({ taskId, error: "Invalid YouTube URL" }); - continue; - } - validTasks.push({ taskId, docId: targetDoc._id, youtubeId: id }); - } - console.log( - `[YOUTUBE] validTasks: ${validTasks.length}, errorTasks: ${errorTasks.length}`, - ); - - // Mark all valid tasks as inProgress (sequential) - try { - for (const t of validTasks) { - console.log(`[YOUTUBE] Marking task ${t.taskId} as inProgress`); - try { - await sanityWriteClient - .patch(t.taskId) - .set({ status: "inProgress", lastChecked: new Date().toISOString() }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Task ${t.taskId} marked as inProgress`); - } catch (err) { - console.error( - `[YOUTUBE] Error marking task ${t.taskId} as inProgress:`, - err, - ); - } - } - console.log(`[YOUTUBE] Marked ${validTasks.length} tasks as inProgress`); - } catch (err) { - console.error("[YOUTUBE] Error marking tasks as inProgress", err); - } - - // Mark all error tasks as error (sequential) - if (errorTasks.length > 0) { - try { - for (const t of errorTasks) { - console.log( - `[YOUTUBE] Marking error task ${t.taskId} as error: ${t.error}`, - ); - try { - await sanityWriteClient - .patch(t.taskId) - .set({ - status: "error", - errorMessage: t.error, - lastChecked: new Date().toISOString(), - }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Error task ${t.taskId} marked as error`); - } catch (err) { - console.error(`[YOUTUBE] Error marking error task ${t.taskId}:`, err); - } - } - console.log(`[YOUTUBE] Marked ${errorTasks.length} tasks as error`); - } catch (err) { - console.error("[YOUTUBE] Error marking error tasks", err); - } - } - if (validTasks.length === 0) { - console.log("[YOUTUBE] No valid tasks to process"); - return { processed: 0, errors: errorTasks.length }; - } - - // Batch YouTube API call in groups of 50 IDs - const statsMap = new Map(); - const batchSize = 50; - let apiError = null; - for (let i = 0; i < validTasks.length; i += batchSize) { - const batch = 
validTasks.slice(i, i + batchSize); - const ids = batch.map((t) => t.youtubeId).join(","); - console.log(`[YOUTUBE] Fetching stats for IDs batch: ${ids}`); - let videoResp: Response, json: any; - try { - videoResp = await fetch( - `https://www.googleapis.com/youtube/v3/videos?id=${ids}&key=${process.env.YOUTUBE_API_KEY}&fields=items(id,statistics)&part=statistics`, - ); - json = await videoResp.json(); - console.log(`[YOUTUBE] YouTube API response status: ${videoResp.status}`); - if (videoResp.status !== 200) { - console.error("[YOUTUBE] YouTube API error", json); - apiError = json; - // Mark all tasks in this batch as error - for (const t of batch) { - console.log( - `[YOUTUBE] Marking batch task ${t.taskId} as error due to API error`, - ); - try { - await sanityWriteClient - .patch(t.taskId) - .set({ - status: "error", - errorMessage: JSON.stringify(json), - lastChecked: new Date().toISOString(), - }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Batch task ${t.taskId} marked as error`); - } catch (err) { - console.error( - `[YOUTUBE] Error marking task ${t.taskId} as error after YouTube API error:`, - err, - ); - } - } - break; - } - for (const item of json?.items || []) { - console.log(`[YOUTUBE] Adding stats for video ID: ${item.id}`); - statsMap.set(item.id, item.statistics); - } - } catch (err) { - console.error("[YOUTUBE] Error fetching YouTube stats", err); - apiError = err; - for (const t of batch) { - console.log( - `[YOUTUBE] Marking batch task ${t.taskId} as error due to fetch error`, - ); - try { - await sanityWriteClient - .patch(t.taskId) - .set({ - status: "error", - errorMessage: String(err), - lastChecked: new Date().toISOString(), - }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Batch task ${t.taskId} marked as error`); - } catch (err2) { - console.error( - `[YOUTUBE] Error marking task ${t.taskId} as error after fetch error:`, - err2, - ); - } - } - break; - } - } - console.log(`[YOUTUBE] StatsMap size: 
${statsMap.size}`); - if (apiError) { - return { processed: 0, errors: validTasks.length }; - } - - let completed = 0; - const patchOps = []; - const completedTaskOps = []; - const erroredTaskOps = []; - for (const t of validTasks) { - console.log( - `[YOUTUBE] Processing validTask: taskId=${t.taskId}, docId=${t.docId}, youtubeId=${t.youtubeId}`, - ); - const statistics = statsMap.get(t.youtubeId); - if (!statistics) { - console.log( - `[YOUTUBE] No statistics found for youtubeId=${t.youtubeId}, marking task ${t.taskId} as error`, - ); - try { - await sanityWriteClient - .patch(t.taskId) - .set({ - status: "error", - errorMessage: "No statistics found", - lastChecked: new Date().toISOString(), - }) - .commit({ visibility: "async" }); - console.log( - `[YOUTUBE] Task ${t.taskId} marked as error (no statistics)`, - ); - } catch (err) { - console.error( - `[YOUTUBE] Error marking task ${t.taskId} as error:`, - err, - ); - } - continue; - } - try { - console.log( - `[YOUTUBE] Updating doc ${t.docId} with statistics for youtubeId=${t.youtubeId}`, - ); - await sanityWriteClient - .patch(t.docId) - .set({ - "statistics.youtube.commentCount": Number.parseInt( - statistics.commentCount, - ), - "statistics.youtube.favoriteCount": Number.parseInt( - statistics.favoriteCount, - ), - "statistics.youtube.likeCount": Number.parseInt(statistics.likeCount), - "statistics.youtube.viewCount": Number.parseInt(statistics.viewCount), - }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Updated doc ${t.docId}`); - await sanityWriteClient - .patch(t.taskId) - .set({ - status: "completed", - lastChecked: new Date().toISOString(), - errorMessage: undefined, - }) - .commit({ visibility: "async" }); - console.log(`[YOUTUBE] Task ${t.taskId} marked as completed`); - completed++; - } catch (err) { - console.error( - `[YOUTUBE] Error updating doc ${t.docId} or task ${t.taskId}:`, - err, - ); - } - } - // Log summary - console.log(`[YOUTUBE] Patched ${completed} docs/tasks 
sequentially`); - return { - processed: completed, - errors: errorTasks.length + (validTasks.length - completed), - }; -} +import type { NextRequest } from "next/server"; +import { + syncSanityVideosToSupabase, + fetchAndStoreYouTubeStats, + pushStatsToSanity, +} from "@/lib/youtube-stats"; export async function POST(request: NextRequest) { - const authHeader = request.headers.get("authorization"); - if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { - console.error( - "[YOUTUBE] Unauthorized request: invalid authorization header", - ); - return new Response("Unauthorized", { status: 401 }); - } - - try { - console.log("[YOUTUBE] POST handler started"); - // Repopulate youtubeUpdateTask queue if empty - let tasks = await sanityWriteClient.fetch( - `*[_type == "youtubeUpdateTask" && status == "pending"]| order(lastChecked asc)[0...1]{ _id }`, - ); - console.log(`[YOUTUBE] Pending tasks in queue: ${tasks?.length || 0}`); - if (!tasks || tasks.length === 0) { - console.log( - "[YOUTUBE] No pending tasks, repopulating queue from posts and podcasts", - ); - const podcasts = await sanityWriteClient.fetch( - '*[_type == "podcast" && defined(youtube)]{_id, _type, youtube,date} | order(date desc)[0...1000]', - ); - const posts = await sanityWriteClient.fetch( - '*[_type == "post" && defined(youtube)]{_id, _type, youtube,date} | order(date desc)[0...1000]', - ); - const allDocs = [...podcasts, ...posts]; - console.log(`[YOUTUBE] Found ${allDocs.length} docs with YouTube`); - // Fetch all existing youtubeUpdateTask docs for these docs - const existingTasks = await sanityWriteClient.fetch( - '*[_type == "youtubeUpdateTask" && defined(targetDoc._ref)]{_id, targetDoc}', - ); - const existingTaskMap = new Map( - existingTasks.map( - (t: { _id: string; targetDoc?: { _ref?: string } }) => [ - t.targetDoc?._ref, - t._id, - ], - ), - ); - for (const doc of allDocs) { - const taskId = existingTaskMap.get(doc._id); - if (typeof taskId === "string" && taskId) { - // Update status 
to pending - try { - await sanityWriteClient - .patch(taskId) - .set({ status: "pending", lastChecked: null }) - .commit({ visibility: "async" }); - console.log( - `[YOUTUBE] Marked existing youtubeUpdateTask ${taskId} as pending for doc ${doc._id}`, - ); - } catch (err) { - console.error( - `[YOUTUBE] Error marking youtubeUpdateTask ${taskId} as pending for doc ${doc._id}:`, - err, - ); - } - } else { - // Create new youtubeUpdateTask - try { - await sanityWriteClient.create({ - _type: "youtubeUpdateTask", - targetDoc: { _type: "reference", _ref: doc._id }, - status: "pending", - lastChecked: null, - }); - console.log( - `[YOUTUBE] Created new youtubeUpdateTask for doc ${doc._id}`, - ); - } catch (err) { - console.error( - `[YOUTUBE] Error creating youtubeUpdateTask for doc ${doc._id}:`, - err, - ); - } - } - } - } - - // Process a batch of tasks - const result = await processBatchTasks(); - console.log("[YOUTUBE] Batch process result", result); - return Response.json({ success: true, ...result }); - } catch (error) { - console.error("[YOUTUBE] Unexpected error:", error); - return Response.json( - { success: false, error: String(error) }, - { status: 500 }, - ); - } + // Authenticate via CRON_SECRET + const authHeader = request.headers.get("authorization"); + if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { + console.error("[YOUTUBE] Unauthorized request: invalid authorization header"); + return new Response("Unauthorized", { status: 401 }); + } + + const { searchParams } = new URL(request.url); + const action = searchParams.get("action"); + + try { + console.log(`[YOUTUBE] POST handler started (action=${action ?? 
"all"})`); + + const results: Record<string, unknown> = {}; + + // Phase 1: Discover — sync Sanity videos to Supabase registry + if (!action || action === "discover") { + console.log("[YOUTUBE] Running discover phase..."); + results.discover = await syncSanityVideosToSupabase(); + } + + // Phase 2: Fetch — poll YouTube API and store stats in Supabase + if (!action || action === "fetch") { + console.log("[YOUTUBE] Running fetch phase..."); + results.fetch = await fetchAndStoreYouTubeStats(); + } + + // Phase 3: Sync — push updated stats from Supabase to Sanity + if (!action || action === "sync") { + console.log("[YOUTUBE] Running sync phase..."); + results.sync = await pushStatsToSanity(); + } + + console.log("[YOUTUBE] Completed successfully", results); + return Response.json({ success: true, ...results }); + } catch (error) { + console.error("[YOUTUBE] Unexpected error:", error); + return Response.json( + { success: false, error: String(error) }, + { status: 500 } + ); + } } diff --git a/docs/youtube-stats-architecture.md b/docs/youtube-stats-architecture.md new file mode 100644 index 00000000..9bd4e004 --- /dev/null +++ b/docs/youtube-stats-architecture.md @@ -0,0 +1,237 @@ +# YouTube Stats Architecture + +## Overview + +This document describes the redesigned YouTube views tracking system for codingcat.dev. The system fetches YouTube video statistics (views, likes, comments, favorites) and stores them on Sanity content documents. + +## The Old Approach (and Its Problems) + +The previous implementation used **Sanity as a task queue**: + +1. Created `youtubeUpdateTask` documents in Sanity with status fields (`pending`, `inProgress`, `completed`, `error`) +2. Every cron run would: + - Query pending tasks from Sanity + - Mark each task as `inProgress` (N writes) + - Fetch YouTube API statistics + - Update target content documents (N writes) + - Mark tasks as `completed` (N writes) +3. 
When the queue was empty, it re-fetched ALL posts and podcasts with YouTube URLs (up to 2,000 docs), checked for existing tasks, and created/reset them + +**Problems:** +- **3N+ Sanity mutations per cron run** just for tracking state, plus the actual stats updates +- Sanity is not designed to be a task queue — this was expensive and slow +- No separation between polling state and content updates +- Error recovery was complex and fragile +- Each cron run could generate hundreds of Sanity API calls + +## The New Approach + +The redesigned system uses a **three-phase pipeline** with Supabase as the tracking layer: + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ 1. DISCOVER │ ──▶ │ 2. FETCH │ ──▶ │ 3. SYNC │ +│ Sanity → │ │ YouTube → │ │ Supabase → │ +│ Supabase │ │ Supabase │ │ Sanity │ +└─────────────┘ └─────────────┘ └─────────────┘ +``` + +### Phase 1: Discover (`syncSanityVideosToSupabase`) + +- Queries Sanity for all posts/podcasts with YouTube URLs +- Upserts them into the `youtube_stats` Supabase table +- This creates a **registry** of all videos we need to track +- Idempotent — safe to run repeatedly + +### Phase 2: Fetch (`fetchAndStoreYouTubeStats`) + +- Queries Supabase for records ordered by `last_fetched_at ASC NULLS FIRST` (oldest/never-fetched first) +- Calls the YouTube Data API v3 for their statistics (batched in groups of 50) +- Updates the Supabase rows with the latest stats +- **Zero Sanity writes** — all state is in Supabase + +### Phase 3: Sync (`pushStatsToSanity`) + +- Queries Supabase for records where `last_fetched_at > last_synced_to_sanity_at` (or never synced) +- Patches the corresponding Sanity documents with the statistics +- Updates `last_synced_to_sanity_at` after successful push +- **Only writes to Sanity when we have actual data to update** + +### Why This Is Better + +| Aspect | Old | New | +|--------|-----|-----| +| Sanity writes per cron | 3N+ (task state + content) | N (content only, when changed) | +| Task queue | 
Sanity documents | Supabase table | +| State tracking | Sanity mutations | Supabase updates (cheap) | +| Error recovery | Complex status management | Simple: re-fetch stale records | +| Phases | Monolithic | Independent, can run separately | + +## Database Schema + +The `youtube_stats` table in Supabase: + +| Column | Type | Description | +|--------|------|-------------| +| `id` | uuid (PK) | Auto-generated | +| `sanity_doc_id` | text (UNIQUE) | Sanity document `_id` | +| `sanity_doc_type` | text | "post" or "podcast" | +| `youtube_id` | text | YouTube video ID | +| `youtube_url` | text | Original URL | +| `view_count` | bigint | YouTube view count | +| `like_count` | bigint | YouTube like count | +| `comment_count` | bigint | YouTube comment count | +| `favorite_count` | bigint | YouTube favorite count | +| `last_fetched_at` | timestamptz | Last YouTube API fetch | +| `last_synced_to_sanity_at` | timestamptz | Last Sanity push | +| `created_at` | timestamptz | Row creation time | +| `updated_at` | timestamptz | Auto-updated on change | + +Indexes: +- `youtube_id` — for lookups by video ID +- `last_fetched_at` (NULLS FIRST) — for finding stale records + +## Running the Migration + +Apply the migration to your Supabase project: + +```bash +# Using Supabase CLI (if linked) +supabase db push + +# Or apply directly via psql +psql "$DATABASE_URL" -f supabase/migrations/001_youtube_stats.sql + +# Or apply via the Supabase Dashboard SQL Editor +# Copy the contents of supabase/migrations/001_youtube_stats.sql and run it +``` + +## API Endpoint + +``` +POST /api/youtube/views +Authorization: Bearer <CRON_SECRET> +``` + +Query parameters: +- `?action=discover` — run Phase 1 only +- `?action=fetch` — run Phase 2 only +- `?action=sync` — run Phase 3 only +- No action — run all three phases in sequence + +## Cron Configuration (Supabase) + +The cron is triggered by **Supabase cron jobs** (via `pg_cron` + `pg_net`), which call the existing `GET /api/cron` gateway route. 
That route fire-and-forgets a `POST /api/youtube/views` internally, so it returns immediately and won't time out. + +### How it works + +``` +Supabase pg_cron + → HTTP GET /api/cron?action=... (Bearer CRON_SECRET) + → fire-and-forget POST /api/youtube/views?action=... (Bearer CRON_SECRET) + → runs discover / fetch / sync phases +``` + +### Recommended Supabase cron jobs + +Set these up in the Supabase Dashboard → SQL Editor (or via a migration): + +```sql +-- Enable the extensions (if not already enabled) +CREATE EXTENSION IF NOT EXISTS pg_cron; +CREATE EXTENSION IF NOT EXISTS pg_net; + +-- 1. Discover new videos once per day at 3am UTC +SELECT cron.schedule( + 'youtube-stats-discover', + '0 3 * * *', + $$ + SELECT net.http_get( + url := 'https://codingcat.dev/api/cron?action=discover', + headers := jsonb_build_object( + 'Authorization', 'Bearer ' || current_setting('app.settings.cron_secret'), + 'Cache-Control', 'no-cache' + ) + ); + $$ +); + +-- 2. Fetch YouTube stats every hour +SELECT cron.schedule( + 'youtube-stats-fetch', + '0 * * * *', + $$ + SELECT net.http_get( + url := 'https://codingcat.dev/api/cron?action=fetch', + headers := jsonb_build_object( + 'Authorization', 'Bearer ' || current_setting('app.settings.cron_secret'), + 'Cache-Control', 'no-cache' + ) + ); + $$ +); + +-- 3. Sync stats to Sanity every hour, offset by 30 minutes +SELECT cron.schedule( + 'youtube-stats-sync', + '30 * * * *', + $$ + SELECT net.http_get( + url := 'https://codingcat.dev/api/cron?action=sync', + headers := jsonb_build_object( + 'Authorization', 'Bearer ' || current_setting('app.settings.cron_secret'), + 'Cache-Control', 'no-cache' + ) + ); + $$ +); +``` + +> **Note:** You'll need to set `app.settings.cron_secret` in your Supabase project settings, +> or replace `current_setting('app.settings.cron_secret')` with your actual `CRON_SECRET` value. 
+> Alternatively, if you already have a single cron job calling `/api/cron` (the existing setup), +> you can keep that as-is — it will run all three phases in sequence by default. + +### Simple option: keep the existing single cron + +If you'd rather not set up three separate cron jobs, the existing Supabase cron that calls `GET /api/cron` (no action param) will run **all three phases** in sequence. This is the simplest setup: + +```sql +-- Single cron: runs discover → fetch → sync every day at midnight +SELECT cron.schedule( + 'youtube-stats-all', + '0 0 * * *', + $$ + SELECT net.http_get( + url := 'https://codingcat.dev/api/cron', + headers := jsonb_build_object( + 'Authorization', 'Bearer ' || current_setting('app.settings.cron_secret'), + 'Cache-Control', 'no-cache' + ) + ); + $$ +); +``` + +### Managing cron jobs + +```sql +-- List all scheduled jobs +SELECT * FROM cron.job; + +-- Unschedule a job +SELECT cron.unschedule('youtube-stats-discover'); + +-- Check recent job runs +SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 20; +``` + +## Environment Variables Required + +- `YOUTUBE_API_KEY` — YouTube Data API v3 key (read-only stats) +- `SUPABASE_URL` / `NEXT_PUBLIC_SUPABASE_URL` — Supabase project URL +- `SUPABASE_SERVICE_ROLE_KEY` — Supabase service role key (bypasses RLS) +- `SANITY_API_WRITE_TOKEN` — Sanity API token with write access +- `CRON_SECRET` — Bearer token for authenticating cron requests +- `NEXT_PUBLIC_SANITY_PROJECT_ID` — Sanity project ID +- `NEXT_PUBLIC_SANITY_DATASET` — Sanity dataset name diff --git a/lib/youtube-stats.ts b/lib/youtube-stats.ts new file mode 100644 index 00000000..30481017 --- /dev/null +++ b/lib/youtube-stats.ts @@ -0,0 +1,363 @@ +import { createClient } from "@supabase/supabase-js"; +import { createClient as createSanityClient } from "next-sanity"; +import { apiVersion, dataset, projectId } from "@/sanity/lib/api"; +import { youtubeParser } from "@/lib/utils"; + +// 
--------------------------------------------------------------------------- +// Clients +// --------------------------------------------------------------------------- + +function getSupabaseAdmin() { + return createClient( + process.env.SUPABASE_URL || process.env.NEXT_PUBLIC_SUPABASE_URL!, + process.env.SUPABASE_SERVICE_ROLE_KEY!, + { auth: { autoRefreshToken: false, persistSession: false } } + ); +} + +function getSanityWriteClient() { + return createSanityClient({ + projectId, + dataset, + apiVersion, + token: process.env.SANITY_API_TOKEN || process.env.SANITY_API_WRITE_TOKEN, + perspective: "published", + useCdn: false, + }); +} + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export interface YouTubeStatistics { + viewCount: string; + likeCount: string; + favoriteCount: string; + commentCount: string; +} + +export interface YouTubeStatsRow { + id: string; + sanity_doc_id: string; + sanity_doc_type: string; + youtube_id: string; + youtube_url: string | null; + view_count: number; + like_count: number; + comment_count: number; + favorite_count: number; + last_fetched_at: string | null; + last_synced_to_sanity_at: string | null; + created_at: string; + updated_at: string; +} + +// --------------------------------------------------------------------------- +// 1. getYouTubeStats — fetch statistics from YouTube Data API v3 +// --------------------------------------------------------------------------- + +/** + * Fetches YouTube video statistics for the given video IDs. + * Batches requests in groups of 50 (YouTube API limit). + * Returns a Map from videoId to statistics object. 
+ */ +export async function getYouTubeStats( + videoIds: string[] +): Promise<Map<string, YouTubeStatistics>> { + const apiKey = process.env.YOUTUBE_API_KEY; + if (!apiKey) { + throw new Error("Missing YOUTUBE_API_KEY environment variable"); + } + + const statsMap = new Map<string, YouTubeStatistics>(); + const batchSize = 50; + + for (let i = 0; i < videoIds.length; i += batchSize) { + const batch = videoIds.slice(i, i + batchSize); + const ids = batch.join(","); + + const url = `https://www.googleapis.com/youtube/v3/videos?id=${ids}&key=${apiKey}&fields=items(id,statistics)&part=statistics`; + + const response = await fetch(url); + if (!response.ok) { + const errorBody = await response.text(); + console.error( + `[youtube-stats] YouTube API error (status ${response.status}): ${errorBody}` + ); + throw new Error( + `YouTube API returned ${response.status}: ${errorBody}` + ); + } + + const json = await response.json(); + for (const item of json.items ?? []) { + statsMap.set(item.id, item.statistics as YouTubeStatistics); + } + } + + return statsMap; +} + +// --------------------------------------------------------------------------- +// 2. syncSanityVideosToSupabase — discover videos from Sanity → Supabase +// --------------------------------------------------------------------------- + +/** + * Queries Sanity for all posts and podcasts that have a YouTube URL, + * then upserts them into the `youtube_stats` Supabase table. + * Returns the count of upserted records. 
+ */ +export async function syncSanityVideosToSupabase(): Promise<{ + discovered: number; + errors: number; +}> { + const sanity = getSanityWriteClient(); + const supabase = getSupabaseAdmin(); + + // Fetch all docs with a youtube field from Sanity + const docs: Array<{ + _id: string; + _type: string; + youtube: string; + }> = await sanity.fetch( + `*[_type in ["post", "podcast"] && defined(youtube)]{_id, _type, youtube}` + ); + + console.log(`[youtube-stats] Discovered ${docs.length} docs with YouTube URLs in Sanity`); + + let discovered = 0; + let errors = 0; + + // Upsert in batches to avoid overwhelming Supabase + const batchSize = 200; + for (let i = 0; i < docs.length; i += batchSize) { + const batch = docs.slice(i, i + batchSize); + const rows = batch + .map((doc) => { + const youtubeId = youtubeParser(doc.youtube); + if (!youtubeId) { + console.warn( + `[youtube-stats] Could not parse YouTube ID from URL: ${doc.youtube} (doc: ${doc._id})` + ); + errors++; + return null; + } + return { + sanity_doc_id: doc._id, + sanity_doc_type: doc._type, + youtube_id: youtubeId, + youtube_url: doc.youtube, + }; + }) + .filter(Boolean); + + if (rows.length === 0) continue; + + const { error } = await supabase.from("youtube_stats").upsert(rows, { + onConflict: "sanity_doc_id", + ignoreDuplicates: false, + }); + + if (error) { + console.error(`[youtube-stats] Supabase upsert error:`, error); + errors += rows.length; + } else { + discovered += rows.length; + } + } + + console.log( + `[youtube-stats] Sync complete: ${discovered} upserted, ${errors} errors` + ); + return { discovered, errors }; +} + +// --------------------------------------------------------------------------- +// 3. fetchAndStoreYouTubeStats — poll YouTube API → store in Supabase +// --------------------------------------------------------------------------- + +/** + * Fetches the oldest/never-fetched records from Supabase, calls the YouTube + * API for their stats, and updates the Supabase rows. 
+ * Returns the count of updated records. + */ +export async function fetchAndStoreYouTubeStats( + batchSize: number = 50 +): Promise<{ fetched: number; errors: number }> { + const supabase = getSupabaseAdmin(); + + // Get the oldest-fetched (or never-fetched) records + const { data: rows, error: queryError } = await supabase + .from("youtube_stats") + .select("*") + .order("last_fetched_at", { ascending: true, nullsFirst: true }) + .limit(batchSize); + + if (queryError) { + console.error(`[youtube-stats] Error querying Supabase:`, queryError); + throw new Error(`Supabase query error: ${queryError.message}`); + } + + if (!rows || rows.length === 0) { + console.log("[youtube-stats] No records to fetch stats for"); + return { fetched: 0, errors: 0 }; + } + + console.log(`[youtube-stats] Fetching YouTube stats for ${rows.length} records`); + + // Collect unique video IDs + const videoIds = [...new Set(rows.map((r: YouTubeStatsRow) => r.youtube_id))]; + + let statsMap: Map<string, YouTubeStatistics>; + try { + statsMap = await getYouTubeStats(videoIds); + } catch (err) { + console.error("[youtube-stats] Failed to fetch YouTube stats:", err); + throw err; + } + + console.log(`[youtube-stats] Got stats for ${statsMap.size} videos from YouTube API`); + + let fetched = 0; + let errors = 0; + const now = new Date().toISOString(); + + for (const row of rows as YouTubeStatsRow[]) { + const stats = statsMap.get(row.youtube_id); + if (!stats) { + console.warn( + `[youtube-stats] No stats returned for youtube_id=${row.youtube_id} (sanity_doc_id=${row.sanity_doc_id})` + ); + // Still update last_fetched_at so we don't keep retrying immediately + await supabase + .from("youtube_stats") + .update({ last_fetched_at: now }) + .eq("id", row.id); + errors++; + continue; + } + + const { error: updateError } = await supabase + .from("youtube_stats") + .update({ + view_count: parseInt(stats.viewCount, 10) || 0, + like_count: parseInt(stats.likeCount, 10) || 0, + comment_count: parseInt(stats.commentCount, 10) || 0, 
+ favorite_count: parseInt(stats.favoriteCount, 10) || 0, + last_fetched_at: now, + }) + .eq("id", row.id); + + if (updateError) { + console.error( + `[youtube-stats] Error updating row ${row.id}:`, + updateError + ); + errors++; + } else { + fetched++; + } + } + + console.log( + `[youtube-stats] Fetch complete: ${fetched} updated, ${errors} errors` + ); + return { fetched, errors }; +} + +// --------------------------------------------------------------------------- +// 4. pushStatsToSanity — push updated stats from Supabase → Sanity +// --------------------------------------------------------------------------- + +/** + * Finds Supabase records where stats have been fetched but not yet synced + * to Sanity, then patches the corresponding Sanity documents. + * Returns the count of synced records. + */ +export async function pushStatsToSanity( + batchSize: number = 50 +): Promise<{ synced: number; errors: number }> { + const supabase = getSupabaseAdmin(); + const sanity = getSanityWriteClient(); + + // Find records that need syncing: + // last_fetched_at is set AND (last_synced_to_sanity_at is null OR last_fetched_at > last_synced_to_sanity_at) + // PostgREST can't compare two columns directly, so we use .rpc() with a raw SQL function. + // Fallback: query all fetched records and filter in JS (fine for batch sizes ≤ 200). + const { data: allFetched, error: queryError } = await supabase + .from("youtube_stats") + .select("*") + .not("last_fetched_at", "is", null) + .order("last_fetched_at", { ascending: false }) + .limit(batchSize * 2); + + if (queryError) { + console.error(`[youtube-stats] Error querying Supabase for sync:`, queryError); + throw new Error(`Supabase query error: ${queryError.message}`); + } + + // Filter in JS: needs sync if never synced, or if fetched after last sync + const rows = (allFetched ?? []).filter((row: YouTubeStatsRow) => { + if (!row.last_synced_to_sanity_at) return true; + return new Date(row.last_fetched_at!) 
> new Date(row.last_synced_to_sanity_at); + }).slice(0, batchSize); + + if (queryError) { + console.error(`[youtube-stats] Error querying Supabase for sync:`, queryError); + throw new Error(`Supabase query error: ${queryError.message}`); + } + + if (!rows || rows.length === 0) { + console.log("[youtube-stats] No records need syncing to Sanity"); + return { synced: 0, errors: 0 }; + } + + console.log(`[youtube-stats] Pushing stats to Sanity for ${rows.length} records`); + + let synced = 0; + let errors = 0; + + for (const row of rows as YouTubeStatsRow[]) { + try { + await sanity + .patch(row.sanity_doc_id) + .set({ + "statistics.youtube.viewCount": row.view_count, + "statistics.youtube.likeCount": row.like_count, + "statistics.youtube.commentCount": row.comment_count, + "statistics.youtube.favoriteCount": row.favorite_count, + }) + .commit({ visibility: "async" }); + + // Mark as synced in Supabase + const now = new Date().toISOString(); + const { error: updateError } = await supabase + .from("youtube_stats") + .update({ last_synced_to_sanity_at: now }) + .eq("id", row.id); + + if (updateError) { + console.error( + `[youtube-stats] Error updating sync timestamp for row ${row.id}:`, + updateError + ); + // The Sanity patch succeeded, so we count it but note the tracking error + errors++; + } + + synced++; + } catch (err) { + console.error( + `[youtube-stats] Error patching Sanity doc ${row.sanity_doc_id}:`, + err + ); + errors++; + } + } + + console.log( + `[youtube-stats] Sync to Sanity complete: ${synced} synced, ${errors} errors` + ); + return { synced, errors }; +} diff --git a/sanity.config.ts b/sanity.config.ts index f86171a9..eba1a1c3 100644 --- a/sanity.config.ts +++ b/sanity.config.ts @@ -46,7 +46,6 @@ import settings from "@/sanity/schemas/singletons/settings"; import sponsor from "@/sanity/schemas/documents/sponsor"; import sponsorshipRequest from "@/sanity/schemas/documents/sponsorshipRequest"; -import youtubeUpdateTask from 
"@/sanity/schemas/documents/youtubeUpdateTask"; import contentIdea from "@/sanity/schemas/documents/contentIdea"; import automatedVideo from "@/sanity/schemas/documents/automatedVideo"; import sponsorLead from "@/sanity/schemas/documents/sponsorLead"; @@ -149,7 +148,6 @@ export default defineConfig({ podcastType, post, sponsor, - youtubeUpdateTask, previewSession, sponsorshipRequest, contentIdea, diff --git a/sanity/schemas/documents/youtubeUpdateTask.ts b/sanity/schemas/documents/youtubeUpdateTask.ts deleted file mode 100644 index 7ed46e5a..00000000 --- a/sanity/schemas/documents/youtubeUpdateTask.ts +++ /dev/null @@ -1,56 +0,0 @@ -// /sanity/schemas/documents/youtubeUpdateTask.ts -import { defineField, defineType } from "sanity"; - -export default defineType({ - name: "youtubeUpdateTask", - title: "YouTube Update Task", - type: "document", - fields: [ - defineField({ - name: "targetDoc", - title: "Target Document", - type: "reference", - to: [{ type: "post" }, { type: "podcast" }], - validation: (Rule) => Rule.required(), - }), - defineField({ - name: "status", - title: "Status", - type: "string", - options: { - list: [ - { title: "Pending", value: "pending" }, - { title: "In Progress", value: "inProgress" }, - { title: "Completed", value: "completed" }, - { title: "Error", value: "error" }, - ], - layout: "dropdown", - }, - initialValue: "pending", - }), - defineField({ - name: "lastChecked", - title: "Last Checked", - type: "datetime", - }), - defineField({ - name: "errorMessage", - title: "Error Message", - type: "text", - hidden: ({ parent }) => parent?.status !== "error", - }), - ], - preview: { - select: { - title: "targetDoc.title", - status: "status", - lastChecked: "lastChecked", - }, - prepare({ title, status, lastChecked }) { - return { - title: title ? `Update: ${title}` : "YouTube Update Task", - subtitle: `${status || "pending"}${lastChecked ? 
" | " + new Date(lastChecked).toLocaleString() : ""}`, - }; - }, - }, -}); diff --git a/sanity/types.ts b/sanity/types.ts index 12c4bbe8..bd9dbbae 100644 --- a/sanity/types.ts +++ b/sanity/types.ts @@ -229,18 +229,6 @@ export type PodcastReference = { [internalGroqTypeReferenceTo]?: "podcast"; }; -export type YoutubeUpdateTask = { - _id: string; - _type: "youtubeUpdateTask"; - _createdAt: string; - _updatedAt: string; - _rev: string; - targetDoc?: PostReference | PodcastReference; - status?: "pending" | "inProgress" | "completed" | "error"; - lastChecked?: string; - errorMessage?: string; -}; - export type CourseReference = { _ref: string; _type: "reference"; @@ -2017,7 +2005,6 @@ export type AllSanitySchemaTypes = | PreviewSession | PostReference | PodcastReference - | YoutubeUpdateTask | CourseReference | PageReference | Sponsor diff --git a/supabase/migrations/001_youtube_stats.sql b/supabase/migrations/001_youtube_stats.sql new file mode 100644 index 00000000..183a6289 --- /dev/null +++ b/supabase/migrations/001_youtube_stats.sql @@ -0,0 +1,50 @@ +-- Migration: Create youtube_stats table for tracking YouTube video statistics +-- This replaces the Sanity-based youtubeUpdateTask queue with a Supabase table + +CREATE TABLE IF NOT EXISTS youtube_stats ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + sanity_doc_id text NOT NULL UNIQUE, + sanity_doc_type text NOT NULL, + youtube_id text NOT NULL, + youtube_url text, + view_count bigint DEFAULT 0, + like_count bigint DEFAULT 0, + comment_count bigint DEFAULT 0, + favorite_count bigint DEFAULT 0, + last_fetched_at timestamptz, + last_synced_to_sanity_at timestamptz, + created_at timestamptz DEFAULT now(), + updated_at timestamptz DEFAULT now() +); + +-- Index on youtube_id for lookups by video ID +CREATE INDEX IF NOT EXISTS idx_youtube_stats_youtube_id ON youtube_stats (youtube_id); + +-- Index on last_fetched_at for finding stale/never-fetched records +CREATE INDEX IF NOT EXISTS idx_youtube_stats_last_fetched_at 
ON youtube_stats (last_fetched_at NULLS FIRST); + +-- Trigger function to auto-update updated_at on row modification +CREATE OR REPLACE FUNCTION update_youtube_stats_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = now(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trg_youtube_stats_updated_at + BEFORE UPDATE ON youtube_stats + FOR EACH ROW + EXECUTE FUNCTION update_youtube_stats_updated_at(); + +-- RPC function to get records that need syncing to Sanity +-- (where last_fetched_at > last_synced_to_sanity_at, or never synced) +CREATE OR REPLACE FUNCTION get_unsynced_youtube_stats(batch_limit int DEFAULT 50) +RETURNS SETOF youtube_stats AS $$ + SELECT * + FROM youtube_stats + WHERE last_fetched_at IS NOT NULL + AND (last_synced_to_sanity_at IS NULL OR last_fetched_at > last_synced_to_sanity_at) + ORDER BY last_fetched_at DESC + LIMIT batch_limit; +$$ LANGUAGE sql STABLE; From ac8a6bff97e6ce38ab04f997e7c05d6bb1b83b0d Mon Sep 17 00:00:00 2001 From: Miriad Date: Tue, 3 Mar 2026 18:18:45 +0000 Subject: [PATCH 2/3] chore: remove cleanup route (youtubeUpdateTask data already purged) Co-authored-by: youtubeviews --- app/api/youtube/cleanup/route.tsx | 70 ------------------------------- 1 file changed, 70 deletions(-) delete mode 100644 app/api/youtube/cleanup/route.tsx diff --git a/app/api/youtube/cleanup/route.tsx b/app/api/youtube/cleanup/route.tsx deleted file mode 100644 index a9765627..00000000 --- a/app/api/youtube/cleanup/route.tsx +++ /dev/null @@ -1,70 +0,0 @@ -/** - * One-time cleanup script: Delete all youtubeUpdateTask documents from Sanity. - * - * Run via: POST /api/youtube/cleanup - * Auth: Bearer CRON_SECRET - * - * DELETE THIS FILE after running it once. 
- */ -export const fetchCache = "force-no-store"; - -import { createClient } from "next-sanity"; -import type { NextRequest } from "next/server"; -import { apiVersion, dataset, projectId } from "@/sanity/lib/api"; - -const sanityWriteClient = createClient({ - projectId, - dataset, - apiVersion, - token: process.env.SANITY_API_TOKEN || process.env.SANITY_API_WRITE_TOKEN, - perspective: "published", - useCdn: false, -}); - -export async function POST(request: NextRequest) { - const authHeader = request.headers.get("authorization"); - if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { - return new Response("Unauthorized", { status: 401 }); - } - - try { - // Count existing tasks - const count = await sanityWriteClient.fetch( - `count(*[_type == "youtubeUpdateTask"])` - ); - console.log(`[CLEANUP] Found ${count} youtubeUpdateTask documents to delete`); - - if (count === 0) { - return Response.json({ success: true, deleted: 0, message: "No documents to delete" }); - } - - // Delete in batches of 100 - let totalDeleted = 0; - while (true) { - const tasks: { _id: string }[] = await sanityWriteClient.fetch( - `*[_type == "youtubeUpdateTask"][0...100]{ _id }` - ); - - if (!tasks || tasks.length === 0) break; - - // Use transaction for batch delete - const tx = sanityWriteClient.transaction(); - for (const task of tasks) { - tx.delete(task._id); - } - await tx.commit({ visibility: "async" }); - - totalDeleted += tasks.length; - console.log(`[CLEANUP] Deleted batch of ${tasks.length} (total: ${totalDeleted})`); - } - - console.log(`[CLEANUP] Done. 
Deleted ${totalDeleted} youtubeUpdateTask documents.`); - return Response.json({ success: true, deleted: totalDeleted }); - } catch (error) { - console.error("[CLEANUP] Error:", error); - return Response.json( - { success: false, error: String(error) }, - { status: 500 } - ); - } -} From d122920cb057f088cd9f118527925e9482c2669e Mon Sep 17 00:00:00 2001 From: Miriad Date: Tue, 3 Mar 2026 18:20:01 +0000 Subject: [PATCH 3/3] fix: remove duplicate queryError check causing TypeScript build error Co-authored-by: youtubeviews --- lib/youtube-stats.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/youtube-stats.ts b/lib/youtube-stats.ts index 30481017..7458664b 100644 --- a/lib/youtube-stats.ts +++ b/lib/youtube-stats.ts @@ -303,11 +303,6 @@ export async function pushStatsToSanity( return new Date(row.last_fetched_at!) > new Date(row.last_synced_to_sanity_at); }).slice(0, batchSize); - if (queryError) { - console.error(`[youtube-stats] Error querying Supabase for sync:`, queryError); - throw new Error(`Supabase query error: ${queryError.message}`); - } - if (!rows || rows.length === 0) { console.log("[youtube-stats] No records need syncing to Sanity"); return { synced: 0, errors: 0 };