From 79be01dae65ab8617329b52dc8016b25d6665493 Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 11:14:19 +0100 Subject: [PATCH 1/6] feat: restructure versions --- scripts/README.md | 60 +++++++ scripts/load-env.js | 35 ++++ scripts/version-migrate-analyse.js | 125 ++++++++++++++ scripts/version-migrate-run.js | 251 ++++++++++++++++++++++++++++ scripts/version-migrate-validate.js | 118 +++++++++++++ src/storage/version/audit.js | 194 +++++++++++++++++++++ src/storage/version/get.js | 22 ++- src/storage/version/list.js | 121 ++++++++++++-- src/storage/version/paths.js | 46 +++++ src/storage/version/put.js | 69 +++++--- test/storage/version/list.test.js | 163 ++++++++++++++++++ test/storage/version/put.test.js | 157 ++++++++++------- 12 files changed, 1271 insertions(+), 90 deletions(-) create mode 100644 scripts/README.md create mode 100644 scripts/load-env.js create mode 100644 scripts/version-migrate-analyse.js create mode 100644 scripts/version-migrate-run.js create mode 100644 scripts/version-migrate-validate.js create mode 100644 src/storage/version/audit.js create mode 100644 src/storage/version/paths.js diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..6147900 --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,60 @@ +# Version storage migration scripts + +These scripts migrate version data from the legacy layout (`org/.da-versions/fileId/`) to the new layout (`org/repo/.da-versions/fileId/` plus `audit.txt`). + +## Prerequisites + +- Node.js (ESM) +- Environment: set `AEM_BUCKET_NAME`, `ORG`, and S3 credentials. Easiest: copy `.dev.vars` to `.env` or export vars, and ensure `scripts/load-env.js` is imported so `.dev.vars` / `.env` are loaded. + +## Scripts + +### 1. Analyse (`version-migrate-analyse.js`) + +Lists all version folders under `org/.da-versions/` and samples object counts (empty vs with content). + +```bash +ORG=myorg AEM_BUCKET_NAME=mybucket node scripts/version-migrate-analyse.js +# or with .dev.vars present: +node scripts/version-migrate-analyse.js myorg +``` + +### 2. Migrate (`version-migrate-run.js`) + +For each file ID under `org/.da-versions/`: + +- Copies snapshot objects (contentLength > 0) to `org/repo/.da-versions/fileId/versionId.ext` (repo from object metadata `path`). +- Builds `audit.txt`: deduplicates legacy empty-version metadata (same user + 30 min window), **merges with any existing `audit.txt` already in the new path** (hybrid case: project not yet migrated but new PUTs have been writing audit there), then writes the combined, deduplicated result. + +**Dry run (no writes):** + +```bash +DRY_RUN=1 ORG=myorg AEM_BUCKET_NAME=mybucket node scripts/version-migrate-run.js +``` + +**Execute:** + +```bash +ORG=myorg AEM_BUCKET_NAME=mybucket node scripts/version-migrate-run.js +``` + +### 3. Validate (`version-migrate-validate.js`) + +Compares object counts for a single document: legacy prefix vs new prefix. + +```bash +ORG=myorg node scripts/version-migrate-validate.js myorg repo/path/to/file.html +``` + +## Env vars + +| Variable | Description | +|---------------------|--------------------------------| +| `AEM_BUCKET_NAME` | R2/S3 bucket name | +| `ORG` | Org slug (e.g. `kptdobe`) | +| `S3_ACCESS_KEY_ID` | S3/R2 access key | +| `S3_SECRET_ACCESS_KEY` | S3/R2 secret key | +| `S3_DEF_URL` | S3/R2 endpoint URL | +| `DRY_RUN` | Set to `1` to skip writes (migrate script) | + +Load from `.dev.vars` or `.env` by ensuring the script imports `./load-env.js` first (already done in each script). diff --git a/scripts/load-env.js b/scripts/load-env.js new file mode 100644 index 0000000..dfa4e5f --- /dev/null +++ b/scripts/load-env.js @@ -0,0 +1,35 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +import { readFileSync, existsSync } from 'fs'; +import { fileURLToPath } from 'url'; +import { dirname, join } from 'path'; + +const scriptDir = dirname(fileURLToPath(import.meta.url)); +const root = join(scriptDir, '..'); + +function loadFile(name) { + const path = join(root, name); + if (!existsSync(path)) return; + const content = readFileSync(path, 'utf8'); + for (const line of content.split('\n')) { + const t = line.trim(); + const eq = t.indexOf('='); + if (t && !t.startsWith('#') && eq > 0) { + const key = t.slice(0, eq).trim(); + const value = t.slice(eq + 1).trim(); + if (!process.env[key]) process.env[key] = value; + } + } +} + +loadFile('.dev.vars'); +loadFile('.env'); diff --git a/scripts/version-migrate-analyse.js b/scripts/version-migrate-analyse.js new file mode 100644 index 0000000..fcff0ed --- /dev/null +++ b/scripts/version-migrate-analyse.js @@ -0,0 +1,125 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +/* eslint-disable no-await-in-loop -- migration script: sequential to avoid rate limits */ +import './load-env.js'; +import { + S3Client, + ListObjectsV2Command, + HeadObjectCommand, +} from '@aws-sdk/client-s3'; + +const Bucket = process.env.AEM_BUCKET_NAME; +const Org = process.env.ORG || process.argv[2]; + +if (!Bucket || !Org) { + console.error('Set AEM_BUCKET_NAME and ORG (or pass org as first arg)'); + process.exit(1); +} + +const config = { + region: 'auto', + endpoint: process.env.S3_DEF_URL, + credentials: { + accessKeyId: process.env.S3_ACCESS_KEY_ID, + secretAccessKey: process.env.S3_SECRET_ACCESS_KEY, + }, +}; +if (process.env.S3_FORCE_PATH_STYLE === 'true') config.forcePathStyle = true; + +const client = new S3Client(config); +const prefix = `${Org}/.da-versions/`; + +async function listFileIds() { + const ids = []; + let token; + do { + const cmd = new ListObjectsV2Command({ + Bucket, + Prefix: prefix, + Delimiter: '/', + MaxKeys: 1000, + ContinuationToken: token, + }); + const resp = await client.send(cmd); + (resp.CommonPrefixes || []).forEach((cp) => { + const p = cp.Prefix.slice(prefix.length).replace(/\/$/, ''); + if (p) ids.push(p); + }); + token = resp.NextContinuationToken; + } while (token); + return ids; +} + +async function countObjects(fileId) { + const listPrefix = `${prefix}${fileId}/`; + let total = 0; + let empty = 0; + let nonEmpty = 0; + let token; + do { + const cmd = new ListObjectsV2Command({ + Bucket, + Prefix: listPrefix, + MaxKeys: 1000, + ContinuationToken: token, + }); + const resp = await client.send(cmd); + for (const obj of resp.Contents || []) { + total += 1; + try { + const head = await client.send(new HeadObjectCommand({ + Bucket, + Key: obj.Key, + })); + const len = head.ContentLength ?? 0; + if (len === 0) empty += 1; + else nonEmpty += 1; + } catch { + total -= 1; + } + } + token = resp.NextContinuationToken; + } while (token); + return { total, empty, nonEmpty }; +} + +async function main() { + console.log(`Org: ${Org}, Bucket: ${Bucket}, prefix: ${prefix}`); + const fileIds = await listFileIds(); + console.log(`File IDs (version folders): ${fileIds.length}`); + + let totalObjects = 0; + let totalEmpty = 0; + let totalNonEmpty = 0; + + const sampleIds = fileIds.slice(0, 50); + for (const fileId of sampleIds) { + const { total, empty, nonEmpty } = await countObjects(fileId); + totalObjects += total; + totalEmpty += empty; + totalNonEmpty += nonEmpty; + if (total > 0) { + console.log(` ${fileId}: total=${total} empty=${empty} nonEmpty=${nonEmpty}`); + } + } + + if (fileIds.length > 50) { + console.log(` ... (showing first 50; run full count by iterating all ${fileIds.length} IDs)`); + } + + console.log(`Sample totals (first ${Math.min(50, fileIds.length)} IDs): ${totalObjects} objects, ${totalEmpty} empty, ${totalNonEmpty} with content`); +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/scripts/version-migrate-run.js b/scripts/version-migrate-run.js new file mode 100644 index 0000000..b75ec8f --- /dev/null +++ b/scripts/version-migrate-run.js @@ -0,0 +1,251 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +/* eslint-disable no-await-in-loop, no-continue -- migration: sequential; skip audit.txt */ +import './load-env.js'; +import { + S3Client, + ListObjectsV2Command, + HeadObjectCommand, + GetObjectCommand, + CopyObjectCommand, + PutObjectCommand, +} from '@aws-sdk/client-s3'; + +const Bucket = process.env.AEM_BUCKET_NAME; +const Org = process.env.ORG || process.argv[2]; +const DRY_RUN = process.env.DRY_RUN === '1' || process.env.DRY_RUN === 'true'; + +const AUDIT_WINDOW_MS = 30 * 60 * 1000; + +const config = { + region: 'auto', + endpoint: process.env.S3_DEF_URL, + credentials: { + accessKeyId: process.env.S3_ACCESS_KEY_ID, + secretAccessKey: process.env.S3_SECRET_ACCESS_KEY, + }, +}; +if (process.env.S3_FORCE_PATH_STYLE === 'true') config.forcePathStyle = true; + +const client = new S3Client(config); +const prefix = `${Org}/.da-versions/`; + +function getRepoFromPath(path) { + if (!path || typeof path !== 'string') return ''; + const first = path.split('/')[0]; + return first || ''; +} + +function usersNormalized(usersJson) { + try { + const arr = JSON.parse(usersJson); + const emails = Array.isArray(arr) ? arr.map((u) => u?.email ?? '').filter(Boolean) : []; + return emails.join(',') || usersJson; + } catch { + return usersJson; + } +} + +function dedupeAuditEntries(entries) { + const out = []; + for (const e of entries.sort((a, b) => (a.timestamp || 0) - (b.timestamp || 0))) { + const last = out[out.length - 1]; + const ts = parseInt(e.timestamp, 10) || 0; + const userNorm = usersNormalized(e.users); + const lastTs = parseInt(last?.timestamp, 10) || 0; + const sameUser = last && usersNormalized(last.users) === userNorm; + const inWindow = sameUser && (ts - lastTs <= AUDIT_WINDOW_MS); + if (inWindow) { + out[out.length - 1] = e; + } else { + out.push(e); + } + } + return out; +} + +function formatAuditLine(entry) { + return [entry.timestamp, entry.users, entry.path].join('\t'); +} + +/** Parse one audit line (tab-separated: timestamp, users, path). Same format as audit.js. */ +function parseAuditLine(line) { + const t = line.trim(); + if (!t) return null; + const parts = t.split('\t'); + if (parts.length < 3) return null; + return { + timestamp: parts[0], + users: parts[1], + path: parts.slice(2).join('\t'), + }; +} + +/** In hybrid case, new path may already have audit.txt. Read and merge with legacy entries. */ +async function readExistingAuditInNewPath(repo, fileId) { + const auditKey = `${Org}/${repo}/.da-versions/${fileId}/audit.txt`; + try { + const resp = await client.send(new GetObjectCommand({ Bucket, Key: auditKey })); + const body = resp.Body; + let text = ''; + if (body) { + if (typeof body.transformToByteArray === 'function') { + const bytes = await body.transformToByteArray(); + text = new TextDecoder().decode(bytes); + } else { + const chunks = []; + for await (const chunk of body) chunks.push(chunk); + text = Buffer.concat(chunks).toString('utf8'); + } + } + const lines = text.split('\n').map(parseAuditLine).filter(Boolean); + return lines; + } catch (e) { + if (e?.$metadata?.httpStatusCode === 404 || e?.name === 'NoSuchKey') return []; + throw e; + } +} + +async function listFileIds() { + const ids = []; + let token; + do { + const resp = await client.send(new ListObjectsV2Command({ + Bucket, + Prefix: prefix, + Delimiter: '/', + MaxKeys: 1000, + ContinuationToken: token, + })); + (resp.CommonPrefixes || []).forEach((cp) => { + const p = cp.Prefix.slice(prefix.length).replace(/\/$/, ''); + if (p) ids.push(p); + }); + token = resp.NextContinuationToken; + } while (token); + return ids; +} + +async function migrateFileId(fileId) { + const listPrefix = `${prefix}${fileId}/`; + const objects = []; + let token; + do { + const resp = await client.send(new ListObjectsV2Command({ + Bucket, + Prefix: listPrefix, + MaxKeys: 1000, + ContinuationToken: token, + })); + (resp.Contents || []).forEach((c) => objects.push(c.Key)); + token = resp.NextContinuationToken; + } while (token); + + const snapshots = []; + const auditEntries = []; + + for (const Key of objects) { + const head = await client.send(new HeadObjectCommand({ Bucket, Key })); + const contentLength = head.ContentLength ?? 0; + const meta = head.Metadata || {}; + const path = meta.path || meta.Path || ''; + const timestamp = meta.timestamp || meta.Timestamp || ''; + const users = meta.users || meta.Users || '[{"email":"anonymous"}]'; + const repo = getRepoFromPath(path); + + const name = Key.split('/').pop(); + if (name !== 'audit.txt') { + if (contentLength > 0) { + snapshots.push({ + Key, + repo, + name, + copySource: `${Bucket}/${Key}`, + }); + } else { + auditEntries.push({ timestamp, users, path }); + } + } + } + + const repoSet = new Set(snapshots.map((s) => s.repo).filter(Boolean)); + const repoFromAudit = auditEntries.length ? getRepoFromPath(auditEntries[0]?.path) : ''; + if (repoFromAudit) repoSet.add(repoFromAudit); + let repo = ''; + if (repoSet.size === 1) { + const [firstRepo] = repoSet; + repo = firstRepo; + } else if (repoSet.size > 1) repo = 'unknown'; + + if (!DRY_RUN) { + for (const s of snapshots) { + const destRepo = s.repo || repo; + if (destRepo) { + const destKey = `${Org}/${destRepo}/.da-versions/${fileId}/${s.name}`; + await client.send(new CopyObjectCommand({ + Bucket, + CopySource: s.copySource, + Key: destKey, + })); + } + } + + if (auditEntries.length && repo) { + const dedupedLegacy = dedupeAuditEntries(auditEntries); + const existingInNew = await readExistingAuditInNewPath(repo, fileId); + const combined = [...dedupedLegacy, ...existingInNew].sort( + (a, b) => (parseInt(a.timestamp, 10) || 0) - (parseInt(b.timestamp, 10) || 0), + ); + const deduped = dedupeAuditEntries(combined); + const body = deduped.map(formatAuditLine).join('\n') + (deduped.length ? '\n' : ''); + const auditKey = `${Org}/${repo}/.da-versions/${fileId}/audit.txt`; + await client.send(new PutObjectCommand({ + Bucket, + Key: auditKey, + Body: body, + ContentType: 'text/plain; charset=utf-8', + })); + } + } + + return { + fileId, + snapshots: snapshots.length, + audit: auditEntries.length, + repo: repo || '(none)', + }; +} + +async function main() { + if (!Bucket || !Org) { + console.error('Set AEM_BUCKET_NAME and ORG (or pass org as first arg)'); + process.exit(1); + } + console.log(`Org: ${Org}, Bucket: ${Bucket}, DRY_RUN: ${DRY_RUN}`); + + const fileIds = await listFileIds(); + console.log(`File IDs to process: ${fileIds.length}`); + + for (const fileId of fileIds) { + try { + const result = await migrateFileId(fileId); + console.log(` ${fileId}: ${result.snapshots} snapshots, ${result.audit} audit -> repo ${result.repo}`); + } catch (e) { + console.error(` ${fileId}: error`, e.message); + } + } +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/scripts/version-migrate-validate.js b/scripts/version-migrate-validate.js new file mode 100644 index 0000000..79b8c3d --- /dev/null +++ b/scripts/version-migrate-validate.js @@ -0,0 +1,118 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +/* eslint-disable no-await-in-loop -- do/while with token uses await */ +import './load-env.js'; +import { + S3Client, + ListObjectsV2Command, + HeadObjectCommand, +} from '@aws-sdk/client-s3'; + +const Bucket = process.env.AEM_BUCKET_NAME; +const args = process.argv.slice(2); +const Org = process.env.ORG || args[0]; +const Path = args[1] || args[0]; + +const config = { + region: 'auto', + endpoint: process.env.S3_DEF_URL, + credentials: { + accessKeyId: process.env.S3_ACCESS_KEY_ID, + secretAccessKey: process.env.S3_SECRET_ACCESS_KEY, + }, +}; +if (process.env.S3_FORCE_PATH_STYLE === 'true') config.forcePathStyle = true; + +const client = new S3Client(config); + +async function getDocumentMeta(path) { + const key = `${Org}/${path}`; + try { + const head = await client.send(new HeadObjectCommand({ Bucket, Key: key })); + const id = head.Metadata?.id || head.Metadata?.ID; + return { id, status: 200 }; + } catch (e) { + return { status: e.$metadata?.httpStatusCode || 404 }; + } +} + +async function listLegacyVersions(fileId) { + const prefix = `${Org}/.da-versions/${fileId}/`; + const list = []; + let token; + do { + const resp = await client.send(new ListObjectsV2Command({ + Bucket, + Prefix: prefix, + MaxKeys: 500, + ContinuationToken: token, + })); + (resp.Contents || []).forEach((c) => { + const name = c.Key.slice(prefix.length); + if (name && name !== 'audit.txt') list.push(name); + }); + token = resp.NextContinuationToken; + } while (token); + return list; +} + +async function listNewVersions(repo, fileId) { + const prefix = `${Org}/${repo}/.da-versions/${fileId}/`; + const list = []; + let token; + do { + const resp = await client.send(new ListObjectsV2Command({ + Bucket, + Prefix: prefix, + MaxKeys: 500, + ContinuationToken: token, + })); + (resp.Contents || []).forEach((c) => { + const name = c.Key.slice(prefix.length); + if (name && name !== 'audit.txt') list.push(name); + }); + token = resp.NextContinuationToken; + } while (token); + return list; +} + +async function main() { + if (!Bucket || !Org || !Path) { + console.error('Usage: ORG=org node scripts/version-migrate-validate.js [org] '); + console.error('Example: ORG=kptdobe node scripts/version-migrate-validate.js kptdobe test/docs/foo.html'); + process.exit(1); + } + + const meta = await getDocumentMeta(Path); + if (meta.status !== 200 || !meta.id) { + console.error(`Document ${Path} not found or has no id`); + process.exit(1); + } + + const fileId = meta.id; + const repo = Path.includes('/') ? Path.split('/')[0] : ''; + + const legacy = await listLegacyVersions(fileId); + const migrated = repo ? await listNewVersions(repo, fileId) : []; + + console.log(`FileId: ${fileId}, path: ${Path}, repo: ${repo || '(none)'}`); + console.log(`Legacy prefix count: ${legacy.length}`); + console.log(`New prefix count: ${migrated.length}`); + if (legacy.length !== migrated.length) { + console.log(' Mismatch: legacy vs new count differs'); + } +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/src/storage/version/audit.js b/src/storage/version/audit.js new file mode 100644 index 0000000..855b0b3 --- /dev/null +++ b/src/storage/version/audit.js @@ -0,0 +1,194 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +import { + S3Client, + GetObjectCommand, + PutObjectCommand, +} from '@aws-sdk/client-s3'; + +import getS3Config from '../utils/config.js'; +import { auditKey } from './paths.js'; + +/** Same-user edits within this window (ms) collapse into one entry (last timestamp). 30 min. */ +export const AUDIT_TIME_WINDOW_MS = 30 * 60 * 1000; + +const SEP = '\t'; + +/** + * Serialize one audit entry to a line (timestamp \t users \t path). + * @param {{ timestamp: string, users: string, path: string }} entry + * @returns {string} + */ +export function formatAuditLine(entry) { + return [entry.timestamp, entry.users, entry.path].join(SEP); +} + +/** + * Parse one audit line to { timestamp, users, path }. + * @param {string} line + * @returns {{ timestamp: string, users: string, path: string }|null} + */ +export function parseAuditLine(line) { + const t = line.trim(); + if (!t) return null; + const parts = t.split(SEP); + if (parts.length < 3) return null; + return { + timestamp: parts[0], + users: parts[1], + path: parts.slice(2).join(SEP), + }; +} + +/** + * Read audit.txt body stream to string. + * @param {import('stream').Readable|ReadableStream|string} body + * @returns {Promise} + */ +async function streamToString(body) { + if (body == null) return ''; + if (typeof body === 'string') return body; + if (typeof body.text === 'function') { + const text = await body.text(); + return text; + } + const chunks = []; + try { + for await (const chunk of body) { + const buf = Buffer.isBuffer(chunk) || chunk instanceof Uint8Array + ? chunk + : Buffer.from(String(chunk)); + chunks.push(buf); + } + } catch { + return ''; + } + return chunks.length ? Buffer.concat(chunks).toString('utf8') : ''; +} + +/** + * Read all audit lines for a file (new structure). + * @param {object} env + * @param {{ bucket: string, org: string }} ctx - bucket, org + * @param {string} repo + * @param {string} fileId + * @returns {Promise<{ timestamp: number, users: object[], path: string }[]>} + */ +export async function readAuditLines(env, ctx, repo, fileId) { + const config = getS3Config(env); + const client = new S3Client(config); + const key = `${ctx.org}/${auditKey(repo, fileId)}`; + try { + const resp = await client.send(new GetObjectCommand({ + Bucket: ctx.bucket, + Key: key, + })); + const text = await streamToString(resp.Body); + const lines = text.split('\n').map(parseAuditLine).filter(Boolean); + return lines.map((line) => ({ + timestamp: parseInt(line.timestamp, 10) || 0, + users: (() => { + try { + return JSON.parse(line.users); + } catch { + return [{ email: 'anonymous' }]; + } + })(), + path: line.path, + })); + } catch (e) { + if (e.$metadata?.httpStatusCode === 404 || e.name === 'NoSuchKey') { + return []; + } + throw e; + } +} + +/** + * Normalize users for same-user comparison (stable string). + * @param {string} usersJson + * @returns {string} + */ +function usersNormalized(usersJson) { + try { + const arr = JSON.parse(usersJson); + const emails = Array.isArray(arr) ? arr.map((u) => u?.email ?? '').filter(Boolean) : []; + return emails.join(',') || usersJson; + } catch { + return usersJson; + } +} + +/** + * Append or update last line in audit.txt (read-modify-write). If last line is same user + * and within AUDIT_TIME_WINDOW_MS, replace that line with the new timestamp; else append. + * @param {object} env + * @param {{ bucket: string, org: string }} ctx - bucket, org + * @param {string} repo + * @param {string} fileId + * @param {{ timestamp: string, users: string, path: string }} entry + * @returns {Promise<{ status: number }>} + */ +export async function writeAuditEntry(env, ctx, repo, fileId, entry) { + try { + const config = getS3Config(env); + const client = new S3Client(config); + const key = `${ctx.org}/${auditKey(repo, fileId)}`; + const nowMs = parseInt(entry.timestamp, 10) || Date.now(); + const entryUsersNorm = usersNormalized(entry.users); + + let existingText = ''; + try { + const getResp = await client.send(new GetObjectCommand({ + Bucket: ctx.bucket, + Key: key, + })); + const body = getResp?.Body; + existingText = body != null ? await streamToString(body) : ''; + } catch (e) { + if (e?.$metadata?.httpStatusCode !== 404 && e?.name !== 'NoSuchKey') { + throw e; + } + } + + const lines = existingText.split('\n').filter((l) => l.trim()); + const lastLine = lines.length ? parseAuditLine(lines[lines.length - 1]) : null; + + let newContent; + if (lastLine && usersNormalized(lastLine.users) === entryUsersNorm) { + const lastTs = parseInt(lastLine.timestamp, 10) || 0; + if (nowMs - lastTs <= AUDIT_TIME_WINDOW_MS) { + lines[lines.length - 1] = formatAuditLine(entry); + newContent = `${lines.join('\n')}\n`; + } else { + const sep = existingText && !existingText.endsWith('\n') ? '\n' : ''; + newContent = `${existingText}${sep}${formatAuditLine(entry)}\n`; + } + } else { + const sep = existingText && !existingText.endsWith('\n') ? '\n' : ''; + newContent = `${existingText}${sep}${formatAuditLine(entry)}\n`; + } + + const resp = await client.send(new PutObjectCommand({ + Bucket: ctx.bucket, + Key: key, + Body: newContent, + ContentType: 'text/plain; charset=utf-8', + })); + + return { status: resp?.$metadata?.httpStatusCode ?? 200 }; + } catch (e) { + // eslint-disable-next-line no-console + console.error('writeAuditEntry failed', e); + return { status: 500 }; + } +} diff --git a/src/storage/version/get.js b/src/storage/version/get.js index 74dfaea..4bc5b6b 100644 --- a/src/storage/version/get.js +++ b/src/storage/version/get.js @@ -10,7 +10,27 @@ * governing permissions and limitations under the License. */ import getObject from '../object/get.js'; +import { versionKeyNew } from './paths.js'; +/** + * GET version: try new path (repo/.da-versions/fileId/versionId.ext) then legacy. + * daCtx.key can be "repo/fileId/versionId.ext" or "fileId/versionId.ext". + */ export async function getObjectVersion(env, { bucket, org, key }, head, conditionalHeaders) { - return getObject(env, { bucket, org, key: `.da-versions/${key}` }, head, conditionalHeaders); + const parts = key.split('/'); + if (parts.length >= 3) { + const [repo, fileId, ...rest] = parts; + const versionFile = rest.join('/'); + const ext = versionFile.split('.').pop(); + const versionId = versionFile.slice(0, -(ext.length + 1)); + const newKey = versionKeyNew(org, repo, fileId, versionId, ext); + const resp = await getObject(env, { bucket, org, key: newKey }, head, conditionalHeaders); + if (resp.status !== 404) { + return resp; + } + } + + // Legacy path; kept during migration, will be removed when all orgs use new structure. + const legacyKey = `.da-versions/${key}`; + return getObject(env, { bucket, org, key: legacyKey }, head, conditionalHeaders); } diff --git a/src/storage/version/list.js b/src/storage/version/list.js index 96560a6..c585e88 100644 --- a/src/storage/version/list.js +++ b/src/storage/version/list.js @@ -12,16 +12,76 @@ import processQueue from '@adobe/helix-shared-process-queue'; import getObject from '../object/get.js'; import listObjects from '../object/list.js'; +import { readAuditLines } from './audit.js'; const MAX_VERSIONS = 500; const CONCURRENCY = 50; -export async function listObjectVersions(env, { bucket, org, key }) { - const current = await getObject(env, { bucket, org, key }, true); - if (current.status === 404 || !current.metadata.id) { - return 404; +/** + * Try new structure: repo/.da-versions/fileId/ (snapshots) + audit.txt. Merge and sort. + * @returns {Promise<{ status: number, body?: string, contentType?: string }|null>} null = fallback + */ +async function listFromNewStructure(env, { bucket, org, key: _ }, fileId, repo) { + const listResp = await listObjects(env, { + bucket, + org, + key: `${repo}/.da-versions/${fileId}`, + }, MAX_VERSIONS); + if (listResp.status !== 200) { + return null; + } + + const list = JSON.parse(listResp.body); + const snapshotEntries = list.filter((e) => !(e.name === 'audit' && e.ext === 'txt')); + + const snapshotVersions = await processQueue(snapshotEntries, async (entry) => { + const entryResp = await getObject(env, { + bucket, + org, + key: `${repo}/.da-versions/${fileId}/${entry.name}.${entry.ext}`, + }, true); + + if (entryResp.status !== 200 || !entryResp.metadata) { + return undefined; + } + + const timestamp = parseInt(entryResp.metadata.timestamp || '0', 10); + const users = JSON.parse(entryResp.metadata.users || '[{"email":"anonymous"}]'); + const { label, path } = entryResp.metadata; + + return { + url: `/versionsource/${org}/${repo}/${fileId}/${entry.name}.${entry.ext}`, + users, + timestamp, + path, + label: label ?? undefined, + }; + }, CONCURRENCY); + + let auditEntries = []; + try { + const lines = await readAuditLines(env, { bucket, org }, repo, fileId); + auditEntries = lines.map(({ users, timestamp, path }) => ({ users, timestamp, path })); + } catch { + // Ignore audit read errors (e.g. 404) } - const resp = await listObjects(env, { bucket, org, key: `.da-versions/${current.metadata.id}` }, MAX_VERSIONS); + + const merged = [...snapshotVersions.filter(Boolean), ...auditEntries]; + merged.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0)); + + return { + status: 200, + contentType: listResp.contentType, + body: JSON.stringify(merged), + }; +} + +/** + * Legacy: list org/.da-versions/fileId/, HEAD each, return list (empty = audit entries). + * Kept during migration; will be removed when all orgs use the new structure. + */ +async function listFromLegacyStructure(env, { bucket, org, key: _ }, fileId) { + const resp = await listObjects(env, { bucket, org, key: `.da-versions/${fileId}` }, MAX_VERSIONS); if (resp.status !== 200) { return resp; } @@ -31,11 +91,10 @@ export async function listObjectVersions(env, { bucket, org, key }) { const entryResp = await getObject(env, { bucket, org, - key: `.da-versions/${current.metadata.id}/${entry.name}.${entry.ext}`, + key: `.da-versions/${fileId}/${entry.name}.${entry.ext}`, }, true); if (entryResp.status !== 200 || !entryResp.metadata) { - // Some requests might fail for unknown reasons (system busy, etc.) return undefined; } @@ -45,7 +104,7 @@ export async function listObjectVersions(env, { bucket, org, key }) { if (entryResp.contentLength > 0) { return { - url: `/versionsource/${org}/${current.metadata.id}/${entry.name}.${entry.ext}`, + url: `/versionsource/${org}/${fileId}/${entry.name}.${entry.ext}`, users, timestamp, path, @@ -55,8 +114,7 @@ export async function listObjectVersions(env, { bucket, org, key }) { return { users, timestamp, path }; }, CONCURRENCY); - // Filter out undefined entries (failed requests) - const filteredVersions = versions.filter((version) => version !== undefined); + const filteredVersions = versions.filter((v) => v !== undefined); return { status: resp.status, @@ -64,3 +122,46 @@ export async function listObjectVersions(env, { bucket, org, key }) { body: JSON.stringify(filteredVersions), }; } + +/** + * Backward compat: project not yet migrated but new audit entries written to new path. + * When new path has no snapshots (only audit.txt), merge legacy snapshots + legacy audit + * with new audit so list shows both. + */ +function mergeLegacyAndNewResult(legacyResult, newResult) { + if (legacyResult.status !== 200 || !legacyResult.body) return newResult; + const legacyEntries = JSON.parse(legacyResult.body); + const newEntries = JSON.parse(newResult.body); + const merged = [...legacyEntries, ...newEntries]; + merged.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0)); + return { + status: 200, + contentType: newResult.contentType || legacyResult.contentType, + body: JSON.stringify(merged), + }; +} + +export async function listObjectVersions(env, { bucket, org, key }) { + const current = await getObject(env, { bucket, org, key }, true); + if (current.status === 404 || !current.metadata.id) { + return 404; + } + + const fileId = current.metadata.id; + const repo = key.includes('/') ? key.split('/')[0] : ''; + + if (repo) { + const newResult = await listFromNewStructure(env, { bucket, org, key }, fileId, repo); + if (newResult) { + const newEntries = JSON.parse(newResult.body); + const hasSnapshotsInNew = newEntries.some((e) => e.url); + if (!hasSnapshotsInNew) { + const legacyResult = await listFromLegacyStructure(env, { bucket, org, key }, fileId); + return mergeLegacyAndNewResult(legacyResult, newResult); + } + return newResult; + } + } + + return listFromLegacyStructure(env, { bucket, org, key }, fileId); +} diff --git a/src/storage/version/paths.js b/src/storage/version/paths.js new file mode 100644 index 0000000..d6fa239 --- /dev/null +++ b/src/storage/version/paths.js @@ -0,0 +1,46 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ + +/** + * New structure: versions live under repo. Path for a version object. + * @param {string} org + * @param {string} repo + * @param {string} fileId + * @param {string} versionId + * @param {string} ext + * @returns {string} key (org/repo/.da-versions/fileId/versionId.ext) + */ +export function versionKeyNew(org, repo, fileId, versionId, ext) { + return `${repo}/.da-versions/${fileId}/${versionId}.${ext}`; +} + +/** + * Legacy structure: versions at org root. Path for a version object. + * @param {string} org + * @param {string} fileId + * @param {string} versionId + * @param {string} ext + * @returns {string} key (.da-versions/fileId/versionId.ext) + */ +export function versionKeyLegacy(org, fileId, versionId, ext) { + return `.da-versions/${fileId}/${versionId}.${ext}`; +} + +/** + * New structure: audit file path for a file (under repo). + * @param {string} repo + * @param {string} fileId + * @returns {string} key (repo/.da-versions/fileId/audit.txt) + */ +export function auditKey(repo, fileId) { + return `${repo}/.da-versions/${fileId}/audit.txt`; +} diff --git a/src/storage/version/put.js b/src/storage/version/put.js index 71e524b..7020e48 100644 --- a/src/storage/version/put.js +++ b/src/storage/version/put.js @@ -20,6 +20,7 @@ import { getUsersForMetadata, ifMatch, ifNoneMatch, } from '../utils/version.js'; import getObject from '../object/get.js'; +import { writeAuditEntry } from './audit.js'; export function getContentLength(body) { if (body === undefined) { @@ -35,14 +36,24 @@ export function getContentLength(body) { return undefined; } +/** + * @param {object} config - S3 config + * @param {object} params - Bucket, Org, Repo (optional), Body, ID, Version, Ext, + * Metadata, ContentLength, ContentType + * @param {boolean} [noneMatch=true] + * @returns {Promise<{ status: number }>} + */ export async function putVersion(config, { - Bucket, Org, Body, ID, Version, Ext, Metadata, ContentLength, ContentType, + Bucket, Org, Repo, Body, ID, Version, Ext, Metadata, ContentLength, ContentType, }, noneMatch = true) { const length = ContentLength ?? getContentLength(Body); const client = noneMatch ? ifNoneMatch(config) : new S3Client(config); + const key = Repo + ? `${Org}/${Repo}/.da-versions/${ID}/${Version}.${Ext}` + : `${Org}/.da-versions/${ID}/${Version}.${Ext}`; const input = { - Bucket, Key: `${Org}/.da-versions/${ID}/${Version}.${Ext}`, Body, Metadata, ContentLength: length, ContentType, + Bucket, Key: key, Body, Metadata, ContentLength: length, ContentType, }; const command = new PutObjectCommand(input); try { @@ -86,9 +97,6 @@ export async function putObjectWithVersion( clientConditionals = null, ) { const config = getS3Config(env); - // While we are automatically storing the body once for the 'Collab Parse' changes, we never - // do a HEAD, because we may need the content. Once we don't need to do this automatic store - // any more, we can change the 'false' argument in the next line back to !body. const current = await getObject(env, update, false); let ID = current.metadata?.id; @@ -183,29 +191,34 @@ export async function putObjectWithVersion( } const pps = current.metadata?.preparsingstore || '0'; - let storeBody = !body && pps === '0'; + let storeBody = false; let versionCreated = false; - let Preparsingstore = storeBody ? Timestamp : pps; - let Label = storeBody ? 'Collab Parse' : update.label; + let Label = update.label; - if (createVersion) { - if (daCtx.method === 'PUT' - && daCtx.ext === 'html' - && current.contentLength > EMPTY_DOC_SIZE - && (!update.body || update.body.size <= EMPTY_DOC_SIZE)) { - // we are about to empty the document body - // this should almost never happen but it does in some unexpectedcases - // we want then to store a version of the full document as a Restore Point - // eslint-disable-next-line no-console - console.warn(`Empty body, creating a restore point (${current.contentLength} / ${update.body?.size})`); - storeBody = true; - Label = 'Restore Point'; - Preparsingstore = Timestamp; - } + // Restore Point: we are about to empty the document body; store current content as a snapshot + if (daCtx.method === 'PUT' + && daCtx.ext === 'html' + && current.contentLength > EMPTY_DOC_SIZE + && (!update.body || update.body.size <= EMPTY_DOC_SIZE)) { + // eslint-disable-next-line no-console + console.warn(`Empty body, creating a restore point (${current.contentLength} / ${update.body?.size})`); + storeBody = true; + Label = 'Restore Point'; + } + + const Preparsingstore = storeBody ? Timestamp : pps; + // Only create version for explicit label (POST /versionsource) or Restore Point. No Collab Parse. + const shouldCreateVersionObject = createVersion + && (update.label != null || Label === 'Restore Point'); + + const repo = (update.key && update.key.includes('/')) ? update.key.split('/')[0] : ''; + + if (shouldCreateVersionObject) { const versionResp = await putVersion(config, { Bucket: input.Bucket, Org: daCtx.org, + Repo: repo || undefined, Body: (body || storeBody ? current.body : ''), ContentLength: (body || storeBody ? current.contentLength : undefined), ContentType: current.contentType, @@ -224,6 +237,18 @@ export async function putObjectWithVersion( return { status: versionResp.status, metadata: { id: ID } }; } versionCreated = versionResp.status === 200; + } else if (createVersion && repo) { + // Audit entry: write to audit.txt (new structure only, 30 min dedupe) + try { + await writeAuditEntry(env, { bucket: input.Bucket, org: daCtx.org }, repo, ID, { + timestamp: Timestamp, + users: Users, + path: Path, + }); + } catch (e) { + // eslint-disable-next-line no-console + console.error('Failed to write audit entry', e); + } } const metadata = { diff --git a/test/storage/version/list.test.js b/test/storage/version/list.test.js index f7eb9a0..8a83d8a 100644 --- a/test/storage/version/list.test.js +++ b/test/storage/version/list.test.js @@ -412,4 +412,167 @@ describe('Version List', () => { // Verify MAX_VERSIONS (500) is passed to listObjects assert.strictEqual(maxVersionsParam, 500); }); + + describe('backward compat: not migrated but new audit entries in new path', () => { + it('when new path has only audit.txt (no snapshots), merges legacy snapshots + audit with new audit', async () => { + const listObjectCalls = []; + const getObjectCalls = []; + const mockGetObject = async (env, { key }) => { + getObjectCalls.push(key); + if (key === 'myrepo/docs/file.html') { + return { status: 200, metadata: { id: 'file-id-bcompat' } }; + } + if (key === '.da-versions/file-id-bcompat/snap1.html') { + return { + status: 200, + metadata: { + timestamp: '1000', + users: '[{"email":"legacy@example.com"}]', + path: 'myrepo/docs/file.html', + label: 'Legacy snapshot', + }, + contentLength: 100, + }; + } + return { status: 404 }; + }; + + const mockListObjects = async (env, { key }) => { + listObjectCalls.push(key); + if (key === 'myrepo/.da-versions/file-id-bcompat') { + return { + status: 200, + body: JSON.stringify([{ name: 'audit', ext: 'txt' }]), + }; + } + if (key === '.da-versions/file-id-bcompat') { + return { + status: 200, + body: JSON.stringify([{ name: 'snap1', ext: 'html' }]), + }; + } + return { status: 404, body: '[]' }; + }; + + const newAuditLines = [ + { timestamp: 5000, users: [{ email: 'new@example.com' }], path: 'myrepo/docs/file.html' }, + ]; + const mockReadAuditLines = async () => newAuditLines; + + const { listObjectVersions } = await esmock('../../../src/storage/version/list.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/object/list.js': { default: mockListObjects }, + '../../../src/storage/version/audit.js': { readAuditLines: mockReadAuditLines }, + }); + + const result = await listObjectVersions( + {}, + { bucket: 'bkt', org: 'testorg', key: 'myrepo/docs/file.html' }, + ); + + assert.strictEqual(result.status, 200); + const versions = JSON.parse(result.body); + assert.strictEqual(versions.length, 2, 'merged legacy snapshot + new audit'); + const withUrl = versions.filter((v) => v.url); + const withoutUrl = versions.filter((v) => !v.url); + assert.strictEqual(withUrl.length, 1, 'one legacy snapshot with url'); + assert.strictEqual(withoutUrl.length, 1, 'one new audit entry without url'); + assert.strictEqual(withUrl[0].timestamp, 1000); + assert.strictEqual(withoutUrl[0].timestamp, 5000); + assert.ok(listObjectCalls.some((k) => k.includes('myrepo/.da-versions'))); + assert.ok(listObjectCalls.some((k) => k.startsWith('.da-versions/'))); + }); + + it('when new path list returns 404, uses legacy only', async () => { + const mockGetObject = async (env, { key }) => { + if (key === 'repo/path.html') { + return { status: 200, metadata: { id: 'id-404' } }; + } + if (key === '.da-versions/id-404/legacy1.html') { + return { + status: 200, + metadata: { timestamp: '2000', users: '[]', path: 'repo/path.html' }, + contentLength: 50, + }; + } + return { status: 404 }; + }; + + const mockListObjects = async (env, { key }) => { + if (key === 'repo/.da-versions/id-404') { + return { status: 404, body: '[]' }; + } + if (key === '.da-versions/id-404') { + return { + status: 200, + body: JSON.stringify([{ name: 'legacy1', ext: 'html' }]), + }; + } + return { status: 404 }; + }; + + const { listObjectVersions } = await esmock('../../../src/storage/version/list.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/object/list.js': { default: mockListObjects }, + }); + + const result = await listObjectVersions( + {}, + { bucket: 'bkt', org: 'testorg', key: 'repo/path.html' }, + ); + + assert.strictEqual(result.status, 200); + const versions = JSON.parse(result.body); + assert.strictEqual(versions.length, 1); + assert.ok(versions[0].url); + }); + + it('when new path has snapshots, uses new result only (no legacy merge)', async () => { + const mockGetObject = async (env, { key }) => { + if (key === 'repo/doc.html') { + return { status: 200, metadata: { id: 'id-new' } }; + } + if (key === 'repo/.da-versions/id-new/v1.html') { + return { + status: 200, + metadata: { + timestamp: '3000', + users: '[{"email":"u@x.com"}]', + path: 'repo/doc.html', + }, + contentLength: 200, + }; + } + return { status: 404 }; + }; + + const mockListObjects = async (env, { key }) => { + if (key === 'repo/.da-versions/id-new') { + return { + status: 200, + body: JSON.stringify([{ name: 'v1', ext: 'html' }]), + }; + } + return { status: 404 }; + }; + + const mockReadAuditLines = async () => []; + + const { listObjectVersions } = await esmock('../../../src/storage/version/list.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/object/list.js': { default: mockListObjects }, + '../../../src/storage/version/audit.js': { readAuditLines: mockReadAuditLines }, + }); + + const result = await listObjectVersions( + {}, + { bucket: 'bkt', org: 'testorg', key: 'repo/doc.html' }, + ); + + assert.strictEqual(result.status, 200); + const versions = JSON.parse(result.body); + assert.strictEqual(versions.length, 1); + assert.strictEqual(versions[0].url, '/versionsource/testorg/repo/id-new/v1.html'); + }); + }); }); diff --git a/test/storage/version/put.test.js b/test/storage/version/put.test.js index 1a3c85f..8463203 100644 --- a/test/storage/version/put.test.js +++ b/test/storage/version/put.test.js @@ -273,13 +273,8 @@ describe('Version Put', () => { const resp = await putObjectWithVersion(env, daCtx, update, true); assert.equal(200, resp.status); assert.equal('x123', resp.metadata.id); - assert.equal(1, s3VersionSent.length); - assert.equal('prevbody', s3VersionSent[0].input.Body); - assert.equal('bkt', s3VersionSent[0].input.Bucket); - assert.equal('myorg/.da-versions/x123/aaa-bbb.html', s3VersionSent[0].input.Key); - assert.equal('[{"email":"anonymous"}]', s3VersionSent[0].input.Metadata.Users); - assert.equal('a/x.html', s3VersionSent[0].input.Metadata.Path); - assert(s3VersionSent[0].input.Metadata.Timestamp > 0); + // No Collab Parse version: version only created for explicit label or Restore Point + assert.equal(0, s3VersionSent.length); assert.equal(1, s3Sent.length); assert.equal('new-body', s3Sent[0].input.Body); @@ -346,13 +341,8 @@ describe('Version Put', () => { const resp = await putObjectWithVersion(env, daCtx, update, false); assert.equal(202, resp.status); assert.equal('q123-456', resp.metadata.id); - assert.equal(1, s3VersionSent.length); - assert.equal('', s3VersionSent[0].input.Body); - assert.equal('bbb', s3VersionSent[0].input.Bucket); - assert.equal('myorg/.da-versions/q123-456/ver123.html', s3VersionSent[0].input.Key); - assert.equal('[{"email":"anonymous"}]', s3VersionSent[0].input.Metadata.Users); - assert.equal('a/x.html', s3VersionSent[0].input.Metadata.Path); - assert(s3VersionSent[0].input.Metadata.Timestamp > 0); + // No empty audit version: audit will use audit.txt in new structure + assert.equal(0, s3VersionSent.length); assert.equal(1, s3Sent.length); assert.equal('new-body', s3Sent[0].input.Body); @@ -558,18 +548,20 @@ describe('Version Put', () => { users: '[{"email":"anonymous"}]', preparsingstore: 12345, }; - return { body: '', metadata, contentLength: 616 }; + return { + body: '', + metadata, + contentLength: 616, + etag: 'etag-1', + status: 200, + }; }; const sentToS3 = []; const s3Client = { send: async (c) => { sentToS3.push(c); - return { - $metadata: { - httpStatusCode: 201, - }, - }; + return { $metadata: { httpStatusCode: 200 } }; }, }; const mockS3Client = () => s3Client; @@ -580,17 +572,17 @@ describe('Version Put', () => { }, '../../../src/storage/utils/version.js': { ifNoneMatch: mockS3Client, + ifMatch: mockS3Client, }, }); - const resp = await putObjectWithVersion({}, { method: 'HEAD' }, { type: 'text/html' }); + await putObjectWithVersion({}, { method: 'HEAD', org: 'o', ext: 'html' }, { + type: 'text/html', org: 'o', key: 'q', + }); + // No version created for HEAD/collab parse (no label) + assert.equal(0, sentToS3.filter((c) => c.input.Key?.includes('.da-versions')).length); + // Main document updated assert.equal(1, sentToS3.length); - const { input } = sentToS3[0]; - assert.equal('', input.Body, 'Empty body for HEAD'); - assert.equal(0, input.ContentLength, 'Should have used 0 as content length for HEAD'); - assert.equal('/q', input.Metadata.Path); - assert.equal(123, input.Metadata.Timestamp); - assert.equal('[{"email":"anonymous"}]', input.Metadata.Users); }); it('Test putObjectWithVersion BODY', async () => { @@ -656,13 +648,8 @@ describe('Version Put', () => { users: [{ email: 'hi@acme.com' }], }; await putObjectWithVersion({}, ctx, update, true); - assert.equal(1, sentToS3.length); - const { input } = sentToS3[0]; - assert.equal('Somebody...', input.Body); - assert.equal(616, input.ContentLength); - assert.equal('/qwerty', input.Metadata.Path); - assert.equal(1234, input.Metadata.Timestamp); - assert.equal('[{"email":"anonymous"}]', input.Metadata.Users); + // No Collab Parse version (no explicit label) + assert.equal(0, sentToS3.length); assert.equal(1, sentToS3_2.length); const input2 = sentToS3_2[0].input; @@ -877,7 +864,10 @@ describe('Version Put', () => { users: [{ email: 'test@example.com' }], }; - await putObjectWithVersion(env, daCtx, { key: 'test-file.html', type: 'text/html' }, 'test body', 'test-guid'); + // Explicit label required to create a version (no Collab Parse version) + await putObjectWithVersion(env, daCtx, { + key: 'test-file.html', type: 'text/html', label: 'Test version', + }, 'test body', 'test-guid'); assert.strictEqual(sentCommands.length, 2); // Version + main file const putCommand = sentCommands[0]; // First command is the version @@ -1463,26 +1453,73 @@ describe('Version Put', () => { assert.strictEqual(result.status, 200); assert.strictEqual(result.metadata.id, 'html-id-existing'); - // Should have 2 commands: one for version, one for main object - assert.strictEqual(sentCommands.length, 2); - - // First command should store the old version - const versionCommand = sentCommands[0]; - assert.strictEqual(versionCommand.input.Bucket, 'content-bucket'); - assert(versionCommand.input.Key.includes('.da-versions/html-id-existing/')); - assert(versionCommand.input.Key.endsWith('.html')); - assert.strictEqual(versionCommand.input.Body, existingHtmlContent); - assert.strictEqual(versionCommand.input.ContentType, 'text/html'); - assert.strictEqual(versionCommand.input.ContentLength, existingHtmlContent.length); + // No version without explicit label; only main object updated + assert.strictEqual(sentCommands.length, 1); - // Second command should store the new content - const mainCommand = sentCommands[1]; + const mainCommand = sentCommands[0]; assert.strictEqual(mainCommand.input.Bucket, 'content-bucket'); assert.strictEqual(mainCommand.input.Key, 'testorg/pages/index.html'); assert.strictEqual(mainCommand.input.Body, newHtmlFile); assert.strictEqual(mainCommand.input.ContentType, 'text/html'); }); + it('Test putObjectWithVersion with HTML creates version when label provided', async () => { + const existingHtmlContent = 'Old'; + const mockGetObject = async () => ({ + body: existingHtmlContent, + contentLength: existingHtmlContent.length, + contentType: 'text/html', + etag: 'etag-old', + metadata: { + id: 'html-id-existing', + version: 'ver-old', + path: 'pages/index.html', + timestamp: '123', + users: '[{"email":"anonymous"}]', + }, + status: 200, + }); + + const sentCommands = []; + const mockS3Client = { + send: async (c) => { + sentCommands.push(c); + return { $metadata: { httpStatusCode: 200 } }; + }, + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + }); + + const env = {}; + const daCtx = { org: 'testorg', ext: 'html', users: [{ email: 'u@example.com' }] }; + const newHtmlContent = '

New

'; + const newHtmlFile = new File([newHtmlContent], 'index.html', { type: 'text/html' }); + const update = { + bucket: 'content-bucket', + org: 'testorg', + key: 'pages/index.html', + body: newHtmlFile, + type: 'text/html', + label: 'Before redesign', + }; + + const result = await putObjectWithVersion(env, daCtx, update, true); + + assert.strictEqual(result.status, 200); + assert.strictEqual(result.versionCreated, true); + assert.strictEqual(sentCommands.length, 2); + const versionCommand = sentCommands[0]; + assert(versionCommand.input.Key.includes('.da-versions/html-id-existing/')); + assert.strictEqual(versionCommand.input.Body, existingHtmlContent); + assert.strictEqual(versionCommand.input.Metadata.Label, 'Before redesign'); + }); + describe('Versioning behavior: CREATE vs UPDATE', () => { it('JPEG: Binary files NEVER create versions (first or second POST)', async () => { const sentCommands = []; @@ -1707,18 +1744,18 @@ describe('Version Put', () => { type: 'text/html', }; - // FIRST CALL - no version + // FIRST CALL - no version (new file) sentCommands.length = 0; await putObjectWithVersion(env, daCtx, update); assert.strictEqual(sentCommands.length, 1); - // SECOND CALL - creates version for HTML + // SECOND CALL - no version without explicit label (no Collab Parse) sentCommands.length = 0; await putObjectWithVersion(env, daCtx, update); - assert.strictEqual(sentCommands.length, 2); + assert.strictEqual(sentCommands.length, 1); }); - it('JSON: New file (404) creates object WITHOUT version, existing file creates version', async () => { + it('JSON: New file (404) creates object WITHOUT version, existing file no version without label', async () => { const sentCommands = []; let callCount = 0; @@ -1782,15 +1819,15 @@ describe('Version Put', () => { type: 'application/json', }; - // FIRST CALL - no version + // FIRST CALL - no version (new file) sentCommands.length = 0; await putObjectWithVersion(env, daCtx, update); assert.strictEqual(sentCommands.length, 1); - // SECOND CALL - creates version for JSON + // SECOND CALL - no version without explicit label sentCommands.length = 0; await putObjectWithVersion(env, daCtx, update); - assert.strictEqual(sentCommands.length, 2); + assert.strictEqual(sentCommands.length, 1); }); it('PDF: Binary files NEVER create versions (first or second POST)', async () => { @@ -2162,7 +2199,10 @@ describe('Version Put', () => { }, }); - const resp = await putObjectWithVersion({}, { org: 'o', ext: 'html', users: [] }, { org: 'o', key: 'a.html', body: 'new' }, true); + // Explicit label required to create a version + const resp = await putObjectWithVersion({}, { org: 'o', ext: 'html', users: [] }, { + org: 'o', key: 'a.html', body: 'new', label: 'My version', + }, true); assert.equal(200, resp.status); assert.strictEqual(true, resp.versionCreated); }); @@ -2195,7 +2235,10 @@ describe('Version Put', () => { }, }); - const resp = await putObjectWithVersion({}, { org: 'o', ext: 'html', users: [] }, { org: 'o', key: 'a.html', body: 'new' }, true); + // Explicit label so we attempt version; putVersion returns 412 + const resp = await putObjectWithVersion({}, { org: 'o', ext: 'html', users: [] }, { + org: 'o', key: 'a.html', body: 'new', label: 'My version', + }, true); assert.equal(200, resp.status); assert.strictEqual(false, resp.versionCreated); }); From 7f027fc76a3ac5d3d99b413437cdedc0a6417354 Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 13:48:47 +0100 Subject: [PATCH 2/6] chore: various fixes --- scripts/version-migrate-run.js | 19 ++- src/storage/version/audit.js | 82 ++++++--- src/storage/version/list.js | 9 +- src/storage/version/put.js | 18 +- test/storage/version/audit.test.js | 221 ++++++++++++++++++++++++ test/storage/version/put.test.js | 263 +++++++++++++++++++++++++++++ 6 files changed, 582 insertions(+), 30 deletions(-) create mode 100644 test/storage/version/audit.test.js diff --git a/scripts/version-migrate-run.js b/scripts/version-migrate-run.js index b75ec8f..d802010 100644 --- a/scripts/version-migrate-run.js +++ b/scripts/version-migrate-run.js @@ -74,19 +74,32 @@ function dedupeAuditEntries(entries) { } function formatAuditLine(entry) { - return [entry.timestamp, entry.users, entry.path].join('\t'); + const versionLabel = entry.versionLabel ?? ''; + const versionId = entry.versionId ?? ''; + return [entry.timestamp, entry.users, entry.path, versionLabel, versionId].join('\t'); } -/** Parse one audit line (tab-separated: timestamp, users, path). Same format as audit.js. */ +/** Parse one audit line (ts, users, path, versionLabel?, versionId?). Same format as audit.js. */ function parseAuditLine(line) { const t = line.trim(); if (!t) return null; const parts = t.split('\t'); if (parts.length < 3) return null; + let versionLabel = ''; + let versionId = ''; + if (parts.length >= 5) { + versionId = parts.pop(); + versionLabel = parts.pop(); + } else if (parts.length >= 4) { + versionId = parts.pop(); + } + const path = parts.slice(2).join('\t'); return { timestamp: parts[0], users: parts[1], - path: parts.slice(2).join('\t'), + path, + versionLabel, + versionId, }; } diff --git a/src/storage/version/audit.js b/src/storage/version/audit.js index 855b0b3..848d162 100644 --- a/src/storage/version/audit.js +++ b/src/storage/version/audit.js @@ -24,34 +24,51 @@ export const AUDIT_TIME_WINDOW_MS = 30 * 60 * 1000; const SEP = '\t'; /** - * Serialize one audit entry to a line (timestamp \t users \t path). - * @param {{ timestamp: string, users: string, path: string }} entry + * Serialize one audit entry to a line (timestamp \t users \t path \t versionLabel \t versionId). + * versionLabel = human-readable name (e.g. "Restore Point"); versionId = snapshot filename. + * Both empty for edits. + * @param {object} entry - { timestamp, users, path, versionLabel?, versionId? } * @returns {string} */ export function formatAuditLine(entry) { - return [entry.timestamp, entry.users, entry.path].join(SEP); + const versionLabel = entry.versionLabel ?? ''; + const versionId = entry.versionId ?? ''; + return [entry.timestamp, entry.users, entry.path, versionLabel, versionId].join(SEP); } /** - * Parse one audit line to { timestamp, users, path }. + * Parse one audit line to { timestamp, users, path, versionLabel, versionId }. + * Backward compat: 5 cols (label+id), 4 cols (id only), 3 cols (path only). * @param {string} line - * @returns {{ timestamp: string, users: string, path: string }|null} + * @returns {object|null} { timestamp, users, path, versionLabel, versionId } */ export function parseAuditLine(line) { const t = line.trim(); if (!t) return null; const parts = t.split(SEP); if (parts.length < 3) return null; + let versionLabel = ''; + let versionId = ''; + if (parts.length >= 5) { + versionId = parts.pop(); + versionLabel = parts.pop(); + } else if (parts.length >= 4) { + versionId = parts.pop(); + } + const path = parts.slice(2).join(SEP); return { timestamp: parts[0], users: parts[1], - path: parts.slice(2).join(SEP), + path, + versionLabel, + versionId, }; } /** - * Read audit.txt body stream to string. - * @param {import('stream').Readable|ReadableStream|string} body + * Read audit.txt body stream to string. Handles Web ReadableStream (Workers/R2), + * fetch Response.body, and Node-style async iterable streams. + * @param {ReadableStream|import('stream').Readable|string} body * @returns {Promise} */ async function streamToString(body) { @@ -61,6 +78,28 @@ async function streamToString(body) { const text = await body.text(); return text; } + if (typeof body.getReader === 'function') { + const reader = body.getReader(); + const chunks = []; + try { + for (;;) { + // eslint-disable-next-line no-await-in-loop -- stream read must be sequential + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + if (chunks.length === 0) return ''; + const blob = new Uint8Array(chunks.reduce((acc, c) => acc + c.length, 0)); + let off = 0; + for (const c of chunks) { + blob.set(c, off); + off += c.length; + } + return new TextDecoder().decode(blob); + } finally { + reader.releaseLock?.(); + } + } const chunks = []; try { for await (const chunk of body) { @@ -104,6 +143,8 @@ export async function readAuditLines(env, ctx, repo, fileId) { } })(), path: line.path, + versionLabel: line.versionLabel || undefined, + versionId: line.versionId || undefined, })); } catch (e) { if (e.$metadata?.httpStatusCode === 404 || e.name === 'NoSuchKey') { @@ -130,12 +171,13 @@ function usersNormalized(usersJson) { /** * Append or update last line in audit.txt (read-modify-write). If last line is same user - * and within AUDIT_TIME_WINDOW_MS, replace that line with the new timestamp; else append. + * and within AUDIT_TIME_WINDOW_MS and both last and new are edits (no version), replace that + * line; else append. A version entry always appends and is never replaced (breaks the window). * @param {object} env * @param {{ bucket: string, org: string }} ctx - bucket, org * @param {string} repo * @param {string} fileId - * @param {{ timestamp: string, users: string, path: string }} entry + * @param {object} entry - { timestamp, users, path, versionLabel?, versionId? } * @returns {Promise<{ status: number }>} */ export async function writeAuditEntry(env, ctx, repo, fileId, entry) { @@ -163,16 +205,18 @@ export async function writeAuditEntry(env, ctx, repo, fileId, entry) { const lines = existingText.split('\n').filter((l) => l.trim()); const lastLine = lines.length ? parseAuditLine(lines[lines.length - 1]) : null; + const isVersionEntry = (entry.versionLabel ?? '') !== '' || (entry.versionId ?? '') !== ''; + const lastIsVersion = lastLine + && ((lastLine.versionLabel ?? '') !== '' || (lastLine.versionId ?? '') !== ''); + const canCollapse = lastLine + && usersNormalized(lastLine.users) === entryUsersNorm + && !isVersionEntry + && !lastIsVersion + && (nowMs - (parseInt(lastLine.timestamp, 10) || 0) <= AUDIT_TIME_WINDOW_MS); let newContent; - if (lastLine && usersNormalized(lastLine.users) === entryUsersNorm) { - const lastTs = parseInt(lastLine.timestamp, 10) || 0; - if (nowMs - lastTs <= AUDIT_TIME_WINDOW_MS) { - lines[lines.length - 1] = formatAuditLine(entry); - newContent = `${lines.join('\n')}\n`; - } else { - const sep = existingText && !existingText.endsWith('\n') ? '\n' : ''; - newContent = `${existingText}${sep}${formatAuditLine(entry)}\n`; - } + if (canCollapse) { + lines[lines.length - 1] = formatAuditLine(entry); + newContent = `${lines.join('\n')}\n`; } else { const sep = existingText && !existingText.endsWith('\n') ? '\n' : ''; newContent = `${existingText}${sep}${formatAuditLine(entry)}\n`; diff --git a/src/storage/version/list.js b/src/storage/version/list.js index c585e88..faae0a8 100644 --- a/src/storage/version/list.js +++ b/src/storage/version/list.js @@ -61,7 +61,14 @@ async function listFromNewStructure(env, { bucket, org, key: _ }, fileId, repo) let auditEntries = []; try { const lines = await readAuditLines(env, { bucket, org }, repo, fileId); - auditEntries = lines.map(({ users, timestamp, path }) => ({ users, timestamp, path })); + auditEntries = lines.map(({ + users, timestamp, path, versionLabel, versionId, + }) => { + const entry = { users, timestamp, path }; + if (versionLabel) entry.versionLabel = versionLabel; + if (versionId) entry.versionId = versionId; + return entry; + }); } catch { // Ignore audit read errors (e.g. 404) } diff --git a/src/storage/version/put.js b/src/storage/version/put.js index 7020e48..6b7fd56 100644 --- a/src/storage/version/put.js +++ b/src/storage/version/put.js @@ -114,7 +114,7 @@ export async function putObjectWithVersion( const Users = JSON.stringify(getUsersForMetadata(daCtx.users)); const input = buildInput(update); const Timestamp = `${Date.now()}`; - const Path = update.key; + const Path = update.key ?? daCtx.key ?? ''; // Validate conflicting conditionals - both headers present is unusual for PUT let effectiveConditionals = clientConditionals; @@ -212,13 +212,11 @@ export async function putObjectWithVersion( const shouldCreateVersionObject = createVersion && (update.label != null || Label === 'Restore Point'); - const repo = (update.key && update.key.includes('/')) ? update.key.split('/')[0] : ''; - if (shouldCreateVersionObject) { const versionResp = await putVersion(config, { Bucket: input.Bucket, Org: daCtx.org, - Repo: repo || undefined, + Repo: daCtx.site || undefined, Body: (body || storeBody ? current.body : ''), ContentLength: (body || storeBody ? current.contentLength : undefined), ContentType: current.contentType, @@ -237,13 +235,19 @@ export async function putObjectWithVersion( return { status: versionResp.status, metadata: { id: ID } }; } versionCreated = versionResp.status === 200; - } else if (createVersion && repo) { - // Audit entry: write to audit.txt (new structure only, 30 min dedupe) + } + + // Audit: one entry per versionable PUT; versionLabel + versionId when labelled version created. + if (createVersion) { try { - await writeAuditEntry(env, { bucket: input.Bucket, org: daCtx.org }, repo, ID, { + const versionId = versionCreated ? `${Version}.${daCtx.ext}` : undefined; + const versionLabel = versionCreated ? (Label ?? '') : undefined; + await writeAuditEntry(env, { bucket: input.Bucket, org: daCtx.org }, daCtx.site, ID, { timestamp: Timestamp, users: Users, path: Path, + versionLabel, + versionId, }); } catch (e) { // eslint-disable-next-line no-console diff --git a/test/storage/version/audit.test.js b/test/storage/version/audit.test.js new file mode 100644 index 0000000..29e1b27 --- /dev/null +++ b/test/storage/version/audit.test.js @@ -0,0 +1,221 @@ +/* + * Copyright 2025 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +import assert from 'node:assert'; +import esmock from 'esmock'; +import { GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'; + +describe('Version Audit', () => { + describe('formatAuditLine / parseAuditLine', () => { + it('round-trips one entry (edit, no versionLabel/versionId)', async () => { + const { formatAuditLine, parseAuditLine } = await import('../../../src/storage/version/audit.js'); + const entry = { timestamp: '1000', users: '[{"email":"a@b.com"}]', path: 'repo/doc.html' }; + const line = formatAuditLine(entry); + assert.strictEqual(line, '1000\t[{"email":"a@b.com"}]\trepo/doc.html\t\t'); + const parsed = parseAuditLine(line); + assert.strictEqual(parsed.timestamp, '1000'); + assert.strictEqual(parsed.users, entry.users); + assert.strictEqual(parsed.path, 'repo/doc.html'); + assert.strictEqual(parsed.versionLabel, ''); + assert.strictEqual(parsed.versionId, ''); + }); + + it('round-trips entry with versionLabel and versionId (labelled save)', async () => { + const { formatAuditLine, parseAuditLine } = await import('../../../src/storage/version/audit.js'); + const entry = { + timestamp: '2000', + users: '[{"email":"u@x.com"}]', + path: 'repo/path.html', + versionLabel: 'Release 1', + versionId: 'abc-123.html', + }; + const line = formatAuditLine(entry); + assert.strictEqual(line, '2000\t[{"email":"u@x.com"}]\trepo/path.html\tRelease 1\tabc-123.html'); + const parsed = parseAuditLine(line); + assert.strictEqual(parsed.versionLabel, 'Release 1'); + assert.strictEqual(parsed.versionId, 'abc-123.html'); + }); + + it('parses legacy 3-column line (backward compat)', async () => { + const { parseAuditLine } = await import('../../../src/storage/version/audit.js'); + const line = '1000\t[{}]\trepo/f.html'; + const parsed = parseAuditLine(line); + assert.strictEqual(parsed.timestamp, '1000'); + assert.strictEqual(parsed.versionLabel, ''); + assert.strictEqual(parsed.versionId, ''); + }); + + it('parses legacy 4-column line (versionId only, no label)', async () => { + const { parseAuditLine } = await import('../../../src/storage/version/audit.js'); + const line = '2000\t[{}]\trepo/f.html\told-uuid.html'; + const parsed = parseAuditLine(line); + assert.strictEqual(parsed.timestamp, '2000'); + assert.strictEqual(parsed.versionLabel, ''); + assert.strictEqual(parsed.versionId, 'old-uuid.html'); + }); + }); + + describe('writeAuditEntry read-modify-write', () => { + it('appends new line when existing content is read (Web ReadableStream body)', async () => { + const existingLine = '1000\t[{"email":"a@b.com"}]\trepo/path.html'; + const bodyStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(`${existingLine}\n`)); + controller.close(); + }, + }); + + const putCalls = []; + function createMockS3Client() { + return { + async send(cmd) { + if (cmd instanceof GetObjectCommand) return { Body: bodyStream }; + if (cmd instanceof PutObjectCommand) { + putCalls.push(cmd.input); + return { $metadata: { httpStatusCode: 200 } }; + } + return { $metadata: { httpStatusCode: 200 } }; + }, + }; + } + + const { writeAuditEntry, AUDIT_TIME_WINDOW_MS } = await esmock( + '../../../src/storage/version/audit.js', + { + '@aws-sdk/client-s3': { + S3Client: createMockS3Client, + GetObjectCommand, + PutObjectCommand, + }, + '../../../src/storage/utils/config.js': { default: () => ({}) }, + }, + ); + + const env = {}; + const ctx = { bucket: 'bkt', org: 'org1' }; + const newEntry = { + timestamp: String(1000 + AUDIT_TIME_WINDOW_MS + 1), + users: '[{"email":"a@b.com"}]', + path: 'repo/path.html', + }; + + const result = await writeAuditEntry(env, ctx, 'repo', 'file-id-1', newEntry); + + assert.strictEqual(result.status, 200); + assert.strictEqual(putCalls.length, 1); + const putBody = putCalls[0].Body; + assert.strictEqual(typeof putBody, 'string'); + const lines = putBody.split('\n').filter((l) => l.trim()); + assert.strictEqual(lines.length, 2, 'must append: existing line + new line (would fail if stream not read)'); + assert.ok(lines[0].startsWith('1000\t')); + assert.ok(lines[1].startsWith(String(1000 + AUDIT_TIME_WINDOW_MS + 1))); + }); + + it('overwrites last line when same user and within time window', async () => { + const existingLine = '1000\t[{"email":"x@y.com"}]\trepo/f.html'; + const bodyStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(`${existingLine}\n`)); + controller.close(); + }, + }); + + const putCalls = []; + function createMockS3ClientOverwrite() { + return { + async send(cmd) { + if (cmd instanceof GetObjectCommand) return { Body: bodyStream }; + if (cmd instanceof PutObjectCommand) { + putCalls.push(cmd.input); + return { $metadata: { httpStatusCode: 200 } }; + } + return { $metadata: { httpStatusCode: 200 } }; + }, + }; + } + + const { writeAuditEntry, AUDIT_TIME_WINDOW_MS } = await esmock( + '../../../src/storage/version/audit.js', + { + '@aws-sdk/client-s3': { + S3Client: createMockS3ClientOverwrite, + GetObjectCommand, + PutObjectCommand, + }, + '../../../src/storage/utils/config.js': { default: () => ({}) }, + }, + ); + + const newEntry = { + timestamp: String(1000 + Math.floor(AUDIT_TIME_WINDOW_MS / 2)), + users: '[{"email":"x@y.com"}]', + path: 'repo/f.html', + }; + + await writeAuditEntry({}, { bucket: 'b', org: 'o' }, 'repo', 'fid', newEntry); + + assert.strictEqual(putCalls.length, 1); + const lines = putCalls[0].Body.split('\n').filter((l) => l.trim()); + assert.strictEqual(lines.length, 1, 'must overwrite last line (same user, within window)'); + }); + + it('appends three entries when edit then version then edit (version breaks time window)', async () => { + const baseMs = 1000; + const twoMinMs = 2 * 60 * 1000; + const seventeenMinMs = 17 * 60 * 1000; + const edit1 = `${baseMs}\t[{"email":"u@x.com"}]\trepo/doc.html\t\t`; + const versionAt = `${baseMs + twoMinMs}\t[{"email":"u@x.com"}]\trepo/doc.html\tRelease 1\tuuid.html`; + const existingText = `${edit1}\n${versionAt}\n`; + const bodyStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(existingText)); + controller.close(); + }, + }); + + const putCalls = []; + const mockSend = (cmd) => { + if (cmd instanceof GetObjectCommand) return { Body: bodyStream }; + if (cmd instanceof PutObjectCommand) { + putCalls.push(cmd.input); + return { $metadata: { httpStatusCode: 200 } }; + } + return { $metadata: { httpStatusCode: 200 } }; + }; + + const { writeAuditEntry } = await esmock( + '../../../src/storage/version/audit.js', + { + '@aws-sdk/client-s3': { + S3Client: function S3Client() { this.send = mockSend; }, + GetObjectCommand, + PutObjectCommand, + }, + '../../../src/storage/utils/config.js': { default: () => ({}) }, + }, + ); + + const edit2At = baseMs + seventeenMinMs; + await writeAuditEntry({}, { bucket: 'b', org: 'o' }, 'repo', 'fid', { + timestamp: String(edit2At), + users: '[{"email":"u@x.com"}]', + path: 'repo/doc.html', + }); + + assert.strictEqual(putCalls.length, 1); + const lines = putCalls[0].Body.split('\n').filter((l) => l.trim()); + assert.strictEqual(lines.length, 3, 'edit at 12:23, version at 12:25, edit at 12:40 => 3 entries'); + assert.ok(lines[0].endsWith('\t\t'), 'first line is edit (no version)'); + assert.ok(lines[1].includes('Release 1') && lines[1].includes('uuid.html'), 'second line is version'); + assert.ok(lines[2].startsWith(String(edit2At)) && lines[2].endsWith('\t\t'), 'third line is edit'); + }); + }); +}); diff --git a/test/storage/version/put.test.js b/test/storage/version/put.test.js index 8463203..c23bc05 100644 --- a/test/storage/version/put.test.js +++ b/test/storage/version/put.test.js @@ -2311,4 +2311,267 @@ describe('Version Put', () => { const resp = await postObjectVersion(req, env, ctx); assert.equal(201, resp.status); }); + + describe('audit entry (writeAuditEntry)', () => { + it('writes audit on every versionable PUT (no label) - uses daCtx.site as repo', async () => { + const auditCalls = []; + const mockWriteAuditEntry = async (env, ctx, repo, fileId, entry) => { + auditCalls.push({ + env, + ctx, + repo, + fileId, + entry, + }); + }; + + const mockGetObject = async () => ({ + body: 'content', + contentType: 'text/html', + contentLength: 7, + metadata: { id: 'file-id-1', version: 'v1' }, + status: 200, + }); + + const mockS3Client = { + send: () => ({ $metadata: { httpStatusCode: 200 } }), + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + '../../../src/storage/version/audit.js': { writeAuditEntry: mockWriteAuditEntry }, + }); + + const daCtx = { + org: 'myorg', + ext: 'html', + site: 'daplayground', + users: [{ email: 'u@x.com' }], + }; + const update = { + bucket: 'bkt', + org: 'myorg', + key: 'daplayground/docs/surf.html', + body: 'new body', + type: 'text/html', + }; + + const resp = await putObjectWithVersion({}, daCtx, update, true); + + assert.strictEqual(resp.status, 200); + assert.strictEqual(auditCalls.length, 1, 'writeAuditEntry must be called once'); + assert.strictEqual(auditCalls[0].repo, 'daplayground', 'repo must come from daCtx.site'); + assert.strictEqual(auditCalls[0].fileId, 'file-id-1'); + assert.strictEqual(auditCalls[0].entry.path, 'daplayground/docs/surf.html'); + }); + + it('writes audit when version is also created (label) - audit separate from version', async () => { + const auditCalls = []; + const mockWriteAuditEntry = async (env, ctx, repo, fileId, entry) => { + auditCalls.push({ repo, fileId }); + }; + + const mockGetObject = async () => ({ + body: 'content', + contentType: 'text/html', + contentLength: 7, + metadata: { id: 'doc-id', version: 'ver-1' }, + status: 200, + }); + + const mockS3Client = { + send: () => ({ $metadata: { httpStatusCode: 200 } }), + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + '../../../src/storage/version/audit.js': { writeAuditEntry: mockWriteAuditEntry }, + }); + + const daCtx = { + org: 'o', + ext: 'html', + site: 'mysite', + users: [], + }; + const update = { + org: 'o', + key: 'mysite/page.html', + body: 'new', + label: 'My version', + }; + + const resp = await putObjectWithVersion({}, daCtx, update, true); + + assert.strictEqual(resp.status, 200); + assert.strictEqual(resp.versionCreated, true); + assert.strictEqual(auditCalls.length, 1, 'writeAuditEntry must be called even when version created'); + assert.strictEqual(auditCalls[0].repo, 'mysite'); + }); + + it('audit entry includes versionLabel and versionId when labelled version is created (identifiable in list)', async () => { + const auditCalls = []; + const mockWriteAuditEntry = async (env, ctx, repo, fileId, entry) => { + auditCalls.push({ + repo, + fileId, + entry, + }); + }; + + const mockGetObject = async () => ({ + body: 'content', + contentType: 'text/html', + contentLength: 7, + metadata: { id: 'doc-id', version: 'ver-1' }, + status: 200, + }); + + const mockS3Client = { + send: () => ({ $metadata: { httpStatusCode: 200 } }), + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + '../../../src/storage/version/audit.js': { writeAuditEntry: mockWriteAuditEntry }, + }); + + const daCtx = { + org: 'o', + ext: 'html', + site: 'mysite', + users: [], + }; + const update = { + org: 'o', + key: 'mysite/page.html', + body: 'new', + label: 'Release 1', + }; + + await putObjectWithVersion({}, daCtx, update, true); + + assert.strictEqual(auditCalls.length, 1); + assert.strictEqual( + auditCalls[0].entry.versionLabel, + 'Release 1', + 'audit entry must contain versionLabel when a labelled version was created', + ); + assert.ok( + auditCalls[0].entry.versionId, + 'audit entry must contain versionId when a labelled version was created', + ); + assert.ok( + auditCalls[0].entry.versionId.endsWith('.html'), + 'versionId must be snapshot filename (e.g. uuid.html)', + ); + }); + + it('audit entry has no versionLabel/versionId when no version object created (plain edit)', async () => { + const auditCalls = []; + const mockWriteAuditEntry = async (env, ctx, repo, fileId, entry) => { + auditCalls.push({ entry }); + }; + + const mockGetObject = async () => ({ + body: 'content', + contentType: 'text/html', + metadata: { id: 'doc-id', version: 'v1' }, + status: 200, + }); + + const mockS3Client = { + send: () => ({ $metadata: { httpStatusCode: 200 } }), + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + '../../../src/storage/version/audit.js': { writeAuditEntry: mockWriteAuditEntry }, + }); + + const daCtx = { + org: 'o', + ext: 'html', + site: 'repo', + users: [], + }; + const update = { + org: 'o', + key: 'repo/p.html', + body: 'edit', + type: 'text/html', + }; + + await putObjectWithVersion({}, daCtx, update, true); + + assert.strictEqual(auditCalls.length, 1); + assert.strictEqual( + auditCalls[0].entry.versionLabel, + undefined, + 'audit entry must not have versionLabel for plain edit (no label)', + ); + assert.strictEqual( + auditCalls[0].entry.versionId, + undefined, + 'audit entry must not have versionId for plain edit (no label)', + ); + }); + + it('does not write audit for non-versionable type (e.g. PDF)', async () => { + const auditCalls = []; + const mockWriteAuditEntry = async () => { + auditCalls.push(1); + }; + + const mockGetObject = async () => ({ + body: 'binary', + contentType: 'application/pdf', + metadata: { id: 'pdf-id' }, + status: 200, + }); + + const mockS3Client = { + send: () => ({ $metadata: { httpStatusCode: 200 } }), + }; + + const { putObjectWithVersion } = await esmock('../../../src/storage/version/put.js', { + '../../../src/storage/object/get.js': { default: mockGetObject }, + '../../../src/storage/utils/version.js': { + ifNoneMatch: () => mockS3Client, + ifMatch: () => mockS3Client, + }, + '../../../src/storage/version/audit.js': { writeAuditEntry: mockWriteAuditEntry }, + }); + + const daCtx = { + org: 'o', + ext: 'pdf', + site: 'repo', + users: [], + }; + const update = { + org: 'o', key: 'repo/file.pdf', body: 'x', type: 'application/pdf', + }; + + await putObjectWithVersion({}, daCtx, update, true); + + assert.strictEqual(auditCalls.length, 0, 'no audit for non-versionable type'); + }); + }); }); From 9fc581e8344b435f2e56946d5df0ba266ba2bf83 Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 14:27:47 +0100 Subject: [PATCH 3/6] chore: fine tuning --- scripts/version-migrate-run.js | 18 ++++++++++++++++-- src/storage/version/list.js | 8 +++++--- src/storage/version/put.js | 8 ++++++-- test/storage/version/put.test.js | 6 +++--- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/scripts/version-migrate-run.js b/scripts/version-migrate-run.js index d802010..44434e8 100644 --- a/scripts/version-migrate-run.js +++ b/scripts/version-migrate-run.js @@ -24,7 +24,8 @@ const Bucket = process.env.AEM_BUCKET_NAME; const Org = process.env.ORG || process.argv[2]; const DRY_RUN = process.env.DRY_RUN === '1' || process.env.DRY_RUN === 'true'; -const AUDIT_WINDOW_MS = 30 * 60 * 1000; +const AUDIT_WINDOW_MS = 1000; +// const AUDIT_WINDOW_MS = 30 * 60 * 1000; const config = { region: 'auto', @@ -73,6 +74,18 @@ function dedupeAuditEntries(entries) { return out; } +/** Normalize path (strip repo prefix) and versionId (strip .ext) for audit storage. */ +function normalizeAuditEntry(entry, repo) { + const path = (repo && entry.path && entry.path.startsWith(`${repo}/`)) + ? entry.path.slice(repo.length) + : (entry.path ?? ''); + let { versionId } = entry; + if (versionId && typeof versionId === 'string' && versionId.endsWith('.html')) { + versionId = versionId.slice(0, -5); + } + return { ...entry, path, versionId }; +} + function formatAuditLine(entry) { const versionLabel = entry.versionLabel ?? ''; const versionId = entry.versionId ?? ''; @@ -219,7 +232,8 @@ async function migrateFileId(fileId) { (a, b) => (parseInt(a.timestamp, 10) || 0) - (parseInt(b.timestamp, 10) || 0), ); const deduped = dedupeAuditEntries(combined); - const body = deduped.map(formatAuditLine).join('\n') + (deduped.length ? '\n' : ''); + const normalized = deduped.map((e) => normalizeAuditEntry(e, repo)); + const body = normalized.map(formatAuditLine).join('\n') + (normalized.length ? '\n' : ''); const auditKey = `${Org}/${repo}/.da-versions/${fileId}/audit.txt`; await client.send(new PutObjectCommand({ Bucket, diff --git a/src/storage/version/list.js b/src/storage/version/list.js index faae0a8..7460d02 100644 --- a/src/storage/version/list.js +++ b/src/storage/version/list.js @@ -21,7 +21,8 @@ const CONCURRENCY = 50; * Try new structure: repo/.da-versions/fileId/ (snapshots) + audit.txt. Merge and sort. * @returns {Promise<{ status: number, body?: string, contentType?: string }|null>} null = fallback */ -async function listFromNewStructure(env, { bucket, org, key: _ }, fileId, repo) { +async function listFromNewStructure(env, { bucket, org, key }, fileId, repo) { + const ext = (key && key.includes('.')) ? key.split('.').pop() : 'html'; const listResp = await listObjects(env, { bucket, org, @@ -64,9 +65,10 @@ async function listFromNewStructure(env, { bucket, org, key: _ }, fileId, repo) auditEntries = lines.map(({ users, timestamp, path, versionLabel, versionId, }) => { - const entry = { users, timestamp, path }; + const pathFull = (repo && path && path.startsWith('/')) ? repo + path : path; + const entry = { users, timestamp, path: pathFull }; if (versionLabel) entry.versionLabel = versionLabel; - if (versionId) entry.versionId = versionId; + if (versionId) entry.versionId = ext ? `${versionId}.${ext}` : versionId; return entry; }); } catch { diff --git a/src/storage/version/put.js b/src/storage/version/put.js index 6b7fd56..9076825 100644 --- a/src/storage/version/put.js +++ b/src/storage/version/put.js @@ -238,14 +238,18 @@ export async function putObjectWithVersion( } // Audit: one entry per versionable PUT; versionLabel + versionId when labelled version created. + // Store path without repo prefix and versionId without extension for readability. if (createVersion) { try { - const versionId = versionCreated ? `${Version}.${daCtx.ext}` : undefined; + const versionId = versionCreated ? Version : undefined; const versionLabel = versionCreated ? (Label ?? '') : undefined; + const pathForAudit = (daCtx.site && Path.startsWith(`${daCtx.site}/`)) + ? Path.slice(daCtx.site.length) + : Path; await writeAuditEntry(env, { bucket: input.Bucket, org: daCtx.org }, daCtx.site, ID, { timestamp: Timestamp, users: Users, - path: Path, + path: pathForAudit, versionLabel, versionId, }); diff --git a/test/storage/version/put.test.js b/test/storage/version/put.test.js index c23bc05..08fe5bb 100644 --- a/test/storage/version/put.test.js +++ b/test/storage/version/put.test.js @@ -2366,7 +2366,7 @@ describe('Version Put', () => { assert.strictEqual(auditCalls.length, 1, 'writeAuditEntry must be called once'); assert.strictEqual(auditCalls[0].repo, 'daplayground', 'repo must come from daCtx.site'); assert.strictEqual(auditCalls[0].fileId, 'file-id-1'); - assert.strictEqual(auditCalls[0].entry.path, 'daplayground/docs/surf.html'); + assert.strictEqual(auditCalls[0].entry.path, '/docs/surf.html', 'path stored without repo prefix'); }); it('writes audit when version is also created (label) - audit separate from version', async () => { @@ -2474,8 +2474,8 @@ describe('Version Put', () => { 'audit entry must contain versionId when a labelled version was created', ); assert.ok( - auditCalls[0].entry.versionId.endsWith('.html'), - 'versionId must be snapshot filename (e.g. uuid.html)', + auditCalls[0].entry.versionId && !auditCalls[0].entry.versionId.endsWith('.html'), + 'versionId stored without extension (e.g. uuid)', ); }); From aa32efa8ccbc6b92f8d571fc5448ef65eb414074 Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 15:22:00 +0100 Subject: [PATCH 4/6] fix: merge legacy --- src/storage/version/list.js | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/storage/version/list.js b/src/storage/version/list.js index 7460d02..bd5ba65 100644 --- a/src/storage/version/list.js +++ b/src/storage/version/list.js @@ -133,9 +133,8 @@ async function listFromLegacyStructure(env, { bucket, org, key: _ }, fileId) { } /** - * Backward compat: project not yet migrated but new audit entries written to new path. - * When new path has no snapshots (only audit.txt), merge legacy snapshots + legacy audit - * with new audit so list shows both. + * Backward compat: merge legacy (org/.da-versions/fileId) with new (repo/.da-versions/fileId) + * so list shows both old and new versions/audits until migration is complete. */ function mergeLegacyAndNewResult(legacyResult, newResult) { if (legacyResult.status !== 200 || !legacyResult.body) return newResult; @@ -162,13 +161,8 @@ export async function listObjectVersions(env, { bucket, org, key }) { if (repo) { const newResult = await listFromNewStructure(env, { bucket, org, key }, fileId, repo); if (newResult) { - const newEntries = JSON.parse(newResult.body); - const hasSnapshotsInNew = newEntries.some((e) => e.url); - if (!hasSnapshotsInNew) { - const legacyResult = await listFromLegacyStructure(env, { bucket, org, key }, fileId); - return mergeLegacyAndNewResult(legacyResult, newResult); - } - return newResult; + const legacyResult = await listFromLegacyStructure(env, { bucket, org, key }, fileId); + return mergeLegacyAndNewResult(legacyResult, newResult); } } From 4434dd7149d21cf59f5f813f2220f36207d4a0a5 Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 15:42:43 +0100 Subject: [PATCH 5/6] chore: re-align script --- scripts/version-migrate-run.js | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/scripts/version-migrate-run.js b/scripts/version-migrate-run.js index 44434e8..e5a9455 100644 --- a/scripts/version-migrate-run.js +++ b/scripts/version-migrate-run.js @@ -24,8 +24,8 @@ const Bucket = process.env.AEM_BUCKET_NAME; const Org = process.env.ORG || process.argv[2]; const DRY_RUN = process.env.DRY_RUN === '1' || process.env.DRY_RUN === 'true'; -const AUDIT_WINDOW_MS = 1000; -// const AUDIT_WINDOW_MS = 30 * 60 * 1000; +/** Must match src/storage/version/audit.js AUDIT_TIME_WINDOW_MS for consistent dedup. */ +const AUDIT_WINDOW_MS = 30 * 60 * 1000; const config = { region: 'auto', @@ -56,8 +56,13 @@ function usersNormalized(usersJson) { } } +/** + * Dedupe audit entries: same-user edits within AUDIT_WINDOW_MS collapse (keep last). + * Labelled versions never collapse (match audit.js: version entries "interrupt" the window). + */ function dedupeAuditEntries(entries) { const out = []; + const isVersionEntry = (e) => (e?.versionLabel ?? '') !== '' || (e?.versionId ?? '') !== ''; for (const e of entries.sort((a, b) => (a.timestamp || 0) - (b.timestamp || 0))) { const last = out[out.length - 1]; const ts = parseInt(e.timestamp, 10) || 0; @@ -65,7 +70,9 @@ function dedupeAuditEntries(entries) { const lastTs = parseInt(last?.timestamp, 10) || 0; const sameUser = last && usersNormalized(last.users) === userNorm; const inWindow = sameUser && (ts - lastTs <= AUDIT_WINDOW_MS); - if (inWindow) { + const lastIsVersion = last && isVersionEntry(last); + const canCollapse = inWindow && !isVersionEntry(e) && !lastIsVersion; + if (canCollapse) { out[out.length - 1] = e; } else { out.push(e); @@ -74,14 +81,14 @@ function dedupeAuditEntries(entries) { return out; } -/** Normalize path (strip repo prefix) and versionId (strip .ext) for audit storage. */ +/** Normalize path (strip repo prefix) and versionId (strip extension) for audit storage. */ function normalizeAuditEntry(entry, repo) { const path = (repo && entry.path && entry.path.startsWith(`${repo}/`)) ? entry.path.slice(repo.length) : (entry.path ?? ''); let { versionId } = entry; - if (versionId && typeof versionId === 'string' && versionId.endsWith('.html')) { - versionId = versionId.slice(0, -5); + if (versionId && typeof versionId === 'string' && versionId.includes('.')) { + versionId = versionId.replace(/\.[^.]+$/, ''); } return { ...entry, path, versionId }; } From 9d898c40d78f41aa5220e1d82867f72e0fb6f80e Mon Sep 17 00:00:00 2001 From: kptdobe Date: Mon, 16 Mar 2026 16:30:26 +0100 Subject: [PATCH 6/6] chore: upgrade scripts --- scripts/version-migrate-analyse.js | 99 +++++++++++++++++++-------- scripts/version-migrate-run.js | 101 ++++++++++++++++++++++------ scripts/version-migrate-validate.js | 0 src/storage/version/list.js | 8 ++- 4 files changed, 159 insertions(+), 49 deletions(-) mode change 100644 => 100755 scripts/version-migrate-analyse.js mode change 100644 => 100755 scripts/version-migrate-run.js mode change 100644 => 100755 scripts/version-migrate-validate.js diff --git a/scripts/version-migrate-analyse.js b/scripts/version-migrate-analyse.js old mode 100644 new mode 100755 index fcff0ed..1eda082 --- a/scripts/version-migrate-analyse.js +++ b/scripts/version-migrate-analyse.js @@ -9,17 +9,19 @@ * OF ANY KIND, either express or implied. See the License for the specific language * governing permissions and limitations under the License. */ -/* eslint-disable no-await-in-loop -- migration script: sequential to avoid rate limits */ +/* eslint-disable no-await-in-loop -- script: list + concurrency use await in loops */ import './load-env.js'; import { S3Client, ListObjectsV2Command, - HeadObjectCommand, } from '@aws-sdk/client-s3'; const Bucket = process.env.AEM_BUCKET_NAME; const Org = process.env.ORG || process.argv[2]; +/** Process N file IDs in parallel. */ +const CONCURRENCY = parseInt(process.env.MIGRATE_ANALYSE_CONCURRENCY || '25', 10); + if (!Bucket || !Org) { console.error('Set AEM_BUCKET_NAME and ORG (or pass org as first arg)'); process.exit(1); @@ -38,6 +40,23 @@ if (process.env.S3_FORCE_PATH_STYLE === 'true') config.forcePathStyle = true; const client = new S3Client(config); const prefix = `${Org}/.da-versions/`; +async function runWithConcurrency(limit, items, fn) { + const results = []; + const executing = new Set(); + for (const item of items) { + const p = Promise.resolve().then(() => fn(item)); + results.push(p); + executing.add(p); + p.finally(() => { + executing.delete(p); + }); + if (executing.size >= limit) { + await Promise.race(executing); + } + } + return Promise.all(results); +} + async function listFileIds() { const ids = []; let token; @@ -59,6 +78,10 @@ async function listFileIds() { return ids; } +/** + * Count objects for one file ID using list only (Size in list response; no HEAD). + * @returns {{ fileId: string, total: number, empty: number, nonEmpty: number }} + */ async function countObjects(fileId) { const listPrefix = `${prefix}${fileId}/`; let total = 0; @@ -75,48 +98,70 @@ async function countObjects(fileId) { const resp = await client.send(cmd); for (const obj of resp.Contents || []) { total += 1; - try { - const head = await client.send(new HeadObjectCommand({ - Bucket, - Key: obj.Key, - })); - const len = head.ContentLength ?? 0; - if (len === 0) empty += 1; - else nonEmpty += 1; - } catch { - total -= 1; - } + const size = obj.Size ?? 0; + if (size === 0) empty += 1; + else nonEmpty += 1; } token = resp.NextContinuationToken; } while (token); - return { total, empty, nonEmpty }; + return { + fileId, total, empty, nonEmpty, + }; } async function main() { + const verbose = process.argv.includes('--verbose') || process.argv.includes('-v'); + console.log(`Org: ${Org}, Bucket: ${Bucket}, prefix: ${prefix}`); const fileIds = await listFileIds(); console.log(`File IDs (version folders): ${fileIds.length}`); + if (fileIds.length === 0) { + console.log('Nothing to analyse.'); + return; + } + + console.log(`Analysing all ${fileIds.length} folders (concurrency: ${CONCURRENCY})...`); + const startMs = Date.now(); + const results = await runWithConcurrency(CONCURRENCY, fileIds, countObjects); + const elapsedSec = (Date.now() - startMs) / 1000; let totalObjects = 0; let totalEmpty = 0; let totalNonEmpty = 0; + const withData = results.filter((r) => r.total > 0); - const sampleIds = fileIds.slice(0, 50); - for (const fileId of sampleIds) { - const { total, empty, nonEmpty } = await countObjects(fileId); - totalObjects += total; - totalEmpty += empty; - totalNonEmpty += nonEmpty; - if (total > 0) { - console.log(` ${fileId}: total=${total} empty=${empty} nonEmpty=${nonEmpty}`); - } + for (const r of results) { + totalObjects += r.total; + totalEmpty += r.empty; + totalNonEmpty += r.nonEmpty; } - if (fileIds.length > 50) { - console.log(` ... (showing first 50; run full count by iterating all ${fileIds.length} IDs)`); - } + // Clear summary: what you have and what Migrate will do + console.log(''); + console.log('--- Summary ---'); + console.log(` File IDs (version folders): ${fileIds.length}`); + console.log(` Total objects (legacy): ${totalObjects}`); + console.log(` Empty (metadata only): ${totalEmpty} → will be converted to audit entries`); + console.log(` With content (snapshots): ${totalNonEmpty} → will be copied to org/repo/.da-versions/fileId/`); + console.log(''); + console.log(' Migrate will:'); + console.log(` • Copy ${totalNonEmpty} snapshot(s) to the new path (one per repo/fileId).`); + console.log(` • Convert ${totalEmpty} empty object(s) to audit lines in audit.txt (same-user + 30 min dedup per file; version entries do not collapse). Final line count is lower — run Migrate with DRY_RUN=1 to see exact numbers.`); + console.log(' • Merge with any existing audit.txt already in the new path (hybrid case).'); + console.log(''); + const idsPerSec = fileIds.length / elapsedSec; + const objectsPerSec = totalObjects / elapsedSec; + console.log( + ` Timing: ${elapsedSec.toFixed(1)}s total | ${idsPerSec.toFixed(0)} file IDs/s | ${objectsPerSec.toFixed(0)} objects/s`, + ); - console.log(`Sample totals (first ${Math.min(50, fileIds.length)} IDs): ${totalObjects} objects, ${totalEmpty} empty, ${totalNonEmpty} with content`); + if (verbose && withData.length > 0) { + console.log(''); + console.log('--- Per-file breakdown ---'); + for (const r of withData.sort((a, b) => b.total - a.total)) { + console.log(` ${r.fileId}: total=${r.total} empty=${r.empty} nonEmpty=${r.nonEmpty}`); + } + } } main().catch((e) => { diff --git a/scripts/version-migrate-run.js b/scripts/version-migrate-run.js old mode 100644 new mode 100755 index e5a9455..4a7c7fe --- a/scripts/version-migrate-run.js +++ b/scripts/version-migrate-run.js @@ -40,6 +40,26 @@ if (process.env.S3_FORCE_PATH_STYLE === 'true') config.forcePathStyle = true; const client = new S3Client(config); const prefix = `${Org}/.da-versions/`; +/** Process N file IDs in parallel. */ +const CONCURRENCY = parseInt(process.env.MIGRATE_RUN_CONCURRENCY || '15', 10); + +async function runWithConcurrency(limit, items, fn) { + const results = []; + const executing = new Set(); + for (const item of items) { + const p = Promise.resolve().then(() => fn(item)); + results.push(p); + executing.add(p); + p.finally(() => { + executing.delete(p); + }); + if (executing.size >= limit) { + await Promise.race(executing); + } + } + return Promise.all(results); +} + function getRepoFromPath(path) { if (!path || typeof path !== 'string') return ''; const first = path.split('/')[0]; @@ -219,6 +239,28 @@ async function migrateFileId(fileId) { repo = firstRepo; } else if (repoSet.size > 1) repo = 'unknown'; + let auditLines = 0; + if (auditEntries.length && repo) { + const dedupedLegacy = dedupeAuditEntries(auditEntries); + const existingInNew = await readExistingAuditInNewPath(repo, fileId); + const combined = [...dedupedLegacy, ...existingInNew].sort( + (a, b) => (parseInt(a.timestamp, 10) || 0) - (parseInt(b.timestamp, 10) || 0), + ); + const deduped = dedupeAuditEntries(combined); + const normalized = deduped.map((e) => normalizeAuditEntry(e, repo)); + auditLines = normalized.length; + if (!DRY_RUN) { + const body = normalized.map(formatAuditLine).join('\n') + (normalized.length ? '\n' : ''); + const auditKey = `${Org}/${repo}/.da-versions/${fileId}/audit.txt`; + await client.send(new PutObjectCommand({ + Bucket, + Key: auditKey, + Body: body, + ContentType: 'text/plain; charset=utf-8', + })); + } + } + if (!DRY_RUN) { for (const s of snapshots) { const destRepo = s.repo || repo; @@ -231,30 +273,13 @@ async function migrateFileId(fileId) { })); } } - - if (auditEntries.length && repo) { - const dedupedLegacy = dedupeAuditEntries(auditEntries); - const existingInNew = await readExistingAuditInNewPath(repo, fileId); - const combined = [...dedupedLegacy, ...existingInNew].sort( - (a, b) => (parseInt(a.timestamp, 10) || 0) - (parseInt(b.timestamp, 10) || 0), - ); - const deduped = dedupeAuditEntries(combined); - const normalized = deduped.map((e) => normalizeAuditEntry(e, repo)); - const body = normalized.map(formatAuditLine).join('\n') + (normalized.length ? '\n' : ''); - const auditKey = `${Org}/${repo}/.da-versions/${fileId}/audit.txt`; - await client.send(new PutObjectCommand({ - Bucket, - Key: auditKey, - Body: body, - ContentType: 'text/plain; charset=utf-8', - })); - } } return { fileId, snapshots: snapshots.length, audit: auditEntries.length, + auditLines, repo: repo || '(none)', }; } @@ -267,15 +292,49 @@ async function main() { console.log(`Org: ${Org}, Bucket: ${Bucket}, DRY_RUN: ${DRY_RUN}`); const fileIds = await listFileIds(); - console.log(`File IDs to process: ${fileIds.length}`); + console.log(`File IDs to process: ${fileIds.length} (concurrency: ${CONCURRENCY})`); + console.log(''); - for (const fileId of fileIds) { + const startMs = Date.now(); + const results = await runWithConcurrency(CONCURRENCY, fileIds, async (fileId) => { try { const result = await migrateFileId(fileId); - console.log(` ${fileId}: ${result.snapshots} snapshots, ${result.audit} audit -> repo ${result.repo}`); + console.log(` ${fileId}: ${result.snapshots} snapshots, ${result.auditLines} audit lines -> repo ${result.repo}`); + return { fileId, ...result, error: null }; } catch (e) { console.error(` ${fileId}: error`, e.message); + return { + fileId, snapshots: 0, audit: 0, auditLines: 0, repo: '', error: e, + }; } + }); + + const errors = results.filter((r) => r.error); + const ok = results.filter((r) => !r.error); + const elapsedSec = (Date.now() - startMs) / 1000; + + if (DRY_RUN && ok.length > 0) { + const totalSnapshots = ok.reduce((sum, r) => sum + r.snapshots, 0); + const totalAuditLines = ok.reduce((sum, r) => sum + (r.auditLines || 0), 0); + const filesWithAudit = ok.filter((r) => (r.auditLines || 0) > 0).length; + console.log(''); + console.log('--- DRY RUN summary (no changes were made) ---'); + console.log(` File IDs processed: ${ok.length}${errors.length > 0 ? ` (${errors.length} error(s))` : ''}`); + console.log(` Snapshots would copy: ${totalSnapshots}`); + console.log(` audit.txt would write: ${filesWithAudit} file(s), ${totalAuditLines} total lines (after dedup + merge)`); + const idsPerSec = ok.length / elapsedSec; + console.log(` Timing: ${elapsedSec.toFixed(1)}s total | ${idsPerSec.toFixed(1)} file IDs/s`); + console.log(' Compare with Analyse: "With content" ≈ snapshots above; run without DRY_RUN to apply.'); + } else if (errors.length > 0) { + console.log(''); + console.log(`Done. ${results.length} processed, ${errors.length} error(s).`); + } + + if (ok.length > 0 && !DRY_RUN) { + const totalSnapshots = ok.reduce((sum, r) => sum + r.snapshots, 0); + const totalAuditLines = ok.reduce((sum, r) => sum + (r.auditLines || 0), 0); + console.log(''); + console.log(`Completed in ${elapsedSec.toFixed(1)}s | ${(ok.length / elapsedSec).toFixed(1)} file IDs/s | ${totalSnapshots} snapshots copied, ${totalAuditLines} audit lines written`); } } diff --git a/scripts/version-migrate-validate.js b/scripts/version-migrate-validate.js old mode 100644 new mode 100755 diff --git a/src/storage/version/list.js b/src/storage/version/list.js index bd5ba65..f3c25d6 100644 --- a/src/storage/version/list.js +++ b/src/storage/version/list.js @@ -134,7 +134,8 @@ async function listFromLegacyStructure(env, { bucket, org, key: _ }, fileId) { /** * Backward compat: merge legacy (org/.da-versions/fileId) with new (repo/.da-versions/fileId) - * so list shows both old and new versions/audits until migration is complete. + * when new path has no snapshots yet. When new path has snapshots (file already migrated), + * use new only to avoid duplicates while legacy may still exist (cleanup later). */ function mergeLegacyAndNewResult(legacyResult, newResult) { if (legacyResult.status !== 200 || !legacyResult.body) return newResult; @@ -161,6 +162,11 @@ export async function listObjectVersions(env, { bucket, org, key }) { if (repo) { const newResult = await listFromNewStructure(env, { bucket, org, key }, fileId, repo); if (newResult) { + const newEntries = JSON.parse(newResult.body); + const hasSnapshotsInNew = newEntries.some((e) => e.url); + if (hasSnapshotsInNew) { + return newResult; + } const legacyResult = await listFromLegacyStructure(env, { bucket, org, key }, fileId); return mergeLegacyAndNewResult(legacyResult, newResult); }