Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
SparkDateTrunc, SparkSizeFunc, SparkStringSpace,
SparkDateTrunc, SparkMakeDate, SparkSizeFunc, SparkStringSpace,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
Expand Down Expand Up @@ -195,6 +195,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
]
Expand Down
15 changes: 12 additions & 3 deletions native/spark-expr/src/datetime_funcs/date_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,18 @@ impl ScalarUDFImpl for SparkDateDiff {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let [end_date, start_date] = take_function_args(self.name(), args.args)?;

// Convert scalars to arrays for uniform processing
let end_arr = end_date.into_array(1)?;
let start_arr = start_date.into_array(1)?;
// Determine the batch size from array arguments (scalars have no inherent size)
let num_rows = [&end_date, &start_date]
.iter()
.find_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.len()),
ColumnarValue::Scalar(_) => None,
})
.unwrap_or(1);

// Convert scalars to arrays for uniform processing, using the correct batch size
let end_arr = end_date.into_array(num_rows)?;
let start_arr = start_date.into_array(num_rows)?;

let end_date_array = end_arr
.as_any()
Expand Down
236 changes: 236 additions & 0 deletions native/spark-expr/src/datetime_funcs/make_date.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, Date32Array, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use chrono::NaiveDate;
use datafusion::common::{utils::take_function_args, DataFusionError, Result};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Spark-compatible make_date function.
/// Creates a date from year, month, and day columns.
/// Returns NULL for invalid dates (e.g., Feb 30, month 13, etc.) instead of throwing an error.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkMakeDate {
signature: Signature,
}

impl SparkMakeDate {
pub fn new() -> Self {
Self {
// Accept any numeric type - we'll cast to Int32 internally
signature: Signature::any(3, Volatility::Immutable),
}
}
}

impl Default for SparkMakeDate {
fn default() -> Self {
Self::new()
}
}

/// Cast an array to Int32Array if it's not already Int32.
fn cast_to_int32(arr: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
if arr.data_type() == &DataType::Int32 {
Ok(Arc::clone(arr))
} else {
cast(arr.as_ref(), &DataType::Int32)
.map_err(|e| DataFusionError::Execution(format!("Failed to cast to Int32: {e}")))
}
}

/// Convert year, month, day to days since Unix epoch (1970-01-01).
/// Returns None if the date is invalid.
fn make_date(year: i32, month: i32, day: i32) -> Option<i32> {
// Validate month and day ranges first
if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
return None;
}

// Try to create a valid date
NaiveDate::from_ymd_opt(year, month as u32, day as u32).map(|date| {
date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
.num_days() as i32
})
}

impl ScalarUDFImpl for SparkMakeDate {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"make_date"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Date32)
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let [year, month, day] = take_function_args(self.name(), args.args)?;

// Determine the batch size from array arguments (scalars have no inherent size)
let num_rows = [&year, &month, &day]
.iter()
.find_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.len()),
ColumnarValue::Scalar(_) => None,
})
.unwrap_or(1);

// Convert scalars to arrays for uniform processing, using the correct batch size
let year_arr = year.into_array(num_rows)?;
let month_arr = month.into_array(num_rows)?;
let day_arr = day.into_array(num_rows)?;

// Cast to Int32 if needed (handles Int64 literals from SQL)
let year_arr = cast_to_int32(&year_arr)?;
let month_arr = cast_to_int32(&month_arr)?;
let day_arr = cast_to_int32(&day_arr)?;

let year_array = year_arr
.as_any()
.downcast_ref::<Int32Array>()
.ok_or_else(|| {
DataFusionError::Execution("make_date: failed to cast year to Int32".to_string())
})?;

let month_array = month_arr
.as_any()
.downcast_ref::<Int32Array>()
.ok_or_else(|| {
DataFusionError::Execution("make_date: failed to cast month to Int32".to_string())
})?;

let day_array = day_arr
.as_any()
.downcast_ref::<Int32Array>()
.ok_or_else(|| {
DataFusionError::Execution("make_date: failed to cast day to Int32".to_string())
})?;

let len = year_array.len();
let mut builder = Date32Array::builder(len);

for i in 0..len {
if year_array.is_null(i) || month_array.is_null(i) || day_array.is_null(i) {
builder.append_null();
} else {
let y = year_array.value(i);
let m = month_array.value(i);
let d = day_array.value(i);

match make_date(y, m, d) {
Some(days) => builder.append_value(days),
None => builder.append_null(),
}
}
}

Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_make_date_valid() {
// Unix epoch
assert_eq!(make_date(1970, 1, 1), Some(0));
// Day after epoch
assert_eq!(make_date(1970, 1, 2), Some(1));
// Day before epoch
assert_eq!(make_date(1969, 12, 31), Some(-1));
// Leap years - just verify they return Some (valid dates)
assert!(make_date(2000, 2, 29).is_some()); // 2000 is a leap year
assert!(make_date(2004, 2, 29).is_some()); // 2004 is a leap year
// Regular date
assert!(make_date(2023, 6, 15).is_some());
}

#[test]
fn test_make_date_invalid_month() {
assert_eq!(make_date(2023, 0, 15), None);
assert_eq!(make_date(2023, 13, 15), None);
assert_eq!(make_date(2023, -1, 15), None);
}

#[test]
fn test_make_date_invalid_day() {
assert_eq!(make_date(2023, 6, 0), None);
assert_eq!(make_date(2023, 6, 32), None);
assert_eq!(make_date(2023, 6, -1), None);
}

#[test]
fn test_make_date_invalid_dates() {
// Feb 30 never exists
assert_eq!(make_date(2023, 2, 30), None);
// Feb 29 on non-leap year
assert_eq!(make_date(2023, 2, 29), None);
// 1900 is not a leap year (divisible by 100 but not 400)
assert_eq!(make_date(1900, 2, 29), None);
// 2100 will not be a leap year
assert_eq!(make_date(2100, 2, 29), None);
// April has 30 days
assert_eq!(make_date(2023, 4, 31), None);
}

#[test]
fn test_make_date_extreme_years() {
// Spark supports dates from 0001-01-01 to 9999-12-31 (Proleptic Gregorian calendar)

// Minimum valid date in Spark: 0001-01-01
assert!(make_date(1, 1, 1).is_some(), "Year 1 should be valid");

// Maximum valid date in Spark: 9999-12-31
assert!(
make_date(9999, 12, 31).is_some(),
"Year 9999 should be valid"
);

// Year 0 - In Proleptic Gregorian calendar, year 0 = 1 BCE
// Spark returns NULL for year 0 in make_date
// chrono supports year 0, but we should match Spark's behavior
// For now, chrono allows it - this may need adjustment for full Spark compatibility
let year_0_result = make_date(0, 1, 1);
// chrono allows year 0 (1 BCE in proleptic Gregorian)
assert!(year_0_result.is_some(), "chrono allows year 0");

// Negative years - Spark returns NULL for negative years
// chrono supports negative years (BCE dates)
let negative_year_result = make_date(-1, 1, 1);
// chrono allows negative years
assert!(
negative_year_result.is_some(),
"chrono allows negative years"
);
}
}
2 changes: 2 additions & 0 deletions native/spark-expr/src/datetime_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod date_diff;
mod date_trunc;
mod extract_date_part;
mod make_date;
mod timestamp_trunc;
mod unix_timestamp;

Expand All @@ -26,5 +27,6 @@ pub use date_trunc::SparkDateTrunc;
pub use extract_date_part::SparkHour;
pub use extract_date_part::SparkMinute;
pub use extract_date_part::SparkSecond;
pub use make_date::SparkMakeDate;
pub use timestamp_trunc::TimestampTruncExpr;
pub use unix_timestamp::SparkUnixTimestamp;
4 changes: 2 additions & 2 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ pub use comet_scalar_funcs::{
};
pub use csv_funcs::*;
pub use datetime_funcs::{
SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp,
TimestampTruncExpr,
SparkDateDiff, SparkDateTrunc, SparkHour, SparkMakeDate, SparkMinute, SparkSecond,
SparkUnixTimestamp, TimestampTruncExpr,
};
pub use error::{SparkError, SparkResult};
pub use hash_funcs::*;
Expand Down
43 changes: 43 additions & 0 deletions native/spark-expr/tests/spark_expr_reg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod tests {
use datafusion::execution::FunctionRegistry;
use datafusion::prelude::SessionContext;
use datafusion_comet_spark_expr::create_comet_physical_fun;
use datafusion_comet_spark_expr::register_all_comet_functions;

#[tokio::test]
async fn test_udf_registration() -> Result<()> {
Expand All @@ -48,4 +49,46 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_make_date_returns_null_for_invalid_input() -> Result<()> {
// Setup session with all Comet functions registered
let mut ctx = SessionContext::new();
register_all_comet_functions(&mut ctx)?;

// Test that make_date returns NULL for invalid month (0)
// DataFusion's built-in make_date would throw an error
let df = ctx.sql("SELECT make_date(2023, 0, 15)").await?;
let results = df.collect().await?;

// Should return one row with NULL
assert_eq!(results.len(), 1);
assert_eq!(results[0].num_rows(), 1);

// The result should be NULL for invalid input
let column = results[0].column(0);
assert!(column.is_null(0), "Expected NULL for invalid month");

Ok(())
}

#[tokio::test]
async fn test_make_date_valid_input() -> Result<()> {
// Setup session with all Comet functions registered
let mut ctx = SessionContext::new();
register_all_comet_functions(&mut ctx)?;

// Test that make_date works for valid input
let df = ctx.sql("SELECT make_date(1970, 1, 1)").await?;
let results = df.collect().await?;

assert_eq!(results.len(), 1);
assert_eq!(results[0].num_rows(), 1);

// Should return epoch date (1970-01-01 = day 0)
let column = results[0].column(0);
assert!(!column.is_null(0), "Expected valid date for epoch");

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[FromUnixTime] -> CometFromUnixTime,
classOf[LastDay] -> CometLastDay,
classOf[Hour] -> CometHour,
classOf[MakeDate] -> CometMakeDate,
classOf[Minute] -> CometMinute,
classOf[Second] -> CometSecond,
classOf[TruncDate] -> CometTruncDate,
Expand Down
4 changes: 3 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -310,6 +310,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")

object CometDateSub extends CometScalarFunction[DateSub]("date_sub")

object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

object CometLastDay extends CometScalarFunction[LastDay]("last_day")

object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
Expand Down
Loading
Loading