Functions Examples! 💕
🚀 Extend your pyspark powers with pyspark+
Setup [Optional]
Create a virtualenv 🔧
If you want, create a new virtualenv before starting this notebook so that you can select it as the kernel.
Create a new virtualenv.
pyenv virtualenv 3.9.16 .envPysparkPlus
pyenv activate .envPysparkPlus
pip install --upgrade pip
pip install ipykernel
Delete the virtualenv.
pyenv deactivate .envPysparkPlus
pyenv virtualenv-delete -f .envPysparkPlus
The following command should return nothing once the virtualenv is deleted.
pyenv versions | grep .envPysparkPlus
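Once the notebook is open, a quick check like the one below can confirm that the selected kernel really runs inside the new environment. This is a minimal sketch that uses only the standard library; `in_virtualenv` is a hypothetical helper name, not part of pysparkplus, and it assumes the environment was created with `venv` or a recent `virtualenv`.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv/virtualenv."""
    # Inside a virtual environment sys.prefix points at the environment,
    # while sys.base_prefix still points at the base installation.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


# The executable path should contain the environment name if the
# expected kernel was selected.
print("interpreter:", sys.executable)
print("inside a virtualenv:", in_virtualenv())
```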
Required! 💢
In [1]:
pip install pysparkplus
Collecting pysparkplus
  Downloading pysparkplus-0.0.3-py3-none-any.whl (4.0 kB)
Collecting pyspark<4.0.0,>=3.4.0 (from pysparkplus)
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting strplus<2.0.0,>=1.0.6 (from pysparkplus)
  Downloading strplus-1.0.8-py3-none-any.whl (9.8 kB)
Collecting py4j==0.10.9.7 (from pyspark<4.0.0,>=3.4.0->pysparkplus)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, strplus, pyspark, pysparkplus
Successfully installed py4j-0.10.9.7 pyspark-3.4.0 pysparkplus-0.0.3 strplus-1.0.8
Note: you may need to restart the kernel to use updated packages.
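After restarting the kernel, it can help to confirm which versions were actually picked up. The sketch below relies on the standard library's `importlib.metadata`; `installed_version` is a hypothetical helper name introduced here for illustration.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Package names taken from the pip output above.
for name in ("pysparkplus", "pyspark", "py4j", "strplus"):
    print(name, installed_version(name) or "not installed")
```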
In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("testPysparkPlus").getOrCreate()
your 131072x1 screen size is bogus. expect trouble
23/04/23 17:26:48 WARN Utils: Your hostname, DESKTOP-O03M3NM resolves to a loopback address: 127.0.1.1; using 172.17.155.166 instead (on interface eth0)
23/04/23 17:26:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/23 17:26:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Simple deduplicate!
In [3]:
from pysparkplus.functions import deduplicate
In [4]:
df = spark.createDataFrame([{"name":"Rose"}, {"name":"Rose"}])
df.show()
+----+
|name|
+----+
|Rose|
|Rose|
+----+
In [5]:
df_dedup = deduplicate(df, by_columns="name")
df_dedup.show()
+----+
|name|
+----+
|Rose|
+----+
In [6]:
df_two_cols = spark.createDataFrame([{"name":"Rose", "age":10}, {"name":"Rose", "age":5}])
df_two_cols.show()
+---+----+
|age|name|
+---+----+
| 10|Rose|
|  5|Rose|
+---+----+
In [7]:
df_two_dedup = deduplicate(df_two_cols, by_columns="name")
df_two_dedup.show()
+---+----+
|age|name|
+---+----+
| 10|Rose|
+---+----+
In [8]:
df_two_dedup = deduplicate(df_two_cols, by_columns="name", order_by="age")
df_two_dedup.show()
+---+----+
|age|name|
+---+----+
| 10|Rose|
+---+----+
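To make the behaviour of the calls above easier to reason about, here is a plain-Python model of "keep one row per key". It is a sketch only, not the pysparkplus implementation: `deduplicate_rows` is a hypothetical name, rows are ordinary dicts, and the descending sort on `order_by` is an assumption. The output above is consistent with the largest `age` winning, but the source does not state the sort direction. On a distributed DataFrame the surviving row is generally not guaranteed unless an ordering is given, whereas this model always keeps the first row it sees.

```python
from typing import Any, Dict, List, Optional, Sequence, Union

Row = Dict[str, Any]


def deduplicate_rows(
    rows: List[Row],
    by_columns: Union[str, Sequence[str]],
    order_by: Optional[str] = None,
) -> List[Row]:
    """Keep one row per distinct key (plain-Python model, not library code)."""
    # Accept a single column name or a list of names, as in the calls above.
    keys = [by_columns] if isinstance(by_columns, str) else list(by_columns)
    if order_by is not None:
        # ASSUMPTION: the row with the largest order_by value is kept.
        rows = sorted(rows, key=lambda r: r[order_by], reverse=True)
    kept: Dict[tuple, Row] = {}
    for row in rows:
        key = tuple(row[c] for c in keys)
        kept.setdefault(key, row)  # the first row seen for a key survives
    return list(kept.values())


people = [{"name": "Rose", "age": 10}, {"name": "Rose", "age": 5}]
print(deduplicate_rows(people, by_columns="name", order_by="age"))
```

Passing a list such as `["col1", "col2"]` as `by_columns` treats the combination of both values as the key.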
In [9]:
df = spark.createDataFrame([(1, "a"), (2, "b"), (1, "a"), (3, "c")], ["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   1|   a|
|   3|   c|
+----+----+
In [10]:
df_dedup = deduplicate(df, "col1")
df_dedup.show()
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   3|   c|
+----+----+
In [11]:
df_dedup = deduplicate(df, ["col1", "col2"], order_by="col1")
df_dedup.show()
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   3|   c|
+----+----+
In [12]:
### Run the project locally 📀
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
sys.path.insert(0, os.path.abspath(".."))
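The cell above puts the local checkout ahead of any installed copy on the import path. Re-running it adds the same entries again, so a variant that inserts each path only once may be handy in a notebook. This is a minimal sketch; `prepend_to_sys_path` is a hypothetical helper name, and the relative paths are the ones used above.

```python
import os
import sys


def prepend_to_sys_path(path: str) -> str:
    """Put the absolute form of path at the front of sys.path exactly once."""
    absolute = os.path.abspath(path)
    # Drop any earlier copies so re-running the cell does not pile up entries.
    sys.path[:] = [entry for entry in sys.path if entry != absolute]
    sys.path.insert(0, absolute)
    return absolute


# Same order as the cell above: ".." ends up first on the import path.
for relative in ("../..", ".."):
    prepend_to_sys_path(relative)
```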