TDM 20200: Project 9 — 2024

Motivation: Spark uses a distributed computing model, which means that data is processed in parallel across a cluster of machines. PySpark is the Python API for Spark: it lets you interact with Spark from the Python shell, Jupyter Lab, Databricks, etc., combining the simplicity of Python with the power of Apache Spark.

Context: This is the second project on Spark, in which we continue to explore the components of Spark’s ecosystem that PySpark can use.

Scope: Python, Spark SQL, Spark Streaming

Learning Objectives
  • Develop skills and techniques to use PySpark to read a dataset, perform transformations such as filtering and mapping, and execute actions such as count and collect

  • Understand how to use Spark Streaming

Dataset(s)

The questions below use the following dataset:

/anvil/projects/tdm/data/amazon/amazon_fine_food_reviews.csv

Readings and Resources

You need to use 2 cores for your Jupyter Lab session for Project 9 this week.

Questions

Question 1 (2 points)

  1. Create a PySpark session, and then load the dataset using PySpark.

  2. Calculate the average Score for the reviews, grouped by ProductId. (There are 74258 ProductId values, so you do not need to display them all. If you show() the results, only 20 of the 74258 ProductId values and their average Score values will appear. That is OK for the purposes of this question.)

  3. Save the output for all 74258 ProductId values and their average Score values to a file named averageScores.csv.

You may import the following modules:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

While reading the csv file into a data frame, you may need to specify the option that tells PySpark that there are headers. Otherwise, the header will be treated as part of the data itself.

read.option("header","true")

You may use the following option to have PySpark infer the data type of each column (for example, reading Score as an integer). Otherwise, every column is read as a string.

option("inferSchema","true")

After all the operations are complete, you may need to close the SparkSession.

spark.stop()

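A PySpark DataFrame’s write attribute is useful for writing results to a file. Here we give sample code that writes csv output to the current directory. Note that Spark creates a directory with the given name (here file.csv) containing one or more part files, rather than a single csv file.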

someDF.write.csv("file.csv", header=True)

It is not necessary to submit the file with the project solutions.

Question 2 (2 points)

  1. Use PySpark SQL to calculate the average helpfulness ratio (HelpfulnessNumerator/HelpfulnessDenominator) for each product.

  2. Save the output for all 74258 ProductId values and their average helpfulness ratio values to a file named averageHelpfulness.csv.

  • You may need to use filter() to exclude rows with zeros in the column HelpfulnessDenominator (the col function is imported from pyspark.sql.functions), as follows:

filteredDF = myDF.filter(col("HelpfulnessDenominator") > 0)

The withColumn() method is useful for adding a new column to a DataFrame. In this example, the first argument is the name of the new column, and the second argument specifies how the values of the new column are computed. Because withColumn() returns a new DataFrame rather than modifying filteredDF, the result is assigned to a variable.

ratioDF = filteredDF.withColumn("HelpfulnessRatio", col("HelpfulnessNumerator") / col("HelpfulnessDenominator"))

A few more notes:

  • groupBy('ProductId') will perform the aggregation for each product

  • agg() is useful for performing aggregation operations on the grouped data. It can take different kinds of aggregations as its argument, for instance, avg, max, min, etc.

  • Refer to the PySpark documentation for withColumn()

It is not necessary to submit the file with the project solutions.

Question 3 (2 points)

In Questions 1 and 2, we used batch processing: the dataset is processed in one go. Alternatively, we can use Spark Streaming, which lets us work with data that arrives as a real-time stream. (In this project, we are just working with a fixed dataset, but we still want students to know how to work with streaming data.)

  1. Please count the number of reviews for each ProductId in a streaming style (simulating real-time data monitoring and analytics).

  2. Display the results from 20 rows of the output.

  • To simplify the data processing, we will use the directory /anvil/projects/tdm/data/amazon/spark, which contains a copy of the csv file.

  • You may refer to the following statements to set up the source directory and the streaming DataFrame for the dataset:

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Create a PySpark session
spark = SparkSession.builder.appName("Amazon Fine Food Reviews Streaming").getOrCreate()

data_path = "/anvil/projects/tdm/data/amazon/spark/"
myschema = spark.read.option("header", "true").option("inferSchema", "true").csv(data_path)
streamingDF = spark.readStream.schema(myschema.schema).option("header", "true").csv(data_path)

You may use the start() method on the query to start the streaming computation. You may also use the awaitTermination() method to keep the application running indefinitely (until manually stopped, or until an error occurs). This allows Spark to continuously process incoming data.

  • You may need to restart the kernel if you make a new Spark session.

Question 4 (2 points)

Use a streaming session like you did in Question 3.

  1. Display the ProductId values and Score values for the first 20 rows in which the Score is strictly larger than 3. Output these values to the screen as the new data arrives in the streaming session.

Filtering streaming data for reviews with a score strictly greater than 3 is a straightforward operation. You may use a filter condition on the streaming DataFrame, for instance, like this:

.select("ProductId","Score").where("Score > 3")

It is also necessary to remove the .outputMode("complete") setting, because we are no longer aggregating results from a complete stream (Spark supports the complete output mode only for queries with aggregations). Instead, we are just outputting the first 20 results that satisfy the given criterion that the Score is strictly larger than 3.

Question 5 (2 points)

  1. Please state your understanding of PySpark streaming concepts in 2 or more sentences.

Project 09 Assignment Checklist

  • Jupyter Lab notebook with your code, comments and outputs for the assignment

    • firstname-lastname-project09.ipynb

  • Python file with code and comments for the assignment

    • firstname-lastname-project09.py

  • Submit files through Gradescope

Please make sure to double check that your submission is complete and contains all of your code and output before submitting. If you are on a spotty internet connection, it is recommended to download your submission after submitting it, to make sure that what you think you submitted is what you actually submitted.

In addition, please review our submission guidelines before submitting your project.