This document outlines the detailed implementation plan for the parallel processing framework, which is a key component of the Codegen upgrade plan. The framework will enable Codegen to leverage multi-core systems for improved performance when processing large codebases.
- Improve parsing and analysis performance by at least 3x on multi-core systems
- Maintain memory efficiency during parallel operations
- Provide configurable parallelism to adapt to different hardware environments
- Ensure thread safety for all critical operations
- Implement intelligent work distribution for optimal resource utilization
+------------------+ +------------------+ +------------------+
| TaskDefinition | | TaskScheduler | | TaskExecutor |
|------------------| |------------------| |------------------|
| - task_id | | - queue | | - thread_pool |
| - dependencies | | - schedule() | | - execute() |
| - priority | | - cancel() | | - wait() |
| - callable | | - wait_all() | | - shutdown() |
| - state | | - get_status() | | |
+------------------+ +------------------+ +------------------+
| | |
| | |
v v v
+------------------+ +------------------+ +------------------+
| TaskResult | | TaskQueue | | ThreadPool |
|------------------| |------------------| |------------------|
| - task_id | | - priority_queue | | - workers |
| - status | | - push() | | - queue |
| - result | | - pop() | | - submit() |
| - error | | - peek() | | - resize() |
+------------------+ +------------------+ +------------------+
+------------------+ +------------------+ +------------------+
| WorkItem | | WorkDistributor | | WorkerStats |
|------------------| |------------------| |------------------|
| - item_id | | - strategy | | - worker_id |
| - dependencies | | - distribute() | | - completed |
| - estimated_cost | | - balance() | | - pending |
| - data | | - optimize() | | - avg_time |
+------------------+ +------------------+ +------------------+
| | |
| | |
v v v
+------------------+ +------------------+ +------------------+
| BatchProcessor | | DistStrategy | | LoadBalancer |
|------------------| |------------------| |------------------|
| - batch_size | | - round_robin | | - threshold |
| - process_batch()| | - work_stealing | | - rebalance() |
| - split_batch() | | - cost_based | | - detect_skew() |
+------------------+ +------------------+ +------------------+
+------------------+ +------------------+ +------------------+
| SharedState | | Barrier | | ResultCollector |
|------------------| |------------------| |------------------|
| - data | | - count | | - results |
| - lock | | - wait() | | - collect() |
| - read() | | - release() | | - aggregate() |
| - write() | | - reset() | | - summarize() |
+------------------+ +------------------+ +------------------+
| | |
| | |
v v v
+------------------+ +------------------+ +------------------+
| AtomicCounter | | PhaseManager | | ProgressTracker |
|------------------| |------------------| |------------------|
| - value | | - phases | | - total |
| - increment() | | - current_phase | | - completed |
| - decrement() | | - advance() | | - update() |
+------------------+ +------------------+ +------------------+
class TaskDefinition:
"""Defines a unit of work that can be executed in parallel."""
def __init__(self,
callable_fn,
task_id=None,
dependencies=None,
priority=0):
"""
Initialize a task definition.
Args:
callable_fn: The function to execute
task_id: Unique identifier for the task
dependencies: List of task IDs that must complete before this task
priority: Execution priority (higher values = higher priority)
"""
self.task_id = task_id or str(uuid.uuid4())
self.callable = callable_fn
self.dependencies = dependencies or []
self.priority = priority
self.state = TaskState.PENDING
def can_execute(self, completed_tasks):
"""Check if all dependencies are satisfied."""
return all(dep in completed_tasks for dep in self.dependencies)class TaskScheduler:
"""Schedules and manages task execution."""
def __init__(self, num_workers=None, max_memory_percent=75):
"""
Initialize the task scheduler.
Args:
num_workers: Number of worker threads (default: CPU count)
max_memory_percent: Maximum memory usage percentage
"""
self.num_workers = num_workers or os.cpu_count()
self.max_memory_percent = max_memory_percent
self.queue = TaskQueue()
self.executor = TaskExecutor(self.num_workers)
self.completed_tasks = set()
self.results = {}
self.lock = threading.RLock()
def schedule(self, task):
"""Schedule a task for execution."""
with self.lock:
self.queue.push(task)
self._process_queue()
def schedule_batch(self, tasks):
"""Schedule multiple tasks at once."""
with self.lock:
for task in tasks:
self.queue.push(task)
self._process_queue()
def _process_queue(self):
"""Process tasks in the queue that are ready to execute."""
with self.lock:
executable_tasks = []
for _ in range(self.queue.size()):
task = self.queue.peek()
if task.can_execute(self.completed_tasks):
self.queue.pop()
executable_tasks.append(task)
else:
# Skip this task and try the next one
self.queue.rotate()
for task in executable_tasks:
self._submit_task(task)
def _submit_task(self, task):
"""Submit a task to the executor."""
future = self.executor.submit(self._task_wrapper, task)
future.add_done_callback(self._task_completed)
def _task_wrapper(self, task):
"""Wrapper around the task callable to handle exceptions."""
try:
result = task.callable()
return TaskResult(task.task_id, TaskState.COMPLETED, result)
except Exception as e:
return TaskResult(task.task_id, TaskState.FAILED, error=e)
def _task_completed(self, future):
"""Callback when a task completes."""
try:
result = future.result()
with self.lock:
self.completed_tasks.add(result.task_id)
self.results[result.task_id] = result
self._process_queue() # Process more tasks
except Exception as e:
logger.error(f"Error in task completion callback: {e}")
def wait_all(self):
"""Wait for all scheduled tasks to complete."""
self.executor.wait()
def shutdown(self):
"""Shutdown the scheduler and executor."""
self.executor.shutdown()class ThreadPool:
"""Custom thread pool for executing tasks."""
def __init__(self, num_workers):
"""
Initialize the thread pool.
Args:
num_workers: Number of worker threads
"""
self.num_workers = num_workers
self.queue = queue.Queue()
self.workers = []
self.shutdown_flag = threading.Event()
self._start_workers()
def _start_workers(self):
"""Start worker threads."""
for i in range(self.num_workers):
worker = threading.Thread(
target=self._worker_loop,
name=f"Worker-{i}",
daemon=True
)
worker.start()
self.workers.append(worker)
def _worker_loop(self):
"""Main worker thread loop."""
while not self.shutdown_flag.is_set():
try:
task = self.queue.get(timeout=0.1)
if task is None: # Sentinel value
break
task.run()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error in worker thread: {e}")
finally:
self.queue.task_done()
def submit(self, task):
"""Submit a task to the thread pool."""
if self.shutdown_flag.is_set():
raise RuntimeError("ThreadPool is shutting down")
self.queue.put(task)
def resize(self, new_size):
"""Resize the thread pool."""
if new_size == self.num_workers:
return
if new_size < self.num_workers:
# Reduce the number of workers
for _ in range(self.num_workers - new_size):
self.queue.put(None) # Add sentinel to stop worker
# Wait for workers to exit
for worker in self.workers[new_size:]:
worker.join(timeout=1.0)
self.workers = self.workers[:new_size]
else:
# Increase the number of workers
for i in range(self.num_workers, new_size):
worker = threading.Thread(
target=self._worker_loop,
name=f"Worker-{i}",
daemon=True
)
worker.start()
self.workers.append(worker)
self.num_workers = new_size
def shutdown(self, wait=True):
"""Shutdown the thread pool."""
self.shutdown_flag.set()
# Add sentinel values to stop all workers
for _ in range(len(self.workers)):
self.queue.put(None)
if wait:
for worker in self.workers:
worker.join(timeout=1.0)class ParallelFileProcessor:
"""Process files in parallel using the task system."""
def __init__(self, codebase, num_workers=None):
"""
Initialize the parallel file processor.
Args:
codebase: The codebase to process
num_workers: Number of worker threads
"""
self.codebase = codebase
self.scheduler = TaskScheduler(num_workers)
def process_files(self, file_paths, processor_fn):
"""
Process multiple files in parallel.
Args:
file_paths: List of file paths to process
processor_fn: Function to apply to each file
Returns:
Dictionary mapping file paths to processing results
"""
# Create tasks for each file
for file_path in file_paths:
task = TaskDefinition(
lambda path=file_path: processor_fn(path),
task_id=file_path
)
self.scheduler.schedule(task)
# Wait for all tasks to complete
self.scheduler.wait_all()
# Collect and return results
results = {}
for file_path in file_paths:
if file_path in self.scheduler.results:
result = self.scheduler.results[file_path]
if result.status == TaskState.COMPLETED:
results[file_path] = result.result
else:
logger.error(f"Failed to process {file_path}: {result.error}")
return results
def shutdown(self):
"""Shutdown the processor."""
self.scheduler.shutdown()def discover_files_parallel(codebase, root_dir, file_patterns, exclusion_patterns=None):
"""
Discover files in parallel.
Args:
codebase: The codebase instance
root_dir: Root directory to start discovery
file_patterns: List of file patterns to include
exclusion_patterns: List of patterns to exclude
Returns:
List of discovered file paths
"""
exclusion_patterns = exclusion_patterns or []
# Split directory traversal into top-level directories for parallelism
top_dirs = [os.path.join(root_dir, d) for d in os.listdir(root_dir)
if os.path.isdir(os.path.join(root_dir, d))]
# Create processor
processor = ParallelFileProcessor(codebase)
# Define discovery function for each top directory
def discover_in_dir(directory):
discovered = []
for root, _, files in os.walk(directory):
# Check if this directory should be excluded
if any(re.match(pattern, root) for pattern in exclusion_patterns):
continue
for file in files:
file_path = os.path.join(root, file)
# Check if file matches include patterns and not exclude patterns
if (any(re.match(pattern, file) for pattern in file_patterns) and
not any(re.match(pattern, file_path) for pattern in exclusion_patterns)):
discovered.append(file_path)
return discovered
# Process directories in parallel
results = processor.process_files(top_dirs, discover_in_dir)
processor.shutdown()
# Combine results
all_files = []
for files in results.values():
all_files.extend(files)
return all_filesdef parse_files_parallel(codebase, file_paths):
"""
Parse multiple files in parallel.
Args:
codebase: The codebase instance
file_paths: List of file paths to parse
Returns:
Dictionary mapping file paths to parsed ASTs
"""
# Create processor
processor = ParallelFileProcessor(codebase)
# Define parsing function
def parse_file(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_ext = os.path.splitext(file_path)[1]
parser = get_parser_for_extension(file_ext)
if parser is None:
return None
return parser.parse(content, file_path)
# Parse files in parallel
results = processor.process_files(file_paths, parse_file)
processor.shutdown()
return resultsclass ParallelProcessingConfig:
"""Configuration for parallel processing."""
def __init__(self):
"""Initialize with default values."""
# Number of worker threads (None = use CPU count)
self.num_workers = None
# Maximum memory usage percentage
self.max_memory_percent = 75
# Batch size for processing
self.batch_size = 100
# Priority boost for critical path tasks
self.critical_path_priority = 10
# Enable work stealing between threads
self.enable_work_stealing = True
# Minimum task size for parallelization (smaller tasks run sequentially)
self.min_task_size = 1
# Maximum number of tasks in flight
self.max_tasks_in_flight = 1000
@classmethod
def from_dict(cls, config_dict):
"""Create configuration from dictionary."""
config = cls()
for key, value in config_dict.items():
if hasattr(config, key):
setattr(config, key, value)
return config
def to_dict(self):
"""Convert configuration to dictionary."""
return {
key: getattr(self, key)
for key in dir(self)
if not key.startswith('_') and not callable(getattr(self, key))
}class ParallelAutoTuner:
"""Auto-tunes parallel processing parameters."""
def __init__(self, codebase):
"""
Initialize the auto-tuner.
Args:
codebase: The codebase instance
"""
self.codebase = codebase
self.config = ParallelProcessingConfig()
self.metrics = {}
def tune(self):
"""Auto-tune parameters based on system and codebase."""
# Adjust workers based on CPU count and available memory
cpu_count = os.cpu_count()
available_memory = psutil.virtual_memory().available
total_memory = psutil.virtual_memory().total
# Start with CPU count
self.config.num_workers = cpu_count
# Adjust based on memory constraints
memory_per_worker = 200 * 1024 * 1024 # 200MB per worker estimate
max_workers_by_memory = available_memory // memory_per_worker
# Take the minimum
self.config.num_workers = min(self.config.num_workers, max_workers_by_memory)
# Ensure at least one worker
self.config.num_workers = max(1, self.config.num_workers)
# Adjust batch size based on codebase size
codebase_size = len(self.codebase.files)
if codebase_size < 100:
self.config.batch_size = 10
elif codebase_size < 1000:
self.config.batch_size = 50
else:
self.config.batch_size = 100
# Adjust max tasks in flight
self.config.max_tasks_in_flight = self.config.num_workers * 10
return self.config
def benchmark(self, file_sample_size=100):
"""Run benchmarks to optimize configuration."""
# Sample files for benchmarking
all_files = list(self.codebase.files.keys())
sample_size = min(file_sample_size, len(all_files))
sample_files = random.sample(all_files, sample_size)
# Test different worker counts
worker_counts = [1, 2, 4, 8, 16, os.cpu_count()]
worker_counts = sorted(set(worker_counts))
results = {}
for workers in worker_counts:
if workers > os.cpu_count():
continue
# Configure processor
processor = ParallelFileProcessor(self.codebase, workers)
# Benchmark parsing
start_time = time.time()
processor.process_files(sample_files, lambda f: self.codebase.parse_file(f))
elapsed = time.time() - start_time
results[workers] = elapsed
processor.shutdown()
# Find optimal worker count
optimal_workers = min(results.items(), key=lambda x: x[1])[0]
self.config.num_workers = optimal_workers
return resultsclass Codebase:
"""Enhanced Codebase class with parallel processing capabilities."""
def __init__(self, root_dir, parallel=True, parallel_config=None):
"""
Initialize the codebase.
Args:
root_dir: Root directory of the codebase
parallel: Whether to enable parallel processing
parallel_config: Configuration for parallel processing
"""
self.root_dir = root_dir
self.parallel = parallel
# Initialize parallel processing
if self.parallel:
self.parallel_config = parallel_config or ParallelProcessingConfig()
self.parallel_processor = ParallelFileProcessor(self, self.parallel_config.num_workers)
else:
self.parallel_processor = None
# Initialize other components
self.files = {}
self.symbols = {}
# ... other initialization ...
# Discover and parse files
self._discover_files()
self._parse_files()
def _discover_files(self):
"""Discover files in the codebase."""
if self.parallel:
file_patterns = ["*.py", "*.ts", "*.js", "*.tsx", "*.jsx"]
exclusion_patterns = ["**/node_modules/**", "**/.git/**", "**/venv/**"]
self.file_paths = discover_files_parallel(
self, self.root_dir, file_patterns, exclusion_patterns
)
else:
# Existing sequential implementation
pass
def _parse_files(self):
"""Parse all discovered files."""
if self.parallel:
parsed_files = parse_files_parallel(self, self.file_paths)
# Process parsed results
for file_path, ast in parsed_files.items():
if ast is not None:
self.files[file_path] = File(file_path, ast, self)
else:
# Existing sequential implementation
pass
def auto_tune_parallel(self):
"""Auto-tune parallel processing parameters."""
if not self.parallel:
return
tuner = ParallelAutoTuner(self)
self.parallel_config = tuner.tune()
# Update processor with new configuration
if self.parallel_processor:
self.parallel_processor.shutdown()
self.parallel_processor = ParallelFileProcessor(
self, self.parallel_config.num_workers
)
return self.parallel_config-
Unit Tests:
- Test each component of the parallel framework in isolation
- Verify correct behavior with different configurations
- Test edge cases and error handling
-
Integration Tests:
- Test integration with file discovery and parsing
- Verify correct behavior with different codebase sizes
- Test with different file types and structures
-
Performance Tests:
- Benchmark against sequential implementation
- Test scaling with different numbers of cores
- Measure memory usage during parallel operations
-
Stress Tests:
- Test with very large codebases
- Test with limited system resources
- Test recovery from failures
-
Phase 1: Implementation
- Implement core task system
- Implement thread pool and work distribution
- Implement integration with file discovery and parsing
-
Phase 2: Testing
- Run unit and integration tests
- Perform performance benchmarks
- Address any issues discovered
-
Phase 3: Documentation
- Document API and configuration options
- Create usage examples
- Update architecture documentation
-
Phase 4: Rollout
- Release as opt-in feature
- Gather feedback from early adopters
- Make adjustments based on real-world usage
The parallel processing framework will significantly improve Codegen's performance on multi-core systems, particularly for large codebases. By implementing a flexible, configurable system with intelligent work distribution, we can achieve optimal resource utilization while maintaining the robustness and reliability expected from Codegen.
This implementation plan provides a detailed roadmap for developing the parallel processing framework, integrating it with the existing codebase, and ensuring it meets the performance and reliability goals of the Codegen upgrade plan.