diff --git a/docs/MEDIA_RECOVERY_GUIDE.md b/docs/MEDIA_RECOVERY_GUIDE.md new file mode 100644 index 000000000..22834d073 --- /dev/null +++ b/docs/MEDIA_RECOVERY_GUIDE.md @@ -0,0 +1,267 @@ +# Media Recovery System — Agent Reference Guide + +> **Branch:** `fix/media-key-jsonb-updateMediaMessage` +> **4 commits** on top of `release/2.3.7` — all production-tested. + +## Overview + +This patch adds 3 new endpoints + 2 bug fixes to Evolution API, making it **self-sufficient** for WhatsApp media recovery without external orchestrators. + +### Problem Solved + +WhatsApp CDN URLs expire after ~7 days. Once expired, media files (SOR documents, images, etc.) become permanently inaccessible unless you: +1. Have the original `mediaKey` + `directPath` (stored in EA's Message table) +2. Can trigger WhatsApp's `updateMediaMessage` protocol (asks sender to re-upload) +3. Can request historical messages via `fetchMessageHistory` (on-demand history sync) + +Previously, only OwnPilot had these capabilities. Now EA has them natively. + +--- + +## Endpoints + +### 1. `POST /chat/retryMediaFromMetadata/{instance}` + +**Purpose:** Download media using caller-supplied metadata — does NOT require message to exist in EA's DB. + +**When to use:** +- You have `mediaKey` + `directPath` from an external source (e.g., OwnPilot DB) +- The message exists in EA but `getBase64FromMediaMessage` fails (DB lookup issue) + +**Request:** +```json +{ + "messageId": "3EB0D228037ED522E72774", + "remoteJid": "120363423491841999@g.us", + "participant": "119365882089638@lid", + "fromMe": false, + "mediaKey": "base64-encoded-key", + "directPath": "/v/t62.7119-24/...", + "url": "https://mmg.whatsapp.net/...", + "mimeType": "application/octet-stream", + "filename": "2314CP_82_V1.SOR", + "fileLength": 20973, + "convertToMp4": false +} +``` + +**Response:** +```json +{ + "base64": "TWFwAMgAfA...", + "mimetype": "application/octet-stream", + "filename": "2314CP_82_V1.SOR" +} +``` + +**Algorithm:** +1. 
Reconstruct minimal WAMessage proto from provided metadata +2. Try direct `downloadMediaMessage` (fast-path — CDN still valid) +3. On failure → explicit `updateMediaMessage` with 30s timeout (Baileys RC9 workaround) +4. Retry download with refreshed URL + +**Edge Cases:** +- `mediaKey` from PostgreSQL JSONB may be stored as `{0: 123, 1: 45, ...}` object instead of Uint8Array — the code handles both formats by sorting the object keys numerically before rebuilding the byte array (commit `262c9300`) +- `updateMediaMessage` times out after 30s if sender is permanently offline — throws `BadRequestException` +- Audio files with `convertToMp4: true` are processed via `processAudioMp4` + +--- + +### 2. `POST /chat/fetchGroupHistory/{instance}` + +**Purpose:** Trigger WhatsApp on-demand history sync for a group. WhatsApp responds with old message protos containing **fresh mediaKey + directPath**. + +**When to use:** +- Messages are missing from EA's DB (were sent before EA was connected) +- You need fresh mediaKeys for messages whose CDN URLs expired + +**Request:** +```json +{ + "groupJid": "120363423491841999@g.us", + "count": 50, + "anchorMessageId": "3EB0DCCA32F22B9AA2A3B4", + "anchorTimestamp": 1765216930, + "anchorFromMe": false, + "anchorParticipant": "90383560261829@lid" +} +``` + +**Response (immediate — 202-style):** +```json +{ + "sessionId": "3EB006B411C1B0933F9410", + "groupJid": "120363423491841999@g.us", + "count": 50, + "message": "History sync requested. WhatsApp will deliver messages via messaging-history.set event (async)." +} +``` + +**Algorithm:** +1. Validate groupJid ends with `@g.us` +2. Rate-limit check (1 call per 30 seconds) +3. Call `sock.fetchMessageHistory(count, anchorKey, anchorTimestamp)` +4. WhatsApp delivers messages asynchronously via `messaging-history.set` event +5. 
Messages are stored in DB if `DATABASE_SAVE_DATA_HISTORIC=true` + +**CRITICAL Prerequisites:** +- `DATABASE_SAVE_DATA_HISTORIC=true` must be set in env — otherwise messages arrive but are NOT saved to DB +- `daysLimitImportMessages` in Chatwoot config should be high (e.g., 1000) — otherwise old messages are filtered out +- EA must be the **sole linked device** on the WhatsApp number — if another client (e.g., OwnPilot) is connected, WhatsApp may route the response to that client instead + +**Edge Cases:** +- Rate limited: 1 call per 30 seconds. Calling faster throws `BadRequestException` with wait time +- Empty anchor (`anchorMessageId: ""`) — WhatsApp may not respond at all +- WhatsApp returns messages OLDER than the anchor (backward direction only) +- Duplicate messages are handled by `messagesRepository.has(m.key.id)` check — no duplicates in DB +- Max 50 messages per call (WhatsApp protocol limit) +- Response is async — poll DB count or check logs to verify delivery + +**Iterative Fetching Pattern:** +``` +1. Find oldest message in DB → use as anchor +2. Call fetchGroupHistory +3. Wait 35s (30s rate-limit + 5s buffer) +4. Check if DB count increased +5. If increased → repeat from step 1 (new oldest message = new anchor) +6. If no increase → reached beginning of history +``` + +--- + +### 3. `POST /chat/batchRecoverMedia/{instance}` + +**Purpose:** End-to-end batch recovery pipeline. For each message: DB lookup → download → MinIO upload → media record → mediaUrl update. 
+ +**When to use:** +- You have message IDs in EA's DB with expired CDN URLs +- You want to permanently store media in MinIO (S3) and update DB references + +**Request:** +```json +{ + "messageIds": ["3EB0D228037ED522E72774", "3EB0DCCA32F22B9AA2A3B4"], + "continueOnError": true, + "storeToMinIO": true +} +``` + +**Response:** +```json +{ + "total": 2, + "ok": 1, + "skip": 1, + "error": 0, + "results": [ + { + "messageId": "3EB0D228037ED522E72774", + "status": "ok", + "mediaUrl": "http://minio:9000/evolution-media/..." + }, + { + "messageId": "3EB0DCCA32F22B9AA2A3B4", + "status": "skip", + "error": "Already stored in MinIO" + } + ] +} +``` + +**Algorithm per message:** +1. Fetch message from DB by `key.id` + `instanceId` +2. Extract media metadata from `documentMessage | imageMessage | videoMessage | audioMessage | stickerMessage` +3. Skip if no `mediaKey`/`directPath` +4. Skip if `mediaUrl` already points to non-WhatsApp URL (already in MinIO) +5. Handle JSONB mediaKey format: `Object.keys().sort((a,b)=>parseInt(a)-parseInt(b))` for numeric key ordering +6. Call `retryMediaFromMetadata` with `getBuffer=true` +7. Upload buffer to MinIO via `s3Service.uploadFile` +8. Upsert `Media` record in DB +9. Update `message.mediaUrl` in the document message content + +**Edge Cases:** +- JSONB mediaKey sort: PostgreSQL stores `{0:x, 1:y, 10:z, 2:w}` — lexicographic sort gives wrong byte order. Numeric sort fix applied. +- `continueOnError: false` — stops at first failure, returns partial results +- `storeToMinIO: false` — downloads but doesn't upload (useful for testing) +- S3 not enabled — downloads and reports size but doesn't upload +- Message not found in DB → `status: "skip"` +- Empty buffer after download → `status: "error"` +- Presigned URLs in `mediaUrl` expire after 7 days — but the object persists in MinIO. 
Generate new presigned URL via `s3Service.getObjectUrl()` + +**Batch Processing Pattern:** +```python +# Recommended: 10 per batch, 1-2s delay between batches +for batch in chunks(message_ids, 10): + response = POST /chat/batchRecoverMedia/{instance} { messageIds: batch } + # Each batch takes ~10-30s depending on CDN/updateMediaMessage +``` + +--- + +## Bug Fixes (included in this patch) + +### Fix 1: JSONB mediaKey Sort (commit `262c9300`) + +**Problem:** PostgreSQL stores Uint8Array as JSONB object `{0: 182, 1: 45, 10: 67, 2: 99, ...}`. JavaScript `Object.keys()` returns lexicographic order: `["0", "1", "10", "2", ...]` — wrong byte sequence → HKDF decryption fails. + +**Fix:** `Object.keys(mediaKey).sort((a, b) => parseInt(a) - parseInt(b)).map(k => mediaKey[k])` + +**Affected:** `getBase64FromMediaMessage` + `batchRecoverMedia` + +### Fix 2: Baileys RC9 `reuploadRequest` Dead Code (commit `f268571b`) + +**Problem:** Baileys 7.0.0-rc.9 wires `reuploadRequest` callback in download options, but the catch block checks `error.status` while the actual error has `output.statusCode` — callback never triggers on 410/404. + +**Fix:** Explicit `updateMediaMessage()` call in the catch block with 30s timeout, bypassing Baileys' broken internal retry. 
+ +**Affected:** `getBase64FromMediaMessage` + `retryMediaFromMetadata` + +--- + +## Environment Configuration + +**Required for history sync to work:** +```env +DATABASE_SAVE_DATA_HISTORIC=true # MUST be set — otherwise messaging-history.set messages are dropped +``` + +**Required for old message import:** +```sql +-- In Chatwoot table, increase daysLimitImportMessages (default: 3 days) +UPDATE "Chatwoot" SET "daysLimitImportMessages" = 1000 +WHERE "instanceId" = '<your-instance-id>'; +``` + +**Required for MinIO storage:** +```env +S3_ENABLED=true +S3_BUCKET=evolution-media +S3_PORT=9000 +S3_ENDPOINT=minio +S3_ACCESS_KEY=<your-access-key> +S3_SECRET_KEY=<your-secret-key> +``` + +--- + +## Production Results + +Tested on GoConnectIT WhatsApp instance (Euronet SOR documents): + +| Metric | Before | After | +|--------|--------|-------| +| Total messages | 1646 | 1870 (+224) | +| Oldest message | Dec 8, 2025 | Nov 10, 2025 | +| SOR files in MinIO | 0 | 1132/1137 (99.6%) | +| Irrecoverable | — | 5 (sender permanently offline) | + +--- + +## File Changes + +| File | Changes | +|------|---------| +| `src/api/dto/chat.dto.ts` | +3 DTOs: `RetryMediaFromMetadataDto`, `FetchGroupHistoryDto`, `BatchRecoverMediaDto` | +| `src/api/controllers/chat.controller.ts` | +3 controller methods | +| `src/api/routes/chat.router.ts` | +3 route registrations | +| `src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts` | +3 service methods, 2 bug fixes | diff --git a/src/api/controllers/chat.controller.ts b/src/api/controllers/chat.controller.ts index 22e90b9fa..28fe36972 100644 --- a/src/api/controllers/chat.controller.ts +++ b/src/api/controllers/chat.controller.ts @@ -1,7 +1,9 @@ import { ArchiveChatDto, + BatchRecoverMediaDto, BlockUserDto, DeleteMessage, + FetchGroupHistoryDto, getBase64FromMediaMessageDto, MarkChatUnreadDto, NumberDto, @@ -10,6 +12,7 @@ import { ProfilePictureDto, ProfileStatusDto, ReadMessageDto, + RetryMediaFromMetadataDto, SendPresenceDto, UpdateMessageDto, WhatsAppNumberDto, @@ -58,6 +61,18 @@ export 
/**
 * Request body for `POST /chat/retryMediaFromMetadata/{instance}`.
 * Carries everything needed to rebuild a media message without a DB lookup.
 */
export class RetryMediaFromMetadataDto {
  /** WhatsApp message key id. */
  messageId: string;
  /** Chat (user or group) JID the message belongs to. */
  remoteJid: string;
  /** Sender JID inside a group chat, when applicable. */
  participant?: string;
  /** Whether the message was sent by this account. */
  fromMe?: boolean;
  /** Media key, base64-encoded. */
  mediaKey: string;
  /** CDN direct path of the encrypted media. */
  directPath: string;
  /** CDN URL of the encrypted media (may already be expired). */
  url: string;
  /** MIME type reported for the media. */
  mimeType?: string;
  /** File name to echo back in the response. */
  filename?: string;
  /** Size of the media in bytes. */
  fileLength?: number;
  /** Ask for audio to be converted through the MP4 audio pipeline. */
  convertToMp4?: boolean;
}

/** Request body for `POST /chat/fetchGroupHistory/{instance}`. */
export class FetchGroupHistoryDto {
  /** Group JID; the service requires it to end with `@g.us`. */
  groupJid: string;
  /** Number of messages to request (default 50, max 50). */
  count?: number;
  /** Key id of the anchor message. */
  anchorMessageId?: string;
  /** Timestamp of the anchor message. */
  anchorTimestamp?: number;
  /** Whether the anchor message was sent by this account. */
  anchorFromMe?: boolean;
  /** Sender JID of the anchor message. */
  anchorParticipant?: string;
}

/** Request body for `POST /chat/batchRecoverMedia/{instance}`. */
export class BatchRecoverMediaDto {
  /** List of WhatsApp message key IDs to recover */
  messageIds: string[];
  /** Continue processing remaining messages on individual failure (default: true) */
  continueOnError?: boolean;
  /** Upload recovered buffer to MinIO and update DB mediaUrl (default: true, requires S3 enabled) */
  storeToMinIO?: boolean;
}
a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 60e857fcc..33e2c7e46 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -2,10 +2,13 @@ import { getCollectionsDto } from '@api/dto/business.dto'; import { OfferCallDto } from '@api/dto/call.dto'; import { ArchiveChatDto, + BatchRecoverMediaDto, BlockUserDto, DeleteMessage, + FetchGroupHistoryDto, getBase64FromMediaMessageDto, LastMessage, + RetryMediaFromMetadataDto, MarkChatUnreadDto, NumberBusiness, OnWhatsAppDto, @@ -3889,8 +3892,21 @@ export class BaileysStartupService extends ChannelStartupService { } } - if (typeof mediaMessage['mediaKey'] === 'object') { - msg.message[mediaType].mediaKey = Uint8Array.from(Object.values(mediaMessage['mediaKey'])); + if (typeof mediaMessage['mediaKey'] === 'string') { + // base64-encoded string (e.g. from HTTP request body) → Uint8Array + // This matches OwnPilot retryMediaFromMetadata: new Uint8Array(Buffer.from(base64, 'base64')) + msg.message[mediaType].mediaKey = new Uint8Array(Buffer.from(mediaMessage['mediaKey'], 'base64')); + } else if ( + typeof mediaMessage['mediaKey'] === 'object' && + !Buffer.isBuffer(mediaMessage['mediaKey']) && + !(mediaMessage['mediaKey'] instanceof Uint8Array) + ) { + // Plain object {0:b0, 1:b1, ...} from PostgreSQL JSONB deserialization. + // CRITICAL: JSONB stores keys lexicographically ("0","1","10","11",...,"2",...,"9") + // so Object.values() gives WRONG byte order. Must sort keys numerically first. 
+ const keyObj = mediaMessage['mediaKey'] as Record; + const sortedKeys = Object.keys(keyObj).sort((a, b) => parseInt(a, 10) - parseInt(b, 10)); + msg.message[mediaType].mediaKey = new Uint8Array(sortedKeys.map((k) => keyObj[k])); } let buffer: Buffer; @@ -3903,30 +3919,57 @@ export class BaileysStartupService extends ChannelStartupService { { logger: P({ level: 'error' }) as any, reuploadRequest: this.client.updateMediaMessage }, ); } catch { - this.logger.error('Download Media failed, trying to retry in 5 seconds...'); - await new Promise((resolve) => setTimeout(resolve, 5000)); - const mediaType = Object.keys(msg.message).find((key) => key.endsWith('Message')); - if (!mediaType) throw new Error('Could not determine mediaType for fallback'); + this.logger.error('Download Media failed, attempting explicit updateMediaMessage (Baileys RC9 reuploadRequest bug workaround)...'); + // Baileys RC9 bug: reuploadRequest callback never triggers because downloadMediaMessage + // checks error.status but Boom sets output.statusCode — so the automatic re-upload path + // is dead code. Fix: explicitly call updateMediaMessage to get a fresh CDN URL, + // then retry the download. This mirrors the technique used in OwnPilot retryMediaFromMetadata(). 
try { - const media = await downloadContentFromMessage( - { - mediaKey: msg.message?.[mediaType]?.mediaKey, - directPath: msg.message?.[mediaType]?.directPath, - url: `https://mmg.whatsapp.net${msg?.message?.[mediaType]?.directPath}`, - }, - await this.mapMediaType(mediaType), + const REUPLOAD_TIMEOUT_MS = 30_000; + this.logger.info('Requesting media re-upload from sender device via updateMediaMessage...'); + const updatedMsg = await Promise.race([ + this.client.updateMediaMessage({ key: msg.key, message: msg.message }), + new Promise((_, reject) => + setTimeout( + () => reject(new Error('updateMediaMessage timed out after 30s — sender device may be offline')), + REUPLOAD_TIMEOUT_MS, + ), + ), + ]); + buffer = await downloadMediaMessage( + updatedMsg, + 'buffer', {}, + { logger: P({ level: 'error' }) as any, reuploadRequest: this.client.updateMediaMessage }, ); - const chunks = []; - for await (const chunk of media) { - chunks.push(chunk); + this.logger.info('Download Media successful after explicit updateMediaMessage!'); + } catch (reuploadErr) { + this.logger.error(`updateMediaMessage failed: ${reuploadErr?.message} — falling back to downloadContentFromMessage...`); + await new Promise((resolve) => setTimeout(resolve, 5000)); + const mediaType = Object.keys(msg.message).find((key) => key.endsWith('Message')); + if (!mediaType) throw new Error('Could not determine mediaType for fallback'); + + try { + const media = await downloadContentFromMessage( + { + mediaKey: msg.message?.[mediaType]?.mediaKey, + directPath: msg.message?.[mediaType]?.directPath, + url: `https://mmg.whatsapp.net${msg?.message?.[mediaType]?.directPath}`, + }, + await this.mapMediaType(mediaType), + {}, + ); + const chunks = []; + for await (const chunk of media) { + chunks.push(chunk); + } + buffer = Buffer.concat(chunks); + this.logger.info('Download Media with downloadContentFromMessage was successful!'); + } catch (fallbackErr) { + this.logger.error('Download Media with 
downloadContentFromMessage also failed!'); + throw fallbackErr; } - buffer = Buffer.concat(chunks); - this.logger.info('Download Media with downloadContentFromMessage was successful!'); - } catch (fallbackErr) { - this.logger.error('Download Media with downloadContentFromMessage also failed!'); - throw fallbackErr; } } const typeMessage = getContentType(msg.message); @@ -5119,4 +5162,263 @@ export class BaileysStartupService extends ChannelStartupService { }, }; } + + /** + * Retry media download using caller-supplied metadata (mediaKey base64, directPath, url). + * Does NOT require the message to exist in EA's own DB — mirrors OwnPilot retryMediaFromMetadata(). + * + * Algorithm (same as OwnPilot): + * 1. Reconstruct minimal WAMessage from provided metadata + * 2. Try direct downloadMediaMessage (fast-path for valid CDN) + * 3. If expired → explicit updateMediaMessage [30s timeout] → retry download + */ + public async retryMediaFromMetadata(data: RetryMediaFromMetadataDto, getBuffer = false) { + if (!this.client || this.connectionStatus?.state !== 'open') { + throw new BadRequestException('WhatsApp is not connected'); + } + + const { + messageId, remoteJid, participant, fromMe = false, + mediaKey, directPath, url, mimeType, filename, fileLength, convertToMp4 = false, + } = data; + + // Reconstruct WAMessage proto from caller-supplied metadata + const mediaKeyBytes = new Uint8Array(Buffer.from(mediaKey, 'base64')); + const reconstructedMsg = { + key: { id: messageId, remoteJid, fromMe, participant }, + message: { + documentMessage: { + url, + directPath, + mediaKey: mediaKeyBytes, + mimetype: mimeType ?? 'application/octet-stream', + fileName: filename, + fileLength: fileLength != null ? 
BigInt(fileLength) as any : undefined, + }, + }, + }; + + let buffer: Buffer; + + // Step 1: direct download + try { + buffer = await downloadMediaMessage( + reconstructedMsg as any, + 'buffer', + {}, + { logger: P({ level: 'error' }) as any, reuploadRequest: this.client.updateMediaMessage }, + ); + this.logger.log(`[retryMediaFromMetadata] Direct download success msgId=${messageId} size=${buffer?.length}`); + } catch { + // Step 2: explicit updateMediaMessage (Baileys RC9 bug workaround — same as getBase64FromMediaMessage) + this.logger.error(`[retryMediaFromMetadata] Direct download failed for msgId=${messageId}, requesting re-upload...`); + try { + const updatedMsg = await Promise.race([ + this.client.updateMediaMessage(reconstructedMsg as any), + new Promise((_, reject) => + setTimeout(() => reject(new Error('updateMediaMessage timed out — sender offline')), 30_000), + ), + ]); + buffer = await downloadMediaMessage( + updatedMsg, + 'buffer', + {}, + { logger: P({ level: 'error' }) as any, reuploadRequest: this.client.updateMediaMessage }, + ); + this.logger.log(`[retryMediaFromMetadata] Re-upload success msgId=${messageId} size=${buffer?.length}`); + } catch (err: any) { + throw new BadRequestException(`Media recovery failed for msgId=${messageId}: ${err?.message}`); + } + } + + if (!buffer) throw new BadRequestException(`Empty buffer for msgId=${messageId}`); + + if (convertToMp4 && mimeType?.includes('audio')) { + try { + const mp4 = await this.processAudioMp4(buffer.toString('base64')); + const mp4Str = Buffer.isBuffer(mp4) ? (mp4 as Buffer).toString('base64') : (mp4 as string); + return getBuffer ? Buffer.from(mp4Str, 'base64') : { base64: mp4Str, mimetype: 'video/mp4', filename }; + } catch { /* fall through to raw */ } + } + + const base64 = buffer.toString('base64'); + return getBuffer ? buffer : { base64, mimetype: mimeType, filename }; + } + + /** + * Trigger on-demand WhatsApp history sync for a group. 
+ * WhatsApp responds with old message protos — including fresh mediaKey + directPath — + * which are then stored in EA's Message table via messaging-history.set event. + * This is how OwnPilot recovers mediaKeys for old messages not in EA's DB. + * + * Rate-limited: 1 call per 30 seconds (WhatsApp ban risk). + */ + private _lastHistoryFetchTime = 0; + + public async fetchGroupHistory(data: FetchGroupHistoryDto) { + if (!this.client || this.connectionStatus?.state !== 'open') { + throw new BadRequestException('WhatsApp is not connected'); + } + + const { groupJid, count = 50, anchorMessageId, anchorTimestamp, anchorFromMe = false, anchorParticipant } = data; + + if (!groupJid.endsWith('@g.us')) { + throw new BadRequestException('groupJid must end with @g.us'); + } + + const now = Date.now(); + if (now - this._lastHistoryFetchTime < 30_000) { + const waitSec = Math.ceil((30_000 - (now - this._lastHistoryFetchTime)) / 1000); + throw new BadRequestException(`Rate limited — wait ${waitSec}s before next fetchGroupHistory`); + } + this._lastHistoryFetchTime = now; + + const safeCount = Math.min(Math.max(count, 1), 50); + const anchorKey = { + remoteJid: groupJid, + fromMe: anchorFromMe, + id: anchorMessageId ?? '', + participant: anchorParticipant, + }; + const anchorTs = anchorTimestamp ?? 0; + + const sessionId = await this.client.fetchMessageHistory(safeCount, anchorKey, anchorTs); + + this.logger.log( + `[fetchGroupHistory] Requested ${safeCount} messages for ${groupJid} ` + + `anchorId=${anchorKey.id || 'none'} anchorTs=${anchorTs} sessionId=${sessionId}`, + ); + + return { + sessionId, + groupJid, + count: safeCount, + message: 'History sync requested. WhatsApp will deliver messages via messaging-history.set event (async).', + }; + } + + /** + * Batch media recovery pipeline — mirrors OwnPilot's recover-media endpoint. + * + * For each messageId: + * 1. Fetch metadata (mediaKey, directPath, url, mimeType, fileName) from EA's Message table + * 2. 
Call retryMediaFromMetadata (direct download → on fail: updateMediaMessage + retry) + * 3. If S3 enabled and storeToMinIO=true: + * a. Upload buffer to MinIO (same path structure as regular media handler) + * b. Upsert prismaRepository.media record + * c. Update message.mediaUrl in DB + * + * This makes EA self-sufficient for expired CDN recovery without external orchestrators. + */ + public async batchRecoverMedia(data: BatchRecoverMediaDto) { + const { messageIds, continueOnError = true, storeToMinIO = true } = data; + + if (!messageIds?.length) throw new BadRequestException('messageIds array is required and must not be empty'); + + const s3Enabled = this.configService.get('S3').ENABLE; + const results: Array<{ messageId: string; status: 'ok' | 'skip' | 'error'; mediaUrl?: string; error?: string }> = []; + + for (const msgKeyId of messageIds) { + try { + // Step 1: Fetch message metadata from DB + const dbMsg = await this.prismaRepository.message.findFirst({ + where: { key: { path: ['id'], equals: msgKeyId }, instanceId: this.instanceId }, + }); + + if (!dbMsg) { + results.push({ messageId: msgKeyId, status: 'skip', error: 'Message not found in DB' }); + continue; + } + + const msgContent: any = typeof dbMsg.message === 'object' && dbMsg.message !== null ? dbMsg.message : {}; + const doc = msgContent.documentMessage ?? msgContent.imageMessage ?? msgContent.videoMessage + ?? msgContent.audioMessage ?? msgContent.stickerMessage; + + if (!doc?.mediaKey || !doc?.directPath) { + results.push({ messageId: msgKeyId, status: 'skip', error: 'No mediaKey/directPath in message' }); + continue; + } + + // Skip if already has a MinIO mediaUrl (already recovered) + if (doc.mediaUrl && !doc.mediaUrl.includes('mmg.whatsapp.net')) { + results.push({ messageId: msgKeyId, status: 'skip', error: 'Already stored in MinIO' }); + continue; + } + + const key: any = typeof dbMsg.key === 'object' && dbMsg.key !== null ? dbMsg.key : {}; + const mimeType: string = doc.mimetype ?? 
'application/octet-stream'; + const filename: string = doc.fileName ?? doc.title ?? `${msgKeyId}.bin`; + + // Step 2: Download (with expired URL recovery) + const buffer = await this.retryMediaFromMetadata( + { + messageId: msgKeyId, + remoteJid: key.remoteJid ?? '', + participant: key.participant, + fromMe: key.fromMe ?? false, + mediaKey: typeof doc.mediaKey === 'string' ? doc.mediaKey + : Buffer.from(Object.keys(doc.mediaKey).sort((a, b) => parseInt(a) - parseInt(b)).map((k) => doc.mediaKey[k])).toString('base64'), + directPath: doc.directPath, + url: doc.url ?? '', + mimeType, + filename, + }, + true, // getBuffer=true + ) as Buffer; + + if (!Buffer.isBuffer(buffer) || buffer.length === 0) { + results.push({ messageId: msgKeyId, status: 'error', error: 'Empty buffer returned' }); + if (!continueOnError) break; + continue; + } + + // Step 3: Upload to MinIO + update DB + if (s3Enabled && storeToMinIO) { + const mediaType = 'document'; + const fullName = join( + `${this.instance.id}`, + key.remoteJid ?? 
'unknown', + mediaType, + `${Date.now()}_${filename}`, + ); + + await s3Service.uploadFile(fullName, buffer, buffer.length, { 'Content-Type': mimeType }); + + // Upsert media record + const existingMedia = await this.prismaRepository.media.findFirst({ + where: { messageId: dbMsg.id, instanceId: this.instanceId }, + }); + if (!existingMedia) { + await this.prismaRepository.media.create({ + data: { messageId: dbMsg.id, instanceId: this.instanceId, type: mediaType, fileName: fullName, mimetype: mimeType }, + }); + } + + // Update message.mediaUrl so future reads skip re-download + const mediaUrl = await s3Service.getObjectUrl(fullName); + const updatedContent = { ...msgContent }; + const docKey = Object.keys(updatedContent).find((k) => k.endsWith('Message')); + if (docKey) updatedContent[docKey] = { ...updatedContent[docKey], mediaUrl }; + await this.prismaRepository.message.update({ where: { id: dbMsg.id }, data: { message: updatedContent } }); + + this.logger.log(`[batchRecoverMedia] ✓ ${filename} → MinIO: ${fullName} (${buffer.length} bytes)`); + results.push({ messageId: msgKeyId, status: 'ok', mediaUrl }); + } else { + // S3 disabled — just report size + this.logger.log(`[batchRecoverMedia] ✓ ${filename} downloaded (${buffer.length} bytes, S3 disabled)`); + results.push({ messageId: msgKeyId, status: 'ok' }); + } + } catch (err: any) { + this.logger.error(`[batchRecoverMedia] ✗ ${msgKeyId}: ${err?.message}`); + results.push({ messageId: msgKeyId, status: 'error', error: err?.message }); + if (!continueOnError) break; + } + } + + const ok = results.filter((r) => r.status === 'ok').length; + const skip = results.filter((r) => r.status === 'skip').length; + const error = results.filter((r) => r.status === 'error').length; + + return { total: messageIds.length, ok, skip, error, results }; + } } diff --git a/src/api/routes/chat.router.ts b/src/api/routes/chat.router.ts index 158947ed2..bdee347ad 100644 --- a/src/api/routes/chat.router.ts +++ 
b/src/api/routes/chat.router.ts @@ -1,8 +1,10 @@ import { RouterBroker } from '@api/abstract/abstract.router'; import { ArchiveChatDto, + BatchRecoverMediaDto, BlockUserDto, DeleteMessage, + FetchGroupHistoryDto, getBase64FromMediaMessageDto, MarkChatUnreadDto, NumberDto, @@ -11,6 +13,7 @@ import { ProfilePictureDto, ProfileStatusDto, ReadMessageDto, + RetryMediaFromMetadataDto, SendPresenceDto, UpdateMessageDto, WhatsAppNumberDto, @@ -120,6 +123,36 @@ export class ChatRouter extends RouterBroker { return res.status(HttpStatus.CREATED).json(response); }) + .post(this.routerPath('retryMediaFromMetadata'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: RetryMediaFromMetadataDto, + execute: (instance, data) => chatController.retryMediaFromMetadata(instance, data), + }); + + return res.status(HttpStatus.CREATED).json(response); + }) + .post(this.routerPath('fetchGroupHistory'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: FetchGroupHistoryDto, + execute: (instance, data) => chatController.fetchGroupHistory(instance, data), + }); + + return res.status(HttpStatus.OK).json(response); + }) + .post(this.routerPath('batchRecoverMedia'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: BatchRecoverMediaDto, + execute: (instance, data) => chatController.batchRecoverMedia(instance, data), + }); + + return res.status(HttpStatus.OK).json(response); + }) // TODO: corrigir updateMessage para medias tambem .post(this.routerPath('updateMessage'), ...guards, async (req, res) => { const response = await this.dataValidate({