Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(Inge

int stringBuilderTokenCount = 0;
StringBuilder stringBuilder = new();
Dictionary<string, object>? accumulatedMetadata = null;
foreach (IngestionDocumentElement element in document.EnumerateContent())
{
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -57,6 +58,7 @@ public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(Inge

int contentToProcessTokenCount = _tokenizer.CountTokens(elementContent!, considerNormalization: false);
ReadOnlyMemory<char> contentToProcess = elementContent.AsMemory();
bool elementMetadataAccumulated = false;
while (stringBuilderTokenCount + contentToProcessTokenCount >= _maxTokensPerChunk)
{
int index = _tokenizer.GetIndexByTokenCount(
Expand All @@ -66,35 +68,59 @@ public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(Inge
out int _,
considerNormalization: false);

// Accumulate metadata the first time this element contributes content.
if (!elementMetadataAccumulated && index > 0)
{
AccumulateMetadata(element, ref accumulatedMetadata);
elementMetadataAccumulated = true;
}

unsafe
{
fixed (char* ptr = &MemoryMarshal.GetReference(contentToProcess.Span))
{
_ = stringBuilder.Append(ptr, index);
}
}
yield return FinalizeChunk();
yield return FinalizeChunk(ref accumulatedMetadata);

contentToProcess = contentToProcess.Slice(index);
contentToProcessTokenCount = _tokenizer.CountTokens(contentToProcess.Span, considerNormalization: false);
}

// Accumulate metadata if the element only contributed content after the loop.
if (!elementMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
}

_ = stringBuilder.Append(contentToProcess);
stringBuilderTokenCount += contentToProcessTokenCount;
}

if (stringBuilder.Length > 0)
{
yield return FinalizeChunk();
yield return FinalizeChunk(ref accumulatedMetadata);
}
yield break;

IngestionChunk<string> FinalizeChunk()
IngestionChunk<string> FinalizeChunk(ref Dictionary<string, object>? metadata)
{
IngestionChunk<string> chunk = new IngestionChunk<string>(
content: stringBuilder.ToString(),
document: document,
context: string.Empty);

if (metadata is { Count: > 0 })
{
foreach (var kvp in metadata)
{
chunk.Metadata[kvp.Key] = kvp.Value;
}

metadata = null;
}

_ = stringBuilder.Clear();
stringBuilderTokenCount = 0;

Expand All @@ -121,5 +147,29 @@ IngestionChunk<string> FinalizeChunk()
}
}

/// <summary>
/// Merges the non-null metadata entries of <paramref name="element"/> into
/// <paramref name="accumulated"/>, keeping the value already stored for any key
/// that is present (the first value seen for a key wins).
/// </summary>
/// <param name="element">The element whose metadata is read. Nothing happens when it reports no metadata.</param>
/// <param name="accumulated">
/// The dictionary being built up for the current chunk. It is created on demand
/// as soon as an element with metadata is seen.
/// </param>
private static void AccumulateMetadata(IngestionDocumentElement element, ref Dictionary<string, object>? accumulated)
{
    if (element.HasMetadata)
    {
        // Lazily create the target so callers can keep passing null until metadata shows up.
        accumulated ??= [];

        foreach (var entry in element.Metadata)
        {
            // Entries without a value carry no information; leave them out.
            if (entry.Value is null)
            {
                continue;
            }

#if NET
            _ = accumulated.TryAdd(entry.Key, entry.Value);
#else
            // TryAdd is not used on this target; emulate its "add only if absent" behavior.
            if (!accumulated.ContainsKey(entry.Key))
            {
                accumulated[entry.Key] = entry.Value;
            }
#endif
        }
    }
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
{
// Not using yield return here as we use ref structs.
List<IngestionChunk<string>> chunks = [];
Dictionary<string, object>? accumulatedMetadata = null;

int contextTokenCount = CountTokens(context.AsSpan());
int totalTokenCount = contextTokenCount;
Expand Down Expand Up @@ -70,12 +71,15 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
int elementTokenCount = CountTokens(semanticContent.AsSpan());
if (elementTokenCount + totalTokenCount <= _maxTokensPerChunk)
{
// Element fits in the current chunk — accumulate its metadata here.
AccumulateMetadata(element, ref accumulatedMetadata);
totalTokenCount += elementTokenCount;
AppendNewLineAndSpan(_currentChunk, semanticContent.AsSpan());
}
else if (element is IngestionDocumentTable table)
{
ValueStringBuilder tableBuilder = new(initialCapacity: 8000);
bool tableMetadataAccumulated = false;

try
{
Expand Down Expand Up @@ -113,6 +117,13 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
// We append the table as long as it's not just the header.
if (rowIndex != 1)
{
// Accumulate metadata before first table content append.
if (!tableMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
tableMetadataAccumulated = true;
}

AppendNewLineAndSpan(_currentChunk, tableBuilder.AsSpan(0, tableLength - Environment.NewLine.Length));
}

Expand All @@ -137,6 +148,12 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
totalTokenCount += lastRowTokens;
}

// Accumulate metadata before appending remaining table content.
if (!tableMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
}

AppendNewLineAndSpan(_currentChunk, tableBuilder.AsSpan(0, tableLength - Environment.NewLine.Length));
}
finally
Expand All @@ -147,6 +164,7 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
else
{
ReadOnlySpan<char> remainingContent = semanticContent.AsSpan();
bool elementMetadataAccumulated = false;

while (!remainingContent.IsEmpty)
{
Expand All @@ -170,6 +188,13 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,
tokenCount = CountTokens(remainingContent.Slice(0, index));
}

// Accumulate metadata the first time this element contributes content.
if (!elementMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
elementMetadataAccumulated = true;
}

totalTokenCount += tokenCount;
ReadOnlySpan<char> spanToAppend = remainingContent.Slice(0, index);
AppendNewLineAndSpan(_currentChunk, spanToAppend);
Expand All @@ -196,7 +221,9 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,

if (totalTokenCount > contextTokenCount)
{
chunks.Add(new(_currentChunk.ToString(), document, context));
var chunk = new IngestionChunk<string>(_currentChunk.ToString(), document, context);
ApplyMetadata(chunk, accumulatedMetadata);
chunks.Add(chunk);
}

_currentChunk.Clear();
Expand All @@ -205,7 +232,10 @@ internal IEnumerable<IngestionChunk<string>> Process(IngestionDocument document,

void Commit()
{
chunks.Add(new(_currentChunk.ToString(), document, context));
var chunk = new IngestionChunk<string>(_currentChunk.ToString(), document, context);
ApplyMetadata(chunk, accumulatedMetadata);
chunks.Add(chunk);
accumulatedMetadata = null;

// We keep the context in the current chunk as it's the same for all elements.
_currentChunk.Remove(
Expand Down Expand Up @@ -268,6 +298,43 @@ private static void AddMarkdownTableSeparatorRow(int columnCount, ref ValueStrin
vsb.Append(Environment.NewLine);
}

/// <summary>
/// Adds every non-null metadata entry of <paramref name="element"/> to
/// <paramref name="accumulated"/> unless the key is already present, in which
/// case the existing value is left untouched.
/// </summary>
/// <param name="element">The source element; ignored when <c>HasMetadata</c> is <see langword="false"/>.</param>
/// <param name="accumulated">
/// The metadata collected so far for the chunk being built. Allocated here when
/// it is still <see langword="null"/> and the element has metadata.
/// </param>
private static void AccumulateMetadata(IngestionDocumentElement element, ref Dictionary<string, object>? accumulated)
{
    if (element.HasMetadata)
    {
        // Allocate on first use; elements without metadata never reach this point.
        accumulated ??= [];

        foreach (var pair in element.Metadata)
        {
            // Null values are skipped rather than stored.
            if (pair.Value is null)
            {
                continue;
            }

#if NET
            _ = accumulated.TryAdd(pair.Key, pair.Value);
#else
            // Same "first writer wins" semantics as TryAdd on the NET target.
            if (!accumulated.ContainsKey(pair.Key))
            {
                accumulated[pair.Key] = pair.Value;
            }
#endif
        }
    }
}

/// <summary>
/// Copies every entry of <paramref name="accumulated"/> into the metadata of
/// <paramref name="chunk"/>, overwriting any value the chunk already holds for
/// the same key.
/// </summary>
/// <param name="chunk">The chunk whose <c>Metadata</c> receives the entries.</param>
/// <param name="accumulated">The collected metadata; <see langword="null"/> or empty means there is nothing to copy.</param>
private static void ApplyMetadata(IngestionChunk<string> chunk, Dictionary<string, object>? accumulated)
{
    // Only touch the chunk's metadata when there is at least one entry to copy.
    if (accumulated is { Count: > 0 })
    {
        foreach (KeyValuePair<string, object> entry in accumulated)
        {
            chunk.Metadata[entry.Key] = entry.Value;
        }
    }
}

/// <summary>
/// Returns the token count the tokenizer reports for <paramref name="input"/>,
/// with normalization consideration turned off.
/// </summary>
/// <param name="input">The characters to count tokens for.</param>
/// <returns>The token count returned by <c>_tokenizer.CountTokens</c>.</returns>
private int CountTokens(ReadOnlySpan<char> input)
{
    return _tokenizer.CountTokens(input, considerNormalization: false);
}
}
Loading
Loading