Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 94 additions & 26 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ jobs:
repository: nextcloud/context_chat
path: apps/context_chat
persist-credentials: false
# todo: remove later
ref: feat/reverse-content-flow

- name: Checkout backend
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
Expand Down Expand Up @@ -167,6 +169,10 @@ jobs:
cd ..
rm -rf documentation

- name: Run files scan
run: |
./occ files:scan --all

- name: Setup python 3.11
uses: actions/setup-python@42375524e23c412d93fb67b49958b491fce71c38 # v5
with:
Expand Down Expand Up @@ -197,26 +203,101 @@ jobs:
ls -la context_chat_backend/persistent_storage/*
sleep 30 # Wait for the em server to get ready

- name: Scan files, baseline
run: |
./occ files:scan admin
./occ context_chat:scan admin -m text/plain

- name: Check python memory usage
- name: Initial memory usage check
run: |
ps -p $(cat pid.txt) -o pid,cmd,%mem,rss --sort=-%mem
ps -p $(cat pid.txt) -o %mem --no-headers > initial_mem.txt

- name: Scan files
- name: Run cron jobs
run: |
./occ files:scan admin
./occ context_chat:scan admin -m text/markdown &
./occ context_chat:scan admin -m text/x-rst
# every 10 seconds indefinitely
while true; do
php cron.php
sleep 10
done &

- name: Check python memory usage
- name: Periodically check context_chat stats for 15 minutes to allow the backend to index the files
run: |
ps -p $(cat pid.txt) -o pid,cmd,%mem,rss --sort=-%mem
ps -p $(cat pid.txt) -o %mem --no-headers > after_scan_mem.txt
success=0
echo "::group::Checking stats periodically for 15 minutes to allow the backend to index the files"
# 90 attempts x ~10s sleep ~= 15 minutes
for i in {1..90}; do
  echo "Checking stats, attempt $i..."

  stats_err=$(mktemp)
  # Capture the exit status explicitly in the same statement so a failing
  # `occ` call does not abort the step when the shell runs with `set -e`
  # (the GitHub Actions default for `run:` blocks).
  stats=$(timeout 5 ./occ context_chat:stats 2>"$stats_err") && stats_exit=0 || stats_exit=$?
  echo "Stats output:"
  echo "$stats"
  if [ -s "$stats_err" ]; then
    echo "Stderr:"
    cat "$stats_err"
  fi
  echo "---"
  rm -f -- "$stats_err"

  # Retry on transport-level failures (timeout, backend still starting up, ...)
  if [ "$stats_exit" -ne 0 ] || echo "$stats" | grep -q "Error during request"; then
    echo "Backend connection error detected (exit=$stats_exit), retrying..."
    sleep 10
    continue
  fi

  # Extract "Total eligible files: N"
  total_files=$(echo "$stats" | grep -oP 'Total eligible files:\s*\K\d+' || echo "")

  # Extract the indexed documents count for the default files collection
  indexed_count=$(echo "$stats" | grep -oP "'files__default'\s*=>\s*\K\d+" || echo "")

  # Both values must parse before we can compare them
  if [ -z "$total_files" ] || [ -z "$indexed_count" ]; then
    echo "Error: Could not parse stats output properly"
    if echo "$stats" | grep -q "Indexed documents:"; then
      echo "  Indexed documents section found but could not extract count"
    fi
    sleep 10
    continue
  fi

  echo "Total eligible files: $total_files"
  echo "Indexed documents (files__default): $indexed_count"

  # Absolute difference between eligible and indexed files
  diff=$((total_files - indexed_count))
  if [ "$diff" -lt 0 ]; then
    diff=$((-diff))
  fi

  # 2% of the eligible files; bc handles the fractional threshold
  threshold=$(echo "scale=4; $total_files * 0.02" | bc)

  if (( $(echo "$diff <= $threshold" | bc -l) )); then
    echo "Indexing within 2% tolerance (diff=$diff, threshold=$threshold)"
    success=1
    break
  else
    pct=$(echo "scale=2; ($diff / $total_files) * 100" | bc)
    echo "Outside 2% tolerance: diff=$diff (${pct}%), threshold=$threshold"
  fi

  # Bail out early if the backend process died. Use `grep -q` in a plain
  # `if`: the previous `grep -c ... || echo "0"` printed BOTH grep's "0"
  # and the echoed "0" on no match (grep -c exits 1 when the count is 0),
  # producing a two-line value that broke the numeric comparison.
  if ! ps -p "$(cat pid.txt)" -o cmd= | grep -q "main.py"; then
    echo "Error: Context Chat Backend process is not running. Exiting."
    exit 1
  fi

  sleep 10
done

echo "::endgroup::"

./occ context_chat:stats

if [ "$success" -ne 1 ]; then
  echo "Max attempts reached"
  exit 1
fi

- name: Run the prompts
run: |
Expand Down Expand Up @@ -250,19 +331,6 @@ jobs:
echo "Memory usage during scan is stable. No memory leak detected."
fi

- name: Compare memory usage and detect leak
run: |
initial_mem=$(cat after_scan_mem.txt | tr -d ' ')
final_mem=$(cat after_prompt_mem.txt | tr -d ' ')
echo "Initial Memory Usage: $initial_mem%"
echo "Memory Usage after prompt: $final_mem%"

if (( $(echo "$final_mem > $initial_mem" | bc -l) )); then
echo "Memory usage has increased during prompt. Possible memory leak detected!"
else
echo "Memory usage during prompt is stable. No memory leak detected."
fi

- name: Show server logs
if: always()
run: |
Expand Down
14 changes: 14 additions & 0 deletions appinfo/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,19 @@ Setup background job workers as described here: https://docs.nextcloud.com/serve
<description>Password to be used for authenticating requests to the OpenAI-compatible endpoint set in CC_EM_BASE_URL.</description>
</variable>
</environment-variables>
<k8s-service-roles>
<role>
<name>rp</name>
<display-name>Request Processing Mode</display-name>
<env>APP_ROLE=rp</env>
<expose>true</expose>
</role>
<role>
<name>indexing</name>
<display-name>Indexing Mode</display-name>
<env>APP_ROLE=indexing</env>
<expose>false</expose>
</role>
</k8s-service-roles>
</external-app>
</info>
71 changes: 47 additions & 24 deletions context_chat_backend/chain/ingest/doc_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,28 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
#

import asyncio
import logging
import re
import tempfile
from collections.abc import Callable
from typing import BinaryIO
from io import BytesIO

import docx2txt
from epub2txt import epub2txt
from fastapi import UploadFile
from langchain_unstructured import UnstructuredLoader
from odfdo import Document
from pandas import read_csv, read_excel
from pypdf import PdfReader
from pypdf.errors import FileNotDecryptedError as PdfFileNotDecryptedError
from striprtf import striprtf

from ...types import SourceItem, TaskProcException
from .task_proc import do_ocr, do_transcription

logger = logging.getLogger('ccb.doc_loader')

def _temp_file_wrapper(file: BinaryIO, loader: Callable, sep: str = '\n') -> str:
def _temp_file_wrapper(file: BytesIO, loader: Callable, sep: str = '\n') -> str:
raw_bytes = file.read()
with tempfile.NamedTemporaryFile(mode='wb') as tmp:
tmp.write(raw_bytes)
Expand All @@ -35,46 +38,46 @@ def _temp_file_wrapper(file: BinaryIO, loader: Callable, sep: str = '\n') -> str

# -- LOADERS -- #

def _load_pdf(file: BytesIO) -> str:
	# Collect the stripped text of every page and separate pages with a
	# blank line.
	reader = PdfReader(file)
	page_texts = (page.extract_text().strip() for page in reader.pages)
	return '\n\n'.join(page_texts)


def _load_csv(file: BinaryIO) -> str:
def _load_csv(file: BytesIO) -> str:
return read_csv(file).to_string(header=False, na_rep='')


def _load_epub(file: BytesIO) -> str:
	# epub2txt wants a real file path, so go through the temp-file wrapper.
	text = _temp_file_wrapper(file, epub2txt)
	return text.strip()


def _load_docx(file: BytesIO) -> str:
	# docx2txt accepts a file-like object directly; no temp file needed.
	extracted = docx2txt.process(file)
	return extracted.strip()


def _load_odt(file: BytesIO) -> str:
	# odfdo needs a file path, so materialise the bytes via the
	# temp-file wrapper and extract the formatted document text.
	def _extract(path):
		return Document(path).get_formatted_text()

	return _temp_file_wrapper(file, _extract).strip()


def _load_ppt_x(file: BytesIO) -> str:
	# UnstructuredLoader only takes a path, so run it through the
	# temp-file wrapper.
	# NOTE(review): .load() returns a list of documents — presumably the
	# wrapper joins them with its separator; verify against
	# _temp_file_wrapper before relying on the joined output.
	def _extract(path):
		return UnstructuredLoader(path).load()

	return _temp_file_wrapper(file, _extract).strip()


def _load_rtf(file: BytesIO) -> str:
	# Decode leniently before stripping RTF control words; invalid bytes
	# are dropped rather than raising.
	raw_text = file.read().decode('utf-8', 'ignore')
	return striprtf.rtf_to_text(raw_text).strip()


def _load_xml(file: BinaryIO) -> str:
def _load_xml(file: BytesIO) -> str:
data = file.read().decode('utf-8', 'ignore')
data = re.sub(r'</.+>', '', data)
return data.strip()


def _load_xlsx(file: BytesIO) -> str:
	# na_filter=False keeps empty cells as empty strings instead of NaN;
	# the header row is omitted from the rendered text.
	sheet = read_excel(file, na_filter=False)
	return sheet.to_string(header=False, na_rep='')


def _load_email(file: BinaryIO, ext: str = 'eml') -> str | None:
def _load_email(file: BytesIO, ext: str = 'eml') -> str | None:
# NOTE: msg format is not tested
if ext not in ['eml', 'msg']:
return None
Expand Down Expand Up @@ -115,30 +118,50 @@ def attachment_partitioner(
}


def decode_source(source: SourceItem) -> str | None:
	"""Extract plain text from a single source item.

	Images and audio are delegated to the task-processing helpers
	(do_ocr / do_transcription); every other mimetype is dispatched
	through ``_loader_map``, falling back to a lenient UTF-8 decode of
	the raw bytes.

	Returns the decoded text, or None when the source is skipped
	(powerpoint templates, missing mimetype) or when decoding fails.
	"""
	io_obj: BytesIO | None = None
	try:
		# .pot files are powerpoint templates but also plain text files,
		# so we skip them to prevent decoding errors
		if source.title.endswith('.pot'):
			return None

		mimetype = source.type
		if mimetype is None:
			return None

		try:
			# OCR / transcription are async task-processing calls; run them
			# to completion here since this function is synchronous.
			# NOTE(review): assumes source.userIds is non-empty — an empty
			# list raises IndexError, caught by the broad handler below.
			if mimetype.startswith('image/'):
				return asyncio.run(do_ocr(source.userIds[0], source.file_id))
			if mimetype.startswith('audio/'):
				return asyncio.run(do_transcription(source.userIds[0], source.file_id))
		except TaskProcException as e:
			# todo: convert this to error obj return
			# todo: short circuit all other ocr/transcription files when a fatal error arrives
			# todo: maybe with a global ttl, with a retryable tag
			logger.warning(f'OCR task failed for source file ({source.reference}): {e}')
			return None
		except ValueError:
			# should not happen
			logger.warning(f'Unexpected ValueError for source file ({source.reference})')
			return None

		# Normalise the content to a byte stream for the loaders.
		if isinstance(source.content, str):
			io_obj = BytesIO(source.content.encode('utf-8', 'ignore'))
		else:
			io_obj = source.content

		if _loader_map.get(mimetype):
			result = _loader_map[mimetype](io_obj)
			# Round-trip through UTF-8 to drop any undecodable characters
			# a loader may have produced.
			return result.encode('utf-8', 'ignore').decode('utf-8', 'ignore')

		# No registered loader: treat the payload as plain text.
		return io_obj.read().decode('utf-8', 'ignore')
	except PdfFileNotDecryptedError:
		logger.warning(f'PDF file ({source.reference}) is encrypted and cannot be read')
		return None
	except Exception:
		logger.exception(f'Error decoding source file ({source.reference})', stack_info=True)
		return None
	finally:
		# Ensure the byte stream is closed after processing
		if io_obj is not None:
			io_obj.close()
Loading
Loading