Skip to content
2 changes: 1 addition & 1 deletion PCAxis.Serializers/PCAxis.Serializers.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</PackageReference>

<PackageReference Include="Newtonsoft.Json" Version="13.0.4" />
<PackageReference Include="Parquet.Net" Version="4.25.0" />
<PackageReference Include="Parquet.Net" Version="5.5.0" />
<PackageReference Include="PCAxis.Core" Version="1.3.0" />
<PackageReference Include="PCAxis.Metadata" Version="1.0.5" />
<PackageReference Include="PCAxis.Query" Version="1.0.10" />
Expand Down
108 changes: 85 additions & 23 deletions PCAxis.Serializers/Parquet/ParquetBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
using System.Collections.Generic;
using System.Linq;

using Parquet.Rows;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Paxiom.Operations;

namespace PCAxis.Serializers
namespace PCAxis.Serializers.Parquet
{
public class ParquetBuilder
{
Expand Down Expand Up @@ -50,10 +50,10 @@ private static Dictionary<double, string> BuildDataSymbolMap(PXMeta meta)


/// <summary>
/// Populates the Parquet table based on the PXModel data and metadata.
/// Populates Parquet columns based on the PXModel data and metadata.
/// </summary>
/// <returns>The populated Parquet table.</returns>
public Table PopulateTable()
/// <returns>The populated Parquet data columns.</returns>
public DataColumn[] PopulateColumns()
{
int matrixSize = model.Data.MatrixColumnCount * model.Data.MatrixRowCount;
double[] data = new double[matrixSize];
Expand All @@ -63,50 +63,114 @@ public Table PopulateTable()
.Select(v => v.IsContentVariable && v.Values.Count > 1)
.ToArray();

// Generate one logical row index per output row.
var indices = GenerateDataPointIndices(variableValueCounts, isContentMulti);

for (int m = 0; m < matrixSize; m++)
{
data[m] = model.Data.ReadElement(m);
}

List<DataField> dataFields = CreateDataFields();
ParquetSchema schema = CreateSchema();
DataField[] dataFields = schema.GetDataFields();
Dictionary<string, int> fieldNameToColumnIndex = dataFields.Select((field, idx) => new { field.Name, idx })
.ToDictionary(x => x.Name, x => x.idx);

int rowCount = indices.Count;
// Keep an object buffer per column, then cast to typed arrays in one pass.
var columnBuffers = new object[dataFields.Length][];
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
columnBuffers[columnIndex] = new object[rowCount];
}

// Build each row once and write values directly into their column buffers.
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
var row = PopulateRow(indices[rowIndex], dataFields.Length, variableValueCounts, data, fieldNameToColumnIndex);
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
columnBuffers[columnIndex][rowIndex] = row[columnIndex];
}
}

var columns = new DataColumn[dataFields.Length];

// Convert object buffers to field-typed arrays required by DataColumn.
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
Array typedValues = ConvertToTypedArray(dataFields[columnIndex], columnBuffers[columnIndex]);
columns[columnIndex] = new DataColumn(dataFields[columnIndex], typedValues);
}

return columns;
}

private static Array ConvertToTypedArray(DataField field, object[] values)
{
// DataColumn expects an Array with the exact CLR element type of the field.
Type clrType = field.ClrType;

if (clrType == typeof(double))
{
var result = new double[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] == null ? default : Convert.ToDouble(values[i]);
}

return result;
}

if (clrType == typeof(string))
{
var result = new string[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] as string;
}

var table = new Table(dataFields.ToArray());
return result;
}

foreach (var index in indices)
if (clrType == typeof(DateTime))
{
var row = PopulateRow(index, dataFields, variableValueCounts, data);
table.Add(row);
var result = new DateTime[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] == null ? default : (DateTime)values[i];
}

return result;
}

return table;
throw new NotSupportedException($"Unsupported Parquet field type '{clrType}'.");
}


/// <summary>
/// Creates the Parquet schema fields based on the PXModel metadata.
/// Creates the Parquet schema based on the PXModel metadata.
/// </summary>
/// <returns>The list of Parquet data fields.</returns>
private List<DataField> CreateDataFields()
/// <returns>The Parquet schema.</returns>
private ParquetSchema CreateSchema()
{
List<DataField> dataFields = new List<DataField>();
var dataFields = new List<DataField>();
int variableCount = model.Meta.Variables.Count;

for (int i = 0; i < variableCount; i++)
{
var variable = model.Meta.Variables[i];
if (variable.IsContentVariable && variable.Values.Count > 1)
{
ParquetBuilder.AddContentVariableFields(dataFields, variable);
AddContentVariableFields(dataFields, variable);
}
else
{
ParquetBuilder.AddNonContentVariableFields(dataFields, variable);
AddNonContentVariableFields(dataFields, variable);
}
}

return dataFields;
return new ParquetSchema(dataFields.Cast<Field>());
}


Expand Down Expand Up @@ -161,16 +225,14 @@ private static void AddNonContentVariableFields(List<DataField> dataFields, Vari
/// Populates a single row in the Parquet table based on the specified index and data.
/// </summary>
/// <param name="index">The index representing the position of the row in the PXModel data.</param>
/// <param name="dataFields">The list of Parquet data fields representing the schema.</param>
/// <param name="fieldCount">The number of fields in the row schema.</param>
/// <param name="variableValueCounts">The counts of values for each variable in the model.</param>
/// <param name="data">The array containing the PXModel data.</param>
/// <returns>The populated Parquet row.</returns>
private object[] PopulateRow(int[] index, List<DataField> dataFields, int[] variableValueCounts, double[] data)
private object[] PopulateRow(int[] index, int fieldCount, int[] variableValueCounts, double[] data, Dictionary<string, int> dataFieldIndices)
{
int variableCount = model.Meta.Variables.Count;
var row = new object[dataFields.Count];
Dictionary<string, int> dataFieldIndices = dataFields.Select((field, idx) => new { field.Name, idx })
.ToDictionary(x => x.Name, x => x.idx);
var row = new object[fieldCount];

for (int i = 0; i < variableCount; i++)
{
Expand Down
29 changes: 21 additions & 8 deletions PCAxis.Serializers/ParquetSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;

using Parquet;
using Parquet.Rows;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Serializers.Parquet;

namespace PCAxis.Serializers
{
Expand Down Expand Up @@ -34,19 +37,29 @@ public void Serialize(PXModel model, string path)
public void Serialize(PXModel model, Stream stream)
{
var pb = new ParquetBuilder(model);
var table = pb.PopulateTable();
WriteTableAsync(table, stream).GetAwaiter().GetResult();
var columns = pb.PopulateColumns();
WriteColumnsAsync(columns, stream).GetAwaiter().GetResult();
}

/// <summary>
/// Asynchronously writes the Parquet table to the specified stream.
/// Asynchronously writes Parquet data columns to the specified stream.
/// </summary>
/// <param name="table">The Parquet table to be written.</param>
/// <param name="stream">The stream to write the Parquet table.</param>
/// <param name="columns">The Parquet data columns to be written.</param>
/// <param name="stream">The stream to write the Parquet data.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
private static async Task WriteTableAsync(Table table, Stream stream)
private static async Task WriteColumnsAsync(DataColumn[] columns, Stream stream)
{
await table.WriteAsync(stream);
var schema = new ParquetSchema(columns.Select(column => column.Field).ToList());
using (var writer = await ParquetWriter.CreateAsync(schema, stream))
{
using (var rowGroupWriter = writer.CreateRowGroup())
{
foreach (var column in columns)
{
await rowGroupWriter.WriteColumnAsync(column);
}
}
}
}
}
}
96 changes: 65 additions & 31 deletions UnitTests/Parquet/ParquetSerializationIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

using Microsoft.VisualStudio.TestTools.UnitTesting;

using Parquet.Rows;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Serializers;
Expand All @@ -32,7 +34,7 @@ public void TestInitialize()

[TestMethod, Description("Tests the serialization of PXModel to Parquet format and its correctness.")]
[DynamicData(nameof(GetPxFilePaths))]
public void ShouldSerializePxModel(string pxFile)
public async Task ShouldSerializePxModel(string pxFile)
{
var model = GetPxModelFromFile(pxFile);

Expand All @@ -41,47 +43,46 @@ public void ShouldSerializePxModel(string pxFile)

SerializePxModelToParquet(model, outputFile);

// Sync wrapper around async call
Table table = ReadBackParquetFileSync(outputFile);
var columns = await ReadBackParquetFile(outputFile);

// Assertion: Ensure that the table's row count equals the number of observations
// for a single ContentsCode. If the model has multiple contents, the serializer
// emits additional content columns rather than duplicating rows.
int contentCount = model.Meta.ContentVariable != null ? model.Meta.ContentVariable.Values.Count : 1;
int expectedRows = model.Data.MatrixSize / contentCount;
Assert.AreEqual(expectedRows, table.Count, $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");
Assert.HasCount(expectedRows, columns[0], $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");

// Assertion: Calculate the amount of columns we should have, based on the metadata
// Number of columns in meta, number of columns in table.

var numberOfColsInPx = CalculateNumberOfColumnsFromPxFile(model);
var numberOfColsInParq = table.Schema.DataFields.Length;

Assert.AreEqual(numberOfColsInParq, numberOfColsInPx, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
int numberOfColsInParq = columns.Count;
Assert.AreEqual(numberOfColsInPx, numberOfColsInParq, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
}

[TestMethod, Description("Tests correct ordering of time variable (pxfile: 16216.px)")]
[DeploymentItem("TestFiles/14216.px")]
public void TestTimeVariableOrdering()
public async Task TestTimeVariableOrdering()
{
var pxFile = "14216.px";
var model = GetPxModelFromFile(pxFile);
string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(pxFile);
string outputFile = Path.Combine(OutputDirectoryPath, $"{fileNameWithoutExtension}.parquet");
SerializePxModelToParquet(model, outputFile);
Table table = ReadBackParquetFileSync(outputFile);
var columns = await ReadBackParquetFile(outputFile);

Assert.AreEqual(2, table.Count, "Test number of rows");
int rowCount = columns.Count == 0 ? 0 : columns[0].Length;
Assert.AreEqual(2, rowCount, "Test number of rows");

Assert.AreEqual("0801", table[0].Values[0], "Test tettsted");
Assert.AreEqual("2025", table[0].Values[1], "Test year");
Assert.AreEqual(275.87, table[0].Values[3], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1110887"), table[0].Values[5], "Test ContentsCode_Bosatte");
Assert.AreEqual("0801", columns[0][0], "Test tettsted");
Assert.AreEqual("2025", columns[1][0], "Test year");
Assert.AreEqual(275.87, columns[3][0], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1110887"), columns[5][0], "Test ContentsCode_Bosatte");

Assert.AreEqual("0801", table[1].Values[0], "Test tettsted");
Assert.AreEqual("2024", table[1].Values[1], "Test year");
Assert.AreEqual(276.30, table[1].Values[3], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1098061"), table[1].Values[5], "Test ContentsCode_Bosatte");
Assert.AreEqual("0801", columns[0][1], "Test tettsted");
Assert.AreEqual("2024", columns[1][1], "Test year");
Assert.AreEqual(276.30, columns[3][1], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1098061"), columns[5][1], "Test ContentsCode_Bosatte");
}

private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
Expand Down Expand Up @@ -110,15 +111,50 @@ private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
return numberOfCols;
}

private static Task<Table> ReadBackParquetFileAsync(string parquetFile)
private static async Task<List<object[]>> ReadBackParquetFile(string parquetFile)
{
return Table.ReadAsync(parquetFile);
}
using Stream fs = File.OpenRead(parquetFile);
using ParquetReader reader = await ParquetReader.CreateAsync(fs);
ParquetSchema schema = reader.Schema;
DataField[] dataFields = schema.GetDataFields();
int totalRowCount = 0;

// First pass: count rows across all row groups so each column array can be preallocated.
for (int i = 0; i < reader.RowGroupCount; i++)
{
using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i);
totalRowCount += (int)rowGroupReader.RowCount;
}

// Synchronous wrapper around the asynchronous method
private static Table ReadBackParquetFileSync(string parquetFile)
{
return Task.Run(() => ReadBackParquetFileAsync(parquetFile)).Result;
var columns = new List<object[]>(dataFields.Length);
// Allocate one dense array per column to avoid per-value list growth/conversion.
for (int fieldIndex = 0; fieldIndex < dataFields.Length; fieldIndex++)
{
columns.Add(new object[totalRowCount]);
}

int rowOffset = 0;

// Second pass: read each row group and copy values into final column arrays.
for (int i = 0; i < reader.RowGroupCount; i++)
{
using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i);
int rowCount = (int)rowGroupReader.RowCount;

for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
DataColumn columnData = await rowGroupReader.ReadColumnAsync(dataFields[columnIndex]);
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
columns[columnIndex][rowOffset + rowIndex] = columnData.Data.GetValue(rowIndex);
}
}

// Move the write window to the next row-group segment.
rowOffset += rowCount;
}

return columns;
}

private static PXModel GetPxModelFromFile(string pxFile)
Expand All @@ -133,11 +169,9 @@ private static PXModel GetPxModelFromFile(string pxFile)

private static void SerializePxModelToParquet(PXModel model, string outputPath)
{
using (FileStream stream = new FileStream(outputPath, FileMode.Create))
{
var parquetSer = new ParquetSerializer();
parquetSer.Serialize(model, stream);
}
using FileStream stream = new(outputPath, FileMode.Create);
var parquetSer = new ParquetSerializer();
parquetSer.Serialize(model, stream);
}

public static IEnumerable<object[]> GetPxFilePaths()
Expand Down
Loading
Loading