From 1477ac7d18b7f152fbad23d73aed7a50e8f2be47 Mon Sep 17 00:00:00 2001 From: colin Date: Fri, 18 Apr 2025 00:38:08 +0800 Subject: [PATCH 1/5] add batch api for tablet. --- cpp/src/common/allocator/my_string.h | 13 +++ cpp/src/common/container/bit_map.h | 4 + cpp/src/common/tablet.cc | 20 ++++- cpp/src/common/tablet.h | 113 ++++++++++++++++++++------- cpp/test/common/tablet_test.cc | 33 ++++++++ 5 files changed, 153 insertions(+), 30 deletions(-) diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h index 9f5d8a5a0..a236bb6ca 100644 --- a/cpp/src/common/allocator/my_string.h +++ b/cpp/src/common/allocator/my_string.h @@ -60,6 +60,19 @@ struct String { return common::E_OK; } + FORCE_INLINE int dup_from(const char* str, common::PageArena &pa) { + len_ = strlen(str); + if (UNLIKELY(len_ == 0)) { + return common::E_OK; + } + buf_ = pa.alloc(len_); + if (IS_NULL(buf_)) { + return common::E_OOM; + } + memcpy(buf_, str, len_); + return common::E_OK; + } + FORCE_INLINE bool operator==(const String &other) const { return equal_to(other); } diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index 9d0367449..c2f8f4e59 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -55,6 +55,10 @@ class BitMap { *start_addr = (*start_addr) & (~bit_mask); } + FORCE_INLINE void set_zero() { + memset(bitmap_, 0x00, size_); + } + FORCE_INLINE bool test(uint32_t index) { uint32_t offset = index >> 3; ASSERT(offset < size_); diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index ac4a2708b..86e575999 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -21,8 +21,6 @@ #include -#include "utils/errno_define.h" - using namespace common; namespace storage { @@ -285,6 +283,20 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, return add_value(row_index, measurement_name, String(val)); } +template <> +int Tablet::set_batch_data(uint32_t col_index, char **data) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_SCHEMA; + } + + for (int i = 0; i < max_row_num_; i++) { + value_matrix_[col_index].string_data->dup_from(data[i], + page_arena_); + } + bitmaps_[col_index].set_zero(); + return common::E_OK; +} + template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, bool val); template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, @@ -323,6 +335,10 @@ void Tablet::set_column_categories( } } +void Tablet::set_null_value(uint32_t col_index, uint32_t row_index) { + bitmaps_[col_index].set(row_index); +} + std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; id_array.push_back(insert_target_name_); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index e69d477e1..019267892 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -29,6 +29,7 @@ #include "common/db_common.h" #include "device_id.h" #include "schema.h" +#include "utils/errno_define.h" namespace storage { @@ -38,10 +39,12 @@ class TabletRowIterator; class TabletColIterator; /** - * @brief Represents a collection of data rows with associated metadata for insertion into a table. + * @brief Represents a collection of data rows with associated metadata for + * insertion into a table. * - * This class is used to manage and organize data that will be inserted into a specific target table. - * It handles the storage of timestamps and values, along with their associated metadata such as column names and types. + * This class is used to manage and organize data that will be inserted into a + * specific target table. It handles the storage of timestamps and values, along + * with their associated metadata such as column names and types. */ class Tablet { struct ValueMatrixEntry { @@ -105,7 +108,8 @@ class Tablet { [](const std::string &name, common::TSDataType type) { return MeasurementSchema(name, type); }); - schema_vec_ = std::make_shared>(measurement_vec); + schema_vec_ = + std::make_shared>(measurement_vec); init(); } @@ -123,7 +127,8 @@ class Tablet { schema_vec_ = std::make_shared>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } set_column_categories(column_categories); @@ -133,24 +138,26 @@ class Tablet { /** * @brief Constructs a Tablet object with the given parameters. * - * @param column_names A vector containing the names of the columns in the tablet. - * Each name corresponds to a column in the target table. + * @param column_names A vector containing the names of the columns in the + * tablet. Each name corresponds to a column in the target table. * @param data_types A vector containing the data types of each column. * These must match the schema of the target table. - * @param max_rows The maximum number of rows that this tablet can hold. Defaults to DEFAULT_MAX_ROWS. + * @param max_rows The maximum number of rows that this tablet can hold. + * Defaults to DEFAULT_MAX_ROWS. */ Tablet(const std::vector &column_names, - const std::vector &data_types, - uint32_t max_rows = DEFAULT_MAX_ROWS) - : max_row_num_(max_rows), - cur_row_size_(0), - timestamps_(nullptr), - value_matrix_(nullptr), - bitmaps_(nullptr) { + const std::vector &data_types, + uint32_t max_rows = DEFAULT_MAX_ROWS) + : max_row_num_(max_rows), + cur_row_size_(0), + timestamps_(nullptr), + value_matrix_(nullptr), + bitmaps_(nullptr) { schema_vec_ = std::make_shared>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } init(); @@ -158,9 +165,7 @@ class Tablet { ~Tablet() { destroy(); } - const std::string& get_table_name() const{ - return insert_target_name_; - } + const std::string &get_table_name() const { return insert_target_name_; } void set_table_name(const std::string &table_name) { insert_target_name_ = table_name; } @@ -170,8 +175,8 @@ class Tablet { /** * @brief Adds a timestamp to the specified row. * - * @param row_index The index of the row to which the timestamp will be added. - * Must be less than the maximum number of rows. + * @param row_index The index of the row to which the timestamp will be + * added. Must be less than the maximum number of rows. * @param timestamp The timestamp value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -180,12 +185,14 @@ class Tablet { void *get_value(int row_index, uint32_t schema_index, common::TSDataType &data_type) const; /** - * @brief Template function to add a value of type T to the specified row and column. + * @brief Template function to add a value of type T to the specified row + * and column. * * @tparam T The type of the value to add. * @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param schema_index The index of the column schema corresponding to the value being added. + * @param schema_index The index of the column schema corresponding to the + * value being added. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -196,13 +203,14 @@ class Tablet { const std::vector &column_categories); std::shared_ptr get_device_id(int i) const; /** - * @brief Template function to add a value of type T to the specified row and column by name. + * @brief Template function to add a value of type T to the specified row + * and column by name. * * @tparam T The type of the value to add. * @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param measurement_name The name of the column to which the value will be added. - * Must match one of the column names provided during construction. + * @param measurement_name The name of the column to which the value will be + * added. Must match one of the column names provided during construction. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -210,7 +218,8 @@ class Tablet { int add_value(uint32_t row_index, const std::string &measurement_name, T val); - FORCE_INLINE const std::string &get_column_name(uint32_t column_index) const { + FORCE_INLINE const std::string &get_column_name( + uint32_t column_index) const { return schema_vec_->at(column_index).measurement_name_; } @@ -218,7 +227,7 @@ class Tablet { schema_vec_->at(column_index).measurement_name_ = name; } - const std::map& get_schema_map() const { + const std::map &get_schema_map() const { return schema_map_; } @@ -226,6 +235,54 @@ class Tablet { schema_map_ = schema_map; } + template + int set_batch_data(uint32_t col_index, T *data) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_ARG; + } + auto schema = schema_vec_->at(col_index); + switch (schema.data_type_) { + case common::BOOLEAN: + memcpy(value_matrix_[col_index].bool_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT32: + memcpy(value_matrix_[col_index].int32_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT64: + memcpy(value_matrix_[col_index].int64_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::FLOAT: + memcpy(value_matrix_[col_index].float_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::DOUBLE: + memcpy(value_matrix_[col_index].double_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + default:; + } + bitmaps_[col_index].set_zero(); + return common::E_OK; + } + + int set_batch_data(uint32_t col_index, char **data) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_SCHEMA; + } + + for (int i = 0; i < max_row_num_; i++) { + value_matrix_[col_index].string_data[i].dup_from(data[i], + page_arena_); + } + bitmaps_[col_index].set_zero(); + return common::E_OK; + } + + void set_null_value(uint32_t col_index, uint32_t row_index); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc index 71863f0c7..fe8a54950 100644 --- a/cpp/test/common/tablet_test.cc +++ b/cpp/test/common/tablet_test.cc @@ -61,4 +61,37 @@ TEST(TabletTest, LargeQuantities) { EXPECT_EQ(tablet.get_column_count(), schema_vec.size()); } +TEST(TabletTest, TabletBatchReadWrite) { + std::vector column_names = { + "id1", "id2", "id3", "id4","id5","id6" + }; + std::vector datatypes = { + common::TSDataType::BOOLEAN, common::TSDataType::INT32, + common::TSDataType::INT64, common::TSDataType::FLOAT, + common::TSDataType::DOUBLE, common::TSDataType::STRING + }; + Tablet tablet(column_names, datatypes, 100); + bool bool_vec[100] = {false}; + bool_vec[10] = true; + + common::TSDataType datatype; + tablet.set_batch_data(0, bool_vec); + ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype))); + ASSERT_EQ(common::TSDataType::BOOLEAN, datatype); + int32_t i32_vec[100] = {false}; + i32_vec[99] = 123; + tablet.set_batch_data(1, i32_vec); + ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype))); + ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype))); + char** str = (char**) malloc(100 * sizeof(char*)); + for (int i = 0; i < 100; i++) { + str[i] = strdup(std::string("val" + std::to_string(i)).c_str()); + } + tablet.set_batch_data(5, str); + ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10, 5, datatype)); + + tablet.set_null_value(5, 20); + ASSERT_EQ(nullptr, tablet.get_value(20, 5, datatype)); +} + } // namespace storage \ No newline at end of file From 065b255a42e99f991d3ed2f083257a9d6608f818 Mon Sep 17 00:00:00 2001 From: colin Date: Fri, 18 Apr 2025 13:50:16 +0800 Subject: [PATCH 2/5] add tmp code. --- cpp/src/common/tablet.cc | 10 +++++++++- cpp/src/common/tablet.h | 2 +- cpp/src/cwrapper/tsfile_cwrapper.cc | 23 +++++++++++++++++++++++ cpp/src/cwrapper/tsfile_cwrapper.h | 4 ++++ python/tsfile/tsfile_cpp.pxd | 4 ++++ 5 files changed, 41 insertions(+), 2 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 86e575999..a5a827295 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -335,8 +335,16 @@ void Tablet::set_column_categories( } } -void Tablet::set_null_value(uint32_t col_index, uint32_t row_index) { +int Tablet::set_null_value(uint32_t col_index, uint32_t row_index) { + if (col_index < 0 || col_index >= schema_vec_->size()) { + return common::E_INVALID_ARG; + } + + if (row_index < 0 || row_index >= max_row_num_) { + return common::E_INVALID_ARG; + } bitmaps_[col_index].set(row_index); + return common::E_OK; } std::shared_ptr Tablet::get_device_id(int i) const { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 019267892..aea3ddd55 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -281,7 +281,7 @@ class Tablet { return common::E_OK; } - void set_null_value(uint32_t col_index, uint32_t row_index); + int set_null_value(uint32_t col_index, uint32_t row_index); friend class TabletColIterator; friend class TsFileWriter; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 371e8ced4..ce61a8611 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -667,6 +667,29 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { return ret; } +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, + const void *data) { + auto tab = static_cast(tablet); + int ret = 0; + ret = tab->set_batch_data(col_index, data); + return ret; +} + +ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, + const char **data) { + auto tab = static_cast(tablet); + int ret = 0; + ret = tab->set_batch_data(col_index, data); + return ret; +} + +ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, + uint32_t col_index) { + auto tab = static_cast(tablet); + int ret = tab->set_null_value(col_index, row_index); + return ret; +} + ERRNO _tsfile_writer_close(TsFileWriter writer) { auto *w = static_cast(writer); int ret = w->flush(); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index d43e5dce0..bc7c55b56 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -549,6 +549,10 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data); +ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); +ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index); + // Write a tablet into a device. ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index b4f9ccf86..1fea5da16 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -158,6 +158,10 @@ cdef extern from "./tsfile_cwrapper.h": void _free_tsfile_ts_record(TsRecord * record); + ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void * data); + ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); + ErrorCode _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index); + # resulSet : query data from tsfile reader ResultSet tsfile_query_table(TsFileReader reader, const char * table_name, From 81a83ab7f85a0e7e6c921437d861097cfb107abe Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 25 Apr 2025 15:18:22 +0800 Subject: [PATCH 3/5] add batch read. --- cpp/src/common/container/bit_map.h | 5 + cpp/src/common/tablet.cc | 10 +- cpp/src/common/tablet.h | 23 ++--- cpp/src/cwrapper/tsfile_cwrapper.cc | 15 ++- cpp/src/cwrapper/tsfile_cwrapper.h | 3 +- cpp/test/common/tablet_test.cc | 13 ++- python/setup.py | 2 +- python/tsfile/tsfile_cpp.pxd | 6 +- python/tsfile/tsfile_writer.pyx | 136 ++++++++++++++++++++++++++++ 9 files changed, 185 insertions(+), 28 deletions(-) diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index c2f8f4e59..5a2b2f265 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -59,6 +59,11 @@ class BitMap { memset(bitmap_, 0x00, size_); } + FORCE_INLINE void set_bitmap(char* bitmap) { + memcpy(bitmap_, bitmap, size_); + } + + FORCE_INLINE bool test(uint32_t index) { uint32_t offset = index >> 3; ASSERT(offset < size_); diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index a5a827295..73c761c1a 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -284,16 +284,18 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, } template <> -int Tablet::set_batch_data(uint32_t col_index, char **data) { +int Tablet::set_batch_data(uint32_t col_index, char **data, char *mask) { if (col_index > schema_vec_->size()) { return common::E_INVALID_SCHEMA; } for (int i = 0; i < max_row_num_; i++) { - value_matrix_[col_index].string_data->dup_from(data[i], - page_arena_); + if (data[i] != nullptr) { + value_matrix_[col_index].string_data[i].dup_from(data[i], + page_arena_); + bitmaps_[col_index].clear(i); + } } - bitmaps_[col_index].set_zero(); return common::E_OK; } diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index aea3ddd55..20eb95c0c 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -172,6 +172,7 @@ class Tablet { size_t get_column_count() const { return schema_vec_->size(); } uint32_t get_cur_row_size() const { return cur_row_size_; } + uint32_t get_max_row_size() const { return max_row_num_; } /** * @brief Adds a timestamp to the specified row. * @@ -236,7 +237,7 @@ class Tablet { } template - int set_batch_data(uint32_t col_index, T *data) { + int set_batch_data(uint32_t col_index, T *data, char *mask) { if (col_index > schema_vec_->size()) { return common::E_INVALID_ARG; } @@ -264,23 +265,19 @@ class Tablet { break; default:; } - bitmaps_[col_index].set_zero(); - return common::E_OK; - } - int set_batch_data(uint32_t col_index, char **data) { - if (col_index > schema_vec_->size()) { - return common::E_INVALID_SCHEMA; + int size = (max_row_num_ + 7) / 8; + for (int i = 0; i < size; i++) { + mask[i] = ~mask[i]; } - - for (int i = 0; i < max_row_num_; i++) { - value_matrix_[col_index].string_data[i].dup_from(data[i], - page_arena_); - } - bitmaps_[col_index].set_zero(); + bitmaps_[col_index].set_bitmap(mask); return common::E_OK; } + void set_batch_timestamp(int64_t* timestamp) { + memcpy(timestamps_, timestamp, max_row_num_); + } + int set_null_value(uint32_t col_index, uint32_t row_index); friend class TabletColIterator; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index ce61a8611..4df05f6a0 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -668,10 +668,10 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { } ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, - const void *data) { + const void *data, char* mask) { auto tab = static_cast(tablet); int ret = 0; - ret = tab->set_batch_data(col_index, data); + ret = tab->set_batch_data(col_index, data, mask); return ret; } @@ -679,8 +679,17 @@ ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char **data) { auto tab = static_cast(tablet); int ret = 0; - ret = tab->set_batch_data(col_index, data); + ret = tab->set_batch_data(col_index, data, nullptr); return ret; +}; + +ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num) { + auto tab = static_cast(tablet); + if (max_row_num != tab->get_max_row_size()) { + return common::E_INVALID_ARG; + } + tab->set_batch_timestamp(timestamp); + return common::E_OK; } ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index bc7c55b56..ffa135628 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -549,8 +549,9 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); -ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data); +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask); ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); +ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num); ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index); // Write a tablet into a device. diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc index fe8a54950..d2ce87d7f 100644 --- a/cpp/test/common/tablet_test.cc +++ b/cpp/test/common/tablet_test.cc @@ -75,19 +75,26 @@ TEST(TabletTest, TabletBatchReadWrite) { bool_vec[10] = true; common::TSDataType datatype; - tablet.set_batch_data(0, bool_vec); + char* mask = new char[(100 + 7)/8]; + for (int i = 0; i < (100 + 7)/8; i++) { + mask[i] = 0xff; + } + tablet.set_batch_data(0, bool_vec, mask); ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype))); ASSERT_EQ(common::TSDataType::BOOLEAN, datatype); int32_t i32_vec[100] = {false}; i32_vec[99] = 123; - tablet.set_batch_data(1, i32_vec); + for (int i = 0; i < (100 + 7)/8; i++) { + mask[i] = 0xff; + } + tablet.set_batch_data(1, i32_vec, mask); ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype))); ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype))); char** str = (char**) malloc(100 * sizeof(char*)); for (int i = 0; i < 100; i++) { str[i] = strdup(std::string("val" + std::to_string(i)).c_str()); } - tablet.set_batch_data(5, str); + tablet.set_batch_data(5, str, nullptr); ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10, 5, datatype)); tablet.set_null_value(5, 20); diff --git a/python/setup.py b/python/setup.py index 6edeea0b9..84727278d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -24,7 +24,7 @@ import shutil import os -version = "2.1.0.dev0" +version = "2.1.0.dev" system = platform.system() def copy_tsfile_lib(source_dir, target_dir, suffix): diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 1fea5da16..b65585cce 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -130,7 +130,7 @@ cdef extern from "./tsfile_cwrapper.h": TSDataType * data_types, int column_num, int max_rows); - Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num); + Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num, uint32_t max_rows); ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp); ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index, @@ -158,9 +158,9 @@ cdef extern from "./tsfile_cwrapper.h": void _free_tsfile_ts_record(TsRecord * record); - ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void * data); + ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask); ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); - ErrorCode _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index); + ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num); # resulSet : query data from tsfile reader ResultSet tsfile_query_table(TsFileReader reader, diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index a670e5cb9..64cab0a06 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -17,6 +17,11 @@ # #cython: language_level=3 +import pandas as pd +import numpy as np + +from libc.stdlib cimport free +from libc.stdlib cimport malloc from .tsfile_cpp cimport * from .tsfile_py_cpp cimport * @@ -25,6 +30,137 @@ from tsfile.row_record import RowRecord from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy from tsfile.schema import TableSchema as TableSchemaPy from tsfile.tablet import Tablet as TabletPy +from tsfile.constants import TSDataType as TSDataTypePy + +_pandas_dtype_to_ts = { + "bool": TSDataTypePy.BOOLEAN, + "int32": TSDataTypePy.INT32, + "int64": TSDataTypePy.INT64, + "float32": TSDataTypePy.FLOAT, + "float64": TSDataTypePy.DOUBLE, + "string": TSDataTypePy.STRING +} + +cdef bint is_compatible(TSDataTypePy expected, TSDataTypePy actual): + if expected == actual: + return True + if expected == TSDataTypePy.INT64 and actual == TSDataTypePy.INT32: + return True + if expected == TSDataTypePy.DOUBLE and actual == TSDataTypePy.FLOAT: + return True + return False + +def convert_series(pd.Series series, TSDataTypePy target) -> np.ndarray: + dtype_map = { + TSDataTypePy.INT64: "int64", + TSDataTypePy.INT32: "int32", + TSDataTypePy.FLOAT: "float32", + TSDataTypePy.DOUBLE: "float64", + TSDataTypePy.BOOLEAN: "bool", + TSDataTypePy.STRING: "str", + } + + target_str = dtype_map.get(target) + if str(series.dtype) == target_str: + return series.to_numpy() + return series.astype(target_str).to_numpy() +def encode_or_null(x): + if pd.isna(x): + return None + return str(x).encode('utf-8') + +cdef class CTablet: + cdef Tablet tablet + cdef object column_name + cdef object data_type + cdef object max_row_num + cdef char** column_names + cdef int column_num + cdef TSDataType * column_data_types + + def __init__(self, column_name: list[str], data_types: list[TSDataTypePy], max_row_num: int = 1024): + + self.column_name = column_name + self.data_type = data_types + self.max_row_num = max_row_num + column_num = len(column_name) + if len(data_types) != column_num: + raise ValueError("Length of column_name and data_types must be equal") + column_names = malloc(sizeof(char*) * column_num) + column_data_types = malloc(sizeof(TSDataType) * column_num) + + ind = 0 + for name, dtype in zip(column_name, data_types): + column_names[ind] = strdup(name.encode('utf-8')) + column_data_types[ind] = to_c_data_type(dtype) + + + + cdef init_c_tablet(self): + if self.tablet != NULL: + free_tablet(self.tablet) + tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num) + + cpdef from_data_frame(self, data_frame: pd.DataFrame): + cdef void * data_ptr + cdef const uint32_t * mask_ptr + cdef size_t length + cdef char** str_ptr + cdef bytes item + if not isinstance(data_frame, pd.DataFrame): + raise TypeError("Input must be a pandas DataFrame") + if data_frame.shape[1] != len(self.column_name) + 1: + raise ValueError(f"DataFrame column count {data_frame.shape[1]} doesn't match expected {len(self.column_name) + 1}") + + if "time" not in data_frame.columns: + raise ValueError("Missing required column: 'time'") + if not pd.api.types.is_integer_dtype(data_frame["time"]): + raise TypeError("Column 'time' must be of integer type") + if data_frame["time"].dtype != np.int64: + raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}") + + self.init_c_tablet() + + data_ptr = data_frame["time"].to_numpy().data + _tablet_set_batch_timestamp(self.tablet, data_ptr) + + for i, col_name in enumerate(self.column_name): + if col_name not in data_frame.columns: + raise KeyError(f"Column '{col_name}' missing from DataFrame") + series = data_frame[col_name] + dtype_str = str(series.dtype) + if dtype_str not in _pandas_dtype_to_ts: + raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}") + actual_ts_type = _pandas_dtype_to_ts[dtype_str] + expected_ts_type = self.data_type[i] + if not is_compatible(expected_ts_type, actual_ts_type): + raise TypeError( + f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}") + + if expected_ts_type == TSDataTypePy.STRING: + str_ptr = malloc(sizeof(char*) * self.max_row_num) + array = series.fillna("").astype(str).apply(encode_or_null).to_numpy(dtype=object) + for i in range(self.max_row_num): + if array[i] is None: + str_ptr[i] = NULL + else: + str_ptr[i] = array[i] + _tablet_set_batch_str(self.tablet, i, str_ptr) + for i in range(self.max_row_num): + if str_ptr[i] != NULL: + free(str_ptr[i]) + free(str_ptr) + else: + array = convert_series(series, expected_ts_type) + mask = series.notna.to_numpy().astype(np.byte) + data_ptr = array.data + mask_ptr = mask.data + + _tablet_set_batch_data(self.tablet, i, data_ptr, mask_ptr) + + + + cdef class TsFileWriterPy: cdef TsFileWriter writer From 305672a89625705e6e2c6f8b60b5573f2dc56625 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 7 May 2025 13:39:01 +0800 Subject: [PATCH 4/5] fix some err. --- cpp/src/common/tablet.cc | 3 +-- cpp/src/common/tablet.h | 1 + cpp/test/common/tablet_test.cc | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 73c761c1a..c216e1346 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -283,8 +283,7 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, return add_value(row_index, measurement_name, String(val)); } -template <> -int Tablet::set_batch_data(uint32_t col_index, char **data, char *mask) { +int Tablet::set_batch_data_char(uint32_t col_index, char **data) { if (col_index > schema_vec_->size()) { return common::E_INVALID_SCHEMA; } diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 20eb95c0c..31e82bf86 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -236,6 +236,7 @@ class Tablet { schema_map_ = schema_map; } + int set_batch_data_char(uint32_t col_index, char **data); template int set_batch_data(uint32_t col_index, T *data, char *mask) { if (col_index > schema_vec_->size()) { diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc index d2ce87d7f..57de63ca6 100644 --- a/cpp/test/common/tablet_test.cc +++ b/cpp/test/common/tablet_test.cc @@ -94,11 +94,16 @@ TEST(TabletTest, TabletBatchReadWrite) { for (int i = 0; i < 100; i++) { str[i] = strdup(std::string("val" + std::to_string(i)).c_str()); } - tablet.set_batch_data(5, str, nullptr); + tablet.set_batch_data_char(5, str); ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10, 5, datatype)); tablet.set_null_value(5, 20); ASSERT_EQ(nullptr, tablet.get_value(20, 5, datatype)); + for (int i = 0; i < 100; i++) { + free(str[i]); + } + free(str); + delete [] mask; } } // namespace storage \ No newline at end of file From 9f5106b05019484610ba73c4bd4dd9695d12cffa Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 9 May 2025 15:44:36 +0800 Subject: [PATCH 5/5] add batch write for tsfile. --- cpp/src/common/device_id.h | 2 + cpp/src/common/tablet.cc | 4 ++ cpp/src/common/tablet.h | 1 + cpp/src/cwrapper/tsfile_cwrapper.cc | 18 +++--- cpp/src/cwrapper/tsfile_cwrapper.h | 8 +-- python/tsfile/__init__.py | 3 +- python/tsfile/tsfile_cpp.pxd | 3 +- python/tsfile/tsfile_table_writer.py | 9 ++- python/tsfile/tsfile_writer.pyx | 90 +++++++++++++++++++--------- 9 files changed, 95 insertions(+), 43 deletions(-) diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h index 021cb6aaf..071d6f249 100644 --- a/cpp/src/common/device_id.h +++ b/cpp/src/common/device_id.h @@ -67,6 +67,8 @@ class StringArrayDeviceID : public IDeviceID { public: explicit StringArrayDeviceID(const std::vector& segments) : segments_(formalize(segments)) {} + StringArrayDeviceID(const std::vector& segments, bool fast) + :segments_(segments) {} explicit StringArrayDeviceID(const std::string& device_id_string) : segments_(split_device_id_string(device_id_string)) {} diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index c216e1346..94f22db65 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -350,6 +350,7 @@ int Tablet::set_null_value(uint32_t col_index, uint32_t row_index) { std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; + id_array.reserve(id_column_indexes_.size() + 1); id_array.push_back(insert_target_name_); for (auto id_column_idx : id_column_indexes_) { common::TSDataType data_type = INVALID_DATATYPE; @@ -364,6 +365,9 @@ std::shared_ptr Tablet::get_device_id(int i) const { break; } } + if (id_array.size() == id_column_indexes_.size() + 1) { + return std::make_shared(id_array, true); + } return std::make_shared(id_array); } diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 31e82bf86..c7c5db47f 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -277,6 +277,7 @@ class Tablet { void set_batch_timestamp(int64_t* timestamp) { memcpy(timestamps_, timestamp, max_row_num_); + cur_row_size_ = max_row_num_; } int set_null_value(uint32_t col_index, uint32_t row_index); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 4df05f6a0..b0088e9bd 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -666,28 +666,28 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { const int ret = w->write_record(*record); return ret; } +void _tablet_set_target_name(Tablet tablet, char *target_name) { + auto tab = static_cast(tablet); + tab->set_table_name(target_name); +} -ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, - const void *data, char* mask) { +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void *data, + char *mask) { auto tab = static_cast(tablet); int ret = 0; ret = tab->set_batch_data(col_index, data, mask); return ret; } -ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, - const char **data) { +ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char **data) { auto tab = static_cast(tablet); int ret = 0; - ret = tab->set_batch_data(col_index, data, nullptr); + ret = tab->set_batch_data_char(col_index, data); return ret; }; -ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num) { +ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t *timestamp) { auto tab = static_cast(tablet); - if (max_row_num != tab->get_max_row_size()) { - return common::E_INVALID_ARG; - } tab->set_batch_timestamp(timestamp); return common::E_OK; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index ffa135628..b01cc6a3d 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -548,10 +548,10 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); - -ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask); -ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); -ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num); +void _tablet_set_target_name(Tablet tablet, char* target_name); +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void* data, char* mask); +ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char** data); +ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp); ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index); // Write a tablet into a device. diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index df51bcfad..d15adf4f4 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -32,4 +32,5 @@ from .exceptions import * from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet from .tsfile_writer import TsFileWriterPy as TsFileWriter -from .tsfile_table_writer import TsFileTableWriter \ No newline at end of file +from .tsfile_writer import CTablet +from .tsfile_table_writer import TsFileTableWriter diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index b65585cce..5acad9895 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -158,9 +158,10 @@ cdef extern from "./tsfile_cwrapper.h": void _free_tsfile_ts_record(TsRecord * record); + void _tablet_set_target_name(Tablet tablet, char * target_name); ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask); ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data); - ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num); + ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp); # resulSet : query data from tsfile reader ResultSet tsfile_query_table(TsFileReader reader, diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index c312a0ed4..9b49b1769 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -17,7 +17,7 @@ # from tsfile import TableSchema, Tablet, TableNotExistError -from tsfile import TsFileWriter +from tsfile import TsFileWriter, CTablet class TsFileTableWriter: @@ -53,6 +53,13 @@ def write_table(self, tablet: Tablet): raise TableNotExistError self.writer.write_table(tablet) + def write_ctablet(self, tablet: CTablet): + if self.exclusive_table_name_ is None: + raise TableNotExistError + tablet.set_target_name(self.exclusive_table_name_) + self.writer.write_ctablet(tablet) + + def close(self): """ Close TsFileTableWriter and will flush data automatically. diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index 64cab0a06..f3562af52 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -20,11 +20,17 @@ import pandas as pd import numpy as np +from cpython.unicode cimport PyUnicode_AsUTF8String + from libc.stdlib cimport free from libc.stdlib cimport malloc +from libc.string cimport strdup from .tsfile_cpp cimport * from .tsfile_py_cpp cimport * +import numpy as np +import pandas as pd +cimport numpy as cnp from tsfile.row_record import RowRecord from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy @@ -41,7 +47,7 @@ _pandas_dtype_to_ts = { "string": TSDataTypePy.STRING } -cdef bint is_compatible(TSDataTypePy expected, TSDataTypePy actual): +cdef bint is_compatible(object expected, object actual): if expected == actual: return True if expected == TSDataTypePy.INT64 and actual == TSDataTypePy.INT32: @@ -50,7 +56,7 @@ cdef bint is_compatible(TSDataTypePy expected, TSDataTypePy actual): return True return False -def convert_series(pd.Series series, TSDataTypePy target) -> np.ndarray: +cdef object convert_series(object series, object target): dtype_map = { TSDataTypePy.INT64: "int64", TSDataTypePy.INT32: "int32", @@ -64,7 +70,7 @@ def convert_series(pd.Series series, TSDataTypePy target) -> np.ndarray: if str(series.dtype) == target_str: return series.to_numpy() return series.astype(target_str).to_numpy() -def encode_or_null(x): +cdef encode_or_null(x): if pd.isna(x): return None return str(x).encode('utf-8') @@ -83,30 +89,40 @@ cdef class CTablet: self.column_name = column_name self.data_type = data_types self.max_row_num = max_row_num - column_num = len(column_name) - if len(data_types) != column_num: + self.column_num = len(column_name) + if len(data_types) != self.column_num: raise ValueError("Length of column_name and data_types must be equal") - column_names = malloc(sizeof(char*) * column_num) - column_data_types = malloc(sizeof(TSDataType) * column_num) + self.column_names = malloc(sizeof(char*) * self.column_num) + self.column_data_types = malloc(sizeof(TSDataType) * self.column_num) ind = 0 for name, dtype in zip(column_name, data_types): - column_names[ind] = strdup(name.encode('utf-8')) - column_data_types[ind] = to_c_data_type(dtype) - - + self.column_names[ind] = strdup(name.encode('utf-8')) + self.column_data_types[ind] = to_c_data_type(dtype) + ind = ind + 1 + + cpdef set_target_name(self, object target_name): + cdef bytes device_id_bytes + cdef char * device_id_c + device_id_bytes = PyUnicode_AsUTF8String(target_name) + device_id_c = device_id_bytes + _tablet_set_target_name(self.tablet, device_id_c) + cdef Tablet get_tablet(self): + return self.tablet cdef init_c_tablet(self): if self.tablet != NULL: - free_tablet(self.tablet) - tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num) + free_tablet(&self.tablet) + self.tablet = tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num) cpdef from_data_frame(self, data_frame: pd.DataFrame): cdef void * data_ptr - cdef const uint32_t * mask_ptr + cdef char * mask_ptr cdef size_t length cdef char** str_ptr cdef bytes item + cdef int64_t* time_ptr + cdef cnp.ndarray[cnp.int64_t, ndim=1] time_array if not isinstance(data_frame, pd.DataFrame): raise TypeError("Input must be a pandas DataFrame") if data_frame.shape[1] != len(self.column_name) + 1: @@ -119,12 +135,15 @@ cdef class CTablet: if data_frame["time"].dtype != np.int64: raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}") - self.init_c_tablet() + if self.max_row_num !=len(data_frame["time"]): + raise ValueError(f"Time column length {len(data_frame['time'])} doesn't match expected {self.max_row_num}") - data_ptr = data_frame["time"].to_numpy().data - _tablet_set_batch_timestamp(self.tablet, data_ptr) + self.init_c_tablet() + time_array = data_frame["time"].to_numpy() + time_ptr = time_array.data + _tablet_set_batch_timestamp(self.tablet, time_ptr) - for i, col_name in enumerate(self.column_name): + for ind, col_name in enumerate(self.column_name): if col_name not in data_frame.columns: raise KeyError(f"Column '{col_name}' missing from DataFrame") series = data_frame[col_name] @@ -132,7 +151,7 @@ cdef class CTablet: if dtype_str not in _pandas_dtype_to_ts: raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}") actual_ts_type = _pandas_dtype_to_ts[dtype_str] - expected_ts_type = self.data_type[i] + expected_ts_type = self.data_type[ind] if not is_compatible(expected_ts_type, actual_ts_type): raise TypeError( f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}") @@ -144,23 +163,33 @@ cdef class CTablet: if array[i] is None: str_ptr[i] = NULL else: - str_ptr[i] = array[i] - _tablet_set_batch_str(self.tablet, i, str_ptr) + str_ptr[i] = strdup(array[i]) + _tablet_set_batch_str(self.tablet, ind, str_ptr) for i in range(self.max_row_num): if str_ptr[i] != NULL: free(str_ptr[i]) free(str_ptr) else: array = convert_series(series, expected_ts_type) - mask = series.notna.to_numpy().astype(np.byte) - data_ptr = array.data - mask_ptr = mask.data - - _tablet_set_batch_data(self.tablet, i, data_ptr, mask_ptr) - + mask = series.notna().to_numpy().astype(np.byte) + data_ptr = cnp.PyArray_DATA(array) + mask_ptr = cnp.PyArray_DATA(mask) + _tablet_set_batch_data(self.tablet, ind, data_ptr, mask_ptr) + def __dealloc__(self): + if self.tablet != NULL: + free_tablet(&self.tablet) + self.tablet = NULL + if self.column_names != NULL: + for i in range(self.column_num): + free(self.column_names[i]) + free(self.column_names) + self.column_names = NULL + if self.column_data_types != NULL: + free(self.column_data_types) + self.column_data_types = NULL cdef class TsFileWriterPy: cdef TsFileWriter writer @@ -249,6 +278,13 @@ cdef class TsFileWriterPy: finally: free_c_tablet(ctablet) + + def write_ctablet(self, tablet: CTablet): + cdef ErrorCode errno + errno = _tsfile_writer_write_table(self.writer, tablet.get_tablet()) + check_error(errno) + + cpdef close(self): """ Flush data and Close tsfile writer.