Add lambda function and array related functions #3584
Conversation
Signed-off-by: xinyual <[email protected]>
core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java
switch (targetType) {
  case DOUBLE:
    List<Object> unboxed =
        IntStream.range(0, args.length - 1)
            .mapToObj(i -> ((Number) args[i]).doubleValue())
            .collect(Collectors.toList());
    return unboxed;
  case FLOAT:
    List<Object> unboxedFloat =
        IntStream.range(0, args.length - 1)
            .mapToObj(i -> ((Number) args[i]).floatValue())
            .collect(Collectors.toList());
    return unboxedFloat;
Could you explain why this special logic is needed?
We need to convert it internally. Otherwise, Calcite will cast directly, e.g. DOUBLE to INTEGER, which raises an exception.
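A minimal standalone Java sketch (illustrative only, not the PR's code) of why the explicit `Number` conversion is needed: casting a boxed `Double` straight to `Integer` fails at runtime, while converting through `Number`, as the UDF does internally, works.

```java
public class NumericCoercionDemo {
    public static void main(String[] args) {
        Object boxed = 3.0d; // the value arrives as a boxed Double at runtime

        // A direct cast, similar to what generated code would attempt:
        try {
            Integer bad = (Integer) boxed; // throws ClassCastException
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("direct cast failed");
        }

        // Explicit conversion through Number instead:
        int ok = ((Number) boxed).intValue();
        System.out.println(ok); // 3
    }
}
```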
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ArrayFunctionImpl extends ImplementorUDF {
Can't we reuse SqlLibraryOperators.ARRAY? Again, please add a reason in the PR description for every newly added function explaining why we must implement it ourselves.
Already updated the implementation. It now wraps the implementation of SqlLibraryOperators.ARRAY.
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ExistsFunctionImpl extends ImplementorUDF {
Can't we reuse SqlLibraryOperators.ARRAY_CONTAINS? Please check all SqlLibraryOperators.ARRAY_* operators first.
Confirmed. All SqlLibraryOperators.ARRAY_* operators are array-related functions not related to lambda. We use SqlLibraryOperators.ARRAY_LENGTH.
arguments,
node.getFuncName(),
lambdaNode.getType());
lambdaNode = analyze(arg, lambdaContext);
Why is reduce analyzed twice?
Reduce is a very special case because it can change the type of the accumulator. For example, in reduce([1.0, 2.0], 0, (acc, x) -> acc + x, acc -> acc * 10), the lambda (acc, x) -> acc + x first sees (INTEGER, DOUBLE), and then, during iteration, (DOUBLE, DOUBLE). The current solution is to analyze once to find that the return type is DOUBLE, then use DOUBLE as the expected input type and cast the initial value of acc to that expected type.
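The accumulator widening described above can be sketched in plain Java (the names and structure are illustrative, not the actual generated code): the initial value is an int, but the first analysis pass finds the lambda returns double, so the accumulator is cast up front and every iteration stays type-stable.

```java
import java.util.List;

public class ReduceTypeDemo {
    public static void main(String[] args) {
        // reduce([1.0, 2.0], 0, (acc, x) -> acc + x, acc -> acc * 10)
        List<Double> array = List.of(1.0, 2.0);
        int init = 0;

        // First analysis pass finds the lambda's return type is double,
        // so the initial accumulator value is cast before iterating.
        double acc = (double) init;
        for (double x : array) {
            acc = acc + x; // plus(double, double) on every iteration
        }
        System.out.println(acc * 10); // finalize lambda: acc -> acc * 10, prints 30.0
    }
}
```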
Is it necessary to detect a non-ANY type in the analyzing phase? What if the input list has type ARRAY?
And does it make sense to infer the return type using leastRestrictive(arg0.getComponentType(), arg1.getType()) instead of analyzing twice?
Is it necessary to detect a non-ANY type in the analyzing phase? What if the input list has type ARRAY?

ANY would block the implementation in two places: 1. the UDF part sometimes needs the type to choose an implementation; 2. ANY would also block the type checker. For example, we use Calcite's multiply, which only supports numeric/interval * numeric/interval; ANY would throw an exception during type checking.

And does it make sense to infer the return type using leastRestrictive(arg0.getComponentType(), arg1.getType()) instead of analyzing twice?

leastRestrictive(arg0.getComponentType(), arg1.getType()) doesn't work here. For example, with acc = 0, the lambda (acc, x) -> acc + length(x) * 1.0 returns DOUBLE, which means we need to cast the base value of acc to DOUBLE. But leastRestrictive(INTEGER, STRING) wouldn't be DOUBLE.
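A hypothetical Java illustration of that counterexample: the lambda body, not the element type, decides the result type. With a STRING array and an INTEGER initial value, acc + length(x) * 1.0 evaluates to double, which leastRestrictive(INTEGER, STRING) could never predict.

```java
import java.util.List;

public class LambdaReturnTypeDemo {
    public static void main(String[] args) {
        // reduce(["ab", "cde"], 0, (acc, x) -> acc + length(x) * 1.0)
        List<String> array = List.of("ab", "cde");
        int init = 0;

        // The accumulator type comes from analyzing the lambda body (double),
        // not from leastRestrictive over the argument types.
        double acc = init;
        for (String x : array) {
            acc = acc + x.length() * 1.0;
        }
        System.out.println(acc); // 5.0
    }
}
```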
* @return We wrap it here to accept null since the original return type inference will generate
*     non-nullable type
Does Spark's array accept null? Why do we need this wrap?
Yes. Spark's array accepts null.
Version: 3.1.0

Usage: ``array(value1, value2, value3...)`` creates an array with the input values. Currently we don't allow mixed types; we infer a least restricted type instead. For example, ``array(1, "demo")`` -> ["1", "demo"].
Question: what is the reason to infer a least restricted type instead of throwing an exception?
Please add a Limitation: note after each Usage: section to explain that these functions only work with plugins.calcite.enabled=true.
Question: what is the reason to infer a least restricted type instead of throwing an exception?

This is aligned with Spark.
// case DATETIME_INTERVAL ->
//     SqlTypeName.INTERVAL_TYPES.stream().map(OpenSearchTypeFactory.TYPE_FACTORY::createSqlIntervalType).toList();
How does DATETIME_INTERVAL impact reduce?
It's an unnecessary change; I will revert it.
import org.opensearch.sql.expression.function.UDFOperandMetadata;

// TODO: Support array of mixture types.
public class ArrayFunctionImpl extends ImplementorUDF {
It would be great to add a description and a simple example for each function. It should only show the functionality of the function and be as simple as possible. For example, array(1, 2, 3) -> [1, 2, 3] would be enough for the array function. It will improve code readability for developers, as distinct from the documentation for customers. You can do it later in another PR.
Already added for each function. Please check.
Description
This PR adds lambda functions and array-related functions. Calcite doesn't have these array-related functions, so we need to implement them ourselves.
Now the logic for lambda is:
We treat a lambda function as a new PPL expression and parse it regularly to construct a RexNode. To get the return type of a lambda expression, we first need to map the argument types in the CalciteContext. For example, in forall(array(1, 2, 3), x -> x > 0), x maps to INTEGER.
We also make an exception for reduce because acc has a dynamic type.
Calcite/linq4j generates code according to the input types. For example, take reduce(array(1.0, 2.0, 3.0), 0, (acc, x) -> acc + x). Ideally, we would map acc -> INTEGER and x -> DOUBLE. But with that mapping, the generated code for + would be plus(INTEGER acc, DOUBLE x); after the first application acc becomes a DOUBLE, and an exception is thrown. Thus, we apply ANY to acc and infer the return type in getReturnTypeInference.
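A hypothetical Java sketch of the codegen problem described above (the names are illustrative, not the actual generated code): if the accumulator parameter were typed from the initial value (int), the generated plus(int, double) signature would be wrong after the first step; typing it as ANY (here, Object) and converting through Number keeps every step valid.

```java
public class AnyAccumulatorDemo {
    // ANY-typed arguments: convert through Number instead of casting
    static Object plus(Object acc, Object x) {
        return ((Number) acc).doubleValue() + ((Number) x).doubleValue();
    }

    public static void main(String[] args) {
        // reduce(array(1.0, 2.0, 3.0), 0, (acc, x) -> acc + x)
        Object acc = 0; // the initial value is a boxed Integer
        for (double x : new double[] {1.0, 2.0, 3.0}) {
            acc = plus(acc, x); // still works after acc becomes a Double
        }
        System.out.println(acc); // 6.0
    }
}
```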
The function is aligned with https://github.com/opensearch-project/opensearch-spark/blob/main/docs/ppl-lang/functions/ppl-collection.md
TODO: nested objects are not supported in lambda currently, e.g. x -> x.a > 0. This will be supported automatically once nested objects are supported.
For detailed implementation and description:
SqlLibraryOperators.ARRAY
SqlLibraryOperators.ARRAY_LENGTH
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
#3575
Check List
--signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.