# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import unittest from inspect import getmembers, isfunction from pyspark.util import is_remote_only from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.sql.types import ( _drop_metadata, StringType, StructType, StructField, ArrayType, IntegerType, ) from pyspark.testing import assertDataFrameEqual from pyspark.testing.pandasutils import PandasOnSparkTestUtils from pyspark.testing.connectutils import ReusedMixedTestCase, should_test_connect if should_test_connect: from pyspark.sql.connect.column import Column from pyspark.sql import functions as SF from pyspark.sql.window import Window as SW from pyspark.sql.connect import functions as CF from pyspark.sql.connect.window import Window as CW from pyspark.errors.exceptions.connect import AnalysisException, SparkConnectException @unittest.skipIf(is_remote_only(), "Requires JVM access") class SparkConnectFunctionTests(ReusedMixedTestCase, PandasOnSparkTestUtils): """These test cases exercise the interface to the proto plan generation but do not call Spark.""" def test_count_star(self): # SPARK-42099: test count(*), count(col(*)) and count(expr(*)) data = [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")] cdf = 
self.connect.createDataFrame(data, schema=["age", "name"]) sdf = self.spark.createDataFrame(data, schema=["age", "name"]) self.assertEqual( cdf.select(CF.count(CF.expr("*")), CF.count(cdf.age)).collect(), sdf.select(SF.count(SF.expr("*")), SF.count(sdf.age)).collect(), ) self.assertEqual( cdf.select(CF.count(CF.col("*")), CF.count(cdf.age)).collect(), sdf.select(SF.count(SF.col("*")), SF.count(sdf.age)).collect(), ) self.assertEqual( cdf.select(CF.count("*"), CF.count(cdf.age)).collect(), sdf.select(SF.count("*"), SF.count(sdf.age)).collect(), ) self.assertEqual( cdf.groupby("name").agg({"*": "count"}).sort("name").collect(), sdf.groupby("name").agg({"*": "count"}).sort("name").collect(), ) self.assertEqual( cdf.groupby("name") .agg(CF.count(CF.expr("*")), CF.count(cdf.age)) .sort("name") .collect(), sdf.groupby("name") .agg(SF.count(SF.expr("*")), SF.count(sdf.age)) .sort("name") .collect(), ) self.assertEqual( cdf.groupby("name") .agg(CF.count(CF.col("*")), CF.count(cdf.age)) .sort("name") .collect(), sdf.groupby("name") .agg(SF.count(SF.col("*")), SF.count(sdf.age)) .sort("name") .collect(), ) self.assertEqual( cdf.groupby("name").agg(CF.count("*"), CF.count(cdf.age)).sort("name").collect(), sdf.groupby("name").agg(SF.count("*"), SF.count(sdf.age)).sort("name").collect(), ) def test_broadcast(self): query = """ SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5) AS tab(a, b, c) """ # +---+----+----+ # | a| b| c| # +---+----+----+ # | 0| NaN|NULL| # | 1|NULL| 2.0| # | 2| 2.1| 3.5| # +---+----+----+ cdf = self.connect.sql(query) cdf1 = cdf.select(cdf.a, "b") cdf2 = cdf.select(cdf.a, "c") sdf = self.spark.sql(query) sdf1 = sdf.select(sdf.a, "b") sdf2 = sdf.select(sdf.a, "c") self.assert_eq( cdf1.join(cdf2, on="a").toPandas(), sdf1.join(sdf2, on="a").toPandas(), ) self.assert_eq( cdf1.join(CF.broadcast(cdf2), on="a").toPandas(), sdf1.join(SF.broadcast(sdf2), on="a").toPandas(), ) self.assert_eq( CF.broadcast(cdf1).join(cdf2, 
on="a").toPandas(), SF.broadcast(sdf1).join(sdf2, on="a").toPandas(), ) self.assert_eq( CF.broadcast(cdf1).join(CF.broadcast(cdf2), on="a").toPandas(), SF.broadcast(sdf1).join(SF.broadcast(sdf2), on="a").toPandas(), ) with self.assertRaises(PySparkTypeError) as pe: CF.broadcast(cdf.a) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": "DataFrame", "arg_name": "df", "arg_type": "Column", }, ) def test_normal_functions(self): query = """ SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5) AS tab(a, b, c) """ # +---+----+----+ # | a| b| c| # +---+----+----+ # | 0| NaN|NULL| # | 1|NULL| 2.0| # | 2| 2.1| 3.5| # +---+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) self.assert_eq( cdf.select(CF.bitwise_not(cdf.a)).toPandas(), sdf.select(SF.bitwise_not(sdf.a)).toPandas(), ) self.assert_eq( cdf.select(CF.bitwiseNOT(cdf.a)).toPandas(), sdf.select(SF.bitwiseNOT(sdf.a)).toPandas(), ) self.assert_eq( cdf.select(CF.coalesce(cdf.a, "b", cdf.c)).toPandas(), sdf.select(SF.coalesce(sdf.a, "b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.expr("a + b - c")).toPandas(), sdf.select(SF.expr("a + b - c")).toPandas(), ) self.assert_eq( cdf.select(CF.greatest(cdf.a, "b", cdf.c)).toPandas(), sdf.select(SF.greatest(sdf.a, "b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.isnan(cdf.a), CF.isnan("b")).toPandas(), sdf.select(SF.isnan(sdf.a), SF.isnan("b")).toPandas(), ) self.assert_eq( cdf.select(CF.isnull(cdf.a), CF.isnull("b")).toPandas(), sdf.select(SF.isnull(sdf.a), SF.isnull("b")).toPandas(), ) self.assert_eq( cdf.select(CF.input_file_name()).toPandas(), sdf.select(SF.input_file_name()).toPandas(), ) self.assert_eq( cdf.select(CF.least(cdf.a, "b", cdf.c)).toPandas(), sdf.select(SF.least(sdf.a, "b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.monotonically_increasing_id()).toPandas(), sdf.select(SF.monotonically_increasing_id()).toPandas(), ) self.assert_eq( 
cdf.select(CF.nanvl("b", cdf.c)).toPandas(), sdf.select(SF.nanvl("b", sdf.c)).toPandas(), ) # Can not compare the values due to the random seed self.assertEqual( cdf.select(CF.rand()).count(), sdf.select(SF.rand()).count(), ) self.assert_eq( cdf.select(CF.rand(100)).toPandas(), sdf.select(SF.rand(100)).toPandas(), ) # Can not compare the values due to the random seed self.assertEqual( cdf.select(CF.randn()).count(), sdf.select(SF.randn()).count(), ) self.assert_eq( cdf.select(CF.randn(100)).toPandas(), sdf.select(SF.randn(100)).toPandas(), ) self.assert_eq( cdf.select(CF.spark_partition_id()).toPandas(), sdf.select(SF.spark_partition_id()).toPandas(), ) def test_when_otherwise(self): query = """ SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5), (3, 3.1, float("NAN")) AS tab(a, b, c) """ # +---+----+----+ # | a| b| c| # +---+----+----+ # | 0| NaN|NULL| # | 1|NULL| 2.0| # | 2| 2.1| 3.5| # | 3| 3.1| NaN| # +---+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) self.assert_eq( cdf.select(CF.when(cdf.a == 0, 1.0).otherwise(2.0)).toPandas(), sdf.select(SF.when(sdf.a == 0, 1.0).otherwise(2.0)).toPandas(), ) self.assert_eq( cdf.select(CF.when(cdf.a < 1, cdf.b).otherwise(cdf.c)).toPandas(), sdf.select(SF.when(sdf.a < 1, sdf.b).otherwise(sdf.c)).toPandas(), ) self.assert_eq( cdf.select( CF.when(cdf.a == 0, 1.0) .when(CF.col("a") == 1, 2.0) .when(cdf.a == 2, -1.0) .otherwise(cdf.c) ).toPandas(), sdf.select( SF.when(sdf.a == 0, 1.0) .when(SF.col("a") == 1, 2.0) .when(sdf.a == 2, -1.0) .otherwise(sdf.c) ).toPandas(), ) self.assert_eq( cdf.select( CF.when(cdf.a < cdf.b, 1.0) .when(CF.col("a") == 1, CF.abs("c") + cdf.b) .otherwise(cdf.c + CF.col("a")) ).toPandas(), sdf.select( SF.when(sdf.a < sdf.b, 1.0) .when(SF.col("a") == 1, SF.abs("c") + sdf.b) .otherwise(sdf.c + SF.col("a")) ).toPandas(), ) # when without otherwise self.assert_eq( cdf.select(CF.when(cdf.a < 1, cdf.b)).toPandas(), sdf.select(SF.when(sdf.a < 1, sdf.b)).toPandas(), 
) self.assert_eq( cdf.select( CF.when(cdf.a == 0, 1.0) .when(CF.col("a") == 1, cdf.b + CF.col("c")) .when(cdf.a == 2, CF.abs(cdf.b)) ).toPandas(), sdf.select( SF.when(sdf.a == 0, 1.0) .when(SF.col("a") == 1, sdf.b + SF.col("c")) .when(sdf.a == 2, SF.abs(sdf.b)) ).toPandas(), ) # check error with self.assertRaisesRegex( TypeError, "when.* can only be applied on a Column previously generated by when.* function", ): cdf.a.when(cdf.a == 0, 1.0) with self.assertRaisesRegex( TypeError, "when.* can only be applied on a Column previously generated by when.* function", ): CF.col("c").when(cdf.a == 0, 1.0) with self.assertRaisesRegex( TypeError, "otherwise.* can only be applied on a Column previously generated by when", ): cdf.a.otherwise(1.0) with self.assertRaisesRegex( TypeError, "otherwise.* can only be applied on a Column previously generated by when", ): CF.col("c").otherwise(1.0) with self.assertRaisesRegex( TypeError, "otherwise.* can only be applied once on a Column previously generated by when", ): CF.when(cdf.a == 0, 1.0).otherwise(1.0).otherwise(1.0) with self.assertRaises(PySparkTypeError) as pe: CF.when(True, 1.0).otherwise(1.0) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": "Column", "arg_name": "condition", "arg_type": "bool", }, ) def test_sorting_functions_with_column(self): funs = [ CF.asc_nulls_first, CF.asc_nulls_last, CF.desc_nulls_first, CF.desc_nulls_last, ] exprs = [CF.col("x"), "x"] for fun in funs: for _expr in exprs: res = fun(_expr) self.assertIsInstance(res, Column) self.assertIn(f"""{fun.__name__.replace("_", " ").upper()}'""", str(res)) for _expr in exprs: res = CF.asc(_expr) self.assertIsInstance(res, Column) self.assertIn("""ASC NULLS FIRST'""", str(res)) for _expr in exprs: res = CF.desc(_expr) self.assertIsInstance(res, Column) self.assertIn("""DESC NULLS LAST'""", str(res)) def test_sort_with_nulls_order(self): query = """ SELECT * FROM VALUES (false, 1, NULL), (true, NULL, 
2.0), (NULL, 3, 3.0) AS tab(a, b, c) """ # +-----+----+----+ # | a| b| c| # +-----+----+----+ # |false| 1|NULL| # | true|NULL| 2.0| # | NULL| 3| 3.0| # +-----+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) for c in ["a", "b", "c"]: self.assert_eq( cdf.orderBy(CF.asc(c)).toPandas(), sdf.orderBy(SF.asc(c)).toPandas(), ) self.assert_eq( cdf.orderBy(CF.asc_nulls_first(c)).toPandas(), sdf.orderBy(SF.asc_nulls_first(c)).toPandas(), ) self.assert_eq( cdf.orderBy(CF.asc_nulls_last(c)).toPandas(), sdf.orderBy(SF.asc_nulls_last(c)).toPandas(), ) self.assert_eq( cdf.orderBy(CF.desc(c)).toPandas(), sdf.orderBy(SF.desc(c)).toPandas(), ) self.assert_eq( cdf.orderBy(CF.desc_nulls_first(c)).toPandas(), sdf.orderBy(SF.desc_nulls_first(c)).toPandas(), ) self.assert_eq( cdf.orderBy(CF.desc_nulls_last(c)).toPandas(), sdf.orderBy(SF.desc_nulls_last(c)).toPandas(), ) def test_math_functions(self): query = """ SELECT * FROM VALUES (false, 1, NULL), (true, NULL, 2.0), (NULL, 3, 3.5) AS tab(a, b, c) """ # +-----+----+----+ # | a| b| c| # +-----+----+----+ # |false| 1|NULL| # | true|NULL| 2.0| # | NULL| 3| 3.5| # +-----+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) for cfunc, sfunc in [ (CF.abs, SF.abs), (CF.acos, SF.acos), (CF.acosh, SF.acosh), (CF.asin, SF.asin), (CF.asinh, SF.asinh), (CF.atan, SF.atan), (CF.atanh, SF.atanh), (CF.bin, SF.bin), (CF.cbrt, SF.cbrt), (CF.ceil, SF.ceil), (CF.cos, SF.cos), (CF.cosh, SF.cosh), (CF.cot, SF.cot), (CF.csc, SF.csc), (CF.degrees, SF.degrees), (CF.toDegrees, SF.toDegrees), (CF.exp, SF.exp), (CF.expm1, SF.expm1), (CF.factorial, SF.factorial), (CF.floor, SF.floor), (CF.hex, SF.hex), (CF.log, SF.log), (CF.log10, SF.log10), (CF.log1p, SF.log1p), (CF.log2, SF.log2), (CF.radians, SF.radians), (CF.toRadians, SF.toRadians), (CF.rint, SF.rint), (CF.sec, SF.sec), (CF.signum, SF.signum), (CF.sin, SF.sin), (CF.sinh, SF.sinh), (CF.sqrt, SF.sqrt), (CF.tan, SF.tan), (CF.tanh, SF.tanh), (CF.unhex, SF.unhex), ]: 
self.assert_eq( cdf.select(cfunc("b"), cfunc(cdf.c)).toPandas(), sdf.select(sfunc("b"), sfunc(sdf.c)).toPandas(), ) # test log(arg1, arg2) self.assert_eq( cdf.select(CF.log(1.1, "b"), CF.log(1.2, cdf.c)).toPandas(), sdf.select(SF.log(1.1, "b"), SF.log(1.2, sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.atan2("b", cdf.c)).toPandas(), sdf.select(SF.atan2("b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.bround("b", 1)).toPandas(), sdf.select(SF.bround("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.conv("b", 2, 16)).toPandas(), sdf.select(SF.conv("b", 2, 16)).toPandas(), ) self.assert_eq( cdf.select(CF.hypot("b", cdf.c)).toPandas(), sdf.select(SF.hypot("b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.pmod("b", cdf.c)).toPandas(), sdf.select(SF.pmod("b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.pow("b", cdf.c)).toPandas(), sdf.select(SF.pow("b", sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.round("b", 1)).toPandas(), sdf.select(SF.round("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftleft("b", 1)).toPandas(), sdf.select(SF.shiftleft("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftLeft("b", 1)).toPandas(), sdf.select(SF.shiftLeft("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftright("b", 1)).toPandas(), sdf.select(SF.shiftright("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftRight("b", 1)).toPandas(), sdf.select(SF.shiftRight("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftrightunsigned("b", 1)).toPandas(), sdf.select(SF.shiftrightunsigned("b", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.shiftRightUnsigned("b", 1)).toPandas(), sdf.select(SF.shiftRightUnsigned("b", 1)).toPandas(), ) def test_aggregation_functions(self): query = """ SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (1, 2.1, 3.5), (0, 0.5, 1.0) AS tab(a, b, c) """ # +---+----+----+ # | a| b| c| # +---+----+----+ # | 0| NaN|NULL| # | 1|NULL| 2.0| # | 1| 2.1| 3.5| # | 0| 0.5| 1.0| # +---+----+----+ 
cdf = self.connect.sql(query) sdf = self.spark.sql(query) for cfunc, sfunc in [ (CF.approx_count_distinct, SF.approx_count_distinct), (CF.approxCountDistinct, SF.approxCountDistinct), (CF.avg, SF.avg), (CF.listagg, SF.listagg), (CF.listagg_distinct, SF.listagg_distinct), (CF.string_agg, SF.string_agg), (CF.string_agg_distinct, SF.string_agg_distinct), (CF.count, SF.count), (CF.first, SF.first), (CF.kurtosis, SF.kurtosis), (CF.last, SF.last), (CF.max, SF.max), (CF.mean, SF.mean), (CF.median, SF.median), (CF.min, SF.min), (CF.mode, SF.mode), (CF.product, SF.product), (CF.skewness, SF.skewness), (CF.stddev, SF.stddev), (CF.stddev_pop, SF.stddev_pop), (CF.stddev_samp, SF.stddev_samp), (CF.sum, SF.sum), (CF.sum_distinct, SF.sum_distinct), (CF.sumDistinct, SF.sumDistinct), (CF.var_pop, SF.var_pop), (CF.var_samp, SF.var_samp), (CF.variance, SF.variance), ]: self.assert_eq( cdf.select(cfunc("b"), cfunc(cdf.c)).toPandas(), sdf.select(sfunc("b"), sfunc(sdf.c)).toPandas(), check_exact=False, ) self.assert_eq( cdf.groupBy("a").agg(cfunc("b"), cfunc(cdf.c)).toPandas(), sdf.groupBy("a").agg(sfunc("b"), sfunc(sdf.c)).toPandas(), check_exact=False, ) for cfunc, sfunc in [ (CF.collect_list, SF.collect_list), (CF.collect_set, SF.collect_set), ]: self.assert_eq( cdf.select(CF.sort_array(cfunc("b")), CF.sort_array(cfunc(cdf.c))).toPandas(), sdf.select(SF.sort_array(sfunc("b")), SF.sort_array(sfunc(sdf.c))).toPandas(), check_exact=False, ) self.assert_eq( cdf.groupBy("a") .agg(CF.sort_array(cfunc("b")), CF.sort_array(cfunc(cdf.c))) .toPandas(), sdf.groupBy("a") .agg(SF.sort_array(sfunc("b")), SF.sort_array(sfunc(sdf.c))) .toPandas(), check_exact=False, ) for cfunc, sfunc in [ (CF.corr, SF.corr), (CF.covar_pop, SF.covar_pop), (CF.covar_samp, SF.covar_samp), (CF.max_by, SF.max_by), (CF.min_by, SF.min_by), ]: self.assert_eq( cdf.select(cfunc(cdf.b, "c")).toPandas(), sdf.select(sfunc(sdf.b, "c")).toPandas(), ) self.assert_eq( cdf.groupBy("a").agg(cfunc(cdf.b, "c")).orderBy("a").toPandas(), 
sdf.groupBy("a").agg(sfunc(sdf.b, "c")).orderBy("a").toPandas(), ) # test max_by and min_by with k parameter self.assert_eq( cdf.select(CF.max_by(cdf.b, "c", 2)).toPandas(), sdf.select(SF.max_by(sdf.b, "c", 2)).toPandas(), ) self.assert_eq( cdf.select(CF.min_by(cdf.b, "c", 2)).toPandas(), sdf.select(SF.min_by(sdf.b, "c", 2)).toPandas(), ) # test grouping self.assert_eq( cdf.cube("a").agg(CF.grouping("a"), CF.sum("c")).orderBy("a").toPandas(), sdf.cube("a").agg(SF.grouping("a"), SF.sum("c")).orderBy("a").toPandas(), ) # test grouping_id self.assert_eq( cdf.cube("a").agg(CF.grouping_id(), CF.sum("c")).orderBy("a").toPandas(), sdf.cube("a").agg(SF.grouping_id(), SF.sum("c")).orderBy("a").toPandas(), ) # test percentile_approx self.assert_eq( cdf.select(CF.percentile_approx(cdf.b, 0.5, 1000)).toPandas(), sdf.select(SF.percentile_approx(sdf.b, 0.5, 1000)).toPandas(), ) self.assert_eq( cdf.select(CF.percentile_approx(cdf.b, [0.1, 0.9])).toPandas(), sdf.select(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(), check_exact=False, ) self.assert_eq( cdf.groupBy("a").agg(CF.percentile_approx("b", 0.5)).orderBy("a").toPandas(), sdf.groupBy("a").agg(SF.percentile_approx("b", 0.5)).orderBy("a").toPandas(), check_exact=False, ) self.assert_eq( cdf.groupBy("a").agg(CF.percentile_approx(cdf.b, [0.1, 0.9])).orderBy("a").toPandas(), sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).orderBy("a").toPandas(), check_exact=False, ) # test count_distinct self.assert_eq( cdf.select(CF.count_distinct("b"), CF.count_distinct(cdf.c)).toPandas(), sdf.select(SF.count_distinct("b"), SF.count_distinct(sdf.c)).toPandas(), ) self.assert_eq( cdf.select(CF.countDistinct("b"), CF.countDistinct(cdf.c)).toPandas(), sdf.select(SF.countDistinct("b"), SF.countDistinct(sdf.c)).toPandas(), ) # The output column names of 'groupBy.agg(count_distinct)' in PySpark # are incorrect, see SPARK-41391. 
self.assert_eq( cdf.groupBy("a") .agg(CF.count_distinct("b").alias("x"), CF.count_distinct(cdf.c).alias("y")) .orderBy("a") .toPandas(), sdf.groupBy("a") .agg(SF.count_distinct("b").alias("x"), SF.count_distinct(sdf.c).alias("y")) .orderBy("a") .toPandas(), ) def test_window_functions(self): self.assertEqual(CW.unboundedPreceding, SW.unboundedPreceding) self.assertEqual(CW.unboundedFollowing, SW.unboundedFollowing) self.assertEqual(CW.currentRow, SW.currentRow) query = """ SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (1, 2.1, 3.5), (0, 0.5, 1.0), (0, 1.5, 1.1), (1, 2.2, -1.0), (1, 0.1, -0.1), (0, 0.0, 5.0) AS tab(a, b, c) """ # +---+----+----+ # | a| b| c| # +---+----+----+ # | 0| NaN|NULL| # | 1|NULL| 2.0| # | 1| 2.1| 3.5| # | 0| 0.5| 1.0| # | 0| 1.5| 1.1| # | 1| 2.2|-1.0| # | 1| 0.1|-0.1| # | 0| 0.0| 5.0| # +---+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) # test window functions for ccol, scol in [ (CF.row_number(), SF.row_number()), (CF.rank(), SF.rank()), (CF.dense_rank(), SF.dense_rank()), (CF.percent_rank(), SF.percent_rank()), (CF.cume_dist(), SF.cume_dist()), (CF.lag("c", 1), SF.lag("c", 1)), (CF.lag("c", 1, -1.0), SF.lag("c", 1, -1.0)), (CF.lag(cdf.c, -1), SF.lag(sdf.c, -1)), (CF.lag(cdf.c, -1, float("nan")), SF.lag(sdf.c, -1, float("nan"))), (CF.lead("c", 1), SF.lead("c", 1)), (CF.lead("c", 1, -1.0), SF.lead("c", 1, -1.0)), (CF.lead(cdf.c, -1), SF.lead(sdf.c, -1)), (CF.lead(cdf.c, -1, float("nan")), SF.lead(sdf.c, -1, float("nan"))), (CF.nth_value("c", 1), SF.nth_value("c", 1)), (CF.nth_value(cdf.c, 2), SF.nth_value(sdf.c, 2)), (CF.nth_value(cdf.c, 2, True), SF.nth_value(sdf.c, 2, True)), (CF.nth_value(cdf.c, 2, False), SF.nth_value(sdf.c, 2, False)), (CF.ntile(1), SF.ntile(1)), (CF.ntile(2), SF.ntile(2)), (CF.ntile(4), SF.ntile(4)), ]: for cwin, swin in [ (CW.orderBy("b"), SW.orderBy("b")), (CW.partitionBy("a").orderBy("b"), SW.partitionBy("a").orderBy("b")), ( CW.partitionBy("a").orderBy(CF.col("b").desc()), 
SW.partitionBy("a").orderBy(SF.col("b").desc()), ), (CW.partitionBy("a", cdf.c).orderBy("b"), SW.partitionBy("a", sdf.c).orderBy("b")), (CW.partitionBy("a").orderBy("b", cdf.c), SW.partitionBy("a").orderBy("b", sdf.c)), ( CW.partitionBy("a").orderBy("b", cdf.c.desc()), SW.partitionBy("a").orderBy("b", sdf.c.desc()), ), ]: self.assert_eq( cdf.select(ccol.over(cwin)).toPandas(), sdf.select(scol.over(swin)).toPandas(), ) # test aggregation functions for ccol, scol in [ (CF.count("c"), SF.count("c")), (CF.sum("c"), SF.sum("c")), (CF.max(cdf.c), SF.max(sdf.c)), (CF.min(cdf.c), SF.min(sdf.c)), ]: for cwin, swin in [ (CW.orderBy("b"), SW.orderBy("b")), ( CW.orderBy("b").rowsBetween(CW.currentRow, CW.currentRow), SW.orderBy("b").rowsBetween(SW.currentRow, SW.currentRow), ), ( CW.orderBy(cdf.b.desc()).rowsBetween(CW.currentRow - 1, CW.currentRow + 2), SW.orderBy(sdf.b.desc()).rowsBetween(SW.currentRow - 1, SW.currentRow + 2), ), ( CW.orderBy("b").rowsBetween(CW.unboundedPreceding, CW.currentRow), SW.orderBy("b").rowsBetween(SW.unboundedPreceding, SW.currentRow), ), ( CW.orderBy(cdf.b.desc()).rowsBetween(CW.currentRow, CW.unboundedFollowing), SW.orderBy(sdf.b.desc()).rowsBetween(SW.currentRow, SW.unboundedFollowing), ), ( CW.orderBy("b").rangeBetween(CW.currentRow, CW.currentRow), SW.orderBy("b").rangeBetween(SW.currentRow, SW.currentRow), ), ( CW.orderBy("b").rangeBetween(CW.currentRow - 1, CW.currentRow + 2), SW.orderBy("b").rangeBetween(SW.currentRow - 1, SW.currentRow + 2), ), ( CW.orderBy("b").rangeBetween(CW.unboundedPreceding, CW.currentRow), SW.orderBy("b").rangeBetween(SW.unboundedPreceding, SW.currentRow), ), ( CW.orderBy("b").rangeBetween(CW.currentRow, CW.unboundedFollowing), SW.orderBy("b").rangeBetween(SW.currentRow, SW.unboundedFollowing), ), (CW.partitionBy("a").orderBy("b"), SW.partitionBy("a").orderBy("b")), ( CW.partitionBy(cdf.a) .orderBy(CF.asc_nulls_last("b")) .rowsBetween(CW.currentRow, CW.currentRow), SW.partitionBy(sdf.a) 
.orderBy(SF.asc_nulls_last("b")) .rowsBetween(SW.currentRow, SW.currentRow), ), ( CW.partitionBy("a") .orderBy(cdf.b.desc()) .rowsBetween(CW.currentRow - 1, CW.currentRow + 2), SW.partitionBy("a") .orderBy(sdf.b.desc()) .rowsBetween(SW.currentRow - 1, SW.currentRow + 2), ), ( CW.partitionBy("a") .orderBy("b") .rowsBetween(CW.unboundedPreceding, CW.currentRow), SW.partitionBy("a") .orderBy("b") .rowsBetween(SW.unboundedPreceding, SW.currentRow), ), ( CW.partitionBy("a") .orderBy("b") .rowsBetween(CW.currentRow, CW.unboundedFollowing), SW.partitionBy("a") .orderBy("b") .rowsBetween(SW.currentRow, SW.unboundedFollowing), ), ( CW.partitionBy(cdf.a) .orderBy(cdf.b.desc(), "c") .rangeBetween(CW.currentRow, CW.currentRow), SW.partitionBy(sdf.a) .orderBy(sdf.b.desc(), "c") .rangeBetween(SW.currentRow, SW.currentRow), ), ( CW.partitionBy("a") .orderBy("b") .rangeBetween(CW.currentRow - 1, CW.currentRow + 2), SW.partitionBy("a") .orderBy("b") .rangeBetween(SW.currentRow - 1, SW.currentRow + 2), ), ( CW.partitionBy("a") .orderBy(CF.desc_nulls_last("b")) .rangeBetween(CW.unboundedPreceding, CW.currentRow), SW.partitionBy("a") .orderBy(SF.desc_nulls_last("b")) .rangeBetween(SW.unboundedPreceding, SW.currentRow), ), ( CW.partitionBy("a") .orderBy("b") .rangeBetween(CW.currentRow, CW.unboundedFollowing), SW.partitionBy("a") .orderBy("b") .rangeBetween(SW.currentRow, SW.unboundedFollowing), ), ]: self.assert_eq( cdf.select(ccol.over(cwin)).toPandas(), sdf.select(scol.over(swin)).toPandas(), ) # check error with self.assertRaises(PySparkValueError) as pe: cdf.select(CF.sum("a").over(CW.orderBy("b").rowsBetween(0, (1 << 33)))).show() self.check_error( exception=pe.exception, errorClass="VALUE_NOT_BETWEEN", messageParameters={"arg_name": "end", "min": "-2147483648", "max": "2147483647"}, ) with self.assertRaises(PySparkTypeError) as pe: cdf.select(CF.rank().over(cdf.a)) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": 
"WindowSpec", "arg_name": "window", "arg_type": "Column", }, ) # invalid window function with self.assertRaises(AnalysisException): cdf.select(cdf.b.over(CW.orderBy("b"))).show() # invalid window frame # following functions require Windowframe(RowFrame, UnboundedPreceding, CurrentRow) for ccol in [ CF.row_number(), CF.rank(), CF.dense_rank(), CF.percent_rank(), CF.lag("c", 1), CF.lead("c", 1), CF.ntile(1), ]: with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.currentRow, CW.currentRow + 123)) ).show() with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.currentRow, CW.currentRow + 123)) ).show() with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.unboundedPreceding, CW.currentRow)) ).show() # Function 'cume_dist' requires Windowframe(RangeFrame, UnboundedPreceding, CurrentRow) ccol = CF.cume_dist() with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.currentRow, CW.currentRow + 123)) ).show() with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.currentRow, CW.currentRow + 123)) ).show() with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.unboundedPreceding, CW.currentRow)) ).show() def test_window_order(self): # SPARK-41773: test window function with order data = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")] # +---+--------+ # | id|category| # +---+--------+ # | 1| a| # | 1| a| # | 2| a| # | 1| b| # | 2| b| # | 3| b| # +---+--------+ cdf = self.connect.createDataFrame(data, ["id", "category"]) sdf = self.spark.createDataFrame(data, ["id", "category"]) cw = CW.partitionBy("id").orderBy("category") sw = SW.partitionBy("id").orderBy("category") self.assert_eq( cdf.withColumn("row_number", CF.row_number().over(cw)).toPandas(), sdf.withColumn("row_number", SF.row_number().over(sw)).toPandas(), ) 
cw = CW.partitionBy("category").orderBy("id") sw = SW.partitionBy("category").orderBy("id") self.assert_eq( cdf.withColumn("row_number", CF.row_number().over(cw)).toPandas(), sdf.withColumn("row_number", SF.row_number().over(sw)).toPandas(), ) cw = CW.partitionBy("category").orderBy("id").rowsBetween(CW.currentRow, 1) sw = SW.partitionBy("category").orderBy("id").rowsBetween(SW.currentRow, 1) self.assert_eq( cdf.withColumn("sum", CF.sum("id").over(cw)).sort("id", "category", "sum").toPandas(), sdf.withColumn("sum", SF.sum("id").over(sw)).sort("id", "category", "sum").toPandas(), ) cw = CW.partitionBy("category").orderBy("id").rangeBetween(CW.currentRow, 1) sw = SW.partitionBy("category").orderBy("id").rangeBetween(SW.currentRow, 1) self.assert_eq( cdf.withColumn("sum", CF.sum("id").over(cw)).sort("id", "category").toPandas(), sdf.withColumn("sum", SF.sum("id").over(sw)).sort("id", "category").toPandas(), ) def test_collection_functions(self): query = """ SELECT * FROM VALUES (ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a'), (ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x'), (NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL) AS tab(a, b, c, d, e, f) """ # +---------+------------+------------+---+---+----+ # | a| b| c| d| e| f| # +---------+------------+------------+---+---+----+ # | [a, ab]| [1, 2, 3]|[1, null, 3]| 1| 2| a| # |[x, null]| NULL| [1, 3]| 3| 4| x| # | NULL|[-1, -2, -3]| []| 5| 6|NULL| # +---------+------------+------------+---+---+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) for cfunc, sfunc in [ (CF.array_distinct, SF.array_distinct), (CF.array_compact, SF.array_compact), (CF.array_max, SF.array_max), (CF.array_min, SF.array_min), (CF.reverse, SF.reverse), (CF.size, SF.size), ]: self.assert_eq( cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(), sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(), check_exact=False, ) for cfunc, sfunc in [ (CF.array_except, SF.array_except), (CF.array_intersect, SF.array_intersect), 
(CF.array_union, SF.array_union), (CF.arrays_overlap, SF.arrays_overlap), ]: self.assert_eq( cdf.select(cfunc("b", cdf.c)).toPandas(), sdf.select(sfunc("b", sdf.c)).toPandas(), check_exact=False, ) for cfunc, sfunc in [ (CF.array_position, SF.array_position), (CF.array_remove, SF.array_remove), ]: self.assert_eq( cdf.select(cfunc(cdf.a, "ab")).toPandas(), sdf.select(sfunc(sdf.a, "ab")).toPandas(), check_exact=False, ) # test array self.assert_eq( cdf.select(CF.array(cdf.d, "e")).toPandas(), sdf.select(SF.array(sdf.d, "e")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array(cdf.d, "e", CF.lit(99))).toPandas(), sdf.select(SF.array(sdf.d, "e", SF.lit(99))).toPandas(), check_exact=False, ) # test array_contains self.assert_eq( cdf.select(CF.array_contains(cdf.a, "ab")).toPandas(), sdf.select(SF.array_contains(sdf.a, "ab")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_contains(cdf.a, cdf.f)).toPandas(), sdf.select(SF.array_contains(sdf.a, sdf.f)).toPandas(), check_exact=False, ) # test array_append self.assert_eq( cdf.select(CF.array_append(cdf.a, "xyz")).toPandas(), sdf.select(SF.array_append(sdf.a, "xyz")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_append(cdf.a, CF.lit("ab"))).toPandas(), sdf.select(SF.array_append(sdf.a, SF.lit("ab"))).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_append(cdf.a, cdf.f)).toPandas(), sdf.select(SF.array_append(sdf.a, sdf.f)).toPandas(), check_exact=False, ) # test array_prepend self.assert_eq( cdf.select(CF.array_prepend(cdf.a, "xyz")).toPandas(), sdf.select(SF.array_prepend(sdf.a, "xyz")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_prepend(cdf.a, CF.lit("ab"))).toPandas(), sdf.select(SF.array_prepend(sdf.a, SF.lit("ab"))).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_prepend(cdf.a, cdf.f)).toPandas(), sdf.select(SF.array_prepend(sdf.a, sdf.f)).toPandas(), check_exact=False, ) # test array_insert 
self.assert_eq( cdf.select(CF.array_insert(cdf.a, -5, "ab")).toPandas(), sdf.select(SF.array_insert(sdf.a, -5, "ab")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_insert(cdf.a, 3, cdf.f)).toPandas(), sdf.select(SF.array_insert(sdf.a, 3, sdf.f)).toPandas(), check_exact=False, ) # test array_join self.assert_eq( cdf.select( CF.array_join(cdf.a, ","), CF.array_join("b", ":"), CF.array_join("c", "~") ).toPandas(), sdf.select( SF.array_join(sdf.a, ","), SF.array_join("b", ":"), SF.array_join("c", "~") ).toPandas(), check_exact=False, ) self.assert_eq( cdf.select( CF.array_join(cdf.a, ",", "_null_"), CF.array_join("b", ":", ".null."), CF.array_join("c", "~", "NULL"), ).toPandas(), sdf.select( SF.array_join(sdf.a, ",", "_null_"), SF.array_join("b", ":", ".null."), SF.array_join("c", "~", "NULL"), ).toPandas(), check_exact=False, ) # test array_repeat self.assert_eq( cdf.select(CF.array_repeat(cdf.f, "d")).toPandas(), sdf.select(SF.array_repeat(sdf.f, "d")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_repeat("f", cdf.d)).toPandas(), sdf.select(SF.array_repeat("f", sdf.d)).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.array_repeat("f", 3)).toPandas(), sdf.select(SF.array_repeat("f", 3)).toPandas(), check_exact=False, ) # test arrays_zip # TODO: Make toPandas support complex nested types like Array # DataFrame.iloc[:, 0] (column name="arrays_zip(b, c)") values are different (66.66667 %) # [index]: [0, 1, 2] # [left]: [[{'b': 1, 'c': 1.0}, {'b': 2, 'c': None}, {'b': 3, 'c': 3.0}], None, # [{'b': -1, 'c': None}, {'b': -2, 'c': None}, {'b': -3, 'c': None}]] # [right]: [[(1, 1), (2, None), (3, 3)], None, [(-1, None), (-2, None), (-3, None)]] self.compare_by_show( cdf.select(CF.arrays_zip(cdf.b, "c")), sdf.select(SF.arrays_zip(sdf.b, "c")), ) # test concat self.assert_eq( cdf.select(CF.concat("d", cdf.e, CF.lit(-1))).toPandas(), sdf.select(SF.concat("d", sdf.e, SF.lit(-1))).toPandas(), ) # test create_map 
self.compare_by_show( cdf.select(CF.create_map(cdf.d, cdf.e)), sdf.select(SF.create_map(sdf.d, sdf.e)) ) self.compare_by_show( cdf.select(CF.create_map(cdf.d, "e", "e", CF.lit(1))), sdf.select(SF.create_map(sdf.d, "e", "e", SF.lit(1))), ) # test element_at self.assert_eq( cdf.select(CF.element_at("a", 1)).toPandas(), sdf.select(SF.element_at("a", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.element_at(cdf.a, 1)).toPandas(), sdf.select(SF.element_at(sdf.a, 1)).toPandas(), ) # test get self.assert_eq( cdf.select(CF.get("a", 1)).toPandas(), sdf.select(SF.get("a", 1)).toPandas(), ) self.assert_eq( cdf.select(CF.get(cdf.a, 1)).toPandas(), sdf.select(SF.get(sdf.a, 1)).toPandas(), ) # test shuffle # Can not compare the values due to the random permutation self.assertEqual( cdf.select(CF.shuffle(cdf.a), CF.shuffle("b")).count(), sdf.select(SF.shuffle(sdf.a), SF.shuffle("b")).count(), ) # test slice self.assert_eq( cdf.select(CF.slice(cdf.a, 1, 2), CF.slice("c", 2, 3)).toPandas(), sdf.select(SF.slice(sdf.a, 1, 2), SF.slice("c", 2, 3)).toPandas(), check_exact=False, ) with self.assertRaises(PySparkTypeError) as pe: CF.slice(cdf.a, 1.0, 2) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": "Column, int or str", "arg_name": "start", "arg_type": "float", }, ) with self.assertRaises(PySparkTypeError) as pe: CF.slice(cdf.a, 1, 2.0) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": "Column, int or str", "arg_name": "length", "arg_type": "float", }, ) # test sort_array self.assert_eq( cdf.select(CF.sort_array(cdf.a, True), CF.sort_array("c", False)).toPandas(), sdf.select(SF.sort_array(sdf.a, True), SF.sort_array("c", False)).toPandas(), check_exact=False, ) # test struct self.compare_by_show( cdf.select(CF.struct(cdf.a, "d", "e", cdf.f)), sdf.select(SF.struct(sdf.a, "d", "e", sdf.f)), ) # test sequence self.assert_eq( cdf.select(CF.sequence(CF.lit(1), 
    def test_map_collection_functions(self):
        """Check parity of map collection functions between Spark Connect and classic PySpark."""
        query = """
            SELECT * FROM VALUES
            (MAP('a', 'ab'), MAP('x', 'ab'), MAP(1, 2, 3, 4), 1, 'a', ARRAY(1, 2), ARRAY('X', 'Y')),
            (MAP('x', 'yz'), MAP('c', NULL), NULL, 2, 'x', ARRAY(3, 4), ARRAY('A', 'B')),
            (MAP('c', 'de'), NULL, MAP(-1, NULL, -3, -4), -3, 'c', NULL, ARRAY('Z'))
            AS tab(a, b, c, e, f, g, h)
            """
        # +---------+-----------+----------------------+---+---+------+------+
        # |        a|          b|                     c|  e|  f|     g|     h|
        # +---------+-----------+----------------------+---+---+------+------+
        # |{a -> ab}|  {x -> ab}|      {1 -> 2, 3 -> 4}|  1|  a|[1, 2]|[X, Y]|
        # |{x -> yz}|{c -> null}|                  NULL|  2|  x|[3, 4]|[A, B]|
        # |{c -> de}|       NULL|{-1 -> null, -3 -> -4}| -3|  c|  NULL|   [Z]|
        # +---------+-----------+----------------------+---+---+------+------+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test map_concat
        self.compare_by_show(
            cdf.select(CF.map_concat(cdf.a, "b")),
            sdf.select(SF.map_concat(sdf.a, "b")),
        )

        # test map_contains_key
        self.compare_by_show(
            cdf.select(CF.map_contains_key(cdf.a, "a"), CF.map_contains_key("c", 3)),
            sdf.select(SF.map_contains_key(sdf.a, "a"), SF.map_contains_key("c", 3)),
        )

        # test map_entries
        self.compare_by_show(
            cdf.select(CF.map_entries(cdf.a), CF.map_entries("b")),
            sdf.select(SF.map_entries(sdf.a), SF.map_entries("b")),
        )

        # test map_from_arrays
        self.compare_by_show(
            cdf.select(CF.map_from_arrays(cdf.g, "h")),
            sdf.select(SF.map_from_arrays(sdf.g, "h")),
        )

        # test map_keys and map_values
        self.compare_by_show(
            cdf.select(CF.map_keys(cdf.a), CF.map_values("b")),
            sdf.select(SF.map_keys(sdf.a), SF.map_values("b")),
        )

        # test size
        self.assert_eq(
            cdf.select(CF.size(cdf.a), CF.size("c")).toPandas(),
            sdf.select(SF.size(sdf.a), SF.size("c")).toPandas(),
        )
    def test_generator_functions(self):
        """Check parity of generator functions (explode / inline / posexplode and
        their *_outer variants) between Spark Connect and classic PySpark."""
        query = """
            SELECT * FROM VALUES
            (ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), MAP(1, 2, 3, 4), 1, FLOAT(2.0), 3),
            (ARRAY('x', NULL), NULL, ARRAY(1, 3), NULL, 3, FLOAT(4.0), 5),
            (NULL, ARRAY(-1, -2, -3), Array(), MAP(-1, NULL, -3, -4), 7, FLOAT('NAN'), 9)
            AS tab(a, b, c, d, e, f, g)
            """
        # +---------+------------+------------+----------------------+---+---+---+
        # |        a|           b|           c|                     d|  e|  f|  g|
        # +---------+------------+------------+----------------------+---+---+---+
        # |  [a, ab]|   [1, 2, 3]|[1, null, 3]|      {1 -> 2, 3 -> 4}|  1|2.0|  3|
        # |[x, null]|        NULL|      [1, 3]|                  NULL|  3|4.0|  5|
        # |     NULL|[-1, -2, -3]|          []|{-1 -> null, -3 -> -4}|  7|NaN|  9|
        # +---------+------------+------------+----------------------+---+---+---+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test explode with arrays
        self.assert_eq(
            cdf.select(CF.explode(cdf.a), CF.col("b")).toPandas(),
            sdf.select(SF.explode(sdf.a), SF.col("b")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.explode("a"), "b").toPandas(),
            sdf.select(SF.explode("a"), "b").toPandas(),
            check_exact=False,
        )
        # test explode with maps
        self.assert_eq(
            cdf.select(CF.explode(cdf.d), CF.col("c")).toPandas(),
            sdf.select(SF.explode(sdf.d), SF.col("c")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.explode("d"), "c").toPandas(),
            sdf.select(SF.explode("d"), "c").toPandas(),
            check_exact=False,
        )
        # test explode_outer with arrays
        self.assert_eq(
            cdf.select(CF.explode_outer(cdf.a), CF.col("b")).toPandas(),
            sdf.select(SF.explode_outer(sdf.a), SF.col("b")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.explode_outer("a"), "b").toPandas(),
            sdf.select(SF.explode_outer("a"), "b").toPandas(),
            check_exact=False,
        )
        # test explode_outer with maps
        self.assert_eq(
            cdf.select(CF.explode_outer(cdf.d), CF.col("c")).toPandas(),
            sdf.select(SF.explode_outer(sdf.d), SF.col("c")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.explode_outer("d"), "c").toPandas(),
            sdf.select(SF.explode_outer("d"), "c").toPandas(),
            check_exact=False,
        )
        # test flatten
        self.assert_eq(
            cdf.select(CF.flatten(CF.array("b", cdf.c)), CF.col("b")).toPandas(),
            sdf.select(SF.flatten(SF.array("b", sdf.c)), SF.col("b")).toPandas(),
            check_exact=False,
        )
        # test inline
        self.assert_eq(
            cdf.select(CF.expr("ARRAY(STRUCT(e, f), STRUCT(g AS e, f))").alias("X"))
            .select(CF.inline("X"))
            .toPandas(),
            sdf.select(SF.expr("ARRAY(STRUCT(e, f), STRUCT(g AS e, f))").alias("X"))
            .select(SF.inline("X"))
            .toPandas(),
            check_exact=False,
        )
        # test inline_outer
        self.assert_eq(
            cdf.select(CF.expr("ARRAY(STRUCT(e, f), STRUCT(g AS e, f))").alias("X"))
            .select(CF.inline_outer("X"))
            .toPandas(),
            sdf.select(SF.expr("ARRAY(STRUCT(e, f), STRUCT(g AS e, f))").alias("X"))
            .select(SF.inline_outer("X"))
            .toPandas(),
            check_exact=False,
        )
        # test posexplode with arrays
        self.assert_eq(
            cdf.select(CF.posexplode(cdf.a), CF.col("b")).toPandas(),
            sdf.select(SF.posexplode(sdf.a), SF.col("b")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.posexplode("a"), "b").toPandas(),
            sdf.select(SF.posexplode("a"), "b").toPandas(),
            check_exact=False,
        )
        # test posexplode with maps
        self.assert_eq(
            cdf.select(CF.posexplode(cdf.d), CF.col("c")).toPandas(),
            sdf.select(SF.posexplode(sdf.d), SF.col("c")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.posexplode("d"), "c").toPandas(),
            sdf.select(SF.posexplode("d"), "c").toPandas(),
            check_exact=False,
        )
        # test posexplode_outer with arrays
        self.assert_eq(
            cdf.select(CF.posexplode_outer(cdf.a), CF.col("b")).toPandas(),
            sdf.select(SF.posexplode_outer(sdf.a), SF.col("b")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.posexplode_outer("a"), "b").toPandas(),
            sdf.select(SF.posexplode_outer("a"), "b").toPandas(),
            check_exact=False,
        )
        # test posexplode_outer with maps
        self.assert_eq(
            cdf.select(CF.posexplode_outer(cdf.d), CF.col("c")).toPandas(),
            sdf.select(SF.posexplode_outer(sdf.d), SF.col("c")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.posexplode_outer("d"), "c").toPandas(),
            sdf.select(SF.posexplode_outer("d"), "c").toPandas(),
            check_exact=False,
        )
cdf.select(CF.posexplode_outer("a"), "b").toPandas(), sdf.select(SF.posexplode_outer("a"), "b").toPandas(), check_exact=False, ) # test posexplode_outer with maps self.assert_eq( cdf.select(CF.posexplode_outer(cdf.d), CF.col("c")).toPandas(), sdf.select(SF.posexplode_outer(sdf.d), SF.col("c")).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.posexplode_outer("d"), "c").toPandas(), sdf.select(SF.posexplode_outer("d"), "c").toPandas(), check_exact=False, ) def test_lambda_functions(self): query = """ SELECT * FROM VALUES (ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a', NULL, MAP(0, 0)), (ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x', MAP(2, 0), MAP(-1, 1)), (NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL, MAP(-1, 2, -3, -4), NULL) AS tab(a, b, c, d, e, f, g, h) """ # +---------+------------+------------+---+---+----+-------------------+---------+ # | a| b| c| d| e| f| g| h| # +---------+------------+------------+---+---+----+-------------------+---------+ # | [a, ab]| [1, 2, 3]|[1, null, 3]| 1| 2| a| NULL| {0 -> 0}| # |[x, null]| NULL| [1, 3]| 3| 4| x| {2 -> 0}|{-1 -> 1}| # | NULL|[-1, -2, -3]| []| 5| 6|NULL|{-1 -> 2, -3 -> -4}| NULL| # +---------+------------+------------+---+---+----+-------------------+---------+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) # test exists self.assert_eq( cdf.select(CF.exists(cdf.b, lambda x: x < 0)).toPandas(), sdf.select(SF.exists(sdf.b, lambda x: x < 0)).toPandas(), ) self.assert_eq( cdf.select(CF.exists("a", lambda x: CF.isnull(x))).toPandas(), sdf.select(SF.exists("a", lambda x: SF.isnull(x))).toPandas(), ) # test aggregate # aggregate without finish self.assert_eq( cdf.select(CF.aggregate(cdf.b, "d", lambda acc, x: acc + x)).toPandas(), sdf.select(SF.aggregate(sdf.b, "d", lambda acc, x: acc + x)).toPandas(), ) self.assert_eq( cdf.select(CF.aggregate("b", cdf.d, lambda acc, x: acc + x)).toPandas(), sdf.select(SF.aggregate("b", sdf.d, lambda acc, x: acc + x)).toPandas(), ) # aggregate 
with finish self.assert_eq( cdf.select( CF.aggregate(cdf.b, "d", lambda acc, x: acc + x, lambda acc: acc + 100) ).toPandas(), sdf.select( SF.aggregate(sdf.b, "d", lambda acc, x: acc + x, lambda acc: acc + 100) ).toPandas(), ) self.assert_eq( cdf.select( CF.aggregate("b", cdf.d, lambda acc, x: acc + x, lambda acc: acc + 100) ).toPandas(), sdf.select( SF.aggregate("b", sdf.d, lambda acc, x: acc + x, lambda acc: acc + 100) ).toPandas(), ) # test array_sort self.assert_eq( cdf.select(CF.array_sort(cdf.b, lambda x, y: CF.abs(x) - CF.abs(y))).toPandas(), sdf.select(SF.array_sort(sdf.b, lambda x, y: SF.abs(x) - SF.abs(y))).toPandas(), check_exact=False, ) self.assert_eq( cdf.select( CF.array_sort( "a", lambda x, y: CF.when(x.isNull() | y.isNull(), CF.lit(0)).otherwise( CF.length(y) - CF.length(x) ), ) ).toPandas(), sdf.select( SF.array_sort( "a", lambda x, y: SF.when(x.isNull() | y.isNull(), SF.lit(0)).otherwise( SF.length(y) - SF.length(x) ), ) ).toPandas(), check_exact=False, ) # test filter self.assert_eq( cdf.select(CF.filter(cdf.b, lambda x: x < 0)).toPandas(), sdf.select(SF.filter(sdf.b, lambda x: x < 0)).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.filter("a", lambda x: ~CF.isnull(x))).toPandas(), sdf.select(SF.filter("a", lambda x: ~SF.isnull(x))).toPandas(), check_exact=False, ) # test forall self.assert_eq( cdf.select(CF.filter(cdf.b, lambda x: x != 0)).toPandas(), sdf.select(SF.filter(sdf.b, lambda x: x != 0)).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.filter("a", lambda x: ~CF.isnull(x))).toPandas(), sdf.select(SF.filter("a", lambda x: ~SF.isnull(x))).toPandas(), check_exact=False, ) # test transform # transform without index self.assert_eq( cdf.select(CF.transform(cdf.b, lambda x: x + 1)).toPandas(), sdf.select(SF.transform(sdf.b, lambda x: x + 1)).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.transform("b", lambda x: x + 1)).toPandas(), sdf.select(SF.transform("b", lambda x: x + 1)).toPandas(), 
check_exact=False, ) # transform with index self.assert_eq( cdf.select(CF.transform(cdf.b, lambda x, i: x + 1 - i)).toPandas(), sdf.select(SF.transform(sdf.b, lambda x, i: x + 1 - i)).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.transform("b", lambda x, i: x + 1 - i)).toPandas(), sdf.select(SF.transform("b", lambda x, i: x + 1 - i)).toPandas(), check_exact=False, ) # test zip_with self.assert_eq( cdf.select(CF.zip_with(cdf.b, "c", lambda v1, v2: v1 - CF.abs(v2))).toPandas(), sdf.select(SF.zip_with(sdf.b, "c", lambda v1, v2: v1 - SF.abs(v2))).toPandas(), check_exact=False, ) self.assert_eq( cdf.select(CF.zip_with("b", cdf.c, lambda v1, v2: v1 - CF.abs(v2))).toPandas(), sdf.select(SF.zip_with("b", sdf.c, lambda v1, v2: v1 - SF.abs(v2))).toPandas(), check_exact=False, ) # test map_filter self.compare_by_show( cdf.select(CF.map_filter(cdf.g, lambda k, v: k > v)), sdf.select(SF.map_filter(sdf.g, lambda k, v: k > v)), ) self.compare_by_show( cdf.select(CF.map_filter("g", lambda k, v: k > v)), sdf.select(SF.map_filter("g", lambda k, v: k > v)), ) # test map_zip_with self.compare_by_show( cdf.select(CF.map_zip_with(cdf.g, "h", lambda k, v1, v2: v1 + v2)), sdf.select(SF.map_zip_with(sdf.g, "h", lambda k, v1, v2: v1 + v2)), ) self.compare_by_show( cdf.select(CF.map_zip_with("g", cdf.h, lambda k, v1, v2: v1 + v2)), sdf.select(SF.map_zip_with("g", sdf.h, lambda k, v1, v2: v1 + v2)), ) # test transform_keys self.compare_by_show( cdf.select(CF.transform_keys(cdf.g, lambda k, v: k - 1)), sdf.select(SF.transform_keys(sdf.g, lambda k, v: k - 1)), ) self.compare_by_show( cdf.select(CF.transform_keys("g", lambda k, v: k - 1)), sdf.select(SF.transform_keys("g", lambda k, v: k - 1)), ) # test transform_values self.compare_by_show( cdf.select(CF.transform_values(cdf.g, lambda k, v: CF.abs(v) + 1)), sdf.select(SF.transform_values(sdf.g, lambda k, v: SF.abs(v) + 1)), ) self.compare_by_show( cdf.select(CF.transform_values("g", lambda k, v: CF.abs(v) + 1)), 
    def test_nested_lambda_function(self):
        """SPARK-42089: nested lambda functions must resolve their outer-scope
        lambda variables correctly in Spark Connect."""
        query = "SELECT array(1, 2, 3) as numbers, array('a', 'b', 'c') as letters"
        cdf = self.connect.sql(query).select(
            CF.flatten(
                CF.transform(
                    "numbers",
                    lambda number: CF.transform(
                        "letters", lambda letter: CF.struct(number.alias("n"), letter.alias("l"))
                    ),
                )
            )
        )
        sdf = self.spark.sql(query).select(
            SF.flatten(
                SF.transform(
                    "numbers",
                    lambda number: SF.transform(
                        "letters", lambda letter: SF.struct(number.alias("n"), letter.alias("l"))
                    ),
                )
            )
        )
        # TODO: 'cdf.schema' has an extra metadata '{'__autoGeneratedAlias': 'true'}'
        self.assertEqual(_drop_metadata(cdf.schema), _drop_metadata(sdf.schema))
        assertDataFrameEqual(cdf, sdf)

    def test_csv_functions(self):
        """Check parity of CSV functions (from_csv / schema_of_csv / to_csv)
        between Spark Connect and classic PySpark."""
        query = """
            SELECT * FROM VALUES
            ('1,2,3', 'a,b,5.0'),
            ('3,4,5', 'x,y,6.0')
            AS tab(a, b)
            """
        # +-----+-------+
        # |    a|      b|
        # +-----+-------+
        # |1,2,3|a,b,5.0|
        # |3,4,5|x,y,6.0|
        # +-----+-------+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test from_csv
        self.compare_by_show(
            cdf.select(
                CF.from_csv(cdf.a, "a INT, b INT, c INT"),
                CF.from_csv("b", "x STRING, y STRING, z DOUBLE"),
            ),
            sdf.select(
                SF.from_csv(sdf.a, "a INT, b INT, c INT"),
                SF.from_csv("b", "x STRING, y STRING, z DOUBLE"),
            ),
        )
        # schema given as a Column instead of a DDL string
        self.compare_by_show(
            cdf.select(
                CF.from_csv(cdf.a, CF.lit("a INT, b INT, c INT")),
                CF.from_csv("b", CF.lit("x STRING, y STRING, z DOUBLE")),
            ),
            sdf.select(
                SF.from_csv(sdf.a, SF.lit("a INT, b INT, c INT")),
                SF.from_csv("b", SF.lit("x STRING, y STRING, z DOUBLE")),
            ),
        )
        # with parser options
        self.compare_by_show(
            cdf.select(
                CF.from_csv(cdf.a, CF.lit("a INT, b INT, c INT"), {"maxCharsPerColumn": "3"}),
                CF.from_csv(
                    "b", CF.lit("x STRING, y STRING, z DOUBLE"), {"maxCharsPerColumn": "3"}
                ),
            ),
            sdf.select(
                SF.from_csv(sdf.a, SF.lit("a INT, b INT, c INT"), {"maxCharsPerColumn": "3"}),
                SF.from_csv(
                    "b", SF.lit("x STRING, y STRING, z DOUBLE"), {"maxCharsPerColumn": "3"}
                ),
            ),
        )
        # test schema_of_csv
        self.assert_eq(
            cdf.select(CF.schema_of_csv(CF.lit('{"a": 0}'))).toPandas(),
            sdf.select(SF.schema_of_csv(SF.lit('{"a": 0}'))).toPandas(),
        )
        self.assert_eq(
            cdf.select(
                CF.schema_of_csv(CF.lit('{"a": 0}'), {"maxCharsPerColumn": "10"})
            ).toPandas(),
            sdf.select(
                SF.schema_of_csv(SF.lit('{"a": 0}'), {"maxCharsPerColumn": "10"})
            ).toPandas(),
        )
        # test to_csv
        self.compare_by_show(
            cdf.select(CF.to_csv(CF.struct(CF.lit("a"), CF.lit("b")))),
            sdf.select(SF.to_csv(SF.struct(SF.lit("a"), SF.lit("b")))),
        )
        self.compare_by_show(
            cdf.select(CF.to_csv(CF.struct(CF.lit("a"), CF.lit("b")), {"maxCharsPerColumn": "10"})),
            sdf.select(SF.to_csv(SF.struct(SF.lit("a"), SF.lit("b")), {"maxCharsPerColumn": "10"})),
        )
    def test_json_functions(self):
        """Check parity of JSON functions (from_json / get_json_object / json_tuple /
        schema_of_json / to_json) between Spark Connect and classic PySpark."""
        query = """
            SELECT * FROM VALUES
            ('{"a": 1}', '[1, 2, 3]', '{"f1": "value1", "f2": "value2"}'),
            ('{"a": 0}', '[4, 5, 6]', '{"f1": "value12"}')
            AS tab(a, b, c)
            """
        # +--------+---------+--------------------------------+
        # |       a|        b|                               c|
        # +--------+---------+--------------------------------+
        # |{"a": 1}|[1, 2, 3]|{"f1": "value1", "f2": "value2"}|
        # |{"a": 0}|[4, 5, 6]|               {"f1": "value12"}|
        # +--------+---------+--------------------------------+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test from_json
        # NOTE(review): the bare "MAP" / "ARRAY" DDL strings below look truncated —
        # the extraction that produced this source stripped angle-bracketed text,
        # so these were presumably parameterized types (e.g. MAP<...>, ARRAY<...>).
        # Verify against the upstream file before relying on them.
        for schema in [
            "a INT",
            "MAP",
            StructType([StructField("a", IntegerType())]),
            ArrayType(StructType([StructField("a", IntegerType())])),
        ]:
            self.compare_by_show(
                cdf.select(CF.from_json(cdf.a, schema)),
                sdf.select(SF.from_json(sdf.a, schema)),
            )
            self.compare_by_show(
                cdf.select(CF.from_json("a", schema)),
                sdf.select(SF.from_json("a", schema)),
            )
            self.compare_by_show(
                cdf.select(CF.from_json(cdf.a, schema, {"mode": "FAILFAST"})),
                sdf.select(SF.from_json(sdf.a, schema, {"mode": "FAILFAST"})),
            )
            self.compare_by_show(
                cdf.select(CF.from_json("a", schema, {"mode": "FAILFAST"})),
                sdf.select(SF.from_json("a", schema, {"mode": "FAILFAST"})),
            )

        for schema in [
            "ARRAY",
            ArrayType(IntegerType()),
        ]:
            self.compare_by_show(
                cdf.select(CF.from_json(cdf.b, schema)),
                sdf.select(SF.from_json(sdf.b, schema)),
            )
            self.compare_by_show(
                cdf.select(CF.from_json("b", schema)),
                sdf.select(SF.from_json("b", schema)),
            )
            self.compare_by_show(
                cdf.select(CF.from_json(cdf.b, schema, {"mode": "FAILFAST"})),
                sdf.select(SF.from_json(sdf.b, schema, {"mode": "FAILFAST"})),
            )
            self.compare_by_show(
                cdf.select(CF.from_json("b", schema, {"mode": "FAILFAST"})),
                sdf.select(SF.from_json("b", schema, {"mode": "FAILFAST"})),
            )

        # SPARK-41880: from_json support non-literal expression
        c_schema = CF.schema_of_json(CF.lit("""{"a": 2}"""))
        s_schema = SF.schema_of_json(SF.lit("""{"a": 2}"""))
        self.compare_by_show(
            cdf.select(CF.from_json(cdf.a, c_schema)),
            sdf.select(SF.from_json(sdf.a, s_schema)),
        )
        self.compare_by_show(
            cdf.select(CF.from_json("a", c_schema)),
            sdf.select(SF.from_json("a", s_schema)),
        )
        self.compare_by_show(
            cdf.select(CF.from_json(cdf.a, c_schema, {"mode": "FAILFAST"})),
            sdf.select(SF.from_json(sdf.a, s_schema, {"mode": "FAILFAST"})),
        )
        self.compare_by_show(
            cdf.select(CF.from_json("a", c_schema, {"mode": "FAILFAST"})),
            sdf.select(SF.from_json("a", s_schema, {"mode": "FAILFAST"})),
        )

        # passing a list as the schema must raise a typed error
        with self.assertRaises(PySparkTypeError) as pe:
            CF.from_json("a", [c_schema])
        self.check_error(
            exception=pe.exception,
            errorClass="NOT_EXPECTED_TYPE",
            messageParameters={
                "expected_type": "Column, str or DataType",
                "arg_name": "schema",
                "arg_type": "list",
            },
        )

        # test get_json_object
        self.assert_eq(
            cdf.select(
                CF.get_json_object("c", "$.f1"),
                CF.get_json_object(cdf.c, "$.f2"),
            ).toPandas(),
            sdf.select(
                SF.get_json_object("c", "$.f1"),
                SF.get_json_object(sdf.c, "$.f2"),
            ).toPandas(),
        )

        # test json_tuple
        self.assert_eq(
            cdf.select(CF.json_tuple("c", "f1", "f2")).toPandas(),
            sdf.select(SF.json_tuple("c", "f1", "f2")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.json_tuple(cdf.c, "f1", "f2")).toPandas(),
            sdf.select(SF.json_tuple(sdf.c, "f1", "f2")).toPandas(),
        )

        # test schema_of_json
        self.assert_eq(
            cdf.select(CF.schema_of_json(CF.lit('{"a": 0}'))).toPandas(),
            sdf.select(SF.schema_of_json(SF.lit('{"a": 0}'))).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.schema_of_json(CF.lit('{"a": 0}'), {"mode": "FAILFAST"})).toPandas(),
            sdf.select(SF.schema_of_json(SF.lit('{"a": 0}'), {"mode": "FAILFAST"})).toPandas(),
        )

        # test to_json
        self.compare_by_show(
            cdf.select(CF.to_json(CF.struct(CF.lit("a"), CF.lit("b")))),
            sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")))),
        )
        self.compare_by_show(
            cdf.select(CF.to_json(CF.struct(CF.lit("a"), CF.lit("b")), {"mode": "FAILFAST"})),
            sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})),
        )

1

', '

123

', '

5.0

'), ('

0

', '

456

', '

') AS tab(a, b, c) """ # +---------------+-------------------------------+---------------------------------+ # | a| b| c| # +---------------+-------------------------------+---------------------------------+ # |

1

|

123

|

5.0

| # |

1

|

456

|

| # +---------------+-------------------------------+---------------------------------+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) # test from_xml for schema in [ "a INT", StructType([StructField("a", IntegerType())]), StructType([StructField("a", ArrayType(IntegerType()))]), ]: self.compare_by_show( cdf.select(CF.from_xml(cdf.a, schema)), sdf.select(SF.from_xml(sdf.a, schema)), ) self.compare_by_show( cdf.select(CF.from_xml("a", schema)), sdf.select(SF.from_xml("a", schema)), ) self.compare_by_show( cdf.select(CF.from_xml(cdf.a, schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml(sdf.a, schema, {"mode": "FAILFAST"})), ) self.compare_by_show( cdf.select(CF.from_xml("a", schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml("a", schema, {"mode": "FAILFAST"})), ) for schema in [ "STRUCT>", StructType([StructField("a", ArrayType(IntegerType()))]), ]: self.compare_by_show( cdf.select(CF.from_xml(cdf.b, schema)), sdf.select(SF.from_xml(sdf.b, schema)), ) self.compare_by_show( cdf.select(CF.from_xml("b", schema)), sdf.select(SF.from_xml("b", schema)), ) self.compare_by_show( cdf.select(CF.from_xml(cdf.b, schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml(sdf.b, schema, {"mode": "FAILFAST"})), ) self.compare_by_show( cdf.select(CF.from_xml("b", schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml("b", schema, {"mode": "FAILFAST"})), ) self.compare_by_show( sdf.select(SF.to_xml(SF.struct(SF.from_xml("b", schema)), {"rowTag": "person"})), sdf.select(SF.to_xml(SF.struct(SF.from_xml("b", schema)), {"rowTag": "person"})), ) c_schema = CF.schema_of_xml(CF.lit("""

1

""")) s_schema = SF.schema_of_xml(SF.lit("""

1

""")) self.compare_by_show( cdf.select(CF.from_xml(cdf.a, c_schema)), sdf.select(SF.from_xml(sdf.a, s_schema)), ) self.compare_by_show( cdf.select(CF.from_xml("a", c_schema)), sdf.select(SF.from_xml("a", s_schema)), ) self.compare_by_show( cdf.select(CF.from_xml(cdf.a, c_schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml(sdf.a, s_schema, {"mode": "FAILFAST"})), ) self.compare_by_show( cdf.select(CF.from_xml("a", c_schema, {"mode": "FAILFAST"})), sdf.select(SF.from_xml("a", s_schema, {"mode": "FAILFAST"})), ) with self.assertRaises(PySparkTypeError) as pe: CF.from_xml("a", [c_schema]) self.check_error( exception=pe.exception, errorClass="NOT_EXPECTED_TYPE", messageParameters={ "expected_type": "StructType, Column or str", "arg_name": "schema", "arg_type": "list", }, ) # test schema_of_xml self.assert_eq( cdf.select(CF.schema_of_xml(CF.lit("

1

"))).toPandas(), sdf.select(SF.schema_of_xml(SF.lit("

1

"))).toPandas(), ) self.assert_eq( cdf.select( CF.schema_of_xml(CF.lit("

1

"), {"mode": "FAILFAST"}) ).toPandas(), sdf.select( SF.schema_of_xml(SF.lit("

1

"), {"mode": "FAILFAST"}) ).toPandas(), ) # test to_xml self.compare_by_show( cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")))), sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")))), ) self.compare_by_show( cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")), {"mode": "FAILFAST"})), sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})), ) def test_string_functions_one_arg(self): query = """ SELECT * FROM VALUES (' ab ', 'ab ', NULL), (' ab', NULL, 'ab') AS tab(a, b, c) """ # +--------+-----+----+ # | a| b| c| # +--------+-----+----+ # | ab |ab |NULL| # | ab| NULL| ab| # +--------+-----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) for cfunc, sfunc in [ (CF.upper, SF.upper), (CF.lower, SF.lower), (CF.ascii, SF.ascii), (CF.base64, SF.base64), (CF.unbase64, SF.unbase64), (CF.ltrim, SF.ltrim), (CF.rtrim, SF.rtrim), (CF.trim, SF.trim), (CF.sentences, SF.sentences), (CF.initcap, SF.initcap), (CF.soundex, SF.soundex), (CF.hex, SF.hex), (CF.unhex, SF.unhex), (CF.length, SF.length), (CF.octet_length, SF.octet_length), (CF.bit_length, SF.bit_length), (CF.reverse, SF.reverse), ]: self.assert_eq( cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(), sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(), ) query = """ SELECT * FROM VALUES (' 1 ', '2 ', NULL), (' 3', NULL, '4') AS tab(a, b, c) """ cdf = self.connect.sql(query) sdf = self.spark.sql(query) self.assert_eq( cdf.select(CF.bin(cdf.a), CF.bin(cdf.b)).toPandas(), sdf.select(SF.bin(sdf.a), SF.bin(sdf.b)).toPandas(), ) def test_string_functions_multi_args(self): query = """ SELECT * FROM VALUES (1, 'abcdef', 'ghij', 'hello world', 'a.b.c.d'), (2, 'abcd', 'efghij', 'how are you', 'a.b.c') AS tab(a, b, c, d, e) """ # +---+------+------+-----------+-------+ # | a| b| c| d| e| # +---+------+------+-----------+-------+ # | 1|abcdef| ghij|hello world|a.b.c.d| # | 2| abcd|efghij|how are you| a.b.c| # +---+------+------+-----------+-------+ cdf = self.connect.sql(query) sdf = 
    def test_string_functions_multi_args(self):
        """Check parity of multi-argument string functions between Spark Connect
        and classic PySpark."""
        query = """
            SELECT * FROM VALUES
            (1, 'abcdef', 'ghij', 'hello world', 'a.b.c.d'),
            (2, 'abcd', 'efghij', 'how are you', 'a.b.c')
            AS tab(a, b, c, d, e)
            """
        # +---+------+------+-----------+-------+
        # |  a|     b|     c|          d|      e|
        # +---+------+------+-----------+-------+
        # |  1|abcdef|  ghij|hello world|a.b.c.d|
        # |  2|  abcd|efghij|how are you|  a.b.c|
        # +---+------+------+-----------+-------+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        self.assert_eq(
            cdf.select(CF.format_number(cdf.a, 2)).toPandas(),
            sdf.select(SF.format_number(sdf.a, 2)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.format_number("a", 5)).toPandas(),
            sdf.select(SF.format_number("a", 5)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.concat_ws("-", cdf.b, "c")).toPandas(),
            sdf.select(SF.concat_ws("-", sdf.b, "c")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.decode("c", "UTF-8")).toPandas(),
            sdf.select(SF.decode("c", "UTF-8")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.encode("c", "UTF-8")).toPandas(),
            sdf.select(SF.encode("c", "UTF-8")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.format_string("%d %s", cdf.a, cdf.b)).toPandas(),
            sdf.select(SF.format_string("%d %s", sdf.a, sdf.b)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.instr(cdf.b, "b")).toPandas(), sdf.select(SF.instr(sdf.b, "b")).toPandas()
        )
        self.assert_eq(
            cdf.select(CF.overlay(cdf.b, cdf.c, 2)).toPandas(),
            sdf.select(SF.overlay(sdf.b, sdf.c, 2)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.substring(cdf.b, 1, 2)).toPandas(),
            sdf.select(SF.substring(sdf.b, 1, 2)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.substring_index(cdf.e, ".", 2)).toPandas(),
            sdf.select(SF.substring_index(sdf.e, ".", 2)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.levenshtein(cdf.b, cdf.c)).toPandas(),
            sdf.select(SF.levenshtein(sdf.b, sdf.c)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.levenshtein(cdf.b, cdf.c, 1)).toPandas(),
            sdf.select(SF.levenshtein(sdf.b, sdf.c, 1)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.locate("e", cdf.b)).toPandas(),
            sdf.select(SF.locate("e", sdf.b)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.lpad(cdf.b, 10, "#")).toPandas(),
            sdf.select(SF.lpad(sdf.b, 10, "#")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.rpad(cdf.b, 10, "#")).toPandas(),
            sdf.select(SF.rpad(sdf.b, 10, "#")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.repeat(cdf.b, 2)).toPandas(), sdf.select(SF.repeat(sdf.b, 2)).toPandas()
        )
        self.assert_eq(
            cdf.select(CF.split(cdf.b, "[bd]")).toPandas(),
            sdf.select(SF.split(sdf.b, "[bd]")).toPandas(),
            check_exact=False,
        )
        self.assert_eq(
            cdf.select(CF.regexp_extract(cdf.b, "(a+)(b)?(c)", 1)).toPandas(),
            sdf.select(SF.regexp_extract(sdf.b, "(a+)(b)?(c)", 1)).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.regexp_replace(cdf.b, "(a+)(b)?(c)", "--")).toPandas(),
            sdf.select(SF.regexp_replace(sdf.b, "(a+)(b)?(c)", "--")).toPandas(),
        )
        self.assert_eq(
            cdf.select(CF.translate(cdf.b, "abc", "xyz")).toPandas(),
            sdf.select(SF.translate(sdf.b, "abc", "xyz")).toPandas(),
        )
    # TODO(SPARK-41283): To compare toPandas for test cases with dtypes marked
    def test_date_ts_functions(self):
        """Check parity of date/timestamp functions between Spark Connect and
        classic PySpark. Several checks use compare_by_show or count() instead of
        toPandas because the two sides currently produce different pandas dtypes
        (tz-aware vs naive) — see the inline [left]/[right] notes."""
        query = """
            SELECT * FROM VALUES
            ('1997-02-28 10:30:00', '2023-03-01 06:00:00', 'JST', 1428476400, 2020, 12, 6),
            ('2000-01-01 04:30:05', '2020-05-01 12:15:00', 'PST', 1403892395, 2022, 12, 6)
            AS tab(ts1, ts2, tz, seconds, Y, M, D)
            """
        # +-------------------+-------------------+---+----------+----+---+---+
        # |                ts1|                ts2| tz|   seconds|   Y|  M|  D|
        # +-------------------+-------------------+---+----------+----+---+---+
        # |1997-02-28 10:30:00|2023-03-01 06:00:00|JST|1428476400|2020| 12|  6|
        # |2000-01-01 04:30:05|2020-05-01 12:15:00|PST|1403892395|2022| 12|  6|
        # +-------------------+-------------------+---+----------+----+---+---+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # With no parameters
        for cfunc, sfunc in [
            (CF.current_date, SF.current_date),
        ]:
            self.assert_eq(
                cdf.select(cfunc()).toPandas(),
                sdf.select(sfunc()).toPandas(),
            )

        # current_timestamp
        # [left]: datetime64[ns, America/Los_Angeles]
        # [right]: datetime64[ns]
        # TODO: compare the return values after resolving dtypes difference
        self.assertEqual(
            cdf.select(CF.current_timestamp()).count(),
            sdf.select(SF.current_timestamp()).count(),
        )

        # localtimestamp: the connect result must fall between two classic
        # samples taken before and after it, so compare orderings not values.
        s_pdf0 = sdf.select(SF.localtimestamp()).toPandas()
        c_pdf = cdf.select(CF.localtimestamp()).toPandas()
        s_pdf1 = sdf.select(SF.localtimestamp()).toPandas()
        self.assert_eq(s_pdf0 < c_pdf, c_pdf < s_pdf1)

        # With only column parameter
        for cfunc, sfunc in [
            (CF.year, SF.year),
            (CF.quarter, SF.quarter),
            (CF.month, SF.month),
            (CF.dayofweek, SF.dayofweek),
            (CF.dayofmonth, SF.dayofmonth),
            (CF.dayofyear, SF.dayofyear),
            (CF.hour, SF.hour),
            (CF.minute, SF.minute),
            (CF.second, SF.second),
            (CF.weekofyear, SF.weekofyear),
            (CF.last_day, SF.last_day),
            (CF.unix_timestamp, SF.unix_timestamp),
        ]:
            self.assert_eq(
                cdf.select(cfunc(cdf.ts1)).toPandas(),
                sdf.select(sfunc(sdf.ts1)).toPandas(),
            )

        # With format parameter
        for cfunc, sfunc in [
            (CF.date_format, SF.date_format),
            (CF.to_date, SF.to_date),
        ]:
            self.assert_eq(
                cdf.select(cfunc(cdf.ts1, format="yyyy-MM-dd HH:mm:ss")).toPandas(),
                sdf.select(sfunc(sdf.ts1, format="yyyy-MM-dd HH:mm:ss")).toPandas(),
            )
        self.compare_by_show(
            # [left]: datetime64[ns, America/Los_Angeles]
            # [right]: datetime64[ns]
            cdf.select(CF.to_timestamp(cdf.ts1, format="yyyy-MM-dd HH:mm:ss")),
            sdf.select(SF.to_timestamp(sdf.ts1, format="yyyy-MM-dd HH:mm:ss")),
        )

        # With tz parameter
        for cfunc, sfunc in [
            (CF.from_utc_timestamp, SF.from_utc_timestamp),
            (CF.to_utc_timestamp, SF.to_utc_timestamp),
            # [left]: datetime64[ns, America/Los_Angeles]
            # [right]: datetime64[ns]
        ]:
            self.compare_by_show(
                cdf.select(cfunc(cdf.ts1, tz=cdf.tz)),
                sdf.select(sfunc(sdf.ts1, tz=sdf.tz)),
            )

        # With numeric parameter
        for cfunc, sfunc in [
            (CF.date_add, SF.date_add),
            (CF.date_sub, SF.date_sub),
            (CF.add_months, SF.add_months),
        ]:
            self.assert_eq(
                cdf.select(cfunc(cdf.ts1, cdf.D)).toPandas(),
                sdf.select(sfunc(sdf.ts1, sdf.D)).toPandas(),
            )

        # With another timestamp as parameter
        for cfunc, sfunc in [
            (CF.datediff, SF.datediff),
            (CF.months_between, SF.months_between),
        ]:
            self.assert_eq(
                cdf.select(cfunc(cdf.ts1, cdf.ts2)).toPandas(),
                sdf.select(sfunc(sdf.ts1, sdf.ts2)).toPandas(),
            )

        # With seconds parameter
        self.compare_by_show(
            # [left]: datetime64[ns, America/Los_Angeles]
            # [right]: datetime64[ns]
            cdf.select(CF.timestamp_seconds(cdf.seconds)),
            sdf.select(SF.timestamp_seconds(sdf.seconds)),
        )

        # make_date
        self.assert_eq(
            cdf.select(CF.make_date(cdf.Y, cdf.M, cdf.D)).toPandas(),
            sdf.select(SF.make_date(sdf.Y, sdf.M, sdf.D)).toPandas(),
        )

        # date_trunc
        self.compare_by_show(
            # [left]: datetime64[ns, America/Los_Angeles]
            # [right]: datetime64[ns]
            cdf.select(CF.date_trunc("day", cdf.ts1)),
            sdf.select(SF.date_trunc("day", sdf.ts1)),
        )

        # trunc
        self.assert_eq(
            cdf.select(CF.trunc(cdf.ts1, "year")).toPandas(),
            sdf.select(SF.trunc(sdf.ts1, "year")).toPandas(),
        )

        # next_day
        self.assert_eq(
            cdf.select(CF.next_day(cdf.ts1, "Mon")).toPandas(),
            sdf.select(SF.next_day(sdf.ts1, "Mon")).toPandas(),
        )
    def test_time_window_functions(self):
        """Check parity of window / session_window / window_time between Spark
        Connect and classic PySpark, plus typed-error checks for bad argument
        types."""
        query = """
            SELECT * FROM VALUES
            (TIMESTAMP('2022-12-25 10:30:00'), 1),
            (TIMESTAMP('2022-12-25 10:31:00'), 2),
            (TIMESTAMP('2022-12-25 10:32:00'), 1),
            (TIMESTAMP('2022-12-25 10:33:00'), 2),
            (TIMESTAMP('2022-12-26 09:30:00'), 1),
            (TIMESTAMP('2022-12-26 09:35:00'), 3)
            AS tab(date, val)
            """
        # +-------------------+---+
        # |               date|val|
        # +-------------------+---+
        # |2022-12-25 10:30:00|  1|
        # |2022-12-25 10:31:00|  2|
        # |2022-12-25 10:32:00|  1|
        # |2022-12-25 10:33:00|  2|
        # |2022-12-26 09:30:00|  1|
        # |2022-12-26 09:35:00|  3|
        # +-------------------+---+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test window
        self.compare_by_show(
            cdf.select(CF.window("date", "15 seconds")),
            sdf.select(SF.window("date", "15 seconds")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.window(cdf.date, "1 minute")),
            sdf.select(SF.window(sdf.date, "1 minute")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.window("date", "15 seconds", "5 seconds")),
            sdf.select(SF.window("date", "15 seconds", "5 seconds")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.window(cdf.date, "1 minute", "10 seconds")),
            sdf.select(SF.window(sdf.date, "1 minute", "10 seconds")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.window("date", "15 seconds", "10 seconds", "5 seconds")),
            sdf.select(SF.window("date", "15 seconds", "10 seconds", "5 seconds")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.window(cdf.date, "1 minute", "10 seconds", "5 seconds")),
            sdf.select(SF.window(sdf.date, "1 minute", "10 seconds", "5 seconds")),
            truncate=100,
        )

        # duration arguments must be strings, not ints
        with self.assertRaises(PySparkTypeError) as pe:
            CF.window("date", "15 seconds", 10, "5 seconds")
        self.check_error(
            exception=pe.exception,
            errorClass="NOT_EXPECTED_TYPE",
            messageParameters={
                "expected_type": "str",
                "arg_name": "slideDuration",
                "arg_type": "int",
            },
        )
        with self.assertRaises(PySparkTypeError) as pe:
            CF.window("date", "15 seconds", "10 seconds", 5)
        self.check_error(
            exception=pe.exception,
            errorClass="NOT_EXPECTED_TYPE",
            messageParameters={"expected_type": "str", "arg_name": "startTime", "arg_type": "int"},
        )

        # test session_window
        self.compare_by_show(
            cdf.select(CF.session_window("date", "15 seconds")),
            sdf.select(SF.session_window("date", "15 seconds")),
            truncate=100,
        )
        self.compare_by_show(
            cdf.select(CF.session_window(cdf.date, "1 minute")),
            sdf.select(SF.session_window(sdf.date, "1 minute")),
            truncate=100,
        )

        # test window_time
        self.compare_by_show(
            cdf.groupBy(CF.window("date", "5 seconds"))
            .agg(CF.sum("val").alias("sum"))
            .select(CF.window_time("window")),
            sdf.groupBy(SF.window("date", "5 seconds"))
            .agg(SF.sum("val").alias("sum"))
            .select(SF.window_time("window")),
            truncate=100,
        )
    def test_misc_functions(self):
        """Check parity of misc functions (assert_true / raise_error / hashes /
        digests) between Spark Connect and classic PySpark."""
        query = """
            SELECT a, b, c, BINARY(c) as d FROM VALUES
            (0, float("NAN"), 'x'), (1, NULL, 'y'), (1, 2.1, 'z'), (0, 0.5, NULL)
            AS tab(a, b, c)
            """
        # +---+----+----+----+
        # |  a|   b|   c|   d|
        # +---+----+----+----+
        # |  0| NaN|   x|[78]|
        # |  1|NULL|   y|[79]|
        # |  1| 2.1|   z|[7A]|
        # |  0| 0.5|NULL|NULL|
        # +---+----+----+----+
        cdf = self.connect.sql(query)
        sdf = self.spark.sql(query)

        # test assert_true: some rows have a == 0, so the assertion must fail
        with self.assertRaises(SparkConnectException):
            cdf.select(CF.assert_true(cdf.a > 0, "a should be positive!")).show()

        # test raise_error
        with self.assertRaises(SparkConnectException):
            cdf.select(CF.raise_error("a should be positive!")).show()

        # test crc32
        self.assert_eq(
            cdf.select(CF.crc32(cdf.d)).toPandas(),
            sdf.select(SF.crc32(sdf.d)).toPandas(),
        )

        # test hash
        self.assert_eq(
            cdf.select(CF.hash(cdf.a, "b", cdf.c)).toPandas(),
            sdf.select(SF.hash(sdf.a, "b", sdf.c)).toPandas(),
        )

        # test xxhash64
        self.assert_eq(
            cdf.select(CF.xxhash64(cdf.a, "b", cdf.c)).toPandas(),
            sdf.select(SF.xxhash64(sdf.a, "b", sdf.c)).toPandas(),
        )

        # test md5
        self.assert_eq(
            cdf.select(CF.md5(cdf.d), CF.md5("c")).toPandas(),
            sdf.select(SF.md5(sdf.d), SF.md5("c")).toPandas(),
        )

        # test sha1
        self.assert_eq(
            cdf.select(CF.sha1(cdf.d), CF.sha1("c")).toPandas(),
            sdf.select(SF.sha1(sdf.d), SF.sha1("c")).toPandas(),
        )

        # test sha2
        self.assert_eq(
            cdf.select(CF.sha2(cdf.c, 256), CF.sha2("d", 512)).toPandas(),
            sdf.select(SF.sha2(sdf.c, 256), SF.sha2("d", 512)).toPandas(),
        )
cdf.select(CF.crc32(cdf.d)).toPandas(), sdf.select(SF.crc32(sdf.d)).toPandas(), ) # test hash self.assert_eq( cdf.select(CF.hash(cdf.a, "b", cdf.c)).toPandas(), sdf.select(SF.hash(sdf.a, "b", sdf.c)).toPandas(), ) # test xxhash64 self.assert_eq( cdf.select(CF.xxhash64(cdf.a, "b", cdf.c)).toPandas(), sdf.select(SF.xxhash64(sdf.a, "b", sdf.c)).toPandas(), ) # test md5 self.assert_eq( cdf.select(CF.md5(cdf.d), CF.md5("c")).toPandas(), sdf.select(SF.md5(sdf.d), SF.md5("c")).toPandas(), ) # test sha1 self.assert_eq( cdf.select(CF.sha1(cdf.d), CF.sha1("c")).toPandas(), sdf.select(SF.sha1(sdf.d), SF.sha1("c")).toPandas(), ) # test sha2 self.assert_eq( cdf.select(CF.sha2(cdf.c, 256), CF.sha2("d", 512)).toPandas(), sdf.select(SF.sha2(sdf.c, 256), SF.sha2("d", 512)).toPandas(), ) def test_call_udf(self): query = """ SELECT a, b, c, BINARY(c) as d FROM VALUES (-1.0, float("NAN"), 'x'), (-2.1, NULL, 'y'), (1, 2.1, 'z'), (0, 0.5, NULL) AS tab(a, b, c) """ # +----+----+----+----+ # | a| b| c| d| # +----+----+----+----+ # |-1.0| NaN| x|[78]| # |-2.1|NULL| y|[79]| # | 1.0| 2.1| z|[7A]| # | 0.0| 0.5|NULL|NULL| # +----+----+----+----+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) self.assert_eq( cdf.select( CF.call_udf("abs", cdf.a), CF.call_udf("xxhash64", "b", cdf.c, "d") ).toPandas(), sdf.select( SF.call_udf("abs", sdf.a), SF.call_udf("xxhash64", "b", sdf.c, "d") ).toPandas(), ) def test_udf(self): query = """ SELECT a, b, c FROM VALUES (1, 1.0, 'x'), (2, 2.0, 'y'), (3, 3.0, 'z') AS tab(a, b, c) """ # +---+---+---+ # | a| b| c| # +---+---+---+ # | 1|1.0| x| # | 2|2.0| y| # | 3|3.0| z| # +---+---+---+ cdf = self.connect.sql(query) sdf = self.spark.sql(query) # as a normal function self.assert_eq( cdf.withColumn("A", CF.udf(lambda x: x + 1)(cdf.a)).toPandas(), sdf.withColumn("A", SF.udf(lambda x: x + 1)(sdf.a)).toPandas(), ) self.assert_eq( # returnType as DDL strings cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(), sdf.withColumn("C", 
SF.udf(lambda x: len(x), "int")(sdf.c)).toPandas(), ) self.assert_eq( # returnType as DataType cdf.withColumn("C", CF.udf(lambda x: len(x), IntegerType())(cdf.c)).toPandas(), sdf.withColumn("C", SF.udf(lambda x: len(x), IntegerType())(sdf.c)).toPandas(), ) # as a decorator @CF.udf(StringType()) def cfun(x): return x + "a" @SF.udf(StringType()) def sfun(x): return x + "a" self.assert_eq( cdf.withColumn("A", cfun(cdf.c)).toPandas(), sdf.withColumn("A", sfun(sdf.c)).toPandas(), ) def test_udtf(self): class TestUDTF: def eval(self, x: int, y: int): yield x, x + 1 yield y, y + 1 sfunc = SF.udtf(TestUDTF, returnType="a: int, b: int") cfunc = CF.udtf(TestUDTF, returnType="a: int, b: int") assertDataFrameEqual(sfunc(SF.lit(1), SF.lit(1)), cfunc(CF.lit(1), CF.lit(1))) self.spark.udtf.register("test_udtf", sfunc) self.connect.udtf.register("test_udtf", cfunc) query = "select * from test_udtf(1, 2)" assertDataFrameEqual(self.spark.sql(query), self.connect.sql(query)) def test_pandas_udf_import(self): self.assert_eq(getattr(CF, "pandas_udf"), getattr(SF, "pandas_udf")) def test_function_parity(self): # This test compares the available list of functions in pyspark.sql.functions with those # available in the Spark Connect Python Client in pyspark.sql.connect.functions sf_fn = {name for (name, value) in getmembers(SF, isfunction) if name[0] != "_"} cf_fn = {name for (name, value) in getmembers(CF, isfunction) if name[0] != "_"} # Functions in classic PySpark we do not expect to be available in Spark Connect sf_excluded_fn = set() self.assertEqual( sf_fn - cf_fn, sf_excluded_fn, "Missing functions in Spark Connect not as expected", ) # Functions in Spark Connect we do not expect to be available in classic PySpark cf_excluded_fn = set() self.assertEqual( cf_fn - sf_fn, cf_excluded_fn, "Missing functions in classic PySpark not as expected", ) # SPARK-45216: Fix non-deterministic seeded Dataset APIs def test_non_deterministic_with_seed(self): df = 
self.connect.createDataFrame([([*range(0, 10, 1)],)], ["a"]) r = CF.rand() r2 = CF.randn() r3 = CF.shuffle("a") res = df.select(r, r, r2, r2, r3, r3).collect() for i in range(3): self.assertEqual(res[0][i * 2], res[0][i * 2 + 1]) if __name__ == "__main__": from pyspark.testing import main main()