Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,25 @@ public final class MqttMessages {
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(), find MQTT Payload Plugin {}.";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT Plugin jarURLs: {}";
public static final String UNKNOWN_PAYLOAD_FORMAT_NAMED = "Unknown payload format named: ";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload is invalidate";

private MqttMessages() {}
// ---------------------------------------------------------------------------
// Additional auto-collected messages
// ---------------------------------------------------------------------------
public static final String LOG_LINE_PATTERN_PARSING_FAILS_FAILED_LINE_MESSAGE_ARG_EXCEPTION_6EFB0EE2 = "The line pattern parsing fails, and the failed line message is {} ,exception is";
public static final String LOG_CONNECTION_REFUSED_CLIENT_ID_MISSING_EMPTY_VALID_CLIENT_ID_REQUIRED_A566DC15 =
"Connection refused: client_id is missing or empty. A valid client_id is required to establish"
+ " a connection.";
public static final String LOG_RECEIVE_PUBLISH_MESSAGE_CLIENTID_ARG_USERNAME_ARG_QOS_ARG_TOPIC_7E60C3A6 = "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}";
public static final String LOG_MQTT_JSON_INSERT_ERROR_CODE_ARG_MESSAGE_ARG_B1A78FBD = "mqtt json insert error, code={}, message={}";
public static final String LOG_MEET_ERROR_INSERTING_DATABASE_ARG_TABLE_ARG_TAGS_ARG_ATTRIBUTES_173457D5 =
"meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time"
+ " {}, because ";
public static final String LOG_MEET_ERROR_INSERTING_DEVICE_ARG_MEASUREMENTS_ARG_AT_TIME_ARG_680D67D2 = "meet error when inserting device {}, measurements {}, at time {}, because ";
public static final String LOG_START_MQTT_SERVICE_SUCCESSFULLY_LISTENING_IP_ARG_PORT_ARG_47CE46D5 = "Start MQTT service successfully, listening on ip {} port {}";

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,21 @@ public final class MqttMessages {
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(),找到 MQTT Payload 插件 {}。";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT 插件 jarURLs:{}";
public static final String UNKNOWN_PAYLOAD_FORMAT_NAMED = "未知 payload 格式名称:";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload 无效";

private MqttMessages() {}
// ---------------------------------------------------------------------------
// Additional auto-collected messages
// ---------------------------------------------------------------------------
public static final String LOG_LINE_PATTERN_PARSING_FAILS_FAILED_LINE_MESSAGE_ARG_EXCEPTION_6EFB0EE2 = "行模式解析失败,失败的行消息为 {},异常为";
public static final String LOG_CONNECTION_REFUSED_CLIENT_ID_MISSING_EMPTY_VALID_CLIENT_ID_REQUIRED_A566DC15 = "连接被拒绝:client_id 缺失或为空。建立连接需要有效的 client_id。";
public static final String LOG_RECEIVE_PUBLISH_MESSAGE_CLIENTID_ARG_USERNAME_ARG_QOS_ARG_TOPIC_7E60C3A6 = "收到 publish 消息。clientId: {}, 用户名: {}, qos: {}, 主题: {}, payload: {}";
public static final String LOG_MQTT_JSON_INSERT_ERROR_CODE_ARG_MESSAGE_ARG_B1A78FBD = "MQTT JSON 插入错误,code={},消息={}";
public static final String LOG_MEET_ERROR_INSERTING_DATABASE_ARG_TABLE_ARG_TAGS_ARG_ATTRIBUTES_173457D5 = "插入数据库 {}、表 {}、标签 {}、属性 {}、字段 {}、时间 {} 时遇到错误,原因:";
public static final String LOG_MEET_ERROR_INSERTING_DEVICE_ARG_MEASUREMENTS_ARG_AT_TIME_ARG_680D67D2 = "插入设备 {}、测点 {}、时间 {} 时遇到错误,原因:";
public static final String LOG_START_MQTT_SERVICE_SUCCESSFULLY_LISTENING_IP_ARG_PORT_ARG_47CE46D5 = "MQTT 服务启动成功,监听 IP {},端口 {}";

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public List<Message> format(String topic, ByteBuf payload) {
messages.add(message);
} catch (Exception e) {
log.warn(
"The line pattern parsing fails, and the failed line message is {} ,exception is",
MqttMessages.LOG_LINE_PATTERN_PARSING_FAILS_FAILED_LINE_MESSAGE_ARG_EXCEPTION_6EFB0EE2,
line,
e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public String getID() {
public void onConnect(InterceptConnectMessage msg) {
if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) {
LOG.error(
"Connection refused: client_id is missing or empty. A valid client_id is required to establish a connection.");
MqttMessages
.LOG_CONNECTION_REFUSED_CLIENT_ID_MISSING_EMPTY_VALID_CLIENT_ID_REQUIRED_A566DC15);
}
if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
MqttClientSession session = new MqttClientSession(msg.getClientID());
Expand Down Expand Up @@ -136,7 +137,8 @@ public void onPublish(InterceptPublishMessage msg) {
String username = msg.getUsername();
MqttQoS qos = msg.getQos();
LOG.debug(
"Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
MqttMessages
.LOG_RECEIVE_PUBLISH_MESSAGE_CLIENTID_ARG_USERNAME_ARG_QOS_ARG_TOPIC_7E60C3A6,
clientId,
username,
qos,
Expand Down Expand Up @@ -197,13 +199,13 @@ private void insertTable(TableMessage message, MqttClientSession session) {
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
LOG.warn(
"mqtt json insert error, code={}, message={}",
MqttMessages.LOG_MQTT_JSON_INSERT_ERROR_CODE_ARG_MESSAGE_ARG_B1A78FBD,
tsStatus.getCode(),
tsStatus.getMessage());
}
} catch (Exception e) {
LOG.warn(
"meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ",
MqttMessages.LOG_MEET_ERROR_INSERTING_DATABASE_ARG_TABLE_ARG_TAGS_ARG_ATTRIBUTES_173457D5,
message.getDatabase(),
message.getTable(),
message.getTagKeys(),
Expand Down Expand Up @@ -316,14 +318,14 @@ private void insertTree(TreeMessage message, MqttClientSession session) {
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
LOG.warn(
"mqtt json insert error, code={}, message={}",
MqttMessages.LOG_MQTT_JSON_INSERT_ERROR_CODE_ARG_MESSAGE_ARG_B1A78FBD,
tsStatus.getCode(),
tsStatus.getMessage());
}
}
} catch (Exception e) {
LOG.warn(
"meet error when inserting device {}, measurements {}, at time {}, because ",
MqttMessages.LOG_MEET_ERROR_INSERTING_DEVICE_ARG_MEASUREMENTS_ARG_AT_TIME_ARG_680D67D2,
message.getDevice(),
message.getMeasurements(),
message.getTimestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void startup() {
}

LOG.info(
"Start MQTT service successfully, listening on ip {} port {}",
MqttMessages.LOG_START_MQTT_SERVICE_SUCCESSFULLY_LISTENING_IP_ARG_PORT_ARG_47CE46D5,
iotDBConfig.getMqttHost(),
iotDBConfig.getMqttPort());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private static void init() {

public static PayloadFormatter getPayloadFormat(String name) {
PayloadFormatter formatter = mqttPayloadPluginMap.get(name);
Preconditions.checkArgument(formatter != null, "Unknown payload format named: " + name);
Preconditions.checkArgument(
formatter != null, MqttMessages.UNKNOWN_PAYLOAD_FORMAT_NAMED + name);
return formatter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,53 @@ public final class RestMessages {

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths should not be empty";
public static final String SQL_SHOULD_NOT_BE_NULL = "sql should not be null";
public static final String ROW_LIMIT_SHOULD_BE_POSITIVE = "row_limit should be positive";
public static final String ROW_LIMIT_CAMEL_SHOULD_BE_POSITIVE = "rowLimit should be positive";
public static final String PREFIX_PATHS_NOT_NULL = "prefix_paths should not be null";
public static final String TIMESTAMPS_NOT_NULL = "timestamps should not be null";
public static final String IS_ALIGNED_NOT_NULL = "is_aligned should not be null";
public static final String IS_ALIGNED_CAMEL_NOT_NULL = "isAligned should not be null";
public static final String DEVICE_NOT_NULL = "device should not be null";
public static final String DEVICE_ID_NOT_NULL = "deviceId should not be null";
public static final String DATA_TYPES_NOT_NULL = "data_types should not be null";
public static final String DATA_TYPES_CAMEL_NOT_NULL = "dataTypes should not be null";
public static final String MEASUREMENTS_NOT_NULL = "measurements should not be null";
public static final String VALUES_NOT_NULL = "values should not be null";
public static final String DEVICES_NOT_NULL = "devices should not be null";
public static final String DATA_TYPES_LIST_NOT_NULL = "data_types_list should not be null";
public static final String VALUES_LIST_NOT_NULL = "values_list should not be null";
public static final String MEASUREMENTS_LIST_NOT_NULL = "measurements_list should not be null";
public static final String EXPRESSION_NOT_NULL = "expression should not be null";
public static final String PREFIX_PATH_NOT_NULL = "prefix_path should not be null";
public static final String PREFIX_PATH_CAMEL_NOT_NULL = "prefixPath should not be null";
public static final String START_TIME_NOT_NULL = "start_time should not be null";
public static final String START_TIME_CAMEL_NOT_NULL = "startTime should not be null";
public static final String END_TIME_NOT_NULL = "end_time should not be null";
public static final String END_TIME_CAMEL_NOT_NULL = "endTime should not be null";
public static final String DATABASE_NOT_NULL = "database should not be null";
public static final String TABLE_NOT_NULL = "table should not be null";
public static final String COLUMN_NAMES_NOT_NULL = "column_names should not be null";
public static final String COLUMN_CATEGORIES_NOT_NULL =
"column_categories should not be null";
public static final String COLUMN_NAMES_AND_COLUMN_CATEGORIES_SIZE_MISMATCH =
"column_names and column_categories should have the same size";
public static final String COLUMN_CATEGORIES_AND_DATA_TYPES_SIZE_MISMATCH =
"column_categories and data_types should have the same size";
public static final String VALUES_AND_TIMESTAMPS_SIZE_MISMATCH =
"values and timestamps should have the same size";
public static final String ILLEGAL_TABLE_DATA_TYPE =
"The %s data type of %s is illegal";
public static final String ILLEGAL_DEVICE_MEASUREMENT_DATA_TYPE =
"The %s data type of %s.%s is illegal";
public static final String ROW_VALUES_SIZE_MISMATCH =
"The number of values in the %dth row is not equal to the data_types size";
public static final String ERROR_MESSAGE_SEPARATOR = ",";

private RestMessages() {}
// ---------------------------------------------------------------------------
// Additional auto-collected messages
// ---------------------------------------------------------------------------
public static final String EXCEPTION_UNSUPPORTED_DATA_TYPE_0521CEDE = "unsupported data type: ";

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,51 @@ public final class RestMessages {

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths 不能为空";
public static final String SQL_SHOULD_NOT_BE_NULL = "sql 不能为空";
public static final String ROW_LIMIT_SHOULD_BE_POSITIVE = "row_limit 应为正数";
public static final String ROW_LIMIT_CAMEL_SHOULD_BE_POSITIVE = "rowLimit 应为正数";
public static final String PREFIX_PATHS_NOT_NULL = "prefix_paths 不能为空";
public static final String TIMESTAMPS_NOT_NULL = "timestamps 不能为空";
public static final String IS_ALIGNED_NOT_NULL = "is_aligned 不能为空";
public static final String IS_ALIGNED_CAMEL_NOT_NULL = "isAligned 不能为空";
public static final String DEVICE_NOT_NULL = "device 不能为空";
public static final String DEVICE_ID_NOT_NULL = "deviceId 不能为空";
public static final String DATA_TYPES_NOT_NULL = "data_types 不能为空";
public static final String DATA_TYPES_CAMEL_NOT_NULL = "dataTypes 不能为空";
public static final String MEASUREMENTS_NOT_NULL = "measurements 不能为空";
public static final String VALUES_NOT_NULL = "values 不能为空";
public static final String DEVICES_NOT_NULL = "devices 不能为空";
public static final String DATA_TYPES_LIST_NOT_NULL = "data_types_list 不能为空";
public static final String VALUES_LIST_NOT_NULL = "values_list 不能为空";
public static final String MEASUREMENTS_LIST_NOT_NULL = "measurements_list 不能为空";
public static final String EXPRESSION_NOT_NULL = "expression 不能为空";
public static final String PREFIX_PATH_NOT_NULL = "prefix_path 不能为空";
public static final String PREFIX_PATH_CAMEL_NOT_NULL = "prefixPath 不能为空";
public static final String START_TIME_NOT_NULL = "start_time 不能为空";
public static final String START_TIME_CAMEL_NOT_NULL = "startTime 不能为空";
public static final String END_TIME_NOT_NULL = "end_time 不能为空";
public static final String END_TIME_CAMEL_NOT_NULL = "endTime 不能为空";
public static final String DATABASE_NOT_NULL = "database 不能为空";
public static final String TABLE_NOT_NULL = "table 不能为空";
public static final String COLUMN_NAMES_NOT_NULL = "column_names 不能为空";
public static final String COLUMN_CATEGORIES_NOT_NULL = "column_categories 不能为空";
public static final String COLUMN_NAMES_AND_COLUMN_CATEGORIES_SIZE_MISMATCH =
"column_names 和 column_categories 的数量应相同";
public static final String COLUMN_CATEGORIES_AND_DATA_TYPES_SIZE_MISMATCH =
"column_categories 和 data_types 的数量应相同";
public static final String VALUES_AND_TIMESTAMPS_SIZE_MISMATCH =
"values 和 timestamps 的数量应相同";
public static final String ILLEGAL_TABLE_DATA_TYPE = "%s 是 %s 的非法数据类型";
public static final String ILLEGAL_DEVICE_MEASUREMENT_DATA_TYPE =
"%s 是 %s.%s 的非法数据类型";
public static final String ROW_VALUES_SIZE_MISMATCH =
"第 %d 行的 values 数量与 data_types 数量不相等";
public static final String ERROR_MESSAGE_SEPARATOR = ",";

private RestMessages() {}
// ---------------------------------------------------------------------------
// Additional auto-collected messages
// ---------------------------------------------------------------------------
public static final String EXCEPTION_UNSUPPORTED_DATA_TYPE_0521CEDE = "不支持的数据类型: ";

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.iotdb.rest.protocol.table.v1.handler;

import org.apache.iotdb.rest.i18n.RestMessages;
import org.apache.iotdb.rest.protocol.table.v1.model.InsertTabletRequest;
import org.apache.iotdb.rest.protocol.table.v1.model.SQL;

Expand All @@ -32,56 +33,54 @@ public class RequestValidationHandler {
private RequestValidationHandler() {}

public static void validateSQL(SQL sql) {
Objects.requireNonNull(sql.getSql(), "sql should not be null");
Objects.requireNonNull(sql.getSql(), RestMessages.SQL_SHOULD_NOT_BE_NULL);
if (sql.getRowLimit() != null) {
Validate.isTrue(sql.getRowLimit() > 0, "row_limit should be positive");
Validate.isTrue(sql.getRowLimit() > 0, RestMessages.ROW_LIMIT_SHOULD_BE_POSITIVE);
}
}

public static void validateInsertTabletRequest(InsertTabletRequest insertTabletRequest) {
Objects.requireNonNull(insertTabletRequest.getDatabase(), "database should not be null");
Objects.requireNonNull(insertTabletRequest.getTable(), "table should not be null");
Objects.requireNonNull(insertTabletRequest.getColumnNames(), "column_names should not be null");
Objects.requireNonNull(insertTabletRequest.getDatabase(), RestMessages.DATABASE_NOT_NULL);
Objects.requireNonNull(insertTabletRequest.getTable(), RestMessages.TABLE_NOT_NULL);
Objects.requireNonNull(
insertTabletRequest.getColumnCategories(), "column_categories should not be null");
Objects.requireNonNull(insertTabletRequest.getDataTypes(), "data_types should not be null");
Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps should not be null");
Objects.requireNonNull(insertTabletRequest.getValues(), "values should not be null");
insertTabletRequest.getColumnNames(), RestMessages.COLUMN_NAMES_NOT_NULL);
Objects.requireNonNull(
insertTabletRequest.getColumnCategories(), RestMessages.COLUMN_CATEGORIES_NOT_NULL);
Objects.requireNonNull(insertTabletRequest.getDataTypes(), RestMessages.DATA_TYPES_NOT_NULL);
Objects.requireNonNull(insertTabletRequest.getTimestamps(), RestMessages.TIMESTAMPS_NOT_NULL);
Objects.requireNonNull(insertTabletRequest.getValues(), RestMessages.VALUES_NOT_NULL);
List<String> errorMessages = new ArrayList<>();
String table = insertTabletRequest.getTable();
if (insertTabletRequest.getColumnCategories().size() == 0
|| insertTabletRequest.getColumnCategories().size()
!= insertTabletRequest.getColumnNames().size()) {
errorMessages.add("column_names and column_categories should have the same size");
errorMessages.add(RestMessages.COLUMN_NAMES_AND_COLUMN_CATEGORIES_SIZE_MISMATCH);
}
if (insertTabletRequest.getColumnCategories().size()
!= insertTabletRequest.getDataTypes().size()) {
errorMessages.add("column_categories and data_types should have the same size");
errorMessages.add(RestMessages.COLUMN_CATEGORIES_AND_DATA_TYPES_SIZE_MISMATCH);
}
if (insertTabletRequest.getTimestamps().size() != insertTabletRequest.getValues().size()) {
errorMessages.add("values and timestamps should have the same size");
errorMessages.add(RestMessages.VALUES_AND_TIMESTAMPS_SIZE_MISMATCH);
}

for (int i = 0; i < insertTabletRequest.getDataTypes().size(); i++) {
String dataType = insertTabletRequest.getDataTypes().get(i);
if (isDataType(dataType)) {
errorMessages.add("The " + dataType + " data type of " + table + " is illegal");
errorMessages.add(String.format(RestMessages.ILLEGAL_TABLE_DATA_TYPE, dataType, table));
}
}

int dataTypeSize = insertTabletRequest.getDataTypes().size();
for (int i = 0; i < insertTabletRequest.getValues().size(); i++) {
List<Object> values = insertTabletRequest.getValues().get(i);
if (dataTypeSize != values.size()) {
errorMessages.add(
"The number of values in the "
+ (i + 1)
+ "th row is not equal to the data_types size");
errorMessages.add(String.format(RestMessages.ROW_VALUES_SIZE_MISMATCH, i + 1));
}
}

if (!errorMessages.isEmpty()) {
throw new RuntimeException(String.join(",", errorMessages));
throw new RuntimeException(String.join(RestMessages.ERROR_MESSAGE_SEPARATOR, errorMessages));
}
}

Expand Down
Loading
Loading