Add lambda function and array related functions #3584


Merged: 36 commits into opensearch-project:main on Jun 10, 2025

Conversation

@xinyual (Contributor) commented Apr 27, 2025

Description

This PR adds lambda functions and array-related functions. Calcite does not provide all of the array-related functions we need (in particular, the lambda-based ones), so some are implemented by ourselves.
The logic for lambda is as follows:
We treat a lambda function as a new PPL expression and parse it regularly to construct a RexNode. To get the return type of a lambda expression, we first map the argument types in the CalciteContext. For example, for forall(array(1, 2, 3), x -> x > 0), we map x -> INTEGER.
reduce is an exception because its accumulator acc has a dynamic type.
Calcite/linq4j generates code according to the input types. For example, take reduce(array(1.0, 2.0, 3.0), 0, (acc, x) -> acc + x). Ideally, we would map acc -> INTEGER and x -> DOUBLE. But then the generated code for + would be plus(INTEGER acc, DOUBLE x); after the first application acc becomes a DOUBLE, and the next call would throw an exception. Thus, we assign ANY to acc and infer the actual return type in getReturnTypeInference.
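To make the failure mode concrete, here is a minimal plain-Java sketch (illustrative stand-in code, not the PR's generated code) of what goes wrong when the accumulator is pinned to INTEGER:

```java
public class FixedSignatureDemo {
  // Stand-in for code generated against a fixed plus(INTEGER acc, DOUBLE x) signature.
  static Object plus(Object acc, Object x) {
    return (Integer) acc + (Double) x; // assumes acc is always an Integer
  }

  public static void main(String[] args) {
    Object acc = 0;       // base value, as in reduce(array(1.0, 2.0, 3.0), 0, ...)
    acc = plus(acc, 1.0); // first application: acc is now a Double (1.0)
    acc = plus(acc, 2.0); // second application: throws ClassCastException,
                          // since a Double cannot be cast to Integer
  }
}
```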

The functions are aligned with https://github.com/opensearch-project/opensearch-spark/blob/main/docs/ppl-lang/functions/ppl-collection.md

TODO: nested objects (e.g. x -> x.a > 0) are not supported in lambdas currently. They will work automatically once we support nested objects.

For detailed implementation and description:

| Function | Arguments | Description | Return type | Implementation |
|---|---|---|---|---|
| ARRAY | `ARRAY(value1: ANY, value2: ANY, ...)` | Creates an array with the input values. Mixed types are currently not allowed; we infer a least restricted type, e.g. `array(1, "demo")` -> `["1", "demo"]`. | ARRAY | Wraps `SqlLibraryOperators.ARRAY` |
| ARRAY_LENGTH | `ARRAY_LENGTH(value: ARRAY)` | Returns the array length. | INTEGER | `SqlLibraryOperators.ARRAY_LENGTH` |
| FORALL | `forall(value: ARRAY, function: LAMBDA)` | Checks whether every element of the array satisfies the lambda function. The lambda must return BOOLEAN. | BOOLEAN | Implemented by ourselves; no matching built-in Calcite operator |
| EXISTS | `exists(value: ARRAY, function: LAMBDA)` | Checks whether at least one element of the array satisfies the lambda function. The lambda must return BOOLEAN. | BOOLEAN | Implemented by ourselves; no matching built-in Calcite operator |
| FILTER | `filter(value: ARRAY, function: LAMBDA)` | Filters the elements of the array by the lambda function. The lambda must return BOOLEAN. | ARRAY | Implemented by ourselves; no matching built-in Calcite operator |
| TRANSFORM | `transform(value: ARRAY, function: LAMBDA)` | Transforms the elements of the array one by one using the lambda. It can also accept a two-argument lambda like `(x, i) -> x + i`, where `i` is the index of the element in the array. | ARRAY | Implemented by ourselves; no matching built-in Calcite operator |
| REDUCE | `reduce(value: ARRAY, base_value: ANY, acc_function: LAMBDA)` or `reduce(value: ARRAY, base_value: ANY, acc_function: LAMBDA, reduce_function: LAMBDA)` | First applies `acc_function` to every element, accumulating into `acc`; then applies `reduce_function` to `acc` if present. `acc_function` has the form `(acc, x) -> ...`; `reduce_function` has the form `(acc) -> ...`. | ANY, according to the lambda function | Implemented by ourselves; no matching built-in Calcite operator |
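As a rough illustration of the intended semantics, here is a plain-Java sketch using java.util.stream as a stand-in for the generated linq4j code (not the PR's implementation):

```java
import java.util.List;
import java.util.stream.Collectors;

public class CollectionSemanticsDemo {
  public static void main(String[] args) {
    List<Integer> array = List.of(1, 2, 3);                     // array(1, 2, 3)

    boolean forall = array.stream().allMatch(x -> x > 0);       // forall -> true
    boolean exists = array.stream().anyMatch(x -> x < 0);       // exists -> false
    List<Integer> filtered = array.stream()
        .filter(x -> x % 2 == 0).collect(Collectors.toList());  // filter -> [2]
    List<Integer> doubled = array.stream()
        .map(x -> x * 2).collect(Collectors.toList());          // transform -> [2, 4, 6]
    int sum = array.stream().reduce(0, (acc, x) -> acc + x);    // reduce -> 6

    System.out.println(forall + " " + exists + " " + filtered + " " + doubled + " " + sum);
  }
}
```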

Related Issues

Resolves #3575

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

xinyual added 11 commits April 23, 2025 16:49
Comment on lines 63 to 76
```java
switch (targetType) {
  case DOUBLE:
    List<Object> unboxed =
        IntStream.range(0, args.length - 1)
            .mapToObj(i -> ((Number) args[i]).doubleValue())
            .collect(Collectors.toList());
    return unboxed;
  case FLOAT:
    List<Object> unboxedFloat =
        IntStream.range(0, args.length - 1)
            .mapToObj(i -> ((Number) args[i]).floatValue())
            .collect(Collectors.toList());
    return unboxedFloat;
```
Member:

Could you explain why this special logic is needed?

Contributor (author):

We need to convert it internally. Otherwise, Calcite will cast directly, e.g. DOUBLE to INTEGER, which will raise an exception.
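The underlying Java behavior, as a minimal sketch (illustrative code, not from the PR): a boxed numeric value cannot be cast across wrapper types, but converting through Number works.

```java
public class UnboxDemo {
  public static void main(String[] args) {
    Object boxed = 1;                            // an Integer element of the array argument
    // double bad = (Double) boxed;              // would throw ClassCastException
    double ok = ((Number) boxed).doubleValue();  // the conversion the snippet above performs
    System.out.println(ok);                      // prints 1.0
  }
}
```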

```java
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ArrayFunctionImpl extends ImplementorUDF {
```
Member:

Can't we reuse SqlLibraryOperators.ARRAY? Again, for any newly added function, please add a reason to the PR description explaining why it must be implemented by ourselves.

Contributor (author):

Already updated the implementation: it now wraps SqlLibraryOperators.ARRAY.

```java
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ExistsFunctionImpl extends ImplementorUDF {
```
Member:

Can't we reuse SqlLibraryOperators.ARRAY_CONTAINS? Please check all SqlLibraryOperators.ARRAY_* operators first.

Contributor (author):

Confirmed. All SqlLibraryOperators.ARRAY_* operators are for array-related functions unrelated to lambdas. We use SqlLibraryOperators.ARRAY_LENGTH.

xinyual added 9 commits May 30, 2025 10:55
```java
        arguments,
        node.getFuncName(),
        lambdaNode.getType());
lambdaNode = analyze(arg, lambdaContext);
```
Member:

Why analyze reduce twice?

Contributor (author) @xinyual commented Jun 5, 2025:

Reduce is a very special case since it can change the type of the accumulator. For example, in reduce([1.0, 2.0], 0, (acc, x) -> acc + x, acc -> acc * 10), the lambda (acc, x) -> acc + x first sees (INTEGER, DOUBLE) and then, during the iteration, (DOUBLE, DOUBLE). The current solution is to analyze once to find that the return type is DOUBLE, then use DOUBLE as the expected input type and cast the initial acc value to that type.
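In plain Java, the runtime effect of that analyze-once-then-cast approach looks roughly like this (an illustrative sketch, not the PR's Calcite code):

```java
import java.util.List;

public class ReduceTypeDemo {
  public static void main(String[] args) {
    List<Double> values = List.of(1.0, 2.0); // reduce([1.0, 2.0], 0, ...)
    double acc = 0;                          // INTEGER base value cast up front to DOUBLE,
                                             // the return type inferred for (acc, x) -> acc + x
    for (double x : values) {
      acc = acc + x;                         // every application now sees (DOUBLE, DOUBLE)
    }
    System.out.println(acc * 10);            // reduce_function acc -> acc * 10 => 30.0
  }
}
```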

Collaborator:

Is it necessary to detect a non-ANY type in the analyzing phase? What if the input list has type ARRAY?

Collaborator:

And does it make sense to infer the return type by using leastRestrictive(arg0.getComponentType(), arg1.getType()) instead of analyzing twice?

Contributor (author) @xinyual commented Jun 10, 2025:

> Is it necessary to detect a non-ANY type in the analyzing phase? What if the input list has type ARRAY?

ANY would block the implementation in two places: 1. the UDF part sometimes needs the type to choose an implementation; 2. ANY would also be a blocker for the type checker. For example, we use Calcite's multiply, which only supports numeric/interval * numeric/interval; ANY would throw an exception during type checking.

> And does it make sense to infer the return type by using leastRestrictive(arg0.getComponentType(), arg1.getType()) instead of analyzing twice?

leastRestrictive(arg0.getComponentType(), arg1.getType()) doesn't work here. For example, with acc = 0 and (acc, x) -> acc + length(x) * 1.0 over a string array, the lambda returns DOUBLE, so we need to cast the acc base value to DOUBLE; but leastRestrictive(INTEGER, STRING) wouldn't be DOUBLE.
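A small standalone Calcite sketch of this point (hypothetical demo code; the exact result of leastRestrictive for INTEGER and VARCHAR depends on the type system, but it is not DOUBLE):

```java
import java.util.List;

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;

public class LeastRestrictiveDemo {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
    RelDataType strType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
    // Whatever this prints, it is not DOUBLE, which is the type that
    // (acc, x) -> acc + length(x) * 1.0 actually returns; only analyzing
    // the lambda body reveals that.
    System.out.println(typeFactory.leastRestrictive(List.of(intType, strType)));
  }
}
```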

Comment on lines +35 to +36
```java
 * @return We wrap it here to accept null since the original return type inference will generate
 *     non-nullable type
```
Member:

Does Spark's array accept null too? Why do we do this wrap?

Contributor (author):

Yes, Spark's array accepts null.


Version: 3.1.0

Usage: ``array(value1, value2, value3...)`` creates an array with the input values. Mixed types are currently not allowed; we infer a least restricted type, for example ``array(1, "demo")`` -> ``["1", "demo"]``.
Member:

Question: what is the reason to infer a least restricted type instead of throwing an exception?

Member:

Please add a Limitation: note after each Usage: to explain that these functions only work with plugins.calcite.enabled=true.

Contributor (author):

> Question: what is the reason to infer a least restricted type instead of throwing an exception?

This is aligned with Spark.

LantaoJin previously approved these changes Jun 7, 2025
Comment on lines 401 to 402
```java
// case DATETIME_INTERVAL ->
// SqlTypeName.INTERVAL_TYPES.stream().map(OpenSearchTypeFactory.TYPE_FACTORY::createSqlIntervalType).toList();
```
Member:

How does DATETIME_INTERVAL impact reduce?

Contributor (author):

It's an unnecessary change; I will revert it.

LantaoJin previously approved these changes Jun 9, 2025

```java
import org.opensearch.sql.expression.function.UDFOperandMetadata;

// TODO: Support array of mixture types.
public class ArrayFunctionImpl extends ImplementorUDF {
```
Collaborator:

It would be great to add a description and a simple example for each function. It should only show the functionality of the function and be as simple as possible; for example, array(1, 2, 3) -> [1, 2, 3] would be enough for the array function.

It will improve code readability for developers, as distinct from the docs for customers. You can do it later in another PR.

Contributor (author):

Already added one for each. Please check.

xinyual added 2 commits June 10, 2025 13:48
LantaoJin merged commit 122ae79 into opensearch-project:main on Jun 10, 2025. 22 checks passed.
Labels: calcite (calcite migration related)

4 participants