Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion typescript/amp/src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ export const JobStatus = Schema.Literal(
"STOPPED",
"STOP_REQUESTED",
"STOPPING",
"FAILED",
"ERROR",
"FATAL",
"UNKNOWN",
).pipe(
Schema.annotations({
Expand Down
64 changes: 64 additions & 0 deletions typescript/amp/src/api/Admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,33 @@ export type DeployDatasetError =
| Error.SchedulerError
| Error.MetadataDbError

export class ListDatasetJobsResponse extends Schema.Class<ListDatasetJobsResponse>("ListDatasetJobsResponse")({
jobs: Schema.Array(Model.JobInfo),
}) {}

/**
* The list dataset jobs endpoint (GET /datasets/{namespace}/{name}/versions/{revision}/jobs).
*/
const listDatasetJobs = HttpApiEndpoint.get(
"listDatasetJobs",
)`/datasets/${datasetNamespace}/${datasetName}/versions/${datasetRevision}/jobs`
.addError(Error.InvalidRequest)
.addError(Error.DatasetNotFound)
.addError(Error.MetadataDbError)
.addSuccess(ListDatasetJobsResponse)

/**
* Error type for the `listDatasetJobs` endpoint.
*
* - InvalidRequest: Invalid namespace, name, or revision in path parameters.
* - DatasetNotFound: The dataset or revision was not found.
* - MetadataDbError: Database error while listing jobs.
*/
export type ListDatasetJobsError =
| Error.InvalidRequest
| Error.DatasetNotFound
| Error.MetadataDbError

/**
* The get dataset manifest endpoint (GET /datasets/{namespace}/{name}/versions/{revision}/manifest).
*/
Expand Down Expand Up @@ -339,6 +366,7 @@ export class DatasetGroup extends HttpApiGroup.make("dataset")
.add(getDatasetVersion)
.add(deployDataset)
.add(getDatasetManifest)
.add(listDatasetJobs)
{}

/**
Expand Down Expand Up @@ -462,6 +490,20 @@ export class Admin extends Context.Tag("Amp/Admin")<Admin, {
revision: Model.DatasetRevision,
) => Effect.Effect<any, HttpError | GetDatasetManifestError>

/**
* List all jobs for a specific dataset version.
*
* @param namespace The namespace of the dataset.
* @param name The name of the dataset.
* @param revision The version/revision of the dataset.
* @return The list of jobs for the dataset version.
*/
readonly listDatasetJobs: (
namespace: Model.DatasetNamespace,
name: Model.DatasetName,
revision: Model.DatasetRevision,
) => Effect.Effect<ListDatasetJobsResponse, HttpError | ListDatasetJobsError>

/**
* Get a job by ID.
*
Expand Down Expand Up @@ -628,6 +670,27 @@ export const make = Effect.fn(function*(url: string, options?: {
return result
})

const listDatasetJobs = Effect.fn("listDatasetJobs")(function*(
namespace: Model.DatasetNamespace,
name: Model.DatasetName,
revision: Model.DatasetRevision,
) {
const result = yield* client.dataset.listDatasetJobs({
path: {
namespace,
name,
revision,
},
}).pipe(
Effect.catchTags({
HttpApiDecodeError: Effect.die,
ParseError: Effect.die,
}),
)

return result
})

const getJobById = Effect.fn("getJobById")(function*(jobId: number) {
const result = yield* client.job.getJobById({
path: {
Expand Down Expand Up @@ -665,6 +728,7 @@ export const make = Effect.fn(function*(url: string, options?: {
getDatasetVersion,
deployDataset,
getDatasetManifest,
listDatasetJobs,
getJobById,
getOutputSchema,
}
Expand Down
76 changes: 75 additions & 1 deletion typescript/amp/src/cli/commands/deploy.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import * as Command from "@effect/cli/Command"
import * as Options from "@effect/cli/Options"
import * as Prompt from "@effect/cli/Prompt"
import * as Console from "effect/Console"
import type * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Admin from "../../api/Admin.ts"
import * as Auth from "../../Auth.ts"
import * as ManifestContext from "../../ManifestContext.ts"
import * as Model from "../../Model.ts"
import { adminUrl, configFile } from "../common.ts"
import { adminUrl, configFile, ExitCode } from "../common.ts"

const TERMINAL_STATUSES: ReadonlyArray<typeof Model.JobStatus.Type> = ["COMPLETED", "STOPPED", "ERROR", "FATAL"]

export const deploy = Command.make("deploy", {
args: {
Expand All @@ -23,6 +27,11 @@ export const deploy = Command.make("deploy", {
Options.withDescription("The block number to end at, inclusive"),
Options.optional,
),
force: Options.boolean("force").pipe(
Options.withAlias("f"),
Options.withDescription("Skip confirmation prompt when base datasets are in terminal states"),
Options.withDefault(false),
),
adminUrl,
},
}).pipe(
Expand All @@ -42,6 +51,9 @@ export const deploy = Command.make("deploy", {
)),
})

// Check base dataset statuses before deploying
yield* checkBaseDatasetsStatus(admin, dataset, args.force)

// Deploy the dataset
yield* admin.deployDataset(dataset.namespace, dataset.name, dataset.revision, {
endBlock: Option.getOrUndefined(args.endBlock),
Expand All @@ -59,3 +71,65 @@ export const deploy = Command.make("deploy", {
))
),
)

const checkBaseDatasetsStatus = Effect.fn(function*(
admin: Context.Tag.Service<typeof Admin.Admin>,
dataset: Model.DatasetReference,
force: boolean,
) {
// Fetch the manifest to check if this is a derived dataset with dependencies
const manifest = yield* admin.getDatasetManifest(dataset.namespace, dataset.name, dataset.revision).pipe(
Effect.catchAll(() => Effect.succeed(null)),
)

if (manifest === null || manifest.kind !== "manifest") {
return
}

const entries = Object.entries(manifest.dependencies) as Array<[string, Model.DatasetReference]>
if (entries.length === 0) {
return
}

// Check the status of each base dataset's latest job concurrently
const results = yield* Effect.all(
entries.map(([alias, depRef]) =>
admin.listDatasetJobs(depRef.namespace, depRef.name, depRef.revision).pipe(
Effect.catchAll(() => Effect.succeed(null)),
Effect.map((jobsResponse) => {
if (jobsResponse === null || jobsResponse.jobs.length === 0) return null

const latestJob = jobsResponse.jobs.reduce((latest, job) => job.id > latest.id ? job : latest)

if (TERMINAL_STATUSES.includes(latestJob.status)) {
return { alias, reference: depRef.encode(), status: latestJob.status }
}
return null
}),
)
),
{ concurrency: "unbounded" },
)

const terminalDeps = results.filter((r): r is NonNullable<typeof r> => r !== null)

if (terminalDeps.length === 0) return

// Display warning
yield* Console.warn("Warning: The following base datasets have jobs in terminal states:")
for (const dep of terminalDeps) {
yield* Console.warn(` - ${dep.alias} (${dep.reference}): ${dep.status}`)
}

if (force) return

const shouldContinue = yield* Prompt.confirm({
message: "One or more base datasets are in a terminal state. Continue deployment?",
initial: false,
})

if (!shouldContinue) {
yield* Console.log("Deployment cancelled.")
return yield* ExitCode.Zero
}
})
Loading