Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.ArrayType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.types.VariantType;
Expand Down Expand Up @@ -153,13 +154,21 @@ static StringData convertToStringData(Object obj) {
"Cannot convert " + obj + " of type " + obj.getClass() + " to STRING DATA.");
}

static DecimalData convertToDecimalData(Object obj) {
static DecimalData convertToDecimalData(Object obj, DecimalType decimalType) {
if (obj instanceof DecimalData) {
return (DecimalData) obj;
DecimalData dd = (DecimalData) obj;
// Re-convert to target precision and scale if different
if (dd.precision() == decimalType.getPrecision()
&& dd.scale() == decimalType.getScale()) {
return dd;
}
return DecimalData.fromBigDecimal(
dd.toBigDecimal(), decimalType.getPrecision(), decimalType.getScale());
}
if (obj instanceof BigDecimal) {
BigDecimal bd = (BigDecimal) obj;
return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale());
return DecimalData.fromBigDecimal(
bd, decimalType.getPrecision(), decimalType.getScale());
}
Comment thread
yuxiqian marked this conversation as resolved.
throw new RuntimeException(
"Cannot convert " + obj + " of type " + obj.getClass() + " to DECIMAL DATA.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Function<Object, byte[]> visit(VarBinaryType varBinaryType) {

@Override
public Function<Object, DecimalData> visit(DecimalType decimalType) {
return CommonConverter::convertToDecimalData;
return obj -> CommonConverter.convertToDecimalData(obj, decimalType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void testConvertToDecimal() {
.hasToString("4.2");
assertThat(convertToInternal(new BigDecimal("-3.1415926"), DataTypes.DECIMAL(20, 10)))
.isInstanceOf(DecimalData.class)
.hasToString("-3.1415926");
.hasToString("-3.1415926000");

assertThat(
convertToInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,31 @@ void testNumericCastingsWithTruncation() throws Exception {
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");

// Test DECIMAL with maximum precision (38)
assertThat(runNumericCastingWith(generateCastTo("DECIMAL(38, 0)")))
.containsExactly(
"CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 0),`small_c` DECIMAL(38, 0),`int_c` DECIMAL(38, 0),`bigint_c` DECIMAL(38, 0),`float_c` DECIMAL(38, 0),`double_c` DECIMAL(38, 0),`decimal_c` DECIMAL(38, 0),`valid_char_c` DECIMAL(38, 0),`invalid_char_c` DECIMAL(38, 0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");

assertThat(runNumericCastingWith(generateCastTo("DECIMAL(38, 10)")))
.containsExactly(
"CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 10),`small_c` DECIMAL(38, 10),`int_c` DECIMAL(38, 10),`bigint_c` DECIMAL(38, 10),`float_c` DECIMAL(38, 10),`double_c` DECIMAL(38, 10),`decimal_c` DECIMAL(38, 10),`valid_char_c` DECIMAL(38, 10),`invalid_char_c` DECIMAL(38, 10)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");

assertThat(runNumericCastingWith(generateCastTo("DECIMAL(38, 18)")))
.containsExactly(
"CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 18),`small_c` DECIMAL(38, 18),`int_c` DECIMAL(38, 18),`bigint_c` DECIMAL(38, 18),`float_c` DECIMAL(38, 18),`double_c` DECIMAL(38, 18),`decimal_c` DECIMAL(38, 18),`valid_char_c` DECIMAL(38, 18),`invalid_char_c` DECIMAL(38, 18)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.000000000000000000, -3.000000000000000000, -4.000000000000000000, -5.000000000000000000, -6.700000000000000000, -8.900000000000000000, -10.110000000000000000, -12.130000000000000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.000000000000000000, 3.000000000000000000, 4.000000000000000000, 5.000000000000000000, 6.700000000000000000, 8.900000000000000000, 10.110000000000000000, 12.130000000000000000, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
}

@Test
Expand Down
59 changes: 58 additions & 1 deletion flink-cdc-composer/src/test/resources/specs/casting.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,69 @@
CAST('FOOBAR' AS DECIMAL(20, 5)) AS comp_7
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(19, 5),`comp_1` DECIMAL(19, 5),`comp_2` DECIMAL(19, 5),`comp_3` DECIMAL(19, 5),`comp_4` DECIMAL(19, 5),`comp_5` DECIMAL(19, 5),`comp_6` DECIMAL(19, 5),`comp_7` DECIMAL(19, 5)}, primaryKeys=id_, options=()}
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(20, 5),`comp_1` DECIMAL(20, 5),`comp_2` DECIMAL(20, 5),`comp_3` DECIMAL(20, 5),`comp_4` DECIMAL(20, 5),`comp_5` DECIMAL(20, 5),`comp_6` DECIMAL(20, 5),`comp_7` DECIMAL(20, 5)}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], after=[-1, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], after=[], op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], after=[], op=DELETE, meta=()}
- do: Cast To Decimal(38, 0)
projection: |-
id_
CAST(null AS DECIMAL(38, 0)) AS comp_0
CAST(0 AS DECIMAL(38, 0)) AS comp_1
CAST(1 AS DECIMAL(38, 0)) AS comp_2
CAST('2.22' AS DECIMAL(38, 0)) AS comp_3
CAST('333' AS DECIMAL(38, 0)) AS comp_4
CAST('44.44' AS DECIMAL(38, 0)) AS comp_5
CAST('5555555' AS DECIMAL(38, 0)) AS comp_6
CAST('FOOBAR' AS DECIMAL(38, 0)) AS comp_7
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 0),`comp_1` DECIMAL(38, 0),`comp_2` DECIMAL(38, 0),`comp_3` DECIMAL(38, 0),`comp_4` DECIMAL(38, 0),`comp_5` DECIMAL(38, 0),`comp_6` DECIMAL(38, 0),`comp_7` DECIMAL(38, 0)}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0, 1, 2, 333, 44, 5555555, null], after=[-1, null, 0, 1, 2, 333, 44, 5555555, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()}
- do: Cast To Decimal(38, 10)
projection: |-
id_
CAST(null AS DECIMAL(38, 10)) AS comp_0
CAST(0 AS DECIMAL(38, 10)) AS comp_1
CAST(1 AS DECIMAL(38, 10)) AS comp_2
CAST('2.22' AS DECIMAL(38, 10)) AS comp_3
CAST('333' AS DECIMAL(38, 10)) AS comp_4
CAST('44.44' AS DECIMAL(38, 10)) AS comp_5
CAST('5555555' AS DECIMAL(38, 10)) AS comp_6
CAST('FOOBAR' AS DECIMAL(38, 10)) AS comp_7
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 10),`comp_1` DECIMAL(38, 10),`comp_2` DECIMAL(38, 10),`comp_3` DECIMAL(38, 10),`comp_4` DECIMAL(38, 10),`comp_5` DECIMAL(38, 10),`comp_6` DECIMAL(38, 10),`comp_7` DECIMAL(38, 10)}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()}
- do: Cast To Decimal(38, 18)
projection: |-
id_
CAST(null AS DECIMAL(38, 18)) AS comp_0
CAST(0 AS DECIMAL(38, 18)) AS comp_1
CAST(1 AS DECIMAL(38, 18)) AS comp_2
CAST('2.22' AS DECIMAL(38, 18)) AS comp_3
CAST('333' AS DECIMAL(38, 18)) AS comp_4
CAST('44.44' AS DECIMAL(38, 18)) AS comp_5
CAST('5555555' AS DECIMAL(38, 18)) AS comp_6
CAST('FOOBAR' AS DECIMAL(38, 18)) AS comp_7
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 18),`comp_1` DECIMAL(38, 18),`comp_2` DECIMAL(38, 18),`comp_3` DECIMAL(38, 18),`comp_4` DECIMAL(38, 18),`comp_5` DECIMAL(38, 18),`comp_6` DECIMAL(38, 18),`comp_7` DECIMAL(38, 18)}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[], op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[], op=DELETE, meta=()}
- do: Cast To Timestamp (UTC)
projection: |-
id_
Expand Down
Loading
Loading