[FLINK-39401] Extend raw format to support line-delimiter option #27897
featzhang wants to merge 1 commit into apache:master
Add a new optional `raw.line-delimiter` config option to the raw format.
- RawFormatOptions: add LINE_DELIMITER ConfigOption<String> with no default value
- RawFormatFactory: read the option and pass it to schema constructors; register it in optionalOptions()
- RawFormatDeserializationSchema: override deserialize(byte[], Collector) to split the message by the delimiter and emit one RowData per part when the delimiter is set; single-record deserialize(byte[]) is unchanged for backward compatibility
- RawFormatSerializationSchema: append delimiter bytes to the serialized value when the delimiter is set; null rows are unaffected
- RawFormatFactoryTest: add testLineDelimiterOption() covering factory wiring with the new option
- RawFormatLineDelimiterTest: new test class covering deserialization splitting (newline, custom delimiter, GBK charset, null message) and serialization appending (newline, custom delimiter, null row)
@flinkbot run azure
@featzhang Thanks for this contribution. The commit message in the PR should include the Jira ticket ID.
    if (lineDelimiter == null || valueBytes == null) {
        return valueBytes;
    }
    byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
Thanks for the Jira note on split(-1) — makes sense for preserving intentional empty middle segments.
One thing I noticed though: the serializer appends the delimiter to every row, so a message it produces always ends with the delimiter. When that message is read back with the same config, split(-1) will emit a trailing empty row. So the two halves of the feature are subtly incompatible when used together on the same table.
When I added a round-trip test:
- Serialize "hello" → produces "hello\n"
- Deserialize "hello\n" → expected 1 row, got 2
It would be good to note this limitation in the PR.
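The round trip described above can be reproduced outside Flink with plain JDK calls. This is only a sketch: the class `RawRoundTripSketch` and its two helpers are hypothetical stand-ins for the serialization and deserialization schemas, and they assume the serializer simply appends the delimiter bytes while the deserializer uses `split(Pattern.quote(delimiter), -1)` as in the diff.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical stand-in for the two schemas; not the PR's actual classes.
final class RawRoundTripSketch {

    // Assumed serializer behavior: value bytes followed by the delimiter bytes.
    static byte[] serialize(String value, String delimiter, Charset charset) {
        byte[] valueBytes = value.getBytes(charset);
        byte[] delimiterBytes = delimiter.getBytes(charset);
        byte[] out = new byte[valueBytes.length + delimiterBytes.length];
        System.arraycopy(valueBytes, 0, out, 0, valueBytes.length);
        System.arraycopy(delimiterBytes, 0, out, valueBytes.length, delimiterBytes.length);
        return out;
    }

    // Assumed deserializer behavior: decode, then split with limit -1,
    // which keeps trailing empty segments.
    static String[] deserialize(byte[] message, String delimiter, Charset charset) {
        String decoded = new String(message, charset);
        return decoded.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        byte[] message = serialize("hello", "\n", StandardCharsets.UTF_8);
        String[] parts = deserialize(message, "\n", StandardCharsets.UTF_8);
        // One row went in, but the trailing delimiter yields an extra empty segment.
        System.out.println(parts.length); // prints 2
    }
}
```

Whether the fix belongs in the deserializer (dropping a single trailing empty segment) or only in the documentation is a design decision for the PR; the sketch only shows where the extra row comes from.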
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(RawFormatOptions.ENDIANNESS);
    options.add(RawFormatOptions.CHARSET);
    options.add(RawFormatOptions.LINE_DELIMITER);
raw.line-delimiter is not added to docs/content/docs/connectors/table/formats/raw.md or the Chinese equivalent.
    Charset charset = Charset.forName(charsetName);
    String decoded = new String(message, charset);
    String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
nit: Pattern.quote(lineDelimiter) is recompiled on every message — since the delimiter is fixed, could this be pre-compiled as a Pattern field in the constructor?
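A minimal sketch of what pre-compiling could look like, using only JDK classes. The class name `PrecompiledDelimiterSplitter` and its method are hypothetical and are not taken from the PR; the sketch assumes the delimiter and charset name are known at construction time, as the comment states.

```java
import java.nio.charset.Charset;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the suggestion; not part of the PR.
final class PrecompiledDelimiterSplitter {
    private final Charset charset;
    // Compiled once per instance instead of once per message.
    private final Pattern delimiterPattern;

    PrecompiledDelimiterSplitter(String lineDelimiter, String charsetName) {
        this.charset = Charset.forName(charsetName);
        this.delimiterPattern = Pattern.compile(Pattern.quote(lineDelimiter));
    }

    // Same semantics as decoded.split(Pattern.quote(lineDelimiter), -1).
    String[] split(byte[] message) {
        return delimiterPattern.split(new String(message, charset), -1);
    }

    public static void main(String[] args) {
        PrecompiledDelimiterSplitter splitter = new PrecompiledDelimiterSplitter("||", "UTF-8");
        String[] parts = splitter.split("a||b||c".getBytes(Charset.forName("UTF-8")));
        System.out.println(parts.length); // prints 3
    }
}
```

One caveat if this were moved into a serializable schema: `java.util.regex.Pattern` is `Serializable`, but `java.nio.charset.Charset` is not, so the charset would probably have to stay a transient field derived from `charsetName`.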
    if (lineDelimiter == null || valueBytes == null) {
        return valueBytes;
    }
    byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
nit: delimiterBytes is allocated on every row — since both lineDelimiter and charsetName are fixed at construction time, this could be computed once and stored as a field.
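The suggestion could look roughly like the following. The class `DelimiterAppenderSketch` is a hypothetical illustration rather than the PR's serialization schema; it assumes the serializer only needs to append the encoded delimiter to the value bytes and pass `null` through unchanged.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper illustrating the suggestion; not part of the PR.
final class DelimiterAppenderSketch {
    // Encoded once at construction; null means no delimiter is configured.
    private final byte[] delimiterBytes;

    DelimiterAppenderSketch(String lineDelimiter, String charsetName) {
        this.delimiterBytes =
                lineDelimiter == null
                        ? null
                        : lineDelimiter.getBytes(Charset.forName(charsetName));
    }

    // Returns the input unchanged when no delimiter is set or the value is null.
    byte[] append(byte[] valueBytes) {
        if (delimiterBytes == null || valueBytes == null) {
            return valueBytes;
        }
        byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
        return result;
    }

    public static void main(String[] args) {
        DelimiterAppenderSketch appender = new DelimiterAppenderSketch("||", "UTF-8");
        byte[] out = appender.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints hello||
    }
}
```

A `byte[]` field serializes without extra work, so caching it should not complicate shipping the schema to task managers.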
Summary
This PR extends the `raw` format to support a new optional `raw.line-delimiter` config option.
When `raw.line-delimiter` is set: the message is decoded using `raw.charset`, split by the delimiter (`String.split(Pattern.quote(delimiter), -1)`), and one `RowData` is emitted per segment via `deserialize(byte[], Collector<T>)`.
When `raw.line-delimiter` is not set, all existing behavior is preserved exactly (backward compatible).
Example SQL
Changes
- `RawFormatOptions`: add `LINE_DELIMITER` `ConfigOption<String>` with no default value
- `RawFormatFactory`: read the option and pass it to the schema constructors; register it in `optionalOptions()`
- `RawFormatDeserializationSchema`: override `deserialize(byte[], Collector)` to split by delimiter when set; add `lineDelimiter` field to `equals`/`hashCode`
- `RawFormatSerializationSchema`: append delimiter bytes to the serialized value when the delimiter is set; add `lineDelimiter` field to `equals`/`hashCode`
- `RawFormatFactoryTest`: add `testLineDelimiterOption()`
- `RawFormatLineDelimiterTest`: new test class
Test Plan
- `RawFormatLineDelimiterTest` (9 tests):
  - `\n` delimiter → 3 rows
  - `||` → 3 rows
  - `\n` delimiter → correct splitting
  - `\n` → appends `\n`
  - `||` → appends `||`
- `RawFormatFactoryTest.testLineDelimiterOption()`: verifies factory produces schemas with correct delimiter