ETL Refresh Logic for Catalog-Related Data Jobs
Overview
The Publix Mobile App Chat system maintains several catalog-related datasets that must be synchronized between Azure Storage and SQL Database to ensure chat responses remain accurate and up to date. Databricks ETL jobs run at scheduled intervals to detect and load new data when it becomes available.
Datasets Managed
The refresh logic applies to the following catalog-related datasets:
- Weekly Ad - Current promotional items and pricing
- TPR (Temporary Price Reduction) - Time-limited price discounts
- Store - Store location and operational data
- Digital Coupon - Available digital coupons for customers
- Deals Picked For You - Personalized promotional offers
Data Flow Architecture
Data flows from the catalog datasets in Azure Storage, through scheduled Databricks ETL jobs, into the corresponding SQL Database tables that back chat responses.
Refresh Detection Logic
Watermark-Based Comparison
Each scheduled job determines whether a refresh is required by comparing timestamp watermarks between source and target:
- Query Source: Extract MAX(DateInserted) from the source storage dataset
- Query Target: Extract MAX(DateInserted) from the corresponding SQL table
- Compare Values: Evaluate whether the source watermark is greater than the target watermark (see the sketch below)
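A minimal PySpark sketch of this comparison, assuming a Parquet source in Azure Storage and JDBC access to the SQL table. The path, JDBC URL, and table name are placeholders, and authentication options are omitted for brevity:

```python
from pyspark.sql import functions as F

# Placeholder locations -- substitute the real container, server, and table.
SOURCE_PATH = "abfss://catalog@<storage-account>.dfs.core.windows.net/weekly-ad/"
JDBC_URL = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
TARGET_TABLE = "dbo.WeeklyAd"

def get_source_watermark(spark):
    """MAX(DateInserted) across all snapshots in the source dataset."""
    df = spark.read.parquet(SOURCE_PATH)
    return df.agg(F.max("DateInserted")).first()[0]

def get_target_watermark(spark):
    """MAX(DateInserted) in the target SQL table; None when the table is empty."""
    df = (spark.read.format("jdbc")
          .option("url", JDBC_URL)
          .option("query", f"SELECT MAX(DateInserted) AS max_dt FROM {TARGET_TABLE}")
          .load())
    return df.first()["max_dt"]
```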
Decision Flow
The resulting decision flow is summarized in the Refresh Behavior by Scenario table below.
Full Dataset Snapshot Pattern
Key Principle
Each DateInserted watermark in the source represents a complete snapshot of the dataset at that point in time. When a refresh is triggered, the job must load the entire dataset for the discovered watermark, not just incremental changes.
Why Full Snapshots?
- Ensures data consistency and completeness
- Prevents partial or stale records in SQL
- Simplifies synchronization logic
- Aligns with source data structure
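A hedged sketch of the snapshot load, reusing the placeholder paths from the comparison sketch above. It filters the source to the discovered watermark and replaces the target contents rather than appending; the truncate-plus-overwrite strategy is one possible approach, not a documented requirement:

```python
def load_snapshot(spark, watermark):
    """Load the complete snapshot for one DateInserted watermark into the SQL table."""
    snapshot = (spark.read.parquet(SOURCE_PATH)
                .filter(F.col("DateInserted") == watermark))

    (snapshot.write.format("jdbc")
     .option("url", JDBC_URL)
     .option("dbtable", TARGET_TABLE)
     .option("truncate", "true")   # keep the existing table definition; replace only the rows
     .mode("overwrite")
     .save())
```

Overwriting with truncate keeps the table definition and indexes in place while guaranteeing the target holds exactly one snapshot.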
Refresh Behavior by Scenario
| Scenario | Source MAX(DateInserted) | Target MAX(DateInserted) | Action | Rationale |
|---|---|---|---|---|
| New data available | 2024-01-15 | 2024-01-10 | Refresh | Source has newer snapshot |
| Data is current | 2024-01-10 | 2024-01-10 | Skip | No new data to process |
| Initial load | 2024-01-10 | NULL | Refresh | Target table is empty |
| Source missing data | NULL | 2024-01-10 | Skip | Source has no data to process |
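These rules reduce to a small comparison. The function below is an illustrative sketch (name and signature are not from the actual jobs) that mirrors the table, including the NULL cases:

```python
from datetime import datetime
from typing import Optional

def should_refresh(source_max: Optional[datetime], target_max: Optional[datetime]) -> bool:
    """Mirror the scenario table: refresh only when the source holds a newer snapshot."""
    if source_max is None:
        return False        # source missing data -> skip
    if target_max is None:
        return True         # initial load -> refresh
    return source_max > target_max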
Job Execution Schedule
- Jobs run on a scheduled basis (frequency varies by dataset)
- Each execution performs watermark comparison before processing
- No manual intervention required for standard refresh cycles
- Failed jobs should alert and retry according to Databricks retry policy
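As an illustration of the scheduling and retry behavior described above (not the actual job definitions), a Databricks Jobs API settings fragment might look like the following; every name, cron expression, path, and recipient is a placeholder:

```python
# Illustrative Databricks Jobs API 2.1 settings -- all values are placeholders.
job_settings = {
    "name": "catalog-weekly-ad-refresh",
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",   # frequency varies by dataset
        "timezone_id": "America/New_York",
        "pause_status": "UNPAUSED",
    },
    "email_notifications": {"on_failure": ["data-eng-alerts@example.com"]},
    "tasks": [
        {
            "task_key": "refresh_weekly_ad",
            "notebook_task": {"notebook_path": "/ETL/weekly_ad_refresh"},
            "max_retries": 2,                      # retried automatically on failure
            "min_retry_interval_millis": 300_000,  # wait 5 minutes between attempts
        }
    ],
}
```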
Implementation Requirements
Source Storage Requirements
- Data must include a DateInserted column/field
- Each watermark value represents a full dataset snapshot
- Storage must be accessible to Databricks workspace
Target SQL Requirements
- Table must include a DateInserted column
- Column must support datetime comparison operations
- Appropriate indexes on DateInserted for query performance
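For example, the index could be created once with a small pyodbc script; the driver, connection details, index name, and table name below are illustrative, not the actual schema:

```python
import pyodbc

# Placeholder connection string -- swap in the real server, database, and auth method.
CONN_STR = (
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=<server>.database.windows.net;Database=<database>;"
    "Authentication=ActiveDirectoryMsi"
)

DDL = """
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'IX_WeeklyAd_DateInserted')
    CREATE INDEX IX_WeeklyAd_DateInserted ON dbo.WeeklyAd (DateInserted);
"""

with pyodbc.connect(CONN_STR) as conn:
    conn.cursor().execute(DDL)
    conn.commit()
```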
Job Responsibilities
- Connect to source storage container
- Query and identify MAX(DateInserted) from source
- Connect to target SQL database
- Query and identify MAX(DateInserted) from target
- Compare watermarks and determine refresh necessity
- If refresh required:
  - Extract full dataset for source watermark
  - Transform data as needed
  - Load complete snapshot into SQL table
- Log execution results and metrics
- Handle errors and alert on failures
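Composed from the earlier illustrative helpers (none of these names come from the actual jobs), a single scheduled execution could be orchestrated roughly as follows:

```python
import logging
import time

log = logging.getLogger("catalog_refresh")

def run_refresh(spark):
    """One execution: compare watermarks, load the snapshot if needed, log the outcome."""
    started = time.time()
    source_wm = get_source_watermark(spark)
    target_wm = get_target_watermark(spark)

    if not should_refresh(source_wm, target_wm):
        log.info("Skip: source=%s target=%s (no new snapshot)", source_wm, target_wm)
        return

    try:
        load_snapshot(spark, source_wm)
    except Exception:
        log.exception("Snapshot load failed for watermark %s", source_wm)
        raise  # surface the failure so the Databricks retry policy and alerting take over

    log.info("Refreshed to watermark %s in %.1fs", source_wm, time.time() - started)
```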
Consistency Guarantees
- Atomicity: Each refresh loads a complete dataset or none at all
- Consistency: SQL reflects the full snapshot for a given DateInserted
- Idempotency: Re-running a job with the same watermarks produces identical results
- Isolation: Each dataset refresh operates independently
Monitoring and Observability
Key Metrics to Track
- Last successful refresh timestamp per dataset
- Source vs. target watermark lag
- Refresh execution duration
- Data volume processed per refresh
- Job success/failure rates
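One lightweight way to capture these is a flat per-run record written alongside the job logs; the field names below are illustrative, not an existing schema:

```python
from datetime import datetime, timezone

def build_metrics_record(dataset, source_wm, target_wm, rows_loaded, duration_s, succeeded):
    """Illustrative per-run metrics record covering the measures listed above."""
    lag_s = (source_wm - target_wm).total_seconds() if source_wm and target_wm else None
    return {
        "dataset": dataset,
        "run_ts": datetime.now(timezone.utc).isoformat(),
        "source_watermark": source_wm,
        "target_watermark": target_wm,
        "watermark_lag_seconds": lag_s,
        "rows_loaded": rows_loaded,
        "duration_seconds": duration_s,
        "succeeded": succeeded,
    }
```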
Alerting Scenarios
- Refresh job failures
- Watermark lag exceeding threshold (e.g., 24+ hours)
- Data volume anomalies (unexpected spikes or drops)
- SQL load errors or timeouts
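The watermark-lag condition is easy to evaluate from the values the job already reads. A sketch using the 24-hour threshold given as an example above (the function name and threshold constant are illustrative):

```python
from datetime import datetime, timedelta
from typing import Optional

WATERMARK_LAG_THRESHOLD = timedelta(hours=24)  # example threshold; tune per dataset

def watermark_lag_exceeded(source_wm: Optional[datetime], target_wm: Optional[datetime]) -> bool:
    """True when the target trails the source by more than the allowed lag."""
    if source_wm is None:
        return False       # nothing in source to lag behind
    if target_wm is None:
        return True        # target has never loaded a snapshot
    return (source_wm - target_wm) > WATERMARK_LAG_THRESHOLD
```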
Maintenance Considerations
- Watermark Integrity: Ensure DateInserted values are populated correctly in source data
- Schema Evolution: Coordinate changes between storage format and SQL table schema
- Performance Tuning: Monitor query and load performance as data volumes grow
- Retention Policies: Define and implement data retention for historical snapshots
- Disaster Recovery: Maintain backup and recovery procedures for both source and target