[SPARK-55726][PYTHON][TEST] Add ASV microbenchmark for grouped map pandas UDF#54533
Yicong-Huang wants to merge 4 commits into apache:master
Conversation
Force-pushed c08abf5 to 6c7997f
python/benchmarks/__init__.py

```python
import os
import sys

# Put the repository's python/ directory on sys.path so pyspark is importable.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
```
what is this line for?
The asv framework cannot find pyspark, so this line adds the parent of the python/benchmarks folder to sys.path. I am not sure if there is a better way to do this. The existing ASV benchmarks don't hit this issue because they only exercise pyarrow and pandas (both installed in the virtualenv) and never import PySpark.
Can we change the path in bench_eval_type.py instead?
I think we will add more benchmarks against pyspark code in the future, so leaving it in __init__.py lets it be shared?
Or, we could move things under python/benchmarks/pyspark/ so that the other pyarrow/pandas benchmarks won't be affected.
@fangchenli any suggested way to configure it?
If --python=same is used and pyspark is installed in the current env, you don't need to modify the path here. Alternatively, remove --python=same and asv will create a new venv and install pyspark there.
I think the problem is that I want asv to benchmark the current source code instead of the pyspark installed from pip.
I mean installing the source code in the current env.
First, install the current source code in editable mode via pip install -e python/packaging/classic.
If you run pip list or conda list, you should see something like
pyspark 4.2.0.dev0 ...../spark/python/packaging/classic
Then, when you set --python=same, pyspark is discoverable to asv.
Without --python=same, asv creates a new venv, builds a pyspark wheel from the current source code, installs the wheel into the new venv, and runs the benchmarks there. The build and install commands are here:
spark/python/benchmarks/asv.conf.json, lines 14 to 19 in 168e5cf
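For context, asv's standard build/install hooks look roughly like the sketch below. This is illustrative only: it shows asv's documented default command keys, not the actual contents of the lines referenced above, and the packaging path is an assumption.

```json
{
    "build_command": [
        "python -mpip wheel --no-deps --no-build-isolation -w {build_cache_dir} {build_dir}/python/packaging/classic"
    ],
    "install_command": [
        "in-dir={env_dir} python -mpip install {wheel_file}"
    ],
    "uninstall_command": [
        "return-code=any python -mpip uninstall -y {project}"
    ]
}
```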
Thanks! I've tested both the editable-mode install and running without --python=same; both methods work! I've removed the hack in __init__.py.
```python
eval_type = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF

# ---- varying group size (float data, identity UDF) ----
for name, (rows_per_group, n_cols, num_groups) in {
```
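As a rough pandas-only analog of what the identity grouped-map UDF in this benchmark does per group (a sketch for illustration; the function and variable names below are made up and are not the benchmark's actual code):

```python
import pandas as pd


def identity_udf(pdf: pd.DataFrame) -> pd.DataFrame:
    # A grouped-map UDF receives one group's rows as a pandas DataFrame and
    # returns a DataFrame; the identity UDF hands the batch back unchanged.
    return pdf


df = pd.DataFrame({"g": [0, 0, 1, 1], "v": [1.0, 2.0, 3.0, 4.0]})

# Apply the UDF group by group, then stitch the results back together,
# mirroring the shape of the grouped-map eval path.
out = pd.concat(identity_udf(pdf) for _, pdf in df.groupby("g"))
assert out.reset_index(drop=True).equals(df)
```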
Does it also split big groups? (following spark.sql.execution.arrow.maxRecordsPerBatch, which defaults to 10000)
Ah, good point. Since I am simulating the input, those simulations currently do not respect these configs. I can update it to split the input so that we can benchmark the concatenation logic inside the eval type as well. Are there any other configs I should consider in such a simulation?
Simulated the split with the default of 10000. Benchmark results updated; the large group takes a bit longer.
I am afraid spark.sql.execution.arrow.maxRecordsPerBatch doesn't take effect on the Python side; the batching happens on the JVM side.
python/benchmarks/bench_eval_type.py
```python
def setup(self):
    eval_type = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
    runner_conf = {
        "spark.sql.execution.arrow.maxRecordsPerBatch": self._MAX_RECORDS_PER_BATCH
```
I suspect that on the Python side, SQL_GROUPED_MAP_PANDAS_UDF doesn't have to respect maxRecordsPerBatch, or that maxRecordsPerBatch doesn't take effect, because the batching actually happens on the JVM side.
Understood. I see this config only affects the TransformWithState-related serializer. I will remove it from runner_conf but still simulate the split.
TBH, I am still not 100% sure whether we need to simulate the split.
I think simulating it is needed: during my refactoring, a big performance difference came from concatenating batches and iterating over batches, so adding the split makes the benchmark more realistic.
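The split-then-concatenate behaviour being discussed can be sketched as follows, assuming the 10000-row default; the helper name here is made up for illustration and is not the benchmark's code:

```python
import pandas as pd

# Default of spark.sql.execution.arrow.maxRecordsPerBatch (applied JVM-side).
MAX_RECORDS_PER_BATCH = 10_000


def split_group(pdf: pd.DataFrame, max_rows: int = MAX_RECORDS_PER_BATCH):
    # Mimic JVM-side batching: a large group arrives at the Python worker
    # as a sequence of sub-batches of at most max_rows rows each.
    return [pdf.iloc[i:i + max_rows] for i in range(0, len(pdf), max_rows)]


group = pd.DataFrame({"v": range(25_000)})
batches = split_group(group)
assert [len(b) for b in batches] == [10_000, 10_000, 5_000]

# The grouped-map eval path must concatenate the sub-batches back into one
# pandas DataFrame per group before invoking the UDF; this concatenation is
# the hot spot the split simulation is meant to capture.
restored = pd.concat(batches, ignore_index=True)
assert restored.equals(group)
```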
Force-pushed 9677aae to 60054db
What changes were proposed in this pull request?
Add an ASV microbenchmark for SQL_GROUPED_MAP_PANDAS_UDF in python/benchmarks/bench_eval_type.py. The benchmark simulates the full worker.py pipeline by constructing the complete binary protocol that main(infile, outfile) expects. Large groups (100k rows/group) are split into Arrow sub-batches of 10k rows via spark.sql.execution.arrow.maxRecordsPerBatch (default), passed through RunnerConf, mirroring the JVM-side behaviour.
Why are the changes needed?
This is part of SPARK-55724 to add per-eval-type microbenchmarks for PySpark UDF worker pipelines. These benchmarks help catch performance regressions in the Python-side serialization/deserialization path (e.g., SPARK-55459 fixed a 3x regression in applyInPandas).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
asv run --python=same -a repeat='(5,10,10.0)' on Apple M4 Max.
Was this patch authored or co-authored using generative AI tooling?
No.