Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions bigframes/core/compile/ibis_compiler/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,29 @@ def _(
).to_expr()


@compile_ordered_unary_agg.register
def _(
op: agg_ops.StringAggOp,
column: ibis_types.Column,
window=None,
order_by: typing.Sequence[ibis_types.Value] = [],
) -> ibis_types.ArrayValue:
if window is not None:
raise NotImplementedError(
f"StringAgg with windowing is not supported. {constants.FEEDBACK_LINK}"
)

return (
ibis_ops.StringAgg(
column, # type: ignore
sep=op.sep, # type: ignore
order_by=order_by, # type: ignore
)
.to_expr()
.fill_null(ibis_types.literal(""))
)


@compile_binary_agg.register
def _(
op: agg_ops.CorrOp, left: ibis_types.Column, right: ibis_types.Column, window=None
Expand Down
4 changes: 4 additions & 0 deletions bigframes/core/compile/ibis_compiler/scalar_op_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,10 @@ def array_reduce_op_impl(x: ibis_types.Value, op: ops.ArrayReduceOp):
lambda arr_vals: agg_compilers.compile_unary_agg(
op.aggregation, typing.cast(ibis_types.Column, arr_vals)
)
if op.aggregation.order_independent
else agg_compilers.compile_ordered_unary_agg(
op.aggregation, typing.cast(ibis_types.Column, arr_vals)
)
Comment on lines 1220 to +1230
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just use a if-else block here:

casted_x = typing.cast(ibis_types.ArrayValue, x)
if op.aggregation.order_independent:
  return casted_x.reduce(...)
else:
  return casted_x.reduce(...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

)


Expand Down
23 changes: 20 additions & 3 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,26 @@ def skips_nulls(self):
return True

def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
return pd.ArrowDtype(
pa.list_(dtypes.bigframes_dtype_to_arrow_dtype(input_types[0]))
)
return dtypes.list_type(input_types[0])


@dataclasses.dataclass(frozen=True)
class StringAggOp(UnaryAggregateOp):
name: ClassVar[str] = "string_agg"
sep: str = ","

@property
def order_independent(self):
return False

@property
def skips_nulls(self):
return True

def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
if input_types[0] != dtypes.STRING_DTYPE:
raise TypeError(f"Type {input_types[0]} is not string-like")
return dtypes.STRING_DTYPE


@dataclasses.dataclass(frozen=True)
Expand Down
6 changes: 6 additions & 0 deletions bigframes/operations/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import bigframes.dataframe as df
import bigframes.operations as ops
from bigframes.operations._op_converters import convert_index, convert_slice
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.base
import bigframes.series as series

Expand Down Expand Up @@ -295,6 +296,11 @@ def cat(
) -> series.Series:
return self._apply_binary_op(others, ops.strconcat_op, alignment=join)

def join(self, sep: str) -> series.Series:
return self._apply_unary_op(
ops.ArrayReduceOp(aggregation=agg_ops.StringAggOp(sep=sep))
)

def to_blob(self, connection: Optional[str] = None) -> series.Series:
"""Create a BigFrames Blob series from a series of URIs.

Expand Down
11 changes: 11 additions & 0 deletions tests/system/small/operations/test_strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,3 +736,14 @@ def test_getitem_w_struct_array():
expected = bpd.Series(expected_data, dtype=bpd.ArrowDtype((pa_struct)))

assert_series_equal(result.to_pandas(), expected.to_pandas())


def test_string_join(session):
pd_series = pd.Series([["a", "b", "c"], ["100"], ["hello", "world"], []])
bf_series = session.read_pandas(pd_series)

pd_result = pd_series.str.join("--")
bf_result = bf_series.str.join("--").to_pandas()

pd_result = pd_result.astype("string[pyarrow]")
assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False)
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,22 @@ def visit_ArrayAggregate(self, op, *, arg, order_by, where):
expr = arg
return sge.IgnoreNulls(this=self.agg.array_agg(expr, where=where))

def visit_StringAgg(self, op, *, arg, sep, order_by, where):
if len(order_by) > 0:
expr = sge.Order(
this=arg,
expressions=[
# Avoid adding NULLS FIRST / NULLS LAST in SQL, which is
# unsupported in ARRAY_AGG by reconstructing the node as
# plain SQL text.
f"({order_column.args['this'].sql(dialect='bigquery')}) {'DESC' if order_column.args.get('desc') else 'ASC'}"
for order_column in order_by
],
)
else:
expr = arg
return self.agg.string_agg(expr, sep, where=where)

def visit_FirstNonNullValue(self, op, *, arg):
return sge.IgnoreNulls(this=sge.FirstValue(this=arg))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,20 @@ class ArrayAggregate(Filterable, Reduction):
@attribute
def dtype(self):
return dt.Array(self.arg.dtype)


@public
class StringAgg(Filterable, Reduction):
"""
Collects the elements of this expression into a string. Similar to
the ibis `GroupConcat`, but adds `order_by_*` parameter.
"""

arg: Column
sep: Value[dt.String]

order_by: VarTuple[Value] = ()

@attribute
def dtype(self):
return dt.string
40 changes: 40 additions & 0 deletions third_party/bigframes_vendored/pandas/core/strings/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,3 +1298,43 @@ def center(
bigframes.series.Series: Returns Series or Index with minimum number of char in object.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def join(self, sep: str):
"""
Join lists contained as elements in the Series/Index with passed delimiter.

If the elements of a Series are lists themselves, join the content of these
lists using the delimiter passed to the function.
This function is an equivalent to :meth:`str.join`.

**Examples:**

>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None
>>> import pandas as pd

Example with a list that contains non-string elements.

>>> s = bpd.Series([['lion', 'elephant', 'zebra'],
... ['dragon'],
... ['duck', 'swan', 'fish', 'guppy']])
>>> s
0 ['lion' 'elephant' 'zebra']
1 ['dragon']
2 ['duck' 'swan' 'fish' 'guppy']
dtype: list<item: string>[pyarrow]

>>> s.str.join('-')
0 lion-elephant-zebra
1 dragon
2 duck-swan-fish-guppy
dtype: string

Args:
sep (str):
Delimiter to use between list entries.

Returns:
bigframes.series.Series: The list entries concatenated by intervening occurrences of the delimiter.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)