#!/usr/local/cpanel/3rdparty/bin/perl

#                                      Copyright 2025 WebPros International, LLC
#                                                           All rights reserved.
# copyright@cpanel.net                                         http://cpanel.net
# This code is subject to the cPanel license. Unauthorized copying is prohibited.

package bin::comet_backup_runner;

=encoding utf-8

=head1 NAME

bin::comet_backup_runner - Backup job runner for Comet Backup system

=head1 SYNOPSIS

    # Run as script
    /usr/local/cpanel/bin/comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012
    /usr/local/cpanel/bin/comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012 --bypass-schedule
    /usr/local/cpanel/bin/comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012 --dryrun
    /usr/local/cpanel/bin/comet_backup_runner --runany    # every eligible scheduled job, one after another
    
    # Use as module
    use bin::comet_backup_runner ();
    my $runner = bin::comet_backup_runner->new();
    $runner->run_backup_job($job_uuid);

=head1 DESCRIPTION

This modulino handles the execution of Comet Backup jobs. It performs the heavy lifting
of backup operations by coordinating with the Comet Backup Manager APIs, monitoring system
resources, managing parallelism, and logging all activities to a database for quick
retrieval and monitoring.

Key features:

=over 4

=item * Executes a single backup job by UUID, or (with C<--runany>) every eligible scheduled job in turn

=item * Generates a unique run UUID for each backup job execution instance

=item * Validates schedule timing (current day matches schedule, not run within 24 hours)

=item * Manages system load monitoring and parallelism (3 users concurrently by default; see C<--max_parallel>)

=item * Implements pool-based backup management (3 concurrent backup jobs per user)

=item * Polls backup jobs for completion status with a 60-minute timeout per job

=item * Aborts a run cleanly when free memory falls below C<--min_free_memory_mb>

=item * Logs start/end times, progress, and status for jobs, users, and protected items to SQLite

=item * Handles job interruption and cancellation

=item * Integrates with the Comet Backup Manager APIs

=item * Provides detailed progress tracking and error handling

=back

Backup Job Workflow:

=over 4

=item 1.

Load the job configuration and validate schedule timing (skipped with C<--bypass-schedule>).

=item 2.

Determine which users to back up.

=item 3.

Process users in parallel (up to 3 concurrent users by default).

=item 4.

For each user, maintain a pool of up to 3 concurrent backup jobs.

=item 5.

Poll each active backup job every 10 seconds for status and progress.

=item 6.

Update the database with progress information.

=item 7.

Handle timeouts (60 minutes max per backup job).

=item 8.

Record completion status and snapshot IDs in ProtectedItemMap.

=back

=cut

use cPstrict;

use Cpanel::AcctUtils::Suspended            ();
use Cpanel::Config::Users                   ();
use Cpanel::DBI::SQLite                     ();
use Cpanel::Exception                       ();
use Cpanel::FileUtils::TouchFile            ();
use Cpanel::FileUtils::Write                ();
use Cpanel::JSON                            ();
use Cpanel::LoadFile                        ();
use Cpanel::Mkdir                           ();
use Cpanel::PID                             ();
use Cpanel::PwCache                         ();
use Cpanel::SafeRun::Object                 ();
use Cpanel::Validate::UUID                  ();
use Data::Dumper                            ();
use DBD::SQLite                             ();
use Errno                                   ();
use Fcntl                                   ();
use Getopt::Long                            ();
use IO::Handle                              ();
use POSIX                                   qw( SIGTERM SIGINT );
use Time::HiRes                             ();
use Time::Piece                             ();
use Time::Seconds                           ();
use Whostmgr::CometBackup::BackupJobs       ();
use Whostmgr::CometBackup::CometBackupAPI   ();
use Whostmgr::CometBackup::Constants        ();
use Whostmgr::CometBackup::Logger           ();
use Whostmgr::CometBackup::MemoryGuard      ();
use Whostmgr::CometBackup::ProtectedItemMap ();

# Exit with modulino pattern: when this file is executed directly there is no
# caller, so main() runs and its return value becomes the process exit status.
# When the file is loaded with use/require, caller() is true and this line is a
# no-op, leaving the package usable as a library.
exit __PACKAGE__->main(@ARGV) unless caller;

# Fallback minimum free system memory (MB) for cases where the kernel's
# vm.min_free_kbytes can't be read. The actual default is computed at
# runner startup: 2 * min_free_kbytes_mb + DEFAULT_MIN_FREE_BUFFER_MB,
# which scales the threshold to the box (small on a 1 GB VPS, large on a
# 64 GB host). Override per run with --min_free_memory_mb=N.
# `use constant` is resolved at compile time, so these names are available to
# main() even though they appear textually after the exit line above.
use constant DEFAULT_MIN_FREE_MEMORY_MB => 200;
use constant DEFAULT_MIN_FREE_BUFFER_MB => 50;
use constant DEFAULT_MIN_FREE_FLOOR_MB  => 100;    # never auto-compute below this

# Global state
# Set to 1 by the TERM/INT handlers installed in main() and by
# cancel_backup_job(); run_backup_job() reads it to classify a run as cancelled
# (other code later in the file may read it too).
# NOTE: in script mode the `exit` above runs before this runtime assignment,
# so the variable starts out undef there. That is still false, so it behaves
# the same as 0.
our $INTERRUPTED = 0;

=head1 METHODS

=head2 main

Main entry point when used as a script.

    exit __PACKAGE__->main(@ARGV) unless caller;

=cut

sub main ( $class, @args ) {

    # Plugin policy: nothing in this process should be world-readable.
    umask 0027;

    my %options;
    Getopt::Long::GetOptionsFromArray(
        \@args,
        \%options,
        'help|h',
        'job_uuid=s',
        'run_uuid=s',
        'runany',
        'bypass-schedule',
        'debug',
        'dryrun',
        'max_parallel=i',
        'max_load=f',
        'min_free_memory_mb=i',
    ) or return $class->usage(1);

    return $class->usage(0) if $options{'help'};

    # Either job_uuid or runany is required
    unless ( $options{'job_uuid'} || $options{'runany'} ) {
        print STDERR "Error: Either --job_uuid or --runany is required\n";
        return $class->usage(1);
    }

    # job_uuid and runany are mutually exclusive
    if ( $options{'job_uuid'} && $options{'runany'} ) {
        print STDERR "Error: Cannot use both --job_uuid and --runany together\n";
        return $class->usage(1);
    }

    # Validate UUID format if provided
    if ( $options{'job_uuid'} ) {
        unless ( Cpanel::Validate::UUID::is_valid_uuid( $options{'job_uuid'} ) ) {
            print STDERR "Error: Invalid UUID format: $options{'job_uuid'}\n";
            return 1;
        }
    }

    # Validate run_uuid if provided
    if ( $options{'run_uuid'} && !Cpanel::Validate::UUID::is_valid_uuid( $options{'run_uuid'} ) ) {
        print STDERR "Error: Invalid run_uuid format: $options{'run_uuid'}\n";
        return 1;
    }

    # Getopt::Long's =i / =f specs accept a leading sign. A negative parallelism
    # limit, load ceiling or memory threshold is never meaningful, so reject it
    # before it reaches the shared constants or the runner.
    for my $tunable (qw( max_parallel max_load min_free_memory_mb )) {
        next unless defined $options{$tunable} && $options{$tunable} < 0;
        print STDERR "Error: --$tunable cannot be negative: $options{$tunable}\n";
        return 1;
    }

    # Override defaults if specified
    $Whostmgr::CometBackup::Constants::MAX_PARALLEL_USERS = $options{'max_parallel'} if defined $options{'max_parallel'};
    $Whostmgr::CometBackup::Constants::MAX_LOAD_AVERAGE   = $options{'max_load'}     if defined $options{'max_load'};

    my $runner = $class->new(
        debug           => $options{'debug'},
        bypass_schedule => $options{'bypass-schedule'},
        dryrun          => $options{'dryrun'},
        run_uuid        => $options{'run_uuid'},
        defined $options{'min_free_memory_mb'} ? ( min_free_memory_mb => $options{'min_free_memory_mb'} ) : (),
    );

    print STDERR "DEBUG: Runner created successfully\n" if $options{'debug'};

    # Only one backup runner is allowed at a time. Concurrent runners are how
    # CPANEL-53247 manifests, so we deliberately have no escape hatch here —
    # callers must cancel any in-flight run before starting a new one.
    print STDERR "DEBUG: Checking for existing runner process\n" if $options{'debug'};
    $runner->_check_existing_runner();
    print STDERR "DEBUG: No existing runner found\n" if $options{'debug'};

    # Create PID file for this runner process
    print STDERR "DEBUG: Creating runner PID file\n" if $options{'debug'};
    $runner->_create_pid_file();
    print STDERR "DEBUG: PID file created\n" if $options{'debug'};

    # Install signal handlers
    $SIG{TERM} = sub { $INTERRUPTED = 1; $runner->_handle_interruption('TERM'); };
    $SIG{INT}  = sub { $INTERRUPTED = 1; $runner->_handle_interruption('INT'); };

    # Run all eligible jobs or a specific job. The work runs inside an eval so
    # an exception cannot skip the PID-file cleanup below. Success is taken from
    # the eval's return value, not from $@.
    my $exit_code;
    my $finished = eval {
        if ( $options{'runany'} ) {
            print STDERR "DEBUG: Running all eligible backup jobs\n" if $options{'debug'};
            $exit_code = $runner->run_all_eligible_jobs();
        }
        else {
            print STDERR "DEBUG: About to run backup job: $options{'job_uuid'}\n" if $options{'debug'};
            $exit_code = $runner->run_backup_job( $options{'job_uuid'} );
        }
        1;
    };
    my $run_error = $finished ? undef : ( $@ || "comet_backup_runner: backup run died with an unknown error\n" );

    # Cleanup PID file before exit. This also runs when the run died, so a crash
    # does not leave this process's PID file behind.
    $runner->_cleanup();

    # Re-raise so the process still exits non-zero with the original message.
    die $run_error if defined $run_error;

    return $exit_code;
}

=head2 usage

Displays usage information.

    return $class->usage($exit_code);

=cut

sub usage ( $class, $exit_code = 0 ) {

    # The help text is a non-interpolating, indentation-stripped heredoc. It is
    # written to the currently selected output handle whatever $exit_code is.
    my $help_text = <<~'EOF';
        Usage: comet_backup_runner [OPTIONS]
        
        Execute Comet Backup jobs with comprehensive logging and monitoring.
        
        Options:
          --job_uuid=UUID    UUID of the backup job to execute (mutually exclusive with --runany)
          --runany           Run all eligible scheduled backup jobs sequentially (mutually exclusive with --job_uuid)
          --run_uuid=UUID    UUID for this specific run instance (optional, auto-generated if not provided)
          --bypass-schedule  Skip the schedule timing/cooldown check. Used by the cron
                             helper which has already made its own scheduling decision.
                             The "one runner at a time" rule still applies; cancel any
                             in-flight run before starting a new one.
          --debug            Enable debug logging
          --dryrun           Simulate execution without making API calls to Comet Backup server
          --max_parallel=N         Maximum parallel user backups (default: 3)
          --max_load=N.N           Maximum system load average (default: 5.0)
          --min_free_memory_mb=N   Minimum MemAvailable (MB) required to start
                                   each protected item. The run aborts cleanly
                                   if free memory drops below this.

                                   Default is auto-computed from the kernel's
                                   vm.min_free_kbytes reclaim watermark:
                                   (2 * min_free_kbytes_mb) + 50 MB buffer,
                                   floored at 100 MB. This scales the threshold
                                   to the host:
                                     1 GB VPS  ->  100 MB
                                     8 GB box  ->  ~182 MB
                                     64 GB box ->  ~430 MB
                                   Falls back to 200 MB if the kernel value
                                   can't be read. Override for testing or
                                   special workloads.
          --help, -h               Show this help message

        Memory safety:
          The runner samples system free memory (/proc/meminfo:MemAvailable) before
          starting each protected item and aborts the run if less than
          --min_free_memory_mb is available, to avoid pushing the host into an OOM
          condition. The threshold is auto-computed by default (see above) so it
          stays appropriate across hardware sizes. Total memory is captured once
          at startup and included in logs.
        
        Examples:
          # Run a specific backup job
          comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012

          # Run all eligible scheduled jobs
          comet_backup_runner --runany

          # Run all eligible jobs with debug output
          comet_backup_runner --runany --debug

          # Run a specific job and skip the schedule cooldown check
          comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012 --bypass-schedule

          # Dry run mode (no actual API calls)
          comet_backup_runner --runany --dryrun
          
          # With custom run UUID
          comet_backup_runner --job_uuid=12345678-1234-1234-1234-123456789012 --run_uuid=abcdef01-2345-6789-abcd-ef0123456789
        
        
        EOF

    print $help_text;

    # Hand the requested status straight back so callers can `return $class->usage(N)`.
    return $exit_code;
}

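=head2 new

Creates a new backup runner instance. Construction sets up the logger, records
total system memory (from MemTotal) for log context, and initializes the
run-log database.

    my $runner = bin::comet_backup_runner->new(
        debug              => 1,        # Enable debug logging
        bypass_schedule    => 0,        # Skip the schedule/cooldown timing check
        dryrun             => 0,        # Simulate execution without API calls
        run_uuid           => $uuid,    # Optional UUID for this run instance
        min_free_memory_mb => 300,      # Optional; auto-computed from the host when omitted
    );

=cut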

sub new ( $class, %args ) {

    # Resolve the free-memory gate first: a caller-supplied threshold wins,
    # otherwise derive one from the host.
    my $threshold_given = defined $args{min_free_memory_mb};
    my $min_free_mb     = $threshold_given ? $args{min_free_memory_mb} : _compute_default_min_free_memory_mb();

    my $self = bless {

        # Caller-controlled flags, normalized to 0/undef when absent or false.
        debug           => $args{debug}           || 0,
        bypass_schedule => $args{bypass_schedule} || 0,
        dryrun          => $args{dryrun}          || 0,
        run_uuid        => $args{run_uuid}        || undef,

        # Memory gate; the source records whether the value was passed in or computed.
        min_free_memory_mb     => $min_free_mb,
        min_free_memory_source => ( $threshold_given ? 'explicit' : 'auto' ),

        # Runtime state, filled in below or during a run.
        logger               => undef,
        run_id               => undef,
        active_processes     => {},
        api_cache            => {},       # Cache for expensive API calls (source_ids, connection_id)
        blocked_destinations => {},       # dest_id => reason; populated when a destination is unusable for this run
        blocklist_reader     => undef,    # parent-only read end of the cross-fork blocklist pipe (see _open_blocklist_pipe)
        blocklist_writer     => undef,    # writer inherited by forked children for blocklist propagation
        total_memory_mb      => undef,    # captured once at construction from /proc/meminfo:MemTotal
        run_fatal_reason     => undef,    # set when a non-recoverable condition is detected; dispatch loop drops the queue and exits
    }, $class;

    my $debug = $self->{debug};
    print STDERR "DEBUG: Creating new runner instance\n" if $debug;

    my $log_level = $debug ? 'debug' : 'info';
    $self->{logger} = Whostmgr::CometBackup::Logger->new( log_level => $log_level );
    print STDERR "DEBUG: Logger initialized\n" if $debug;

    # MemTotal is sampled once here for run-context logging; live MemAvailable
    # is re-read per item by _check_free_memory.
    my $meminfo_kb = $self->_read_meminfo_kb();
    $self->{total_memory_mb} = int( $meminfo_kb->{MemTotal} / 1024 )
      if $meminfo_kb && $meminfo_kb->{MemTotal};

    $self->_ensure_run_log_database();
    print STDERR "DEBUG: Database initialized\n" if $debug;

    return $self;
}

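=head2 run_backup_job

Executes a backup job by UUID.

    my $exit_code = $runner->run_backup_job($job_uuid);

Returns 0 when the job runs to completion. This includes runs recorded as
C<partial>, runs recorded as C<failed> because every protected item failed, and
runs cancelled without an error being raised. It also returns 0, without doing
any work, when Comet Backup is disabled via its touchfile.

Returns 1 when the job dies. It also returns 1 when a run-fatal condition
(currently insufficient free memory) aborted the run.

=cut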

sub run_backup_job ( $self, $job_uuid ) {
    my $logger = $self->{logger};

    print STDERR "DEBUG: Starting run_backup_job for UUID: $job_uuid\n" if $self->{debug};

    # Store job UUID for cancel file operations
    $self->{job_uuid} = $job_uuid;

    # Reset per-run state. run_all_eligible_jobs() sends several jobs through
    # this one runner object. Without the reset there are two problems:
    #   - A job that dies before _create_run_log() would close out the
    #     *previous* job's run log as 'failed'.
    #   - A run_fatal_reason left by an earlier job would force this job to
    #     'failed' and a non-zero return.
    # NOTE(review): run_uuid is deliberately left alone because it may come from
    # --run_uuid. Confirm _create_run_log() issues a fresh one per job under --runany.
    $self->{run_id}           = undef;
    $self->{run_fatal_reason} = undef;

    # Clear any stale cancel file from previous runs.
    # Without this, a newly started run can be cancelled immediately.
    my $stale_cancel_file = Whostmgr::CometBackup::Constants::get_cancel_file_path($job_uuid);
    if ( -f $stale_cancel_file ) {
        unlink $stale_cancel_file;
        $logger->warn(
            "Removed stale cancel file before starting backup job",
            {
                job_uuid    => $job_uuid,
                cancel_file => $stale_cancel_file,
            }
        );
    }

    print STDERR "DEBUG: About to call logger->info\n" if $self->{debug};

    $logger->info(
        "Starting backup job runner",
        {
            job_uuid               => $job_uuid,
            pid                    => $$,
            total_memory_mb        => $self->{total_memory_mb},
            min_free_memory_mb     => $self->{min_free_memory_mb},
            min_free_memory_source => $self->{min_free_memory_source},
        }
    );

    print STDERR "DEBUG: Logger->info completed\n" if $self->{debug};

    # Check if Comet Backup is disabled via touchfile
    # If the touchfile exists, backups should not run (but restorations are still allowed)
    if ( -f $Whostmgr::CometBackup::Constants::DISABLED_TOUCHFILE ) {
        $logger->info(
            "Comet Backup is currently disabled - skipping backup job",
            {
                job_uuid  => $job_uuid,
                touchfile => $Whostmgr::CometBackup::Constants::DISABLED_TOUCHFILE,
            }
        );
        print STDERR "Comet Backup is disabled - backup operations are not allowed\n";
        return 0;    # Return success since this is an expected condition, not an error
    }

    # The eval ends with a true value, so failure is detected from its return
    # value rather than from $@ alone. $@ can be clobbered, or can be a false
    # value, when the block dies.
    my $completed = eval {
        print STDERR "DEBUG: Inside eval block\n" if $self->{debug};

        # Load job configuration
        print STDERR "DEBUG: About to load job config\n" if $self->{debug};
        my $job_config = $self->_load_job_config($job_uuid);
        print STDERR "DEBUG: Job config loaded successfully\n" if $self->{debug};

        # Create run log entry
        print STDERR "DEBUG: About to create run log\n" if $self->{debug};
        my $run_id = $self->_create_run_log( $job_uuid, $job_config );
        $self->{run_id} = $run_id;
        print STDERR "DEBUG: Run log created with ID: $run_id, run_uuid: $self->{run_uuid}\n" if $self->{debug};

        # Mark any incomplete runs left by a previous unclean shutdown.
        eval {
            require Whostmgr::CometBackup::BackupRunLogs;
            my $orphaned_count = Whostmgr::CometBackup::BackupRunLogs->new()->mark_orphaned_runs(
                current_run_uuid => $self->{run_uuid},
            );
            $logger->info( "Marked stale backup runs from previous unclean shutdown", { count => $orphaned_count } )
              if $orphaned_count;
        };
        $logger->warn( "Failed to mark orphaned runs from previous shutdown", { error => $@ } ) if $@;

        # Execute the backup job
        print STDERR "DEBUG: About to execute backup job\n" if $self->{debug};
        $self->_execute_backup_job($job_config);

        # Check if the job was cancelled during execution (even if no error was thrown)
        my $cancel_file   = Whostmgr::CometBackup::Constants::get_cancel_file_path($job_uuid);
        my $was_cancelled = $INTERRUPTED || -f $cancel_file;

        if ($was_cancelled) {

            # Mark run as cancelled
            $self->_complete_run_log( $run_id, 'cancelled', 'Job was cancelled by user request' );

            # Append summary to text log file
            $self->_append_run_summary_to_file();

            $logger->info(
                "Backup job cancelled",
                {
                    job_uuid => $job_uuid,
                    run_uuid => $self->{run_uuid},
                    run_id   => $run_id,
                }
            );
        }
        else {
            # A run-fatal condition (currently only insufficient free memory;
            # destination-quota exhaustion is per-destination and uses the
            # blocklist rather than aborting the run) always marks the run
            # failed so the reason is preserved in the DB regardless of which
            # per-item statuses were recorded before the gate fired.
            my $run_status =
              $self->{run_fatal_reason}
              ? 'failed'
              : $self->_determine_run_status($run_id);

            # Mark run with appropriate status; pass run_fatal_reason as the
            # error_message so backup_run_logs.db captures the machine-readable cause.
            $self->_complete_run_log( $run_id, $run_status, $self->{run_fatal_reason} );

            # Append summary to text log file
            $self->_append_run_summary_to_file();

            if ( $run_status eq 'failed' ) {
                $logger->error(
                    "Backup job failed - all protected items failed",
                    {
                        job_uuid         => $job_uuid,
                        run_uuid         => $self->{run_uuid},
                        run_id           => $run_id,
                        status           => 'failed',
                        run_fatal_reason => $self->{run_fatal_reason},
                    }
                );
                if ( $self->{run_fatal_reason} ) {
                    print STDERR "ERROR: Backup job failed (reason: $self->{run_fatal_reason}). See logs for details.\n";
                }
                else {
                    print STDERR "ERROR: Backup job failed - all protected items could not be backed up. See logs for details.\n";
                }
            }
            elsif ( $run_status eq 'partial' ) {
                $logger->warn(
                    "Backup job completed with some protected items failing",
                    {
                        job_uuid => $job_uuid,
                        run_uuid => $self->{run_uuid},
                        run_id   => $run_id,
                        status   => 'partial',
                    }
                );
                print STDERR "WARNING: Backup job completed but some protected items failed. See logs for details.\n";
            }
            else {
                $logger->info(
                    "Backup job completed successfully",
                    {
                        job_uuid => $job_uuid,
                        run_uuid => $self->{run_uuid},
                        run_id   => $run_id,
                    }
                );

                # Update recovery metadata only on successful completion
                # This ensures disaster recovery metadata only reflects servers with complete, successful backups
                eval {
                    require Whostmgr::CometBackup::CometBackupAPI;
                    require Whostmgr::CometBackup::CometBackupAPI::ProtectedItemBatch;

                    my $server_config = Whostmgr::CometBackup::CometBackupAPI::get_server_config();
                    my $target_user   = $server_config->{username};

                    if ($target_user) {
                        my $batch = Whostmgr::CometBackup::CometBackupAPI::ProtectedItemBatch->new( target_user => $target_user );

                        $logger->debug(
                            "Updating recovery metadata after successful backup completion",
                            {
                                job_uuid => $job_uuid,
                                run_id   => $run_id,
                            }
                        );

                        # Mode 2: standalone update with automatic submission and retry logic
                        my $metadata_success = $batch->update_recovery_metadata();

                        if ($metadata_success) {
                            $logger->debug(
                                "Successfully updated recovery metadata",
                                {
                                    job_uuid => $job_uuid,
                                    run_id   => $run_id,
                                }
                            );
                        }
                        else {
                            $logger->warn(
                                "Failed to update recovery metadata, but backup completed successfully",
                                {
                                    job_uuid => $job_uuid,
                                    run_id   => $run_id,
                                }
                            );
                        }
                    }
                };
                if ($@) {

                    # Log but don't fail the backup if metadata update fails
                    $logger->warn(
                        "Exception updating recovery metadata",
                        {
                            job_uuid => $job_uuid,
                            run_id   => $run_id,
                            error    => $@,
                        }
                    );
                }
            }
        }

        # Don't cleanup here - cleanup happens in main() after all jobs complete
        1;
    };

    if ( !$completed ) {
        my $error = $@ || 'Unknown error while running backup job';
        print STDERR "DEBUG: Error caught: $error\n" if $self->{debug};

        # Check if cancellation was requested
        my $cancel_file   = Whostmgr::CometBackup::Constants::get_cancel_file_path($job_uuid);
        my $was_cancelled = $INTERRUPTED || -f $cancel_file;

        if ($was_cancelled) {
            $logger->info(
                "Backup job cancelled",
                {
                    job_uuid => $job_uuid,
                    run_uuid => $self->{run_uuid},
                }
            );

            if ( $self->{run_id} ) {
                $self->_complete_run_log( $self->{run_id}, 'cancelled', 'Job was cancelled by user request' );

                # Append summary to text log file
                $self->_append_run_summary_to_file();
            }
            else {
                # If run_id isn't set yet, still update the job status in backup_jobs.
                # This is best-effort bookkeeping: a failure here is logged instead of
                # escaping run_backup_job() and skipping main()'s PID-file cleanup.
                eval {
                    require Whostmgr::CometBackup::BackupJobs;
                    my $jobs = Whostmgr::CometBackup::BackupJobs->new();
                    $jobs->update_job_status( $job_uuid, 'cancelled' );
                    $jobs->clear_current_run_uuid($job_uuid);
                };
                if ( my $status_error = $@ ) {
                    $logger->warn(
                        "Failed to record cancellation in backup_jobs",
                        {
                            job_uuid => $job_uuid,
                            error    => $status_error,
                        }
                    );
                }
            }
        }
        else {
            $logger->error(
                "Backup job failed",
                {
                    job_uuid => $job_uuid,
                    run_uuid => $self->{run_uuid},
                    error    => $error,
                }
            );

            if ( $self->{run_id} ) {
                $self->_complete_run_log( $self->{run_id}, 'failed', $error );

                # Append summary to text log file
                $self->_append_run_summary_to_file();
            }
        }

        # Don't cleanup here - cleanup happens in main() after all jobs complete
        return 1;
    }

    # Normal completion: non-zero if a run-fatal condition fired (currently
    # only insufficient free memory; per-destination quota exhaustion does
    # not set run_fatal_reason — it adds the destination to the blocklist
    # and lets the rest of the run proceed). The non-zero exit lets the
    # caller/scheduler distinguish graceful abort from success.
    # NOTE(review): a run recorded as 'failed' because every protected item
    # failed (no run_fatal_reason) still returns 0 here. run_all_eligible_jobs()
    # therefore counts it as a success. Confirm that is intended.
    return $self->{run_fatal_reason} ? 1 : 0;
}

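=head2 run_all_eligible_jobs

Runs all eligible scheduled backup jobs sequentially.

Only active jobs whose C<schedule_type> is C<scheduled> are considered. Manual
jobs are never run by this sweep. Each candidate is re-read from the database
and is run if it:

=over 4

=item * Matches the current schedule (day/time)

=item * Has not run within the last 24 hours

=item * Is still active (C<is_active = 1>)

=back

The schedule and cooldown checks are skipped when the runner was created with
C<bypass_schedule>.

Returns 0 if every job that ran succeeded. It also returns 0 when Comet Backup
is disabled or no scheduled jobs exist. Returns 1 if any job failed.

=cut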

sub run_all_eligible_jobs ($self) {
    my $logger = $self->{logger};

    $logger->info(
        "Starting run_all_eligible_jobs",
        {
            pid => $$,
        }
    );

    print STDERR "DEBUG: Looking for eligible backup jobs\n" if $self->{debug};

    # Check if Comet Backup is disabled via touchfile
    # If the touchfile exists, backups should not run (but restorations are still allowed)
    if ( -f $Whostmgr::CometBackup::Constants::DISABLED_TOUCHFILE ) {
        $logger->info(
            "Comet Backup is currently disabled - skipping all scheduled backups",
            {
                touchfile => $Whostmgr::CometBackup::Constants::DISABLED_TOUCHFILE,
            }
        );
        print "Comet Backup is disabled - backup operations are not allowed\n";
        return 0;    # Return success since this is an expected condition, not an error
    }

    # Get all active scheduled jobs
    my $jobs     = Whostmgr::CometBackup::BackupJobs->new();
    my @all_jobs = $jobs->get_backup_jobs( [], { active_only => 1 } );

    print STDERR "DEBUG: Found " . scalar(@all_jobs) . " active jobs\n" if $self->{debug};

    # Filter to only scheduled jobs (not manual). Manual jobs are executed via
    # the run_backup_job API call, not through the --runany sweep, so cron-driven
    # runs never fire them accidentally. A job with no schedule_type is treated
    # as not scheduled, without an "uninitialized value" warning.
    my @scheduled_jobs = grep { ( $_->{schedule_type} // '' ) eq 'scheduled' } @all_jobs;

    print STDERR "DEBUG: Found " . scalar(@scheduled_jobs) . " scheduled jobs\n" if $self->{debug};

    unless (@scheduled_jobs) {
        $logger->info("No scheduled backup jobs found");
        print "No scheduled backup jobs configured.\n";
        return 0;
    }

    # Track results
    my $total_jobs     = scalar(@scheduled_jobs);
    my $eligible_count = 0;
    my $success_count  = 0;
    my $failed_count   = 0;
    my $skipped_count  = 0;
    my @failed_jobs;

    $logger->info(
        "Found scheduled jobs to evaluate",
        {
            total_scheduled => $total_jobs,
        }
    );

    print "Evaluating $total_jobs scheduled backup jobs...\n" unless $self->{debug};

    # Process each job sequentially
    for my $index ( 0 .. $#scheduled_jobs ) {

        # Stop the sweep once TERM/INT has been received (or cancellation was
        # requested in this process). Otherwise the loop would go on to start
        # every remaining job, and run_backup_job() would record each one as
        # 'cancelled' because $INTERRUPTED is still set.
        if ($INTERRUPTED) {
            my $remaining = scalar(@scheduled_jobs) - $index;
            $logger->info(
                "Interrupted - not starting remaining scheduled backup jobs",
                {
                    remaining => $remaining,
                }
            );
            print "Interrupted - not starting the remaining $remaining job(s).\n" unless $self->{debug};
            last;
        }

        my $job      = $scheduled_jobs[$index];
        my $job_uuid = $job->{job_id};
        my $job_name = $job->{name};

        print STDERR "DEBUG: Evaluating job $job_uuid ($job_name)\n" if $self->{debug};

        # Reload job config from database to get latest last_run_timestamp
        # This is critical for the 24-hour cooldown check to work correctly
        # when processing multiple jobs sequentially
        my @refreshed_jobs = $jobs->get_backup_jobs( [$job_uuid], { active_only => 1 } );
        unless (@refreshed_jobs) {
            print STDERR "DEBUG: Job $job_uuid no longer active, skipping\n" if $self->{debug};
            $skipped_count++;
            next;
        }
        my $current_job_config = $refreshed_jobs[0];

        # Check if job should run (matches schedule and hasn't run recently)
        # unless --bypass-schedule is set
        my $should_run = 1;
        my $skip_reason;

        unless ( $self->{bypass_schedule} ) {
            eval { $self->_validate_schedule_timing($current_job_config); };
            if ($@) {
                $should_run  = 0;
                $skip_reason = $@;
                chomp $skip_reason                                          if $skip_reason;
                print STDERR "DEBUG: Job $job_uuid skipped: $skip_reason\n" if $self->{debug};
            }
        }

        if ($should_run) {
            $eligible_count++;

            print "Running backup job: $job_name ($job_uuid)\n" unless $self->{debug};

            $logger->info(
                "Running eligible backup job",
                {
                    job_uuid => $job_uuid,
                    job_name => $job_name,
                }
            );

            # Run the job
            my $result = $self->run_backup_job($job_uuid);

            if ( $result == 0 ) {
                $success_count++;
                print "  ✓ Completed successfully\n" unless $self->{debug};
            }
            else {
                $failed_count++;
                push @failed_jobs, { uuid => $job_uuid, name => $job_name };
                print "  ✗ Failed\n" unless $self->{debug};
            }
        }
        else {
            $skipped_count++;
            print "Skipping job: $job_name - $skip_reason\n" unless $self->{debug};

            $logger->info(
                "Skipping job (schedule not matched)",
                {
                    job_uuid => $job_uuid,
                    job_name => $job_name,
                    reason   => $skip_reason,
                }
            );
        }

        # Brief pause between jobs to avoid overwhelming the system. It is only
        # needed after a job actually ran, and never after the last candidate.
        sleep 1 if $should_run && $index < $#scheduled_jobs;
    }

    # Print summary
    print "\n" . "=" x 70 . "\n";
    print "Backup Job Execution Summary\n";
    print "=" x 70 . "\n";
    print "Total scheduled jobs:  $total_jobs\n";
    print "Eligible to run:       $eligible_count\n";
    print "Successfully ran:      $success_count\n";
    print "Failed:                $failed_count\n";
    print "Skipped (not due):     $skipped_count\n";
    print "=" x 70 . "\n";

    if (@failed_jobs) {
        print "\nFailed jobs:\n";
        for my $failed (@failed_jobs) {
            print "  - $failed->{name} ($failed->{uuid})\n";
        }
        print "\n";
    }

    $logger->info(
        "Completed run_all_eligible_jobs",
        {
            total_jobs     => $total_jobs,
            eligible_count => $eligible_count,
            success_count  => $success_count,
            failed_count   => $failed_count,
            skipped_count  => $skipped_count,
        }
    );

    # Return failure if any jobs failed
    return $failed_count > 0 ? 1 : 0;
}

=head2 cancel_backup_job

Cancels a running backup job by creating a cancel file.

    $runner->cancel_backup_job($job_uuid);

=cut

sub cancel_backup_job ( $self, $job_uuid = undef ) {

    # Fall back to the UUID of the job this runner instance is handling.
    my $target_uuid = $job_uuid || $self->{job_uuid};
    die "No job UUID provided for cancellation" if !$target_uuid;

    my $cancel_file = Whostmgr::CometBackup::Constants::get_cancel_file_path($target_uuid);

    $self->{logger}->info(
        "Backup job cancellation requested",
        {
            job_uuid    => $target_uuid,
            cancel_file => $cancel_file,
        }
    );

    # The dispatch loop in _process_users_parallel polls for this file;
    # raising $INTERRUPTED also stops this process's own loop directly.
    Cpanel::FileUtils::TouchFile::touchfile($cancel_file);
    $INTERRUPTED = 1;

    return;
}

=head2 _check_existing_runner

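Checks if another backup runner process is already active. Dies if the PID
recorded in the PID file belongs to a running process; deletes the PID file
when the recorded process no longer exists.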

=cut

sub _check_existing_runner ($self) {
    my $pid_file = $Whostmgr::CometBackup::Constants::PID_FILE;

    # No PID file: nothing has claimed the runner slot.
    return if !-f $pid_file;

    my $recorded_pid = Cpanel::LoadFile::loadfile($pid_file);
    chomp $recorded_pid if $recorded_pid;

    # Empty or non-numeric content is left alone rather than guessed at.
    return if !( $recorded_pid && $recorded_pid =~ /^\d+$/ );

    # Signal 0 only probes whether the process exists; nothing is delivered.
    if ( kill( 0, $recorded_pid ) ) {
        die "Another backup job is already running (PID: $recorded_pid). Allow the running job to complete or cancel it before starting a new one.\n";
    }

    # The recorded process is gone, so the PID file is stale.
    unlink $pid_file;
    return;
}

=head2 _create_pid_file

Creates a PID file for the current runner process.

=cut

sub _create_pid_file ($self) {
    my $pid_file = $Whostmgr::CometBackup::Constants::PID_FILE;

    print STDERR "DEBUG: Creating PID file at: $pid_file\n" if $self->{debug};

    # Ensure the directory that actually holds the PID file exists. Checking
    # a hard-coded "/var/run" only helped when PID_FILE happened to live there.
    require File::Basename;
    my $pid_dir = File::Basename::dirname($pid_file);
    Cpanel::Mkdir::ensure_directory_existence_and_mode( $pid_dir, 0755 ) unless -d $pid_dir;

    print STDERR "DEBUG: About to write PID $$ to file\n" if $self->{debug};

    open my $fh, '>', $pid_file
      or die "Failed to create PID file $pid_file: $!";
    print {$fh} "$$\n"
      or die "Failed to write PID file $pid_file: $!";

    # Buffered write errors (e.g. a full disk) only surface at close time.
    close $fh
      or die "Failed to close PID file $pid_file: $!";

    print STDERR "DEBUG: PID file written successfully\n" if $self->{debug};

    return;
}

=head2 _load_job_config

Loads backup job configuration from the database.

=cut

sub _load_job_config ( $self, $job_uuid ) {
    my $debug = $self->{debug};

    print STDERR "DEBUG: Loading job config for UUID: $job_uuid\n" if $debug;

    my $backup_jobs = Whostmgr::CometBackup::BackupJobs->new();
    print STDERR "DEBUG: BackupJobs object created\n" if $debug;

    my @matches = $backup_jobs->get_backup_jobs( [$job_uuid] );
    print STDERR "DEBUG: Got " . scalar(@matches) . " jobs from database\n" if $debug;

    die "Backup job not found: $job_uuid\n" if !@matches;

    my ($job_config) = @matches;
    print STDERR "DEBUG: Job config loaded, checking is_active field\n" if $debug;

    # The is_active flag is enforced before any schedule bypass is considered,
    # so an inactive job is refused even with --bypass-schedule or run_now.
    die "Backup job is not active: $job_uuid\n" if !$job_config->{is_active};

    # Schedule timing is skipped when the caller bypasses it (the cron helper
    # passes --bypass-schedule after making its own scheduling decision) or
    # when run_now is set (the run_backup_job API path for manual runs).
    my $skip_schedule_check = $self->{bypass_schedule} || $job_config->{run_now};
    $self->_validate_schedule_timing($job_config) if !$skip_schedule_check;

    return $job_config;
}

=head2 _validate_schedule_timing

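Validates that a scheduled backup job should run now. Two checks are applied:

=over

=item 1. The current day matches the schedule configuration: the day of the week for weekly (and day-restricted daily) schedules, or the day of the month for monthly schedules.

=item 2. The job has not run within the last 24 hours.

=back

Manual jobs, and daily jobs with no day restriction, pass without either check.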

Dies if the job should not run now.

=cut

sub _validate_schedule_timing ( $self, $job_config ) {
    my $logger   = $self->{logger};
    my $job_uuid = $job_config->{job_id};

    # A missing schedule_type is treated like any other non-'scheduled' type
    # (no validation). The default avoids "uninitialized value" warnings.
    my $schedule_type = $job_config->{schedule_type} // '';

    print STDERR "DEBUG: Validating schedule timing for job $job_uuid\n" if $self->{debug};

    # Manual jobs can run anytime
    return if $schedule_type eq 'manual';

    # Only 'scheduled' jobs are validated; any other type is let through.
    return unless $schedule_type eq 'scheduled';

    my $schedule_info = $job_config->{schedule_info};
    unless ( $schedule_info && ref $schedule_info eq 'HASH' ) {
        die "Scheduled job missing schedule_info configuration: $job_uuid\n";
    }

    # FIRST: Validate current day matches schedule (most important check)
    my $frequency = $schedule_info->{frequency} || '';
    my $when      = $schedule_info->{when}      || [];

    print STDERR "DEBUG: Schedule - frequency: $frequency, when: " . ( ref $when eq 'ARRAY' ? join( ',', @$when ) : 'none' ) . "\n" if $self->{debug};

    # For daily with no day restrictions, always allow.
    # NOTE(review): this returns before the 24-hour cooldown check below, so
    # unrestricted daily jobs are never subject to it. Confirm this is intended.
    return if $frequency eq 'daily' && !@$when;

    # For all other cases, verify current day matches schedule
    my $current_tp = Time::Piece->new();

    if ( $frequency eq 'weekly' || ( $frequency eq 'daily' && @$when ) ) {

        # Check if current day of week matches
        unless ( $self->_current_day_matches_schedule( $current_tp, $when ) ) {
            my $day_name = $current_tp->fullday;

            # Convert when array to readable format (handle both numbers and names)
            my @when_names = map {
                if (/^\d+$/) {
                    $self->_day_number_to_name($_);
                }
                else {
                    ucfirst( lc($_) );    # Just capitalize the day name
                }
            } @$when;

            $logger->info(
                "Backup job schedule does not match current day",
                {
                    job_uuid     => $job_uuid,
                    current_day  => $day_name,
                    scheduled_on => join( ', ', @when_names ),
                }
            );

            die sprintf(
                "Backup job is not scheduled to run on %s. Scheduled days: %s\n",
                $day_name,
                join( ', ', @when_names )
            );
        }
    }
    elsif ( $frequency eq 'monthly' ) {

        # Check if current day of month matches
        # Handle edge case: if scheduled day > days in current month, run on last day of month
        my $current_day_of_month = $current_tp->mday;
        my $last_day_of_month    = $self->_get_last_day_of_month($current_tp);

        print STDERR "DEBUG: Monthly check - current day: $current_day_of_month, last day of month: $last_day_of_month\n" if $self->{debug};

        # Check if any scheduled day matches current day OR if we're on last day and a scheduled day exceeds month length
        my $should_run = 0;

        for my $scheduled_day (@$when) {
            if ( $scheduled_day == $current_day_of_month ) {

                # Exact match
                $should_run = 1;
                print STDERR "DEBUG: Exact match - scheduled day $scheduled_day matches current day $current_day_of_month\n" if $self->{debug};
                last;
            }
            elsif ( $scheduled_day > $last_day_of_month && $current_day_of_month == $last_day_of_month ) {

                # Scheduled day doesn't exist in this month, run on last day
                # Example: scheduled for 31st, but it's February with 28 days, run on 28th
                $should_run = 1;
                print STDERR "DEBUG: Overflow match - scheduled day $scheduled_day > last day $last_day_of_month, running on last day\n" if $self->{debug};
                $logger->info(
                    "Monthly backup scheduled day exceeds month length, running on last day of month",
                    {
                        job_uuid          => $job_uuid,
                        scheduled_day     => $scheduled_day,
                        last_day_of_month => $last_day_of_month,
                        current_day       => $current_day_of_month,
                    }
                );
                last;
            }
        }

        unless ($should_run) {
            $logger->info(
                "Backup job schedule does not match current day of month",
                {
                    job_uuid             => $job_uuid,
                    current_day_of_month => $current_day_of_month,
                    last_day_of_month    => $last_day_of_month,
                    scheduled_days       => join( ', ', @$when ),
                }
            );

            die sprintf(
                "Backup job is not scheduled to run on day %d of the month. Scheduled days: %s\n",
                $current_day_of_month,
                join( ', ', @$when )
            );
        }
    }

    # SECOND: Check if job has run recently (within last 24 hours).
    # BackupJobs->get_backup_jobs() exposes 'last_run', which defaults to
    # created_timestamp for a job that has never run. To tell "never ran" apart
    # from "ran at creation time", read the raw columns directly.
    # NOTE(review): if last_run_timestamp is written when a run finishes rather
    # than when it starts, a job dispatched at the same time each day can fall
    # inside the 24h window. Confirm where the column is set.
    my $dbh           = Whostmgr::CometBackup::BackupJobs->new()->_get_database_handle();
    my $timestamp_sth = $dbh->prepare('SELECT last_run_timestamp, created_timestamp FROM backup_jobs WHERE uuid = ?');
    $timestamp_sth->execute($job_uuid);
    my ( $last_run_timestamp, $created_timestamp ) = $timestamp_sth->fetchrow_array();
    $timestamp_sth->finish();

    # NULL columns (or a missing row) come back as undef.
    print STDERR "DEBUG: last_run_timestamp=" . ( $last_run_timestamp // 'NULL' ) . ", created_timestamp=" . ( $created_timestamp // 'NULL' ) . "\n" if $self->{debug};

    # Only jobs that have actually run get the cooldown: last_run_timestamp is
    # set and differs from created_timestamp. An undef created_timestamp keeps
    # the previous outcome (cooldown applies) without the numeric-compare warning.
    my $has_run = $last_run_timestamp
      && ( !defined $created_timestamp || $last_run_timestamp != $created_timestamp );

    if ($has_run) {
        my $now                 = time();
        my $time_since_last_run = $now - $last_run_timestamp;

        if ( $time_since_last_run < 86400 ) {    # 86400 seconds = 24 hours
            my $hours_since        = int( $time_since_last_run / 3600 );
            my $next_eligible_time = localtime( $last_run_timestamp + 86400 );

            $logger->info(
                "Backup job has already run within the last 24 hours",
                {
                    job_uuid           => $job_uuid,
                    last_run           => scalar( localtime($last_run_timestamp) ),
                    hours_since        => $hours_since,
                    next_eligible_time => "$next_eligible_time",
                }
            );

            die sprintf(
                "Backup job has already run %d hours ago. Next eligible run: %s\n",
                $hours_since,
                $next_eligible_time
            );
        }
    }

    # If we get here, the schedule matches and cooldown is satisfied
    $logger->info(
        "Schedule validation passed",
        {
            job_uuid  => $job_uuid,
            frequency => $frequency,
        }
    );

    return;
}

=head2 _current_day_matches_schedule

Helper to check if current day of week matches the schedule.

=cut

sub _current_day_matches_schedule ( $self, $current_tp, $allowed_days ) {
    return 1 unless @$allowed_days;    # No restrictions

    # Schedules number days 1=Monday ... 7=Sunday. Time::Piece's day_of_week
    # counts from Sunday instead (0=Sunday, 1=Monday, ..., 6=Saturday), so
    # Sunday must be remapped from 0 to 7. Without this, a job scheduled for
    # Sunday (7) never matched.
    my $today = $current_tp->day_of_week || 7;

    for my $allowed_day (@$allowed_days) {

        # Accept numbers directly; convert day names ("Monday", "sunday", ...)
        # via _day_name_to_number, which yields undef for unknown names.
        my $numeric_day =
          $allowed_day =~ /^\d+$/
          ? $allowed_day
          : $self->_day_name_to_number($allowed_day);
        next if !defined $numeric_day;

        # A stored 0 used to match on Sundays; keep honoring it as Sunday.
        $numeric_day = 7 if $numeric_day == 0;

        return 1 if $numeric_day == $today;
    }

    return 0;
}

=head2 _day_name_to_number

Converts day name to number (1=Monday, 7=Sunday).

=cut

sub _day_name_to_number ( $self, $day_name ) {

    # A day's number is its position in this list plus one (Monday=1 ... Sunday=7).
    my @week = qw(monday tuesday wednesday thursday friday saturday sunday);

    # Matching is case-insensitive; unknown names yield undef.
    my $wanted = lc $day_name;
    my ($index) = grep { $week[$_] eq $wanted } 0 .. $#week;

    return defined $index ? $index + 1 : undef;
}

=head2 _day_number_to_name

Converts day number to name (1=Monday, 7=Sunday).

=cut

sub _day_number_to_name ( $self, $day_number ) {
    my %name_for;
    @name_for{ 1 .. 7 } = qw(Monday Tuesday Wednesday Thursday Friday Saturday Sunday);

    # Numbers outside 1..7 get a generic label rather than undef.
    return $name_for{$day_number} || "Day $day_number";
}

=head2 _get_last_day_of_month

Gets the last day of the month for a given Time::Piece object.

=cut

sub _get_last_day_of_month ( $self, $time_piece ) {

    # Time::Piece knows each month's length, including February in leap
    # years, so there is no need to step back from the first of next month.
    return $time_piece->month_last_day;
}

=head2 _execute_backup_job

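Executes a loaded backup job. It performs these steps in order:

=over

=item * Resets the job's protected items to active.

=item * Removes stale pkgacct locks left by earlier runs.

=item * Resolves the users to back up.

=item * Ensures those users have protected items.

=item * Hands the users to the parallel user processor.

=back

Dies if no users are eligible.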

=cut

sub _execute_backup_job ( $self, $job_config ) {
    my $logger   = $self->{logger};
    my $job_uuid = $job_config->{job_id};    # Job configs carry their UUID under 'job_id'

    print STDERR "DEBUG: Inside _execute_backup_job\n" if $self->{debug};

    if ( $self->{dryrun} ) {
        $logger->info(
            "DRYRUN MODE: Simulating backup job execution without making API calls",
            {
                job_uuid => $job_uuid,
                name     => $job_config->{name},
            }
        );
        print STDERR "DRYRUN MODE: No actual API calls will be made to Comet Backup server\n";
    }

    $logger->info(
        "Executing backup job",
        {
            job_uuid     => $job_uuid,
            name         => $job_config->{name},
            all_accounts => $job_config->{all_accounts},
            dryrun       => $self->{dryrun} ? 'true' : 'false',
        }
    );

    # Reset protected items for this job back to active status
    # This allows items that were marked completed/failed in previous runs to be backed up again
    print STDERR "DEBUG: Resetting protected items status to active for job\n" if $self->{debug};
    $self->_reset_protected_items_for_job($job_uuid);

    # Clean up any stale pkgacct lock files from previous cancelled runs
    # This prevents the new run from getting stuck waiting for locks that will never be released
    print STDERR "DEBUG: Cleaning up stale pkgacct locks\n" if $self->{debug};
    $self->_cleanup_stale_pkgacct_locks($job_uuid);

    print STDERR "DEBUG: About to get users to backup\n" if $self->{debug};

    # Get list of users to backup
    my @users = $self->_get_users_to_backup($job_config);

    print STDERR "DEBUG: Got users list, count: " . scalar(@users) . "\n" if $self->{debug};

    # A job with nothing to back up is reported as an error, not a silent no-op.
    # @users is not modified between here and dispatch, so one check suffices.
    unless (@users) {
        my $error_msg = "Cannot execute backup job: no cPanel user accounts found on the server for this backup job";
        $logger->error( $error_msg, { job_uuid => $job_uuid } );
        die $error_msg;
    }

    # Runtime self-healing: ensure selected users have required protected items
    # even when the job uses an explicit accounts list.
    print STDERR "DEBUG: Checking protected items for selected users\n" if $self->{debug};
    $self->_ensure_protected_items_for_all_users( $job_config, \@users );

    $logger->info(
        "Found users to backup",
        {
            job_uuid   => $job_uuid,
            user_count => scalar(@users),
            users      => \@users,
        }
    );

    # Process users with parallelism and load monitoring
    $self->_process_users_parallel( \@users, $job_config );

    return;
}

=head2 _get_users_to_backup

Determines which users need to be backed up based on job configuration.

=cut

sub _get_users_to_backup ( $self, $job_config ) {
    my $debug = $self->{debug};
    print STDERR "DEBUG: Inside _get_users_to_backup\n" if $debug;

    my @candidates;

    if ( $job_config->{all_accounts} ) {
        print STDERR "DEBUG: Getting all cPanel users\n" if $debug;

        # Every cPanel account on the server is in scope.
        @candidates = Cpanel::Config::Users::getcpusers();
        print STDERR "DEBUG: Got " . scalar(@candidates) . " total users from getcpusers\n" if $debug;
    }
    else {
        print STDERR "DEBUG: Using specified accounts\n" if $debug;

        my @configured = @{ $job_config->{accounts} || [] };
        print STDERR "DEBUG: Got " . scalar(@configured) . " users from job config\n" if $debug;

        # Hand-maintained account lists can name users that no longer exist;
        # split them into accounts present on this server and stale names.
        my %is_cpuser = map { ( $_ => 1 ) } Cpanel::Config::Users::getcpusers();
        my @missing;
        for my $account (@configured) {
            if ( $is_cpuser{$account} ) {
                push @candidates, $account;
            }
            else {
                push @missing, $account;
            }
        }

        if (@missing) {
            $self->{logger}->warn(
                "Skipping configured users that do not exist on this server",
                {
                    job_uuid      => $job_config->{job_id},
                    missing_users => \@missing,
                }
            );
        }
    }

    print STDERR "DEBUG: About to filter suspended accounts\n" if $debug;

    # Suspended accounts are always excluded (could become a job option later).
    my @users = grep { !Cpanel::AcctUtils::Suspended::is_suspended($_) } @candidates;

    print STDERR "DEBUG: After filtering suspended accounts: " . scalar(@users) . " users\n" if $debug;

    return @users;
}

=head2 _process_users_parallel

Processes users for backup with parallelism and load monitoring.

=cut

sub _process_users_parallel ( $self, $users, $job_config ) {
    my $logger    = $self->{logger};
    my @user_list = @$users;
    my %active_forks;
    my $completed_users = 0;
    my $total_users     = scalar(@user_list);

    # Set up a pipe so child processes can propagate destination-blocklist
    # updates back to the parent. Without this the blocklist would be
    # per-user (each child mutates its own copy of $self->{blocked_destinations}
    # post-fork) instead of run-wide. Children write "dest_id\treason\n"
    # lines through $self->{blocklist_writer}; the parent reads from
    # $self->{blocklist_reader} (non-blocking) at the top of each outer-loop
    # iteration and merges entries into $self->{blocked_destinations} BEFORE
    # forking the next user. Subsequent forks inherit the merged blocklist.
    #
    # Lines are short ("guid\tdestination_quota_exhausted\n", ~50-80 bytes)
    # and the Linux pipe atomicity guarantee for writes <= PIPE_BUF (4096
    # bytes) means concurrent children cannot interleave bytes within a
    # line. If pipe() fails we log and fall back to the in-memory-only
    # (per-user) blocklist behavior so the run still completes.
    $self->_open_blocklist_pipe();

    print STDERR "DEBUG: Starting parallel processing with $total_users users\n" if $self->{debug};

    $logger->info(
        "Starting parallel user processing",
        {
            total_users  => $total_users,
            max_parallel => $Whostmgr::CometBackup::Constants::MAX_PARALLEL_USERS,
        }
    );

    my $offline_logged      = 0;
    my $offline_detected_at = 0;
    my $offline_timeout     = 300;    # 5 minutes, same as pool-level timeout
    my $run_fatal_logged    = 0;

    while ( @user_list || %active_forks ) {

        # Drain any blocklist updates from children that completed (or
        # reported quota mid-run) since the last iteration. Must happen
        # BEFORE forking the next user so the new child inherits the
        # current blocklist.
        $self->_drain_blocklist_pipe();

        # Check for interruption
        my $cancel_file = Whostmgr::CometBackup::Constants::get_cancel_file_path( $self->{job_uuid} );
        if ( $INTERRUPTED || -f $cancel_file ) {
            print STDERR "DEBUG: Cancellation detected\n" if $self->{debug};
            $logger->info(
                "Backup cancellation detected, stopping",
                {
                    job_uuid    => $self->{job_uuid},
                    cancel_file => $cancel_file,
                }
            );
            $self->_terminate_active_processes( \%active_forks );
            last;
        }

        # Sample system free memory at the parent level too. Children also
        # check inside _manage_backup_pool, but they each set run_fatal_reason
        # only in their own forked process — the parent never sees it. Without
        # a parent-level check, the parent reaps a child that aborted on low
        # memory and then immediately forks another user, even though the
        # condition that killed the first one is still present.
        $self->_check_free_memory();

        # Run-fatal gate at the parent level: stop forking new user backups
        # and let any active forks drain. Mirrors the per-user gate in
        # _manage_backup_pool but operates on the user fork list instead of
        # the per-user item queue.
        if ( $self->{run_fatal_reason} ) {

            # Log once: this branch is re-entered on every poll while the
            # already-running forks drain.
            if ( !$run_fatal_logged ) {
                $logger->error(
                    "Run-fatal condition detected at parent level, stopping new user dispatch",
                    {
                        run_fatal_reason => $self->{run_fatal_reason},
                        pending_users    => scalar(@user_list),
                        active_forks     => scalar( keys %active_forks ),
                    }
                );
                $run_fatal_logged = 1;
            }
            @user_list = ();    # drop queue; let active forks finish
            last if !%active_forks;
        }

        # Load gate: throttles only the dispatch of NEW users. Reaping below
        # must still run; skipping it (as a `next` would) leaves finished
        # children as zombies and stalls the drain phase for as long as the
        # load stays high.
        my $load_too_high = 0;
        if (@user_list) {
            my $load_avg = $self->_get_load_average();
            if ( $load_avg > $Whostmgr::CometBackup::Constants::MAX_LOAD_AVERAGE ) {
                $logger->warn( "System load too high, waiting", { load_average => $load_avg } );
                $load_too_high = 1;
            }
        }

        # Proactive connection gate: verify the device is online before
        # dispatching more children. This warms the cache so each child
        # inherits a fresh connection ID and prevents wasting forks when
        # the agent is down. It is only consulted while users remain queued;
        # when @user_list is empty we're just draining active forks.
        if ( @user_list && !$load_too_high ) {
            my $device_is_online = $self->_get_cached_connection_id();

            if ($device_is_online) {
                $offline_logged      = 0;    # Reset so we log again if it drops later
                $offline_detected_at = 0;

                while ( @user_list && keys(%active_forks) < $Whostmgr::CometBackup::Constants::MAX_PARALLEL_USERS ) {
                    my $user = shift @user_list;
                    my $pid  = $self->_fork_user_backup( $user, $job_config );

                    if ($pid) {
                        $active_forks{$pid} = {
                            user       => $user,
                            start_time => Time::HiRes::time(),
                        };
                        $logger->debug(
                            "Started user backup process",
                            {
                                user => $user,
                                pid  => $pid,
                            }
                        );

                        # Brief delay to avoid database contention between forks
                        select( undef, undef, undef, 0.2 );
                    }
                    else {

                        # fork() failed (already logged by _fork_user_backup).
                        # Record the user as failed instead of silently dropping it.
                        $self->_fail_undispatched_user( $user, "Failed to fork a worker process; user backup could not be dispatched" );
                    }
                }
            }
            else {

                # Device is unreachable but we still have users queued.
                # Don't fork — let the outer loop reap children and sleep 1s,
                # then the next iteration will re-check via the cache (60s TTL).
                $offline_detected_at ||= time();

                if ( !$offline_logged ) {
                    my $remaining_secs = $offline_timeout - ( time() - $offline_detected_at );
                    $logger->warn(
                        "Device appears offline - will fail remaining users in ${remaining_secs}s if it stays offline",
                        {
                            users_remaining => scalar(@user_list),
                            active_forks    => scalar( keys %active_forks ),
                            timeout_secs    => $offline_timeout,
                        }
                    );
                    $offline_logged = 1;
                }

                # If the device stays offline beyond the limit, fail the remaining
                # users so the job completes rather than waiting indefinitely.
                if ( time() - $offline_detected_at >= $offline_timeout ) {
                    my $offline_secs = time() - $offline_detected_at;
                    $logger->warn(
                        "Device offline timeout reached; failing remaining users",
                        {
                            offline_secs    => $offline_secs,
                            timeout_secs    => $offline_timeout,
                            users_remaining => scalar(@user_list),
                        }
                    );
                    for my $user (@user_list) {
                        $self->_fail_undispatched_user( $user, "Device offline for ${offline_secs}s; user backup could not be dispatched" );
                    }
                    @user_list = ();
                }
            }
        }

        # Check for completed processes
        print STDERR "DEBUG: About to reap completed processes\n" if $self->{debug};
        my $reaped = $self->_reap_completed_processes( \%active_forks );
        print STDERR "DEBUG: Reaped $reaped processes\n" if $self->{debug};
        $completed_users += $reaped;

        if ( $reaped > 0 ) {
            $logger->info(
                "Backup progress",
                {
                    completed => $completed_users,
                    total     => $total_users,
                    active    => scalar( keys %active_forks ),
                    remaining => scalar(@user_list),
                }
            );
        }

        # Pause to avoid busy waiting; back off longer while the load gate is closed.
        sleep( $load_too_high ? 10 : 1 );
    }

    # Final drain to capture anything written by the last children before
    # they exited. Children may write to the pipe right before exit, after
    # the most recent parent drain.
    $self->_drain_blocklist_pipe();
    $self->_close_blocklist_pipe();

    $logger->info(
        "Parallel user processing completed",
        {
            completed_users => $completed_users,
            total_users     => $total_users,
        }
    );

    return;
}

=head2 _fail_undispatched_user

Records a user that never got a worker process as a failed user run, so the
run history shows the user instead of it silently disappearing.

=cut

sub _fail_undispatched_user ( $self, $user, $reason ) {
    my $user_run_id = eval { $self->_log_user_start( $self->{job_uuid}, $user ) };
    if ( $@ || !$user_run_id ) {
        $self->{logger}->warn( "Could not create run record for skipped user", { user => $user, error => $@ } );
        return;
    }
    $self->_log_user_complete( $user_run_id, 'failed', $reason );
    return;
}

=head2 _fork_user_backup

Forks a child process to backup a specific user.

=cut

sub _fork_user_backup ( $self, $user, $job_config ) {
    print STDERR "DEBUG: About to fork for user $user\n" if $self->{debug};

    my $pid = fork();

    if ( !defined $pid ) {
        print STDERR "DEBUG: Fork failed for user $user: $!\n" if $self->{debug};
        $self->{logger}->error(
            "Failed to fork for user backup",
            {
                user  => $user,
                error => $!,
            }
        );
        return;
    }

    if ( $pid == 0 ) {

        # Child process. All of its work runs inside eval and the child always
        # leaves through exit(). If an exception escaped this sub, it would
        # unwind into the caller's dispatch loop and the CHILD would carry on
        # as if it were the parent, forking and backing up further users.
        my $child_ok = eval {

            # Drop the parent's cached HTTP connection so the child
            # establishes its own.
            Whostmgr::CometBackup::CometBackupAPI::_reset_ua();
            print STDERR "DEBUG: Child process started for user $user, PID $$\n" if $self->{debug};

            # The blocklist pipe is parent-reads / children-write. Close the
            # child's inherited copy of the reader so only the parent holds it.
            # The writer stays open so this child can report blocked
            # destinations back through _record_quota_exhausted_destinations.
            if ( $self->{blocklist_reader} ) {
                close $self->{blocklist_reader};
                delete $self->{blocklist_reader};
            }

            $self->_execute_user_backup( $user, $job_config );
            1;
        };
        my $child_error = $child_ok ? undef : ( $@ || 'unknown error' );

        if ( defined $child_error ) {

            # Reporting must not throw either (the logger may be what broke);
            # fall back to STDERR.
            eval {
                $self->{logger}->error(
                    "User backup child process died",
                    {
                        user  => $user,
                        error => $child_error,
                    }
                );
                1;
            } or print STDERR "User backup child process for $user died: $child_error\n";
        }

        # Flush and close the writer so the parent sees any final writes
        # and so the kernel can reclaim the FD before exit.
        if ( $self->{blocklist_writer} ) {
            close $self->{blocklist_writer};
            delete $self->{blocklist_writer};
        }

        print STDERR "DEBUG: Child process exiting for user $user, PID $$\n" if $self->{debug};

        # A non-zero status makes the failure visible in the parent's wait status.
        exit( defined $child_error ? 1 : 0 );
    }

    # Parent process
    print STDERR "DEBUG: Forked child process $pid for user $user\n" if $self->{debug};
    return $pid;
}

=head2 _execute_user_backup

Executes backup for a specific user.

=cut

sub _execute_user_backup ( $self, $user, $job_config ) {
    my ( $logger, $debug ) = @{$self}{qw(logger debug)};
    my $job_uuid = $job_config->{job_id};    # job configs expose the UUID as 'job_id'
    my %log_ctx  = ( job_uuid => $job_uuid, user => $user );

    print STDERR "DEBUG: Child: Starting backup for user $user (PID $$)\n" if $debug;

    # Open the per-user run record first so both outcomes below can close it.
    print STDERR "DEBUG: Child: About to log user start for $user\n" if $debug;
    my $user_run_id = $self->_log_user_start( $job_uuid, $user );
    print STDERR "DEBUG: Child: Got user_run_id $user_run_id for $user\n" if $debug;

    eval {
        $logger->info( "Starting user backup", {%log_ctx} );

        print STDERR "DEBUG: Child: About to get protected items for $user\n" if $debug;
        my $protected_items = $self->_get_user_protected_items( $user, $job_config );
        my $item_count      = scalar @$protected_items;
        print STDERR "DEBUG: Child: Got $item_count protected items for $user\n" if $debug;

        if ( !$item_count ) {

            # Nothing survived filtering: finish the run record right away
            # instead of starting a pool with no work (which could hang).
            $logger->info( "No protected items to backup for user, skipping", {%log_ctx} );
            print STDERR "DEBUG: Child: No protected items to backup for $user, exiting early\n" if $debug;
            $self->_log_user_complete( $user_run_id, 'completed' );
        }
        else {
            print STDERR "DEBUG: Child: About to call _manage_backup_pool for $user with $item_count items\n" if $debug;
            $self->_manage_backup_pool( $user, $protected_items, $user_run_id, $job_config );
            print STDERR "DEBUG: Child: Returned from _manage_backup_pool for $user\n" if $debug;

            print STDERR "DEBUG: Child: About to complete user log for $user\n" if $debug;
            $self->_log_user_complete( $user_run_id, 'completed' );
            print STDERR "DEBUG: Child: User log completed for $user\n" if $debug;

            $logger->info( "User backup completed", {%log_ctx} );
            print STDERR "DEBUG: Child: User backup fully completed for $user\n" if $debug;
        }
    };

    # Any exception above marks this user's run as failed with the error text.
    if ( my $error = $@ ) {
        $logger->error( "User backup failed", { %log_ctx, error => $error } );
        $self->_log_user_complete( $user_run_id, 'failed', $error );
    }

    return;
}

=head2 _get_user_protected_items

Gets the list of protected items to backup for a user.

=cut

sub _get_user_protected_items ( $self, $user, $job_config ) {
    my $logger   = $self->{logger};
    my $job_uuid = $job_config->{job_id};

    # Load the map class here (as _reset_protected_items_for_job does) instead
    # of relying on it having been loaded earlier in this process.
    require Whostmgr::CometBackup::ProtectedItemMap;
    my $protected_items_map = Whostmgr::CometBackup::ProtectedItemMap->new();

    print STDERR "DEBUG: Getting existing protected items for user $user\n" if $self->{debug};

    # Get the list of protected items for this user and job that were created during job setup
    my @items = $protected_items_map->get_protected_items_for_user(
        system_user => $user,
        job_uuid    => $job_uuid,
    );

    print STDERR "DEBUG: Got " . scalar(@items) . " protected items for $user (before filtering)\n" if $self->{debug};

    # DEFENSE: Keep only active items. This prevents processing disabled items
    # if a reset failed or there was a race condition. A missing or false
    # status (undef, '', '0') is treated as 'active'.
    my @valid_items = grep { ( $_->{status} || 'active' ) eq 'active' } @items;

    my $filtered_count = scalar(@items) - scalar(@valid_items);
    if ($filtered_count) {
        $logger->warn(
            "Filtered out protected items with non-active status",
            {
                user           => $user,
                job_uuid       => $job_uuid,
                total_items    => scalar(@items),
                valid_items    => scalar(@valid_items),
                filtered_count => $filtered_count,
            }
        );
        print STDERR "DEBUG: Filtered out $filtered_count items with invalid status for $user\n" if $self->{debug};
    }

    print STDERR "DEBUG: Got " . scalar(@valid_items) . " valid protected items for $user (after filtering)\n" if $self->{debug};

    unless (@valid_items) {
        $logger->warn(
            "No valid Protected Items found for user in job",
            {
                user     => $user,
                job_uuid => $job_uuid,
            }
        );
        return [];
    }

    return \@valid_items;
}

=head2 _reset_protected_items_for_job

Resets all protected items for a job back to 'active' status at the start of a backup run.

This allows items that were marked as completed, failed, timeout, or partial in previous
runs to be backed up again in the new run.

=cut

sub _reset_protected_items_for_job ( $self, $job_uuid ) {
    require Whostmgr::CometBackup::ProtectedItemMap;
    my $item_map = Whostmgr::CometBackup::ProtectedItemMap->new();

    # Pull every item attached to the job, whatever state the last run left it in.
    my @job_items = $item_map->get_protected_items_for_job( job_uuid => $job_uuid );

    # Everything except explicitly disabled items becomes eligible to run again.
    my @to_reactivate = grep { ( $_->{status} || '' ) ne 'disabled' } @job_items;

    for my $job_item (@to_reactivate) {
        $item_map->update_protected_item_status(
            protected_item_uid => $job_item->{protected_item_uid},
            status             => 'active',
        );
    }

    my $reactivated = scalar @to_reactivate;
    return $reactivated unless $reactivated;

    $self->{logger}->info(
        "Reset protected items to active status for new backup run",
        {
            job_uuid    => $job_uuid,
            reset_count => $reactivated,
            total_items => scalar(@job_items),
        }
    );

    return $reactivated;
}

=head2 _cleanup_stale_pkgacct_locks

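Cleans up stale pkgacct lock files left behind by earlier runs.

When a backup job is cancelled, the pkgacct lock files may not be cleaned up properly.
This method removes every regular C<*.lock> file in
C<$Whostmgr::CometBackup::Constants::PKGACCT_LOCK_DIR> to ensure a fresh start. It is
not limited to users in this job (C<$job_uuid> is only used for logging), and symlinks
are skipped. Returns the number of files removed.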

=cut

sub _cleanup_stale_pkgacct_locks ( $self, $job_uuid ) {
    my $logger = $self->{logger};

    require Whostmgr::CometBackup::Constants;
    my $lock_dir = $Whostmgr::CometBackup::Constants::PKGACCT_LOCK_DIR;

    # No lock directory at all means there can be no stale locks.
    if ( !-d $lock_dir ) {
        print STDERR "DEBUG: Lock directory $lock_dir does not exist, nothing to clean\n" if $self->{debug};
        return 0;
    }

    # Lock files are named "<system_user>.lock"; every such regular file is a candidate.
    my $dir_handle;
    if ( !opendir( $dir_handle, $lock_dir ) ) {
        $logger->warn("Could not open pkgacct lock directory: $!");
        return 0;
    }
    my @stale_locks = grep { /\.lock$/ && -f "$lock_dir/$_" } readdir($dir_handle);
    closedir($dir_handle);

    unless (@stale_locks) {
        print STDERR "DEBUG: No stale lock files found in $lock_dir\n" if $self->{debug};
        return 0;
    }

    print STDERR "DEBUG: Found " . scalar(@stale_locks) . " lock files to clean up\n" if $self->{debug};

    my $removed = 0;
  LOCK:
    for my $lock_name (@stale_locks) {
        my $lock_path = "$lock_dir/$lock_name";

        # The directory listing may be out of date by now (symlink swap /
        # TOCTOU), so confirm this is still a plain, non-symlink file.
        my $is_plain_file = !-l $lock_path && -f $lock_path;
        if ( !$is_plain_file ) {
            $logger->warn("Skipping non-regular file during lock cleanup: $lock_path");
            next LOCK;
        }

        print STDERR "DEBUG: Removing stale lock file: $lock_path\n" if $self->{debug};

        if ( !unlink $lock_path ) {
            $logger->warn("Failed to remove stale lock file $lock_path: $!");
            next LOCK;
        }
        $removed++;
    }

    return $removed unless $removed;

    $logger->info(
        "Cleaned up stale pkgacct lock files",
        {
            job_uuid      => $job_uuid,
            cleaned_count => $removed,
        }
    );
    print STDERR "DEBUG: Cleaned up $removed stale pkgacct lock files\n" if $self->{debug};

    return $removed;
}

=head2 _get_cached_source_ids

Gets Source IDs from Comet server with in-memory caching for the duration of the backup job.

This avoids repeatedly calling the expensive get-user-profile-and-hash API.

=cut

sub _get_cached_source_ids ($self) {

    # The per-run API cache; created on first use.
    my $cache = $self->{api_cache} //= {};

    # A cached entry wins even when it is undef: the profile is fetched at most once per run.
    return $cache->{source_ids} if exists $cache->{source_ids};

    $self->{logger}->debug("Fetching and caching Source IDs from Comet Backup server");

    require Whostmgr::CometBackup::CometBackupAPI;
    my $fetched = Whostmgr::CometBackup::CometBackupAPI::get_source_ids_from_profile();
    $cache->{source_ids} = $fetched;

    my $fetched_count = $fetched ? scalar(@$fetched) : 0;
    $self->{logger}->info( "Cached Source IDs for backup job", { count => $fetched_count } );

    return $fetched;
}

=head2 _get_cached_connection_id

Gets active connection ID from Comet dispatcher with in-memory caching.

Connection IDs change when the device reconnects, so we cache them with a
short TTL (60 seconds) to balance API call frequency against staleness risk.
Any connection error should call C<_invalidate_connection_cache> to force an
immediate re-fetch on the next call.

Both positive (connection ID string) and negative (C<undef> / device offline)
results are cached.  Caching negative results prevents hammering
C<dispatcher/list-active> every loop iteration during an outage.

=cut

sub _get_cached_connection_id ($self) {

    # Entries live for 60 seconds. That is short enough to notice a device
    # reconnect quickly and long enough to keep dispatcher/list-active traffic
    # low. Both hits (an ID) and misses (undef = device offline) are cached,
    # so an outage does not trigger an API call on every loop iteration.
    my $cache_ttl = 60;
    my $now       = time();
    my $entry     = $self->{api_cache}{connection_id} || {};
    my $logger    = $self->{logger};

    my $is_fresh = $entry->{cached_at} && ( $now - $entry->{cached_at} ) < $cache_ttl;
    return $entry->{id} if $is_fresh;    # undef here means "offline" was cached

    require Whostmgr::CometBackup::CometBackupAPI;
    my $fresh_id = Whostmgr::CometBackup::CometBackupAPI::_get_user_connection_id();

    $self->{api_cache}{connection_id} = {
        id        => $fresh_id,
        cached_at => $now,
    };

    if ( !$fresh_id ) {
        $logger->info("Device offline - negative result cached for ${cache_ttl}s");
        return $fresh_id;
    }

    $logger->info(
        "Connection ID cached",
        {
            connection_id => $fresh_id,
            ttl_seconds   => $cache_ttl,
        }
    );
    return $fresh_id;
}

=head2 _invalidate_connection_cache

Clears the cached connection ID so the next call to C<_get_cached_connection_id>
fetches a fresh value from the Comet dispatcher.  Call this whenever an API
operation fails with a connection-related error (e.g. "Connection ID invalid",
"No active connections found") to avoid retrying with a stale ID.

=cut

sub _invalidate_connection_cache ($self) {

    # Drop the entry (positive or negative) so the next lookup hits the
    # dispatcher; only announce it when a real connection ID was discarded.
    my $dropped = delete $self->{api_cache}{connection_id};
    if ( $dropped && $dropped->{id} ) {
        $self->{logger}->info("Connection ID cache invalidated due to connection error");
    }
    return;
}

=head2 _backup_protected_item

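Starts a backup job for a protected item using Comet Backup APIs and returns the
result hashref without waiting for completion. On success it carries the
C<backup_jobs> to poll. Otherwise C<success> is false and the caller decides whether
to re-queue or fail the item. C<skipped> is set when the item is disabled or has no
eligible destinations.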

=cut

sub _backup_protected_item ( $self, $user, $item, $job_config ) {
    my $logger   = $self->{logger};
    my $item_uid = $item->{protected_item_uid};

    # Safety check: Never backup disabled Protected Items
    if ( $item->{status} && $item->{status} eq 'disabled' ) {
        $logger->warn(
            "Skipping disabled Protected Item",
            {
                user     => $user,
                item_uid => $item_uid,
                status   => $item->{status},
            }
        );
        print STDERR "DEBUG: Child: Skipping disabled Protected Item $item_uid for $user\n" if $self->{debug};
        return {
            success       => 0,
            error_message => "Protected Item is disabled",
            skipped       => 1,
        };
    }

    $logger->info(
        "Starting backup of Protected Item",
        {
            user      => $user,
            item_uid  => $item_uid,
            item_type => $item->{item_type},
        }
    );

    print STDERR "DEBUG: Child: Backing up Protected Item $item_uid for $user\n" if $self->{debug};

    my $backup_result;

    if ( $self->{dryrun} ) {
        $logger->info(
            "DRYRUN: Would call backup_protected_item API",
            {
                user      => $user,
                item_uid  => $item_uid,
                item_type => $item->{item_type},
            }
        );

        # Return simulated successful result for dryrun mode
        $backup_result = {
            success     => 1,
            backup_jobs => [
                {
                    success        => 1,
                    destination_id => 'dryrun_destination_1',
                    backup_job_id  => 'dryrun_job_' . time() . '_' . rand(1000),
                },
            ],
        };

        print STDERR "DEBUG: Child: DRYRUN mode - simulated backup result for $item_uid\n" if $self->{debug};
    }
    else {
        require Whostmgr::CometBackup::CometBackupAPI;

        print STDERR "DEBUG: Child: About to call backup_protected_item API for $item_uid\n" if $self->{debug};

        # Source IDs are cached for the whole run. The connection ID is looked
        # up via _get_cached_connection_id (60-second TTL) when the API call
        # below is made.
        my $cached_source_ids = $self->_get_cached_source_ids();

        # Configured destinations minus any that are disabled or blocked for this run.
        my @destination_ids = $self->_eligible_destination_ids_for_item( $user, $item_uid, $job_config );

        # If all destinations are disabled or blocked, skip this protected item
        unless (@destination_ids) {
            $logger->warn(
                "All destinations for this backup job are disabled or blocked - skipping backup",
                {
                    item_uid  => $item_uid,
                    item_type => $item->{item_type},
                    user      => $user,
                }
            );

            return {
                success       => 0,
                backup_jobs   => [],
                error_message => 'No eligible destinations (all disabled or blocked for this run)',
                skipped       => 1,
            };
        }

        print STDERR "DEBUG: Child: Using destination_ids: " . join( ', ', @destination_ids ) . " for Protected Item $item_uid\n" if $self->{debug};

        # For pkgacct-based items with multiple destinations, we must run sequentially
        # to avoid directory collisions in pkgacct's temp directory
        my $uses_pkgacct     = $self->_item_uses_pkgacct( $item->{item_type} );
        my $run_sequentially = $uses_pkgacct && scalar(@destination_ids) > 1;

        if ($run_sequentially) {
            $logger->info(
                "Using sequential backup mode for pkgacct item",
                {
                    item_uid          => $item_uid,
                    item_type         => $item->{item_type},
                    destination_count => scalar(@destination_ids),
                }
            );
        }

        # backup_protected_item retries internally per destination using
        # structured status codes (via no_die => 1) instead of regex-matching
        # error strings.  Stale-connection (404) detection lives inside that
        # call and surfaces via error_type => 'device_offline' in the result;
        # the pool manager invalidates the connection cache and re-queues.
        $backup_result = Whostmgr::CometBackup::CometBackupAPI::backup_protected_item(
            protected_item_uid    => $item_uid,
            destination_ids       => \@destination_ids,
            run_sequentially      => $run_sequentially,
            _cached_source_ids    => $cached_source_ids,
            _cached_connection_id => $self->_get_cached_connection_id(),
        );
    }

    # Prepare more readable log data by expanding array refs
    my $log_data = {
        item_uid      => $item_uid,
        success       => $backup_result->{success},
        error_message => $backup_result->{error_message},
    };

    # Expand backup_jobs array for logging
    if ( $backup_result->{backup_jobs} && ref( $backup_result->{backup_jobs} ) eq 'ARRAY' ) {
        $log_data->{backup_jobs_count} = scalar( @{ $backup_result->{backup_jobs} } );
        $log_data->{backup_jobs}       = [
            map {
                {
                    destination_id => $_->{destination_id},
                    success        => $_->{success},
                    backup_job_id  => $_->{backup_job_id},
                    error_message  => $_->{error_message},
                }
            } @{ $backup_result->{backup_jobs} }
        ];
    }

    # Expand failed_targets array for logging
    if ( $backup_result->{failed_targets} && ref( $backup_result->{failed_targets} ) eq 'ARRAY' ) {
        $log_data->{failed_targets} = $backup_result->{failed_targets};
        $log_data->{failed_count}   = scalar( @{ $backup_result->{failed_targets} } );
    }

    $logger->debug(
        "Protected Item backup initiated",
        $log_data,
    );

    # Harvest per-destination quota exhaustion regardless of overall success.
    # success is 0 whenever any destination fails, so a multi-destination item
    # with one quota failure and one success is reported as a failure; the
    # blocklist must still be updated in that case.
    if ( $backup_result->{quota_destinations} && @{ $backup_result->{quota_destinations} } ) {
        $self->_record_quota_exhausted_destinations( $user, $item_uid, $backup_result->{quota_destinations} );
    }

    unless ( $backup_result->{success} ) {
        $logger->debug(
            "Protected Item backup initiation failed",
            {
                item_uid      => $item_uid,
                error_type    => $backup_result->{error_type}    // 'unknown',
                error_message => $backup_result->{error_message} // 'Unknown error',
            }
        );
    }

    # Returned in both cases: on success it carries the backup jobs to poll;
    # on failure the caller decides whether to re-queue or fail permanently.
    return $backup_result;
}

=head2 _eligible_destination_ids_for_item

Returns the list of destination IDs a Protected Item should be backed up to:
every destination configured on the job (C<< $job_config->{destinations} >>,
entries being hashrefs with an C<id> key or bare IDs), minus destinations
reported disabled by C<Whostmgr::CometBackup::BackupJobs> and destinations
blocked earlier in this run via C<< $self->{blocked_destinations} >>. Each
skipped destination is logged. If the disabled-destination lookup fails, a
warning is logged and no destination is dropped for that reason.

=cut

sub _eligible_destination_ids_for_item ( $self, $user, $item_uid, $job_config ) {
    my $logger = $self->{logger};

    # Each protected item is backed up to ALL configured destinations.
    # Falsy IDs are ignored.
    my $destinations    = $job_config->{destinations} || [];
    my @destination_ids = ();
    if ( ref $destinations eq 'ARRAY' && @$destinations ) {
        for my $dest (@$destinations) {
            my $dest_id = ref $dest eq 'HASH' ? $dest->{id} : $dest;
            push @destination_ids, $dest_id if $dest_id;
        }
    }

    # Disabled destinations are skipped for backups but remain available for restores.
    if (@destination_ids) {
        eval {
            require Whostmgr::CometBackup::BackupJobs;
            my $jobs         = Whostmgr::CometBackup::BackupJobs->new();
            my %disabled_ids = map { $_ => 1 } $jobs->get_disabled_destination_ids();

            if (%disabled_ids) {
                my @active_ids;
                for my $dest_id (@destination_ids) {
                    if ( $disabled_ids{$dest_id} ) {
                        $logger->info(
                            "Skipping disabled destination for backup",
                            {
                                destination_id => $dest_id,
                                item_uid       => $item_uid,
                                user           => $user,
                            }
                        );
                    }
                    else {
                        push @active_ids, $dest_id;
                    }
                }
                @destination_ids = @active_ids;
            }
        };
        if ($@) {
            $logger->warn("Failed to check disabled destinations, proceeding with all: $@");
        }
    }

    # A destination lands on the blocklist when an earlier dispatch in this
    # run returned it in quota_destinations. Per-run only; not persisted.
    if ( @destination_ids && %{ $self->{blocked_destinations} || {} } ) {
        my @still_eligible;
        for my $dest_id (@destination_ids) {
            if ( $self->{blocked_destinations}->{$dest_id} ) {
                $logger->info(
                    "Skipping blocked destination for backup",
                    {
                        destination_id => $dest_id,
                        reason         => $self->{blocked_destinations}->{$dest_id},
                        item_uid       => $item_uid,
                        user           => $user,
                    }
                );
            }
            else {
                push @still_eligible, $dest_id;
            }
        }
        @destination_ids = @still_eligible;
    }

    return @destination_ids;
}

=head2 _record_quota_exhausted_destinations

Adds the given destination IDs to the run-wide blocklist so subsequent items
skip them. Per-run only; not persisted. No-op for destinations already on the
list. Quota exhaustion is per-destination, not run-fatal: other destinations
keep working.

In the parallel-users path, this method runs inside a forked child; its
in-memory mutation only affects that child. To make the blocklist run-wide
rather than per-user, the child also writes the new entry through
$self->{blocklist_writer}, a pipe set up by _process_users_parallel before
forking. The parent drains the pipe at the top of each outer-loop iteration
and merges entries into its own copy of $self->{blocked_destinations}, so
the next forked user inherits them. See _open_blocklist_pipe.

=cut

sub _record_quota_exhausted_destinations ( $self, $user, $item_uid, $dest_ids ) {
    my $logger = $self->{logger};
    for my $bad_dest_id ( @{ $dest_ids || [] } ) {

        # An undefined or empty ID cannot identify a destination (and the
        # parent's _drain_blocklist_pipe discards empty IDs anyway), so skip it
        # rather than blocking the '' key and emitting uninitialized warnings.
        next if !defined $bad_dest_id || $bad_dest_id eq '';
        next if $self->{blocked_destinations}->{$bad_dest_id};
        $self->{blocked_destinations}->{$bad_dest_id} = 'destination_quota_exhausted';

        # Propagate to parent if we're in a forked child with a pipe.
        # PIPE_BUF (4096) atomicity guarantees a single short line cannot
        # interleave with another writer's bytes, so no locking is needed.
        # Ignore SIGPIPE in case the parent closed the reader during a
        # teardown path — the in-memory record above is the fallback.
        # Records are "<id>\t<reason>\n"; an ID containing a tab or newline
        # would be mis-parsed by the parent (blocking the wrong ID), so such an
        # ID is kept in this process's in-memory blocklist only.
        if ( $self->{blocklist_writer} && $bad_dest_id !~ /[\t\n]/ ) {
            local $SIG{PIPE} = 'IGNORE';
            print { $self->{blocklist_writer} } "$bad_dest_id\tdestination_quota_exhausted\n";
        }

        $logger->error(
            "Destination quota exhausted; blocking destination for the remainder of this run",
            {
                user           => $user,
                item_uid       => $item_uid,
                destination_id => $bad_dest_id,
            }
        );
    }
    return;
}

=head2 _open_blocklist_pipe

Creates the parent-reads / children-write pipe used to propagate
destination-blocklist updates across the fork boundary in
_process_users_parallel. The reader is set non-blocking so the parent can
drain without stalling its outer loop. Stores both ends on $self so they
survive into forked children (each child closes the reader in
_fork_user_backup; the parent closes both at end of run via
_close_blocklist_pipe).

If pipe() or fcntl() fails the run continues with a per-user (in-memory
only) blocklist instead — the original behavior — and a warning is logged.

=cut

sub _open_blocklist_pipe ($self) {
    my $logger = $self->{logger};

    my ( $read_end, $write_end );
    unless ( pipe( $read_end, $write_end ) ) {
        $logger->warn( "Failed to create blocklist pipe; falling back to per-user (in-memory) blocklist", { error => "$!" } );
        return;
    }

    # Children print through this end; autoflush pushes every record into the
    # pipe right away instead of leaving it in Perl's output buffer until exit.
    $write_end->autoflush(1);

    # The parent drains the read end between other work, so reads must never block.
    my $current_flags    = fcntl( $read_end, Fcntl::F_GETFL(), 0 );
    my $made_nonblocking = defined($current_flags)
      && fcntl( $read_end, Fcntl::F_SETFL(), $current_flags | Fcntl::O_NONBLOCK() );

    unless ($made_nonblocking) {
        $logger->warn( "Failed to set blocklist pipe non-blocking; falling back to per-user blocklist", { error => "$!" } );
        close $read_end;
        close $write_end;
        return;
    }

    @{$self}{qw(blocklist_reader blocklist_writer _blocklist_buffer)} = ( $read_end, $write_end, '' );
    return;
}

=head2 _drain_blocklist_pipe

Reads any pending blocklist updates from $self->{blocklist_reader} (a
non-blocking pipe set up by _open_blocklist_pipe), parses them, and merges
new entries into $self->{blocked_destinations}. Stops cleanly on EAGAIN /
EWOULDBLOCK. Tolerates partial lines by buffering them on $self until the
next call. Safe to call when no pipe was opened — it is a no-op then.

=cut

sub _drain_blocklist_pipe ($self) {
    my $pipe_in = $self->{blocklist_reader};
    return unless $pipe_in;    # no pipe was opened for this run

    my $logger = $self->{logger};

    # Phase 1: pull whatever is currently queued in the pipe into the carry-over buffer.
    my ( $got, $piece );
    while ( $got = sysread( $pipe_in, $piece, 4096 ) ) {
        $self->{_blocklist_buffer} .= $piece;
    }

    # 0 from sysread is EOF (every writer closed) and needs no handling.
    # undef is an error, except EAGAIN/EWOULDBLOCK, which only means "empty for now".
    if ( !defined $got && !( $!{EAGAIN} || $!{EWOULDBLOCK} ) ) {
        $logger->warn( "Unexpected error reading blocklist pipe; stopping drain this iteration", { error => "$!" } );
    }

    # Phase 2: consume complete "<dest_id>\t<reason>\n" records. A trailing
    # partial record stays in the buffer until the next call.
    while ( $self->{_blocklist_buffer} =~ s/^([^\n]*)\n// ) {
        my $record = $1;
        next unless length $record;

        my ( $dest_id, $reason ) = split /\t/, $record, 2;
        next unless defined $dest_id && length $dest_id;
        $reason = 'destination_quota_exhausted' unless defined $reason;

        # The first recorded reason for a destination is kept.
        next if $self->{blocked_destinations}->{$dest_id};
        $self->{blocked_destinations}->{$dest_id} = $reason;
        $logger->info(
            "Parent: merged destination block from child",
            { destination_id => $dest_id, reason => $reason },
        );
    }
    return;
}

=head2 _close_blocklist_pipe

Closes both ends of the blocklist pipe and clears the buffer. Idempotent.

=cut

sub _close_blocklist_pipe ($self) {

    # Safe to call repeatedly: ends that are absent (or already closed and removed) are skipped.
    for my $pipe_end (qw(blocklist_reader blocklist_writer)) {
        next unless $self->{$pipe_end};
        close $self->{$pipe_end};
        delete $self->{$pipe_end};
    }
    delete $self->{_blocklist_buffer};
    return;
}

=head2 _maybe_block_destination_on_quota

Classifies a polled job-failure status and, if it matches Comet Backup
Server's JOB_STATUS_FAILED_QUOTA (7003), adds the destination to the
per-run blocklist via _record_quota_exhausted_destinations.

The initiation path detects quota via HTTP 507 / text match inside
backup_protected_item, but the realistic case is that the server accepts
the initiation (HTTP 200) and only reports the quota verdict via the
get-job-properties polling response. Without this hook the blocklist
machinery never fires for the common at-quota scenario and the runner
keeps dispatching items to the same exhausted destination.

=cut

sub _maybe_block_destination_on_quota ( $self, $user, $item_uid, $job_properties, $status ) {
    return unless defined $status;

    # The status comes from a polled server response. A non-numeric value
    # cannot be the quota code and would only raise a numeric-compare warning.
    require Scalar::Util;
    return unless Scalar::Util::looks_like_number($status);

    # Load the constants here, as the rest of this file does before using
    # them, rather than relying on an earlier caller having required them.
    require Whostmgr::CometBackup::Constants;
    return unless $status == Whostmgr::CometBackup::Constants::JOB_STATUS_FAILED_QUOTA();

    # Guard the dereference: a non-hash payload (e.g. an error string) would
    # otherwise die under strict refs while polling.
    return unless ( Scalar::Util::reftype($job_properties) // '' ) eq 'HASH';
    my $dest_guid = $job_properties->{DestinationGUID};
    return unless $dest_guid;

    $self->_record_quota_exhausted_destinations( $user, $item_uid, [$dest_guid] );
    return;
}

=head2 _read_meminfo_kb

Parses /proc/meminfo and returns a hashref of all numeric fields with kB
units (which is everything we care about: MemTotal, MemAvailable, etc.).
Returns undef if /proc/meminfo can't be opened. Cheap enough to call
per-item; one open, ~50 short lines.

=cut

sub _read_meminfo_kb ($self) {

    # Thin wrapper: delegates to Whostmgr::CometBackup::MemoryGuard::read_meminfo_kb
    # and returns its result unchanged (the caller's context is propagated).
    # $self is not used.
    # NOTE(review): MemoryGuard is not required here; this assumes it was loaded
    # elsewhere in the file — confirm.
    return Whostmgr::CometBackup::MemoryGuard::read_meminfo_kb();
}

=head2 _compute_default_min_free_memory_mb

Computes the default value for min_free_memory_mb based on the kernel's
vm.min_free_kbytes reclaim watermark plus a fixed buffer. The kernel starts
direct reclaim and (with mlocked / unreclaimable memory) the OOM-killer when
free memory approaches min_free_kbytes; doubling it and adding a buffer
keeps us safely above the danger zone.

Scales with the host:
  * 1 GB VPS  (min_free_kbytes ~ 11 MB) -> floor of 100 MB
  * 8 GB box  (min_free_kbytes ~ 66 MB) -> 182 MB
  * 64 GB box (min_free_kbytes ~ 190 MB) -> 430 MB

Falls back to DEFAULT_MIN_FREE_MEMORY_MB if /proc/sys/vm/min_free_kbytes
can't be read. Never returns less than DEFAULT_MIN_FREE_FLOOR_MB.

This is a class-level function (no $self) so it can be called inside the
constructor before $self is fully initialized.

=cut

sub _compute_default_min_free_memory_mb {

    # Thin wrapper around
    # Whostmgr::CometBackup::MemoryGuard::compute_default_min_free_memory_mb.
    # Any arguments passed (e.g. a class or object when called as a method)
    # are ignored; the helper's result is returned unchanged.
    return Whostmgr::CometBackup::MemoryGuard::compute_default_min_free_memory_mb();
}

=head2 _check_free_memory

Samples MemAvailable from /proc/meminfo. If free memory has dropped below
$self->{min_free_memory_mb}, sets run_fatal_reason so the dispatch gate
aborts the run on the next outer-loop iteration. Run-abort, not
point-in-time throttle — once tripped it stays tripped.

When the touchfile $Whostmgr::CometBackup::Constants::VERBOSE_LOGGING_TOUCHFILE
exists, every decision point in the check is also logged at info level so an
operator can see what thresholds the runner is seeing and why it is (or
isn't) taking action.

=cut

sub _check_free_memory ($self) {
    my $verbose = $self->_memory_verbose_logging_enabled();
    my $logger  = $self->{logger};

    # A tripped abort is permanent for the run; there is nothing left to decide.
    if ( my $fatal_reason = $self->{run_fatal_reason} ) {
        $logger->info(
            "Memory check skipped: run already marked fatal",
            { run_fatal_reason => $fatal_reason },
        ) if $verbose;
        return;
    }

    # Fail open: if MemAvailable can't be read, the run continues unchecked.
    my $meminfo      = $self->_read_meminfo_kb();
    my $available_kb = $meminfo ? $meminfo->{MemAvailable} : undef;
    if ( !defined $available_kb ) {
        $logger->info(
            "Memory check skipped: could not read MemAvailable from /proc/meminfo (failing open, run continues)",
            { meminfo_ok => $meminfo ? 1 : 0 },
        ) if $verbose;
        return;
    }

    my $free_mb     = int( $available_kb / 1024 );
    my $required_mb = $self->{min_free_memory_mb};

    if ( $free_mb < $required_mb ) {
        $self->{run_fatal_reason} = 'insufficient_free_memory';
        $logger->error(
            sprintf( "Insufficient free system memory (%d MB available, %d MB required); aborting run.", $free_mb, $required_mb ),
            {
                free_memory_mb     => $free_mb,
                required_memory_mb => $required_mb,
                total_memory_mb    => $self->{total_memory_mb},
            }
        );
        return;
    }

    $logger->info(
        sprintf( "Memory check: %d MB free >= %d MB required; run continues.", $free_mb, $required_mb ),
        {
            free_memory_mb     => $free_mb,
            required_memory_mb => $required_mb,
            headroom_mb        => $free_mb - $required_mb,
            total_memory_mb    => $self->{total_memory_mb},
        }
    ) if $verbose;
    return;
}

=head2 _memory_verbose_logging_enabled

Returns true when the operator-facing verbose-logging touchfile exists.
Checked per call (no caching) so the operator can toggle it during a
long-running backup and see the effect immediately. The stat is cheap
relative to the rest of the per-item work.

=cut

sub _memory_verbose_logging_enabled ($self) {

    # Thin wrapper: delegates to Whostmgr::CometBackup::MemoryGuard::verbose_logging_enabled
    # and returns its result unchanged. $self is not used; nothing is cached
    # here, so every call re-asks the helper.
    return Whostmgr::CometBackup::MemoryGuard::verbose_logging_enabled();
}

=head2 _poll_backup_job

Looks up the most recent Comet Backup job for a single Protected Item.

The upstream query (C<get_recent_jobs>) is narrowed to the item's SourceGUID
and to the last 4 hours. Among the returned jobs whose C<SourceGUID> equals
C<$protected_item_uid>, the one with the newest C<StartTime> is selected.

=over 4

=item * Jobs whose C<StartTime> is missing or 0 are still candidates, so a
matching job is never reported as "not found" just because it has no start
time yet.

=item * Ties on C<StartTime> go to the lowest job ID, so the result does not
depend on hash iteration order.

=back

The returned hashref is the job record from the API with two keys set on it:

=over 4

=item * C<GUID> - the job ID.

=item * C<completed> - 1 when C<EndTime> is positive, else 0.

=back

Returns C<undef> when the API returns nothing or no job matches.

In dryrun mode no API call is made. A synthetic job is returned instead. It
reports completion (Status 5000) once more than 5 seconds have passed since
the item's recorded dryrun start time; until then it reports running
(Status 1000). When no start time was recorded, the runtime is 0.

For agents E<ge> 25.9.6 run-backup-custom returns a C<JobID> directly, so
callers should prefer C<_find_all_jobs_for_item> with C<$known_job_ids> to
avoid a full scan.  This function is kept for the dryrun path and legacy callers.

=cut

sub _poll_backup_job ( $self, $protected_item_uid, $target_user = undef ) {
    if ( $self->{dryrun} ) {

        # Capture "now" once so the runtime, StartTime and EndTime all derive
        # from the same instant (separate time() calls can straddle a second).
        my $now               = time();
        my $simulated_runtime = $now - ( $self->{_dryrun_job_start_times}{$protected_item_uid} || $now );
        my $dryrun_guid       = 'dryrun_' . $protected_item_uid;

        if ( $simulated_runtime > 5 ) {    # Complete after 5 seconds in dryrun
            return {
                GUID       => $dryrun_guid,
                SourceGUID => $protected_item_uid,
                Status     => 5000,                         # STATUS_COMPLETE
                EndTime    => $now,
                StartTime  => $now - $simulated_runtime,
                completed  => 1,
                TotalFiles => 100,
                TotalSize  => 1024 * 1024,
            };
        }

        return {
            GUID       => $dryrun_guid,
            SourceGUID => $protected_item_uid,
            Status     => 1000,                             # STATUS_RUNNING
            EndTime    => 0,
            StartTime  => $now - $simulated_runtime,
            completed  => 0,
            Progress   => {
                Counter    => int( $simulated_runtime * 10 ),
                BytesDone  => int( 1024 * 1024 * ( $simulated_runtime / 5 ) ),
                ItemsDone  => int( 100 * ( $simulated_runtime / 5 ) ),
                ItemsTotal => 100,
            },
        };
    }

    require Whostmgr::CometBackup::CometBackupAPI;

    print STDERR "DEBUG: _poll_backup_job: About to call get_recent_jobs to find job for Protected Item: $protected_item_uid\n" if $self->{debug};

    # Narrow the upstream query to this Protected Item's SourceGUID. Pre-narrowing
    # cuts the response from a hundreds-of-jobs list (~1 MB) down to the small
    # set of jobs that share this SourceGUID. The newest-by-StartTime selection
    # below still handles the case where a previous run's job for the same PI
    # is still in the table.
    my $all_jobs = Whostmgr::CometBackup::CometBackupAPI::get_recent_jobs(
        target_user => $target_user,
        source_guid => $protected_item_uid,
        hours       => 4,
    );

    unless ($all_jobs) {
        print STDERR "DEBUG: _poll_backup_job: get_recent_jobs returned undef\n" if $self->{debug};
        return undef;
    }

    print STDERR "DEBUG: _poll_backup_job: Got " . scalar( keys %$all_jobs ) . " jobs matching SourceGUID=$protected_item_uid\n" if $self->{debug};

    # Several jobs in the 4-hour window may share this SourceGUID. Pick the
    # newest by StartTime. The first match is always accepted, so a job with a
    # missing/0 StartTime is not silently ignored. Ties go to the lowest job ID
    # so the choice is deterministic.
    my ( $matching_job_id, $latest_start_time );
    for my $job_id ( keys %$all_jobs ) {
        my $job = $all_jobs->{$job_id};
        next unless $job->{SourceGUID} && $job->{SourceGUID} eq $protected_item_uid;

        my $job_start_time = $job->{StartTime} // 0;
        my $is_better      = !defined $matching_job_id
          || $job_start_time > $latest_start_time
          || ( $job_start_time == $latest_start_time && $job_id lt $matching_job_id );
        next unless $is_better;

        $matching_job_id   = $job_id;
        $latest_start_time = $job_start_time;
        print STDERR "DEBUG: _poll_backup_job: Found newer matching job $job_id for Protected Item $protected_item_uid (StartTime: $job_start_time)\n" if $self->{debug};
    }

    unless ( defined $matching_job_id ) {
        print STDERR "DEBUG: _poll_backup_job: No job found matching Protected Item $protected_item_uid\n" if $self->{debug};
        return undef;
    }

    my $matching_job = $all_jobs->{$matching_job_id};
    $matching_job->{GUID} = $matching_job_id;    # Ensure GUID is set

    print STDERR "DEBUG: _poll_backup_job: Selected most recent job " . $matching_job->{GUID} . " with StartTime: $latest_start_time\n" if $self->{debug};

    # Determine if job is completed
    # Status codes: 5000 = complete, 4xxx = error, EndTime > 0 = finished
    my $end_time  = $matching_job->{EndTime} // 0;
    my $completed = ( $end_time && $end_time > 0 ) ? 1 : 0;
    $matching_job->{completed} = $completed;

    print STDERR "DEBUG: _poll_backup_job: Determined completion for Protected Item $protected_item_uid - EndTime: $end_time, completed: $completed, Status: " . ( $matching_job->{Status} // 'undef' ) . "\n" if $self->{debug};

    return $matching_job;
}

=head2 _find_job_in_list

Finds a job matching the Protected Item UID in a pre-fetched job list.

This is used by the pool manager to avoid making repeated API calls.
The pool fetches all jobs once per interval, then each item searches
the shared list for its matching job.

NOTE: With multiple destinations configured, there may be multiple jobs
for the same Protected Item UID. This method returns the most recent one
(newest C<StartTime>). Jobs with a missing or 0 C<StartTime> are still
candidates. Ties go to the lowest job ID, so the result is deterministic.
Use _find_all_jobs_for_item() to get all matching jobs.

The returned job hashref gets C<GUID> (its job ID) and C<completed>
(1 when C<EndTime> is positive, else 0) set on it. Returns C<undef> when
C<$all_jobs> is false or no job matches.

=cut

sub _find_job_in_list ( $self, $protected_item_uid, $all_jobs ) {
    return undef unless $all_jobs;

    print STDERR "DEBUG: _find_job_in_list: Searching for Protected Item $protected_item_uid in " . scalar( keys %$all_jobs ) . " jobs\n" if $self->{debug};

    # Several jobs may share this SourceGUID (repeat backups, or one job per
    # destination). Pick the newest by StartTime. The first match is always
    # accepted, so a job with a missing/0 StartTime is not silently ignored.
    # Ties go to the lowest job ID so hash iteration order does not matter.
    my ( $matching_job_id, $latest_start_time );
    for my $job_id ( keys %$all_jobs ) {
        my $job = $all_jobs->{$job_id};
        next unless $job->{SourceGUID} && $job->{SourceGUID} eq $protected_item_uid;

        my $job_start_time = $job->{StartTime} // 0;
        my $is_better      = !defined $matching_job_id
          || $job_start_time > $latest_start_time
          || ( $job_start_time == $latest_start_time && $job_id lt $matching_job_id );
        next unless $is_better;

        $matching_job_id   = $job_id;
        $latest_start_time = $job_start_time;
        print STDERR "DEBUG: _find_job_in_list: Found newer matching job $job_id for Protected Item $protected_item_uid (StartTime: $job_start_time)\n" if $self->{debug};
    }

    unless ( defined $matching_job_id ) {
        print STDERR "DEBUG: _find_job_in_list: No job found matching Protected Item $protected_item_uid\n" if $self->{debug};
        return undef;
    }

    my $matching_job = $all_jobs->{$matching_job_id};
    $matching_job->{GUID} = $matching_job_id;    # Ensure GUID is set

    print STDERR "DEBUG: _find_job_in_list: Selected most recent job " . $matching_job->{GUID} . " with StartTime: $latest_start_time\n" if $self->{debug};

    # Determine if job is completed
    # Status codes: 5000 = complete, 4xxx = error, EndTime > 0 = finished
    my $end_time  = $matching_job->{EndTime} // 0;
    my $completed = ( $end_time && $end_time > 0 ) ? 1 : 0;
    $matching_job->{completed} = $completed;

    print STDERR "DEBUG: _find_job_in_list: Determined completion for Protected Item $protected_item_uid - EndTime: $end_time, completed: $completed, Status: " . ( $matching_job->{Status} // 'undef' ) . "\n" if $self->{debug};

    return $matching_job;
}

=head2 _find_all_jobs_for_item

Finds ALL jobs matching the Protected Item UID in a pre-fetched job list.

This is used when backing up to multiple destinations - each destination
creates a separate backup job with the same SourceGUID but different
DestinationGUID.

Returns an arrayref of all matching jobs whose C<DestinationGUID> is one of
C<@$destination_ids>. Each returned job has C<GUID> set to its job ID and
C<completed> set to 1 when C<EndTime> is positive (0 otherwise). An empty
arrayref is returned when C<$all_jobs> is false or nothing matches. An
undefined C<$destination_ids> is treated as an empty list, so nothing
matches and the function does not die.

When C<$known_job_ids> are supplied (job GUIDs returned by run-backup-custom
for agents E<ge> 25.9.6) the function tries a direct hash lookup first.
Each distinct ID is looked up once, so a duplicated ID never yields the same
job twice. A successful direct lookup skips the full SourceGUID scan. If no
direct match is found the function falls through to the scan, which handles
older agents where the fallback connection_id is stored instead of a real
job GUID.

The scan skips jobs that started more than 60 seconds before
C<$tracking_start_time>, so jobs from earlier runs are not picked up.

=cut

sub _find_all_jobs_for_item ( $self, $protected_item_uid, $all_jobs, $destination_ids, $tracking_start_time, $known_job_ids = undef ) {
    return [] unless $all_jobs;

    my $logger = $self->{logger};

    # Tolerate an undefined destination list: nothing can match, but we must
    # not die dereferencing it.
    my @dest_list = @{ $destination_ids // [] };
    my @matching_jobs;

    $logger->debug(
        "Searching for all jobs matching Protected Item",
        {
            protected_item_uid  => $protected_item_uid,
            total_jobs          => scalar( keys %$all_jobs ),
            expected_dest_count => scalar(@dest_list),
            tracking_start_time => $tracking_start_time,
        }
    );

    # Create a hash of expected destination IDs for quick lookup
    my %expected_dests = map { $_ => 1 } @dest_list;

    # Fast path: direct GUID lookup for job IDs returned by run-backup-custom
    # (agents >= 25.9.6 return a DispatchWithJobIDResponse with a real JobID).
    # The connection_id fallback for older agents won't match any job GUID here,
    # so the scan path below is used for them automatically.
    if ( $known_job_ids && @$known_job_ids ) {
        my %already_checked;
        for my $job_id (@$known_job_ids) {
            next unless $job_id;

            # A JobID listed more than once must not be returned twice.
            next if $already_checked{$job_id}++;

            my $job = $all_jobs->{$job_id} or next;
            next unless ( $job->{SourceGUID}                     // '' ) eq $protected_item_uid;
            next unless $expected_dests{ $job->{DestinationGUID} // '' };
            $job->{GUID} = $job_id;
            my $end_time = $job->{EndTime} // 0;
            $job->{completed} = ( $end_time && $end_time > 0 ) ? 1 : 0;
            push @matching_jobs, $job;
        }

        if (@matching_jobs) {
            $logger->debug(
                "Direct job lookup by ID succeeded",
                {
                    protected_item_uid => $protected_item_uid,
                    matched_jobs       => scalar(@matching_jobs),
                }
            );
            return \@matching_jobs;
        }

        # No direct matches — fall through to SourceGUID scan (older agents or
        # jobs not yet visible in the recent-jobs window).
    }

    # Jobs that started more than this many seconds before we began tracking
    # the item are treated as leftovers from a previous run. The slack absorbs
    # clock skew and dispatch races.
    my $time_buffer = 60;    # seconds

    # Scan path: iterate all jobs and match by SourceGUID
    for my $job_id ( keys %$all_jobs ) {
        my $job = $all_jobs->{$job_id};

        # Match on SourceGUID (Protected Item UID)
        next unless $job->{SourceGUID} && $job->{SourceGUID} eq $protected_item_uid;

        my $job_start_time = $job->{StartTime} // 0;
        if ( $tracking_start_time && $job_start_time && $job_start_time < ( $tracking_start_time - $time_buffer ) ) {
            $logger->debug(
                "Skipping old job that started before we began tracking this item",
                {
                    job_id              => $job_id,
                    job_start_time      => $job_start_time,
                    tracking_start_time => $tracking_start_time,
                    buffer_seconds      => $time_buffer,
                }
            );
            next;
        }

        # Only include jobs for destinations we're tracking
        my $dest_guid = $job->{DestinationGUID} // '';
        next unless $expected_dests{$dest_guid};

        $job->{GUID} = $job_id;    # Ensure GUID is set

        # Determine if job is completed
        my $end_time  = $job->{EndTime} // 0;
        my $completed = ( $end_time && $end_time > 0 ) ? 1 : 0;
        $job->{completed} = $completed;

        push @matching_jobs, $job;

        $logger->debug(
            "Found matching job for destination",
            {
                protected_item_uid => $protected_item_uid,
                job_id             => $job_id,
                destination_id     => $dest_guid,
                completed          => $completed,
                status             => $job->{Status} // 'unknown',
            }
        );
    }

    $logger->debug(
        "Found jobs for Protected Item",
        {
            protected_item_uid => $protected_item_uid,
            matched_jobs       => scalar(@matching_jobs),
            expected_jobs      => scalar(@dest_list),
        }
    );

    return \@matching_jobs;
}

=head2 _item_uses_pkgacct

Helper function to determine if an item type uses pkgacct internally.
Returns 1 for pkgacct-based item types and 0 for everything else, including
an undefined C<$item_type>.

Both pkgacct_program_output and databases_program_output use pkgacct, and
concurrent pkgacct operations for the same system user can cause directory
collisions and backup failures. The backup pool therefore takes a per-user
pkgacct lock before starting such items.

=cut

sub _item_uses_pkgacct ( $self, $item_type ) {

    # An item without a type cannot be a pkgacct item; bail out before the
    # string comparisons so an undef value does not emit "uninitialized" warnings.
    return 0 unless defined $item_type;

    return 1 if $item_type eq 'pkgacct_program_output';
    return 1 if $item_type eq 'databases_program_output';    # Uses pkgacct internally
    return 0;
}

=head2 _manage_backup_pool

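Manages a per-user pool of concurrent backup jobs with polling and timeout handling.

This replaces the simple loop in _execute_user_backup with a pool-based
system that:
- Keeps up to 3 Protected Items in flight at once for this user (each item
  may fan out into one backup job per destination)
- Polls all jobs via get-jobs API and matches by SourceGUID
- Handles timeouts (60 minutes max per job)
- Updates database with progress information
- Defers pkgacct-based items while another item for the same system user
  holds the pkgacct lock, starting a non-conflicting item instead
- Stops dispatching new items when the run is marked fatal (for example,
  insufficient free memory) and pauses dispatch while the device is offline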

=cut

sub _manage_backup_pool ( $self, $user, $protected_items, $user_run_id, $job_config ) {
    print STDERR "DEBUG: _manage_backup_pool: ENTERED for user $user\n" if $self->{debug};

    my $logger = $self->{logger};
    print STDERR "DEBUG: _manage_backup_pool: Got logger\n" if $self->{debug};

    my $max_pool_size      = 3;
    my $timeout            = 3600;    # 60 minutes in seconds
    my $poll_interval_fast = 5;       # When pool is full and items are waiting
    my $poll_interval_slow = 15;      # When pool has capacity or items just started

    print STDERR "DEBUG: _manage_backup_pool: About to copy protected_items array\n" if $self->{debug};
    my @pending_items = @$protected_items;
    print STDERR "DEBUG: _manage_backup_pool: Copied array, have " . scalar(@pending_items) . " items\n" if $self->{debug};

    print STDERR "DEBUG: _manage_backup_pool: About to initialize active_items hash\n" if $self->{debug};
    my %active_items;                 # protected_item_uid => { item_run_id, item, start_time, last_poll, destination_id }
    print STDERR "DEBUG: _manage_backup_pool: active_items hash initialized\n" if $self->{debug};

    print STDERR "DEBUG: _manage_backup_pool: About to initialize active_pkgacct_users hash\n" if $self->{debug};
    my %active_pkgacct_users;         # system_user => 1 (tracks which users have active pkgacct-related jobs)
    print STDERR "DEBUG: _manage_backup_pool: active_pkgacct_users hash initialized\n" if $self->{debug};

    # DEFENSE: Safety check - exit early if no items to process
    # This prevents infinite loops when the array is empty
    print STDERR "DEBUG: _manage_backup_pool: About to check if pending_items is empty\n" if $self->{debug};
    unless (@pending_items) {
        print STDERR "DEBUG: _manage_backup_pool: No pending items for $user, exiting early\n" if $self->{debug};
        return;
    }

    print STDERR "DEBUG: _manage_backup_pool: About to call logger->info for 'Starting backup pool management'\n" if $self->{debug};
    $logger->info(
        "Starting backup pool management",
        {
            user               => $user,
            total_items        => scalar(@pending_items),
            max_pool_size      => $max_pool_size,
            timeout_seconds    => $timeout,
            poll_interval_fast => $poll_interval_fast,
            poll_interval_slow => $poll_interval_slow,
        }
    );
    print STDERR "DEBUG: _manage_backup_pool: logger->info call completed\n" if $self->{debug};

    # Track dryrun job start times for simulation
    $self->{_dryrun_job_start_times} = {} if $self->{dryrun};

    # The device ID is needed for log context.
    require Whostmgr::CometBackup::CometBackupAPI;
    my $device_id = eval { Whostmgr::CometBackup::CometBackupAPI::get_device_id() };
    $device_id = undef if $@;
    my $device_check_interval = 10;    # seconds between connection checks
    my $last_device_check     = 0;

    $logger->debug(
        "Device ID resolved for disconnect detection",
        {
            user      => $user,
            device_id => $device_id // 'unavailable',
        }
    );

    # Tracks the epoch time before which we skip new dispatches after a
    # device_offline failure.  Prevents spinning on list-active while we wait
    # for the outer device-check loop to detect whether the agent is back.
    my $dispatch_paused_until = 0;

    # The connection ID that last caused a dispatch failure ("Connection ID
    # invalid").  Dispatch will not resume with the same ID — only a DIFFERENT
    # connection ID (indicating a genuine reconnect) clears the pause.
    my $failed_conn_id = undef;

    # When the device is offline, records the epoch time we first detected it.
    # Reset to 0 whenever the device reconnects.  Undispatched items are failed
    # after $offline_timeout seconds so the run does not loop indefinitely.
    my $offline_detected_at = 0;
    my $offline_timeout     = 300;    # 5 minutes

    # Tracks the connection ID seen at the last device-check.  A change means
    # the agent reconnected mid-run; any in-flight jobs on the old session are
    # dead and must be orphaned immediately.
    my $last_seen_conn_id = undef;

    while ( @pending_items || %active_items ) {
        print STDERR "DEBUG: _manage_backup_pool: Top of while loop - pending: " . scalar(@pending_items) . ", active: " . scalar( keys %active_items ) . "\n" if $self->{debug};

        # Run-fatal gate: stop dispatching new work and let the active set
        # drain. Set by _check_free_memory (insufficient system memory) so we
        # don't keep submitting items into an OOM-bound host. Destination
        # quota exhaustion is NOT run-fatal; it goes on blocked_destinations
        # instead so other destinations keep working.
        if ( $self->{run_fatal_reason} ) {
            $logger->error(
                "Run-fatal condition detected, stopping new dispatch",
                {
                    user             => $user,
                    run_fatal_reason => $self->{run_fatal_reason},
                    pending_items    => scalar(@pending_items),
                    active_items     => scalar( keys %active_items ),
                }
            );
            @pending_items = ();    # drop queue; let active set finish & poll out
            last if !%active_items;
        }

        # Check for cancellation
        print STDERR "DEBUG: _manage_backup_pool: About to get cancel file path\n" if $self->{debug};
        my $cancel_file = Whostmgr::CometBackup::Constants::get_cancel_file_path( $self->{job_uuid} );
        print STDERR "DEBUG: _manage_backup_pool: Got cancel file path: $cancel_file\n" if $self->{debug};
        print STDERR "DEBUG: _manage_backup_pool: About to check cancellation\n"        if $self->{debug};
        if ( $INTERRUPTED || -f $cancel_file ) {
            print STDERR "DEBUG: _manage_backup_pool: Cancellation detected!\n" if $self->{debug};
            $logger->info( "Cancellation detected, stopping backup pool", { user => $user } );
            last;
        }
        print STDERR "DEBUG: _manage_backup_pool: No cancellation, proceeding\n" if $self->{debug};

        # Device disconnect detection: check the cached connection status.
        # The 60s negative cache in _get_cached_connection_id prevents
        # excessive API calls when the device is offline.
        my $now_check = time();
        if (   $device_id
            && !$self->{dryrun}
            && ( %active_items || $dispatch_paused_until )
            && ( $now_check - $last_device_check ) >= $device_check_interval ) {
            $last_device_check = $now_check;

            my $current_conn_id = $self->_get_cached_connection_id();

            if ( !defined $current_conn_id ) {

                # Device is genuinely offline — record the first time we noticed.
                $offline_detected_at ||= $now_check;
                my $active_count = scalar( keys %active_items );

                if ($active_count) {
                    $logger->info(
                        "Device offline: orphaning ${active_count} active item(s)",
                        {
                            user      => $user,
                            device_id => $device_id,
                        }
                    );
                    $self->_orphan_active_items( $now_check, \%active_items );
                }

                last if %active_items == 0 && !@pending_items;

                # Pending items are stuck because the device is offline.  Once the
                # offline window exceeds $offline_timeout, give up on them rather
                # than looping forever waiting for the agent to come back.
                my $offline_secs = $now_check - $offline_detected_at;
                if ( @pending_items && $offline_secs >= $offline_timeout ) {
                    $logger->warn(
                        "Device offline timeout reached; failing undispatched items",
                        {
                            user          => $user,
                            device_id     => $device_id,
                            offline_secs  => $offline_secs,
                            timeout_secs  => $offline_timeout,
                            pending_count => scalar(@pending_items),
                        }
                    );
                    $self->_fail_undispatched_items( \@pending_items, $user_run_id, $offline_secs );
                }
                elsif (@pending_items) {
                    my $remaining = $offline_timeout - $offline_secs;
                    $logger->info(
                        "Device offline; will fail undispatched items in ${remaining}s if it stays offline",
                        {
                            user           => $user,
                            pending_count  => scalar(@pending_items),
                            remaining_secs => $remaining,
                        }
                    );
                }
                last if !@pending_items;
            }
            else {
                # Device is connected (or status unknown — file not yet written).
                # Skip reconnect detection if we got 'UNKNOWN' (file missing).
                if ( $current_conn_id ne 'UNKNOWN' ) {
                    if ( defined $last_seen_conn_id && $current_conn_id ne $last_seen_conn_id ) {
                        $logger->warn(
                            "Agent reconnected mid-run (connection ID changed); orphaning in-flight items",
                            {
                                user         => $user,
                                device_id    => $device_id,
                                old_conn_id  => $last_seen_conn_id,
                                new_conn_id  => $current_conn_id,
                                active_count => scalar( keys %active_items ),
                            }
                        );
                        $self->_orphan_active_items( $now_check, \%active_items );
                        $self->_invalidate_connection_cache();
                    }
                    $last_seen_conn_id = $current_conn_id;

                    # Only clear the dispatch pause if the connection ID is genuinely
                    # new (different from the one that last failed).  This prevents
                    # the fork from hammering run-backup-custom with a stale ID that
                    # Comet's dispatcher/list-active still shows after disconnect.
                    if ( $dispatch_paused_until && ( !defined $failed_conn_id || $current_conn_id ne $failed_conn_id ) ) {
                        $logger->info(
                            "New connection ID detected; resuming dispatch",
                            { new_conn_id => $current_conn_id }
                        );
                        $dispatch_paused_until = 0;
                        $failed_conn_id        = undef;
                    }
                }
                $offline_detected_at = 0 if $offline_detected_at;    # Reset offline timer
            }
        }

        print STDERR "DEBUG: _manage_backup_pool: About to enter inner while loop for starting jobs\n" if $self->{debug};
        while ( @pending_items && keys(%active_items) < $max_pool_size ) {

            # Don't dispatch while the device is known to be offline or while
            # we still see the same stale connection ID that already failed.
            if ( time() < $dispatch_paused_until ) {
                $logger->debug("Dispatch paused; waiting for valid connection");
                last;
            }
            if ( defined $failed_conn_id ) {

                # Pause expired but we have a known-bad ID.  Check whether a
                # new connection has appeared before attempting dispatch.
                my $avail = $self->_get_cached_connection_id();
                if ( defined $avail && $avail eq $failed_conn_id ) {
                    $dispatch_paused_until = time() + 30;
                    $logger->debug("Connection ID still stale; extending dispatch pause");
                    last;
                }
                elsif ( defined $avail && $avail ne $failed_conn_id ) {
                    $logger->info( "New connection ID available; resuming dispatch", { new_conn_id => $avail } );
                    $failed_conn_id = undef;
                    $self->_invalidate_connection_cache();
                }
            }

            print STDERR "DEBUG: _manage_backup_pool: INSIDE inner while loop - starting job\n" if $self->{debug};

            # Sample system free memory before each protected item. If the
            # host has less than $self->{min_free_memory_mb} available,
            # abort the run. The outer-loop gate will pick this up next
            # iteration and drop the rest of the queue.
            $self->_check_free_memory();
            last if $self->{run_fatal_reason};

            print STDERR "DEBUG: _manage_backup_pool: About to peek at pending_items[0]\n" if $self->{debug};
            my $item = $pending_items[0];    # Peek at next item without shifting yet
            print STDERR "DEBUG: _manage_backup_pool: Got item, about to get protected_item_uid\n" if $self->{debug};
            my $item_uid = $item->{protected_item_uid};
            print STDERR "DEBUG: _manage_backup_pool: Got item_uid: $item_uid\n" if $self->{debug};

            # Check for pkgacct conflicts before starting this job
            # pkgacct and databases backups cannot run concurrently for the same system user
            # due to directory collision issues in pkgacct's semi-hardcoded build paths
            # Use GLOBAL file-based locks since backup processes run in parallel forks
            print STDERR "DEBUG: _manage_backup_pool: About to check item_type\n" if $self->{debug};
            my $uses_pkgacct = $self->_item_uses_pkgacct( $item->{item_type} );
            print STDERR "DEBUG: _manage_backup_pool: Got uses_pkgacct: $uses_pkgacct\n" if $self->{debug};
            print STDERR "DEBUG: _manage_backup_pool: About to get system_user\n"        if $self->{debug};
            my $item_system_user = $item->{system_user};
            print STDERR "DEBUG: _manage_backup_pool: Got system_user: $item_system_user\n" if $self->{debug};
            my $has_pkgacct_conflict = 0;
            my $acquired_lock        = 0;

            print STDERR "DEBUG: _manage_backup_pool: About to check if uses_pkgacct\n" if $self->{debug};
            if ($uses_pkgacct) {
                print STDERR "DEBUG: _manage_backup_pool: Item uses pkgacct, attempting to acquire lock\n" if $self->{debug};

                # Try to acquire global lock for this system user
                print STDERR "DEBUG: _manage_backup_pool: About to require Constants module\n" if $self->{debug};
                require Whostmgr::CometBackup::Constants;
                print STDERR "DEBUG: _manage_backup_pool: Constants module loaded\n"                if $self->{debug};
                print STDERR "DEBUG: _manage_backup_pool: About to call try_acquire_pkgacct_lock\n" if $self->{debug};
                $acquired_lock = Whostmgr::CometBackup::Constants::try_acquire_pkgacct_lock( $item_system_user, $item_uid );
                print STDERR "DEBUG: _manage_backup_pool: try_acquire_pkgacct_lock returned: $acquired_lock\n" if $self->{debug};

                if ( !$acquired_lock ) {
                    $has_pkgacct_conflict = 1;
                    print STDERR "DEBUG: _manage_backup_pool: Lock not acquired - deferring pkgacct backup for $item_uid (system_user: $item_system_user)\n" if $self->{debug};

                    # IMPORTANT: Don't use logger in child process - it can hang due to file handles from parent
                }
                else {
                    print STDERR "DEBUG: _manage_backup_pool: Successfully acquired pkgacct lock for $item_system_user\n" if $self->{debug};
                }
            }

            # If there's a conflict, try the next item in queue instead
            if ($has_pkgacct_conflict) {

                # Look for a non-conflicting item further in the queue
                my $found_alternative = 0;
                for my $i ( 1 .. $#pending_items ) {
                    my $alt_item             = $pending_items[$i];
                    my $alt_uses_pkgacct     = $self->_item_uses_pkgacct( $alt_item->{item_type} );
                    my $alt_item_system_user = $alt_item->{system_user};
                    my $alt_item_uid         = $alt_item->{protected_item_uid};

                    # Check if this alternative would have a pkgacct conflict
                    my $alt_has_conflict = 0;
                    if ($alt_uses_pkgacct) {

                        # Try to acquire lock for the alternative item
                        my $alt_acquired_lock = Whostmgr::CometBackup::Constants::try_acquire_pkgacct_lock( $alt_item_system_user, $alt_item_uid );
                        if ( !$alt_acquired_lock ) {
                            $alt_has_conflict = 1;
                        }
                        else {
                            # We acquired the lock for the alternative, so use it
                            # Note: we need to remember we hold this lock
                            $acquired_lock = 1;
                        }
                    }

                    unless ($alt_has_conflict) {

                        # Found a non-conflicting item - move it to the front
                        $item = splice( @pending_items, $i, 1 );
                        unshift @pending_items, $item;
                        $found_alternative = 1;
                        $logger->debug(
                            "Found non-conflicting item to start instead",
                            {
                                item_uid    => $item->{protected_item_uid},
                                item_type   => $item->{item_type},
                                system_user => $item->{system_user},
                            }
                        );
                        last;
                    }
                }

                # If we couldn't find any non-conflicting item, break and wait for polling
                unless ($found_alternative) {
                    $logger->debug("All pending items have pkgacct conflicts, waiting for active jobs to complete");
                    last;
                }
            }

            # Now shift the item we're actually going to start (either original or alternative)
            shift @pending_items;
            $item_uid = $item->{protected_item_uid};

            # Log item start; reuse an existing run-log record when re-queuing
            # after a device_offline failure (avoids duplicate DB entries).
            my $item_run_id = $item->{_item_run_id} // $self->_log_item_start( $user_run_id, $item );
            delete $item->{_item_run_id};

            # Start the backup job
            eval {
                my $backup_result = $self->_backup_protected_item( $user, $item, $job_config );

                # Quota harvest already happened inside _backup_protected_item
                # (before its die-on-failure check), so we don't need to do
                # anything here on the success path.

                # Track dryrun job start time for simulation
                if ( $self->{dryrun} ) {
                    $self->{_dryrun_job_start_times}{$item_uid} = time();
                }

                # Add to active pool tracked by Protected Item UID
                if ( $backup_result->{success} ) {

                    # With multiple destinations, we track all backup jobs for this protected item
                    # Each job has its own destination_id and will be polled individually
                    my @backup_jobs = @{ $backup_result->{backup_jobs} || [] };

                    # For sequential backups (pkgacct items), track remaining destinations
                    my @remaining_destinations = @{ $backup_result->{remaining_destinations} || [] };

                    $logger->info(
                        "Protected Item backup initiated",
                        {
                            user                   => $user,
                            item_uid               => $item_uid,
                            destination_count      => scalar(@backup_jobs),
                            destinations           => [ map { $_->{destination_id} } @backup_jobs ],
                            remaining_destinations => \@remaining_destinations,
                            sequential_mode        => scalar(@remaining_destinations) > 0 ? 1 : 0,
                        }
                    );

                    my $item_start_time = time();
                    $active_items{$item_uid} = {
                        item_run_id            => $item_run_id,
                        item                   => $item,
                        user                   => $user,
                        backup_jobs            => \@backup_jobs,               # Store all backup jobs
                        remaining_destinations => \@remaining_destinations,    # For sequential processing
                        start_time             => $item_start_time,
                        last_poll              => 0,
                        holds_pkgacct_lock     => $acquired_lock,              # Track if we hold the global lock
                        logs_appended          => {},                          # Track which job IDs have had logs written
                        tracking_start_time    => $item_start_time,            # Timestamp when we started tracking - filter old jobs
                    };

                    # No need to track in %active_pkgacct_users since we use global file locks now
                    if ($acquired_lock) {
                        $logger->debug(
                            "Acquired global pkgacct lock for system user",
                            {
                                system_user => $item->{system_user},
                                item_type   => $item->{item_type},
                                item_uid    => $item_uid,
                            }
                        );
                    }

                    # Update database with first destination for initial tracking
                    # Individual destination results will be logged when jobs complete
                    my $first_dest = $backup_jobs[0]{destination_id} // 'unknown';
                    $self->_update_item_run_job_info(
                        $item_run_id,
                        undef,    # No job ID available from run-backup-custom
                        $first_dest
                    );

                    $logger->info(
                        "Added Protected Item to backup pool",
                        {
                            user              => $user,
                            item_uid          => $item_uid,
                            destination_count => scalar(@backup_jobs),
                            pool_size         => scalar( keys %active_items ),
                        }
                    );
                }
                else {
                    my $error_type    = $backup_result->{error_type} // '';
                    my $error_message = $backup_result->{error_message} || 'Backup did not start';

                    if ( $error_type eq 'device_offline' ) {

                        # Agent disconnected before we could dispatch.  Re-queue the item
                        # so it will be retried once the device reconnects; preserve the
                        # existing run-log record so we don't create duplicate entries.
                        $item->{_item_run_id} = $item_run_id;
                        unshift @pending_items, $item;

                        if ($acquired_lock) {
                            require Whostmgr::CometBackup::Constants;
                            Whostmgr::CometBackup::Constants::release_pkgacct_lock( $item->{system_user} );
                        }

                        # Record the stale connection ID so the device-check won't clear
                        # the pause until a genuinely NEW connection appears.
                        $failed_conn_id = $self->{api_cache}{connection_id}{id};
                        $self->_invalidate_connection_cache();
                        $dispatch_paused_until = time() + 30;
                        $offline_detected_at ||= time();

                        $logger->warn(
                            "Dispatch failed: connection ID invalid; pausing 30s",
                            {
                                user        => $user,
                                item_uid    => $item_uid,
                                item_type   => $item->{item_type},
                                failed_conn => $failed_conn_id // 'unknown',
                            }
                        );
                        last;    # exit inner dispatch loop; outer loop handles reconnect check
                    }

                    my $result_status = 'failed';

                    $logger->warn(
                        "Protected Item backup did not start",
                        {
                            user          => $user,
                            item_uid      => $item_uid,
                            item_type     => $item->{item_type},
                            skipped       => $backup_result->{skipped} ? 1 : 0,
                            error_message => $error_message,
                        }
                    );

                    $self->_log_item_complete( $item_run_id, $result_status, $error_message );

                    require Whostmgr::CometBackup::ProtectedItemMap;
                    my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                    $map->update_protected_item_status(
                        protected_item_uid => $item_uid,
                        status             => $result_status,
                    );

                    # Crucial: release lock here as well, otherwise skipped/failed starts
                    # can leave the user permanently blocked with pkgacct conflict loops.
                    if ($acquired_lock) {
                        require Whostmgr::CometBackup::Constants;
                        Whostmgr::CometBackup::Constants::release_pkgacct_lock( $item->{system_user} );
                        $logger->debug(
                            "Released global pkgacct lock (startup returned unsuccessful)",
                            {
                                system_user => $item->{system_user},
                                item_type   => $item->{item_type},
                                item_uid    => $item_uid,
                            }
                        );
                    }
                }
            };
            if ($@) {
                ( my $error = $@ ) =~ s/\n$//;
                $logger->error(
                    "Failed to start backup for item",
                    {
                        user     => $user,
                        item_uid => $item_uid,
                        error    => $error,
                    }
                );
                $self->_log_item_complete( $item_run_id, 'failed', $error );

                # Mark as failed in database
                require Whostmgr::CometBackup::ProtectedItemMap;
                my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                $map->update_protected_item_status(
                    protected_item_uid => $item_uid,
                    status             => 'failed',
                );

                # Release global pkgacct lock if we acquired it
                if ($acquired_lock) {
                    require Whostmgr::CometBackup::Constants;
                    Whostmgr::CometBackup::Constants::release_pkgacct_lock( $item->{system_user} );
                    $logger->debug(
                        "Released global pkgacct lock (startup failure)",
                        {
                            system_user => $item->{system_user},
                            item_type   => $item->{item_type},
                        }
                    );
                }
            }

            # Brief delay between starting jobs
            select( undef, undef, undef, 0.2 );
        }

        # Poll active jobs
        my $now = time();

        # Adaptive poll interval: poll faster when the pool is full and
        # items are waiting — we need to know ASAP when a slot frees up.
        my $pool_is_full  = keys(%active_items) >= $max_pool_size;
        my $items_waiting = scalar(@pending_items) > 0;
        my $poll_interval = ( $pool_is_full && $items_waiting ) ? $poll_interval_fast : $poll_interval_slow;

        # Determine if we need to poll (at least one item ready for polling)
        my $should_poll = 0;
        for my $item_uid ( keys %active_items ) {
            my $item_info = $active_items{$item_uid};
            if ( $now - $item_info->{last_poll} >= $poll_interval ) {
                $should_poll = 1;
                last;
            }
        }

        # Fetch all recent jobs for this user once per polling interval and
        # let each active item find its own row in the result. Cannot narrow
        # on HasFinished=false here: the same response is used to detect
        # completion, so filtering out finished jobs upstream would prevent
        # the pool from ever seeing a job end.
        my $all_jobs;
        if ( $should_poll && !$self->{dryrun} ) {
            require Whostmgr::CometBackup::CometBackupAPI;

            print STDERR "DEBUG: Fetching recent jobs for user: $user\n" if $self->{debug};
            $all_jobs = Whostmgr::CometBackup::CometBackupAPI::get_recent_jobs(
                target_user => $user,
                hours       => 4,
            );

            if ($all_jobs) {
                print STDERR "DEBUG: Got " . scalar( keys %$all_jobs ) . " recent jobs for pool items for polling (user: $user)\n" if $self->{debug};
            }
            else {
                $logger->warn( "Failed to retrieve job status from Comet server", { user => $user } );
            }
        }

        for my $item_uid ( keys %active_items ) {
            my $item_info = $active_items{$item_uid};

            # Check timeout
            my $runtime = $now - $item_info->{start_time};
            if ( $runtime > $timeout ) {
                $logger->warn(
                    "Backup job timed out",
                    {
                        user     => $user,
                        item_uid => $item_uid,
                        runtime  => $runtime,
                        timeout  => $timeout,
                    }
                );
                $self->_log_item_complete( $item_info->{item_run_id}, 'timeout', "Job exceeded timeout of $timeout seconds" );

                # Mark as timeout in database
                require Whostmgr::CometBackup::ProtectedItemMap;
                my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                $map->update_protected_item_status(
                    protected_item_uid => $item_uid,
                    status             => 'timeout',
                );

                $self->_release_pkgacct_lock_if_held( $item_info, 'timeout' );
                delete $active_items{$item_uid};
                next;
            }

            # Poll if enough time has passed
            if ( $now - $item_info->{last_poll} >= $poll_interval ) {
                print STDERR "DEBUG: Polling for Protected Item $item_uid for user $user (runtime: $runtime seconds)\n" if $self->{debug};

                # Extract destination IDs from backup_jobs
                my @destination_ids = map { $_->{destination_id} } @{ $item_info->{backup_jobs} || [] };

                # Get the tracking start time to filter out old jobs
                my $tracking_start_time = $item_info->{tracking_start_time} || $item_info->{start_time};

                # Find ALL jobs for this protected item across all destinations
                my $all_item_jobs;
                if ( $self->{dryrun} ) {

                    # For dryrun, simulate a single completed job
                    $all_item_jobs = [ $self->_poll_backup_job($item_uid) ];
                }
                else {
                    my @known_job_ids = map { $_->{backup_job_id} } @{ $item_info->{backup_jobs} || [] };
                    $all_item_jobs = $self->_find_all_jobs_for_item( $item_uid, $all_jobs, \@destination_ids, $tracking_start_time, \@known_job_ids );
                }

                # Legacy single-job behavior if no backup_jobs array (backward compatibility)
                # This handles old active_items that were created before multi-destination support
                if ( !$item_info->{backup_jobs} ) {
                    my $job_properties = $self->{dryrun} ? $self->_poll_backup_job($item_uid) : $self->_find_job_in_list( $item_uid, $all_jobs );
                    $all_item_jobs = $job_properties ? [$job_properties] : [];
                }

                # Process each job for each destination
                my $all_completed = 0;
                my $any_failed    = 0;
                my @completed_jobs;
                my @failed_jobs;

                if ( $all_item_jobs && @$all_item_jobs ) {
                    $logger->debug(
                        "Processing backup jobs for Protected Item",
                        {
                            item_uid  => $item_uid,
                            job_count => scalar(@$all_item_jobs),
                        }
                    );

                    for my $job_properties (@$all_item_jobs) {
                        next unless $job_properties;

                        print STDERR "DEBUG: Processing job " . ( $job_properties->{GUID} // 'unknown' ) . " for destination " . ( $job_properties->{DestinationGUID} // 'unknown' ) . " - completed: " . ( $job_properties->{completed} ? 'YES' : 'NO' ) . "\n" if $self->{debug};

                        # Update database with progress and job ID
                        $self->_update_item_run_progress( $item_info->{item_run_id}, $job_properties );

                        # Update job ID and destination ID in database
                        if ( $job_properties->{GUID} ) {
                            $self->_update_item_run_job_info(
                                $item_info->{item_run_id},
                                $job_properties->{GUID},
                                $job_properties->{DestinationGUID}
                            );
                        }

                        # Track completion status
                        if ( $job_properties->{completed} ) {
                            my $status    = $job_properties->{Status}       // 0;
                            my $item_type = $item_info->{item}->{item_type} // '';

                            # JOB_STATUS_STOP_SUCCESS (5000) = completed successfully
                            # JOB_STATUS_FAILED_WARNING (7001) = completed with warnings
                            # For homedir_files_folders, warnings are treated as success because
                            # permission errors on container overlay dirs, sockets, etc. are common
                            # and expected in home directory backups.
                            my $is_success = ( $status == Whostmgr::CometBackup::Constants::JOB_STATUS_STOP_SUCCESS() )
                              || ( Whostmgr::CometBackup::Constants::is_job_completed_with_warning($status) && $item_type eq 'homedir_files_folders' );

                            if ($is_success) {
                                push @completed_jobs, $job_properties;

                                $logger->info(
                                    "Backup job completed successfully for destination",
                                    {
                                        user           => $user,
                                        item_uid       => $item_uid,
                                        job_id         => $job_properties->{GUID},
                                        destination_id => $job_properties->{DestinationGUID},
                                        snapshot_id    => $job_properties->{SnapshotID},
                                    }
                                );

                                # Record backup completion in ProtectedItemMap for each successful job
                                require Whostmgr::CometBackup::ProtectedItemMap;
                                my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                                $map->record_backup_completion(
                                    protected_item_uid => $item_uid,
                                    comet_snapshot_id  => $job_properties->{SnapshotID},
                                );

                                # Append Protected Item logs to dated text file (only once per job)
                                my $job_id = $job_properties->{GUID};
                                if ( !$item_info->{logs_appended}{$job_id} ) {
                                    $self->_append_protected_item_logs_to_file(
                                        $job_id,                              # Comet job ID
                                        $item_uid,                            # Protected Item UID
                                        $item_info->{item}->{item_type},      # Item type
                                        $item_info->{item}->{system_user},    # System username
                                        $job_properties->{DestinationGUID}    # Destination ID
                                    );
                                    $item_info->{logs_appended}{$job_id} = 1;
                                }
                            }
                            else {
                                push @failed_jobs, $job_properties;
                                $any_failed = 1;

                                $logger->error(
                                    "Backup job completed with error for destination",
                                    {
                                        user           => $user,
                                        item_uid       => $item_uid,
                                        job_id         => $job_properties->{GUID},
                                        destination_id => $job_properties->{DestinationGUID},
                                        status         => $status,
                                    }
                                );

                                # The Comet Backup Server delivers the quota
                                # verdict here (JOB_STATUS_FAILED_QUOTA, 7003)
                                # rather than as a non-200 on the initiation
                                # response. Mirror the initiation-path
                                # blocklist behavior so subsequent items in
                                # this run skip the at-quota destination.
                                $self->_maybe_block_destination_on_quota( $user, $item_uid, $job_properties, $status );

                                # Append logs even for failures (only once per job)
                                my $job_id = $job_properties->{GUID};
                                if ( !$item_info->{logs_appended}{$job_id} ) {
                                    $self->_append_protected_item_logs_to_file(
                                        $job_id,
                                        $item_uid,
                                        $item_info->{item}->{item_type},
                                        $item_info->{item}->{system_user},
                                        $job_properties->{DestinationGUID}
                                    );
                                    $item_info->{logs_appended}{$job_id} = 1;
                                }
                            }
                        }
                    }

                    # Check if ALL jobs are completed
                    my $total_expected  = scalar(@destination_ids);
                    my $total_completed = scalar(@completed_jobs) + scalar(@failed_jobs);

                    # For sequential backups, we only expect one job at a time
                    my @remaining_destinations = @{ $item_info->{remaining_destinations} || [] };
                    if (@remaining_destinations) {

                        # Sequential mode: check if current batch is complete
                        $all_completed = ( $total_completed >= scalar( @{ $item_info->{backup_jobs} } ) );
                    }

                    # For backward compatibility, if we don't have destination_ids, check if at least one job completed
                    elsif ( !@destination_ids && ( scalar(@completed_jobs) > 0 || scalar(@failed_jobs) > 0 ) ) {
                        $all_completed = 1;
                    }
                    elsif (@destination_ids) {
                        $all_completed = ( $total_completed >= $total_expected );
                    }

                    $logger->debug(
                        "Job completion status for Protected Item",
                        {
                            item_uid               => $item_uid,
                            total_expected         => $total_expected,
                            completed_ok           => scalar(@completed_jobs),
                            completed_failed       => scalar(@failed_jobs),
                            all_completed          => $all_completed,
                            remaining_destinations => scalar(@remaining_destinations),
                            sequential_mode        => scalar(@remaining_destinations) > 0 ? 1 : 0,
                        }
                    );
                }

                my $orphan_threshold      = 300;    # 5 minutes of zero progress
                my $progress_log_interval = 60;

                # Log periodically so the backup log shows every active item is being monitored.
                if (  !$all_completed
                    && $runtime > $progress_log_interval
                    && ( $now - ( $item_info->{last_progress_log} // 0 ) ) >= $progress_log_interval ) {
                    if ( $self->_has_zero_progress( $all_completed, $all_item_jobs ) ) {
                        $self->{logger}->info(
                            "Backup job has not made progress yet; still monitoring",
                            {
                                user      => $user,
                                item_uid  => $item_uid,
                                elapsed   => "${runtime}s",
                                threshold => "${orphan_threshold}s",
                            }
                        );
                    }
                    else {
                        $self->{logger}->info(
                            "Backup job is in progress; still monitoring",
                            {
                                user     => $user,
                                item_uid => $item_uid,
                                elapsed  => "${runtime}s",
                            }
                        );
                    }
                    $item_info->{last_progress_log} = $now;
                }

                # Orphan detection: zero progress for longer than threshold.
                if ( $self->_is_orphaned_item( $all_completed, $runtime, $orphan_threshold, $all_item_jobs ) ) {
                    $self->_handle_orphaned_item( $item_uid, \%active_items, $item_info, $runtime, $all_item_jobs );
                    next;
                }

                # Stall detection: agent has sent at least one progress update but then
                # went silent.  BackupJobProgress.RecievedTime is the server-side unix
                # timestamp of the last progress packet received from the agent.  If it is
                # non-zero but older than the orphan threshold the agent is considered dead
                # even though the job is still in a running state on the server.
                if ( !$all_completed && $all_item_jobs && @$all_item_jobs ) {
                    my $max_received = 0;
                    for my $job (@$all_item_jobs) {
                        my $rt = ( $job->{Progress} // {} )->{RecievedTime} // 0;
                        $max_received = $rt if $rt > $max_received;
                    }

                    if ($max_received) {
                        my $stall_elapsed = $now - $max_received;
                        if ( $stall_elapsed >= $orphan_threshold ) {
                            $logger->info(
                                "Backup job progress has stalled; agent stopped reporting",
                                {
                                    user          => $user,
                                    item_uid      => $item_uid,
                                    stall_elapsed => "${stall_elapsed}s",
                                    threshold     => "${orphan_threshold}s",
                                    last_received => $max_received,
                                }
                            );
                            $self->_handle_orphaned_item( $item_uid, \%active_items, $item_info, $runtime, $all_item_jobs );
                            next;
                        }
                    }
                }

                $item_info->{last_poll} = $now;

                # Mark the item as complete only when ALL destination jobs finish
                if ($all_completed) {
                    print STDERR "DEBUG: ALL backup jobs for Protected Item $item_uid are COMPLETED\n" if $self->{debug};

                    # Check if we have more destinations to backup (sequential mode)
                    my @remaining_destinations = @{ $item_info->{remaining_destinations} || [] };

                    if ( @remaining_destinations && !$any_failed ) {

                        # Check if this is a pkgacct item and if the global lock is still held by us
                        # If another process has taken the lock, we need to defer
                        my $current_item         = $item_info->{item};
                        my $current_system_user  = $current_item->{system_user};
                        my $current_uses_pkgacct = $self->_item_uses_pkgacct( $current_item->{item_type} );

                        my $can_proceed = 1;
                        if ($current_uses_pkgacct) {

                            # Verify we still hold the lock (it should not have been released yet)
                            # Try to acquire it again - if we can't, someone else has it
                            require Whostmgr::CometBackup::Constants;
                            my $still_have_lock = Whostmgr::CometBackup::Constants::try_acquire_pkgacct_lock( $current_system_user, $item_uid );

                            if ($still_have_lock) {

                                # We just re-acquired it (shouldn't happen if we still held it), release immediately
                                Whostmgr::CometBackup::Constants::release_pkgacct_lock($current_system_user);
                                $can_proceed = 0;
                                $logger->warn(
                                    "Lost pkgacct lock between destinations - deferring next destination",
                                    {
                                        current_item_uid => $item_uid,
                                        current_type     => $current_item->{item_type},
                                        system_user      => $current_system_user,
                                    }
                                );
                            }

                            # If we couldn't acquire it, we still hold it, so we can proceed
                        }

                        # Only start next destination if we can proceed
                        if ($can_proceed) {

                            # Start backup for next destination
                            my $next_destination = shift @remaining_destinations;

                            $logger->info(
                                "Starting next destination in sequential backup",
                                {
                                    user      => $user,
                                    item_uid  => $item_uid,
                                    next_dest => $next_destination,
                                    remaining => scalar(@remaining_destinations),
                                }
                            );

                            # Get cached source IDs; connection ID is fetched fresh to avoid stale TargetID
                            my $cached_source_ids = $self->_get_cached_source_ids();

                            # Start backup for next destination
                            eval {
                                require Whostmgr::CometBackup::CometBackupAPI;

                                my $next_result = Whostmgr::CometBackup::CometBackupAPI::backup_protected_item(
                                    protected_item_uid    => $item_uid,
                                    destination_ids       => [ $next_destination, @remaining_destinations ],
                                    run_sequentially      => 1,
                                    _cached_source_ids    => $cached_source_ids,
                                    _cached_connection_id => $self->_get_cached_connection_id(),
                                );

                                if ( $next_result->{success} ) {

                                    # Update the active item with new jobs and remaining destinations
                                    $item_info->{backup_jobs}            = $next_result->{backup_jobs};
                                    $item_info->{remaining_destinations} = $next_result->{remaining_destinations} || [];
                                    $item_info->{start_time}             = time();                                         # Reset start time for timeout tracking
                                    $item_info->{last_poll}              = 0;

                                    $logger->info(
                                        "Successfully started next sequential destination",
                                        {
                                            item_uid          => $item_uid,
                                            destination_count => scalar( @{ $next_result->{backup_jobs} } ),
                                            still_remaining   => scalar( @{ $next_result->{remaining_destinations} || [] } ),
                                        }
                                    );

                                    # Don't mark as complete - continue polling
                                    next;
                                }
                                else {
                                    $logger->error(
                                        "Failed to start next sequential destination",
                                        {
                                            item_uid      => $item_uid,
                                            destination   => $next_destination,
                                            error_message => $next_result->{error_message},
                                        }
                                    );
                                    $any_failed = 1;
                                }
                            };
                            if ($@) {
                                $logger->error(
                                    "Exception starting next sequential destination",
                                    {
                                        item_uid    => $item_uid,
                                        destination => $next_destination,
                                        error       => $@,
                                    }
                                );
                                $any_failed = 1;
                            }
                        }
                        else {

                            # Keep the item waiting with its remaining destinations intact
                            # It will be retried in the next polling cycle when we can reacquire the lock
                            $logger->debug(
                                "Keeping sequential backup waiting - cannot proceed without pkgacct lock",
                                {
                                    user                   => $user,
                                    item_uid               => $item_uid,
                                    system_user            => $current_system_user,
                                    remaining_destinations => scalar(@remaining_destinations),
                                }
                            );

                            # Don't mark as complete - keep polling
                            next;
                        }
                    }

                    if ($any_failed) {
                        my $error_msg = sprintf(
                            "Backup completed with errors: %d succeeded, %d failed",
                            scalar(@completed_jobs),
                            scalar(@failed_jobs)
                        );
                        $logger->warn(
                            "Protected Item backup completed with partial failures",
                            {
                                user      => $user,
                                item_uid  => $item_uid,
                                succeeded => scalar(@completed_jobs),
                                failed    => scalar(@failed_jobs),
                            }
                        );
                        $self->_log_item_complete( $item_info->{item_run_id}, 'partial', $error_msg );

                        # Mark as completed in database even with partial failures
                        require Whostmgr::CometBackup::ProtectedItemMap;
                        my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                        $map->update_protected_item_status(
                            protected_item_uid => $item_uid,
                            status             => 'partial',
                        );
                    }
                    else {
                        $logger->info(
                            "All backup jobs completed successfully for Protected Item",
                            {
                                user              => $user,
                                item_uid          => $item_uid,
                                destination_count => scalar(@completed_jobs),
                                runtime           => $runtime,
                            }
                        );
                        $self->_log_item_complete( $item_info->{item_run_id}, 'completed' );
                    }

                    # Mark the Protected Item as completed in the database
                    # This prevents it from being picked up again on subsequent queries
                    require Whostmgr::CometBackup::ProtectedItemMap;
                    my $map = Whostmgr::CometBackup::ProtectedItemMap->new();
                    $map->update_protected_item_status(
                        protected_item_uid => $item_uid,
                        status             => 'completed',
                    );

                    $self->_release_pkgacct_lock_if_held( $item_info, 'completed' );
                    delete $active_items{$item_uid};
                    print STDERR "DEBUG: Removed Protected Item $item_uid from active pool, remaining: " . scalar( keys %active_items ) . "\n" if $self->{debug};
                }
                else {
                    print STDERR "DEBUG: Protected Item $item_uid still running, keeping in pool\n" if $self->{debug};
                }
            }
            else {
                print STDERR "DEBUG: Skipping poll for Protected Item $item_uid (last poll: " . ( $now - $item_info->{last_poll} ) . " seconds ago)\n" if $self->{debug};
            }
        }

        # Brief sleep before next iteration
        sleep 1;
    }

    $logger->info(
        "Backup pool management completed",
        {
            user => $user,
        }
    );

    return;
}

=head2 _backup_homedir_item

Backs up a homedir protected item with include/exclude regex patterns.

The include/exclude regex lists and the optional C<paths> list are read from
C<< $item->{configuration} >>. When no paths are configured, the user's home
directory (as reported by C<Cpanel::PwCache::gethomedir>) is used instead.
The resulting Files-type config is handed to C<_trigger_comet_backup>, and
the job ID it returns is passed on to C<_monitor_comet_backup>.

Returns nothing.

=cut

sub _backup_homedir_item ( $self, $user, $item, $job_config ) {
    my $settings         = $item->{configuration};
    my $includes         = $settings->{include_regex} || [];
    my $excludes         = $settings->{exclude_regex} || [];
    my $configured_paths = $settings->{paths};

    $self->{logger}->debug(
        "Homedir backup configuration",
        {
            user             => $user,
            include_patterns => $includes,
            exclude_patterns => $excludes,
            paths            => $configured_paths || [],
        }
    );

    if ( $self->{debug} ) {
        print STDERR "DEBUG: Child: Homedir backup config - includes: " . scalar(@$includes) . " excludes: " . scalar(@$excludes) . " for $user\n";
    }

    # The home directory is only the fallback target when no explicit paths exist.
    my $home = Cpanel::PwCache::gethomedir($user);

    my $comet_job_id = $self->_trigger_comet_backup(
        $user, $item, $job_config,
        {
            type          => 'Files',
            description   => "$user - Home Directory Backup",
            paths         => $configured_paths || [$home],
            include_regex => $includes,
            exclude_regex => $excludes,
        }
    );

    $self->_monitor_comet_backup( $comet_job_id, $user, $item );

    return;
}

=head2 _backup_standard_item

Backs up a standard protected item (non-homedir).

The item's type and configuration are logged at debug level. The backup is
then started through C<_trigger_comet_backup>, without a special Comet
config, and the job ID it returns is passed to C<_monitor_comet_backup>.

Returns nothing.

=cut

sub _backup_standard_item ( $self, $user, $item, $job_config ) {
    $self->{logger}->debug(
        "Standard backup configuration",
        {
            user          => $user,
            item_type     => $item->{type},
            configuration => $item->{configuration},
        }
    );

    if ( $self->{debug} ) {
        print STDERR "DEBUG: Child: Standard backup for $item->{type} item $item->{name} for $user\n";
    }

    # No special Comet config: _trigger_comet_backup receives the item unchanged.
    my $comet_job_id = $self->_trigger_comet_backup( $user, $item, $job_config );
    $self->_monitor_comet_backup( $comet_job_id, $user, $item );

    return;
}

=head2 _trigger_comet_backup

Triggers a backup job in Comet for the specified item.

This is currently a stand-in for the real Comet API call. It logs what would
be, or is being, triggered. When a C<$comet_config> hashref is supplied, the
log also includes its type, paths and include/exclude details.

It returns a synthetic job ID. In dryrun mode the ID has the form
C<dryrun_job_TIME_RAND>; otherwise it has the form C<comet_job_TIME_RAND>.
The non-dryrun path sleeps for one second before returning.

=cut

sub _trigger_comet_backup ( $self, $comet_user, $item, $job_config, $comet_config = undef ) {
    my $log       = $self->{logger};
    my $is_dryrun = $self->{dryrun};

    if ($is_dryrun) {
        $log->info(
            "DRYRUN: Would trigger Comet backup",
            {
                comet_user         => $comet_user,
                item               => $item->{name},
                item_type          => $item->{type},
                has_special_config => defined $comet_config,
            }
        );

        if ($comet_config) {
            $log->info(
                "DRYRUN: Special config details",
                {
                    config_type      => $comet_config->{type},
                    paths            => $comet_config->{paths}         || [],
                    include_patterns => $comet_config->{include_regex} || [],
                    exclude_patterns => $comet_config->{exclude_regex} || [],
                }
            );
        }
    }
    elsif ($comet_config) {
        $log->debug(
            "Triggering Comet backup with special config",
            {
                comet_user       => $comet_user,
                item             => $item->{name},
                config_type      => $comet_config->{type},
                include_patterns => scalar( @{ $comet_config->{include_regex} || [] } ),
                exclude_patterns => scalar( @{ $comet_config->{exclude_regex} || [] } ),
            }
        );
    }
    else {
        $log->debug(
            "Triggering Comet backup",
            {
                comet_user => $comet_user,
                item       => $item->{name},
            }
        );
    }

    # Only the live path pretends to wait on an API round-trip.
    sleep 1 unless $is_dryrun;

    my $id_prefix = $is_dryrun ? "dryrun_job_" : "comet_job_";
    return $id_prefix . time() . "_" . rand(1000);
}

=head2 _monitor_comet_backup

Monitors the progress of a Comet backup job.

This is a placeholder implementation: no Comet API polling happens yet.

=over

=item * In dryrun mode it only logs what would be monitored.

=item * Otherwise it waits in 2-second steps, up to 300 seconds.

=item * It stops early, with an info log, when C<$INTERRUPTED> is set or the
job's cancel file exists.

=item * It logs C<Comet backup completed> once 4 seconds have elapsed.

=item * It dies with C<Backup job timed out: ...> if the wait budget is
exhausted.

=back

Returns nothing.

=cut

sub _monitor_comet_backup ( $self, $backup_job_id, $user, $item ) {
    my $logger = $self->{logger};

    if ( $self->{dryrun} ) {
        $logger->info(
            "DRYRUN: Would monitor Comet backup",
            {
                backup_job_id => $backup_job_id,
                user          => $user,
                item          => $item->{name},
            }
        );

        # Skip monitoring in dryrun mode
        return;
    }

    $logger->debug(
        "Monitoring Comet backup",
        {
            backup_job_id => $backup_job_id,
            user          => $user,
            item          => $item->{name},
        }
    );

    # This would poll the Comet API for job status
    # For now, we'll simulate monitoring

    my $max_wait  = 300;    # 5 minutes max
    my $wait_time = 0;

    # The cancel-file path depends only on the job UUID, so resolve it once
    # rather than on every iteration of the wait loop.
    my $cancel_file = Whostmgr::CometBackup::Constants::get_cancel_file_path( $self->{job_uuid} );

    while ( $wait_time < $max_wait ) {
        if ( $INTERRUPTED || -f $cancel_file ) {

            # Record why monitoring ended so an early stop is not mistaken
            # for a normal completion in the logs.
            $logger->info(
                "Stopped monitoring Comet backup: interrupted or cancelled",
                {
                    backup_job_id => $backup_job_id,
                    user          => $user,
                    item          => $item->{name},
                    interrupted   => $INTERRUPTED ? 1 : 0,
                }
            );
            return;
        }

        # Simulate checking job status
        sleep 2;
        $wait_time += 2;

        # Simulate completion after a short, fixed time (keeps testing fast)
        if ( $wait_time >= 4 ) {
            $logger->info(
                "Comet backup completed",
                {
                    backup_job_id => $backup_job_id,
                    user          => $user,
                    item          => $item->{name},
                }
            );
            return;
        }
    }

    # Every early exit above returns, so reaching here means the budget ran out.
    die "Backup job timed out: $backup_job_id";
}

=head2 _get_load_average

Gets the current system load average.

The first numeric field of C</proc/loadavg> is used when that file exists and
parses. Otherwise the output of C</usr/bin/uptime> is parsed. If neither
source yields a value, including when running uptime fails, 0 is returned
instead of an exception being raised.

=cut

sub _get_load_average ($self) {

    # Preferred source: the first field of /proc/loadavg on Linux.
    if ( -f '/proc/loadavg' ) {
        my $loadavg = Cpanel::LoadFile::loadfile('/proc/loadavg');
        if ( $loadavg && $loadavg =~ /^(\d+\.\d+)/ ) {
            return $1;
        }
    }

    # Fallback: parse uptime(1). new_or_die() is a "die on failure"
    # constructor, so the call is wrapped in eval. Without the eval, a failing
    # uptime would make this best-effort probe throw instead of returning the
    # default below.
    my $uptime_load = eval {
        my $uptime_result = Cpanel::SafeRun::Object->new_or_die(
            program => '/usr/bin/uptime',
            args    => [],
        );

        my $parsed;
        if ( $uptime_result->CHILD_ERROR() == 0 ) {
            my $output = $uptime_result->stdout() // '';
            ($parsed) = $output =~ /load average:\s*([0-9.]+)/;
        }
        $parsed;
    };
    if ( my $error = $@ ) {
        $self->{logger}->debug( "Unable to determine load average via uptime", { error => $error } ) if $self->{logger};
    }

    # Default to 0 if we can't determine load
    return $uptime_load // 0;
}

=head2 _has_zero_progress

Reports whether a Protected Item appears stuck at zero progress. This is the
core progress check used both for periodic monitoring logs and for orphan
detection.

The item is considered stuck when every uncompleted server-side job in
C<$all_item_jobs> has a C<Status> in the running range (6000-6999) and has
no truthy C<TotalFiles>, C<UploadSize> or C<TotalSize>. Undefined entries
and entries flagged C<completed> are ignored. As a consequence, a non-empty
list containing only such entries also yields true.

Returns 0 when C<$all_completed> is true, when the job list is missing or
empty, when any uncompleted job is outside the running range, or when any
uncompleted job reports progress. Returns 1 otherwise.

Note: for jobs that have made I<some> progress but then stalled, use the
C<BackupJobProgress.RecievedTime> field returned by the Comet API together
with the stall check in C<_manage_backup_pool>.

=cut

sub _has_zero_progress ( $self, $all_completed, $all_item_jobs ) {

    # A finished item, or one with no jobs at all, cannot be stalled.
    if ( $all_completed || !$all_item_jobs || !@$all_item_jobs ) {
        return 0;
    }

    for my $entry (@$all_item_jobs) {
        next if !$entry || $entry->{completed};

        my $code    = $entry->{Status} // 0;
        my $running = $code >= 6000 && $code < 7000;

        # One non-running job, or one job with measurable progress, is enough
        # to show the item is not stuck.
        return 0 unless $running;
        return 0 if $entry->{TotalFiles} || $entry->{UploadSize} || $entry->{TotalSize};
    }

    return 1;
}

=head2 _is_orphaned_item

Decides whether a Protected Item's backup should be treated as orphaned.
Orphaned means the runtime has gone past C<$threshold> seconds and
C<_has_zero_progress> reports that no server-side job has made progress.
The intended scenario is a Comet agent that stopped mid-backup, leaving jobs
that will not finish on their own.

While C<$runtime> is at or below C<$threshold>, 0 is returned without
consulting C<_has_zero_progress>. Past the threshold, that method's result is
returned as-is.

=cut

sub _is_orphaned_item ( $self, $all_completed, $runtime, $threshold, $all_item_jobs ) {

    # Inside the grace window nothing is considered orphaned yet.
    my $within_window = $runtime <= $threshold;

    return $within_window ? 0 : $self->_has_zero_progress( $all_completed, $all_item_jobs );
}

=head2 _orphan_active_items

Orphans every entry of C<%$active_items> that is not flagged C<completed>.

Each entry is handed to C<_handle_orphaned_item> together with two values:

=over

=item * its elapsed runtime, measured against C<$now_check> (0 when no
C<start_time> is recorded);

=item * an empty job list, so the handler has no Comet job IDs to cancel.

=back

That handler records the failure and deletes the entry from the hash.

=cut

sub _orphan_active_items ( $self, $now_check, $active_items ) {
    for my $item_uid ( keys %{$active_items} ) {
        my $info = $active_items->{$item_uid};

        unless ( $info->{completed} ) {
            my $started_at = $info->{start_time} // $now_check;
            $self->_handle_orphaned_item( $item_uid, $active_items, $info, $now_check - $started_at, [] );
        }
    }

    return;
}

=head2 _fail_undispatched_items

Fails every item still in C<@$pending_items> because the device has been
offline for C<$offline_secs> seconds.

For each item, the run record in C<_item_run_id> is reused when it is
defined; otherwise a new one is created through C<_log_item_start>. That run
is completed as C<'failed'> with an "offline" message, and the protected
item's status is set to C<'failed'> in C<ProtectedItemMap>.

The array is emptied before returning.

=cut

sub _fail_undispatched_items ( $self, $pending_items, $user_run_id, $offline_secs ) {
    require Whostmgr::CometBackup::ProtectedItemMap;
    my $item_map = Whostmgr::CometBackup::ProtectedItemMap->new();
    my $reason   = "Device offline for ${offline_secs}s; backup could not be dispatched";

    foreach my $pending (@$pending_items) {
        my $item_run_id = $pending->{_item_run_id};
        $item_run_id //= $self->_log_item_start( $user_run_id, $pending );

        $self->_log_item_complete( $item_run_id, 'failed', $reason );
        $item_map->update_protected_item_status(
            protected_item_uid => $pending->{protected_item_uid},
            status             => 'failed',
        );
    }

    # Every pending item has been failed; clear the caller's queue in place.
    splice @{$pending_items};

    return;
}

=head2 _handle_orphaned_item

Marks a confirmed orphaned Protected Item as failed and removes it from the
active pool. It is called when C<_is_orphaned_item> returns true, and also by
C<_orphan_active_items>. C<$runtime> is the elapsed seconds already computed
by the caller.

The steps, in order:

=over

=item 1. Log a warning.

=item 2. Try to cancel every uncompleted job in C<$all_item_jobs> that has a
C<GUID>, so the Comet server does not finish a snapshot after the item has
been declared failed. Each cancel runs inside its own eval; a failure is
logged and the remaining cleanup still happens.

=item 3. Record the run as C<failed>.

=item 4. Set the protected item's status to C<failed>.

=item 5. Release the pkgacct lock if it is held.

=item 6. Delete C<$item_uid> from C<%$active_items>.

=back

=cut

sub _handle_orphaned_item ( $self, $item_uid, $active_items, $item_info, $runtime, $all_item_jobs ) {
    my $log   = $self->{logger};
    my $owner = $item_info->{user};

    $log->warn(
        "Backup job made no progress for ${runtime}s; marking as failed.",
        {
            user     => $owner,
            item_uid => $item_uid,
            elapsed  => $runtime,
        }
    );

    # A connectivity hiccup in the cancel request must not prevent the
    # bookkeeping below, or the item would stay stuck in the pool.
    require Whostmgr::CometBackup::CometBackupAPI;
    for my $comet_job ( @{ $all_item_jobs // [] } ) {
        next if !$comet_job || $comet_job->{completed};

        my $guid = $comet_job->{GUID};
        next unless $guid;

        eval { Whostmgr::CometBackup::CometBackupAPI::cancel_comet_job( job_id => $guid ); };
        my $cancel_error = $@;
        next unless $cancel_error;

        $log->warn(
            "Failed to cancel orphaned Comet job; continuing cleanup",
            { user => $owner, item_uid => $item_uid, job_id => $guid, error => "$cancel_error" },
        );
    }

    $self->_log_item_complete(
        $item_info->{item_run_id},
        'failed',
        "Protected Item backup failed: no progress made for ${runtime}s.",
    );

    require Whostmgr::CometBackup::ProtectedItemMap;
    my $item_map = Whostmgr::CometBackup::ProtectedItemMap->new();
    $item_map->update_protected_item_status(
        protected_item_uid => $item_uid,
        status             => 'failed',
    );

    $self->_release_pkgacct_lock_if_held( $item_info, 'orphaned' );
    delete $active_items->{$item_uid};

    return;
}

=head2 _release_pkgacct_lock_if_held

Releases the global pkgacct file lock for the system user associated with
C<$item_info>, if we are currently holding it. Does nothing if we do not
hold the lock. The C<$context> string is included in the debug log so it
is clear why the lock is being released.

After releasing the lock, C<< $item_info->{holds_pkgacct_lock} >> is cleared,
so a second call for the same item is a no-op.

=cut

sub _release_pkgacct_lock_if_held ( $self, $item_info, $context ) {
    return unless $item_info->{holds_pkgacct_lock};

    my $item = $item_info->{item};
    require Whostmgr::CometBackup::Constants;
    Whostmgr::CometBackup::Constants::release_pkgacct_lock( $item->{system_user} );

    # Keep the flag truthful: once released, we no longer hold the lock.
    # Leaving the flag set would let a later call release the lock again.
    # Depending on how release_pkgacct_lock is implemented, that second
    # release could drop a lock another runner has acquired since.
    $item_info->{holds_pkgacct_lock} = 0;

    $self->{logger}->debug(
        "Released global pkgacct lock ($context)",
        {
            system_user => $item->{system_user},
            item_type   => $item->{item_type},
        }
    );

    return;
}

=head2 _reap_completed_processes

Reaps completed child processes without blocking. Returns the number of
entries removed from C<%$active_forks>.

=over

=item * A PID for which C<waitpid> returns the PID itself is logged with its
duration and exit status, then removed.

=item * A PID for which C<waitpid> returns -1 is removed with a warning. This
covers a PID that is not, or is no longer, a child of this process, for
example because it was already reaped elsewhere. Such a PID would otherwise
stay in the pool forever.

=back

=cut

sub _reap_completed_processes ( $self, $active_forks ) {
    my $logger = $self->{logger};
    my $reaped = 0;

    for my $pid ( keys %$active_forks ) {
        my $result = waitpid( $pid, POSIX::WNOHANG() );

        if ( $result == $pid ) {

            # Capture the wait status straight away, before any other call
            # has a chance to overwrite $?.
            my $exit_status = $? >> 8;
            my $user_info   = delete $active_forks->{$pid};
            my $duration    = Time::HiRes::time() - $user_info->{start_time};

            $logger->info(
                "User backup process completed",
                {
                    user        => $user_info->{user},
                    pid         => $pid,
                    duration    => sprintf( "%.2f", $duration ),
                    exit_status => $exit_status,
                }
            );

            $reaped++;
        }
        elsif ( $result == -1 ) {

            # No such child: keeping the entry would leave a slot that can
            # never be freed by waitpid.
            my $user_info = delete $active_forks->{$pid};

            $logger->warn(
                "User backup process is no longer a child of this runner; removing it from the active pool",
                {
                    user => $user_info->{user},
                    pid  => $pid,
                }
            );

            $reaped++;
        }
    }

    return $reaped;
}

=head2 _terminate_active_processes

Terminates all active child processes listed in C<%$active_forks>.

=over

=item 1. Send TERM to every child.

=item 2. Wait up to 5 seconds, reaping each child as soon as it exits and
stopping early once all have exited. When there are no children, return
right after logging.

=item 3. Send KILL to any child still running.

=item 4. Reap the survivors with a blocking C<waitpid>.

=back

The entries in C<%$active_forks> are left untouched for the caller.

=cut

sub _terminate_active_processes ( $self, $active_forks ) {
    my $logger = $self->{logger};

    $logger->info(
        "Terminating active processes",
        {
            process_count => scalar( keys %$active_forks ),
        }
    );

    # Nothing to terminate, so there is no reason to sit through a grace period.
    return unless %$active_forks;

    # Send TERM signal to all children
    for my $pid ( keys %$active_forks ) {
        kill 'TERM', $pid;
    }

    # Grace period: poll instead of a fixed sleep, reaping children as they
    # exit so we stop waiting as soon as all of them are gone.
    my $grace_seconds = 5;
    my $deadline      = Time::HiRes::time() + $grace_seconds;
    my %still_running = map { $_ => 1 } keys %$active_forks;

    while ( %still_running && Time::HiRes::time() < $deadline ) {
        for my $pid ( keys %still_running ) {
            my $result = waitpid( $pid, POSIX::WNOHANG() );
            delete $still_running{$pid} if $result == $pid || $result == -1;
        }
        Time::HiRes::sleep(0.1) if %still_running;
    }

    # Send KILL signal to any remaining processes
    for my $pid ( keys %still_running ) {
        if ( kill 0, $pid ) {    # Check if still running
            kill 'KILL', $pid;
        }
    }

    # Clean up process table
    for my $pid ( keys %still_running ) {
        waitpid( $pid, 0 );
    }

    return;
}

=head2 _handle_interruption

Handles process interruption signals. It logs the received signal and sets
the global C<$INTERRUPTED> flag, which C<_monitor_comet_backup>'s wait loop
checks. When a C<job_uuid> is known, it also touches that job's cancel file.

=cut

sub _handle_interruption ( $self, $signal ) {
    $self->{logger}->info( "Received interruption signal", { signal => $signal } );

    $INTERRUPTED = 1;

    my $job_uuid = $self->{job_uuid};
    return unless $job_uuid;

    # Drop a cancel marker for the current job.
    my $cancel_path = Whostmgr::CometBackup::Constants::get_cancel_file_path($job_uuid);
    Cpanel::FileUtils::TouchFile::touchfile($cancel_path);

    return;
}

=head2 _cleanup

Performs cleanup operations. It deletes the runner's PID file. When a
C<job_uuid> is set, it also deletes that job's cancel file. Each path is only
unlinked when it exists as a plain file.

=cut

sub _cleanup ($self) {
    my $remove_if_present = sub ($path) {
        unlink $path if -f $path;
        return;
    };

    $remove_if_present->($Whostmgr::CometBackup::Constants::PID_FILE);

    # Remove cancel file for this job (if we were running a single job)
    my $job_uuid = $self->{job_uuid};
    $remove_if_present->( Whostmgr::CometBackup::Constants::get_cancel_file_path($job_uuid) ) if $job_uuid;

    return;
}

=head2 _ensure_protected_items_for_all_users

Ensures selected users have the required protected items for the backup job.
This performs runtime self-healing for both all_accounts and explicit-account jobs.

For every user in C<$users>, the locally stored protected items for the job
are compared with the item types returned by
C<_get_expected_item_types_from_job_config>. Any missing types are created
through C<ProtectedItemMap>.

A sync to the Comet server is then triggered when items were created, or
when C<count_unsynced_active_items> reports unsynced rows. Per-user creation
failures and sync failures are logged and do not abort the backup. In dryrun
mode nothing is checked or created.

Returns nothing.

=cut

sub _ensure_protected_items_for_all_users ( $self, $job_config, $users ) {
    my $logger   = $self->{logger};
    my $job_uuid = $job_config->{job_id};

    $logger->info(
        "Checking protected items for selected users",
        {
            job_uuid   => $job_uuid,
            user_count => scalar(@$users),
        }
    );

    if ( $self->{dryrun} ) {
        $logger->info(
            "DRYRUN: Would check and create missing protected items for selected users",
            {
                job_uuid   => $job_uuid,
                user_count => scalar(@$users),
            }
        );
        return;
    }

    # Load explicitly, as the other helpers do, rather than relying on an
    # earlier code path having loaded the module.
    require Whostmgr::CometBackup::ProtectedItemMap;
    my $protected_items_map = Whostmgr::CometBackup::ProtectedItemMap->new();

    # The expected item types depend only on the job configuration, so they
    # are computed once instead of once per user.
    my $expected_item_types = $self->_get_expected_item_types_from_job_config($job_config);

    my @users_missing_items;
    my $total_missing_items = 0;

    # First pass: check which users are missing protected items
    for my $user (@$users) {

        # Get existing protected items for this user and job
        my @existing_items = $protected_items_map->get_protected_items_for_user(
            system_user => $user,
            job_uuid    => $job_uuid,
        );

        my %existing_types = map  { $_->{item_type} => 1 } @existing_items;
        my @missing_types  = grep { !$existing_types{$_} } @$expected_item_types;
        next unless @missing_types;

        push @users_missing_items, {
            user          => $user,
            missing_types => \@missing_types,
        };
        $total_missing_items += @missing_types;

        $logger->info(
            "Found user with missing protected items",
            {
                user          => $user,
                missing_types => \@missing_types,
            }
        );
    }

    my $created_count = 0;

    if (@users_missing_items) {
        $logger->info(
            "Creating missing protected items",
            {
                job_uuid                 => $job_uuid,
                users_with_missing_items => scalar(@users_missing_items),
                total_missing_items      => $total_missing_items,
            }
        );

        # Second pass: create missing protected items for each user. A failure
        # for one user is logged and the remaining users are still processed.
        for my $user_info (@users_missing_items) {
            my $user          = $user_info->{user};
            my $missing_types = $user_info->{missing_types};

            eval {
                my @created_uids = $protected_items_map->create_protected_items_for_user_job(
                    job_uuid    => $job_uuid,
                    system_user => $user,
                    item_types  => $missing_types,
                );

                $created_count += @created_uids;

                $logger->info(
                    "Created missing protected items for user",
                    {
                        user          => $user,
                        created_count => scalar(@created_uids),
                        created_uids  => \@created_uids,
                    }
                );
            };

            if ( my $error = $@ ) {
                $logger->error(
                    "Failed to create missing protected items for user",
                    {
                        user  => $user,
                        error => $error,
                    }
                );
            }
        }
    }
    else {
        $logger->info(
            "All users already have required protected items",
            {
                job_uuid   => $job_uuid,
                user_count => scalar(@$users),
            }
        );
    }

    # Sync to the Comet server if either:
    #   (a) we just created new items above, or
    #   (b) the local DB still holds unsynced active items from a previous
    #       run (e.g. a post-restore reconcile that inserted rows but did
    #       not sync them).
    # Without (b), the orphan-source check in
    # CometBackupAPI::backup_protected_item would fail on every run for those
    # items, because a sync would otherwise only be triggered here when new
    # rows are created.
    my $unsynced_count = 0;
    eval { $unsynced_count = $protected_items_map->count_unsynced_active_items(); };
    if ($@) {
        $logger->warn( "Failed to count unsynced protected items", { error => $@ } );
    }

    if ( $created_count > 0 || $unsynced_count > 0 ) {

        $logger->info(
            "Syncing protected items to Comet server",
            {
                job_uuid       => $job_uuid,
                created_count  => $created_count,
                unsynced_count => $unsynced_count,
            }
        );

        eval {
            require Whostmgr::CometBackup::CometBackupAPI::ProtectedItemBatch;
            my $success = Whostmgr::CometBackup::CometBackupAPI::ProtectedItemBatch->sync_protected_items();

            if ($success) {
                $logger->info(
                    "Successfully synced protected items to Comet server",
                    {
                        job_uuid       => $job_uuid,
                        created_count  => $created_count,
                        unsynced_count => $unsynced_count,
                    }
                );
            }
            else {
                # Log as error but don't fail the backup - the items exist locally
                # and will be retried on the next sync operation
                $logger->error(
                    "Failed to sync protected items to Comet server",
                    {
                        job_uuid       => $job_uuid,
                        created_count  => $created_count,
                        unsynced_count => $unsynced_count,
                    }
                );
            }
        };

        if ( my $error = $@ ) {

            # Log as warning but don't fail the backup - the items exist locally
            $logger->warn(
                "Exception while syncing protected items to Comet server",
                {
                    job_uuid       => $job_uuid,
                    created_count  => $created_count,
                    unsynced_count => $unsynced_count,
                    error          => $error,
                }
            );
        }
    }

    $logger->info(
        "Completed protected items check and creation",
        {
            job_uuid                 => $job_uuid,
            users_checked            => scalar(@$users),
            users_with_missing_items => scalar(@users_missing_items),
            total_created_items      => $created_count,
        }
    );

    return;
}

=head2 _get_expected_item_types_from_job_config

Determines what protected item types should exist based on job configuration.

It reads C<< $job_config->{backup_options} >> and returns an arrayref. The
arrayref contains the following types, in this order, each only when enabled:

=over

=item * C<homedir_files_folders> - C<homedir_files> is truthy. When
C<homedir_files> is absent, the legacy C<homedir> option is used instead. It
counts as enabled when it is a reference that is true in boolean context
(e.g. a hashref of options), a non-zero number, or a non-empty non-numeric
string.

=item * C<databases_program_output> - C<databases> is a truthy scalar, or a
hashref with a truthy C<mysql> or C<postgresql>.

=item * C<pkgacct_program_output> - C<panel_config> is truthy.

=back

=cut

sub _get_expected_item_types_from_job_config ( $self, $job_config ) {
    require Scalar::Util;

    my $backup_options = $job_config->{backup_options} || {};
    my @item_types;

    # This logic mirrors the logic in ProtectedItemMap->_get_item_types_from_job
    # but works from the already-loaded job configuration

    # 1. Files and Folders PI for homedir (includes email content filtering)
    # Check for homedir_files first (new explicit flag); fall back to the legacy homedir option
    my $homedir_enabled = 0;
    if ( exists $backup_options->{homedir_files} ) {

        # New explicit flag from UI
        $homedir_enabled = $backup_options->{homedir_files} ? 1 : 0;
    }
    elsif ( exists $backup_options->{homedir} ) {

        # Legacy: undef, '' or a numeric zero mean disabled. A hashref
        # (enabled with options), a non-zero number, or any other non-empty
        # string means enabled.
        my $homedir_val = $backup_options->{homedir};
        if ( !defined $homedir_val ) {
            $homedir_enabled = 0;
        }
        elsif ( ref $homedir_val ) {

            # Use boolean context so that any object with overloaded
            # truthiness is honoured; a plain hashref is always true.
            $homedir_enabled = $homedir_val ? 1 : 0;
        }
        elsif ( Scalar::Util::looks_like_number($homedir_val) ) {
            $homedir_enabled = $homedir_val == 0 ? 0 : 1;
        }
        else {
            # Only numbers may be compared with == 0; a word such as "yes"
            # numifies to 0 and would otherwise be read as "disabled".
            $homedir_enabled = length($homedir_val) ? 1 : 0;
        }
    }

    if ($homedir_enabled) {
        push @item_types, 'homedir_files_folders';
    }

    # 2. Program Output PI for databases only
    # databases can be: 0 (disabled), 1 (enabled - legacy), or {"mysql":1,"postgresql":0} (new format)
    my $databases_enabled = 0;
    if ( my $db_opt = $backup_options->{databases} ) {
        if ( ref $db_opt eq 'HASH' ) {

            # New format: check if either mysql or postgresql is enabled
            $databases_enabled = ( $db_opt->{mysql} || $db_opt->{postgresql} ) ? 1 : 0;
        }
        else {
            # Legacy boolean or truthy value
            $databases_enabled = $db_opt ? 1 : 0;
        }
    }

    if ($databases_enabled) {
        push @item_types, 'databases_program_output';
    }

    # 3. Program Output PI for panel_config (everything except databases and homedir)
    # This creates a single panel_config item that uses pkgacct with --skiphomedir and --skipacctdb
    if ( $backup_options->{panel_config} ) {
        push @item_types, 'pkgacct_program_output';
    }

    return \@item_types;
}

# Database methods for run logging

=head2 _ensure_run_log_database

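Ensures the run log database exists and is properly initialized. It creates the
C<backup_runs>, C<user_runs> and C<item_runs> tables and their indexes when they are
missing. It also rebuilds an older C<backup_runs> table into the current layout: one
that lacks C<run_uuid>, or that still has C<created_at> or C<job_name>.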

=cut

sub _ensure_run_log_database ($self) {

    # Sets up the run log SQLite database. Steps:
    #   - create its directory if needed;
    #   - create the backup_runs, user_runs and item_runs tables if they are
    #     missing;
    #   - rebuild an outdated backup_runs table;
    #   - create the lookup indexes.
    # Opens its own handle and disconnects before returning. Returns nothing.
    # NOTE(review): assumes RUN_LOG_DB lives under /var/cpanel/comet. Confirm
    # against Whostmgr::CometBackup::Constants.
    my $db_dir = "/var/cpanel/comet";
    Cpanel::Mkdir::ensure_directory_existence_and_mode( $db_dir, 0755 ) unless -d $db_dir;

    # RaiseError => 1: any failing statement below dies instead of returning false.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    # Create tables if they don't exist.
    # On a fresh database this already has the current backup_runs layout.
    $dbh->do(<<~'SQL');
        CREATE TABLE IF NOT EXISTS backup_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_uuid TEXT NOT NULL UNIQUE,
            job_uuid TEXT NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER,
            status TEXT DEFAULT 'running',
            error_message TEXT
        )
        SQL

    # Migration: Check for schema updates needed
    # Check current schema of backup_runs table.
    # PRAGMA table_info yields one row per column; only the column names are
    # inspected here.
    my $column_check = $dbh->prepare("PRAGMA table_info(backup_runs)");
    $column_check->execute();
    my $has_created_at = 0;
    my $has_run_uuid   = 0;
    my $has_job_name   = 0;
    while ( my $row = $column_check->fetchrow_hashref() ) {
        if ( $row->{name} eq 'created_at' ) {
            $has_created_at = 1;
        }
        elsif ( $row->{name} eq 'run_uuid' ) {
            $has_run_uuid = 1;
        }
        elsif ( $row->{name} eq 'job_name' ) {
            $has_job_name = 1;
        }
    }
    $column_check->finish();

    # If schema needs updating (missing run_uuid, has created_at, or has job_name), recreate table.
    # The table is not altered in place. Instead, inside one transaction:
    # create backup_runs_new, copy the rows across, drop the old table, and
    # rename the new one.
    # NOTE(review): no explicit ROLLBACK is issued if a statement in this
    # transaction dies (RaiseError). Cleanup is presumably left to the handle
    # being destroyed. Confirm that is acceptable.
    if ( $has_created_at || !$has_run_uuid || $has_job_name ) {
        $dbh->do("BEGIN TRANSACTION");

        # Create new table with correct schema (run_uuid added, job_name removed)
        $dbh->do(<<~'SQL');
            CREATE TABLE backup_runs_new (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_uuid TEXT NOT NULL UNIQUE,
                job_uuid TEXT NOT NULL,
                start_time INTEGER NOT NULL,
                end_time INTEGER,
                status TEXT DEFAULT 'running',
                error_message TEXT
            )
            SQL

        # Copy data from old table to new table, generating run_uuid for existing records, dropping job_name.
        # Each copied row gets a random UUID-shaped run_uuid: random hex groups
        # with a literal '4' version digit and a variant digit drawn from
        # '89ab'.
        # ids are copied unchanged, so user_runs.run_id values, which reference
        # backup_runs(id), still point at the same runs.
        $dbh->do(<<~'SQL');
            INSERT INTO backup_runs_new (id, run_uuid, job_uuid, start_time, end_time, status, error_message)
            SELECT 
                id, 
                lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2))) || '-4' || substr(lower(hex(randomblob(2))),2) || '-' || substr('89ab',abs(random()) % 4 + 1, 1) || substr(lower(hex(randomblob(2))),2) || '-' || lower(hex(randomblob(6))) as run_uuid,
                job_uuid, 
                start_time, 
                end_time, 
                status, 
                error_message 
            FROM backup_runs
            SQL

        # Drop old table and rename new table.
        # Dropping the table also drops its indexes; they are recreated with
        # IF NOT EXISTS further down.
        $dbh->do("DROP TABLE backup_runs");
        $dbh->do("ALTER TABLE backup_runs_new RENAME TO backup_runs");

        $dbh->do("COMMIT");
    }

    # One row per user processed within a backup run.
    $dbh->do(<<~'SQL');
        CREATE TABLE IF NOT EXISTS user_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER,
            status TEXT DEFAULT 'running',
            error_message TEXT,
            FOREIGN KEY (run_id) REFERENCES backup_runs(id)
        )
        SQL

    # One row per protected item backed up for a user run, plus the Comet job
    # details and polling data recorded by the item-run update helpers.
    $dbh->do(<<~'SQL');
        CREATE TABLE IF NOT EXISTS item_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_run_id INTEGER NOT NULL,
            protected_item_id TEXT NOT NULL,
            item_type TEXT NOT NULL,
            comet_job_id TEXT,
            destination_id TEXT,
            snapshot_id TEXT,
            start_time INTEGER NOT NULL,
            end_time INTEGER,
            status TEXT DEFAULT 'running',
            error_message TEXT,
            job_properties TEXT,
            last_poll_time INTEGER,
            FOREIGN KEY (user_run_id) REFERENCES user_runs(id)
        )
        SQL

    # Create indexes on the columns used for run lookups and parent/child joins.
    $dbh->do("CREATE INDEX IF NOT EXISTS idx_backup_runs_run_uuid ON backup_runs(run_uuid)");
    $dbh->do("CREATE INDEX IF NOT EXISTS idx_backup_runs_job_uuid ON backup_runs(job_uuid)");
    $dbh->do("CREATE INDEX IF NOT EXISTS idx_backup_runs_start_time ON backup_runs(start_time)");
    $dbh->do("CREATE INDEX IF NOT EXISTS idx_user_runs_run_id ON user_runs(run_id)");
    $dbh->do("CREATE INDEX IF NOT EXISTS idx_item_runs_user_run_id ON item_runs(user_run_id)");

    $dbh->disconnect();
    return;
}

=head2 _create_run_log

Creates a new run log entry with a unique run UUID and returns the run ID.

Each execution of a backup job gets a unique run_uuid to distinguish individual
run instances from the job_uuid (which identifies the backup job configuration).
The run_uuid is stored in $self->{run_uuid} for use throughout the backup process.

Note: Job name is not stored in the run log to avoid redundancy - it can be
retrieved from the job configuration database using the job_uuid when needed.

Returns: Database run ID (integer)

=cut

sub _create_run_log ( $self, $job_uuid, $job_config ) {

    # Inserts a 'running' backup_runs row for $job_uuid and returns its id.
    # It also marks the job itself as 'running'.
    # $job_config is accepted for interface compatibility but is not read.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    # A caller-supplied run UUID wins. Otherwise a fresh lowercase UUID is
    # minted and remembered on the instance, so later steps share it.
    my $run_uuid = $self->{run_uuid};
    if ( !$run_uuid ) {
        require Data::UUID;
        $run_uuid = lc( Data::UUID->new()->create_str() );
        $self->{run_uuid} = $run_uuid;
        print STDERR "DEBUG: Generated new run_uuid: $run_uuid\n" if $self->{debug};
    }
    else {
        print STDERR "DEBUG: Using provided run_uuid: $run_uuid\n" if $self->{debug};
    }

    my $insert_sql = <<~'SQL';
        INSERT INTO backup_runs (run_uuid, job_uuid, start_time, status)
        VALUES (?, ?, ?, 'running')
        SQL
    $dbh->do( $insert_sql, undef, $run_uuid, $job_uuid, time() );

    my $new_run_id = $dbh->last_insert_id( "", "", "", "" );
    $dbh->disconnect();

    # Reflect the in-progress run on the backup_jobs record as well.
    Whostmgr::CometBackup::BackupJobs->new()->update_job_status( $job_uuid, 'running' );

    return $new_run_id;
}

=head2 _complete_run_log

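Completes a run log entry by recording its end time, status and error message. When the
run's job can be found, it also updates the job: it sets last_run_timestamp (used for
cooldown tracking), sets the job status to the run's status, and clears current_run_uuid.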

=cut

sub _complete_run_log ( $self, $run_id, $status, $error_message = undef ) {

    # Closes out backup_runs row $run_id with an end time, final status and
    # optional error message. It then mirrors the outcome onto the owning
    # backup job: timestamp, status, and a cleared current run.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    # Find the job before closing the run so job-level bookkeeping can follow.
    my ($owning_job) = $dbh->selectrow_array( 'SELECT job_uuid FROM backup_runs WHERE id = ?', undef, $run_id );

    my $finished_at = time();
    my $close_sql   = <<~'SQL';
        UPDATE backup_runs 
        SET end_time = ?, status = ?, error_message = ?
        WHERE id = ?
        SQL
    $dbh->do( $close_sql, undef, $finished_at, $status, $error_message, $run_id );
    $dbh->disconnect();

    # Without a job there is nothing further to update.
    return unless $owning_job;

    my $jobs = Whostmgr::CometBackup::BackupJobs->new();

    # Cooldown tracking uses this timestamp for completed and failed runs alike.
    $jobs->update_last_run_timestamp( $owning_job, $finished_at );
    $jobs->update_job_status( $owning_job, $status );
    $jobs->clear_current_run_uuid($owning_job);

    return;
}

=head2 _determine_run_status

Determines if a backup run should be marked as 'completed', 'partial', or 'failed' based on
the status of protected items in that run:
- 'failed' if at least one item failed, timed out, or is partial, and no item completed
- 'partial' if at least one item failed/timed out/partial and at least one item completed
- 'completed' otherwise. This includes a run with no items, and a run in which no item is
  in a failed or completed state (for example, items still marked 'running').

Returns the appropriate status string.

=cut

sub _determine_run_status ( $self, $run_id ) {

    # Classifies backup run $run_id from the statuses of its item_runs.
    # Returns 'failed', 'partial' or 'completed'.
    my $debug = $self->{debug};
    print STDERR "DEBUG: _determine_run_status: Checking item statuses in run $run_id\n" if $debug;

    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    # item_runs belong to user_runs, which belong to the backup run, so the
    # join lets us filter by run id.
    my $tally_sql = <<~'SQL';
        SELECT 
            COUNT(*) as total_count,
            SUM(CASE WHEN ir.status IN ('failed', 'timeout', 'partial') THEN 1 ELSE 0 END) as failed_count,
            SUM(CASE WHEN ir.status = 'completed' THEN 1 ELSE 0 END) as completed_count
        FROM item_runs ir
        JOIN user_runs ur ON ir.user_run_id = ur.id
        WHERE ur.run_id = ?
        SQL
    my ( $total, $failed, $completed ) = $dbh->selectrow_array( $tally_sql, undef, $run_id );
    $dbh->disconnect();

    # SUM() over zero rows is NULL; treat that as zero.
    $failed    ||= 0;
    $completed ||= 0;

    print STDERR "DEBUG: _determine_run_status: Total=$total, Failed/Timeout/Partial=$failed, Completed=$completed\n" if $debug;

    # A run without any items should not happen; report it as completed.
    return 'completed' if !$total || $total <= 0;

    my %reason_for = (
        failed    => "All items failed, marking run as 'failed'",
        partial   => "Some items failed, marking run as 'partial'",
        completed => "All items succeeded, marking run as 'completed'",
    );

    my $verdict =
        ( $failed > 0 && $completed == 0 ) ? 'failed'
      : ( $failed > 0 && $completed > 0 )  ? 'partial'
      :                                      'completed';

    print STDERR "DEBUG: _determine_run_status: $reason_for{$verdict}\n" if $debug;
    return $verdict;
}

=head2 _log_user_start

Logs the start of a user backup and returns user run ID.

=cut

sub _log_user_start ( $self, $job_uuid, $username ) {

    # Inserts a user_runs row for $username under the active ('running')
    # backup run and returns the new user_runs id.
    # Dies if no active run can be found for $job_uuid.
    my $debug = $self->{debug};
    print STDERR "DEBUG: _log_user_start: About to connect to database for $username\n" if $debug;

    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    print STDERR "DEBUG: _log_user_start: Database connected for $username\n" if $debug;

    # Prefer the exact run this instance is executing: _create_run_log stores
    # its run_uuid in $self->{run_uuid}.
    # The job_uuid lookup below only picks the newest 'running' row for the
    # job, which can be the wrong row when more than one run of the same job
    # is marked 'running'.
    my $run_id;
    if ( my $run_uuid = $self->{run_uuid} ) {
        print STDERR "DEBUG: _log_user_start: Looking up run_id by run_uuid $run_uuid for $username\n" if $debug;
        my $uuid_sth = $dbh->prepare("SELECT id FROM backup_runs WHERE run_uuid = ? AND job_uuid = ? AND status = 'running'");
        $uuid_sth->execute( $run_uuid, $job_uuid );
        ($run_id) = $uuid_sth->fetchrow_array();
        $uuid_sth->finish();
    }

    # Fallback, and the only path when no run_uuid is known: the most recent
    # running row for this job.
    unless ($run_id) {
        print STDERR "DEBUG: _log_user_start: About to prepare SELECT query for $username\n" if $debug;
        my $run_sth = $dbh->prepare("SELECT id FROM backup_runs WHERE job_uuid = ? AND status = 'running' ORDER BY start_time DESC LIMIT 1");

        print STDERR "DEBUG: _log_user_start: About to execute SELECT query for $username\n" if $debug;
        $run_sth->execute($job_uuid);

        print STDERR "DEBUG: _log_user_start: About to fetch run_id for $username\n" if $debug;
        ($run_id) = $run_sth->fetchrow_array();
        $run_sth->finish();
    }

    # $run_id is undef when nothing matched; avoid an uninitialized-value warning.
    print STDERR "DEBUG: _log_user_start: Got run_id: " . ( $run_id // 'none' ) . " for $username\n" if $debug;

    unless ($run_id) {
        $dbh->disconnect();
        die "Could not find active run for job: $job_uuid";
    }

    print STDERR "DEBUG: _log_user_start: About to prepare INSERT query for $username\n" if $debug;
    my $sth = $dbh->prepare(<<~'SQL');
        INSERT INTO user_runs (run_id, username, start_time)
        VALUES (?, ?, ?)
        SQL

    print STDERR "DEBUG: _log_user_start: About to execute INSERT for $username\n" if $debug;
    $sth->execute( $run_id, $username, time() );
    $sth->finish();

    print STDERR "DEBUG: _log_user_start: INSERT completed for $username\n" if $debug;

    my $user_run_id = $dbh->last_insert_id( "", "", "", "" );
    $dbh->disconnect();

    return $user_run_id;
}

=head2 _log_user_complete

Completes a user run log entry.

=cut

sub _log_user_complete ( $self, $user_run_id, $status, $error_message = undef ) {

    # Stamps user_runs row $user_run_id with the current time as its end
    # time, its final status and an optional error message.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    my $finish_sql = <<~'SQL';
        UPDATE user_runs 
        SET end_time = ?, status = ?, error_message = ?
        WHERE id = ?
        SQL
    $dbh->do( $finish_sql, undef, time(), $status, $error_message, $user_run_id );

    $dbh->disconnect();
    return;
}

=head2 _log_item_start

Logs the start of a protected item backup and returns item run ID.

=cut

sub _log_item_start ( $self, $user_run_id, $item ) {

    # Records that a protected item's backup has begun under user run
    # $user_run_id and returns the new item_runs id. The row's status is
    # left to the table default.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    my $insert_sql = <<~'SQL';
        INSERT INTO item_runs (user_run_id, protected_item_id, item_type, start_time)
        VALUES (?, ?, ?, ?)
        SQL

    # The protected_item_id column holds the item's Protected Item UID.
    my $item_uid  = $item->{protected_item_uid};
    my $item_kind = $item->{item_type};
    $dbh->do( $insert_sql, undef, $user_run_id, $item_uid, $item_kind, time() );

    my $new_item_run_id = $dbh->last_insert_id( "", "", "", "" );
    $dbh->disconnect();

    return $new_item_run_id;
}

=head2 _log_item_complete

Completes a protected item run log entry.

=cut

sub _log_item_complete ( $self, $item_run_id, $status, $error_message = undef ) {

    # Stamps item_runs row $item_run_id with the current time as its end
    # time, its final status and an optional error message.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    my $finish_sql = <<~'SQL';
        UPDATE item_runs 
        SET end_time = ?, status = ?, error_message = ?
        WHERE id = ?
        SQL
    $dbh->do( $finish_sql, undef, time(), $status, $error_message, $item_run_id );

    $dbh->disconnect();
    return;
}

=head2 _update_item_run_job_info

Updates an item run with Comet job ID and destination ID.

=cut

sub _update_item_run_job_info ( $self, $item_run_id, $job_id, $destination_id ) {

    # Attaches the Comet job ID and destination ID to item_runs row $item_run_id.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    my $link_sql = <<~'SQL';
        UPDATE item_runs 
        SET comet_job_id = ?, destination_id = ?
        WHERE id = ?
        SQL
    $dbh->do( $link_sql, undef, $job_id, $destination_id, $item_run_id );

    $dbh->disconnect();
    return;
}

=head2 _update_item_run_progress

Updates an item run with progress information from job polling.

=cut

sub _update_item_run_progress ( $self, $item_run_id, $job_properties ) {

    # Saves the latest polled Comet job properties on item_runs row
    # $item_run_id. It stores the full properties as JSON, the SnapshotID
    # (undef when absent) and the current time as the last poll time.
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    require Cpanel::JSON;
    my $properties_json = Cpanel::JSON::Dump($job_properties);
    my $snapshot        = $job_properties->{SnapshotID};

    my $progress_sql = <<~'SQL';
        UPDATE item_runs 
        SET job_properties = ?, snapshot_id = ?, last_poll_time = ?
        WHERE id = ?
        SQL
    $dbh->do( $progress_sql, undef, $properties_json, $snapshot, time(), $item_run_id );

    $dbh->disconnect();
    return;
}

=head2 _get_backup_run_log_file

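Gets the path to the per-run text log file for this backup run
(C<{TEXT_LOG_DIR}/{job_uuid}_{run_uuid}.log>).

Creates the log directory if it does not exist. The file itself is not created here;
it is created by the writers that append to it.

Returns the full path to the log file.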

=cut

sub _get_backup_run_log_file ($self) {

    # Returns "<TEXT_LOG_DIR>/<job_uuid>_<run_uuid>.log" for this run.
    # If the directory is missing it is created with mode 0755; the file
    # itself is not touched.
    my ( $job_uuid, $run_uuid, $logger ) = @{$self}{qw(job_uuid run_uuid logger)};
    my $log_dir = $Whostmgr::CometBackup::Constants::TEXT_LOG_DIR;

    if ( !-d $log_dir ) {
        Cpanel::Mkdir::ensure_directory_existence_and_mode( $log_dir, 0755 );
        $logger->info( "Created text log directory", { directory => $log_dir } );
    }

    return "$log_dir/${job_uuid}_${run_uuid}.log";
}

=head2 _append_protected_item_logs_to_file

Fetches log lines from the Comet API for a completed Protected Item and appends them to this run's text log file.

Uses file locking to prevent corruption when multiple items complete simultaneously.

=cut

sub _append_protected_item_logs_to_file ( $self, $comet_job_id, $item_uid, $item_type, $system_user, $destination_id ) {

    # Fetches the Comet job log for one finished protected item and appends it
    # to this run's text log file: a header, then one line per log entry.
    # The file is opened in append mode and held under an exclusive flock
    # while writing, so concurrent writers do not interleave.
    # Failures are logged, never thrown. Returns nothing.
    my $logger = $self->{logger};

    # Get the log file path
    my $log_file = $self->_get_backup_run_log_file();

    $logger->debug(
        "Appending Protected Item logs to file",
        {
            log_file     => $log_file,
            comet_job_id => $comet_job_id,
            item_uid     => $item_uid,
            item_type    => $item_type,
            system_user  => $system_user,
        }
    );

    # Fetch log entries from the Comet API.
    # The error is copied out of $@ immediately. $@ is a global, and the
    # logger call below may itself run an eval that resets it. That used to
    # leave the placeholder entry with an empty error text.
    my @log_entries;
    my $fetch_error;
    {
        local $@;
        eval {
            require Whostmgr::CometBackup::CometBackupAPI;
            my $log_data = Whostmgr::CometBackup::CometBackupAPI::make_server_request(
                "user/web/get-job-log-entries",
                {
                    JobID => $comet_job_id,
                }
            );

            # The API returns an array of log entry objects
            if ( $log_data && ref($log_data) eq 'ARRAY' ) {
                @log_entries = @$log_data;
            }
            1;
        } or do {
            $fetch_error = $@ || 'Unknown error';
        };
    }

    if ( defined $fetch_error ) {
        $logger->warn(
            "Failed to fetch log entries from Comet API",
            {
                comet_job_id => $comet_job_id,
                item_uid     => $item_uid,
                error        => $fetch_error,
            }
        );

        # Placeholder entry so the file still records why details are missing.
        my $reason = "$fetch_error";
        chomp $reason;
        @log_entries = (
            {
                Time     => time(),
                Severity => 'WARNING',
                Message  => "Failed to fetch detailed log entries from Comet API: $reason",
            }
        );
    }

    # Open log file in append mode, creating it if needed
    require Fcntl;
    my $fh;
    unless ( sysopen( $fh, $log_file, Fcntl::O_WRONLY() | Fcntl::O_APPEND() | Fcntl::O_CREAT(), 0644 ) ) {
        $logger->error(
            "Failed to open log file for writing",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        return;
    }

    # Set UTF-8 encoding on filehandle to prevent "Wide character" warnings
    binmode( $fh, ':utf8' );

    # Acquire exclusive lock
    unless ( flock( $fh, Fcntl::LOCK_EX() ) ) {
        $logger->error(
            "Failed to acquire lock on log file",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        close $fh;
        return;
    }

    # Header for this Protected Item. Optional fields may be undef; they are
    # written as empty strings instead of raising uninitialized warnings.
    my $separator = ( '=' x 80 ) . "\n";
    my $timestamp = scalar localtime( time() );
    print {$fh} $separator;
    print {$fh} "Protected Item: " . ( $item_uid // '' ) . "\n";
    print {$fh} "Item Type: " . ( $item_type // '' ) . "\n";
    print {$fh} "System User: " . ( $system_user // '' ) . "\n";
    print {$fh} "Comet Job ID: " . ( $comet_job_id // '' ) . "\n";
    print {$fh} "Logged at: $timestamp\n";
    print {$fh} "Destination ID: " . ( $destination_id // '' ) . "\n";
    print {$fh} $separator, "\n";

    # Write log entries
    if (@log_entries) {
        for my $entry (@log_entries) {
            my $entry_time     = $entry->{Time}     || 0;
            my $entry_severity = $entry->{Severity} || 'INFO';
            my $entry_message  = $entry->{Message}  || '';

            my $formatted_time = $entry_time ? scalar localtime($entry_time) : 'Unknown Time';

            printf {$fh} "[%s] [%s] %s\n", $formatted_time, $entry_severity, $entry_message;
        }
    }
    else {
        print {$fh} "[INFO] No detailed log entries available for this Protected Item\n";
    }

    print {$fh} "\n";

    # close() flushes the buffered output and then releases the flock.
    # A failure here means buffered output may not have reached the file.
    unless ( close $fh ) {
        $logger->error(
            "Failed to write log file",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        return;
    }

    $logger->info(
        "Appended Protected Item logs to file",
        {
            log_file      => $log_file,
            item_uid      => $item_uid,
            entries_count => scalar(@log_entries),
        }
    );

    return;
}

=head2 _append_run_summary_to_file

Appends a summary of the entire backup run to this run's text log file.

This is called after all Protected Items have been processed.

=cut

sub _append_run_summary_to_file ($self) {

    # Appends a "BACKUP RUN SUMMARY" section for $self->{run_uuid} to this
    # run's text log file. The section covers:
    #   - run timing and status;
    #   - user and protected-item counts;
    #   - users without any item runs (split by whether their items are
    #     disabled);
    #   - a breakdown of item failure messages.
    # If the run row cannot be found, or the file cannot be opened, locked or
    # closed, the problem is logged and nothing more is written. Returns nothing.
    my $logger = $self->{logger};

    # Get the log file path
    my $log_file = $self->_get_backup_run_log_file();
    my $run_uuid = $self->{run_uuid};

    $logger->debug(
        "Appending run summary to file",
        {
            log_file => $log_file,
            run_uuid => $run_uuid,
        }
    );

    # Gather statistics from the database
    my $dbh = Cpanel::DBI::SQLite->connect(
        {
            database   => $Whostmgr::CometBackup::Constants::RUN_LOG_DB,
            RaiseError => 1,
        }
    );

    # Fetch the run row together with its numeric id in one query.
    # id is the primary key, so a second lookup by run_uuid is unnecessary.
    my $run_sth = $dbh->prepare(<<~'SQL');
        SELECT id, job_uuid, start_time, end_time, status, error_message
        FROM backup_runs
        WHERE run_uuid = ?
        SQL
    $run_sth->execute($run_uuid);
    my $run_info = $run_sth->fetchrow_hashref();
    $run_sth->finish();

    unless ($run_info) {
        $logger->error( "Failed to get run info for summary", { run_uuid => $run_uuid } );
        $dbh->disconnect();
        return;
    }
    my $run_id = $run_info->{id};

    # Get user statistics
    my $user_stats_sth = $dbh->prepare(<<~'SQL');
        SELECT
            COUNT(DISTINCT username) as total_users,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed_users,
            COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_users
        FROM user_runs
        WHERE run_id = ?
        SQL
    $user_stats_sth->execute($run_id);
    my $user_stats = $user_stats_sth->fetchrow_hashref();
    $user_stats_sth->finish();

    # Get Protected Item statistics
    my $item_stats_sth = $dbh->prepare(<<~'SQL');
        SELECT
            COUNT(*) as total_items,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed_items,
            COUNT(CASE WHEN status IN ('failed', 'timeout', 'partial') THEN 1 END) as failed_items,
            COUNT(CASE WHEN status = 'timeout' THEN 1 END) as timeout_items
        FROM item_runs
        WHERE user_run_id IN (SELECT id FROM user_runs WHERE run_id = ?)
        SQL
    $item_stats_sth->execute($run_id);
    my $item_stats = $item_stats_sth->fetchrow_hashref();
    $item_stats_sth->finish();

    # Get failure reasons from item-level run logs for better summary diagnostics.
    my $failure_reasons_sth = $dbh->prepare(<<~'SQL');
        SELECT
            COALESCE(error_message, 'Unknown failure') as error_message,
            COUNT(*) as failure_count
        FROM item_runs
        WHERE user_run_id IN (SELECT id FROM user_runs WHERE run_id = ?)
        AND status IN ('failed', 'timeout', 'partial')
        GROUP BY COALESCE(error_message, 'Unknown failure')
        ORDER BY failure_count DESC, error_message ASC
        SQL
    $failure_reasons_sth->execute($run_id);
    my @failure_reasons;
    while ( my $row = $failure_reasons_sth->fetchrow_hashref() ) {
        push @failure_reasons, $row;
    }
    $failure_reasons_sth->finish();

    # Users in this run that never got an item_runs row.
    # item_runs.user_run_id is NOT NULL, so NOT EXISTS is equivalent to the
    # earlier NOT IN form.
    my $users_with_zero_items_sth = $dbh->prepare(<<~'SQL');
        SELECT DISTINCT username
        FROM user_runs
        WHERE run_id = ?
        AND NOT EXISTS (SELECT 1 FROM item_runs WHERE item_runs.user_run_id = user_runs.id)
        ORDER BY username
        SQL
    $users_with_zero_items_sth->execute($run_id);
    my @users_with_zero_items;
    while ( my $row = $users_with_zero_items_sth->fetchrow_hashref() ) {
        push @users_with_zero_items, $row->{username};
    }
    $users_with_zero_items_sth->finish();

    # All run-log statistics are gathered; release the handle before querying
    # the protected items map.
    $dbh->disconnect();

    # Split zero-item users into two groups: those whose protected items for
    # this job are all disabled, and those with no disabled items.
    my $protected_items_map = Whostmgr::CometBackup::ProtectedItemMap->new();
    my %users_with_disabled;
    my ( @zero_with_disabled, @zero_no_disabled );

    for my $username (@users_with_zero_items) {
        my @protected_items = $protected_items_map->get_protected_items_for_user(
            system_user => $username,
            job_uuid    => $run_info->{job_uuid},
        );

        my @disabled_items = grep { $_->{status} && $_->{status} eq 'disabled' } @protected_items;

        unless (@disabled_items) {
            push @zero_no_disabled, $username;
            next;
        }

        my %item_types;
        $item_types{ $_->{item_type} }++ for @disabled_items;

        $users_with_disabled{$username} = {
            disabled_count    => scalar(@disabled_items),
            item_type_summary => join( ', ', map { "$_ ($item_types{$_})" } sort keys %item_types ),
        };
        push @zero_with_disabled, $username;
    }

    # Log users with zero protected items backed up
    if (@zero_with_disabled) {
        my $disabled_details = '';
        for my $user (@zero_with_disabled) {
            my $disabled_info = $users_with_disabled{$user};
            $disabled_details .= "\n      - $user: " . $disabled_info->{disabled_count} . " disabled items (" . $disabled_info->{item_type_summary} . ")";
        }
        $logger->warn(
            "User(s) with zero protected items backed up (all items disabled)",
            {
                count   => scalar(@zero_with_disabled),
                users   => join( ', ', @zero_with_disabled ),
                details => $disabled_details,
            }
        );
    }
    if (@zero_no_disabled) {
        $logger->warn(
            "User(s) with zero protected items backed up",
            {
                count => scalar(@zero_no_disabled),
                users => join( ', ', @zero_no_disabled ),
            }
        );
    }

    # Open log file in append mode, creating it if needed
    require Fcntl;
    my $fh;
    unless ( sysopen( $fh, $log_file, Fcntl::O_WRONLY() | Fcntl::O_APPEND() | Fcntl::O_CREAT(), 0644 ) ) {
        $logger->error(
            "Failed to open log file for writing summary",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        return;
    }

    # Set UTF-8 encoding on filehandle to prevent "Wide character" warnings
    binmode( $fh, ':utf8' );

    # Acquire exclusive lock
    unless ( flock( $fh, Fcntl::LOCK_EX() ) ) {
        $logger->error(
            "Failed to acquire lock on log file for summary",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        close $fh;
        return;
    }

    # Read the clock once, so End Time and Duration agree when the run has not
    # been closed out yet (end_time still NULL).
    my $start_time         = $run_info->{start_time};
    my $end_time           = $run_info->{end_time} || time();
    my $duration           = $end_time - $start_time;
    my $duration_formatted = sprintf( "%d hours, %d minutes, %d seconds", int( $duration / 3600 ), int( ( $duration % 3600 ) / 60 ), $duration % 60 );
    my $separator          = ( '=' x 80 ) . "\n";

    print {$fh} "\n", $separator, "BACKUP RUN SUMMARY\n", $separator;
    print {$fh} "Run UUID: $run_uuid\n";
    print {$fh} "Job UUID: $run_info->{job_uuid}\n";
    print {$fh} "Start Time: " . scalar( localtime($start_time) ) . "\n";
    print {$fh} "End Time: " . scalar( localtime($end_time) ) . "\n";
    print {$fh} "Duration: $duration_formatted\n";
    print {$fh} "Status: " . ( $run_info->{status} || 'unknown' ) . "\n";

    if ( $run_info->{error_message} ) {
        print {$fh} "Error: $run_info->{error_message}\n";
    }
    print {$fh} "\n";
    print {$fh} "User Account Statistics:\n";
    print {$fh} "  Total Users: " . ( $user_stats->{total_users}     || 0 ) . "\n";
    print {$fh} "  Completed:   " . ( $user_stats->{completed_users} || 0 ) . "\n";
    print {$fh} "  Failed:      " . ( $user_stats->{failed_users}    || 0 ) . "\n";

    if (@zero_with_disabled) {
        print {$fh} "  Users with zero protected items (disabled):\n";
        for my $user (@zero_with_disabled) {
            my $disabled_info = $users_with_disabled{$user};
            print {$fh} "    - $user: " . $disabled_info->{disabled_count} . " disabled items (" . $disabled_info->{item_type_summary} . ")\n";
        }
    }

    if (@zero_no_disabled) {
        print {$fh} "  Users with zero items (no protected items found): " . join( ', ', @zero_no_disabled ) . "\n";
    }

    print {$fh} "\n";
    print {$fh} "Protected Item Statistics:\n";
    print {$fh} "  Total Items: " . ( $item_stats->{total_items}     || 0 ) . "\n";
    print {$fh} "  Completed:   " . ( $item_stats->{completed_items} || 0 ) . "\n";
    print {$fh} "  Failed:      " . ( $item_stats->{failed_items}    || 0 ) . "\n";
    print {$fh} "  Timed Out:   " . ( $item_stats->{timeout_items}   || 0 ) . "\n";

    if (@failure_reasons) {
        print {$fh} "\n";
        print {$fh} "Failure Reason Breakdown:\n";
        for my $reason (@failure_reasons) {
            my $error_message = $reason->{error_message} // 'Unknown failure';
            my $failure_count = $reason->{failure_count} || 0;
            print {$fh} "  - $failure_count item(s): $error_message\n";
        }

        my ($disabled_dest_reason) = grep { defined $_->{error_message} && $_->{error_message} eq 'All configured destinations are disabled' } @failure_reasons;

        if ($disabled_dest_reason) {
            print {$fh} "\n";
            print {$fh} "Explanation:\n";
            print {$fh} "  Some protected items were skipped because every destination configured in this backup job is currently disabled.\n";
            print {$fh} "  Re-enable at least one destination for the job, or update the job to use an enabled destination.\n";
        }
    }

    print {$fh} $separator;
    print {$fh} "\n";

    # close() flushes the buffered output and then releases the flock.
    # A failure here means the summary may not have reached the file.
    unless ( close $fh ) {
        $logger->error(
            "Failed to write run summary to log file",
            {
                log_file => $log_file,
                error    => $!,
            }
        );
        return;
    }

    $logger->info(
        "Appended run summary to file",
        {
            log_file    => $log_file,
            run_uuid    => $run_uuid,
            total_users => $user_stats->{total_users} || 0,
            total_items => $item_stats->{total_items} || 0,
        }
    );

    return;
}

1;
