Skip to main content Link Search Menu Expand Document (external link)

Assignment 1 Due Wednesday Feb 14, 11:00am ET

Parallelizing Python Processing with Dask

FAQ

  • When creating new EC2 instances, you need to explicitly add a new security group under Network settings. See details in Part 0: Important step. This is to ensure that your Dask workers are able to communicate with your Dask scheduler using TCP protocol.
  • Added point distribution.
  • Google Drive enforces throttling since too many people started downloading this dataset in the last minute! To address the gdown issue, simply download the dataset from the original source to your local computer. Then use scp to copy the downloaded ZIP file to your EC2 machines: scp -i vockey.pem <dataset_file_name> ubuntu@<public_IPv4_DNS_address_of_your_EC2>:~/.
  • %%time reports both the CPU time and the wall-clock time. You only need to refer to the wall time when measuring the execution time of your code.

Overview

This assignment is designed to support your in-class understanding of how distributed and parallel Python analytics tools work and get some hands-on experience in using them. You will deploy Dask.distributed as the computing framework. You will then write Python Dask code to do some basic descriptive statistics tasks on a medium-sized real-world dataset. You will also produce a short report detailing your observations and takeaways.

Learning outcomes

After completing this assignment, you should be able to:

  • Configure and deploy Dask.distributed on a Linux cluster.
  • Write basic Python applications to analyze large CSV files using Dask.distributed.
  • Understand the scalability of distributed data processing frameworks.
  • Describe how Dask.distributed works with datasets that are shared among multiple worker nodes.

Part 0: Environment setup

Like the previous assignment you will complete this assignment on AWS. Hopefully by now you have got yourself familiar with setting things up in AWS via Academy. See Assignment 0 about how to use AWS Academy to access AWS cloud resources.

In this assignment, you should create 3 EC2 VM instances of type t3.large to host a 3-node Dask cluster. We recommend creating 3 EC2 instances in one go by following the instructions from Assignment 0 (Step 1).

Important step: Change network settings by adding a new ALL TCP security group

IMPORTANT: You need to configure the Network settings of all EC2 instances slightly differently from previous practice. For A1, you need to explicitly add a new security group by allowing all TCP traffic. This is to ensure that your Dask workers are able to communicate with your Dask scheduler using TCP.

Add Security Group in Network Setting

When the three EC2 instances are up and running, you should enable password-free SSH service between all the three EC2 instances. To do so, generate a private/public key pair using:

$ ssh-keygen -t rsa

on the scheduler EC2 instance node (which we call vm1). The other EC2 instances will be assigned as workers. Then, manually copy the public key of vm1 to the authorized_key file in all the 3 instances (including vm1 and and the other 2 instances) under ~/.ssh/. To get the content of the public key saved in id_rsa.pub, do:

$ cat ~/.ssh/id_rsa.pub

Then copy the entire output of the above command to authorized_key. Make sure you do not append any newlines. Otherwise it will not work. Also note that you should NOT overwrite the existing line in authorized_key file. Otherwise you will no longer be able to login to your VM.

Once you are done with vm1, you should copy the content of id_rsa.pub in vm1 over to ~/.ssh/authorized_key in vm2 and vm3. This way, the leader node vm1 will have password-free SSH access to both itself and its follower nodes vm2-vm3.

Part 1: Software deployment

Dask.distributed

Dask is a Python library and framework for Python-based parallel computing. It comes with two sets of parallel programming libraries: one for single-machine-based, shared memory parallel computing, and the other for distributed computing that spans multiple machines. In this assignment, we will focus on the second type of Dask parallel tools based on Dask.distributed library.

Dask.distributed is a lightweight library for parallelizing Python computing to a cluster of machines. You can find a rather poor deployment tutorial in this official dask website (link), or you can just follow our more detailed version here.

Before starting Step 1, you should have already created a 3-instance EC2 cluster (see Part 0).

Step 1: Install Dask

First, install pip3 (the package installer for Python). You may skip this step on an existing EC2 instance that you used for A0, but for new EC2 instances, you need to install pip3 first.

$ sudo apt update
$ sudo apt install -y python3-pip
$ which pip3

After the installation, pip3 should appear at the following path /usr/bin/pip3.

Next, install the full Dask package:

$ pip3 install dask[complete]

The third command above, which pip3, should output something like /usr/bin/pip3 if it’s successfully installed.

pip3 by default will install everything in /home/ubuntu/.local/bin on your EC2 instance. This path, however, is not included by the environment variable $PATH, so your shell will not be able to locate the installed programs through pip3. To fix this, you will need to include the path in $PATH by running the following command:

$ source ~/.profile

Then, try which dask and it should locate the dask executable under /home/ubuntu/.local/bin/dask.

IMPORTANT: You should install Dask on all the 3 EC2 instances of your cluster.

Step 2: Install Jupyter Notebook

NOTE: Skip to Step 4 if you reuse an old EC2 instance that has Jupyter Notebook installed. If you install Notebook, you do not need to install it on all three instances.

You will be using Jupyter Notebook for development and testing. To install Jupyter Notebook:

$ pip3 install notebook

Again, you don’t need to install Jupyter Notebook on all the EC2 instances. Installing Jupyter Notebook on one instance should be sufficient. We recommend installing Notebook on vm1.

Step 3: Deploy a Jupyter Notebook server

SSH into one of your EC2 instances (say the scheduler instance vm1) using the following command:

$ ssh -i "vockey.pem" ubuntu@<public_IPv4_DNS_address_of_your_EC2_instance> -L 8000:localhost:8888

The -L option is to forward any connections to the given TCP port 8000 on your local client host to the given remote host (the specified EC2 instance of <public_IPv4_DNS_address_of_your_EC2_instance>) and port (8888), or Unix socket, on the remote side.

Once SSH’ed in, start the Jupyter Notebook server process using command jupyter notebook. You will see some output information generated by the launched Notebook server process:

To access the notebook, open this file in a browser:
    file:///home/ubuntu/.local/share/jupyter/runtime/nbserver-3233-open.html
Or copy and paste one of these URLs:
    http://localhost:8888/?token=557f06135dd4c77bc5267c88abca55db1672f741ff7631a4
 or http://127.0.0.1:8888/?token=557f06135dd4c77bc5267c88abca55db1672f741ff7631a4

On your local client machine, open a web browser and go to localhost:8000 and copy paste the token. Now you can start writing Python code using your Notebook GUI from your browser.

Step 4: Deploy a Dask cluster

In this assignment you will program a Dask application on a 3-node EC2 cluster. The architecture of your cluster is shown in the figure below.

Dask cluster

SSH into the EC2 instance that you plan to host the Dask scheduler (i.e., the scheduler EC2 instance vm1) using the following command:

$ ssh -i "vockey.pem" ubuntu@<public_IPv4_DNS_address_of_your_EC2_instance> -L 8001:localhost:8787

Note again you are forwarding a local port (which in this case is 8001) to the remote port 8787 on your EC2 instance. Once SSH’ed in, start the Dask scheduler process using the following command:

$ dask scheduler --host 0.0.0.0

Forwarding the connection to the local port 8001 to remote 8787 on your EC2 instance is to access the Dask dashboard from your local browser. Type localhost:8001 to access the Dask dashboard.

Next, SSH into the other 2 EC2 instances that will be your Dask worker nodes. Start two Dask worker processes on each of the 2 instances with the following command:

$ dask worker <private_IPv4_address_of_scheduler_instance>:8786 --nworkers 2 --nthreads 4

The first specified option in the above command <private_IPv4_address_of_scheduler_instance>:8786 is to let this Dask worker know that it should connect to the Dask scheduler running at that specified IP address and port; the second option --nworkers 2 is to configure this Dask worker to launch two separate Python processes on that EC2 instance; the third option --nthreads 4 is to configure this Dask worker to launch four Python threads in each of the two Dask worker processes. (Recall we learned the differences of processes vs. threads in Lecture 2c.)

WHY TWO WORKER PROCESSES PER INSTANCE? The reason is simple: each of your EC2 worker nodes is equipped with an EC2 instance type of t3.large, which has only 2 vCPU cores and 8 GB of memory. Running more than two Python processes would not help. But for concurrent processing each one of the two Dask worker processes has 4 Python threads within an individual worker process.

Configuring more than one thread per worker process could be helpful in some cases where concurrent processing is needed. But you should note that Python has limited parallel processing capability due to the restriction of so-called GIL (global interpreter lock). The Python GIL is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecode at once. While it simplifies the implementation of the inner mechanisms to some extent, it prevents multithreading Python programs from taking full advantage of modern multi-core systems in certain situations (for example, in our case, the data-intensive application scenarios). Therefore, having more than one Dask worker process is helpful as this deployment improves the parallelism.

To verify that you have 4 workers launched, check the log information from the terminal where you run the Dask scheduler. Or go to the Dask dashboard, you should be able to see all the connected workers. See a snapshot below. Yours, however, should be a brand new dashboard with four vertical blue bars under Bytes stored per worker and no colorful task stream visualizations under Task Stream.

Dask dashboard

Part 2: Programming tasks

Dataset

In this part, you will implement a simple Dask program. We have provided a dataset that you can download from the link below. Make sure the dataset is available under the same path on EC2 instances where the scheduler and workers are running. To do so, use the tool gdown to download the dataset to the scheduler node vm1:

$ cd ~
$ pip3 install gdown
$ gdown https://drive.google.com/uc?id=1vlczaocHyghEW5Q-6WprqB2nnna_wrLE
$ unzip stackoverflow.zip

NOTE: If unzip is not installed by default, you should run sudo apt install -y unzip to install it on vm1.

Then scp the two .csv files to all the other 2 EC2 instances.

$ scp question*.csv <private_IPv4_address_of_worker>:~/

Notebook skeleton

We have also provided a Jupyter Notebook skeleton to help you get started. To download the .ipynb file, run:

$ wget https://tddg.github.io/ds5110-cs5501-spring24/assets/datasets/a1.ipynb

Dataset description

The dataset has two files, a questions.csv file and a question_tags.csv file.

The schemas of questions.csv are as follows:

Column nameDescription
IdID of the question
CreationDateCreation date of the question
ClosedDateThe date when the question was closed
DeletionDateThe date when the question was deleted
ScoreScore of the question
OwnerUserIdID of the question’s owner user
AnswerCountNumber of answers under this question

The schemas of question_tags.csv are as follows:

Column nameDescription
IdID of the question
TagTag of the question
  • Task 1. Get the percentage of missing values for all the columns in the questions table and the question_tags table.
  • Task 2. Get mean, standard deviation, medium, min, and max of the Score column in the questions table.
  • Task 3. Get the top 5 tags that have the highest number of questions (hint: manipulate the question_tags table).
  • Task 4. Check if there are any dangling references to the question Id field from the questions table to question_tags table. Return 1 if there are dangling references; return 0 otherwise.
  • Task 5. Create a new owner user table based on the questions table grouped by the OwnerUserId field. Refer to this link for Dask’s groupby operator. Refer to this link to see how to use Dask’s groupby.agg operator to apply multiple functions to different columns of the re-grouped dataframe. The new table should have the following fields: (1) OwnerUserId: the field that the table is grouped by; (2) AverageScore: aggregated by getting the average score value of the owner user; (3) NumQuestions: aggregated by getting count value of the Id field of corresponding owner users; (4) NumAnswers: aggregated by getting the sum value of the AnswerCount field of corresponding owner users. See the schema table with the corresponding aggregation functions as follows:
Column nameDescriptionAggregation function
OwnerUserIdID of the question’s owner user
AverageScoreAverage (mean) score across all questions posted by this usermean
NumQuestionsNumber of questions (count) posted by this usercount
NumAnswersNumber of answers (sum) received by all the questions posted by this usersum

In Task 5, you should output the top 5 owner users who asked the most number of questions.

  • Task 6. Create a new table by merging the questions table and the question_tags table using Id as the index. Then group the new table by Tag with the following aggregated fields: (1) Tag: the field that the table is grouped by; (2) AverageScore: aggregated by getting the average score value of the corresponding tags; (3) NumAnswers: aggregated by getting the sum value of the AnswerCount field of corresponding tags; (4) NumQuestions: aggregated by getting the count value of the Id field of corresponding tags; and (5) NumOwners: aggregated by getting the count value of the OwnerUserId field of corresponding tags. The schemas are listed as follows:
Column nameDescriptionAggregation function
TagID of the question
AverageScoreAverage (mean) score across all questions marked with this tagmean
NumAnswersNumber of answers (sum) received by all questions marked with this tagsum
NumQuestionsNumber of questions (count) marked with this tagcount
NumOwnersNumber of users (count) asking questions marked with this tagcount

This task should output the top 5 tags with the highest number of questions and the top 5 tags with the highest number of answers received.

  • Task 7. Kill the third and fourth Dask worker on vm3 by using ^C (Control+C). Dask scheduler should be able to transparently detect worker failures and will use the only two workers that are left for upcoming computations. Repeat Task 5 using only two Dask workers (running on vm2) and report the execution time of Task 7. Compare it with the execution time you saw from Task 5 and present your reasoning about why the execution time increases or decreases.

  • Task 8. Kill the third and fourth Dask worker on vm3 by using ^C (Control+C). Dask scheduler should be able to transparently detect worker failures and will use the only two workers that are left for upcoming computations. Repeat Task 6 using only two Dask workers (running on vm2) and report the execution time of Task 8. Compare it with the execution time you saw from Task 6 and present your reasoning about why the execution time increases or decreases.

Point distribution

There are a total of 8 tasks for A1. Each task from Task1-4 is 4 points (running result: 3 points; comments: 1 point). Task 5 and 6, each is 6 points (running results: 4 points; comments: 2 points). Task 7 and 8, each is 6 points (running results: 3 points; comments: 3 points).

Deliverables

You should submit a .ipynb file to Canvas, which follows the naming convention of LastName_FirstName_ComputingID_A1.ipynb.

Code should be commented well (which will be worth some percentage of your grade for the assignment, the GTAs will be looking at your code). Use the comment as a README for each task and report your own findings if any.

You should include the report for Task 7 and 8 as comments in a new Notebook cell after the corresponding tasks.


© 2024 Yue Cheng. Released under the CC BY-SA license