CIS 5450 Homework 3: Spark SQL
Deadline: Tuesday, 15th October, 2024 @ 11:59pm EST
Worth 100 points
Welcome to CIS 5450 Homework 3! In this homework you will gain a mastery of using Spark SQL. By the end, you'll be a star (not that you aren't already one). Over the next few days you will use Spark on an EMR cluster to manipulate datasets of LinkedIn user profiles and company stock prices.
The Necessary Notes and Nags
Before we begin, here are some important notes to keep in mind:
- IMPORTANT! I said it twice, it's really important. In this homework, we will be using AWS resources. You are given a quota ($100) to use for the entirety of the homework. There is a small chance you will use all of this money, so it is important that you shut down your EMR cluster at the end of every session.
- Be sure you use Google Colab for this homework, since we must connect to the EMR cluster and local Jupyter will have issues doing that. Using a Google Colab notebook with an EMR cluster has two important abnormalities:
- The first line of any cell in which you will use the spark session must be %%spark. Notice that all cells below have this.
- You will, unfortunately, not be able to stop a cell while it is running. If you wish to do so, you will need to restart your cluster. See the Setup EMR Document for reference.
- You are required to use Spark SQL queries to handle the data in the assignment. Mastering SQL is more beneficial than being able to use Spark commands (functions), as it will show up in more areas of programming and data science/analytics than just Spark. Use the following function list to see all the SQL functions available in Spark.
- Throughout the homework you will be manipulating Spark dataframes (sdfs).
- Based on the challenges you've faced in the previous homework, we are including information on the expected schema of your results. Apache Spark is very fiddly but we hope this will help.
- There are portions of this homework that are very hard. We urge you to start early and to come to office hours for help if you get stuck. But don't worry, I can see the future, and you all got this.
With that said, let's dive in.
Step 0: Set up EMR
Follow the AWS Academy Getting Started instructions.
Move on to Step 0.1 after you have completed all the steps in the document.
0.2: The Sharp Spark
Now, connect your notebook to the EMR cluster you created. In the first cell, copy the link to the Master Public DNS specified in the setup document. You will need to add http:// to the beginning of the address and the auth details to the end.
For example, if my DNS (directly from the AWS EMR console) is ec2-3-15-237-211.us-east-2.compute.amazonaws.com my address would be,
http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com -a cis545-livy -p password1 -t Basic_Access
Insert this in the # TODO # below. For our example, the cell would read,
Enter your 8-digit Penn ID as an integer in the cell below. This will be used in the autograder. Please also update the cell below with the same ID!
Run the above cells to set up the autograder. Make sure you have set your 8-digit Penn ID in the cell above. It will also import all the modules you need for the homework.
Step 1: Data Wrangling, Cleaning, and Shaping [32 pts]
In this homework we will be working with two datasets: (1) LinkedIn data containing information on its users, such as education, experience, and industry, and (2) stock price information, over a roughly 10-year period (2000-2011), for the companies where these users have worked.
The data you will use is stored in an S3 bucket, a cloud storage service. Below, with our help, you will download it onto the nodes of your EMR cluster.
1.1: The Stupendous Schema
When loading data, Spark will try to infer its structure on its own. This inference is not reliable, because Spark will sometimes pick the wrong type for a field. You will therefore need to define an explicit schema for both the LinkedIn and Stock Prices datasets.
A schema is a description of the structure of data. In Spark, schemas are defined using a StructType object. This is a collection of fields, termed StructFields, that specify the name and data type of each component of the dataset. For example, suppose we have the following simple JSON object,
{
  "student_name": "Alpha Beta",
  "GPA": 3.6,
  "courses": [
    {"department": "Computer and Information Science", "course_id": "CIS 5450", "semester": "Fall 2021"},
    {"department": "Computer and Information Science", "course_id": "CIS 5550", "semester": "Fall 2021"}
  ],
  "grad_year": 2023
}

We would define its schema as follows,

schema = StructType([
    StructField("student_name", StringType(), nullable=True),
    StructField("GPA", FloatType(), nullable=True),
    StructField("courses", ArrayType(
        StructType([
            StructField("department", StringType(), nullable=True),
            StructField("course_id", StringType(), nullable=True),
            StructField("semester", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("grad_year", IntegerType(), nullable=True)
])

Each StructField has the following structure: (name, type, nullable). The nullable flag indicates whether the specified field may be empty. Your first task is to define the schema of linkedin_small_real.json. A smaller version of the JSON dataset can be found here. Look at the structure of this JSON dataset, its fields, and their types.
You will now be defining an explicit schema for the linkedin_small_real.json dataset. We have defined most of the fields so you can compare how the schema looks with the JSON dataset. Your Task will be to define the schema for the following 3 fields - name, experience, events.
Make sure to use nullable=True for all the fields as well as store dates as a StringType().
Note: There is also no grading cell for this step. But your JSON file won't load if it's wrong, so you have a way of testing.
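One way to get a feel for a nested layout before writing the StructType by hand is to print the shape of a single record. The sketch below uses only the Python standard library, not Spark; the helper name describe and the type labels it prints are illustrative assumptions, and it assumes every array is homogeneous so that inspecting the first element is enough.

```python
import json

def describe(value):
    """Return a nested description of the types found in a parsed JSON value."""
    if isinstance(value, dict):
        return {key: describe(child) for key, child in value.items()}
    if isinstance(value, list):
        # Assumption: arrays are homogeneous, so the first element is representative.
        return [describe(value[0])] if value else []
    if isinstance(value, bool):  # checked before int because bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "float"
    if value is None:
        return "null"
    return "string"

record = json.loads("""
{
  "student_name": "Alpha Beta",
  "GPA": 3.6,
  "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 5450", "semester": "Fall 2021"}
  ],
  "grad_year": 2023
}
""")

layout = describe(record)
print(json.dumps(layout, indent=2))
```

Each dictionary in the printed layout corresponds to a StructType, each list to an ArrayType, and each leaf label to a primitive type such as StringType() or IntegerType().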
1.2.1: Load LinkedIn Dataset [4 Pts]
In the following cell, you will load the linkedin_small_real.json dataset from your S3 bucket into a Spark dataframe (sdf) called linkedin_data_sdf. If you have constructed the schema correctly, then spark.read.json() will read in the dataset. You do not need to edit this cell.
If this doesn't work, go back to the prior cell and update your schema!
Note that the cell below will load data even if your schema is incomplete and has left out some columns of the data, so be sure to check that you have included all of the fields from the JSON.
The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook.
For almost all the questions you will need to create a temporary view using createOrReplaceTempView, then write your SQL query as a string, and then run the query on Spark using spark.sql(query). To see the result of your query, use .show().
You do not need to edit this cell.
We can then copy the answer_sdf to Colab to submit to PennGrader...
Note: double check all the data types of the fields defined for your schema in 1.1. Do not just assume that everything is a string!
In the next cell, we want you to create industry_family_name_df to fetch the data from the linkedin_data table created above, returning rows with schema (_id, industry, family_name). Remove all NULLs from the family_name and industry columns. Sort the rows by _id, industry, family_name, all in ascending order. Limit your sdf to 100 rows. Make sure to use the SQL template provided above.
Hint: To access a field inside a json object in SQL you can use the following syntax: parent_variable.child_variable
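The filter, sort, and limit steps in this task are plain SQL, so their shape can be tried out without a cluster. The sketch below is not Spark: it uses Python's built-in sqlite3 as a stand-in, with a hypothetical flat table linkedin_flat and made-up rows. SQLite has no nested fields, so the parent_variable.child_variable access from the hint is not shown; in the notebook you would select from the temporary view and run the query string with spark.sql(query).

```python
import sqlite3

# In-memory stand-in for a temporary view (hypothetical table and rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE linkedin_flat (_id TEXT, industry TEXT, family_name TEXT)")
conn.executemany(
    "INSERT INTO linkedin_flat VALUES (?, ?, ?)",
    [
        ("b2", "Banking", "Lee"),
        ("a1", None, "Smith"),      # dropped: industry is NULL
        ("a3", "Retail", None),     # dropped: family_name is NULL
        ("a0", "Software", "Ng"),
    ],
)

query = """
SELECT _id, industry, family_name
FROM linkedin_flat
WHERE industry IS NOT NULL
  AND family_name IS NOT NULL
ORDER BY _id ASC, industry ASC, family_name ASC
LIMIT 100
"""

rows = conn.execute(query).fetchall()
print(rows)
```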
1.2.3: Load Stock Prices Data [3 Pts]
Just the way you created a schema for the LinkedIn dataset, now create a schema for the Stock Prices data. The schema should be relatively simple, compared to the LinkedIn schema. A tiny version of the data is here in csv format, so you can see what the types should be for the different fields (columns in the csv). Store the Date field as a String.
In the next cell, we want you to display the percentage change in the daily stock prices for each organization. In order to do so, we will need the data from the stocks_sdf table created above. Create a new column called percentage_change that uses the opening and closing stock prices for each organization, for each day, and calculates the percentage change in the stock price as follows:
In order to avoid nulls, calculate the percentage change only for those organizations and days where the opening price is NOT 0.0. The percentage_change value is a float.
Your final dataframe should include all columns of the original stocks_sdf, as well as the new column you create called percentage_change. Sort the rows by Date and org ascending, in that order.
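The arithmetic can be checked on a few rows in plain Python before it is written as SQL. The formula itself is not reproduced in this text, so the sketch below assumes the conventional definition, (close - open) / open * 100; confirm it against the formula in the notebook. The column name Open and the sample rows are also assumptions made for illustration.

```python
def percentage_change(open_price, close_price):
    """Assumed definition: relative change from open to close, in percent."""
    if open_price is None or close_price is None or open_price == 0.0:
        return None  # undefined when the opening price is zero or missing
    return (close_price - open_price) * 100.0 / open_price

# Hypothetical daily rows.
rows = [
    {"org": "IBM", "Date": "2000-01-04", "Open": 100.0, "Close": 105.0},
    {"org": "ACME", "Date": "2000-01-03", "Open": 0.0, "Close": 1.0},
    {"org": "IBM", "Date": "2000-01-03", "Open": 50.0, "Close": 49.0},
]

# Keep only rows with a non-zero opening price, add the new column,
# then sort by Date and org, both ascending.
kept = [
    dict(row, percentage_change=percentage_change(row["Open"], row["Close"]))
    for row in rows
    if row["Open"] != 0.0
]
kept.sort(key=lambda row: (row["Date"], row["org"]))

for row in kept:
    print(row)
```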
In this part, we are interested in when individuals began working at a particular company. When you created the schema, you might have noticed that the collection of companies an individual worked at is contained in the experience field as an array of dictionaries. The company name is stored in the org field, and the start date is in the start field. Here is an example of an experience field:

{
  "experience": [
    {
      "org": "The Walt Disney Company",
      "title": "Mickey Mouse",
      "end": "Present",
      "start": "November 1928",
      "desc": "Sailed a boat."
    },
    {
      "org": "Walt Disney World Resort",
      "title": "Mickey Mouse Mascot",
      "start": "January 2005",
      "desc": "Took pictures with kids."
    }
  ]
}

Your task is to extract each pair of company and start date from these arrays. This is known as "exploding" a row in Spark. If you think about how we used relational data to model a nested list in a separate table -- that's basically what an explode does to the nested data within linkedin.
Create an sdf called raw_start_dates_sdf that contains the company and start date for every experience of every individual in linkedin_data_sdf. Drop any row that contains a null in either column.
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |January 2005   |
|The Walt Disney Company   |November 1928  |
|...                       |...            |
+--------------------------+---------------+

_Hint_: Reference the function list.
_Note_: Some of the entries in org are "weird", i.e. made up of non-English letters and characters. Keep them. DO NOT edit any name in the original dataframe unless we specify. DO NOT drop any row unless there is a null value as stated before. This goes for the rest of the homework as well, unless otherwise specified.
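To make the idea of exploding concrete, here is a plain-Python sketch of the same transformation: one output row per element of the nested array, with rows dropped when either value is missing. It only illustrates the semantics and is not the Spark SQL you are asked to write; the function name explode_experience and the second and third sample records are hypothetical.

```python
def explode_experience(people):
    """Emit one {org, start_date} row per experience entry, skipping nulls."""
    pairs = []
    for person in people:
        # A missing or null experience array contributes no rows.
        for job in person.get("experience") or []:
            org, start = job.get("org"), job.get("start")
            if org is not None and start is not None:
                pairs.append({"org": org, "start_date": start})
    return pairs

people = [
    {"experience": [
        {"org": "The Walt Disney Company", "title": "Mickey Mouse",
         "end": "Present", "start": "November 1928"},
        {"org": "Walt Disney World Resort", "title": "Mickey Mouse Mascot",
         "start": "January 2005"},
    ]},
    {"experience": [{"org": "Hypothetical Org", "title": "Intern"}]},  # no start: dropped
    {"experience": None},                                              # nothing to explode
]

pairs = explode_experience(people)
for pair in pairs:
    print(pair)
```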
There are two issues with the values in our start_date column. First, the values are saved as strings, not datetime types. This prevents us from running operations such as ORDER BY or GROUP BY on common months or years. Second, some values do not have both month and year information or are in other languages. Your task is to filter and clean the start_date column. We are interested in only those rows that have a date in the following format: "(month_name) (year)", e.g. "October 2010".
Using raw_start_dates_sdf, create an sdf called filtered_start_dates_sdf with the start_date column filtered in the manner above. Keep only those rows with a start date between January 2000 ('2000-01-01') and December 2011 ('2011-12-01'), inclusive. Ensure that any dates that are not in our desired format are omitted. Drop any row that contains a null in either column. The format of the sdf is shown below:
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |2005-01-01     |
|...                       |...            |
+--------------------------+---------------+

_Hint_: Refer to the function list to format the start_date column. In Spark SQL the date format we are interested in is "MMM y".
_Note_: Spark will return the date in the format above, with the day as 01. This is ok, since we are interested in the month and year each individual began working and all dates will have 01 as their day.
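The cleaning rule can be prototyped in plain Python to see which strings survive. This sketch uses datetime.strptime with the pattern "%B %Y" as a rough Python analogue of the Spark pattern mentioned in the hint; it is not Spark code, the helper names are hypothetical, and it assumes the default English month names.

```python
from datetime import date, datetime

LOW, HIGH = date(2000, 1, 1), date(2011, 12, 1)  # inclusive bounds from the task

def parse_month_year(text):
    """Parse '(month_name) (year)'; return None for anything else."""
    try:
        return datetime.strptime(text, "%B %Y").date()  # day defaults to 01
    except (TypeError, ValueError):
        return None

def clean_start_dates(pairs):
    cleaned = []
    for pair in pairs:
        parsed = parse_month_year(pair["start_date"])
        if pair["org"] is not None and parsed is not None and LOW <= parsed <= HIGH:
            cleaned.append({"org": pair["org"], "start_date": parsed.isoformat()})
    return cleaned

pairs = [
    {"org": "Walt Disney World Resort", "start_date": "January 2005"},
    {"org": "The Walt Disney Company", "start_date": "November 1928"},  # out of range
    {"org": "Hypothetical Org", "start_date": "2005"},                  # no month
    {"org": "Hypothetical Org", "start_date": "December 2011"},         # upper bound
]

cleaned = clean_start_dates(pairs)
for row in cleaned:
    print(row)
```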
In part 4 of this homework, we have to merge the stocks and linkedin dataframes. This would be difficult to do directly, as the companies in our stock dataset are defined by their stock tickers instead of the full names. Thus, we would not be able to merge it with the org field in hire_train_sdf. We must convert them to that format. For this purpose, we can create a user-defined function (udf) to achieve the mentioned conversion.
A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, TICKER_TO_NAME(), that will convert the ticker field in raw_stocks to the company's name. This will be done using the provided ticker_to_name_dict dictionary. We are only interested in the companies in that dictionary. If the ticker value isn't in the dictionary, set it to a string equal to "None". Do not hardcode this.
Fill out the function ticker_to_name() below. Then use spark.udf.register() to register it as a SQL function. The command is provided. You do not need to edit it. Note, we have defined the udf as returning StringType(). Ensure that your function returns this. You must also deal with any potential null cases.
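Because a udf starts life as an ordinary Python function, its behavior can be checked locally before it is registered. The sketch below shows one possible shape for the lookup. The two dictionary entries are placeholders rather than the provided ticker_to_name_dict, and returning the string "None" for a null input is an assumption about how null cases should be handled.

```python
# Placeholder mapping for illustration; the notebook provides the real dictionary.
ticker_to_name_dict = {"IBM": "IBM", "NOK": "Nokia"}

def ticker_to_name(ticker):
    """Map a ticker to a company name, or the string "None" if unknown."""
    if ticker is None:
        return "None"  # assumption: null tickers are treated like unknown tickers
    # dict.get avoids hardcoding any ticker; str() keeps the return type a string.
    return str(ticker_to_name_dict.get(ticker, "None"))

print(ticker_to_name("NOK"))
print(ticker_to_name("ZZZZ"))
print(ticker_to_name(None))
```

Keeping the return value a Python str in every branch matters because the udf is registered as returning StringType().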
We can now begin to wrangle stocks_sdf with our new TICKER_TO_NAME() function.
Create an sdf called filter_1_stocks_sdf as follows. Convert all the ticker names in stocks_sdf to the company names and save it as org. Next, convert the date field to a datetime type. As explained before this will help order and group the rows in future steps.
Drop any company names that do not appear in ticker_to_name_dict. Keep any date between January 1st 2001 ('2001-01-01') and December 4th 2012 ('2012-12-04') inclusive, in the format shown below (note this is a datetime object not a string):
+----+------------+--------------+
|org |date        |Close         |
+----+------------+--------------+
|IBM |2000-01-03  |...           |
|... |...         |...           |
+----+------------+--------------+

_Hint_: You will use a similar function to filter the dates as in Step 1.3.2. In Spark SQL the format for the date field in stocks_sdf is "yyyy-MM-dd".
Now we would like to find for each company, the number of individuals who started in the same month and year. Use filtered_start_dates_sdf and create a new sdf called start_dates_sdf which will contain the total number of employees who began working at the same company on the same start date (name the new column as num_employees). The format of the sdf is shown below:
+--------------------------+---------------+---------------+
|org                       |start_date     |num_employees  |
+--------------------------+---------------+---------------+
|Walt Disney World Resort  |2005-01-01     |1              |
|...                       |...            |...            |
+--------------------------+---------------+---------------+

Our next step is to use start_dates_sdf and create a new sdf called raw_hire_train_sdf that has for a single company and a single year, the number of hires in Jan through Dec, as well as the total number of hires that year (name it total_num). Note that for each company you will have several rows corresponding to years between 2000 and 2011. It is alright if for a given company you don't have a given year. However, ensure that for a given company and given year, each month column has an entry, i.e. if no one was hired the value should be 0.
_Note_: We will use the first three letters of each month in naming, i.e. jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec.
The format of the raw_hire_train_sdf is shown below:
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |...      |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |...      |...       |...       |
|IBM |2009 |...       |...      |...       |...       |
|... |...  |...       |...      |...       |...       |
+----+-----+----------+---------+----------+----------+

Create an sdf called hire_train_sdf that contains all the observations in raw_hire_train_sdf with total_num greater than or equal to 20.
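The reshaping described above, from one row per (org, start_date) to one row per (org, year) with twelve month columns, is a pivot. The plain-Python sketch below shows the intended result on made-up counts, including the zero fill for months with no hires and the total_num >= 20 filter; it is an illustration of the target shape, not the Spark SQL solution, and the function name pivot_hires is hypothetical.

```python
from collections import defaultdict

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def pivot_hires(start_dates):
    """Turn (org, start_date, num_employees) rows into one row per (org, year)."""
    table = defaultdict(lambda: [0] * 12)  # every month starts at 0
    for row in start_dates:
        year = int(row["start_date"][:4])
        month = int(row["start_date"][5:7])
        table[(row["org"], year)][month - 1] += row["num_employees"]

    result = []
    for (org, year), counts in sorted(table.items()):
        record = {"org": org, "year": year}
        record.update({f"{m}_hired": c for m, c in zip(MONTHS, counts)})
        record["total_num"] = sum(counts)
        result.append(record)
    return result

# Hypothetical counts.
start_dates = [
    {"org": "IBM", "start_date": "2008-01-01", "num_employees": 12},
    {"org": "IBM", "start_date": "2008-06-01", "num_employees": 9},
    {"org": "IBM", "start_date": "2009-03-01", "num_employees": 5},
]

raw_hire_train = pivot_hires(start_dates)
hire_train = [row for row in raw_hire_train if row["total_num"] >= 20]
print(hire_train)
```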
The data in filter_1_stocks_sdf gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the average of the closing price of each month per year for each org.
Create an sdf filter_2_stocks_sdf that contains only the average of closing prices for each month-year pair sorted by the org alphabetically and then month, year from earliest to latest with the closing price rounded off to 3 decimal places. The format of the sdf is shown below:
+----+------------+--------------+--------------+
|org |month       |year          |close         |
+----+------------+--------------+--------------+
|IBM |01          |2000          |...           |
|... |...         |...           |...           |
+----+------------+--------------+--------------+

Hint: Think about what sorting things chronologically from earliest to latest means. Is January 2006, January 2007, ... January 2012, February 2006, February 2007, ... really in chronological order? Does January 2007 actually come before February 2006?
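The point of the hint is that the sort key decides the order: sorting by month before year interleaves different years. The plain-Python sketch below averages made-up daily closes per (org, year, month), rounds to 3 decimal places, and sorts by org, then year, then month. Reading "earliest to latest" as year before month is an interpretation of the hint, and the function name and sample rows are hypothetical.

```python
from collections import defaultdict

def monthly_average_close(daily_rows):
    """Average Close per (org, year, month), ordered chronologically within each org."""
    buckets = defaultdict(list)
    for row in daily_rows:
        year, month = row["date"][:4], row["date"][5:7]
        buckets[(row["org"], year, month)].append(row["Close"])

    result = []
    # Tuples sort element by element: org first, then year, then month.
    for org, year, month in sorted(buckets):
        closes = buckets[(org, year, month)]
        result.append({"org": org, "month": month, "year": year,
                       "close": round(sum(closes) / len(closes), 3)})
    return result

# Hypothetical daily rows.
daily_rows = [
    {"org": "IBM", "date": "2007-01-03", "Close": 10.0},
    {"org": "IBM", "date": "2007-01-04", "Close": 11.0},
    {"org": "IBM", "date": "2006-02-01", "Close": 20.0},
]

monthly = monthly_average_close(daily_rows)
for row in monthly:
    print(row)
```

With a (month, year) key instead, January 2007 would be listed before February 2006, which is the mistake the hint warns about.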
Now, we will begin to shape our dataframe into the format of the final training sdf.
Create an sdf filter_3_stocks_sdf that has for a single company and a single year, the average stock price for each month in that year. This is similar to the table you created in Step 3.1. If data is not available for some month, the row will contain a null; drop any rows containing null values in any column. The format of the sdf is shown below:
+----+-----+----------+---------+----------+
|org |year |jan_stock |...      |dec_stock |
+----+-----+----------+---------+----------+
|IBM |2008 |...       |...      |...       |
|IBM |2009 |...       |...      |...       |
|... |...  |...       |...      |...       |
+----+-----+----------+---------+----------+

This is an optional section; your final score in this homework will not be affected by your performance here. Sample working code is already provided.
However, make sure to run it as we will need to use the results later!
The final element in our training set is the binary output for each case, i.e. the y label.
Create an sdf stocks_train_sdf from filter_3_stocks_sdf with an additional column direction. This should be the direction of percentage change in the closing stock price, i.e. 1 for positive or -1 for negative, from the first month of a given year to the last month of the given year. Make this an integer. The year begins in January and ends in December, inclusive. The format of the sdf is shown below:
+----+-----+----------+---------+----------+-------------+
|org |year |jan_stock |...      |dec_stock |direction    |
+----+-----+----------+---------+----------+-------------+
|IBM |2008 |...       |...      |...       |1            |
|IBM |2009 |...       |...      |...       |-1           |
|... |...  |...       |...      |...       |...          |
+----+-----+----------+---------+----------+-------------+

While just writing code can be fun, PySpark involves the interaction of standard Python with Spark and may produce huge, hard-to-parse error logs. The goal of this section is to give you practice in debugging these error logs.
This section calculates the average closing price and volume for each Month, Year combination, considering seasonality's impact on the data. We will explore various data exploration and transformation techniques, including trend analysis, seasonality identification, lag variables, and differencing. These techniques will be introduced in this section and further explored in subsequent sections using Spark.
Some code here will throw errors and your job is to fix them!
Common Errors with UDFs
In this part, we attempt to find the magnitude (absolute value) of the monthly difference column we obtained from the previous section. As we saw in a previous section, a UDF is a good way of doing such operations in Spark. Here we have a simple UDF where we attempt to use Python's well-known abs() function to compute the magnitude. However, there are a couple of bugs in this code. Can you find them?
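Two pitfalls come up often with Python UDFs in general, and they are worth ruling out first: a null input reaching a function that cannot handle None, and a Python return type that does not match the type the UDF was registered with. These are not necessarily the bugs planted in the notebook's cell. The sketch below is a hypothetical magnitude function, assuming the UDF would be registered with a floating-point return type.

```python
def magnitude(value):
    """Absolute value that tolerates nulls and always returns a float."""
    if value is None:
        return None            # abs(None) would raise a TypeError inside the UDF
    return float(abs(value))   # keep the Python type aligned with a float return type

print(magnitude(-3))
print(magnitude(2.5))
print(magnitude(None))
```

Testing the plain function on None and on an integer input before registering it is usually quicker than reading the error log that the cluster returns.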
Now that we have individually created the two halves of our training data we will merge them together to create the combined final training sdf.
Create an sdf called training_sdf in the format of the one shown at the beginning of Step 3. Note that in our definition for the stock_result column, the stock_result value for a particular year corresponds to the direction of the stock percentage change in the following year. For example, the stock_result in the 2008 row for IBM will contain the direction of IBM's stock in the year 2009. For the final training dataframe, we only need the entries for the companies where both hiring and stock data are available for the particular year. The format of the sdf is shown below:
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |...      |dec_hired |jan_stock |...      |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |...      |...       |...       |...      |...       |-1           |
|IBM |2009 |...       |...      |...       |...       |...      |...       |1            |
|... |...  |...       |...      |...       |...       |...      |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+

Note: For a given company, if there is no data for a year X, you can omit the year X-1 from the corresponding solution dataframe, as there exists no data for the following year. You should be using hire_train_sdf.
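The one-year shift in stock_result is the easiest part of this merge to get wrong, so here is a plain-Python sketch of the join logic on made-up rows. It pairs hiring and stock rows for the same (org, year) and takes the label from the stock row of year + 1, omitting rows where the following year is missing. Only the jan and dec columns are shown for brevity, the function name is hypothetical, and dropping total_num is inferred from the table format above.

```python
def build_training_rows(hire_rows, stock_rows):
    """Join hires and stocks on (org, year); label each row with next year's direction."""
    stocks = {(s["org"], s["year"]): s for s in stock_rows}
    training = []
    for hire in hire_rows:
        this_year = (hire["org"], hire["year"])
        next_year = (hire["org"], hire["year"] + 1)
        # Both the same-year stock data and the following year's direction must exist.
        if this_year not in stocks or next_year not in stocks:
            continue
        row = {k: v for k, v in hire.items() if k != "total_num"}
        row.update({k: v for k, v in stocks[this_year].items() if k.endswith("_stock")})
        row["stock_result"] = stocks[next_year]["direction"]
        training.append(row)
    return training

# Hypothetical rows.
hire_rows = [
    {"org": "IBM", "year": 2008, "jan_hired": 12, "dec_hired": 9, "total_num": 21},
    {"org": "IBM", "year": 2009, "jan_hired": 15, "dec_hired": 8, "total_num": 23},
]
stock_rows = [
    {"org": "IBM", "year": 2008, "jan_stock": 100.0, "dec_stock": 110.0, "direction": 1},
    {"org": "IBM", "year": 2009, "jan_stock": 110.0, "dec_stock": 90.0, "direction": -1},
]

training = build_training_rows(hire_rows, stock_rows)
print(training)
```

The 2009 row is omitted here because the sample has no 2010 stock data to supply its label.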
Step 5: Twitter followers [27 Pts]
In this section, we will be working with a dummy dataset representing connections on twitter, where each row consists of a person A, and a person B, such that person A follows person B.
Let's introduce a fun little concept called the Bacon Number! The Bacon number of an actor or actress is the number of degrees of separation they have from actor Kevin Bacon, as defined by the game known as Six Degrees of Kevin Bacon. For example, Kevin Bacon's Bacon number is 0. If an actor works in a movie with Kevin Bacon, the actor's Bacon number is 1. If an actor works with an actor who worked with Kevin Bacon in a movie, the first actor's Bacon number is 2, and so forth.
How do we implement the "Super Bacon" for our dataset though? We define this number as follows: if person A follows person B, and person B follows person C, then the super bacon of C with respect to A will be 2.
Now to calculate this number, we'll use the concepts of graphs and BFS!
5.1.1 Intro to Distributed Breadth-First Search
To start off, we will be implementing a graph traversal algorithm known as Breadth First Search. It works in a way that's equivalent to how a stain spreads on a white t-shirt. Take a look at the graph below:
- Consider starting BFS from point A (green). This is considered the starting frontier/singular origin node.
- The first round of BFS would involve finding all the nodes directly reachable from A, namely B-F (blue circles). These blue nodes make up the next frontier at depth 1 away from our starting node A.
- The second round would then be identifying the red nodes which are the neighbors of the blue nodes. Now, the red nodes all belong to a frontier 2 depth away from A. Note that node A is also a neighbor of a blue node. However, since it has already been visited, it does not get added to this frontier.
This process continues until all the nodes in the graph have been visited. If you would like to learn more about BFS, we highly suggest looking here.
We will now be implementing spark_bfs(G, N, d), our spark flavor of BFS that takes a graph G, a set of origin nodes N, and a max depth d.
In order to write a successful BFS function, you are going to need to figure out
- how to keep track of nodes that we have visited
- how to properly find all the nodes at the next depth
- how to avoid cycles and ensure that we do not constantly loop through the same edges (take a look at J-K in the graph)
5.1.2 Implement One Traversal [6 Pts]
To break down this process, let's think about how we would implement a single traversal of the graph. That is given the green node in the graph above, how are we going to get the blue nodes?
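A single traversal can be sketched in plain Python before it is expressed as a join in Spark SQL. The idea is the same: take the nodes at the current maximum distance (the frontier), follow their outgoing edges, and keep only destinations that have not been visited yet. The edge list and the function name bfs_one_round are hypothetical and do not correspond to the graphs pictured in the notebook.

```python
def bfs_one_round(edges, visited):
    """Extend a {node: distance} map by one hop from its current frontier."""
    frontier_depth = max(visited.values())
    frontier = {node for node, dist in visited.items() if dist == frontier_depth}

    updated = dict(visited)
    for src, dst in edges:
        # Follow edges leaving the frontier; skip nodes that already have a distance.
        if src in frontier and dst not in visited:
            updated[dst] = frontier_depth + 1
    return updated

# Hypothetical directed edges (from_node, to_node).
edges = [("A", "B"), ("A", "C"), ("B", "A"), ("B", "D"), ("C", "D")]

round_1 = bfs_one_round(edges, {"A": 0})
round_2 = bfs_one_round(edges, round_1)
print(round_1)
print(round_2)
```

In the second round the edge from B back to A is ignored because A already has distance 0, which is the visited check described above.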
Consider the simple graph below which is different from the graph in the image above:
Now, run the inner function on simple_1_round_bfs_sdf (i.e. result of 1 round of BFS on the simple graph) and store the results in simple_bfs_result. This is ultimately what the output of BFS to depth 2 should look like. Create your temporary sdf G and then call the spark_bfs_1_round function you created above.
5.1.3 Full BFS Implementation
Now, we will fully implement spark_bfs. This function should iteratively call your implemented version of spark_bfs_1_round and ultimately return the output of this function at max_depth.
You are also responsible for initializing the starting dataframe, that is converting the list of origin nodes into a spark dataframe with the nodes logged at distance 0.
Consider the following:
schema = StructType([
    StructField("node", StringType(), True)
])
my_sdf = spark.createDataFrame(origins, schema)

The schema ultimately specifies the structure of the Spark DataFrame with a string node column. It then calls spark.createDataFrame to map this schema to the origins nodes. Also, you are responsible for ensuring that a view of your graph is available within this function. (Note: you will also need to add in a distance column)
TODO: implement spark_bfs(G,origins,max_depth). In the coming sections, you will run this function on twitter_sdf that you will initialize in 5.2. Note: you may want to run tests on the simple_graph example as the twitter_sdf will take quite some time to run.
Also remember that in section 5.1.2, we had assumed that the temporary view already exists. So you will have to take care of creating that within this function.
These imports might be useful: from pyspark.sql.types import StructType, StructField, StringType, IntegerType
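The overall control flow of the full function can be sketched in plain Python: seed the origins at distance 0, then repeat the one-round expansion up to max_depth times, never revisiting a node. This is only a reference for the loop structure, with sets and a dictionary standing in for the Spark dataframes and temporary view; the edge list is hypothetical and includes a two-node cycle like J-K.

```python
def bfs(edges, origins, max_depth):
    """Return {node: distance} for every node within max_depth hops of the origins."""
    visited = {node: 0 for node in origins}   # origins are logged at distance 0
    frontier = set(origins)
    for depth in range(1, max_depth + 1):
        next_frontier = {
            dst for src, dst in edges
            if src in frontier and dst not in visited
        }
        if not next_frontier:
            break                              # nothing new is reachable
        for node in next_frontier:
            visited[node] = depth
        frontier = next_frontier
    return visited

# Hypothetical directed edges, with a cycle between J and K.
edges = [("A", "B"), ("B", "J"), ("J", "K"), ("K", "J"), ("A", "C")]

print(bfs(edges, ["A"], 2))
print(bfs(edges, ["A"], 10))
```

Because visited nodes are never added to a frontier again, the J-K cycle is traversed once and the loop stops when the frontier becomes empty.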
As we can see, the dataframe consists of 2 columns named User1 and User2. These columns indicate an edge in the social network graph formed when people follow each other on twitter. Each row in the dataframe indicates that user1 follows user2. For example, the 0 indexed row has James as User1 and Ashley as User2, which indicates that James follows Ashley.
Let's take a look at which people were the most popular in terms of the number of followers they had. TODO: Count the number of followers each person has. Consider only those people who have at least one follower, i.e. those whose name exists in the User2 column. Return a dataframe with the schema (Name, Followers). Order your dataframe by Followers in descending order.
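Since each row means that User1 follows User2, a person's follower count is the number of rows in which they appear as User2. The plain-Python sketch below shows that grouping on made-up edges; it mirrors what a GROUP BY with COUNT and ORDER BY would produce but is not Spark code. Breaking ties by name is an assumption added only to make the output deterministic.

```python
from collections import Counter

def follower_counts(follows):
    """Count how often each name appears as User2, most followed first."""
    counts = Counter(user2 for _user1, user2 in follows)
    # Descending by count; ties broken alphabetically (an assumption).
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))

# Hypothetical (User1, User2) rows: User1 follows User2.
follows = [
    ("James", "Ashley"), ("Mary", "Ashley"), ("Ashley", "James"),
    ("Mary", "Karen"), ("James", "Karen"), ("Ashley", "Karen"),
]

ranking = follower_counts(follows)
print(ranking)
```

Mary follows others but has no followers in this sample, so she does not appear in the result.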
Now that we have the Twitter dataset, let's convert it to a graph sdf just like the one we had in section 5.1 (P.S. it's not as hard as it sounds).
TODO: Create twitter_graph_sdf that has the columns from_node and to_node. from_node has all the entries from the User1 column and to_node has all the entries from the User2 column.
In the previous section, we found out that Patricia and Karen are the most popular people in our dataset. So let's find out the Super Bacons with respect to Karen.
HW Submission
Before you submit on Gradescope (you must submit your notebook to receive credit):
- Restart and Run-All to make sure there's nothing wrong with your notebook
- Double check that you have the correct PennID (all numbers) in the autograder.
- Make sure you've run all the PennGrader cells
- Go to the "File" tab at the top left, and click "Download .ipynb" and then "Download .py". Rename the files to "homework3.ipynb" and "homework3.py" respectively and upload them to Gradescope
Let the course staff know ASAP if you have any issues submitting, but otherwise best of luck!