diff --git a/packages/plugin-dkg-publisher/migrate.js b/packages/plugin-dkg-publisher/migrate.js deleted file mode 100644 index 4953594c..00000000 --- a/packages/plugin-dkg-publisher/migrate.js +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env node - -/** - * Database migration script for DKG Publisher Plugin - * This script runs Drizzle migrations to create/update database tables - */ - -const { drizzle } = require("drizzle-orm/mysql2"); -const { migrate } = require("drizzle-orm/mysql2/migrator"); -const mysql = require("mysql2/promise"); -const dotenv = require("dotenv"); -const path = require("path"); - -// Load environment variables -dotenv.config({ path: ".env.publisher" }); -dotenv.config(); // Also load from .env if present - -async function runMigrations() { - console.log("šŸš€ Starting database migrations..."); - - // Get database URL from environment - const databaseUrl = process.env.DKGP_DATABASE_URL; - - if (!databaseUrl) { - console.error("āŒ DKGP_DATABASE_URL not found in environment variables"); - console.log("Make sure you have run the setup script: npm run setup"); - process.exit(1); - } - - console.log("šŸ“Š Connecting to database..."); - - let connection; - try { - // Create connection - connection = mysql.createPool(databaseUrl); - - // Create Drizzle instance - const db = drizzle(connection); - - // Run migrations - console.log("šŸ”§ Running migrations..."); - await migrate(db, { - migrationsFolder: path.join(__dirname, "src/database/migrations"), - }); - - console.log("āœ… Migrations completed successfully!"); - - // Verify tables were created - const [tables] = await connection.execute("SHOW TABLES"); - console.log( - `šŸ“‹ Created ${tables.length} tables:`, - tables.map((t) => Object.values(t)[0]).join(", "), - ); - } catch (error) { - console.error("āŒ Migration failed:", error.message); - process.exit(1); - } finally { - if (connection) { - await connection.end(); - console.log("šŸ”Œ Database connection closed"); - } - } -} - -// Run migrations -runMigrations(); diff --git a/packages/plugin-dkg-publisher/setup.js b/packages/plugin-dkg-publisher/setup.js index a52ea17f..f7847fad 100644 --- a/packages/plugin-dkg-publisher/setup.js +++ b/packages/plugin-dkg-publisher/setup.js @@ -49,7 +49,21 @@ function ask(question, options = {}) { ? question.replace(/:/g, " (input hidden):") : question; + // Mute output for password fields so typed characters are not echoed + if (options.password) { + rl._writeToOutput = (str) => { + // Only suppress characters after the prompt has been written + if (str === prompt || str.includes(prompt)) { + process.stdout.write(str); + } + }; + } + rl.question(`${colors.yellow}${prompt}${colors.reset} `, (answer) => { + if (options.password) { + // Print a newline since the user's Enter was not echoed + process.stdout.write("\n"); + } rl.close(); // Handle empty input - use default if available @@ -131,6 +145,90 @@ function getAddressFromPrivateKey(privateKey) { } } +// Bootstrap migration journal for databases created by a previous version of +// setup.js (raw DDL, no __drizzle_migrations table). Without this, migrate() +// would try to re-run CREATE TABLE statements on existing tables and fail. +// This mirrors the logic in src/database/bootstrap.ts but uses raw mysql2 queries. +async function bootstrapJournalForSetup(pool) { + const fsSync = require("fs"); + + const [journals] = await pool.execute( + `SELECT COUNT(*) as cnt FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = '__drizzle_migrations'`, + ); + if (Number(journals[0].cnt) > 0) return; + + const [tables] = await pool.execute( + `SELECT COUNT(*) as cnt FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name IN ('assets', 'wallets', 'publishing_attempts', 'batches')`, + ); + const tableCount = Number(tables[0].cnt); + if (tableCount === 0) return; // Fresh DB + + if (tableCount < 4) { + throw new Error( + "Database is in a partial state (some tables missing). Please choose 'Start fresh' (option 1).", + ); + } + + log(" Bootstrapping migration journal for existing database...", "cyan"); + + await pool.execute(` + CREATE TABLE IF NOT EXISTS __drizzle_migrations ( + id SERIAL PRIMARY KEY, + hash text NOT NULL, + created_at bigint + ) + `); + + const journalPath = path.join( + __dirname, + "src/database/migrations/meta/_journal.json", + ); + const journal = JSON.parse(fsSync.readFileSync(journalPath, "utf-8")); + + // Seed 0000 and 0001 (setup.js schema = post-0001 state) + for (const entry of journal.entries) { + if (entry.idx > 1) break; + const sqlFile = path.join( + __dirname, + `src/database/migrations/${entry.tag}.sql`, + ); + const content = fsSync.readFileSync(sqlFile, "utf-8"); + const hash = crypto.createHash("sha256").update(content).digest("hex"); + await pool.execute( + `INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)`, + [hash, entry.when], + ); + } + + // Check if 0002 changes are already present + const entry0002 = journal.entries.find((e) => e.idx === 2); + if (entry0002) { + const [hasErrorDetails] = await pool.execute( + `SELECT COUNT(*) as cnt FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'publishing_attempts' AND column_name = 'error_details'`, + ); + const [hasPrivateKey] = await pool.execute( + `SELECT COUNT(*) as cnt FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'wallets' AND column_name = 'private_key'`, + ); + if ( + Number(hasErrorDetails[0].cnt) > 0 && + Number(hasPrivateKey[0].cnt) > 0 + ) { + const sqlFile = path.join( + __dirname, + `src/database/migrations/${entry0002.tag}.sql`, + ); + const content = fsSync.readFileSync(sqlFile, "utf-8"); + const hash = crypto.createHash("sha256").update(content).digest("hex"); + await pool.execute( + `INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)`, + [hash, entry0002.when], + ); + } + } + + log(" āœ“ Migration journal bootstrapped", "green"); +} + // Check if configuration already exists async function checkExistingConfig() { const existingEnv = ".env.publisher"; @@ -312,7 +410,7 @@ async function addWalletsOnly(configDetails) { // Insert new wallets for (const wallet of wallets) { await connection.execute( - `INSERT INTO wallets (address, private_key_encrypted, blockchain) VALUES (?, ?, ?)`, + `INSERT INTO wallets (address, private_key, blockchain) VALUES (?, ?, ?)`, [wallet.address, wallet.privateKey, wallet.blockchain], ); } @@ -351,21 +449,23 @@ async function setup() { // Check for existing configuration const { hasConfig, configDetails } = await checkExistingConfig(); + let setupMode; + if (hasConfig) { log("šŸ” Existing configuration detected:", "yellow"); if (configDetails.env) log(" • .env.publisher found", "cyan"); if (configDetails.compose) log(" • docker-compose.knowledge-manager.yml found", "cyan"); - const setupMode = await ask( - "\nChoose setup mode:\n1. Update existing configuration\n2. Start fresh (will backup existing files)\n3. Add wallets only\nChoice (1-3):", + setupMode = await ask( + "\nChoose setup mode:\n1. Start fresh (āš ļø will delete existing tables and backup config files)\n2. Update existing configuration\n3. Add wallets only\nChoice (1-3):", { validate: (input) => ["1", "2", "3"].includes(input), error: "Please enter 1, 2, or 3", }, ); - if (setupMode === "2") { + if (setupMode === "1") { // Backup existing files const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); try { @@ -435,7 +535,7 @@ async function setup() { logStep("3/7", "DKG Network Configuration"); const dkgEndpoint = await ask( - "DKG OT-Node URL (default: http://localhost:8900):", + "DKG Engine URL (default: http://localhost:8900):", { default: "http://localhost:8900", }, @@ -656,7 +756,8 @@ JWT_SECRET=${jwtSecret} # DATADOG_API_KEY= `; - await createFile(".env.publisher", envContent); + const overwriteConfig = setupMode === "1" || setupMode === "2"; // Fresh or Update + await createFile(".env.publisher", envContent, overwriteConfig); // Skip wallet configuration file - wallets will be inserted directly into database @@ -707,6 +808,7 @@ volumes: await createFile( "docker-compose.knowledge-manager.yml", dockerComposeContent, + overwriteConfig, ); // Package.json scripts @@ -776,177 +878,60 @@ volumes: await connection.changeUser({ database: dbName }); log(`āœ“ Connected to database '${dbName}'`, "green"); - // Create tables one by one (MySQL can't handle multiple CREATE statements in one execute) - log("Creating tables...", "cyan"); - - // Drop existing tables if they exist to ensure clean schema - log(" Dropping existing tables if they exist...", "white"); - await connection.execute("SET FOREIGN_KEY_CHECKS = 0"); - await connection.execute("DROP TABLE IF EXISTS wallet_metrics"); - await connection.execute("DROP TABLE IF EXISTS publishing_attempts"); - await connection.execute("DROP TABLE IF EXISTS assets"); - await connection.execute("DROP TABLE IF EXISTS wallets"); - await connection.execute("DROP TABLE IF EXISTS batches"); - await connection.execute("DROP TABLE IF EXISTS metrics_hourly"); - await connection.execute("SET FOREIGN_KEY_CHECKS = 1"); - - // Assets table with new schema - log(" Creating assets table...", "white"); - await connection.execute(` - CREATE TABLE assets ( - id INT AUTO_INCREMENT PRIMARY KEY, - wallet_id INT, - batch_id INT, - - -- Content and metadata - content_url TEXT NOT NULL, - content_size BIGINT NOT NULL, - source VARCHAR(100), - source_id VARCHAR(255), - - -- Publishing configuration - priority INTEGER DEFAULT 50, - privacy ENUM('private', 'public') DEFAULT 'private', - epochs INTEGER DEFAULT 2, - replications INTEGER DEFAULT 1, - max_attempts INTEGER DEFAULT 3, - - -- Status and attempts - status ENUM('pending', 'queued', 'assigned', 'publishing', 'published', 'failed') NOT NULL DEFAULT 'pending', - status_message TEXT, - attempt_count INTEGER DEFAULT 0, - retry_count INTEGER DEFAULT 0, - next_retry_at TIMESTAMP NULL, - last_error TEXT, - - -- Publishing results - ual VARCHAR(255) UNIQUE, - transaction_hash VARCHAR(66), - blockchain VARCHAR(50), - - -- Timestamps - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - queued_at TIMESTAMP NULL, - assigned_at TIMESTAMP NULL, - publishing_started_at TIMESTAMP NULL, - published_at TIMESTAMP NULL, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - - INDEX idx_status (status), - INDEX idx_retry (status, next_retry_at), - INDEX idx_source (source, source_id), - INDEX idx_pending (status, created_at), - INDEX idx_batch (batch_id) - ) - `); - - // Wallets table - log(" Creating wallets table...", "white"); - await connection.execute(` - CREATE TABLE wallets ( - id INT AUTO_INCREMENT PRIMARY KEY, - address VARCHAR(42) UNIQUE NOT NULL, - private_key_encrypted TEXT NOT NULL, - blockchain VARCHAR(50) NOT NULL, - is_active BOOLEAN DEFAULT TRUE, - is_locked BOOLEAN DEFAULT FALSE, - locked_by VARCHAR(100), - locked_at TIMESTAMP NULL, - last_used_at TIMESTAMP NULL, - total_uses INTEGER DEFAULT 0, - successful_uses INTEGER DEFAULT 0, - failed_uses INTEGER DEFAULT 0, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - INDEX idx_available (is_active, is_locked, last_used_at) - ) - `); - - // Publishing attempts table - log(" Creating publishing_attempts table...", "white"); - await connection.execute(` - CREATE TABLE publishing_attempts ( - id INT AUTO_INCREMENT PRIMARY KEY, - asset_id INT NOT NULL, - attempt_number INTEGER NOT NULL, - worker_id VARCHAR(100), - wallet_address VARCHAR(42) NOT NULL, - wallet_id INT, - otnode_url TEXT, - blockchain VARCHAR(50), - transaction_hash VARCHAR(66), - gas_used BIGINT, - status ENUM('started', 'success', 'failed', 'timeout') NOT NULL, - ual VARCHAR(255), - error_type VARCHAR(50), - error_message TEXT, - started_at TIMESTAMP NOT NULL, - completed_at TIMESTAMP NULL, - duration_seconds INTEGER, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (asset_id) REFERENCES assets(id) ON DELETE CASCADE, - INDEX idx_asset_attempts (asset_id, attempt_number), - INDEX idx_wallet_usage (wallet_address, started_at) - ) - `); - - // Batches table - log(" Creating batches table...", "white"); - await connection.execute(` - CREATE TABLE batches ( - id INT AUTO_INCREMENT PRIMARY KEY, - batch_name VARCHAR(255), - source VARCHAR(100), - total_assets INT NOT NULL DEFAULT 0, - pending_count INT NOT NULL DEFAULT 0, - processing_count INT NOT NULL DEFAULT 0, - published_count INT NOT NULL DEFAULT 0, - failed_count INT NOT NULL DEFAULT 0, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - completed_at TIMESTAMP NULL, - INDEX idx_batch_status (created_at, completed_at) - ) - `); - - // Metrics hourly table - log(" Creating metrics_hourly table...", "white"); - await connection.execute(` - CREATE TABLE metrics_hourly ( - hour_timestamp TIMESTAMP PRIMARY KEY NOT NULL, - assets_registered INT DEFAULT 0, - assets_published INT DEFAULT 0, - assets_failed INT DEFAULT 0, - avg_publish_duration_seconds INT, - total_gas_used BIGINT, - unique_wallets_used INT, - INDEX idx_metrics_hour (hour_timestamp) - ) - `); - - // Wallet metrics table - log(" Creating wallet_metrics table...", "white"); - await connection.execute(` - CREATE TABLE wallet_metrics ( - wallet_id INT NOT NULL, - date TIMESTAMP NOT NULL, - total_publishes INT DEFAULT 0, - successful_publishes INT DEFAULT 0, - failed_publishes INT DEFAULT 0, - avg_duration_seconds INT, - total_gas_used BIGINT, - PRIMARY KEY (wallet_id, date), - FOREIGN KEY (wallet_id) REFERENCES wallets(id) ON DELETE CASCADE - ) - `); - - // Add foreign key constraints after all tables are created - log(" Adding foreign key constraints...", "white"); - await connection.execute(` - ALTER TABLE assets - ADD CONSTRAINT fk_assets_wallet_id - FOREIGN KEY (wallet_id) REFERENCES wallets(id) ON DELETE SET NULL - `); - - log("āœ“ Database tables created/verified", "green"); + // Run Drizzle migrations to create/update tables + log("Running database migrations...", "cyan"); + + // Check if this is a fresh start (Mode 1) — drop all tables first + if (setupMode === "1" || !hasConfig) { + log(" Dropping existing tables for fresh setup...", "white"); + await connection.execute("SET FOREIGN_KEY_CHECKS = 0"); + await connection.execute("DROP TABLE IF EXISTS __drizzle_migrations"); + await connection.execute("DROP TABLE IF EXISTS wallet_metrics"); + await connection.execute("DROP TABLE IF EXISTS publishing_attempts"); + await connection.execute("DROP TABLE IF EXISTS assets"); + await connection.execute("DROP TABLE IF EXISTS wallets"); + await connection.execute("DROP TABLE IF EXISTS batches"); + await connection.execute("DROP TABLE IF EXISTS metrics_hourly"); + await connection.execute("SET FOREIGN_KEY_CHECKS = 1"); + } + + // Close the single connection — Drizzle needs a pool + await connection.end(); + connection = null; + + // Run Drizzle migrations via a dedicated connection with FK checks disabled. + // Migration 0001 changes column types that have FK references, and MySQL + // validates FK compatibility on each ALTER TABLE. + const { drizzle } = require("drizzle-orm/mysql2"); + const { migrate } = require("drizzle-orm/mysql2/migrator"); + const migrationConn = await mysql.createConnection({ + host: dbHost, + port: parseInt(dbPort), + user: dbUser, + password: dbPassword, + database: dbName, + }); + try { + // Bootstrap migration journal for existing databases without one + // (e.g. created by a previous version of setup.js with raw DDL) + await bootstrapJournalForSetup(migrationConn); + + await migrate(drizzle(migrationConn), { + migrationsFolder: path.join(__dirname, "src/database/migrations"), + }); + } finally { + await migrationConn.end(); + } + log("āœ“ Database migrations completed", "green"); + + // Reconnect with single connection for wallet insertion + connection = await mysql.createConnection({ + host: dbHost, + port: parseInt(dbPort), + user: dbUser, + password: dbPassword, + database: dbName, + }); // Verify tables were created const [tables] = await connection.execute("SHOW TABLES"); @@ -991,7 +976,7 @@ volumes: const privateKey = wallet.privateKey; const [result] = await connection.execute( - `INSERT INTO wallets (address, private_key_encrypted, blockchain) VALUES (?, ?, ?)`, + `INSERT INTO wallets (address, private_key, blockchain) VALUES (?, ?, ?)`, [wallet.address, privateKey, wallet.blockchain], ); @@ -1086,7 +1071,6 @@ volumes: log("• Add more wallets: npm run setup (choose option 3)", "white"); log(" Workers will auto-restart to match new wallet count", "white"); log("• View dashboard at: /admin/queues (when agent is running)", "white"); - log("• Docker alternative: npm run km:docker:up", "white"); log("• Check health: GET /api/knowledge/health", "white"); log("\nExample usage in DKG Agent plugin:", "yellow"); @@ -1110,16 +1094,12 @@ GET /api/knowledge/metrics/queue GET /api/knowledge/metrics/wallets GET /api/knowledge/health -// MCP Tool (for Claude integration) +// MCP Tool knowledge-asset-publish`, "white", ); log("\nāš ļø Security Notes:", "red"); - log( - "• Wallet private keys are encrypted and stored in the database", - "yellow", - ); log("• Keep your DATABASE_URL and ENCRYPTION_KEY secure", "yellow"); log("• Use environment variables for production deployments", "yellow"); diff --git a/packages/plugin-dkg-publisher/src/database/bootstrap.ts b/packages/plugin-dkg-publisher/src/database/bootstrap.ts new file mode 100644 index 00000000..f6191d24 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/bootstrap.ts @@ -0,0 +1,131 @@ +import { Database } from "./index"; +import { sql } from "drizzle-orm"; +import crypto from "crypto"; +import fs from "fs"; +import path from "path"; + +/** + * Bootstrap migration journal for databases created by setup.js (raw DDL). + * + * setup.js creates tables directly without Drizzle migration tracking. + * Without this guard, runMigrations() would try to run 0000 (CREATE TABLE) + * on existing tables and fail. + * + * Logic: + * 1. If __drizzle_migrations exists → already managed by Drizzle, return early + * 2. If core tables don't all exist → fresh DB, let migrations handle it + * 3. If tables exist but no journal → seed journal with already-applied migrations + */ +export async function bootstrapMigrationJournal(db: Database): Promise { + // Check if __drizzle_migrations table exists + const journalExists = await tableExists(db, "__drizzle_migrations"); + if (journalExists) { + return; // Already managed by Drizzle + } + + // Check if core tables exist (setup.js creates these) + const coreTables = ["assets", "wallets", "publishing_attempts", "batches"]; + const existingTables = await Promise.all( + coreTables.map((t) => tableExists(db, t)), + ); + const existingCount = existingTables.filter(Boolean).length; + + if (existingCount === 0) { + return; // Fresh DB — let migrations create everything + } + + if (existingCount < coreTables.length) { + const missing = coreTables.filter((_, i) => !existingTables[i]); + throw new Error( + `Database is in a partial state: tables ${missing.join(", ")} are missing. ` + + `This usually means setup.js crashed mid-creation. ` + + `Please drop all tables and run setup again.`, + ); + } + + // Tables exist but no journal — setup.js-created database + console.log( + "šŸ“‹ Detected setup.js-created database without migration journal. Bootstrapping...", + ); + + // Create the __drizzle_migrations table (same schema Drizzle uses) + await db.execute(sql` + CREATE TABLE IF NOT EXISTS \`__drizzle_migrations\` ( + id SERIAL PRIMARY KEY, + hash text NOT NULL, + created_at bigint + ) + `); + + // Read migration files and compute hashes to seed the journal + const migrationsDir = path.join( + __dirname, + "../src/database/migrations", + ); + const journalPath = path.join(migrationsDir, "meta/_journal.json"); + const journal = JSON.parse(fs.readFileSync(journalPath, "utf-8")); + + // Always seed 0000 and 0001 (setup.js schema = post-0001 state) + for (const entry of journal.entries) { + if (entry.idx > 1) break; // Only seed 0000 and 0001 unconditionally + + const sqlFile = path.join(migrationsDir, `${entry.tag}.sql`); + const content = fs.readFileSync(sqlFile, "utf-8"); + const hash = crypto.createHash("sha256").update(content).digest("hex"); + + await db.execute( + sql`INSERT INTO \`__drizzle_migrations\` (hash, created_at) VALUES (${hash}, ${entry.when})`, + ); + } + + console.log(" āœ“ Seeded journal with migrations 0000 and 0001"); + + // Check if 0002 changes are already present + const entry0002 = journal.entries.find( + (e: { idx: number }) => e.idx === 2, + ); + if (entry0002) { + const hasErrorDetails = await columnExists( + db, + "publishing_attempts", + "error_details", + ); + const hasPrivateKey = await columnExists(db, "wallets", "private_key"); + + if (hasErrorDetails && hasPrivateKey) { + const sqlFile = path.join(migrationsDir, `${entry0002.tag}.sql`); + const content = fs.readFileSync(sqlFile, "utf-8"); + const hash = crypto + .createHash("sha256") + .update(content) + .digest("hex"); + + await db.execute( + sql`INSERT INTO \`__drizzle_migrations\` (hash, created_at) VALUES (${hash}, ${entry0002.when})`, + ); + console.log(" āœ“ Seeded journal with migration 0002 (already applied)"); + } + } + + console.log("āœ… Migration journal bootstrapped successfully"); +} + +async function tableExists(db: Database, tableName: string): Promise { + const result = await db.execute( + sql`SELECT COUNT(*) as cnt FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = ${tableName}`, + ); + const rows = result[0] as Array<{ cnt: number | bigint }>; + return Number(rows[0]?.cnt) > 0; +} + +async function columnExists( + db: Database, + tableName: string, + columnName: string, +): Promise { + const result = await db.execute( + sql`SELECT COUNT(*) as cnt FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = ${tableName} AND column_name = ${columnName}`, + ); + const rows = result[0] as Array<{ cnt: number | bigint }>; + return Number(rows[0]?.cnt) > 0; +} diff --git a/packages/plugin-dkg-publisher/src/database/index.ts b/packages/plugin-dkg-publisher/src/database/index.ts index cb16f959..a8962988 100644 --- a/packages/plugin-dkg-publisher/src/database/index.ts +++ b/packages/plugin-dkg-publisher/src/database/index.ts @@ -11,10 +11,16 @@ export function createDatabase(connectionString: string) { return drizzle(pool, { schema, mode: "default" }); } -export async function runMigrations(db: Database) { - await migrate(db, { - migrationsFolder: path.join(__dirname, "./migrations"), - }); +export async function runMigrations(connectionString: string) { + // Use a dedicated single connection (not the pool) for migration isolation. + const connection = await mysql.createConnection(connectionString); + try { + await migrate(drizzle(connection), { + migrationsFolder: path.join(__dirname, "../src/database/migrations"), + }); + } finally { + await connection.end(); + } } export * from "./schema"; diff --git a/packages/plugin-dkg-publisher/src/database/migrations/0001_amused_dexter_bennett.sql b/packages/plugin-dkg-publisher/src/database/migrations/0001_amused_dexter_bennett.sql index 86dda8a8..dcd4c76e 100644 --- a/packages/plugin-dkg-publisher/src/database/migrations/0001_amused_dexter_bennett.sql +++ b/packages/plugin-dkg-publisher/src/database/migrations/0001_amused_dexter_bennett.sql @@ -1,14 +1,17 @@ +ALTER TABLE `publishing_attempts` DROP FOREIGN KEY `publishing_attempts_asset_id_assets_id_fk`;--> statement-breakpoint +ALTER TABLE `publishing_attempts` DROP FOREIGN KEY `publishing_attempts_wallet_id_wallets_id_fk`;--> statement-breakpoint +ALTER TABLE `wallet_metrics` DROP FOREIGN KEY `wallet_metrics_wallet_id_wallets_id_fk`;--> statement-breakpoint DROP INDEX `idx_content_hash` ON `assets`;--> statement-breakpoint DROP INDEX `idx_source` ON `assets`;--> statement-breakpoint -ALTER TABLE `assets` MODIFY COLUMN `id` serial AUTO_INCREMENT NOT NULL;--> statement-breakpoint +ALTER TABLE `assets` MODIFY COLUMN `id` int NOT NULL AUTO_INCREMENT;--> statement-breakpoint ALTER TABLE `assets` MODIFY COLUMN `batch_id` int;--> statement-breakpoint ALTER TABLE `assets` MODIFY COLUMN `status` enum('pending','queued','assigned','publishing','published','failed') NOT NULL DEFAULT 'pending';--> statement-breakpoint -ALTER TABLE `batches` MODIFY COLUMN `id` serial AUTO_INCREMENT NOT NULL;--> statement-breakpoint -ALTER TABLE `publishing_attempts` MODIFY COLUMN `id` serial AUTO_INCREMENT NOT NULL;--> statement-breakpoint +ALTER TABLE `batches` MODIFY COLUMN `id` int NOT NULL AUTO_INCREMENT;--> statement-breakpoint +ALTER TABLE `publishing_attempts` MODIFY COLUMN `id` int NOT NULL AUTO_INCREMENT;--> statement-breakpoint ALTER TABLE `publishing_attempts` MODIFY COLUMN `asset_id` int NOT NULL;--> statement-breakpoint ALTER TABLE `publishing_attempts` MODIFY COLUMN `wallet_id` int;--> statement-breakpoint ALTER TABLE `wallet_metrics` MODIFY COLUMN `wallet_id` int NOT NULL;--> statement-breakpoint -ALTER TABLE `wallets` MODIFY COLUMN `id` serial AUTO_INCREMENT NOT NULL;--> statement-breakpoint +ALTER TABLE `wallets` MODIFY COLUMN `id` int NOT NULL AUTO_INCREMENT;--> statement-breakpoint ALTER TABLE `assets` DROP INDEX `assets_content_hash_unique`;--> statement-breakpoint ALTER TABLE `assets` ADD `wallet_id` int;--> statement-breakpoint ALTER TABLE `assets` ADD `source` varchar(100);--> statement-breakpoint @@ -18,6 +21,9 @@ ALTER TABLE `assets` ADD `queued_at` timestamp;--> statement-breakpoint ALTER TABLE `assets` ADD `assigned_at` timestamp;--> statement-breakpoint ALTER TABLE `assets` ADD `publishing_started_at` timestamp;--> statement-breakpoint CREATE INDEX `idx_source` ON `assets` (`source`,`source_id`);--> statement-breakpoint +ALTER TABLE `publishing_attempts` ADD CONSTRAINT `publishing_attempts_asset_id_assets_id_fk` FOREIGN KEY (`asset_id`) REFERENCES `assets`(`id`) ON DELETE cascade ON UPDATE no action;--> statement-breakpoint +ALTER TABLE `publishing_attempts` ADD CONSTRAINT `publishing_attempts_wallet_id_wallets_id_fk` FOREIGN KEY (`wallet_id`) REFERENCES `wallets`(`id`) ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE `wallet_metrics` ADD CONSTRAINT `wallet_metrics_wallet_id_wallets_id_fk` FOREIGN KEY (`wallet_id`) REFERENCES `wallets`(`id`) ON DELETE cascade ON UPDATE no action;--> statement-breakpoint ALTER TABLE `assets` ADD CONSTRAINT `assets_wallet_id_wallets_id_fk` FOREIGN KEY (`wallet_id`) REFERENCES `wallets`(`id`) ON DELETE set null ON UPDATE no action;--> statement-breakpoint ALTER TABLE `assets` DROP COLUMN `content_hash`;--> statement-breakpoint ALTER TABLE `assets` DROP COLUMN `external_id`;--> statement-breakpoint diff --git a/packages/plugin-dkg-publisher/src/database/migrations/0002_add_error_details.sql b/packages/plugin-dkg-publisher/src/database/migrations/0002_add_error_details.sql new file mode 100644 index 00000000..d2bd8fea --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/migrations/0002_add_error_details.sql @@ -0,0 +1,2 @@ +ALTER TABLE `publishing_attempts` ADD COLUMN `error_details` json;--> statement-breakpoint +ALTER TABLE `wallets` CHANGE COLUMN `private_key_encrypted` `private_key` text NOT NULL; diff --git a/packages/plugin-dkg-publisher/src/database/migrations/20250905_rename_private_key_column.sql b/packages/plugin-dkg-publisher/src/database/migrations/20250905_rename_private_key_column.sql deleted file mode 100644 index bba25d67..00000000 --- a/packages/plugin-dkg-publisher/src/database/migrations/20250905_rename_private_key_column.sql +++ /dev/null @@ -1,5 +0,0 @@ --- Migration to rename private_key_encrypted column to private_key --- The column contains unencrypted private keys, so the name should reflect that - -ALTER TABLE wallets -CHANGE COLUMN private_key_encrypted private_key TEXT NOT NULL; \ No newline at end of file diff --git a/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json b/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json new file mode 100644 index 00000000..11d851d4 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json @@ -0,0 +1,813 @@ +{ + "version": "5", + "dialect": "mysql", + "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "prevId": "f0a69756-52d0-4f78-bfe8-7c3cafc00cce", + "tables": { + "assets": { + "name": "assets", + "columns": { + "id": { + "name": "id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "batch_id": { + "name": "batch_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "content_url": { + "name": "content_url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "content_size": { + "name": "content_size", + "type": "bigint", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "source": { + "name": "source", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "source_id": { + "name": "source_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "priority": { + "name": "priority", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 50 + }, + "privacy": { + "name": "privacy", + "type": "enum('private','public')", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'private'" + }, + "epochs": { + "name": "epochs", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 2 + }, + "replications": { + "name": "replications", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 1 + }, + "max_attempts": { + "name": "max_attempts", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 3 + }, + "status": { + "name": "status", + "type": "enum('pending','queued','assigned','publishing','published','failed')", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'pending'" + }, + "status_message": { + "name": "status_message", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "attempt_count": { + "name": "attempt_count", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "retry_count": { + "name": "retry_count", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "next_retry_at": { + "name": "next_retry_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_error": { + "name": "last_error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "ual": { + "name": "ual", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "transaction_hash": { + "name": "transaction_hash", + "type": "varchar(66)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + }, + "queued_at": { + "name": "queued_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "assigned_at": { + "name": "assigned_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "publishing_started_at": { + "name": "publishing_started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "published_at": { + "name": "published_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "onUpdate": true, + "default": "(now())" + } + }, + "indexes": { + "idx_status": { + "name": "idx_status", + "columns": ["status"], + "isUnique": false + }, + "idx_retry": { + "name": "idx_retry", + "columns": ["status", "next_retry_at"], + "isUnique": false + }, + "idx_source": { + "name": "idx_source", + "columns": ["source", "source_id"], + "isUnique": false + }, + "idx_pending": { + "name": "idx_pending", + "columns": ["status", "created_at"], + "isUnique": false + }, + "idx_batch": { + "name": "idx_batch", + "columns": ["batch_id"], + "isUnique": false + } + }, + "foreignKeys": { + "assets_wallet_id_wallets_id_fk": { + "name": "assets_wallet_id_wallets_id_fk", + "tableFrom": "assets", + "tableTo": "wallets", + "columnsFrom": ["wallet_id"], + "columnsTo": ["id"], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "assets_id": { + "name": "assets_id", + "columns": ["id"] + } + }, + "uniqueConstraints": { + "assets_ual_unique": { + "name": "assets_ual_unique", + "columns": ["ual"] + } + } + }, + "batches": { + "name": "batches", + "columns": { + "id": { + "name": "id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "batch_name": { + "name": "batch_name", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "source": { + "name": "source", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_assets": { + "name": "total_assets", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "pending_count": { + "name": "pending_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "processing_count": { + "name": "processing_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "published_count": { + "name": "published_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "failed_count": { + "name": "failed_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "idx_batch_status": { + "name": "idx_batch_status", + "columns": ["created_at", "completed_at"], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "batches_id": { + "name": "batches_id", + "columns": ["id"] + } + }, + "uniqueConstraints": {} + }, + "metrics_hourly": { + "name": "metrics_hourly", + "columns": { + "hour_timestamp": { + "name": "hour_timestamp", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "assets_registered": { + "name": "assets_registered", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "assets_published": { + "name": "assets_published", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "assets_failed": { + "name": "assets_failed", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "avg_publish_duration_seconds": { + "name": "avg_publish_duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_gas_used": { + "name": "total_gas_used", + "type": "bigint", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "unique_wallets_used": { + "name": "unique_wallets_used", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "idx_metrics_hour": { + "name": "idx_metrics_hour", + "columns": ["hour_timestamp"], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "metrics_hourly_hour_timestamp": { + "name": "metrics_hourly_hour_timestamp", + "columns": ["hour_timestamp"] + } + }, + "uniqueConstraints": {} + }, + "publishing_attempts": { + "name": "publishing_attempts", + "columns": { + "id": { + "name": "id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "asset_id": { + "name": "asset_id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "attempt_number": { + "name": "attempt_number", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "worker_id": { + "name": "worker_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "wallet_address": { + "name": "wallet_address", + "type": "varchar(42)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "otnode_url": { + "name": "otnode_url", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "transaction_hash": { + "name": "transaction_hash", + "type": "varchar(66)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "gas_used": { + "name": "gas_used", + "type": "bigint", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "enum('started','success','failed','timeout')", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "ual": { + "name": "ual", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error_type": { + "name": "error_type", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error_message": { + "name": "error_message", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error_details": { + "name": "error_details", + "type": "json", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "duration_seconds": { + "name": "duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + } + }, + "indexes": { + "idx_asset_attempts": { + "name": "idx_asset_attempts", + "columns": ["asset_id", "attempt_number"], + "isUnique": false + }, + "idx_wallet_usage": { + "name": "idx_wallet_usage", + "columns": ["wallet_address", "started_at"], + "isUnique": false + } + }, + "foreignKeys": { + "publishing_attempts_asset_id_assets_id_fk": { + "name": "publishing_attempts_asset_id_assets_id_fk", + "tableFrom": "publishing_attempts", + "tableTo": "assets", + "columnsFrom": ["asset_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "publishing_attempts_wallet_id_wallets_id_fk": { + "name": "publishing_attempts_wallet_id_wallets_id_fk", + "tableFrom": "publishing_attempts", + "tableTo": "wallets", + "columnsFrom": ["wallet_id"], + "columnsTo": ["id"], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "publishing_attempts_id": { + "name": "publishing_attempts_id", + "columns": ["id"] + } + }, + "uniqueConstraints": {} + }, + "wallet_metrics": { + "name": "wallet_metrics", + "columns": { + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "date": { + "name": "date", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "total_publishes": { + "name": "total_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "successful_publishes": { + "name": "successful_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "failed_publishes": { + "name": "failed_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "avg_duration_seconds": { + "name": "avg_duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_gas_used": { + "name": "total_gas_used", + "type": "bigint", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "wallet_metrics_wallet_id_wallets_id_fk": { + "name": "wallet_metrics_wallet_id_wallets_id_fk", + "tableFrom": "wallet_metrics", + "tableTo": "wallets", + "columnsFrom": ["wallet_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "wallet_metrics_wallet_id_date_pk": { + "name": "wallet_metrics_wallet_id_date_pk", + "columns": ["wallet_id", "date"] + } + }, + "uniqueConstraints": {} + }, + "wallets": { + "name": "wallets", + "columns": { + "id": { + "name": "id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "address": { + "name": "address", + "type": "varchar(42)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "private_key": { + "name": "private_key", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "is_active": { + "name": "is_active", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": true + }, + "is_locked": { + "name": "is_locked", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": false + }, + "locked_by": { + "name": "locked_by", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "locked_at": { + "name": "locked_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_used_at": { + "name": "last_used_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_uses": { + "name": "total_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "successful_uses": { + "name": "successful_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "failed_uses": { + "name": "failed_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + } + }, + "indexes": { + "idx_available": { + "name": "idx_available", + "columns": ["is_active", "is_locked", "last_used_at"], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "wallets_id": { + "name": "wallets_id", + "columns": ["id"] + } + }, + "uniqueConstraints": { + "wallets_address_unique": { + "name": "wallets_address_unique", + "columns": ["address"] + } + } + } + }, + "schemas": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + } +} diff --git a/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json b/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json index 1bc3a605..b229091a 100644 --- a/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json +++ b/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json @@ -15,6 +15,13 @@ "when": 1756889502071, "tag": "0001_amused_dexter_bennett", "breakpoints": true + }, + { + "idx": 2, + "version": "5", + "when": 1756900000000, + "tag": "0002_add_error_details", + "breakpoints": true } ] } diff --git a/packages/plugin-dkg-publisher/src/database/schema.ts b/packages/plugin-dkg-publisher/src/database/schema.ts index 9b523c3b..c04009ca 100644 --- a/packages/plugin-dkg-publisher/src/database/schema.ts +++ b/packages/plugin-dkg-publisher/src/database/schema.ts @@ -11,7 +11,7 @@ import { index, primaryKey, char, - serial, + json, } from "drizzle-orm/mysql-core"; import { relations } from "drizzle-orm"; @@ -19,7 +19,7 @@ import { relations } from "drizzle-orm"; export const assets = mysqlTable( "assets", { - id: serial("id").primaryKey(), + id: int("id").autoincrement().primaryKey(), walletId: int("wallet_id").references(() => wallets.id, { onDelete: "set null", }), @@ -81,9 +81,9 @@ export const assets = mysqlTable( export const wallets = mysqlTable( "wallets", { - id: serial("id").primaryKey(), + id: int("id").autoincrement().primaryKey(), address: varchar("address", { length: 42 }).notNull().unique(), - privateKey: text("private_key_encrypted").notNull(), + privateKey: text("private_key").notNull(), blockchain: varchar("blockchain", { length: 50 }).notNull(), isActive: boolean("is_active").default(true), isLocked: boolean("is_locked").default(false), @@ -108,7 +108,7 @@ export const wallets = mysqlTable( export const publishingAttempts = mysqlTable( "publishing_attempts", { - id: serial("id").primaryKey(), + id: int("id").autoincrement().primaryKey(), assetId: int("asset_id") .notNull() .references(() => assets.id, { onDelete: "cascade" }), @@ -129,6 +129,7 @@ export const publishingAttempts = mysqlTable( ual: varchar("ual", { length: 255 }), errorType: varchar("error_type", { length: 50 }), errorMessage: text("error_message"), + errorDetails: json("error_details"), startedAt: timestamp("started_at").notNull(), completedAt: timestamp("completed_at"), durationSeconds: int("duration_seconds"), @@ -150,7 +151,7 @@ export const publishingAttempts = mysqlTable( export const batches = mysqlTable( "batches", { - id: serial("id").primaryKey(), + id: int("id").autoincrement().primaryKey(), batchName: varchar("batch_name", { length: 255 }), source: varchar("source", { length: 100 }), totalAssets: int("total_assets").notNull().default(0), diff --git a/packages/plugin-dkg-publisher/src/services/AssetService.ts b/packages/plugin-dkg-publisher/src/services/AssetService.ts index 6c955c9a..acf0c2c3 100644 --- a/packages/plugin-dkg-publisher/src/services/AssetService.ts +++ b/packages/plugin-dkg-publisher/src/services/AssetService.ts @@ -395,6 +395,7 @@ export class AssetService extends EventEmitter { transactionHash?: string; errorType?: string; errorMessage?: string; + errorDetails?: Record; gasUsed?: number; durationSeconds?: number; }, diff --git a/packages/plugin-dkg-publisher/src/services/PublishingService.ts b/packages/plugin-dkg-publisher/src/services/PublishingService.ts index 1774c2aa..797c9178 100644 --- a/packages/plugin-dkg-publisher/src/services/PublishingService.ts +++ b/packages/plugin-dkg-publisher/src/services/PublishingService.ts @@ -4,12 +4,14 @@ import { assets } from "../database/schema"; import { eq, and, sql } from "drizzle-orm"; import { publishingLogger as logger } from "./Logger"; import { DkgService } from "./DkgService"; +import { serializeErrorDetails } from "./errorUtils"; export interface PublishResult { success: boolean; ual?: string; transactionHash?: string; error?: string; + errorDetails?: Record; } export class PublishingService { @@ -163,7 +165,11 @@ export class PublishingService { status: result.operation.publish.status, }); - throw new Error(`DKG API Error: ${errorType} - ${errorMessage}`); + const err = new Error(`DKG API Error: ${errorType} - ${errorMessage}`); + (err as any).operationId = result.operation.publish.operationId; + (err as any).operationStatus = result.operation.publish.status; + (err as any).dkgOperation = result.operation.publish; + throw err; } // ONLY update as published if we actually have a UAL @@ -223,6 +229,7 @@ export class PublishingService { return { success: false, error: error.message, + errorDetails: serializeErrorDetails(error), }; } } diff --git a/packages/plugin-dkg-publisher/src/services/QueueService.ts b/packages/plugin-dkg-publisher/src/services/QueueService.ts index 1e391beb..0f366967 100644 --- a/packages/plugin-dkg-publisher/src/services/QueueService.ts +++ b/packages/plugin-dkg-publisher/src/services/QueueService.ts @@ -8,6 +8,7 @@ import { PublishingService } from "./PublishingService"; import { WalletService } from "./WalletService"; import { AssetService } from "./AssetService"; import { queueLogger as logger } from "./Logger"; +import { serializeErrorDetails } from "./errorUtils"; export interface QueueStats { waiting: number; @@ -275,7 +276,11 @@ export class QueueService { `āŒ PUBLISHING FAILED for asset ${assetId}: ${result.error}`, { workerId: i, assetId, error: result.error }, ); - throw new Error(result.error || "Publishing failed"); + const publishError = new Error(result.error || "Publishing failed"); + if (result.errorDetails) { + (publishError as any).errorDetails = result.errorDetails; + } + throw publishError; } logger.info(`šŸŽ‰ PUBLISHING SUCCESSFUL for asset ${assetId}`, { @@ -320,11 +325,14 @@ export class QueueService { } // Update publishing attempt record as failed + // Use pre-serialized errorDetails from PublishingService if available, + // otherwise serialize the raw error (e.g. "No available wallets") if (attemptId) { await this.assetService.updatePublishingAttempt(attemptId, { status: "failed", errorType: error.name || "Error", errorMessage: error.message, + errorDetails: error.errorDetails ?? serializeErrorDetails(error), durationSeconds: Math.floor( (Date.now() - job.timestamp) / 1000, ), diff --git a/packages/plugin-dkg-publisher/src/services/WalletService.ts b/packages/plugin-dkg-publisher/src/services/WalletService.ts index 3e95f6d0..45923127 100644 --- a/packages/plugin-dkg-publisher/src/services/WalletService.ts +++ b/packages/plugin-dkg-publisher/src/services/WalletService.ts @@ -134,10 +134,7 @@ export class WalletService { return null; } - // Add private key (stored as plaintext for now) - const walletWithKey = wallet[0] as any; - walletWithKey.privateKey = walletWithKey.privateKeyEncrypted; - return walletWithKey; + return wallet[0]; } /** diff --git a/packages/plugin-dkg-publisher/src/services/errorUtils.ts b/packages/plugin-dkg-publisher/src/services/errorUtils.ts new file mode 100644 index 00000000..073320d7 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/services/errorUtils.ts @@ -0,0 +1,63 @@ +/** + * Serializes an error object into a plain object capturing all useful properties. + * Handles circular references, BigInt values, and non-enumerable Error properties. + */ +export function serializeErrorDetails(error: unknown): Record { + if (error === null || error === undefined) { + return { message: String(error) }; + } + + if (typeof error === "string") { + return { message: error }; + } + + if (typeof error !== "object") { + return { message: String(error) }; + } + + const seen = new WeakSet(); + + function sanitize(value: unknown): unknown { + if (value === null || value === undefined) return value; + if (typeof value === "bigint") return value.toString(); + if (typeof value === "function") return undefined; + if (typeof value !== "object") return value; + + if (seen.has(value as object)) return "[Circular]"; + seen.add(value as object); + + if (Array.isArray(value)) { + return value.map(sanitize); + } + + const result: Record = {}; + for (const key of Object.keys(value as Record)) { + if (key === "stack") continue; + result[key] = sanitize((value as Record)[key]); + } + return result; + } + + // Add root error to seen set to prevent circular references back to it + seen.add(error as object); + + const err = error as Record; + const result: Record = {}; + + // Extract standard Error properties (non-enumerable on some engines) + if (error instanceof Error) { + result.message = err.message; + result.name = err.name; + if ((error as any).code !== undefined) result.code = (error as any).code; + if ((error as any).reason !== undefined) + result.reason = (error as any).reason; + } + + // Merge all enumerable properties + for (const key of Object.keys(err)) { + if (key === "stack") continue; + result[key] = sanitize(err[key]); + } + + return result; +} diff --git a/packages/plugin-dkg-publisher/src/services/index.ts b/packages/plugin-dkg-publisher/src/services/index.ts index e74df5b2..12112f54 100644 --- a/packages/plugin-dkg-publisher/src/services/index.ts +++ b/packages/plugin-dkg-publisher/src/services/index.ts @@ -1,5 +1,6 @@ import IORedis from "ioredis"; -import { Database, createDatabase } from "../database"; +import { Database, createDatabase, runMigrations } from "../database"; +import { bootstrapMigrationJournal } from "../database/bootstrap"; import { ServiceContainer } from "./ServiceContainer"; import { WalletService } from "./WalletService"; import { AssetService } from "./AssetService"; @@ -23,10 +24,13 @@ export async function initializeServices( console.log(`šŸ”§ initializeServices called at ${Date.now()}`); const container = new ServiceContainer(); - // Initialize database + // Initialize database and run migrations const db = createDatabase(config.database.connectionString); container.register("db", db); + await bootstrapMigrationJournal(db); + await runMigrations(config.database.connectionString); + // Initialize Redis const redis = new IORedis({ host: config.redis.host,