A comprehensive data loading module for Databricks that provides parallel file processing with multiple loading strategies, file tracking, and robust error handling. Now with enhanced cluster mode for optimized Databricks operations.
New users should start with the Quickstart guide, which explains installation using Poetry and walks through the configuration workflow with a small demo. See Pipeline State Management for details on restarting interrupted runs.
- File Monitoring: Automatically discovers and processes new files from configured locations
- File Tracking: Tracks processing status to prevent duplicate processing using Delta tables
- Parallel Processing: Configurable parallel execution for efficient file processing
- Multiple Loading Strategies:
  - SCD2 (Slowly Changing Dimensions Type 2): Maintains historical records with change tracking
  - Append: Simple append operation for tables without primary keys
  - Overwrite: Replace table contents (coming soon)
  - Merge: Custom merge logic (coming soon)
- Schema Evolution: Automatic schema evolution support
- Error Handling: Robust error handling with configurable retry logic
- Monitoring: Comprehensive logging and metrics collection
- State Management: Pipeline progress saved to disk for safe restarts
- Reset Command: Easily clear saved state with `reset-state`
- Optimization: Automatic table optimization and vacuum operations
- 🆕 Cluster Mode: Enhanced Databricks cluster integration with:
  - Automatic environment detection and optimization
  - Resource monitoring and management
  - Unity Catalog support
  - Job dependency management
  - Cluster-aware performance tuning
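The state-management and reset features can be pictured with a minimal sketch that persists completed work to a JSON file. The file name `pipeline_state.json` and all functions below are hypothetical illustrations, not the package's actual state format or API.

```python
import json
import os
import tempfile
from pathlib import Path


def load_state(path: Path) -> dict:
    """Return saved state, or an empty state if nothing has been saved yet."""
    if path.exists():
        return json.loads(path.read_text())
    return {"completed_files": []}


def _write_state(path: Path, state: dict) -> None:
    # Write to a temporary file first, then rename it over the old state,
    # so an interrupted write never leaves a half-written state file
    # (os.replace is atomic on POSIX filesystems).
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(state))
    os.replace(tmp, path)


def mark_completed(path: Path, file_name: str) -> None:
    """Record a finished file so a restarted run can skip it."""
    state = load_state(path)
    if file_name not in state["completed_files"]:
        state["completed_files"].append(file_name)
    _write_state(path, state)


def reset_state(path: Path) -> None:
    """Similar in spirit to a reset command: forget all saved progress."""
    path.unlink(missing_ok=True)


def remaining(path: Path, all_files: list[str]) -> list[str]:
    """Files that still need processing after a restart."""
    done = set(load_state(path)["completed_files"])
    return [f for f in all_files if f not in done]


if __name__ == "__main__":
    state_file = Path(tempfile.mkdtemp()) / "pipeline_state.json"
    files = ["a.parquet", "b.parquet", "c.parquet"]
    mark_completed(state_file, "a.parquet")
    print(remaining(state_file, files))
    reset_state(state_file)
    print(remaining(state_file, files))
```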
Quickstart:

```bash
# Install dependencies using Poetry
poetry install

# Run with configuration file
poetry run python -m data_loader.main run --config config.yaml

# Run with cluster-specific optimizations
poetry run python -m data_loader.main run-cluster --config config.yaml

# Run with Unity Catalog
poetry run python -m data_loader.main run-cluster --config config.yaml --unity-catalog

# Check cluster status
poetry run python -m data_loader.main cluster-status --config config.yaml
```

Several example scripts are included in this repository. Execute them with `poetry run` so that all dependencies are loaded from the virtual environment.
```bash
# Basic configuration-driven demo
poetry run python demo/run_demo.py

# Demonstrate configuration merging
poetry run python demo/config_merge_demo.py

# Showcase pipeline state management
poetry run python demo/state_management_demo.py

# Reset pipeline state
poetry run python demo/reset_state_demo.py

# Full example usage script
poetry run python example_usage.py

# Cluster mode demonstration
poetry run python cluster_demo.py
```

Project structure:

```text
data_loader/
├── config/ # Configuration management
│ ├── table_config.py # Table and loading strategy configuration
│ └── databricks_config.py # Databricks-specific settings
├── core/ # Core processing components
│ ├── file_tracker.py # File processing status tracking
│ ├── processor.py # Main orchestrator
│ └── parallel_executor.py # Parallel processing framework
├── cluster/ # 🆕 Cluster mode components
│ ├── cluster_config.py # Environment detection and configuration
│ ├── cluster_processor.py # Cluster-optimized processor
│ ├── resource_manager.py # Resource monitoring and optimization
│ └── job_orchestrator.py # Dependency and workflow management
├── strategies/ # Loading strategy implementations
│ ├── base_strategy.py # Base strategy interface
│ ├── scd2_strategy.py # SCD2 implementation
│ └── append_strategy.py # Append strategy implementation
├── utils/ # Utility functions
│ ├── logger.py # Logging utilities
│ └── helpers.py # Helper functions
└── main.py # Entry point for Databricks jobs
```
Installation:

- Install dependencies with Poetry: `poetry install`
- (Optional) Activate the virtual environment created by Poetry: `poetry shell`

The data loader uses YAML configuration files to define tables, loading strategies, and processing options.

You can load a configuration file programmatically using `load_config_from_file`:

```python
from data_loader.config import load_config_from_file

config = load_config_from_file("path/to/config.yaml")
```

Example `config.yaml`:

```yaml
raw_data_path: /mnt/raw/
processed_data_path: /mnt/processed/
checkpoint_path: /mnt/checkpoints/
file_tracker_table: file_processing_tracker
file_tracker_database: metadata
max_parallel_jobs: 4
retry_attempts: 3
timeout_minutes: 60
log_level: INFO
enable_metrics: true
tables:
  - table_name: customers
    database_name: analytics
    source_path_pattern: /mnt/raw/customers/*.parquet
    loading_strategy: scd2
    primary_keys:
      - customer_id
    tracking_columns:
      - name
      - email
      - address
    file_format: parquet
    schema_evolution: true
    partition_columns:
      - date_partition
  - table_name: transactions
    database_name: analytics
    source_path_pattern: /mnt/raw/transactions/*.parquet
    loading_strategy: append
    file_format: parquet
    schema_evolution: true
    partition_columns:
      - transaction_date
```

Global options:

- `raw_data_path`: Path to the raw data location
- `processed_data_path`: Path to the processed data location
- `checkpoint_path`: Path for checkpoints and metadata
- `max_parallel_jobs`: Maximum number of concurrent processing jobs
- `retry_attempts`: Number of retry attempts for failed files
- `timeout_minutes`: Timeout for processing a single file, in minutes

Table options:

- `table_name`: Name of the target table
- `database_name`: Target database/schema name
- `source_path_pattern`: File path pattern to match source files (supports wildcards)
- `loading_strategy`: Loading strategy (`scd2`, `append`, `overwrite`, `merge`; `overwrite` and `merge` are coming soon)
- `file_format`: Source file format (`parquet`, `csv`, `json`, `delta`)
- `schema_evolution`: Enable automatic schema evolution
- `partition_columns`: Columns to partition the target table by

SCD2-specific options:

- `primary_keys`: Primary key columns for SCD2
- `tracking_columns`: Columns to track for changes
- `scd2_effective_date_column`: Effective date column name
- `scd2_end_date_column`: End date column name
- `scd2_current_flag_column`: Current flag column name
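A minimal sketch of the checks these options imply, operating on one table entry as a plain dict parsed from the YAML. The function name `validate_table_config` and its rules (SCD2 needs non-empty `primary_keys` and `tracking_columns`; `overwrite` and `merge` are rejected as not yet available) are assumptions drawn from this README, not the package's own validator.

```python
SUPPORTED_STRATEGIES = {"scd2", "append"}
PLANNED_STRATEGIES = {"overwrite", "merge"}  # listed as "coming soon"
SUPPORTED_FORMATS = {"parquet", "csv", "json", "delta"}


def validate_table_config(table: dict) -> list[str]:
    """Hypothetical validator: return problems found; an empty list means valid."""
    errors = []
    for key in ("table_name", "database_name", "source_path_pattern", "loading_strategy"):
        if not table.get(key):
            errors.append(f"missing required option: {key}")

    strategy = table.get("loading_strategy")
    if strategy in PLANNED_STRATEGIES:
        errors.append(f"loading_strategy '{strategy}' is not available yet")
    elif strategy and strategy not in SUPPORTED_STRATEGIES:
        errors.append(f"unknown loading_strategy: {strategy}")

    # SCD2 cannot detect changes without keys and tracked columns.
    if strategy == "scd2":
        for key in ("primary_keys", "tracking_columns"):
            if not table.get(key):
                errors.append(f"scd2 requires a non-empty {key} list")

    # Only check file_format when it is given; no default is assumed here.
    fmt = table.get("file_format")
    if fmt is not None and fmt not in SUPPORTED_FORMATS:
        errors.append(f"unsupported file_format: {fmt}")
    return errors
```

Collecting every problem in a list, rather than raising on the first one, lets a single run report all configuration mistakes at once.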
Command-line usage:

```bash
# Run with configuration file
python -m data_loader.main run --config config.yaml

# Run with inline YAML configuration
python -m data_loader.main run --config-json 'raw_data_path: /mnt/raw/\n...'

# Run specific tables only
python -m data_loader.main run --config config.yaml --tables "customers,transactions"

# Dry run to see what would be processed
python -m data_loader.main run --config config.yaml --dry-run

# Run with optimization and vacuum
python -m data_loader.main run --config config.yaml --optimize --vacuum
```

```bash
# Run with cluster optimizations (recommended)
python -m data_loader.main run-cluster --config config.yaml

# Run with Unity Catalog support
python -m data_loader.main run-cluster --config config.yaml --unity-catalog

# Run with resource monitoring
python -m data_loader.main run-cluster --config config.yaml --monitoring

# Dry run with cluster status
python -m data_loader.main run-cluster --config config.yaml --dry-run

# Check cluster configuration and health
python -m data_loader.main cluster-status --config config.yaml
```

```bash
# Check processing status
python -m data_loader.main status --config config.yaml

# Create an example configuration file
python -m data_loader.main create-example-config --output my_config.yaml
```

To deploy as a Databricks job in standard mode:

- Upload the package to the Databricks workspace or DBFS
- Create a new job with the following configuration:
  - Cluster: Use a cluster with Databricks Runtime 11.0+ and Delta Lake support
  - Task Type: Python script
  - Script path: Path to `main.py` in your uploaded package
  - Parameters: `["run", "--config", "/path/to/config.yaml"]`

To deploy as a Databricks job in cluster mode:

- Upload the package to the Databricks workspace or DBFS
- Create a new job with the following configuration:
  - Cluster: Use a cluster with Databricks Runtime 11.0+ and Delta Lake support
  - Task Type: Python script
  - Script path: Path to `main.py` in your uploaded package
  - Parameters: `["run-cluster", "--config", "/path/to/config.yaml", "--unity-catalog"]`
- Set up a file trigger (if using file-based triggers):
  - Configure the job to trigger on file arrival in your raw data location
  - Use Databricks Auto Loader for streaming ingestion scenarios
Example job definition using a Python wheel task:

```json
{
  "job_clusters": [{
    "job_cluster_key": "data-loader-cluster",
    "new_cluster": {
      "spark_version": "11.3.x-scala2.12",
      "node_type_id": "i3.xlarge",
      "num_workers": 4,
      "spark_conf": {
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true"
      }
    }
  }],
  "tasks": [{
    "task_key": "data-loader",
    "job_cluster_key": "data-loader-cluster",
    "python_wheel_task": {
      "package_name": "databricks_data_loader",
      "entry_point": "main",
      "parameters": ["run-cluster", "--config", "/mnt/config/data_loader.json"]
    }
  }]
}
```

Programmatic usage:

```python
import yaml  # PyYAML

from data_loader.config.table_config import DataLoaderConfig
from data_loader.core.processor import DataProcessor

# Load configuration (assumes the YAML keys map directly onto DataLoaderConfig fields)
with open("config.yaml") as f:
    config_dict = yaml.safe_load(f)
config = DataLoaderConfig(**config_dict)

# Initialize processor
processor = DataProcessor(config)

# Process all tables
results = processor.process_all_tables()

# Process a specific table
table_config = config.get_table_config("customers")
table_result = processor.process_table(table_config)

# Check status
status = processor.get_processing_status()
```

Programmatic usage in cluster mode:

```python
import yaml  # PyYAML

from data_loader.config.table_config import DataLoaderConfig
from data_loader.cluster import ClusterConfig, ClusterDataProcessor, DatabricksEnvironment

# Load base configuration
with open("config.yaml") as f:
    config_dict = yaml.safe_load(f)
base_config = DataLoaderConfig(**config_dict)

# Detect Databricks environment and create cluster configuration
environment = DatabricksEnvironment.detect_environment()
cluster_config = ClusterConfig.from_base_config(
    base_config=base_config,
    environment=environment,
    enable_cluster_optimizations=True,
    use_unity_catalog=True,
)

# Initialize cluster processor
processor = ClusterDataProcessor(cluster_config)

# Validate cluster configuration before processing anything
validation = processor.validate_cluster_configuration()
if not validation["valid"]:
    raise ValueError(f"Configuration invalid: {validation['errors']}")

# Process with cluster optimizations
results = processor.process_all_tables()

# Get comprehensive cluster status
cluster_status = processor.get_cluster_status()
```

The SCD2 strategy maintains historical records by:
- Comparing incoming records with current records
- Identifying new and changed records
- Marking changed records as inactive (setting end_date and is_current=false)
- Inserting new/changed records as active
Requirements:

- `primary_keys`: Columns that uniquely identify records
- `tracking_columns`: Columns to monitor for changes
- SCD2 metadata columns (`effective_date`, `end_date`, `is_current`)
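The four SCD2 steps above can be illustrated in plain Python on lists of dicts. This is a simplified sketch under stated assumptions: the real strategy works against Delta tables, the metadata column names are the defaults listed above, only `tracking_columns` are compared, and the incoming batch is assumed to hold at most one row per primary key.

```python
from datetime import date


def apply_scd2(current_rows, incoming_rows, primary_keys, tracking_columns, load_date):
    """Return the full history after applying one SCD2 batch.

    current_rows: existing history rows carrying effective_date/end_date/is_current.
    incoming_rows: new source records without SCD2 metadata
    (assumed: at most one row per primary key).
    """
    def key(row):
        return tuple(row[k] for k in primary_keys)

    def tracked(row):
        return tuple(row.get(c) for c in tracking_columns)

    # Step 1: index the currently active version of each record.
    active = {key(r): r for r in current_rows if r["is_current"]}
    # Already-closed history is carried over unchanged.
    result = [dict(r) for r in current_rows if not r["is_current"]]

    for new in incoming_rows:
        k = key(new)
        old = active.get(k)
        # Step 2: skip records whose tracked columns did not change.
        if old is not None and tracked(old) == tracked(new):
            continue
        if old is not None:
            # Step 3: expire the previous version of a changed record.
            result.append(dict(old, end_date=load_date, is_current=False))
            del active[k]
        # Step 4: insert the new or changed record as the active version.
        result.append(dict(new, effective_date=load_date, end_date=None, is_current=True))

    # Active rows untouched by this batch stay current.
    result.extend(dict(r) for r in active.values())
    return result


if __name__ == "__main__":
    history = apply_scd2([], [{"customer_id": 1, "email": "a@example.com"}],
                         ["customer_id"], ["email"], date(2024, 1, 1))
    history = apply_scd2(history, [{"customer_id": 1, "email": "b@example.com"}],
                         ["customer_id"], ["email"], date(2024, 2, 1))
    for row in history:
        print(row)
```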
By default, the append strategy adds new data to the target table without deduplication or change detection. It is suitable for:
- Event/transaction tables
- Log tables
- Tables without primary keys
- Any scenario where all incoming data should be preserved
Features:
- Automatic audit column addition (`_load_timestamp`, `_source_file`, `_batch_id`)
- Optional deduplication
- Late-arriving data handling
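A minimal sketch of the append behaviour with the audit columns named above, on plain dicts rather than DataFrames. Generating `_batch_id` with `uuid4`, using a UTC load timestamp, and deduplicating on a caller-supplied key list are assumptions for illustration; the package may do each of these differently.

```python
import uuid
from datetime import datetime, timezone


def prepare_append_batch(rows, source_file, dedupe_keys=None, batch_id=None):
    """Add audit columns and, if dedupe_keys is given, drop in-batch duplicates."""
    batch_id = batch_id or str(uuid.uuid4())  # one id shared by the whole batch
    load_ts = datetime.now(timezone.utc)
    seen = set()
    out = []
    for row in rows:
        if dedupe_keys:
            k = tuple(row.get(c) for c in dedupe_keys)
            if k in seen:
                continue  # keep only the first occurrence of each key
            seen.add(k)
        out.append(dict(row, _load_timestamp=load_ts,
                        _source_file=source_file, _batch_id=batch_id))
    return out


def append(target, batch):
    """Append only: existing rows are never updated or deleted."""
    target.extend(batch)
    return target
```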
Automatically detects and optimizes for Databricks environments:
- Cluster Type: Single User, Shared, or No Isolation Shared
- Resource Allocation: Worker count, cores, memory configuration
- Runtime Features: Unity Catalog availability, Delta Lake optimization
- Optimal Parallelism: Calculates ideal parallel job count based on cluster size
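How the loader turns cluster size into a parallel job count is not specified here. The following hypothetical heuristic only illustrates the idea: derive a job count from total cores and an assumed per-job core budget (`cores_per_job=4` is arbitrary), then cap it by the configured `max_parallel_jobs`.

```python
def optimal_parallel_jobs(num_workers, cores_per_worker, max_parallel_jobs, cores_per_job=4):
    """Hypothetical heuristic: how many files to load concurrently."""
    # Treat a single-node cluster (0 workers) as one node doing all the work.
    total_cores = max(1, num_workers) * cores_per_worker
    # Give each concurrent job roughly `cores_per_job` cores, but allow at least one job.
    by_cores = max(1, total_cores // cores_per_job)
    # Never exceed the user's configured ceiling.
    return min(by_cores, max(1, max_parallel_jobs))
```

For the example job cluster above (4 workers of `i3.xlarge`, 4 vCPUs each) with `max_parallel_jobs: 4`, this gives 16 // 4 = 4 concurrent jobs.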
Real-time monitoring and optimization:
```python
# Monitor cluster resources
resources = processor.resource_manager.get_cluster_resources()
health = processor.resource_manager.get_health_status()

# Get optimization recommendations
recommendations = processor.resource_manager.get_optimization_recommendations()
```

Seamless integration with Unity Catalog:

```python
# Enable Unity Catalog support
cluster_config = ClusterConfig.from_base_config(
    base_config=base_config,
    use_unity_catalog=True,
    default_catalog="production",
)

# Tables automatically use the catalog.schema.table format
```

Manage complex workflow dependencies:

```python
cluster_config = ClusterConfig(
    base_config=base_config,
    enable_job_dependencies=True,
    upstream_dependencies=[
        "job:bronze-pipeline-job-id",
        "table:bronze.raw_events",
        "file:/mnt/config/ready.flag",
    ],
    downstream_notifications=[
        "webhook:https://hooks.slack.com/...",
        "job:silver-transformation-job-id",
    ],
)
```

Automatic Spark configuration based on cluster characteristics:
- Delta Lake optimizations: Auto-compaction, optimized writes
- Adaptive query execution: Dynamic partition coalescing, skew join handling
- Memory management: Optimal memory allocation and garbage collection
- Shuffle optimization: Adaptive shuffle partitions based on data size
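A sketch of how such settings might be derived. The configuration keys are real Spark and Databricks Delta properties, but the function is hypothetical and the shuffle rule of thumb (`partitions_per_core=2`) is an assumption; memory and garbage-collection settings are normally fixed at cluster creation and are omitted. On Databricks, each pair could be applied with `spark.conf.set(key, value)`.

```python
def recommended_spark_conf(total_cores: int, partitions_per_core: int = 2) -> dict[str, str]:
    """Hypothetical: build settings for the Delta, AQE and shuffle items listed above."""
    return {
        # Delta Lake: optimized writes and auto-compaction
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        # Adaptive query execution: partition coalescing and skew-join handling
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
        # Shuffle: a starting partition count that AQE can coalesce further
        "spark.sql.shuffle.partitions": str(max(1, total_cores * partitions_per_core)),
    }
```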
For detailed cluster mode documentation, see CLUSTER_MODE.md.
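The `upstream_dependencies` and `downstream_notifications` entries in the job-dependency example use a `kind:target` string convention. A hypothetical parser for that convention is sketched below; the set of kinds is taken from the example, and splitting on the first colon only keeps URLs such as `webhook:https://...` intact.

```python
from typing import NamedTuple

KNOWN_KINDS = {"job", "table", "file", "webhook"}  # kinds seen in the example above


class Dependency(NamedTuple):
    kind: str
    target: str


def parse_dependency(spec: str) -> Dependency:
    """Hypothetical: split 'kind:target' on the first colon and validate the kind."""
    kind, sep, target = spec.partition(":")
    if not sep or not target:
        raise ValueError(f"expected 'kind:target', got {spec!r}")
    if kind not in KNOWN_KINDS:
        raise ValueError(f"unknown dependency kind {kind!r} in {spec!r}")
    return Dependency(kind, target)
```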
The data loader maintains a Delta table to track file processing status:
```sql
CREATE TABLE metadata.file_processing_tracker (
  file_path STRING,
  file_size INT,
  file_modified_time TIMESTAMP,
  table_name STRING,
  status STRING, -- pending, processing, completed, failed, skipped
  processing_start_time TIMESTAMP,
  processing_end_time TIMESTAMP,
  error_message STRING,
  retry_count INT,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
);
```

This ensures that:
- Files that have already been processed successfully are not processed again
- Failed files can be retried
- Processing history is maintained
- Status can be monitored and reported
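The bookkeeping behind these guarantees can be sketched with an in-memory stand-in for the tracker table. The status values come from the schema above; the class, the `max_retries` limit, and the use of `fnmatch` to expand `source_path_pattern` wildcards are assumptions for illustration (the real tracker is a Delta table, and note that `fnmatch`'s `*` also matches `/`).

```python
from fnmatch import fnmatch


class InMemoryFileTracker:
    """Hypothetical stand-in for metadata.file_processing_tracker."""

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.status: dict[str, str] = {}
        self.retry_count: dict[str, int] = {}
        self.errors: dict[str, str] = {}

    def discover(self, candidate_paths, pattern):
        """Register matching files that have never been seen as 'pending'."""
        for path in candidate_paths:
            if fnmatch(path, pattern) and path not in self.status:
                self.status[path] = "pending"
                self.retry_count[path] = 0

    def files_to_process(self):
        """Pending files plus failed files that still have retries left."""
        return [
            p for p, s in self.status.items()
            if s == "pending" or (s == "failed" and self.retry_count[p] < self.max_retries)
        ]

    def mark_processing(self, path):
        self.status[path] = "processing"

    def mark_completed(self, path):
        self.status[path] = "completed"

    def mark_failed(self, path, message):
        self.status[path] = "failed"
        self.retry_count[path] += 1
        self.errors[path] = message  # kept for reporting
```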
Logging:

- Structured logging with configurable levels
- JSON format support for log aggregation
- File and console output options
- Performance metrics and execution timing
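JSON-formatted logs can be produced with the standard `logging` module alone. The formatter below is a hypothetical sketch; the optional field names (`table_name`, `file_path`, `duration_seconds`) are assumptions, and the loader's own `utils/logger.py` may structure records differently.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Optional structured fields passed via `extra={...}` when logging.
        for key in ("table_name", "file_path", "duration_seconds"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, default=str)


def get_logger(name: str, level: str = "INFO") -> logging.Logger:
    """Console logger with JSON output; safe to call repeatedly."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logger
```

Usage: `get_logger("data_loader").info("file loaded", extra={"table_name": "customers"})`.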
Metrics:

- File processing statistics
- Table-level metrics
- Success/failure rates
- Execution times
- Resource usage monitoring
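The table-level metrics listed above reduce to simple aggregates. A hypothetical sketch, assuming each file result is reported as a `(table_name, succeeded, seconds)` tuple:

```python
from collections import defaultdict


def summarize(results):
    """Aggregate per-table file counts, success/failure rates and average time."""
    stats = defaultdict(lambda: {"files": 0, "succeeded": 0, "total_seconds": 0.0})
    for table, ok, seconds in results:
        s = stats[table]
        s["files"] += 1
        s["succeeded"] += int(ok)
        s["total_seconds"] += seconds

    summary = {}
    for table, s in stats.items():
        success_rate = s["succeeded"] / s["files"]
        summary[table] = {
            "files": s["files"],
            "success_rate": success_rate,
            "failure_rate": 1 - success_rate,
            "avg_seconds": s["total_seconds"] / s["files"],
        }
    return summary
```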
The data loader provides robust error handling:
- File-level errors: Individual file failures don't stop the entire process
- Retry logic: Configurable retry attempts with exponential backoff
- Error tracking: All errors are logged and tracked in the file tracker
- Graceful degradation: Processing continues even if some files fail
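The retry behaviour described above can be sketched as follows: failures are retried with exponentially growing waits and, if they persist, returned rather than raised, so one bad file does not stop the run. `process_file` stands in for whatever loads a single file, `retry_attempts` mirrors the config option of the same name (interpreted here as retries after the first try, an assumption), and the 2-second base delay is arbitrary.

```python
import time


def run_with_retry(process_file, path, retry_attempts=3, base_delay=2.0, sleep=time.sleep):
    """Try once plus up to `retry_attempts` retries, doubling the wait each time.

    Returns (succeeded, error_message) instead of raising.
    """
    last_error = None
    for attempt in range(retry_attempts + 1):
        try:
            process_file(path)
            return True, None
        except Exception as exc:  # record any file-level failure and retry
            last_error = str(exc)
            if attempt < retry_attempts:
                sleep(base_delay * (2 ** attempt))  # 2s, 4s, 8s, ...
    return False, last_error
```

The `sleep` parameter exists so tests can pass a fake and avoid real waiting.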
Parallel processing:

- Configurable number of concurrent workers
- Thread-safe file status tracking
- Resource usage monitoring
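A minimal sketch of bounded concurrency with thread-safe status updates, using `concurrent.futures.ThreadPoolExecutor` and a lock. Threads are a reasonable fit when each task mostly waits on Spark jobs or I/O; the function name and the status dict are hypothetical stand-ins for the loader's executor and tracker.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_files_in_parallel(paths, process_file, max_parallel_jobs=4):
    """Run process_file over paths with at most max_parallel_jobs threads; return path -> status."""
    statuses = {}
    lock = threading.Lock()

    def set_status(path, status):
        with lock:  # serialize updates to state shared across worker threads
            statuses[path] = status

    def task(path):
        set_status(path, "processing")
        try:
            process_file(path)
            set_status(path, "completed")
        except Exception:
            set_status(path, "failed")  # one failure does not cancel the others

    with ThreadPoolExecutor(max_workers=max_parallel_jobs) as pool:
        futures = [pool.submit(task, p) for p in paths]
        for f in as_completed(futures):
            f.result()  # task handles file errors itself; this only surfaces bugs in task
    return statuses
```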
Performance optimizations:

- Delta Lake optimizations enabled by default
- Adaptive query execution
- Auto-compaction and optimize write
- Partitioning support
Best practices:

- Partitioning: Use appropriate partition columns for large tables
- File sizes: Aim for file sizes between 100 MB and 1 GB for optimal performance
- Batch processing: Process files in batches rather than one-by-one
- Resource allocation: Size your cluster appropriately for the workload
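Batch processing combined with the 100 MB to 1 GB sizing guideline suggests grouping small files into batches of a target total size. The greedy, order-preserving grouping below is one simple approach, offered as a sketch; the function name is hypothetical and the 1 GB default target is taken from the upper end of the guideline.

```python
def batch_files_by_size(files, target_bytes=1024**3):
    """Group (path, size_bytes) pairs into batches in input order.

    A batch's total size does not exceed target_bytes, except when a single
    file is larger than the target; such a file gets a batch of its own.
    """
    batches, current, current_size = [], [], 0
    for path, size in files:
        # Close the current batch if adding this file would overflow it.
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        batches.append(current)
    return batches
```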
Run the test suite:

```bash
# Run all tests
poetry run pytest data_loader/tests/

# Run with coverage
poetry run pytest --cov=data_loader data_loader/tests/

# Run a specific test file
poetry run pytest data_loader/tests/test_basic.py
```

To set up a development environment:

- Clone the repository
- Install dependencies with Poetry: `poetry install`
- Activate the environment (optional): `poetry shell`
- Run the tests to verify the setup: `poetry run pytest`
To add a new loading strategy:

- Create a new strategy class inheriting from `BaseLoadingStrategy`
- Implement the required methods: `load_data()` and `validate_config()`
- Add the strategy to the factory in `processor.py`
- Add configuration options to `table_config.py`
- Add tests for the new strategy
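A sketch of the extension points these steps describe. Only the names `BaseLoadingStrategy`, `load_data()` and `validate_config()` come from this README; the constructor, the method signatures, the registry decorator, the `create_strategy` factory and the example `OverwriteStrategy` are hypothetical and will differ from the real `base_strategy.py` and the factory in `processor.py`.

```python
from abc import ABC, abstractmethod


class BaseLoadingStrategy(ABC):
    """Hypothetical shape of the strategy interface."""

    def __init__(self, table_config: dict):
        self.table_config = table_config

    @abstractmethod
    def validate_config(self) -> None:
        """Raise ValueError if table_config lacks what this strategy needs."""

    @abstractmethod
    def load_data(self, rows: list[dict], target: list[dict]) -> list[dict]:
        """Apply incoming rows to the target and return the result."""


STRATEGIES: dict[str, type[BaseLoadingStrategy]] = {}


def register(name: str):
    """Class decorator that adds a strategy to the factory registry."""
    def decorator(cls):
        STRATEGIES[name] = cls
        return cls
    return decorator


@register("overwrite")
class OverwriteStrategy(BaseLoadingStrategy):
    def validate_config(self) -> None:
        if not self.table_config.get("table_name"):
            raise ValueError("table_name is required")

    def load_data(self, rows, target):
        return list(rows)  # replace the table contents entirely


def create_strategy(table_config: dict) -> BaseLoadingStrategy:
    """Factory: look up the class by loading_strategy, instantiate and validate it."""
    name = table_config.get("loading_strategy")
    cls = STRATEGIES.get(name)
    if cls is None:
        raise ValueError(f"unknown loading_strategy: {name}")
    strategy = cls(table_config)
    strategy.validate_config()
    return strategy
```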
Common issues:

- Permission errors: Ensure the Databricks cluster has access to all specified paths
- Schema conflicts: Enable schema evolution or ensure consistent schemas
- Memory issues: Reduce batch sizes or increase cluster memory
- Timeout errors: Increase timeout settings or optimize file processing
Enable debug logging for detailed execution information:

```bash
python -m data_loader.main run --config config.yaml --log-level DEBUG
```

To run the loader as a Databricks job, use the `data_loader.job_runner` module. Configure the widgets `config`, `log_level`, `optimize`, and `vacuum`, or set environment variables such as `DATALOADER_CONFIG_FILE`. See docs/databricks_job.md for details.
See ROADMAP.md for detailed development plans, upcoming features, and long-term vision for the Databricks Data Loader framework.
To contribute:

- Fork the repository
- Create a feature branch
- Make your changes with tests
- Run the test suite
- Submit a pull request
For major features, please refer to the roadmap to ensure alignment with project direction.
This project is licensed under the MIT License - see the LICENSE file for details.