Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ingress:
# -- Ingress class name. Omit to use the cluster default. When set to nginx, traefik,
# or haproxy, cookie-based session affinity annotations are automatically injected
# when replicaCount > 1.
className: ""
className: "nginx"
# -- Additional ingress annotations.
annotations: {}
# -- Hostname. This value is environment-specific. The default value is a placeholder and should be overridden.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.function.Consumer;

import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull;
import static edu.caltech.ipac.firefly.core.background.JobInfo.LIFE_SPAN;
import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate;
import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo;
import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg;
Expand Down Expand Up @@ -50,9 +51,6 @@ default void updateManagedStatus(Consumer<JobInfo> func) {

default String call() {
try {
updateJobInfo(getJobId(), ji -> {
ji.setStartTime(Instant.now());
});
String results = run();
// the worker is set at onStart().
getWorker().onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class JobInfo implements Serializable {

public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD, SUSPENDED, ARCHIVED, UNKNOWN}
private static final int LIFE_SPAN = AppProperties.getIntProperty("job.lifespan", 60*60*24); // default lifespan in seconds; kill job if exceed
public static final int LIFE_SPAN = AppProperties.getIntProperty("job.lifespan", 60*60*24); // default lifespan in seconds; kill job if exceed

// these are uws:job defined properties
public static final String JOB_ID = "jobId";
Expand Down Expand Up @@ -81,7 +81,7 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD,
private Instant creationTime;
private Instant startTime;
private Instant endTime;
private int executionDuration = LIFE_SPAN;
private int executionDuration;
private Instant destruction;
private Map<String, String> parameters = new HashMap<>();
private List<Result> results = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class JobManager {
private static final int KEEP_ALIVE_INTERVAL = AppProperties.getIntProperty("job.keepalive.interval", 60); // default keepalive interval in seconds
private static final int WAIT_COMPLETE = AppProperties.getIntProperty("job.wait.complete", 1); // wait for complete after submit in seconds
private static final int MAX_PACKAGERS = AppProperties.getIntProperty("job.max.packagers", 10); // maximum number of simultaneous packaging threads
private static final int JOB_TTL_DAYS = AppProperties.getIntProperty("job.ttl.days", 7); // Time in days to keep a job in redis. Default to 7 days.
static final int JOB_TTL_DAYS = AppProperties.getIntProperty("job.ttl.days", 7); // Time in days to keep a job in redis. Default to 7 days.

private static final Logger.LoggerImpl LOG = Logger.getLogger();
private static final ExecutorService packagers = Executors.newFixedThreadPool(MAX_PACKAGERS);
Expand Down Expand Up @@ -147,19 +147,14 @@ public static JobInfo submit(Job job) {
RequestOwner reqOwner = ServerContext.getRequestOwner();
String jobId = nextJobId();
updateJobInfo(jobId, true, ji -> { // setting 'true' to add this jobInfo into the datastore
Instant start = Instant.now();
ji.setCreationTime(start);
ji.setDestruction(start.plus(JOB_TTL_DAYS, ChronoUnit.DAYS));
ji.getMeta().setType(job.getType());
ji.getMeta().setRunHost(hostName());
});
// update Job after jobInfo has been created
job.runAs(reqOwner);
job.setJobId(jobId);

sendUpdate(jobId, ji -> {
ji.setPhase(QUEUED);
});
sendUpdate(jobId, null);

try {
Future<String> future = job.getType() == PACKAGE ? packagers.submit(job) : searches.submit(job);
Expand All @@ -169,7 +164,7 @@ public static JobInfo submit(Job job) {
// it's ok; job may take longer to complete
} catch (Exception e) {
// job run() handles exceptions; this only happens if submit or future.get() fails
sendUpdate(jobId, (ji) -> {
updateJobInfo(jobId, (ji) -> {
ji.setPhase(ERROR);
ji.setErrorSummary(new ErrorSummary(e.getMessage()));
});
Expand All @@ -180,6 +175,7 @@ public static JobInfo submit(Job job) {
ji.setOwnerId(reqOwner.getUserKey());
});
}
sendUpdate(jobId, null); // send update to client
return getJobInfo(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
import edu.caltech.ipac.firefly.server.ServerContext;
import edu.caltech.ipac.firefly.server.SrvParam;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import static edu.caltech.ipac.firefly.core.background.JobInfo.LIFE_SPAN;
import static edu.caltech.ipac.firefly.core.background.JobManager.*;

/**
Expand Down Expand Up @@ -70,6 +73,11 @@ public void onStart(Worker worker) {
this.worker = worker;
worker.setJob(this);
updateManagedStatus(ji -> { // set these only if it's not a self-managed job
Instant start = Instant.now();
ji.setCreationTime(start);
ji.setStartTime(Instant.now());
ji.setExecutionDuration(LIFE_SPAN);
ji.setDestruction(start.plus(JOB_TTL_DAYS, ChronoUnit.DAYS));
ji.setPhase(JobInfo.Phase.EXECUTING);
});
sendUpdate(jobId, ji -> { // needs to update clients, because these values may change after the job has submitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti
jobUrl = submitJob(req);
if (jobUrl != null) runJob(jobUrl);
updateJob(ji -> {
ji.setPhase(Phase.QUEUED);
ji.setPhase(Phase.PENDING);
});
}
if (isEmpty(jobUrl)) throw new DataAccessException("Job URL is missing");
} catch (Exception e) {
updateJob(ji -> {
ji.setPhase(Phase.ERROR);
Expand All @@ -137,62 +138,72 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti
});
}

int cnt = 0;
long startTimeMs = System.currentTimeMillis();
try {
while (true) {
cnt++;
Phase phase = null;
try {
JobInfo uwsJob = getUwsJobInfo(jobUrl);
if (uwsJob == null) {
String msg = "Failed to retrieve UWS job info";
sendJobUpdate(ji -> {
ji.setPhase(Phase.ERROR);
ji.setErrorSummary(new ErrorSummary(msg));
});
throw new DataAccessException(msg);
}
long elapsedMs = System.currentTimeMillis() - startTimeMs;
JobInfo uwsJob = Try.it(() -> getUwsJobInfo(jobUrl)).get(); //
if (uwsJob != null) {
updateJob(ji -> ji.copyFrom(uwsJob));
phase = ifNotNull(uwsJob.getPhase()).getOrElse(Phase.UNKNOWN);
switch (phase) {
case Phase.COMPLETED:
return getResult(req);
case Phase.ABORTED:
throw new DataAccessException.Aborted(); // exit; stop tracking
case Phase.HELD:
throw new DataAccessException("The job is HELD pending execution and will not automatically be executed");
case Phase.PENDING:
throw new DataAccessException("The job was submitted, but no execution request has been made.");
case Phase.ERROR:
throw new DataAccessException("Job has failed with the error: " + uwsJob.getErrorSummary().message());
case Phase.UNKNOWN: {
if (cnt > 70) {
updateJob(ji -> {
ji.setPhase(Phase.ABORTED);
ji.setErrorSummary(new ErrorSummary("Job aborted: unknown phase for over 2 minutes"));
});
throw new DataAccessException("Job aborted: unknown phase for over 2 minutes");
}
}
Phase phase = uwsJob == null ? Phase.UNKNOWN : uwsJob.getPhase();
switch (phase) {
case Phase.COMPLETED:
return getResult(req);
case Phase.ABORTED:
throw new DataAccessException.Aborted(); // exit; stop tracking
case Phase.HELD:
failAfterDelay(elapsedMs, "Job on hold", "The job is in the HELD phase pending execution and will not be executed automatically.");
case Phase.PENDING:
failAfterDelay(elapsedMs, "Job not started", "The job was submitted, but no execution request has been made.");
case Phase.ERROR:
if (isTransientError(uwsJob.getErrorSummary())) {
failAfterDelay(elapsedMs, "Job failed with a transient error", uwsJob.getErrorSummary().message());
}
default:
// continue to wait
throwException("Job has failed with the error", uwsJob.getErrorSummary().message());
case Phase.UNKNOWN: {
failAfterDelay(elapsedMs, "Job aborted", "UNKNOWN phase for over 2 minutes", Phase.ABORTED);
}
} catch (Exception e) {
sendJobUpdate(ji -> {
ji.setPhase(Phase.ERROR);
ji.setErrorSummary(new ErrorSummary(e.getMessage()));
});
throw e; // catch exception to send update job status, then re-throw to let the caller handle it.
default:
// continue to wait
}
sendJobUpdate(null); // send update to client on each poll.
int wait = cnt < 3 ? 500 : cnt < 20 ? 1000 : 2000;
int wait = elapsedMs < 5_000 ? 500 :
elapsedMs < 30_000 ? 1000 :
elapsedMs < 120_000 ? 2000 :
5000;
TimeUnit.MILLISECONDS.sleep(wait);
}
} catch (InterruptedException e) {
throw new DataAccessException.Aborted();
}
}

/**
 * Unconditionally raises a {@link DataAccessException} whose message is
 * the given error label followed by its cause, joined as {@code "error: cause"}.
 *
 * @param error short label describing what failed
 * @param cause detail text appended after the label
 * @throws DataAccessException always
 */
void throwException(String error, String cause) throws DataAccessException {
    final String message = String.format("%s: %s", error, cause);
    throw new DataAccessException(message);
}

/**
 * Same as {@link #failAfterDelay(long, String, String, Phase)} but leaves the
 * job's phase untouched when the failure is raised.
 *
 * @param elapsedMs milliseconds elapsed, as measured by the caller
 * @param error     short label used as the prefix of the exception message
 * @param cause     detail text used as the suffix of the exception message
 * @throws DataAccessException if more than two minutes have elapsed
 */
void failAfterDelay(long elapsedMs, String error, String cause) throws DataAccessException {
    failAfterDelay(elapsedMs, error, cause, null);
}

/**
 * Fails only once {@code elapsedMs} exceeds two minutes; before that it
 * returns normally without doing anything.
 * When it does fail and {@code phaseToSet} is non-null, the job is first updated
 * with that phase and an error summary built from {@code cause}.
 *
 * @param elapsedMs  milliseconds elapsed, as measured by the caller
 * @param error      short label used as the prefix of the exception message
 * @param cause      detail text; also used for the job's error summary
 * @param phaseToSet phase to record on the job before failing, or {@code null} to skip the job update
 * @throws DataAccessException if more than two minutes have elapsed
 */
void failAfterDelay(long elapsedMs, String error, String cause, Phase phaseToSet) throws DataAccessException {
    final long graceMs = 120_000; // 2 minutes
    if (elapsedMs <= graceMs) {
        return; // still inside the grace period; nothing to do
    }
    if (phaseToSet != null) {
        updateJob(info -> {
            info.setPhase(phaseToSet);
            info.setErrorSummary(new ErrorSummary(cause));
        });
    }
    // past the grace period: raise the failure so the caller stops tracking
    throwException(error, cause);
}

/**
 * Tells whether an error summary is marked as transient, i.e. its type
 * equals {@code "transient"} ignoring case.
 *
 * @param es the error summary to inspect; may be {@code null}
 * @return {@code true} only when {@code es} is non-null and its type is "transient";
 *         {@code false} for a null summary or a null/other type
 */
boolean isTransientError(ErrorSummary es) {
    // A job may carry no error summary at all; report "not transient" rather than
    // failing with a NullPointerException. Calling equalsIgnoreCase on the literal
    // also covers a null type.
    return es != null && "transient".equalsIgnoreCase(es.type());
}

/**
* Submit the UWS request then execute it if it's not currently executing.
* @param req request info needed to create/submit the job
Expand Down Expand Up @@ -449,9 +460,7 @@ public static JobInfo convertToJobInfo(Document doc) throws Exception {
String type = errsum.getAttribute(ERROR_TYPE);
String hasDetails = errsum.getAttribute(ERROR_HAS_DETAILS);
String msg = getVal(errsum, prefix + ERROR_MSG);
if (!isEmpty(msg)) {
jobInfo.setErrorSummary(new ErrorSummary(msg, type, Boolean.parseBoolean(hasDetails)));
}
jobInfo.setErrorSummary(new ErrorSummary(msg, type, Boolean.parseBoolean(hasDetails)));
});

Element progress = ifNotEmpty(getEl(root, prefix + JOB_INFO))
Expand Down
Loading