Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
#include "Framework/Traits.h"

#include <string>
namespace o2::framework {
namespace o2::framework
{
std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
std::string serializeSchema(std::shared_ptr<arrow::Schema>& schema);
}
} // namespace o2::framework

namespace o2::soa
{
Expand Down
33 changes: 14 additions & 19 deletions Framework/Core/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ auto make_spawn(InputSpec const& input, ProcessingContext& pc)
return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str(), projectors.data(), projector, schema);
}

struct Maker
{
struct Maker {
std::string binding;
std::vector<std::string> labels;
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
Expand Down Expand Up @@ -169,7 +168,6 @@ struct Maker

return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
}

};

struct Spawnable {
Expand All @@ -185,17 +183,17 @@ struct Spawnable {
header::DataHeader::SubSpecificationType version;

Spawnable(InputSpec const& spec)
: binding{spec.binding}
: binding{spec.binding}
{
auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
origin = origin_;
description = description_;
version = version_;
auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("projectors") == 0; });
auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; });
std::stringstream iws(loc->defaultValue.get<std::string>());
projectors = ExpressionJSONHelpers::read(iws);

loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("schema") == 0; });
loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; });
iws.clear();
iws.str(loc->defaultValue.get<std::string>());
outputSchema = ArrowJSONHelpers::read(iws);
Expand All @@ -209,14 +207,14 @@ struct Spawnable {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto& p : projectors) {
expressions::walk(p.node.get(),
[&fields](expressions::Node* n) mutable {
if (n->self.index() == 1) {
auto& b = std::get<expressions::BindingNode>(n->self);
if ( std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr<arrow::Field> const& field){ return field->name() == b.name; }) == fields.end() ) {
fields.emplace_back(std::make_shared<arrow::Field>(b.name, expressions::concreteArrowType(b.type)));
}
}
});
[&fields](expressions::Node* n) mutable {
if (n->self.index() == 1) {
auto& b = std::get<expressions::BindingNode>(n->self);
if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr<arrow::Field> const& field) { return field->name() == b.name; }) == fields.end()) {
fields.emplace_back(std::make_shared<arrow::Field>(b.name, expressions::concreteArrowType(b.type)));
}
}
});
}
inputSchema = std::make_shared<arrow::Schema>(fields);

Expand All @@ -227,8 +225,7 @@ struct Spawnable {
expressions::createExpressionTree(
expressions::createOperations(p),
inputSchema),
outputSchema->field(i))
);
outputSchema->field(i)));
++i;
}
}
Expand All @@ -248,10 +245,8 @@ struct Spawnable {
outputSchema,
origin,
description,
version
};
version};
}

};

} // namespace
Expand Down
18 changes: 10 additions & 8 deletions Framework/Core/src/ExpressionJSONHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector<o2
w.EndObject();
}

namespace {
namespace
{
struct SchemaReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, SchemaReader> {
using Ch = rapidjson::UTF8<>::Ch;
using SizeType = rapidjson::SizeType;
Expand Down Expand Up @@ -679,7 +680,7 @@ struct SchemaReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, Sch
{
debug << "Ending array" << std::endl;
if (states.top() == State::IN_LIST) {
//finalize schema
// finalize schema
schema = std::make_shared<arrow::Schema>(fields);
states.pop();
return true;
Expand Down Expand Up @@ -773,13 +774,13 @@ struct SchemaReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, Sch
return false;
}

bool Int(int i) {
bool Int(int i)
{
debug << "Int(" << i << ")" << std::endl;
return Uint(i);
}

};
}
} // namespace

std::shared_ptr<arrow::Schema> o2::framework::ArrowJSONHelpers::read(std::istream& s)
{
Expand All @@ -789,13 +790,14 @@ std::shared_ptr<arrow::Schema> o2::framework::ArrowJSONHelpers::read(std::istrea

bool ok = reader.Parse(isw, sreader);

if(!ok) {
if (!ok) {
throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset());
}
return sreader.schema;
}

namespace {
namespace
{
void writeSchema(rapidjson::Writer<rapidjson::OStreamWrapper>& w, arrow::Schema* schema)
{
for (auto& f : schema->fields()) {
Expand All @@ -807,7 +809,7 @@ void writeSchema(rapidjson::Writer<rapidjson::OStreamWrapper>& w, arrow::Schema*
w.EndObject();
}
}
}
} // namespace

void o2::framework::ArrowJSONHelpers::write(std::ostream& o, std::shared_ptr<arrow::Schema>& schema)
{
Expand Down