
Feat/cal itp import #1670

Open
ianktc wants to merge 6 commits into main from feat/cal-itp-import

Conversation

ianktc (Contributor) commented Apr 24, 2026

Summary:

Closes #1642

This pull request introduces support for importing data from Cal-ITP into the system, consisting of an import handler and its associated tests.

Cal-ITP Import Feature:

functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py contains the import handler and associated import logic
functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql is the CKAN API SQL query used to retrieve feeds from Cal-ITP
functions-python/tasks_executor/tests/tasks/data_import/cal_itp/test_cal_itp_import.py contains the associated unit and e2e tests
infra/functions-python/main.tf includes the Google Cloud Scheduler job that runs the import monthly
functions-python/tasks_executor/src/main.py adds the handler to the task list

Out of scope:
  • Redirecting MDB feeds to the new Cal-ITP feeds: the redirect and the CSV defining the redirect links will be included in a follow-up PR
  • Licensing for Cal-ITP feeds (follow-up PR after confirmation with Cal-ITP)

Cal-ITP Import — Execution Flow & Design Doc

Based on PR #1670 (feat/cal-itp-import)


Overview

The Cal-ITP import pipeline fetches GTFS schedule and real-time feeds from the California Integrated Travel Project (Cal-ITP) CKAN API and upserts them into the Mobility Feed API database. It runs as a scheduled HTTP Cloud Function (tasks_executor), triggered monthly by Cloud Scheduler, and fans out to dataset download and web revalidation tasks on completion.


Architecture Diagram

Cloud Scheduler (3 AM UTC, 1st of month)
  │  POST {"task": "cal_itp_import", "payload": {"dry_run": false}}
  ▼
tasks_executor  (Cloud Function — 8 GiB, 1000s timeout, Python 3.11)
  │
  ├─ main.py: tasks_executor()          ← HTTP entry point, routes by "task" key
  │
  └─ import_cal_itp_handler()           ← parses dry_run flag, calls orchestrator
       │
       └─ _import_cal_itp()             ← @with_db_session, core logic
            │
            ├─ _fetch_cal_itp_datasets()           → Cal-ITP CKAN API (SQL query)
            ├─ _filter_cal_itp_records()            → Bay Area 511 + customer-facing filter
            ├─ _process_cal_itp_dataset() × N       → per-dataset upsert (batched)
            ├─ _deprecate_stale_feeds()             → mark unseen cal_itp feeds deprecated
            └─ commit_changes()
                 ├─ db_session.commit()
                 ├─ Pub/Sub → datasets-batch-topic  (trigger dataset download per new feed)
                 └─ Cloud Tasks → web_revalidation_task_queue  (revalidate changed feeds)

Step-by-Step Execution Flow

1. Cloud Scheduler Trigger

Terraform resource: google_cloud_scheduler_job.cal_itp_import_schedule
(infra/functions-python/main.tf ~line 564)

Property Value
Schedule 0 3 1 * * — 3 AM UTC on the 1st of each month
Target HTTP POST → tasks_executor Cloud Function URL
Auth OIDC token from functions_service_account
Body {"task": "cal_itp_import", "payload": {"dry_run": false}}
Attempt deadline 320 seconds
Active in prod Yes (paused in lower environments)

2. Cloud Function — tasks_executor

Terraform resource: google_cloudfunctions2_function.tasks_executor
(infra/functions-python/main.tf ~line 1090)

Property Value
Entry point tasks_executor (in main.py)
Memory 8 GiB
Timeout 1000 seconds
Max instances 200
Max concurrency per instance 1
VPC Private ranges only via vpc-connector
Secrets FEEDS_DATABASE_URL, FEEDS_CREDENTIALS, WEB_APP_REVALIDATE_SECRET

Key env vars set by Terraform:

  • DATASET_PROCESSING_TOPIC_NAME → datasets-batch-topic-{env} (Pub/Sub)
  • WEB_REVALIDATION_QUEUE → Cloud Tasks queue name
  • WEB_APP_REVALIDATE_URL → web app revalidation endpoint
  • PROJECT_ID, ENVIRONMENT, SERVICE_ACCOUNT_EMAIL

3. HTTP Router — main.py:tasks_executor()

The function parses request.get_json() for a "task" key and dispatches to the registered handler:

tasks = {
    "cal_itp_import": {"handler": import_cal_itp_handler, ...},
    # + 12 other tasks (tdg_import, jbda_import, revalidate_feed, ...)
}
handler = tasks[task]["handler"]
result = handler(payload=payload)

For unknown tasks → HTTP 400. For handler exceptions → HTTP 500.


4. import_cal_itp_handler(payload)

File: import_cal_itp_feeds.py

  • Parses dry_run from payload (default: True)
  • Calls _import_cal_itp(dry_run=dry_run)
  • Logs and returns summary dict:
    {
      "message": "Cal-ITP import executed successfully.",
      "created_gtfs": 12,
      "updated_gtfs": 5,
      "created_rt": 8,
      "total_processed_items": 120,
      "params": {"dry_run": false}
    }

5. _import_cal_itp(db_session, dry_run) — Orchestrator

Decorated with @with_db_session (manages SQLAlchemy session lifecycle).

1. _fetch_cal_itp_datasets()        → raw list of dataset dicts from CKAN
2. _filter_cal_itp_records()        → filtered list (Bay Area + customer-facing rules)
3. for each dataset:
     _process_cal_itp_dataset()     → upsert feeds, accumulate changed IDs
     if batch boundary crossed:
       commit_changes() (partial)
4. _deprecate_stale_feeds()         → mark feeds not seen this run as deprecated
5. commit_changes() (final)
6. if dry_run: db_session.rollback() and skip all triggers

Batch size is controlled by COMMIT_BATCH_SIZE env var (default: 5).
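The batching described above can be sketched as follows. This is a minimal sketch: `run_import`, `process_one`, and `commit` are hypothetical stand-ins for the orchestrator, `_process_cal_itp_dataset`, and `commit_changes`.

```python
import os

# Mirrors the COMMIT_BATCH_SIZE env var described above (default 5)
COMMIT_BATCH_SIZE = int(os.environ.get("COMMIT_BATCH_SIZE", "5"))

def run_import(datasets, process_one, commit):
    """Process datasets, committing at every COMMIT_BATCH_SIZE boundary
    plus a final commit for the remainder. Returns the processed count."""
    processed = 0
    for dataset in datasets:
        process_one(dataset)
        processed += 1
        if processed % COMMIT_BATCH_SIZE == 0:
            commit()  # partial commit at batch boundary
    commit()  # final commit
    return processed
```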


6. Data Fetching — _fetch_cal_itp_datasets()

  • Endpoint: https://data.ca.gov/api/3/action/datastore_search_sql?sql=<encoded>
  • SQL query: ckan_query.sql — joins 4 CKAN datasets:
Dataset UUID Content
gtfs_datasets e4ca5bd4-... Feed URLs, entity types
services dbacfa9f-... Service / agency metadata
provider_gtfs_data ebe116fb-... Customer-facing flag, regional type
organizations 677e1271-... Caltrans district name
  • Filters: is_public = 'Yes' AND at least one feed URL present
  • Returns: List of flat dicts containing service metadata + all feed URLs for that service
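The fetch step can be sketched against the CKAN action API. Helper names here are illustrative; the `success`/`result`/`records` envelope is the standard CKAN action-API response shape.

```python
from urllib.parse import urlencode

CKAN_SQL_ENDPOINT = "https://data.ca.gov/api/3/action/datastore_search_sql"

def build_ckan_url(sql: str) -> str:
    # datastore_search_sql takes the query as a URL-encoded `sql` parameter
    return f"{CKAN_SQL_ENDPOINT}?{urlencode({'sql': sql})}"

def fetch_cal_itp_datasets(sql: str, http_get) -> list:
    """Fetch records via the CKAN action API (sketch).

    `http_get` is any callable mapping a URL to a parsed-JSON dict
    (e.g. a thin wrapper around requests.Session.get).
    """
    body = http_get(build_ckan_url(sql))
    if not body.get("success"):
        raise RuntimeError("CKAN query failed")
    return body["result"]["records"]
```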

7. Record Filtering — _filter_cal_itp_records()

Records are grouped by service_source_record_id. For each group:

Bay Area 511 services (detected by "Bay Area 511 Regional" in any name column):

  • Apply priority-based deduplication:
    1. Regional Precursor Feed (preferred)
    2. Regional Subfeed
    3. Combined Regional Feed
    4. If none match → keep all

All other services:

  • Keep only records where gtfs_service_data_customer_facing == true/yes/1
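The two filtering rules can be sketched as follows; the `regional_feed_type` column name is an assumption for illustration, while the truthy-value set mirrors the rule above.

```python
# Priority order for Bay Area 511 deduplication, highest first
BAY_AREA_PRIORITY = [
    "Regional Precursor Feed",
    "Regional Subfeed",
    "Combined Regional Feed",
]

def dedupe_bay_area(records, type_key="regional_feed_type"):
    """Keep only records of the highest-priority regional feed type present;
    if no record matches a known type, keep all (sketch of the rule above)."""
    for feed_type in BAY_AREA_PRIORITY:
        matches = [r for r in records if r.get(type_key) == feed_type]
        if matches:
            return matches
    return records

def filter_customer_facing(records, flag_key="gtfs_service_data_customer_facing"):
    """Keep only records whose customer-facing flag is true/yes/1."""
    truthy = {"true", "yes", "1"}
    return [r for r in records if str(r.get(flag_key, "")).lower() in truthy]
```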

8. Per-Dataset Processing — _process_cal_itp_dataset()

For each filtered dataset record:

a. Resource Expansion

Expand one dataset dict into 1–4 resource dicts:

  • 1× GTFS Schedule (if schedule_dataset_url present)
  • Up to 3× GTFS-RT (trip_updates, vehicle_positions, service_alerts)
    • Each gets "entity_type": ["{rt_type}"] (a single-element list)

Resources are sorted: schedule first, then RT feeds.
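Resource expansion can be sketched as below; the key names and format constants mirror this document but are assumptions about the actual code.

```python
GTFS_SCHEDULE = "gtfs"      # assumed constant values
GTFS_REALTIME = "gtfs_rt"
RT_TYPES = ("trip_updates", "vehicle_positions", "service_alerts")

def expand_resources(dataset: dict) -> list:
    """Expand one Cal-ITP dataset dict into 1-4 resource dicts (sketch).

    Schedule resources sort ahead of RT so the static feed exists before
    any RT feed tries to link to it.
    """
    resources = []
    if dataset.get("schedule_dataset_url"):
        resources.append({**dataset, "format": GTFS_SCHEDULE})
    for rt_type in RT_TYPES:
        if dataset.get(f"{rt_type}_dataset_url"):
            resources.append(
                {**dataset, "format": GTFS_REALTIME, "entity_type": [rt_type]}
            )
    # schedule first (False sorts before True), then RT feeds
    resources.sort(key=lambda r: r["format"] != GTFS_SCHEDULE)
    return resources
```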

b. Validation — _validate_required_cal_itp_fields()

For each resource, validates required fields exist and are non-empty:

  • Schedule: schedule_source_record_id, schedule_gtfs_dataset_name, schedule_dataset_url
  • RT: {type}_source_record_id, {type}_gtfs_dataset_name, {type}_dataset_url

Raises InvalidCalItpFeedError on failure; resource is skipped.

c. Stable ID Generation

cal_itp-{service_source_record_id}-{type_code}

Type codes: s (schedule), tu (trip updates), vp (vehicle positions), sa (service alerts)
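A sketch of the stable-ID scheme (the format and type codes are as documented above; the function name is illustrative):

```python
TYPE_CODES = {
    "schedule": "s",
    "trip_updates": "tu",
    "vehicle_positions": "vp",
    "service_alerts": "sa",
}

def cal_itp_stable_id(service_source_record_id: str, feed_type: str) -> str:
    # Deterministic IDs make re-runs upsert the same rows (idempotency)
    return f"cal_itp-{service_source_record_id}-{TYPE_CODES[feed_type]}"
```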

d. Location Mapping — _get_cal_itp_locations()

Maps caltrans_district_name → Location DB row:

  • Country: United States (hardcoded)
  • State: California (hardcoded)
  • City: caltrans_district_name

e. GTFS Schedule Feed Processing

  1. HEAD probe via _probe_head_format() — verify URL returns a ZIP
  2. _delete_and_recreate_feed_if_type_changed() — handles type conflicts (delete + flush + recreate)
  3. Fingerprint comparison:
    • API fingerprint: (stable_id, feed_name, provider, producer_url)
    • DB fingerprint: same fields read from existing row
    • If equal → skip all writes (no-op)
  4. Update common fields: feed_name, provider, producer_url, operational_status, locations
  5. _ensure_cal_itp_external_id() — ensure Externalid row exists for this feed
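The fingerprint diff in step 3 can be sketched as a tuple comparison. The `FeedFingerprint` type is hypothetical; the real code compares the same four fields on both sides.

```python
from typing import NamedTuple, Optional

class FeedFingerprint(NamedTuple):
    # Same fields built from the API record and read from the DB row,
    # so plain tuple equality is the diff
    stable_id: str
    feed_name: Optional[str]
    provider: Optional[str]
    producer_url: Optional[str]

def needs_update(api_fp: FeedFingerprint, db_fp: Optional[FeedFingerprint]) -> bool:
    # No DB row -> create; identical fingerprints -> skip all writes
    return db_fp is None or api_fp != db_fp
```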

f. GTFS-RT Feed Processing

  1. Same create/update flow as schedule
  2. _get_entity_types_from_resource() — maps RT type string to entity type codes
    • ENTITY_TYPES_MAP: {"trip_updates": "tu", "vehicle_positions": "vp", "service_alerts": "sa"}
  3. get_or_create_entity_type(db_session, et) — upserts Entitytype rows
  4. Links RT feed → schedule feed via static_current_feed reference
  5. RT fingerprint additionally includes static_refs and entity_types

g. Error Handling Per Resource

  • Savepoint created before each resource
  • IntegrityError → rollback to savepoint, log, continue
  • Generic Exception → rollback to savepoint, log, continue

9. Stale Feed Deprecation — _deprecate_stale_feeds()

After all datasets are processed:

  • Query all Feed rows where stable_id LIKE 'cal_itp-%'
  • Any with a stable ID not in processed_stable_ids → set status = "deprecated"
  • Ensures feeds that no longer appear in Cal-ITP data are cleaned up automatically
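The selection logic can be sketched as below; in the real task the DB side comes from a `stable_id LIKE 'cal_itp-%'` query, and the function name here is illustrative.

```python
def find_stale_stable_ids(db_stable_ids, processed_stable_ids):
    """Return Cal-ITP stable IDs present in the DB but unseen this run.

    These are the feeds the real pass marks as status = "deprecated".
    """
    prefix = "cal_itp-"
    return sorted(
        sid for sid in db_stable_ids
        if sid.startswith(prefix) and sid not in processed_stable_ids
    )
```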

10. Commit & Downstream Triggers — commit_changes()

db_session.commit()
  │
  ├── for each feed in feeds_to_publish:
  │     trigger_dataset_download(feed, execution_id)
  │       └── Pub/Sub publish → datasets-batch-topic-{env}
  │             payload: {execution_id, producer_url, feed_stable_id, feed_id, ...}
  │
  └── if changed_feed_stable_ids:
        create_web_revalidation_task(changed_feed_stable_ids)
          └── Cloud Tasks enqueue → web_revalidation_task_queue
                payload: {"task": "revalidate_feed", "payload": {"feed_stable_id": "..."}}
                scheduled at next :00 or :30 boundary (30-min deduplication window)

On IntegrityError: rollback, log, re-raise (propagates to caller).
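The :00/:30 scheduling used for revalidation deduplication can be sketched as below; whether a boundary exactly equal to `now` counts as "next" is an assumption.

```python
from datetime import datetime, timedelta

def next_half_hour_boundary(now: datetime) -> datetime:
    """First :00 or :30 mark strictly after `now` (30-minute dedup window).

    Tasks scheduled into the same bucket coalesce, avoiding fan-out storms.
    """
    base = now.replace(minute=0, second=0, microsecond=0)
    candidate = base + timedelta(minutes=30)
    if candidate <= now:
        candidate = base + timedelta(minutes=60)
    return candidate
```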


11. Dry Run Mode

When dry_run=True (the default when called without a payload):

  • All DB writes happen in the session but are rolled back at the end
  • No Pub/Sub messages published
  • No Cloud Tasks enqueued
  • Returns the same summary dict so results can be inspected

Key Design Decisions

Decision Rationale
Fingerprint-based diffing Avoids unnecessary DB writes and downstream triggers on unchanged feeds
Savepoint per resource Isolates per-resource failures; one bad feed doesn't abort the whole import
Batched commits (default 5) Balances memory usage vs. DB round-trips for large imports
Stable IDs (cal_itp-{id}-{type}) Enables idempotent upserts and stale detection across runs
Stale deprecation pass Automatically cleans up feeds removed from Cal-ITP without manual intervention
Dry run default Safe to invoke manually/in dev without side effects
Cloud Tasks time bucketing Deduplicates revalidation requests within 30-minute windows to avoid fan-out storms

Entity Types Map

CKAN field Entity type code Description
trip_updates tu GTFS-RT trip updates
vehicle_positions vp GTFS-RT vehicle positions
service_alerts sa GTFS-RT service alerts
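The mapping can be sketched as below, including the string-to-list normalisation the review flags (iterating a bare string yields characters, leaving entity types empty); the function name is illustrative.

```python
ENTITY_TYPES_MAP = {
    "trip_updates": "tu",
    "vehicle_positions": "vp",
    "service_alerts": "sa",
}

def entity_type_codes(resource: dict) -> list:
    """Map a resource's entity_type field to DB entity-type codes,
    accepting either a list of type strings or a bare string."""
    raw = resource.get("entity_type", [])
    if isinstance(raw, str):
        raw = [raw]  # normalise string -> single-element list
    return [ENTITY_TYPES_MAP[t] for t in raw if t in ENTITY_TYPES_MAP]
```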


Copilot AI left a comment


Pull request overview

Adds a new Cal-ITP data import pipeline to the tasks executor, including the import implementation, CKAN query, tests, and a scheduled monthly execution in GCP.

Changes:

  • Introduces Cal-ITP import handler + CKAN SQL query for retrieving feed records.
  • Registers the new cal_itp_import task in the tasks executor and adds unit/e2e tests.
  • Adds a monthly Cloud Scheduler job to invoke the Cal-ITP import task.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
infra/functions-python/main.tf Adds a monthly Cloud Scheduler job to call the tasks executor with cal_itp_import.
functions-python/tasks_executor/src/main.py Registers the new cal_itp_import task and handler.
functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py Implements Cal-ITP dataset retrieval, filtering, upsert logic, and orchestration/commit hooks.
functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql Provides the CKAN datastore SQL used to retrieve Cal-ITP feed records.
functions-python/tasks_executor/tests/tasks/data_import/test_cal_itp_import.py Adds helper/unit tests and an end-to-end DB test for the Cal-ITP import flow.


import logging
import os
import json

Copilot AI Apr 24, 2026


json is imported but not used anywhere in this module. If unused-import linting is enabled, this will fail; please remove it (or use it if intended).

Suggested change
import json

from tasks.data_import.data_import_utils import (
    get_or_create_feed,
    get_or_create_entity_type,
    get_license,

Copilot AI Apr 24, 2026


get_license is imported but only referenced in commented-out code. Remove the unused import until license handling is implemented to avoid unused-import lint failures.

Suggested change
get_license,

Comment on lines +547 to +551
self.assertEqual(rt.producer_url, "https://cal-itp.example/tu.pb")

# RT should be linked to the schedule feed
rt_sched_ids = [f.id for f in rt.gtfs_feeds]
self.assertEqual(rt_sched_ids, [sched.id])

Copilot AI Apr 24, 2026


The happy-path import test verifies the RT feed URL and schedule linkage, but it doesn’t assert that the RT feed’s entitytypes were populated (e.g. ["tu"] for trip_updates). Adding that assertion (as done in test_tdg_import.py) would catch regressions where entity_type parsing breaks and RT feeds end up with empty entity types.

Comment on lines +314 to +318
district_name = dataset.get("caltrans_district_name", []) or []
country_name = "United States"
state_province = "California"

locations: List[Any] = []

Copilot AI Apr 24, 2026


district_name falls back to [], but create_or_get_location(..., city_name=...) expects a string/None and will fail when building the location id (it joins components). Use None (or an empty string) as the fallback instead of a list, and avoid appending a None location to the list.

_raw_resources.append({
    **_common_fields,
    "format": GTFS_REALTIME,
    "entity_type":f"{_rt_type}",

Copilot AI Apr 24, 2026


RT resources are built with entity_type as a string, but _get_entity_types_from_resource iterates that field assuming it’s a list of strings. Iterating a string yields characters, so entity types end up empty and the RT feed won’t get its entitytypes set correctly. Pass a list (e.g. [_rt_type]) or normalize string→list in _get_entity_types_from_resource.

Suggested change
"entity_type":f"{_rt_type}",
"entity_type": [_rt_type],

Comment on lines +347 to +369
service_id = resource.get("service_source_record_id")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = 'schedule'
    try:
        res_id = resource.get("schedule_source_record_id")
        res_name = resource.get("schedule_gtfs_dataset_name")
        res_url = resource.get("schedule_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError("Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP")
    try:
        res_id = resource.get(f"{feed_type}_source_record_id")
        res_name = resource.get(f"{feed_type}_gtfs_dataset_name")
        res_url = resource.get(f"{feed_type}_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)

Copilot AI Apr 24, 2026


_validate_required_cal_itp_fields doesn’t validate that required values are present/non-empty (and dict.get(...) won’t raise, so the try/except blocks won’t catch missing fields). This can allow service_id, res_id, res_name, or res_url to be None and later produce invalid stable_ids / DB rows. Add explicit checks and raise InvalidCalItpFeedError with a clear message when any required field is missing.

Suggested change
service_id = resource.get("service_source_record_id")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = 'schedule'
    try:
        res_id = resource.get("schedule_source_record_id")
        res_name = resource.get("schedule_gtfs_dataset_name")
        res_url = resource.get("schedule_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError("Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP")
    try:
        res_id = resource.get(f"{feed_type}_source_record_id")
        res_name = resource.get(f"{feed_type}_gtfs_dataset_name")
        res_url = resource.get(f"{feed_type}_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)

def _get_required_field(field_name: str, context: str) -> str:
    value = resource.get(field_name)
    if value is None or (isinstance(value, str) and not value.strip()):
        raise InvalidCalItpFeedError(
            f"Cal-ITP resource is missing required field '{field_name}' for {context}"
        )
    return value

service_id = _get_required_field("service_source_record_id", "service")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = "schedule"
    res_id = _get_required_field("schedule_source_record_id", "schedule feed")
    res_name = _get_required_field("schedule_gtfs_dataset_name", "schedule feed")
    res_url = _get_required_field("schedule_dataset_url", "schedule feed")
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError(
            "Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP"
        )
    res_id = _get_required_field(
        f"{feed_type}_source_record_id", f"realtime {feed_type} feed"
    )
    res_name = _get_required_field(
        f"{feed_type}_gtfs_dataset_name", f"realtime {feed_type} feed"
    )
    res_url = _get_required_field(
        f"{feed_type}_dataset_url", f"realtime {feed_type} feed"
    )

Comment on lines +579 to +583
def _process_cal_itp_dataset(
    db_session: Session,
    session_http: requests.Session,
    dataset: dict,
    processed_stable_ids: Optional[set] = None,

Copilot AI Apr 24, 2026


processed_stable_ids is declared optional (Optional[set] = None) but later used unconditionally via .add(...). Either make it a required argument (no default) or initialize it to set() when None to avoid an AttributeError if _process_cal_itp_dataset is called directly.

Comment on lines +679 to +683
if res_format == GTFS_SCHEDULE:
    # Requirement: if GTFS url returns non zip, skip it
    status_code, content_type, detected_format = _probe_head_format(
        session_http, res_url
    )

Copilot AI Apr 24, 2026


The comment says “skip non zip” but the actual skip logic is commented out below, so non-zip schedule URLs will still be imported. Either enforce the requirement (enable the check) or remove/update the comment and dead code to match current behavior.



Development

Successfully merging this pull request may close these issues.

Define the sync logic between CAOD and MobilityDatabase
