Launch, monitor, and relaunch jobs in a SLURM-managed queue with a Python script.


Features included:

  1. Launches up to MAX_JOBS = 100 jobs concurrently.
  2. Re-checks the queue every INTERVAL_MINUTES = 10 minutes.
  3. Tracks and relaunches failed jobs (up to MAX_RETRIES = 3 times).
  4. Deletes .out and .err files automatically before relaunching.
  5. Logs launched, failed, and retried jobs with timestamps.

Required files in /home/jant.encinar/py-linux/15_docking-YAS/:

  1. _job_list.txt: a list of job base names, one per line, without the .sbatch extension (see the example just after this list)
  2. One .sbatch file per job listed
  3. The script below, saved as job_launcher.py (or any name you prefer) and started with: nohup python3 job_launcher.py &
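
A minimal _job_list.txt, assuming job base names like dc0001 (the job name used in the .sbatch example further below), one per line:

dc0001
dc0002
dc0003

Each name must have a matching dc0001.sbatch, dc0002.sbatch, and so on, in the same directory.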

import subprocess
import time
import os
from datetime import datetime

# CONFIGURATION
MAX_JOBS = 100
INTERVAL_MINUTES = 10
MAX_RETRIES = 3

JOB_LIST_FILE = '/home/jant.encinar/py-linux/15_docking-YAS/_job_list.txt'
JOB_SCRIPTS_DIR = '/home/jant.encinar/py-linux/15_docking-YAS/'

LOG_LAUNCHED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-launched.txt')
LOG_RETRIES = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retried.txt')
LOG_FAILED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-failed.txt')
LOG_RETRY_COUNTER = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retry-counter.txt')

USER = os.getenv("USER")

# ------------------ Helper Functions ------------------ #

def get_final_state(jobid):
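    # Ask sacct for the recorded state of a job (COMPLETED, FAILED, CANCELLED...).
    # Returns "UNKNOWN" if sacct has no record for this job ID yet.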
    result = subprocess.run(['sacct', '-j', jobid, '--format=JobID,State', '--noheader'],
                            capture_output=True, text=True)
    for line in result.stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) >= 2 and parts[0] == jobid:  # exact match; skip job steps like "<id>.batch"
            return parts[1]
    return "UNKNOWN"

def delete_out_err_files(jobname):
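    # Remove <jobname>.out / <jobname>.err from the scripts directory, if present,
    # so a relaunched job starts with clean log files.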
    for ext in ['.out', '.err']:
        file_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}{ext}")
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f">> Deleted file: {file_path}")

def load_retry_counter():
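    # Read persisted per-job retry counts ("name<TAB>count", one per line).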
    counter = {}
    if os.path.exists(LOG_RETRY_COUNTER):
        with open(LOG_RETRY_COUNTER) as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) == 2:
                    counter[parts[0]] = int(parts[1])
    return counter

def save_retry_counter(counter):
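    # Persist the retry counts so a restart of this launcher keeps the history.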
    with open(LOG_RETRY_COUNTER, 'w') as f:
        for job, count in counter.items():
            f.write(f"{job}\t{count}\n")

# ------------------ Main Script ------------------ #

with open(JOB_LIST_FILE) as f:
    all_jobs = [line.strip() for line in f if line.strip()]

launched_jobs = {}
if os.path.exists(LOG_LAUNCHED):
    with open(LOG_LAUNCHED) as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) >= 3:
                jobname, _, jobid = parts[:3]
                launched_jobs[jobname] = jobid

retry_counter = load_retry_counter()
failed_jobs = set()  # job names given up on permanently; never relaunched

while True:
    print(">> Checking job queue...")

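    # squeue with --format=%j prints one job *name* per line for this user.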
    result = subprocess.run(['squeue', '-u', USER, '--noheader', '--format=%j'],
                            capture_output=True, text=True)
    jobs_in_queue = set(result.stdout.strip().split('\n'))

    to_retry = []
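    # A launched job that is no longer in the queue and whose final state is
    # not COMPLETED (nor still RUNNING/PENDING per sacct) is a retry candidate.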
    for jobname, jobid in list(launched_jobs.items()):
        if jobname not in jobs_in_queue:
            final_state = get_final_state(jobid)
            if final_state not in ("COMPLETED", "RUNNING", "PENDING"):
                print(f">> Job {jobname} (ID {jobid}) failed ({final_state}), will be considered for retry.")
                to_retry.append((jobname, jobid, final_state))
                del launched_jobs[jobname]

    active_jobs = len([j for j in jobs_in_queue if j])
    available_slots = MAX_JOBS - active_jobs

    if available_slots > 0:
        new_jobs = []

        # Retry jobs if below retry limit
        for jobname, jobid, state in to_retry:
            attempts = retry_counter.get(jobname, 0)
            if attempts < MAX_RETRIES:
                new_jobs.append((jobname, True, jobid, state))
            else:
                failed_jobs.add(jobname)  # stop relaunching this job for good
                date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                with open(LOG_FAILED, 'a') as f:
                    f.write(f"{jobname}\t{date}\t{jobid}\tExceeded max retries ({MAX_RETRIES})\n")
                print(f"!! Job {jobname} reached retry limit ({MAX_RETRIES}).")

        # Add new jobs from list
        remaining_jobs = [j for j in all_jobs
                          if j not in launched_jobs
                          and j not in jobs_in_queue
                          and j not in failed_jobs
                          and j not in [x[0] for x in to_retry]]
        new_jobs += [(j, False, None, None) for j in remaining_jobs]

        # Limit to available job slots
        new_jobs = new_jobs[:available_slots]

        for jobname, is_retry, old_jobid, old_state in new_jobs:
            script_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}.sbatch")
            date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            if os.path.exists(script_path):
                if is_retry:
                    delete_out_err_files(jobname)

                result = subprocess.run(['sbatch', script_path], capture_output=True, text=True)
                if result.returncode == 0:
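                    # sbatch normally prints "Submitted batch job <id>";
                    # the job ID is the last whitespace-separated token.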
                    jobid_line = result.stdout.strip()
                    try:
                        new_jobid = jobid_line.split()[-1]
                    except IndexError:
                        new_jobid = "UNKNOWN"

                    launched_jobs[jobname] = new_jobid
                    with open(LOG_LAUNCHED, 'a') as log:
                        log.write(f"{jobname}\t{date}\t{new_jobid}\n")
                    print(f">> Job launched: {jobname} (JobID: {new_jobid})")

                    if is_retry:
                        retry_counter[jobname] = retry_counter.get(jobname, 0) + 1
                        with open(LOG_RETRIES, 'a') as log:
                            log.write(f"{jobname}\t{date}\t{new_jobid}\t{old_state}\n")
                else:
                    print(f"!! Error launching {jobname}: {result.stderr}")
                    with open(LOG_FAILED, 'a') as f:
                        f.write(f"{jobname}\t{date}\tN/A\tError in sbatch\n")
            else:
                print(f"!! Script not found: {script_path}")
                failed_jobs.add(jobname)  # missing script; don't retry every cycle
                with open(LOG_FAILED, 'a') as f:
                    f.write(f"{jobname}\t{date}\tN/A\tScript not found\n")

        save_retry_counter(retry_counter)
    else:
        print(">> Job limit reached. Waiting...")

    print(f">> Sleeping for {INTERVAL_MINUTES} minutes...\n")
    time.sleep(INTERVAL_MINUTES * 60)
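
The job ID above is taken from sbatch's usual "Submitted batch job <id>" message. As a side note, sbatch also has a --parsable option that prints only the job ID (optionally followed by ";cluster"), which is easier to parse; a minimal sketch of that variant, reusing the script_path variable from the loop:

result = subprocess.run(['sbatch', '--parsable', script_path], capture_output=True, text=True)
if result.returncode == 0:
    # --parsable prints "<jobid>" or "<jobid>;<cluster>", nothing else
    new_jobid = result.stdout.strip().split(';')[0]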

Below is an example .sbatch file for launching one job:


#!/bin/bash
#SBATCH --job-name=dc0001
#SBATCH --cpus-per-task=8
#SBATCH --mem=16G
#SBATCH --time=7-0
#SBATCH --qos=long
#SBATCH --output=/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001_salida-%j.out
#SBATCH --error=/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001_error-%j.err

# Set up the environment variables
cd /home/jant.encinar/yasara-25-1-13
export LD_LIBRARY_PATH=/home/jant.encinar/yasara-25-1-13:$LD_LIBRARY_PATH

# Check the library dependencies
ldd /home/jant.encinar/yasara-25-1-13/yasara

# Run YASARA
/home/jant.encinar/yasara-25-1-13/yasara -txt /home/jant.encinar/py-linux/15_docking-YAS/_dock_runscreening-salva-9-kcal-mol_25-1-13.mcr "MacroTarget='/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001'"

# Delete input and output files after the job finishes
cd /home/jant.encinar/runDC/D0001-MP7-noQ-0001/
rm *.fld *.gpf *.xyz *.dlg *.pdbqt *.xml *.adr *.sdf *.sce *.err *.out *.map *_bestposes.pdb *_bestposes.sdf *_checkpoint.sce
cd /home/jant.encinar/runDC/
mv /home/jant.encinar/runDC/D0001-MP7-noQ-0001/ /home/jant.encinar/runDC/D0001-MP7-noQ-0001_fin/


New version that considers only jobs in a single SLURM partition, for example the "gpu" partition.
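
Since squeue is filtered by partition in this version, each .sbatch file must actually request that partition or its jobs will never be counted toward MAX_JOBS; assuming the partition is named gpu, as in QUEUE below, each script needs the line:

#SBATCH --partition=gpu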

import subprocess
import time
import os
from datetime import datetime

# CONFIGURATION
MAX_JOBS = 100
INTERVAL_MINUTES = 10
MAX_RETRIES = 3

QUEUE = 'gpu'  # <--- Only consider jobs in this SLURM partition

JOB_LIST_FILE = '/path/sbatch_scripts/job_list.txt'
JOB_SCRIPTS_DIR = '/path/sbatch_scripts/'

LOG_LAUNCHED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-launched.txt')
LOG_RETRIES = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retried.txt')
LOG_FAILED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-failed.txt')
LOG_RETRY_COUNTER = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retry-counter.txt')

USER = os.getenv("USER")

# ------------------ Helper Functions ------------------ #

def get_final_state(jobid):
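    # Same as before, but using stdout/stderr=PIPE with universal_newlines=True
    # instead of capture_output=True/text=True, for Python < 3.7 compatibility.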
    result = subprocess.run(['sacct', '-j', jobid, '--format=JobID,State', '--noheader'],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    for line in result.stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) >= 2 and parts[0] == jobid:  # exact match; skip job steps like "<id>.batch"
            return parts[1]
    return "UNKNOWN"

def delete_out_err_files(jobname):
    for ext in ['.out', '.err']:
        file_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}{ext}")
        if os.path.exists(file_path):
            os.remove(file_path)
            print(">> Deleted file: {}".format(file_path))

def load_retry_counter():
    counter = {}
    if os.path.exists(LOG_RETRY_COUNTER):
        with open(LOG_RETRY_COUNTER) as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) == 2:
                    counter[parts[0]] = int(parts[1])
    return counter

def save_retry_counter(counter):
    with open(LOG_RETRY_COUNTER, 'w') as f:
        for job, count in counter.items():
            f.write("{}\t{}\n".format(job, count))

# ------------------ Main Script ------------------ #

with open(JOB_LIST_FILE) as f:
    all_jobs = [line.strip() for line in f if line.strip()]

launched_jobs = {}
if os.path.exists(LOG_LAUNCHED):
    with open(LOG_LAUNCHED) as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) >= 3:
                jobname, _, jobid = parts[:3]
                launched_jobs[jobname] = jobid

retry_counter = load_retry_counter()
failed_jobs = set()  # job names given up on permanently; never relaunched

while True:
    print(">> Checking job queue (partition: {})...".format(QUEUE))

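    # --partition restricts the count to jobs in the chosen partition, so
    # MAX_JOBS applies within that partition rather than across the cluster.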
    result = subprocess.run(['squeue', '-u', USER, '--partition=' + QUEUE, '--noheader', '--format=%j'],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    jobs_in_queue = set(result.stdout.strip().split('\n'))

    to_retry = []
    for jobname, jobid in list(launched_jobs.items()):
        if jobname not in jobs_in_queue:
            final_state = get_final_state(jobid)
            if final_state not in ("COMPLETED", "RUNNING", "PENDING"):
                print(">> Job {} (ID {}) failed ({}) and will be considered for retry.".format(jobname, jobid, final_state))
                to_retry.append((jobname, jobid, final_state))
                del launched_jobs[jobname]

    active_jobs = len([j for j in jobs_in_queue if j])
    available_slots = MAX_JOBS - active_jobs

    if available_slots > 0:
        new_jobs = []

        for jobname, jobid, state in to_retry:
            attempts = retry_counter.get(jobname, 0)
            if attempts < MAX_RETRIES:
                new_jobs.append((jobname, True, jobid, state))
            else:
                failed_jobs.add(jobname)  # stop relaunching this job for good
                date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                with open(LOG_FAILED, 'a') as f:
                    f.write("{}\t{}\t{}\tExceeded max retries ({})\n".format(jobname, date, jobid, MAX_RETRIES))
                print("!! Job {} exceeded retry limit.".format(jobname))

        remaining_jobs = [j for j in all_jobs
                          if j not in launched_jobs
                          and j not in jobs_in_queue
                          and j not in failed_jobs
                          and j not in [x[0] for x in to_retry]]
        new_jobs += [(j, False, None, None) for j in remaining_jobs]

        new_jobs = new_jobs[:available_slots]

        for jobname, is_retry, old_jobid, old_state in new_jobs:
            script_path = os.path.join(JOB_SCRIPTS_DIR, jobname + '.sbatch')
            date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            if os.path.exists(script_path):
                if is_retry:
                    delete_out_err_files(jobname)

                result = subprocess.run(['sbatch', script_path],
                                        stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                if result.returncode == 0:
                    jobid_line = result.stdout.strip()
                    try:
                        new_jobid = jobid_line.split()[-1]
                    except IndexError:
                        new_jobid = "UNKNOWN"

                    launched_jobs[jobname] = new_jobid
                    with open(LOG_LAUNCHED, 'a') as log:
                        log.write("{}\t{}\t{}\n".format(jobname, date, new_jobid))
                    print(">> Launched job: {} (JobID: {})".format(jobname, new_jobid))

                    if is_retry:
                        retry_counter[jobname] = retry_counter.get(jobname, 0) + 1
                        with open(LOG_RETRIES, 'a') as log:
                            log.write("{}\t{}\t{}\t{}\n".format(jobname, date, new_jobid, old_state))
                else:
                    print("!! Failed to launch {}: {}".format(jobname, result.stderr))
                    with open(LOG_FAILED, 'a') as f:
                        f.write("{}\t{}\tN/A\tError in sbatch\n".format(jobname, date))
            else:
                print("!! Script not found: {}".format(script_path))
                failed_jobs.add(jobname)  # missing script; don't retry every cycle
                with open(LOG_FAILED, 'a') as f:
                    f.write("{}\t{}\tN/A\tScript not found\n".format(jobname, date))

        save_retry_counter(retry_counter)
    else:
        print(">> Maximum job limit reached. Waiting...")

    print(">> Sleeping for {} minutes...\n".format(INTERVAL_MINUTES))
    time.sleep(INTERVAL_MINUTES * 60)