diff --git a/helm/values.yaml b/helm/values.yaml index f12a88dd1f..8ff269bc1f 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -30,7 +30,7 @@ ingress: # -- Ingress class name. Omit to use the cluster default. When set to nginx, traefik, # or haproxy, cookie-based session affinity annotations are automatically injected # when replicaCount > 1. - className: "" + className: "nginx" # -- Additional ingress annotations. annotations: {} # -- Hostname. This value is environment-specific. The default value is a placeholder and should be overridden. diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java index 46c1634d1f..5ce979b6e6 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java @@ -10,6 +10,7 @@ import java.util.function.Consumer; import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull; +import static edu.caltech.ipac.firefly.core.background.JobInfo.LIFE_SPAN; import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate; import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo; import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg; @@ -50,9 +51,6 @@ default void updateManagedStatus(Consumer func) { default String call() { try { - updateJobInfo(getJobId(), ji -> { - ji.setStartTime(Instant.now()); - }); String results = run(); // the worker is set at onStart(). getWorker().onComplete(); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java index fa5d9303b3..b28d578f99 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java @@ -33,7 +33,7 @@ public class JobInfo implements Serializable { public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD, SUSPENDED, ARCHIVED, UNKNOWN} - private static final int LIFE_SPAN = AppProperties.getIntProperty("job.lifespan", 60*60*24); // default lifespan in seconds; kill job if exceed + public static final int LIFE_SPAN = AppProperties.getIntProperty("job.lifespan", 60*60*24); // default lifespan in seconds; kill job if exceed // these are uws:job defined properties public static final String JOB_ID = "jobId"; @@ -81,7 +81,7 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD, private Instant creationTime; private Instant startTime; private Instant endTime; - private int executionDuration = LIFE_SPAN; + private int executionDuration; private Instant destruction; private Map parameters = new HashMap<>(); private List results = new ArrayList<>(); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java index 1823f73d08..5d002283a7 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java @@ -73,7 +73,7 @@ public class JobManager { private static final int KEEP_ALIVE_INTERVAL = AppProperties.getIntProperty("job.keepalive.interval", 60); // default keepalive interval in seconds private static final int WAIT_COMPLETE = AppProperties.getIntProperty("job.wait.complete", 1); // wait for complete after submit in seconds private static final int MAX_PACKAGERS = AppProperties.getIntProperty("job.max.packagers", 10); // maximum number of simultaneous packaging threads - private static final int JOB_TTL_DAYS = AppProperties.getIntProperty("job.ttl.days", 7); // Time in days to keep a job in redis. Default to 7 days. + static final int JOB_TTL_DAYS = AppProperties.getIntProperty("job.ttl.days", 7); // Time in days to keep a job in redis. Default to 7 days. private static final Logger.LoggerImpl LOG = Logger.getLogger(); private static final ExecutorService packagers = Executors.newFixedThreadPool(MAX_PACKAGERS); @@ -147,9 +147,6 @@ public static JobInfo submit(Job job) { RequestOwner reqOwner = ServerContext.getRequestOwner(); String jobId = nextJobId(); updateJobInfo(jobId, true, ji -> { // setting 'true' to add this jobInfo into the datastore - Instant start = Instant.now(); - ji.setCreationTime(start); - ji.setDestruction(start.plus(JOB_TTL_DAYS, ChronoUnit.DAYS)); ji.getMeta().setType(job.getType()); ji.getMeta().setRunHost(hostName()); }); @@ -157,9 +154,7 @@ public static JobInfo submit(Job job) { job.runAs(reqOwner); job.setJobId(jobId); - sendUpdate(jobId, ji -> { - ji.setPhase(QUEUED); - }); + sendUpdate(jobId, null); try { Future future = job.getType() == PACKAGE ? packagers.submit(job) : searches.submit(job); @@ -169,7 +164,7 @@ public static JobInfo submit(Job job) { // it's ok; job may take longer to complete } catch (Exception e) { // job run() handles exceptions; this only happens if submit or future.get() fails - sendUpdate(jobId, (ji) -> { + updateJobInfo(jobId, (ji) -> { ji.setPhase(ERROR); ji.setErrorSummary(new ErrorSummary(e.getMessage())); }); @@ -180,6 +175,7 @@ public static JobInfo submit(Job job) { ji.setOwnerId(reqOwner.getUserKey()); }); } + sendUpdate(jobId, null); // send update to client return getJobInfo(jobId); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java index a8d0409a51..30b3ff0e32 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java @@ -9,8 +9,11 @@ import edu.caltech.ipac.firefly.server.ServerContext; import edu.caltech.ipac.firefly.server.SrvParam; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Map; +import static edu.caltech.ipac.firefly.core.background.JobInfo.LIFE_SPAN; import static edu.caltech.ipac.firefly.core.background.JobManager.*; /** @@ -70,6 +73,11 @@ public void onStart(Worker worker) { this.worker = worker; worker.setJob(this); updateManagedStatus(ji -> { // set these only if it's not a self-managed job + Instant start = Instant.now(); + ji.setCreationTime(start); + ji.setStartTime(Instant.now()); + ji.setExecutionDuration(LIFE_SPAN); + ji.setDestruction(start.plus(JOB_TTL_DAYS, ChronoUnit.DAYS)); ji.setPhase(JobInfo.Phase.EXECUTING); }); sendUpdate(jobId, ji -> { // needs to update clients, because these values may change after the job has submitted diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java index d14293a857..510e50c5d5 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java @@ -122,9 +122,10 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti jobUrl = submitJob(req); if (jobUrl != null) runJob(jobUrl); updateJob(ji -> { - ji.setPhase(Phase.QUEUED); + ji.setPhase(Phase.PENDING); }); } + if (isEmpty(jobUrl)) throw new DataAccessException("Job URL is missing"); } catch (Exception e) { updateJob(ji -> { ji.setPhase(Phase.ERROR); @@ -137,55 +138,40 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti }); } - int cnt = 0; + long startTimeMs = System.currentTimeMillis(); try { while (true) { - cnt++; - Phase phase = null; - try { - JobInfo uwsJob = getUwsJobInfo(jobUrl); - if (uwsJob == null) { - String msg = "Failed to retrieve UWS job info"; - sendJobUpdate(ji -> { - ji.setPhase(Phase.ERROR); - ji.setErrorSummary(new ErrorSummary(msg)); - }); - throw new DataAccessException(msg); - } + long elapsedMs = System.currentTimeMillis() - startTimeMs; + JobInfo uwsJob = Try.it(() -> getUwsJobInfo(jobUrl)).get(); // + if (uwsJob != null) { updateJob(ji -> ji.copyFrom(uwsJob)); - phase = ifNotNull(uwsJob.getPhase()).getOrElse(Phase.UNKNOWN); - switch (phase) { - case Phase.COMPLETED: - return getResult(req); - case Phase.ABORTED: - throw new DataAccessException.Aborted(); // exit; stop tracking - case Phase.HELD: - throw new DataAccessException("The job is HELD pending execution and will not automatically be executed"); - case Phase.PENDING: - throw new DataAccessException("The job was submitted, but no execution request has been made."); - case Phase.ERROR: - throw new DataAccessException("Job has failed with the error: " + uwsJob.getErrorSummary().message()); - case Phase.UNKNOWN: { - if (cnt > 70) { - updateJob(ji -> { - ji.setPhase(Phase.ABORTED); - ji.setErrorSummary(new ErrorSummary("Job aborted: unknown phase for over 2 minutes")); - }); - throw new DataAccessException("Job aborted: unknown phase for over 2 minutes"); - } + } + Phase phase = uwsJob == null ? Phase.UNKNOWN : uwsJob.getPhase(); + switch (phase) { + case Phase.COMPLETED: + return getResult(req); + case Phase.ABORTED: + throw new DataAccessException.Aborted(); // exit; stop tracking + case Phase.HELD: + failAfterDelay(elapsedMs, "Job on hold", "The job is in the HELD phase pending execution and will not be executed automatically."); + case Phase.PENDING: + failAfterDelay(elapsedMs, "Job not started", "The job was submitted, but no execution request has been made."); + case Phase.ERROR: + if (isTransientError(uwsJob.getErrorSummary())) { + failAfterDelay(elapsedMs, "Job failed with a transient error", uwsJob.getErrorSummary().message()); } - default: - // continue to wait + throwException("Job has failed with the error", uwsJob.getErrorSummary().message()); + case Phase.UNKNOWN: { + failAfterDelay(elapsedMs, "Job aborted", "UNKNOWN phase for over 2 minutes", Phase.ABORTED); } - } catch (Exception e) { - sendJobUpdate(ji -> { - ji.setPhase(Phase.ERROR); - ji.setErrorSummary(new ErrorSummary(e.getMessage())); - }); - throw e; // catch exception to send update job status, then re-throw to let the caller handle it. + default: + // continue to wait } sendJobUpdate(null); // send update to client on each poll. - int wait = cnt < 3 ? 500 : cnt < 20 ? 1000 : 2000; + int wait = elapsedMs < 5_000 ? 500 : + elapsedMs < 30_000 ? 1000 : + elapsedMs < 120_000 ? 2000 : + 5000; TimeUnit.MILLISECONDS.sleep(wait); } } catch (InterruptedException e) { @@ -193,6 +179,31 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti } } + void throwException(String error, String cause) throws DataAccessException { + throw new DataAccessException("%s: %s".formatted(error, cause)); + } + + void failAfterDelay(long elapsedMs, String error, String cause) throws DataAccessException { + failAfterDelay(elapsedMs, error, cause, null); + } + + void failAfterDelay(long elapsedMs, String error, String cause, Phase phaseToSet) throws DataAccessException { + if (elapsedMs > 120_000) { // after 2 minutes, throw exception and stop tracking + if (phaseToSet != null) { + updateJob(ji -> { + ji.setPhase(phaseToSet); + ji.setErrorSummary(new ErrorSummary(cause)); + }); + } + throwException(error, cause); + } + } + + boolean isTransientError(ErrorSummary es) { + String type = ifNotNull(es.type()).getOrElse(""); + return type.equalsIgnoreCase("transient"); + } + /** * Submit the UWS request then execute it if it's not currently executing. * @param req request info needed to create/submit the job @@ -449,9 +460,7 @@ public static JobInfo convertToJobInfo(Document doc) throws Exception { String type = errsum.getAttribute(ERROR_TYPE); String hasDetails = errsum.getAttribute(ERROR_HAS_DETAILS); String msg = getVal(errsum, prefix + ERROR_MSG); - if (!isEmpty(msg)) { - jobInfo.setErrorSummary(new ErrorSummary(msg, type, Boolean.parseBoolean(hasDetails))); - } + jobInfo.setErrorSummary(new ErrorSummary(msg, type, Boolean.parseBoolean(hasDetails))); }); Element progress = ifNotEmpty(getEl(root, prefix + JOB_INFO))