# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from pyarrow.lib cimport *
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
GH-35515: [C++][Python] Add non decomposable aggregation UDF (#35514)
### Rationale for this change
Non-decomposable aggregation is aggregation that cannot be split into consume/merge/finalize steps. This often arises when the logic is written with external Python libraries (numpy, pandas, statsmodels, etc.) whose functions either cannot be decomposed or are not worth the effort of decomposing (these are often one-off functions rather than reusable ones). This PR implements support for non-decomposable aggregation UDFs.
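To make the distinction concrete, here is a minimal pure-Python sketch. It does not use Arrow, and `MeanState` and `median_udf` are hypothetical names chosen for illustration. A mean splits cleanly into consume/merge/finalize over partial `(sum, count)` states, while an exact median does not: the median of per-batch medians is generally not the median of the data.

```python
import statistics


class MeanState:
    """Decomposable aggregation: the partial state is (sum, count)."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def consume(self, batch):
        # Fold one batch of values into this partial state.
        self.total += sum(batch)
        self.count += len(batch)

    def merge(self, other):
        # Combine another partial state, e.g. one built on another thread.
        self.total += other.total
        self.count += other.count

    def finalize(self):
        # Turn the accumulated state into the final result.
        return self.total / self.count


def median_udf(values):
    # Non-decomposable: the exact median needs every value at once.
    return statistics.median(values)


batches = [[1.0, 2.0], [10.0], [3.0, 4.0]]

# Two partial states built independently, then merged.
left, right = MeanState(), MeanState()
left.consume(batches[0])
right.consume(batches[1])
right.consume(batches[2])
left.merge(right)
print(left.finalize())  # 4.0

all_values = [v for batch in batches for v in batch]
batch_medians = [median_udf(batch) for batch in batches]
print(median_udf(all_values))     # 3.0
print(median_udf(batch_medians))  # 3.5, which is not the true median
```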
The major issue with a non-decomposable UDF is that it needs to see all the data at once, unlike a scalar UDF, which only needs to see one batch at a time. On its own, this makes non-decomposable UDFs of limited use, since it is equivalent to collecting all the data into a pd.DataFrame and applying the UDF to it. However, one very useful application of non-decomposable UDFs is segmented aggregation. To recap, segmented aggregation works on ordered data and is passed one logical chunk at a time (e.g., all data with the same date). By combining segmented aggregation with a non-decomposable aggregation UDF, the user can apply arbitrary custom aggregation logic over a large stream of ordered data with the memory overhead of only a single segment.
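The segmented case can be sketched in plain Python as well. This illustrates the idea only and is not Arrow's or Acero's implementation; `segmented_aggregate` is a hypothetical helper, and `itertools.groupby` stands in for the segmenter by splitting an already-ordered stream into runs of equal keys.

```python
import itertools
import statistics


def segmented_aggregate(rows, key, value, udf):
    """Apply a non-decomposable `udf` to each run of rows sharing `key`.

    Assumes `rows` is already ordered by `key`. `rows` may be any
    iterator; only the current segment's values are collected in a list.
    """
    for segment_key, segment in itertools.groupby(rows, key=key):
        values = [value(row) for row in segment]
        yield segment_key, udf(values)


rows = [
    ("2023-06-01", 5.0),
    ("2023-06-01", 1.0),
    ("2023-06-01", 3.0),
    ("2023-06-02", 8.0),
    ("2023-06-02", 2.0),
]
result = list(
    segmented_aggregate(
        iter(rows),
        key=lambda row: row[0],
        value=lambda row: row[1],
        udf=statistics.median,
    )
)
print(result)  # [('2023-06-01', 3.0), ('2023-06-02', 5.0)]
```

Because `groupby` consumes its input lazily, the values held at any moment are bounded by the largest segment rather than the whole stream, which is the property that makes non-decomposable UDFs practical in this setting.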
### What changes are included in this PR?
This PR is currently WIP and not ready for review.
So far I have implemented the minimal amount of code to make a basic test work, but it still needs cleanup, error handling, etc.
* [x] First round of self review
* [x] Second round of self review
* [x] Implement and test unary
* [x] Implement and test varargs
* [x] Implement and test Acero support with segmented aggregation
### Are these changes tested?
Added new tests that call the aggregation UDF through both compute and Acero.
The compute tests call the aggregation on the full array. The Acero test calls the aggregation using segmented aggregation.
### Are there any user-facing changes?
* Closes: #35515
Lead-authored-by: Li Jin <ice.xelloss@gmail.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
2023-06-08 14:12:49 -04:00
cdef class UdfContext(_Weakrefable):
    cdef:
        CUdfContext c_context
    cdef void init(self, const CUdfContext& c_context)


cdef class FunctionOptions(_Weakrefable):
    cdef:
        shared_ptr[CFunctionOptions] wrapped

    cdef const CFunctionOptions* get_options(self) except NULL
    cdef void init(self, const shared_ptr[CFunctionOptions]& sp)

    cdef inline shared_ptr[CFunctionOptions] unwrap(self)


cdef class _SortOptions(FunctionOptions):
    pass


cdef CExpression _bind(Expression filter, Schema schema) except *


cdef class Expression(_Weakrefable):

    cdef:
        CExpression expr

    cdef void init(self, const CExpression& sp)

    @staticmethod
    cdef wrap(const CExpression& sp)

    cdef inline CExpression unwrap(self)

    @staticmethod
    cdef Expression _expr_or_scalar(object expr)


cdef CExpression _true

cdef CFieldRef _ensure_field_ref(value) except *

cdef vector[CSortKey] unwrap_sort_keys(sort_keys, allow_str=*) except *

cdef CSortOrder unwrap_sort_order(order) except *

cdef CNullPlacement unwrap_null_placement(null_placement) except *