PySpark SQL Exercises

1. Select Unique Records

from pyspark.sql import SparkSession

# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Alice", 25)]
df = spark.createDataFrame(data, ["id", "name", "age"])

df.createOrReplaceTempView("people")

# Select distinct names
unique_names_df = spark.sql("SELECT DISTINCT name FROM people")

# Show result
unique_names_df.show()
    

2. Count Records

# Count the number of rows in the DataFrame
count_df = spark.sql("SELECT COUNT(*) as total_count FROM people")

# Show result
count_df.show()
    

3. Group By and Count

# Group by name and count occurrences
group_by_count_df = spark.sql("SELECT name, COUNT(*) as name_count FROM people GROUP BY name")

# Show result
group_by_count_df.show()
    

4. Group By with Aggregation (SUM)

# Sample data
data = [("Alice", 1000), ("Bob", 1500), ("Alice", 2000)]
df2 = spark.createDataFrame(data, ["name", "salary"])

df2.createOrReplaceTempView("salaries")

# Group by name and sum salaries
sum_salaries_df = spark.sql("SELECT name, SUM(salary) as total_salary FROM salaries GROUP BY name")

# Show result
sum_salaries_df.show()
    

5. Group By with Aggregation (Average)

# Group by name and calculate average salary
avg_salaries_df = spark.sql("SELECT name, AVG(salary) as avg_salary FROM salaries GROUP BY name")

# Show result
avg_salaries_df.show()
    

6. Filter Records with WHERE Clause

# Filter records where salary is greater than 1200
filter_df = spark.sql("SELECT * FROM salaries WHERE salary > 1200")

# Show result
filter_df.show()
    

7. Order Records by a Column

# Order records by salary in descending order
order_by_df = spark.sql("SELECT * FROM salaries ORDER BY salary DESC")

# Show result
order_by_df.show()
    


PySpark Coding Exercises

1. Count the Number of Occurrences of Each Word

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

# Sample data
data = [("Hello world",), ("Hello PySpark",), ("Spark is great",)]
df = spark.createDataFrame(data, ["text"])

# Split the text into words
words_df = df.select(explode(split(col("text"), " ")).alias("word"))

# Count occurrences of each word
word_count_df = words_df.groupBy("word").count()

# Show result
word_count_df.show()
    

2. Filter Data Based on a Condition

# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Filter rows where age > 25
filtered_df = df.filter(col("age") > 25)

# Show result
filtered_df.show()
    

3. Join Two DataFrames

# Sample data for DataFrame 1
data1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df1 = spark.createDataFrame(data1, ["id", "name"])

# Sample data for DataFrame 2
data2 = [(1, "HR"), (2, "Engineering"), (4, "Marketing")]
df2 = spark.createDataFrame(data2, ["id", "department"])

# Inner join on 'id'
joined_df = df1.join(df2, on="id", how="inner")

# Show result
joined_df.show()
    

4. Group By and Aggregate Data

from pyspark.sql.functions import avg

# Sample data
data = [("Alice", "HR", 25), ("Bob", "Engineering", 30), ("Cathy", "HR", 28)]
df = spark.createDataFrame(data, ["name", "department", "age"])

# Group by department and calculate average age
avg_age_df = df.groupBy("department").agg(avg("age").alias("avg_age"))

# Show result
avg_age_df.show()
    

5. Create a UDF (User Defined Function)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Sample data
data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df = spark.createDataFrame(data, ["id", "name"])

# Define a UDF to add a prefix to a name
def add_prefix(name):
    return "Mr./Ms. " + name

add_prefix_udf = udf(add_prefix, StringType())

# Apply the UDF
df_with_prefix = df.withColumn("name_with_prefix", add_prefix_udf(col("name")))

# Show result
df_with_prefix.show()
    

6. Handling Missing Data

# Sample data with missing values
data = [(1, "Alice", 25), (2, "Bob", None), (3, "Cathy", 28)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Fill missing values in 'age' with a default value of 0
filled_df = df.na.fill({"age": 0})

# Show result
filled_df.show()
    

7. Write Data to a CSV File

# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Write DataFrame to CSV
df.write.csv("/path/to/output", header=True)
    

PySpark Read Write SQL

Introduction

In this guide, we will deep dive into how to manage data lakes on Databricks using SQL within PySpark. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

Loading Data into Data Lake

Let's start by loading data into a data lake using PySpark SQL queries.

spark.sql("SELECT * FROM '/mnt/data/sample.csv'").show()
    

Data Transformation

After loading data into the data lake, you can perform transformations using SQL queries in PySpark. For example, let's perform a group by operation.

spark.sql("SELECT category, SUM(price) FROM sales GROUP BY category").show()
    

Writing Data Back to Data Lake

Once you have processed the data, you can write it back to your data lake in various formats such as Parquet using PySpark.

df_grouped.write.format("parquet").save("/mnt/data/output/")
    

PySpark SQL - Find Duplicate Records

This code finds duplicate records in the data based on a specific column using SQL commands in PySpark.

spark.sql("SELECT email, COUNT(email) FROM customers GROUP BY email HAVING COUNT(email) > 1").show()
    

PySpark SQL - Top Categories by Price

This code retrieves the top categories by the total price using SQL queries in PySpark.

spark.sql("SELECT category, SUM(price) FROM sales GROUP BY category ORDER BY SUM(price) DESC LIMIT 10").show()
    


PySpark Read Write DataFrame

Introduction

In this guide, we will deep dive into how to manage data lakes on Databricks using PySpark. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

Loading Data into Data Lake

Let's start by loading data into a data lake using PySpark on Databricks.

df = spark.read.format("csv").option("header", "true").load("/mnt/data/sample.csv")
df.show()
    

Data Transformation

After loading data into the data lake, you can perform transformations using PySpark. For example, let's perform a group by operation.

df_grouped = df.groupBy("category").sum("price")
df_grouped.show()
    

Writing Data Back to Data Lake

Once you have processed the data, you can write it back to your data lake in various formats such as Parquet or Delta.

df_grouped.write.format("parquet").save("/mnt/data/output/")
    

PySpark - Find Duplicate Records

This code finds duplicate records in the data based on a specific column.

df_duplicates = df.groupBy("email").count().filter("count > 1")
df_duplicates.show()
    

PySpark - Top Categories by Price

This code retrieves the top categories by the total price, similar to how you might use SQL's GROUP BY.

df_top_categories = df.groupBy("category").sum("price").orderBy("sum(price)", ascending=False)
df_top_categories.show(10)