Launch and throttle jobs in a queue managed by SLURM with a Python script.
Features included:
- Launches up to MAX_JOBS = 100 jobs concurrently.
- Re-checks the queue every INTERVAL_MINUTES = 10 minutes.
- Tracks and re-launches failed jobs (up to MAX_RETRIES = 3 times).
- Automatically deletes .out and .err files before relaunching.
- Logs launched, failed, and retried jobs with timestamps.
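At its core the launcher is just a polling loop around squeue and sbatch. The sketch below shows the bare idea (the job script names are placeholders); the full script further down adds retry tracking, logging, and cleanup of old .out and .err files.

import os
import subprocess
import time

MAX_JOBS = 100
INTERVAL_MINUTES = 10
USER = os.getenv("USER")
pending_scripts = ['dc0001.sbatch', 'dc0002.sbatch']  # placeholder job scripts

while True:
    # Names of this user's jobs currently queued or running.
    out = subprocess.run(['squeue', '-u', USER, '--noheader', '--format=%j'],
                         capture_output=True, text=True).stdout
    active = len([j for j in out.strip().split('\n') if j])
    # Submit more scripts only while below the concurrency cap.
    for script in pending_scripts[:max(0, MAX_JOBS - active)]:
        subprocess.run(['sbatch', script])
        pending_scripts.remove(script)
    time.sleep(INTERVAL_MINUTES * 60)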
Required files in /home/jant.encinar/py-linux/15_docking-YAS/:
- _job_list.txt: a list of job base names, one per line, without the .sbatch extension (a helper sketch for generating this file follows below).
- One .sbatch file per job.
- The script below, saved as job_launcher.py (or any name you prefer) and started with:
nohup python3 job_launcher.py &
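If the per-job .sbatch files are already in that directory, _job_list.txt does not need to be written by hand. A minimal sketch for generating it, assuming the layout above (the script name build_job_list.py is only a suggestion):

# build_job_list.py -- write one job base name per line, taken from the .sbatch files.
import glob
import os

JOB_SCRIPTS_DIR = '/home/jant.encinar/py-linux/15_docking-YAS/'
JOB_LIST_FILE = os.path.join(JOB_SCRIPTS_DIR, '_job_list.txt')

with open(JOB_LIST_FILE, 'w') as f:
    for path in sorted(glob.glob(os.path.join(JOB_SCRIPTS_DIR, '*.sbatch'))):
        f.write(os.path.splitext(os.path.basename(path))[0] + '\n')  # e.g. dc0001

The full launcher script follows: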
import subprocess
import time
import os
from datetime import datetime
# CONFIGURATION
MAX_JOBS = 100
INTERVAL_MINUTES = 10
MAX_RETRIES = 3
JOB_LIST_FILE = '/home/jant.encinar/py-linux/15_docking-YAS/_job_list.txt'
JOB_SCRIPTS_DIR = '/home/jant.encinar/py-linux/15_docking-YAS/'
LOG_LAUNCHED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-launched.txt')
LOG_RETRIES = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retried.txt')
LOG_FAILED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-failed.txt')
LOG_RETRY_COUNTER = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retry-counter.txt')
USER = os.getenv("USER")
# ------------------ Helper Functions ------------------ #
def get_final_state(jobid):
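    # Query sacct for this job's accounting record and return its final state
    # (e.g. COMPLETED, FAILED, CANCELLED, TIMEOUT); 'UNKNOWN' if no matching line is found.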
result = subprocess.run(['sacct', '-j', jobid, '--format=JobID,State', '--noheader'],
capture_output=True, text=True)
for line in result.stdout.strip().split('\n'):
if jobid in line:
return line.strip().split()[1]
return "UNKNOWN"
def delete_out_err_files(jobname):
for ext in ['.out', '.err']:
file_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}{ext}")
if os.path.exists(file_path):
os.remove(file_path)
print(f">> Deleted file: {file_path}")
def load_retry_counter():
counter = {}
if os.path.exists(LOG_RETRY_COUNTER):
with open(LOG_RETRY_COUNTER) as f:
for line in f:
parts = line.strip().split()
if len(parts) == 2:
counter[parts[0]] = int(parts[1])
return counter
def save_retry_counter(counter):
with open(LOG_RETRY_COUNTER, 'w') as f:
for job, count in counter.items():
f.write(f"{job}\t{count}\n")
# ------------------ Main Script ------------------ #
with open(JOB_LIST_FILE) as f:
all_jobs = [line.strip() for line in f if line.strip()]
launched_jobs = {}
if os.path.exists(LOG_LAUNCHED):
with open(LOG_LAUNCHED) as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 3:
jobname, _, jobid = parts[:3]
launched_jobs[jobname] = jobid
retry_counter = load_retry_counter()
while True:
print(">> Checking job queue...")
result = subprocess.run(['squeue', '-u', USER, '--noheader', '--format=%j'],
capture_output=True, text=True)
jobs_in_queue = set(result.stdout.strip().split('\n'))
to_retry = []
for jobname, jobid in list(launched_jobs.items()):
if jobname not in jobs_in_queue:
final_state = get_final_state(jobid)
if final_state not in ("COMPLETED", "RUNNING", "PENDING"):
print(f">> Job {jobname} (ID {jobid}) failed ({final_state}), will be considered for retry.")
to_retry.append((jobname, jobid, final_state))
del launched_jobs[jobname]
active_jobs = len([j for j in jobs_in_queue if j])
available_slots = MAX_JOBS - active_jobs
if available_slots > 0:
new_jobs = []
# Retry jobs if below retry limit
for jobname, jobid, state in to_retry:
attempts = retry_counter.get(jobname, 0)
if attempts < MAX_RETRIES:
new_jobs.append((jobname, True, jobid, state))
else:
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(LOG_FAILED, 'a') as f:
f.write(f"{jobname}\t{date}\t{jobid}\tExceeded max retries ({MAX_RETRIES})\n")
print(f"!! Job {jobname} reached retry limit ({MAX_RETRIES}).")
# Add new jobs from list
remaining_jobs = [j for j in all_jobs if j not in launched_jobs and j not in jobs_in_queue and j not in [x[0] for x in to_retry]]
new_jobs += [(j, False, None, None) for j in remaining_jobs]
# Limit to available job slots
new_jobs = new_jobs[:available_slots]
for jobname, is_retry, old_jobid, old_state in new_jobs:
script_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}.sbatch")
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if os.path.exists(script_path):
if is_retry:
delete_out_err_files(jobname)
result = subprocess.run(['sbatch', script_path], capture_output=True, text=True)
if result.returncode == 0:
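                    # sbatch normally prints 'Submitted batch job <jobid>'; keep the last token as the job ID.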
jobid_line = result.stdout.strip()
try:
new_jobid = jobid_line.split()[-1]
except IndexError:
new_jobid = "UNKNOWN"
launched_jobs[jobname] = new_jobid
with open(LOG_LAUNCHED, 'a') as log:
log.write(f"{jobname}\t{date}\t{new_jobid}\n")
print(f">> Job launched: {jobname} (JobID: {new_jobid})")
if is_retry:
retry_counter[jobname] = retry_counter.get(jobname, 0) + 1
with open(LOG_RETRIES, 'a') as log:
log.write(f"{jobname}\t{date}\t{new_jobid}\t{old_state}\n")
else:
print(f"!! Error launching {jobname}: {result.stderr}")
with open(LOG_FAILED, 'a') as f:
f.write(f"{jobname}\t{date}\tN/A\tError in sbatch\n")
else:
print(f"!! Script not found: {script_path}")
with open(LOG_FAILED, 'a') as f:
f.write(f"{jobname}\t{date}\tN/A\tScript not found\n")
save_retry_counter(retry_counter)
else:
print(">> Job limit reached. Waiting...")
print(f">> Sleeping for {INTERVAL_MINUTES} minutes...\n")
time.sleep(INTERVAL_MINUTES * 60)
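A note on how the job ID is captured: sbatch normally answers with a single line such as "Submitted batch job 123456", and the launcher simply keeps the last whitespace-separated token. A minimal illustration (the sample string below is a typical sbatch response, not output from a real run):

jobid_line = "Submitted batch job 123456"  # typical sbatch stdout (sample value)
new_jobid = jobid_line.split()[-1]         # last token -> '123456'
print(new_jobid)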
Below is an example of the .sbatch file used to launch one job:
#!/bin/bash
#SBATCH --job-name=dc0001
#SBATCH --cpus-per-task=8
#SBATCH --mem=16G
#SBATCH --time=7-0
#SBATCH --qos=long
#SBATCH --output=/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001_salida-%j.out
#SBATCH --error=/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001_error-%j.err
# Set up the environment variables
cd /home/jant.encinar/yasara-25-1-13
export LD_LIBRARY_PATH=/home/jant.encinar/yasara-25-1-13:$LD_LIBRARY_PATH
# Check the shared-library dependencies
ldd /home/jant.encinar/yasara-25-1-13/yasara
# Run YASARA
/home/jant.encinar/yasara-25-1-13/yasara -txt /home/jant.encinar/py-linux/15_docking-YAS/_dock_runscreening-salva-9-kcal-mol_25-1-13.mcr "MacroTarget='/home/jant.encinar/runDC/D0001-MP7-noQ-0001/D0001-MP7-noQ-0001'"
# Delete input and output files after the job finishes
cd /home/jant.encinar/runDC/D0001-MP7-noQ-0001/
rm *.fld *.gpf *.xyz *.dlg *.pdbqt *.xml *.adr *.sdf *.sce *.err *.out *.map *_bestposes.pdb *_bestposes.sdf *_checkpoint.sce
cd /home/jant.encinar/runDC/
mv /home/jant.encinar/runDC/D0001-MP7-noQ-0001/ /home/jant.encinar/runDC/D0001-MP7-noQ-0001_fin/
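Since the launcher expects one .sbatch file per job, a small generator can save typing when there are hundreds of targets. A rough sketch under stated assumptions: the target directory names, the dcNNNN naming scheme, and the mapping between them are placeholders based on the example above, and the cleanup/mv lines at the end of the example are omitted. Note that --job-name must match the script's base name listed in _job_list.txt, because the launcher matches queued jobs by name (squeue --format=%j).

# make_sbatch_files.py -- hypothetical helper that writes one .sbatch file per target,
# following the pattern of the example script above. Adapt paths and targets to your layout.
import os

JOB_SCRIPTS_DIR = '/home/jant.encinar/py-linux/15_docking-YAS/'
RUN_DIR = '/home/jant.encinar/runDC'
TARGETS = ['D0001-MP7-noQ-0001', 'D0001-MP7-noQ-0002']  # placeholder target names

TEMPLATE = """#!/bin/bash
#SBATCH --job-name={name}
#SBATCH --cpus-per-task=8
#SBATCH --mem=16G
#SBATCH --time=7-0
#SBATCH --qos=long
#SBATCH --output={run_dir}/{target}/{target}_salida-%j.out
#SBATCH --error={run_dir}/{target}/{target}_error-%j.err
cd /home/jant.encinar/yasara-25-1-13
export LD_LIBRARY_PATH=/home/jant.encinar/yasara-25-1-13:$LD_LIBRARY_PATH
/home/jant.encinar/yasara-25-1-13/yasara -txt /home/jant.encinar/py-linux/15_docking-YAS/_dock_runscreening-salva-9-kcal-mol_25-1-13.mcr "MacroTarget='{run_dir}/{target}/{target}'"
"""

for i, target in enumerate(TARGETS, start=1):
    name = 'dc{:04d}'.format(i)  # job base name, e.g. dc0001
    with open(os.path.join(JOB_SCRIPTS_DIR, name + '.sbatch'), 'w') as f:
        f.write(TEMPLATE.format(name=name, target=target, run_dir=RUN_DIR))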
Here is a new version that considers only the jobs in a single SLURM partition, for example the "gpu" partition:
import subprocess
import time
import os
from datetime import datetime
# CONFIGURATION
MAX_JOBS = 100
INTERVAL_MINUTES = 10
MAX_RETRIES = 3
QUEUE = 'gpu' # <--- Only consider jobs in this SLURM partition
JOB_LIST_FILE = '/path/sbatch_scripts/job_list.txt'
JOB_SCRIPTS_DIR = '/path/sbatch_scripts/'
LOG_LAUNCHED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-launched.txt')
LOG_RETRIES = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retried.txt')
LOG_FAILED = os.path.join(JOB_SCRIPTS_DIR, '_jobs-failed.txt')
LOG_RETRY_COUNTER = os.path.join(JOB_SCRIPTS_DIR, '_jobs-retry-counter.txt')
USER = os.getenv("USER")
# ------------------ Helper Functions ------------------ #
def get_final_state(jobid):
result = subprocess.run(['sacct', '-j', jobid, '--format=JobID,State', '--noheader'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
for line in result.stdout.strip().split('\n'):
if jobid in line:
return line.strip().split()[1]
return "UNKNOWN"
def delete_out_err_files(jobname):
for ext in ['.out', '.err']:
file_path = os.path.join(JOB_SCRIPTS_DIR, f"{jobname}{ext}")
if os.path.exists(file_path):
os.remove(file_path)
print(">> Deleted file: {}".format(file_path))
def load_retry_counter():
counter = {}
if os.path.exists(LOG_RETRY_COUNTER):
with open(LOG_RETRY_COUNTER) as f:
for line in f:
parts = line.strip().split()
if len(parts) == 2:
counter[parts[0]] = int(parts[1])
return counter
def save_retry_counter(counter):
with open(LOG_RETRY_COUNTER, 'w') as f:
for job, count in counter.items():
f.write("{}\t{}\n".format(job, count))
# ------------------ Main Script ------------------ #
with open(JOB_LIST_FILE) as f:
all_jobs = [line.strip() for line in f if line.strip()]
launched_jobs = {}
if os.path.exists(LOG_LAUNCHED):
with open(LOG_LAUNCHED) as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 3:
jobname, _, jobid = parts[:3]
launched_jobs[jobname] = jobid
retry_counter = load_retry_counter()
while True:
print(">> Checking job queue (partition: {})...".format(QUEUE))
result = subprocess.run(['squeue', '-u', USER, '--partition=' + QUEUE, '--noheader', '--format=%j'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
jobs_in_queue = set(result.stdout.strip().split('\n'))
to_retry = []
for jobname, jobid in list(launched_jobs.items()):
if jobname not in jobs_in_queue:
final_state = get_final_state(jobid)
if final_state not in ("COMPLETED", "RUNNING", "PENDING"):
print(">> Job {} (ID {}) failed ({}) and will be considered for retry.".format(jobname, jobid, final_state))
to_retry.append((jobname, jobid, final_state))
del launched_jobs[jobname]
active_jobs = len([j for j in jobs_in_queue if j])
available_slots = MAX_JOBS - active_jobs
if available_slots > 0:
new_jobs = []
for jobname, jobid, state in to_retry:
attempts = retry_counter.get(jobname, 0)
if attempts < MAX_RETRIES:
new_jobs.append((jobname, True, jobid, state))
else:
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(LOG_FAILED, 'a') as f:
f.write("{}\t{}\t{}\tExceeded max retries ({})\n".format(jobname, date, jobid, MAX_RETRIES))
print("!! Job {} exceeded retry limit.".format(jobname))
remaining_jobs = [j for j in all_jobs if j not in launched_jobs and j not in jobs_in_queue and j not in [x[0] for x in to_retry]]
new_jobs += [(j, False, None, None) for j in remaining_jobs]
new_jobs = new_jobs[:available_slots]
for jobname, is_retry, old_jobid, old_state in new_jobs:
script_path = os.path.join(JOB_SCRIPTS_DIR, jobname + '.sbatch')
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if os.path.exists(script_path):
if is_retry:
delete_out_err_files(jobname)
result = subprocess.run(['sbatch', script_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if result.returncode == 0:
jobid_line = result.stdout.strip()
try:
new_jobid = jobid_line.split()[-1]
except IndexError:
new_jobid = "UNKNOWN"
launched_jobs[jobname] = new_jobid
with open(LOG_LAUNCHED, 'a') as log:
log.write("{}\t{}\t{}\n".format(jobname, date, new_jobid))
print(">> Launched job: {} (JobID: {})".format(jobname, new_jobid))
if is_retry:
retry_counter[jobname] = retry_counter.get(jobname, 0) + 1
with open(LOG_RETRIES, 'a') as log:
log.write("{}\t{}\t{}\t{}\n".format(jobname, date, new_jobid, old_state))
else:
print("!! Failed to launch {}: {}".format(jobname, result.stderr))
with open(LOG_FAILED, 'a') as f:
f.write("{}\t{}\tN/A\tError in sbatch\n".format(jobname, date))
else:
print("!! Script not found: {}".format(script_path))
with open(LOG_FAILED, 'a') as f:
f.write("{}\t{}\tN/A\tScript not found\n".format(jobname, date))
save_retry_counter(retry_counter)
else:
print(">> Maximum job limit reached. Waiting...")
print(">> Sleeping for {} minutes...\n".format(INTERVAL_MINUTES))
time.sleep(INTERVAL_MINUTES * 60)
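One quick sanity check before leaving this version running unattended: confirm that the partition configured in QUEUE actually exists on the cluster. A small sketch using sinfo ('gpu' is just the value configured above):

import subprocess

QUEUE = 'gpu'  # the partition configured above
# sinfo -h suppresses the header; --format=%P prints one partition name per line,
# with '*' appended to the default partition.
out = subprocess.run(['sinfo', '-h', '--format=%P'],
                     stdout=subprocess.PIPE, universal_newlines=True).stdout
partitions = {p.strip().rstrip('*') for p in out.split('\n') if p.strip()}
if QUEUE not in partitions:
    raise SystemExit("Partition '{}' not found; available: {}".format(QUEUE, sorted(partitions)))
print("Partition '{}' is available.".format(QUEUE))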