diff --git a/PCAxis.Serializers/PCAxis.Serializers.csproj b/PCAxis.Serializers/PCAxis.Serializers.csproj
index 9a20396..7f5eff4 100644
--- a/PCAxis.Serializers/PCAxis.Serializers.csproj
+++ b/PCAxis.Serializers/PCAxis.Serializers.csproj
@@ -27,7 +27,7 @@
-
+
diff --git a/PCAxis.Serializers/Parquet/ParquetBuilder.cs b/PCAxis.Serializers/Parquet/ParquetBuilder.cs
index 605251e..4d1a97d 100644
--- a/PCAxis.Serializers/Parquet/ParquetBuilder.cs
+++ b/PCAxis.Serializers/Parquet/ParquetBuilder.cs
@@ -2,13 +2,13 @@
using System.Collections.Generic;
using System.Linq;
-using Parquet.Rows;
+using Parquet.Data;
using Parquet.Schema;
using PCAxis.Paxiom;
using PCAxis.Paxiom.Operations;
-namespace PCAxis.Serializers
+namespace PCAxis.Serializers.Parquet
{
public class ParquetBuilder
{
@@ -50,10 +50,10 @@ private static Dictionary BuildDataSymbolMap(PXMeta meta)
///
- /// Populates the Parquet table based on the PXModel data and metadata.
+ /// Populates Parquet columns based on the PXModel data and metadata.
///
- /// The populated Parquet table.
- public Table PopulateTable()
+ /// The populated Parquet data columns.
+ public DataColumn[] PopulateColumns()
{
int matrixSize = model.Data.MatrixColumnCount * model.Data.MatrixRowCount;
double[] data = new double[matrixSize];
@@ -63,6 +63,7 @@ public Table PopulateTable()
.Select(v => v.IsContentVariable && v.Values.Count > 1)
.ToArray();
+ // Generate one logical row index per output row.
var indices = GenerateDataPointIndices(variableValueCounts, isContentMulti);
for (int m = 0; m < matrixSize; m++)
@@ -70,27 +71,90 @@ public Table PopulateTable()
data[m] = model.Data.ReadElement(m);
}
- List dataFields = CreateDataFields();
+ ParquetSchema schema = CreateSchema();
+ DataField[] dataFields = schema.GetDataFields();
+ Dictionary fieldNameToColumnIndex = dataFields.Select((field, idx) => new { field.Name, idx })
+ .ToDictionary(x => x.Name, x => x.idx);
+
+ int rowCount = indices.Count;
+ // Keep an object buffer per column, then cast to typed arrays in one pass.
+ var columnBuffers = new object[dataFields.Length][];
+ for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
+ {
+ columnBuffers[columnIndex] = new object[rowCount];
+ }
+
+ // Build each row once and write values directly into their column buffers.
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
+ {
+ var row = PopulateRow(indices[rowIndex], dataFields.Length, variableValueCounts, data, fieldNameToColumnIndex);
+ for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
+ {
+ columnBuffers[columnIndex][rowIndex] = row[columnIndex];
+ }
+ }
+
+ var columns = new DataColumn[dataFields.Length];
+
+ // Convert object buffers to field-typed arrays required by DataColumn.
+ for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
+ {
+ Array typedValues = ConvertToTypedArray(dataFields[columnIndex], columnBuffers[columnIndex]);
+ columns[columnIndex] = new DataColumn(dataFields[columnIndex], typedValues);
+ }
+
+ return columns;
+ }
+
+ private static Array ConvertToTypedArray(DataField field, object[] values)
+ {
+ // DataColumn expects an Array with the exact CLR element type of the field.
+ Type clrType = field.ClrType;
+
+ if (clrType == typeof(double))
+ {
+ var result = new double[values.Length];
+ for (int i = 0; i < values.Length; i++)
+ {
+ result[i] = values[i] == null ? default : Convert.ToDouble(values[i]);
+ }
+
+ return result;
+ }
+
+ if (clrType == typeof(string))
+ {
+ var result = new string[values.Length];
+ for (int i = 0; i < values.Length; i++)
+ {
+ result[i] = values[i] as string;
+ }
- var table = new Table(dataFields.ToArray());
+ return result;
+ }
- foreach (var index in indices)
+ if (clrType == typeof(DateTime))
{
- var row = PopulateRow(index, dataFields, variableValueCounts, data);
- table.Add(row);
+ var result = new DateTime[values.Length];
+ for (int i = 0; i < values.Length; i++)
+ {
+ result[i] = values[i] == null ? default : (DateTime)values[i];
+ }
+
+ return result;
}
- return table;
+ throw new NotSupportedException($"Unsupported Parquet field type '{clrType}'.");
}
///
- /// Creates the Parquet schema fields based on the PXModel metadata.
+ /// Creates the Parquet schema based on the PXModel metadata.
///
- /// The list of Parquet data fields.
- private List CreateDataFields()
+ /// The Parquet schema.
+ private ParquetSchema CreateSchema()
{
- List dataFields = new List();
+ var dataFields = new List();
int variableCount = model.Meta.Variables.Count;
for (int i = 0; i < variableCount; i++)
@@ -98,15 +162,15 @@ private List CreateDataFields()
var variable = model.Meta.Variables[i];
if (variable.IsContentVariable && variable.Values.Count > 1)
{
- ParquetBuilder.AddContentVariableFields(dataFields, variable);
+ AddContentVariableFields(dataFields, variable);
}
else
{
- ParquetBuilder.AddNonContentVariableFields(dataFields, variable);
+ AddNonContentVariableFields(dataFields, variable);
}
}
- return dataFields;
+ return new ParquetSchema(dataFields.Cast());
}
@@ -161,16 +225,14 @@ private static void AddNonContentVariableFields(List dataFields, Vari
/// Populates a single row in the Parquet table based on the specified index and data.
///
/// The index representing the position of the row in the PXModel data.
- /// The list of Parquet data fields representing the schema.
+ /// The number of fields in the row schema.
/// The counts of values for each variable in the model.
/// The array containing the PXModel data.
/// The populated Parquet row.
- private object[] PopulateRow(int[] index, List dataFields, int[] variableValueCounts, double[] data)
+ private object[] PopulateRow(int[] index, int fieldCount, int[] variableValueCounts, double[] data, Dictionary dataFieldIndices)
{
int variableCount = model.Meta.Variables.Count;
- var row = new object[dataFields.Count];
- Dictionary dataFieldIndices = dataFields.Select((field, idx) => new { field.Name, idx })
- .ToDictionary(x => x.Name, x => x.idx);
+ var row = new object[fieldCount];
for (int i = 0; i < variableCount; i++)
{
diff --git a/PCAxis.Serializers/ParquetSerializer.cs b/PCAxis.Serializers/ParquetSerializer.cs
index c58d346..e81f790 100644
--- a/PCAxis.Serializers/ParquetSerializer.cs
+++ b/PCAxis.Serializers/ParquetSerializer.cs
@@ -1,10 +1,13 @@
using System.IO;
+using System.Linq;
using System.Threading.Tasks;
using Parquet;
-using Parquet.Rows;
+using Parquet.Data;
+using Parquet.Schema;
using PCAxis.Paxiom;
+using PCAxis.Serializers.Parquet;
namespace PCAxis.Serializers
{
@@ -34,19 +37,29 @@ public void Serialize(PXModel model, string path)
public void Serialize(PXModel model, Stream stream)
{
var pb = new ParquetBuilder(model);
- var table = pb.PopulateTable();
- WriteTableAsync(table, stream).GetAwaiter().GetResult();
+ var columns = pb.PopulateColumns();
+ WriteColumnsAsync(columns, stream).GetAwaiter().GetResult();
}
///
- /// Asynchronously writes the Parquet table to the specified stream.
+ /// Asynchronously writes Parquet data columns to the specified stream.
///
- /// The Parquet table to be written.
- /// The stream to write the Parquet table.
+ /// The Parquet data columns to be written.
+ /// The stream to write the Parquet data.
/// A Task representing the asynchronous operation.
- private static async Task WriteTableAsync(Table table, Stream stream)
+ private static async Task WriteColumnsAsync(DataColumn[] columns, Stream stream)
{
- await table.WriteAsync(stream);
+ var schema = new ParquetSchema(columns.Select(column => column.Field).ToList());
+ using (var writer = await ParquetWriter.CreateAsync(schema, stream))
+ {
+ using (var rowGroupWriter = writer.CreateRowGroup())
+ {
+ foreach (var column in columns)
+ {
+ await rowGroupWriter.WriteColumnAsync(column);
+ }
+ }
+ }
}
}
}
diff --git a/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs b/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs
index 442f3e7..6a0df27 100644
--- a/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs
+++ b/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs
@@ -5,7 +5,9 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Parquet.Rows;
+using Parquet;
+using Parquet.Data;
+using Parquet.Schema;
using PCAxis.Paxiom;
using PCAxis.Serializers;
@@ -32,7 +34,7 @@ public void TestInitialize()
[TestMethod, Description("Tests the serialization of PXModel to Parquet format and its correctness.")]
[DynamicData(nameof(GetPxFilePaths))]
- public void ShouldSerializePxModel(string pxFile)
+ public async Task ShouldSerializePxModel(string pxFile)
{
var model = GetPxModelFromFile(pxFile);
@@ -41,47 +43,46 @@ public void ShouldSerializePxModel(string pxFile)
SerializePxModelToParquet(model, outputFile);
- // Sync wrapper around async call
- Table table = ReadBackParquetFileSync(outputFile);
+ var columns = await ReadBackParquetFile(outputFile);
// Assertion: Ensure that the table's row count equals the number of observations
// for a single ContentsCode. If the model has multiple contents, the serializer
// emits additional content columns rather than duplicating rows.
int contentCount = model.Meta.ContentVariable != null ? model.Meta.ContentVariable.Values.Count : 1;
int expectedRows = model.Data.MatrixSize / contentCount;
- Assert.AreEqual(expectedRows, table.Count, $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");
+ Assert.HasCount(expectedRows, columns[0], $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");
// Assertion: Calculate the amount of columns we should have, based on the metadata
// Number of columns in meta, number of columns in table.
var numberOfColsInPx = CalculateNumberOfColumnsFromPxFile(model);
- var numberOfColsInParq = table.Schema.DataFields.Length;
-
- Assert.AreEqual(numberOfColsInParq, numberOfColsInPx, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
+ int numberOfColsInParq = columns.Count;
+ Assert.AreEqual(numberOfColsInPx, numberOfColsInParq, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
}
[TestMethod, Description("Tests correct ordering of time variable (pxfile: 16216.px)")]
[DeploymentItem("TestFiles/14216.px")]
- public void TestTimeVariableOrdering()
+ public async Task TestTimeVariableOrdering()
{
var pxFile = "14216.px";
var model = GetPxModelFromFile(pxFile);
string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(pxFile);
string outputFile = Path.Combine(OutputDirectoryPath, $"{fileNameWithoutExtension}.parquet");
SerializePxModelToParquet(model, outputFile);
- Table table = ReadBackParquetFileSync(outputFile);
+ var columns = await ReadBackParquetFile(outputFile);
- Assert.AreEqual(2, table.Count, "Test number of rows");
+ int rowCount = columns.Count == 0 ? 0 : columns[0].Length;
+ Assert.AreEqual(2, rowCount, "Test number of rows");
- Assert.AreEqual("0801", table[0].Values[0], "Test tettsted");
- Assert.AreEqual("2025", table[0].Values[1], "Test year");
- Assert.AreEqual(275.87, table[0].Values[3], "Test ContentsCode_Areal");
- Assert.AreEqual(double.Parse("1110887"), table[0].Values[5], "Test ContentsCode_Bosatte");
+ Assert.AreEqual("0801", columns[0][0], "Test tettsted");
+ Assert.AreEqual("2025", columns[1][0], "Test year");
+ Assert.AreEqual(275.87, columns[3][0], "Test ContentsCode_Areal");
+ Assert.AreEqual(double.Parse("1110887"), columns[5][0], "Test ContentsCode_Bosatte");
- Assert.AreEqual("0801", table[1].Values[0], "Test tettsted");
- Assert.AreEqual("2024", table[1].Values[1], "Test year");
- Assert.AreEqual(276.30, table[1].Values[3], "Test ContentsCode_Areal");
- Assert.AreEqual(double.Parse("1098061"), table[1].Values[5], "Test ContentsCode_Bosatte");
+ Assert.AreEqual("0801", columns[0][1], "Test tettsted");
+ Assert.AreEqual("2024", columns[1][1], "Test year");
+ Assert.AreEqual(276.30, columns[3][1], "Test ContentsCode_Areal");
+ Assert.AreEqual(double.Parse("1098061"), columns[5][1], "Test ContentsCode_Bosatte");
}
private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
@@ -110,15 +111,50 @@ private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
return numberOfCols;
}
- private static Task
ReadBackParquetFileAsync(string parquetFile)
+ private static async Task> ReadBackParquetFile(string parquetFile)
{
- return Table.ReadAsync(parquetFile);
- }
+ using Stream fs = File.OpenRead(parquetFile);
+ using ParquetReader reader = await ParquetReader.CreateAsync(fs);
+ ParquetSchema schema = reader.Schema;
+ DataField[] dataFields = schema.GetDataFields();
+ int totalRowCount = 0;
+
+ // First pass: count rows across all row groups so each column array can be preallocated.
+ for (int i = 0; i < reader.RowGroupCount; i++)
+ {
+ using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i);
+ totalRowCount += (int)rowGroupReader.RowCount;
+ }
- // Synchronous wrapper around the asynchronous method
- private static Table ReadBackParquetFileSync(string parquetFile)
- {
- return Task.Run(() => ReadBackParquetFileAsync(parquetFile)).Result;
+ var columns = new List