diff --git a/typescript/amp/src/Model.ts b/typescript/amp/src/Model.ts index 545362e74..fc48ff0c4 100644 --- a/typescript/amp/src/Model.ts +++ b/typescript/amp/src/Model.ts @@ -376,7 +376,8 @@ export const JobStatus = Schema.Literal( "STOPPED", "STOP_REQUESTED", "STOPPING", - "FAILED", + "ERROR", + "FATAL", "UNKNOWN", ).pipe( Schema.annotations({ diff --git a/typescript/amp/src/api/Admin.ts b/typescript/amp/src/api/Admin.ts index fe6b6d9da..d75267047 100644 --- a/typescript/amp/src/api/Admin.ts +++ b/typescript/amp/src/api/Admin.ts @@ -206,6 +206,33 @@ export type DeployDatasetError = | Error.SchedulerError | Error.MetadataDbError +export class ListDatasetJobsResponse extends Schema.Class("ListDatasetJobsResponse")({ + jobs: Schema.Array(Model.JobInfo), +}) {} + +/** + * The list dataset jobs endpoint (GET /datasets/{namespace}/{name}/versions/{revision}/jobs). + */ +const listDatasetJobs = HttpApiEndpoint.get( + "listDatasetJobs", +)`/datasets/${datasetNamespace}/${datasetName}/versions/${datasetRevision}/jobs` + .addError(Error.InvalidRequest) + .addError(Error.DatasetNotFound) + .addError(Error.MetadataDbError) + .addSuccess(ListDatasetJobsResponse) + +/** + * Error type for the `listDatasetJobs` endpoint. + * + * - InvalidRequest: Invalid namespace, name, or revision in path parameters. + * - DatasetNotFound: The dataset or revision was not found. + * - MetadataDbError: Database error while listing jobs. + */ +export type ListDatasetJobsError = + | Error.InvalidRequest + | Error.DatasetNotFound + | Error.MetadataDbError + /** * The get dataset manifest endpoint (GET /datasets/{namespace}/{name}/versions/{revision}/manifest). */ @@ -339,6 +366,7 @@ export class DatasetGroup extends HttpApiGroup.make("dataset") .add(getDatasetVersion) .add(deployDataset) .add(getDatasetManifest) + .add(listDatasetJobs) {} /** @@ -462,6 +490,20 @@ export class Admin extends Context.Tag("Amp/Admin") Effect.Effect + /** + * List all jobs for a specific dataset version. + * + * @param namespace The namespace of the dataset. + * @param name The name of the dataset. + * @param revision The version/revision of the dataset. + * @return The list of jobs for the dataset version. + */ + readonly listDatasetJobs: ( + namespace: Model.DatasetNamespace, + name: Model.DatasetName, + revision: Model.DatasetRevision, + ) => Effect.Effect + /** * Get a job by ID. * @@ -628,6 +670,27 @@ export const make = Effect.fn(function*(url: string, options?: { return result }) + const listDatasetJobs = Effect.fn("listDatasetJobs")(function*( + namespace: Model.DatasetNamespace, + name: Model.DatasetName, + revision: Model.DatasetRevision, + ) { + const result = yield* client.dataset.listDatasetJobs({ + path: { + namespace, + name, + revision, + }, + }).pipe( + Effect.catchTags({ + HttpApiDecodeError: Effect.die, + ParseError: Effect.die, + }), + ) + + return result + }) + const getJobById = Effect.fn("getJobById")(function*(jobId: number) { const result = yield* client.job.getJobById({ path: { @@ -665,6 +728,7 @@ export const make = Effect.fn(function*(url: string, options?: { getDatasetVersion, deployDataset, getDatasetManifest, + listDatasetJobs, getJobById, getOutputSchema, } diff --git a/typescript/amp/src/cli/commands/deploy.ts b/typescript/amp/src/cli/commands/deploy.ts index 9a9df607b..97b355924 100644 --- a/typescript/amp/src/cli/commands/deploy.ts +++ b/typescript/amp/src/cli/commands/deploy.ts @@ -1,6 +1,8 @@ import * as Command from "@effect/cli/Command" import * as Options from "@effect/cli/Options" +import * as Prompt from "@effect/cli/Prompt" import * as Console from "effect/Console" +import type * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import * as Option from "effect/Option" @@ -8,7 +10,9 @@ import * as Admin from "../../api/Admin.ts" import * as Auth from "../../Auth.ts" import * as ManifestContext from "../../ManifestContext.ts" import * as Model from "../../Model.ts" -import { adminUrl, configFile } from "../common.ts" +import { adminUrl, configFile, ExitCode } from "../common.ts" + +const TERMINAL_STATUSES: ReadonlyArray = ["COMPLETED", "STOPPED", "ERROR", "FATAL"] export const deploy = Command.make("deploy", { args: { @@ -23,6 +27,11 @@ export const deploy = Command.make("deploy", { Options.withDescription("The block number to end at, inclusive"), Options.optional, ), + force: Options.boolean("force").pipe( + Options.withAlias("f"), + Options.withDescription("Skip confirmation prompt when base datasets are in terminal states"), + Options.withDefault(false), + ), adminUrl, }, }).pipe( @@ -42,6 +51,9 @@ export const deploy = Command.make("deploy", { )), }) + // Check base dataset statuses before deploying + yield* checkBaseDatasetsStatus(admin, dataset, args.force) + // Deploy the dataset yield* admin.deployDataset(dataset.namespace, dataset.name, dataset.revision, { endBlock: Option.getOrUndefined(args.endBlock), @@ -59,3 +71,65 @@ export const deploy = Command.make("deploy", { )) ), ) + +const checkBaseDatasetsStatus = Effect.fn(function*( + admin: Context.Tag.Service, + dataset: Model.DatasetReference, + force: boolean, +) { + // Fetch the manifest to check if this is a derived dataset with dependencies + const manifest = yield* admin.getDatasetManifest(dataset.namespace, dataset.name, dataset.revision).pipe( + Effect.catchAll(() => Effect.succeed(null)), + ) + + if (manifest === null || manifest.kind !== "manifest") { + return + } + + const entries = Object.entries(manifest.dependencies) as Array<[string, Model.DatasetReference]> + if (entries.length === 0) { + return + } + + // Check the status of each base dataset's latest job concurrently + const results = yield* Effect.all( + entries.map(([alias, depRef]) => + admin.listDatasetJobs(depRef.namespace, depRef.name, depRef.revision).pipe( + Effect.catchAll(() => Effect.succeed(null)), + Effect.map((jobsResponse) => { + if (jobsResponse === null || jobsResponse.jobs.length === 0) return null + + const latestJob = jobsResponse.jobs.reduce((latest, job) => job.id > latest.id ? job : latest) + + if (TERMINAL_STATUSES.includes(latestJob.status)) { + return { alias, reference: depRef.encode(), status: latestJob.status } + } + return null + }), + ) + ), + { concurrency: "unbounded" }, + ) + + const terminalDeps = results.filter((r): r is NonNullable => r !== null) + + if (terminalDeps.length === 0) return + + // Display warning + yield* Console.warn("Warning: The following base datasets have jobs in terminal states:") + for (const dep of terminalDeps) { + yield* Console.warn(` - ${dep.alias} (${dep.reference}): ${dep.status}`) + } + + if (force) return + + const shouldContinue = yield* Prompt.confirm({ + message: "One or more base datasets are in a terminal state. Continue deployment?", + initial: false, + }) + + if (!shouldContinue) { + yield* Console.log("Deployment cancelled.") + return yield* ExitCode.Zero + } +})