-
Notifications
You must be signed in to change notification settings - Fork 6
feat: GTFS validator task sync #1650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
ca2e5c2
feat: GTFS validator orchestrator — generic task tracker + extended r…
davidgamez a0d1fbf
fix: correct test_task_execution_tracker failures
davidgamez d26de11
fix lint
davidgamez 3f778bc
fix lint
davidgamez 9f1bb41
refactor: make bypass_db_update explicit in rebuild_missing_validatio…
davidgamez ba7a93b
split parameters
davidgamez b7038bd
refactor: make filter_after_in_days optional with no default
davidgamez 6d962a9
feat: add dispatch_complete indicator to validation run status
davidgamez bdc97fd
docs: update validation reports README with new params and pre-releas…
davidgamez bfac479
docs: fix BigQuery ingestion trigger command
davidgamez a1cd68a
feat: add filter_op_statuses param to rebuild_missing_validation_reports
davidgamez fba0131
docs: add filter_op_statuses to payload example in README
davidgamez f26a95b
perf: apply limit before GCS blob check in rebuild_missing_validation…
davidgamez 84fd922
perf: stop GCS blob checks early once limit is reached
davidgamez 75feb89
fix: use 'metadata' column name in on_conflict_do_update set_ dict
davidgamez 2a88475
fix converter permissions
davidgamez 22e0feb
Revert "fix converter permissions"
davidgamez 7aa011c
replace get_bucket with bucket that requires less permissions
davidgamez 5b633e6
fix unit tests
davidgamez 05714e3
feat: replace get_validation_run_status with generic self-scheduling …
davidgamez e62ef12
refactor: use Cloud Tasks retry (503) instead of self-scheduling in s…
davidgamez d7e9a2a
apply lint
davidgamez ad20ec7
refactor: rename _task_run_id to task_run_id (public attribute)
davidgamez c364cc8
feat: add get_task_run_status read-only task
davidgamez 5d5a0d9
fix lint
davidgamez 71891d0
fix: delay initial sync_task_run_status fire by 10 minutes
davidgamez d69932f
feat: add total_already_tracked to dry run response
davidgamez f9ff073
fix import
davidgamez c46450a
chore: remove update_validation_report — superseded by rebuild_missin…
davidgamez 0bc17f8
update docs
davidgamez e6d1085
Merge branch 'main' into feat/gtfs-validators-orchestrator
davidgamez b4766ba
Merge branch 'main' into feat/gtfs-validators-orchestrator
davidgamez 869cfc7
Merge branch 'main' into feat/gtfs-validators-orchestrator
davidgamez df72000
fix lint
davidgamez 9a19d71
fix failing tests
davidgamez c7fb554
change table name in file, remove bypass_db_update from db table and …
davidgamez 64f2342
fx rename
davidgamez e932a6e
revert table name change
davidgamez e93e1cb
Merge branch 'main' into feat/gtfs-validators-orchestrator
davidgamez adf8bce
add on delete cascade
davidgamez d2f7f02
fix queue name
davidgamez fe1113a
reset task status on retry
davidgamez File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
File renamed without changes.
458 changes: 458 additions & 0 deletions
458
functions-python/helpers/task_execution/task_execution_tracker.py
Large diffs are not rendered by default.
Oops, something went wrong.
203 changes: 203 additions & 0 deletions
203
functions-python/helpers/tests/test_task_execution_tracker.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,203 @@ | ||
| # | ||
| # MobilityData 2026 | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| import unittest | ||
| import uuid | ||
| from datetime import datetime, timezone | ||
| from unittest.mock import MagicMock | ||
|
|
||
| from task_execution.task_execution_tracker import ( | ||
| TaskExecutionTracker, | ||
| STATUS_IN_PROGRESS, | ||
| STATUS_TRIGGERED, | ||
| STATUS_COMPLETED, | ||
| STATUS_FAILED, | ||
| ) | ||
|
|
||
|
|
||
| def _make_tracker(task_name="test_task", run_id="v1.0"): | ||
| """Return a tracker with a mock DB session.""" | ||
| session = MagicMock() | ||
| tracker = TaskExecutionTracker( | ||
| task_name=task_name, run_id=run_id, db_session=session | ||
| ) | ||
| return tracker, session | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerStartRun(unittest.TestCase): | ||
| def test_start_run_upserts_task_run(self): | ||
| tracker, session = _make_tracker() | ||
| run_uuid = uuid.uuid4() | ||
| execute_result = MagicMock() | ||
| execute_result.scalar_one.return_value = run_uuid | ||
| session.execute.return_value = execute_result | ||
|
|
||
| result = tracker.start_run(total_count=100, params={"env": "staging"}) | ||
|
|
||
| self.assertEqual(result, run_uuid) | ||
| self.assertEqual(tracker.task_run_id, run_uuid) | ||
| session.execute.assert_called_once() | ||
| session.flush.assert_called_once() | ||
|
|
||
| def test_start_run_caches_task_run_id(self): | ||
| tracker, session = _make_tracker() | ||
| run_uuid = uuid.uuid4() | ||
| execute_result = MagicMock() | ||
| execute_result.scalar_one.return_value = run_uuid | ||
| session.execute.return_value = execute_result | ||
|
|
||
| tracker.start_run(total_count=10) | ||
| tracker.start_run(total_count=20) # second call | ||
|
|
||
| self.assertEqual(tracker.task_run_id, run_uuid) | ||
|
|
||
| def test_start_run_resets_status_to_in_progress_on_rerun(self): | ||
| """Re-running the same task_name/run_id must reset status and completed_at on conflict.""" | ||
| tracker, session = _make_tracker() | ||
| run_uuid = uuid.uuid4() | ||
| execute_result = MagicMock() | ||
| execute_result.scalar_one.return_value = run_uuid | ||
| session.execute.return_value = execute_result | ||
|
|
||
| tracker.start_run(total_count=5) | ||
|
|
||
| stmt_compiled = str(session.execute.call_args[0][0]) | ||
| # The ON CONFLICT DO UPDATE clause must include status and completed_at | ||
| self.assertIn("DO UPDATE SET", stmt_compiled) | ||
| self.assertIn("status", stmt_compiled) | ||
| self.assertIn("completed_at", stmt_compiled) | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerIsTriggered(unittest.TestCase): | ||
| def test_returns_true_when_triggered_row_exists(self): | ||
| tracker, session = _make_tracker() | ||
| existing_row = MagicMock() | ||
| session.query.return_value.filter.return_value.filter.return_value.first.return_value = ( | ||
| existing_row | ||
| ) | ||
|
|
||
| result = tracker.is_triggered("ds-123") | ||
| self.assertTrue(result) | ||
|
|
||
| def test_returns_false_when_no_row(self): | ||
| tracker, session = _make_tracker() | ||
| session.query.return_value.filter.return_value.filter.return_value.first.return_value = ( | ||
| None | ||
| ) | ||
|
|
||
| result = tracker.is_triggered("ds-999") | ||
| self.assertFalse(result) | ||
|
|
||
| def test_handles_none_entity_id(self): | ||
| tracker, session = _make_tracker() | ||
| session.query.return_value.filter.return_value.filter.return_value.first.return_value = ( | ||
| None | ||
| ) | ||
|
|
||
| result = tracker.is_triggered(None) | ||
| self.assertFalse(result) | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerMarkTriggered(unittest.TestCase): | ||
| def test_mark_triggered_inserts_execution_log(self): | ||
| tracker, session = _make_tracker() | ||
| tracker.task_run_id = uuid.uuid4() | ||
|
|
||
| tracker.mark_triggered("ds-1", execution_ref="projects/x/executions/abc") | ||
|
|
||
| session.execute.assert_called_once() | ||
| session.flush.assert_called_once() | ||
|
|
||
| def test_mark_triggered_with_metadata(self): | ||
| tracker, session = _make_tracker() | ||
| tracker.task_run_id = uuid.uuid4() | ||
|
|
||
| tracker.mark_triggered("ds-1", metadata={"feed_id": "f-1"}) | ||
|
|
||
| session.execute.assert_called_once() | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerMarkCompleted(unittest.TestCase): | ||
| def test_mark_completed_updates_status(self): | ||
| tracker, session = _make_tracker() | ||
| query_mock = MagicMock() | ||
| session.query.return_value.filter.return_value.filter.return_value = query_mock | ||
|
|
||
| tracker.mark_completed("ds-1") | ||
|
|
||
| query_mock.update.assert_called_once() | ||
| update_args = query_mock.update.call_args[0][0] | ||
| self.assertEqual(update_args["status"], STATUS_COMPLETED) | ||
| self.assertIn("completed_at", update_args) | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerMarkFailed(unittest.TestCase): | ||
| def test_mark_failed_sets_error_message(self): | ||
| tracker, session = _make_tracker() | ||
| query_mock = MagicMock() | ||
| session.query.return_value.filter.return_value.filter.return_value = query_mock | ||
|
|
||
| tracker.mark_failed("ds-1", error_message="Workflow timed out") | ||
|
|
||
| query_mock.update.assert_called_once() | ||
| update_args = query_mock.update.call_args[0][0] | ||
| self.assertEqual(update_args["status"], STATUS_FAILED) | ||
| self.assertEqual(update_args["error_message"], "Workflow timed out") | ||
|
|
||
|
|
||
| class TestTaskExecutionTrackerGetSummary(unittest.TestCase): | ||
| def _make_task_run(self, status=STATUS_IN_PROGRESS, total_count=10): | ||
| run = MagicMock() | ||
| run.status = status | ||
| run.total_count = total_count | ||
| run.created_at = datetime.now(timezone.utc) | ||
| return run | ||
|
|
||
| def test_returns_none_summary_when_no_run(self): | ||
| tracker, session = _make_tracker() | ||
| session.query.return_value.filter.return_value.first.return_value = None | ||
| session.query.return_value.filter.return_value.all.return_value = [] | ||
|
|
||
| summary = tracker.get_summary() | ||
|
|
||
| self.assertIsNone(summary["run_status"]) | ||
| self.assertEqual(summary["triggered"], 0) | ||
| self.assertEqual(summary["completed"], 0) | ||
|
|
||
| def test_counts_by_status(self): | ||
| tracker, session = _make_tracker() | ||
| task_run = self._make_task_run(total_count=5) | ||
|
|
||
| rows = [ | ||
| MagicMock(status=STATUS_TRIGGERED), | ||
| MagicMock(status=STATUS_TRIGGERED), | ||
| MagicMock(status=STATUS_COMPLETED), | ||
| MagicMock(status=STATUS_FAILED), | ||
| ] | ||
|
|
||
| def query_side_effect(*args): | ||
| m = MagicMock() | ||
| m.filter.return_value.first.return_value = task_run | ||
| m.filter.return_value.all.return_value = rows | ||
| return m | ||
|
|
||
| session.query.side_effect = query_side_effect | ||
|
|
||
| summary = tracker.get_summary() | ||
| self.assertEqual(summary["triggered"], 2) | ||
| self.assertEqual(summary["completed"], 1) | ||
| self.assertEqual(summary["failed"], 1) | ||
| self.assertEqual(summary["pending"], 1) # 5 total - 4 processed |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.