
Mining Massive Datasets

Preface

This website contains my notes from the Mining Massive Datasets lecture by Artur Andrzejak, held at the University of Heidelberg in the academic year 2022/2023. If you find something incorrect/unclear, or would like to contribute, feel free to submit a pull request (or let me know via email).

Note that the notes only cover the topics required for the exam, which are only a portion of the contents of the lecture.

Lecture overview

  1. Spark architecture and RDDs [slides]
  2. Spark Dataframes + Pipelines [slides]
  3. Recommender Systems 1 [slides]
  4. Recommender Systems 2 [slides]
  5. Page Rank 1 [slides]
  6. Page Rank 2 [slides]
  7. Locality Sensitive Hashing 1 [slides]
  8. Locality Sensitive Hashing 2 [slides]
  9. Frequent itemsets + A-Priori algorithm [slides]
  10. PCY extensions [slides]
  11. Online bipartite matching + BALANCE [slides]
  12. Mining data streams 1 [slides]
  13. Mining data streams 2 [slides]

Programming and Frameworks

PySpark

RDDs [cheatsheet]

The main data structure is the RDD (resilient distributed dataset): an immutable, partitioned collection of records, distributed across the cluster and recomputed from its lineage on failure.

Two operation types: transformations (lazy, produce a new RDD – e.g. map, filter) and actions (trigger the computation and return a result to the driver – e.g. collect, count); see the sketch below.
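A minimal sketch of the two operation types, assuming an already created SparkContext `sc`:

```python
# assuming an existing SparkContext `sc`
rdd = sc.parallelize([1, 2, 3, 4, 5])

# transformations are lazy -- they only describe a new RDD
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)

# actions trigger the actual computation and return a result to the driver
print(evens.collect())  # [4, 16]
print(squares.count())  # 5
```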

DataFrames [cheatsheet]

PySpark’s other main data structure is the DataFrame: a distributed table of rows with named columns (conceptually an RDD plus a schema), queried through a SQL-like API.
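A small sketch, assuming an existing SparkSession named `spark`:

```python
# assuming an existing SparkSession `spark`
df = spark.createDataFrame(
    [("Alice", 1.0), ("Carol", 0.2), ("David", 0.4)],
    ["user", "rating"],
)

# column-oriented, SQL-like operations instead of raw lambdas
df.filter(df.rating > 0.3).show()
df.groupBy("user").avg("rating").show()
```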

ML with Spark

Definition (Transformer): an algorithm which transforms one DataFrame into another

Definition (Estimator): an algorithm which can be fit on a DataFrame to produce a Transformer

Definition (Pipeline): an object which contains multiple Transformers and Estimators

PySpark ML pipeline illustration.
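As a sketch of how these pieces fit together (the column names and the DataFrames `training_df`/`test_df` are made up for illustration):

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Transformers: Tokenizer, HashingTF; Estimator: LogisticRegression
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

# the Pipeline is itself an Estimator: fitting it yields a PipelineModel (a Transformer)
pipeline = Pipeline(stages=[tokenizer, tf, lr])
model = pipeline.fit(training_df)       # training_df needs "text" and "label" columns
predictions = model.transform(test_df)
```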

Spark Streaming

General idea is to chop the live stream into small batches (micro-batches), process each batch with the usual Spark machinery, and emit the results batch by batch.

DStream (discretized stream) – container for a stream, implemented as a sequence of RDDs

# an example that returns the word counts in a stream
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 2 threads - source feed + processing
sc = SparkContext("local[2]", "NetworkWordCount")

# batch interval = 1s
ssc = StreamingContext(sc, 1)

# DStream will be connected here
lines = ssc.socketTextStream("localhost", 9999)

# lines is an RDD (kinda) and we can behave as such
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()

# start the task and wait for termination
ssc.start()
ssc.awaitTermination()

Recommender Systems

Problem: set of customers $X$, set of items $I$, utility function (set of ratings) $u: X \times I \mapsto R$

|           | Alice | Bob | Carol | David |
|-----------|-------|-----|-------|-------|
| Star Wars | 1     |     | 0.2   |       |
| Matrix    |       | 0.5 |       | 0.3   |
| Avatar    | 0.2   |     | 1     |       |
| Pirates   |       |     |       | 0.4   |

Note: for some reason, this matrix is $I \times X$ while all other notation is $X \times I$ (like $r_{xi}$, for example). It’s confusing, I’m not sure why we’ve defined it this way.

Goal: extrapolate unknown ratings from the known ones.

Collaborative filtering (CF)

Idea: take known ratings of other similar items/users.

User-user CF

Algorithm:

  1. let $r_x$ be the vector of user $x$’s ratings and $N$ the set of neighbors of $x$
  2. the predicted rating of user $x$ for item $i$ is $$\underbrace{r_{xi} = \frac{1}{|N|} \sum_{y \in N} r_{yi}}_{\text{naive version (just average)}} \qquad \underbrace{r_{xi} = \mathrm{avg}(r_x) + \frac{\sum_{y \in N} \mathrm{sim}(x, y) \cdot (r_{yi} - \mathrm{avg}(r_y))}{\sum_{y \in N} \mathrm{sim}(x, y)}}_{\text{improved, takes similarity of users into account, along with their bias}}$$

To calculate similarity, we can use a few things: Jaccard similarity, cosine similarity, or the Pearson correlation coefficient (a centered cosine).

This way of writing Pearson hides what’s really going on, so here is a nicer way: let $s_x$ and $s_y$ be formed from the vectors $r_x, r_y$ by removing the indexes where either one is zero. Then the Pearson similarity measure can be calculated as $\mathrm{sim}(r_x, r_y) = \cos(s_x - \mathrm{avg}(r_x),\ s_y - \mathrm{avg}(r_y))$
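A small numpy sketch of this centered-cosine similarity and of the improved user-user prediction above; rating vectors are dense arrays with 0 meaning “not rated”, and all names are made up:

```python
import numpy as np

def pearson_sim(r_x, r_y):
    """Centered cosine over the items both users have rated (0 = not rated)."""
    common = (r_x != 0) & (r_y != 0)
    if not common.any():
        return 0.0
    s_x = r_x[common] - r_x[r_x != 0].mean()
    s_y = r_y[common] - r_y[r_y != 0].mean()
    denom = np.linalg.norm(s_x) * np.linalg.norm(s_y)
    return float(s_x @ s_y / denom) if denom else 0.0

def predict_rating(r_x, neighbors, i):
    """Improved user-user prediction of user x's rating for item i."""
    baseline = r_x[r_x != 0].mean()
    sims = [(pearson_sim(r_x, r_y), r_y) for r_y in neighbors]
    num = sum(s * (r_y[i] - r_y[r_y != 0].mean()) for s, r_y in sims)
    den = sum(s for s, _ in sims)
    return baseline + num / den if den else baseline
```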

To calculate the neighbourhood, we can do a few things: take the $k$ users most similar to $x$ who have rated item $i$, or take all users whose similarity to $x$ exceeds some threshold.

Item-item CF

Analogous to User-user: for rating item $i$, we find items rated by user $x$ that are similar to it (as in rated similarly by other users). To do this, we can again use $\mathrm{sim}$, obtaining $$r_{xi} = \frac{\sum_{j \in N} \mathrm{sim}(i, j) \cdot r_{xj}}{\sum_{j \in N} \mathrm{sim}(i, j)}$$

The improved version has a slightly different baseline to User-user, namely $$r_{xi} = b_{xi} + \frac{\sum_{j \in N} \mathrm{sim}(i, j) \cdot (r_{xj} - b_{xj})}{\sum_{j \in N} \mathrm{sim}(i, j)}$$ where $b_{xi} =$ mean item rating $+$ rating deviation of user $x$ $+$ rating deviation of item $i$.

Pros/Cons

Content-based recommendations

Main idea: create item profiles for each item, which is a vector $v$ of features:

For creating user profiles, we can do a weighted average (by rating) of their item profiles: $x = \left(r_1 i_1 + r_2 i_2 + \ldots + r_n i_n\right) / n$

For matching User and Item (i.e. determining rating), we can again use cosine similarity between the user profile and the item.

Pros/Cons

Latent factor models

Merges Content-Based Recommenders (rating is a product of vectors) and Collaborative Filtering (weighted sum of other ratings from the utility matrix) – use user data to create the item profiles! We’ll do this by factorizing the utility matrix into a user matrix $P$ and an item matrix $Q$ such that we minimize the SSE: $$\min_{P, Q} \sum_{(i, x) \in R} (r_{xi} - q_i \cdot p_x)^2$$

Latent factors illustration.

What we want to do is split the data into a training set, which we use to create the matrices, and the testing set, on which the matrices need to perform well. To do this well, we’ll use regularization, which controls for when the data is rich and when it’s scarce (lots of zeroes in the $p_x$s and $q_i$s): $$\min_{P, Q} \underbrace{\sum_{(x, i) \in R} \left(r_{xi} - q_i p_x\right)^2}_{\text{error}} + \underbrace{\lambda_1 \sum_{x} \|p_x\|^2 + \lambda_2 \sum_{i} \|q_i\|^2}_{\text{„length“ (approx. num. of non-zeros in } p, q\text{)}}$$ for $\lambda_1, \lambda_2$ user-set regularization parameters.
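Spark’s ALS estimator implements this kind of regularized matrix factorization (with a single regularization parameter rather than separate $\lambda_1, \lambda_2$); a sketch with assumed column names and DataFrames `train_df`/`test_df`:

```python
from pyspark.ml.recommendation import ALS

# rank = number of latent factors, regParam = the regularization strength
als = ALS(
    rank=10,
    regParam=0.1,
    userCol="userId",
    itemCol="itemId",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop predictions for unseen users/items
)
model = als.fit(train_df)               # learns P (user factors) and Q (item factors)
predictions = model.transform(test_df)  # adds a "prediction" column
```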

PageRank

Flow formulation

Problem: we have pages as a directed graph. We want to determine the importance of pages based on how many links lead to them, i.e. if page $j$ of importance $r_j$ has $n$ outgoing links, each link gets importance $r_j / n$. Formally: $$r_j = \sum_{i \rightarrow j} \frac{r_i}{d^{\mathrm{out}}_i}$$

Can be expressed as a system of linear equations to be solved (Gaussian elimination).

Matrix formulation

Can also be formulated as an adjacency matrix $M$, $$M_{ji} = \begin{cases} \frac{1}{d^{\mathrm{out}}_i} & \text{edge}\ i \rightarrow j \\ 0 & \text{otherwise} \end{cases}$$ then we can define the flow equation as $Mr = r$, meaning that we’re looking for an eigenvector of the matrix. Since the matrix is stochastic (columns sum to 1), its first eigenvector has eigenvalue 1 and we can find it using power iteration (see the PageRank formulation below).

This, however, has two problems: dead ends (pages with no out-links, which leak importance out of the system) and spider traps (groups of pages with no links out of the group, which absorb all the importance).

Both are solved using teleports – during each visit, we have a probability of $1 - \beta$ to jump to a random page ($\beta$ usually $0.8$). In a dead-end, we teleport with probability $1$.

Using this, we get the Google PageRank equation: $$\underbrace{r_j = \sum_{i \rightarrow j} \beta \frac{r_i}{d^{\mathrm{out}}_i} + (1 - \beta) \frac{1}{N}}_{\text{PageRank equation}} \qquad \underbrace{A = \beta M + (1 - \beta) \left[\frac{1}{N}\right]_{N \times N} \quad Ar = r}_{\text{Google Matrix } A}$$

Practical computation

To reduce the matrix operations, we can rearrange the matrix equation and get $$r = \beta M \cdot r + \left[\frac{1 - \beta}{N}\right]_N$$

Algorithm (PageRank): initialize $r^{(0)} = \left[\frac{1}{N}\right]_N$, then iterate $r^{(t+1)} = \beta M r^{(t)} + \left[\frac{1 - \beta}{N}\right]_N$ until $\sum_j |r^{(t+1)}_j - r^{(t)}_j| < \varepsilon$ (see the sketch below).
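A small numpy sketch of this power iteration, assuming $M$ is given as a dense column-stochastic matrix (real inputs are sparse, see below); rank leaked through dead-end columns is redistributed uniformly:

```python
import numpy as np

def pagerank(M, beta=0.8, eps=1e-8):
    """Power iteration on a matrix M with M[j, i] = 1/d_out(i) for every edge i -> j."""
    n = M.shape[0]
    r = np.full(n, 1.0 / n)
    while True:
        r_new = beta * (M @ r) + (1 - beta) / n
        # re-insert the rank leaked through dead ends (columns summing to 0)
        r_new += (1.0 - r_new.sum()) / n
        if np.abs(r_new - r).sum() < eps:
            return r_new
        r = r_new
```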

When we don’t have enough RAM to fit the whole matrix $M$, we can read and update it by vertices (it’s sparse, so we’d likely be storing it as a dictionary):

| Source | Degree | Destinations     |
|--------|--------|------------------|
| 0      | 3      | 1, 5, 6          |
| 1      | 4      | 17, 64, 113, 116 |
| 2      | 2      | 13, 23           |

I.e. for each destination $\mathrm{dest}_j$ of node $i$, do $r^{\mathrm{new}}_{\mathrm{dest}_j} \mathrel{+}= \beta\, r^{\mathrm{old}}_i / d_i$.

When we can’t even fit $r^{\mathrm{new}}$ into memory, we can break it into $k$ blocks that do fit into memory and scan $M$ and $r^{\mathrm{old}}$. This, however, is pretty inefficient – we can instead use the block-stripe algorithm, which breaks $M$ by destinations instead, so we don’t need to repeatedly scan it!

Each block will only contain the destination nodes in the corresponding block of $r^{\mathrm{new}}$.

| Destination block | Source | Degree | Destinations |
|-------------------|--------|--------|--------------|
| [0, 1]            | 0      | 4      | 0, 1         |
|                   | 1      | 3      | 0            |
|                   | 2      | 2      | 1            |
| [2, 3]            | 0      | 4      | 3            |
|                   | 2      | 2      | 3            |
| [4, 5]            | 0      | 4      | 5            |
|                   | 1      | 3      | 5            |
|                   | 2      | 2      | 4            |

Topic-Specific PageRank

We can bias the random walk to teleport only to relevant pages (from a set $S$). For each teleport set $S$, we get a different vector $r_S$. This changes the PageRank formulation like so:

$$A_{ij} = \beta M_{ij} + \begin{cases} (1 - \beta) / |S| & i \in S \\ 0 & \text{otherwise} \end{cases}$$

TrustRank

Addresses issues with spam farms, which are pages that just point to one another. The general idea to fix this is to take a set of seed pages from the web and identify the ones that are „good“, i.e. trusted. Then perform topic-specific PageRank with $S =$ trusted pages, which propagates the trust to other pages. After this, websites with trust below a certain threshold are marked as spam.

We can use TrustRank for spam mass estimation (i.e. estimating how much of a page’s rank comes from spammers): for a page $p$ with PageRank $r_p$ and TrustRank $t_p$, the spam mass is $(r_p - t_p) / r_p$; pages with high spam mass are likely spam.

Locality Sensitive Hashing

Goal: find near-neighbors in high-dimensional spaces:

General idea: represent documents as sets (shingling), compress the sets into short signatures that preserve similarity (min-hashing), and then only compare pairs of signatures that are likely to be similar (locality-sensitive hashing).

Preface

Definition (hash function): function $h: D \mapsto R$, where $D$ is a large domain space and $R$ a small range space (usually integers).

They are very useful for implementing sets and dictionaries (have an array to store the elements in and use a good hash function to map them; dynamically grow/shrink as needed).

Shingling

For a document, take a sliding window of size $k$. Then the $k$-shingles for the given document are all sequences of $k$ consecutive tokens from the document.
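A tiny sketch of character-level shingling; hashing each shingle to an integer keeps the sets compact:

```python
def shingles(document, k=5):
    """All k-shingles of a document, hashed to integers and collected into a set."""
    return {hash(document[i:i + k]) for i in range(len(document) - k + 1)}

shingles("abcdab", k=2)  # hashes of {"ab", "bc", "cd", "da"}
```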

Min-hashing

The shingling sets are very large, so we have to find a way to measure how similar they are. What we want to measure is their Jaccard similarity, which is $$\mathrm{sim}(S_1, S_2) = |S_1 \cap S_2|\ /\ |S_1 \cup S_2|$$

To do this, we’ll compute signatures, which are shorter but should have the same Jaccard similarity as the original sets. To compute a signature, we take many random hash functions (for example random permutations), hash all values from the set of shingles with each of them, and take the minimum. The list of all those minimal values is the signature.

For example:

It turns out that $$\mathrm{Pr}\left[h_\pi (S_1) = h_\pi (S_2)\right] = \mathrm{sim}(S_1, S_2)$$
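A minimal sketch that uses random linear hash functions of the form $(a \cdot x + b) \bmod p$ in place of explicit permutations; comparing two signatures position by position then estimates the Jaccard similarity:

```python
import random

def minhash_signature(shingle_set, num_hashes=100, prime=2**61 - 1):
    """Signature = for each hash function, the minimum hash over all shingles in the set."""
    random.seed(0)  # same hash functions for every document
    coeffs = [(random.randrange(1, prime), random.randrange(prime)) for _ in range(num_hashes)]
    return [min((a * s + b) % prime for s in shingle_set) for a, b in coeffs]

def estimate_similarity(sig1, sig2):
    """Fraction of positions where the signatures agree ~ Jaccard similarity."""
    return sum(x == y for x, y in zip(sig1, sig2)) / len(sig1)
```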

Locality-Sensitive Hashing

Our goal now is to find documents with Jaccard similarity at least $s$ (e.g. $0.8$). To do this, we will hash parts of the signature matrix: divide it into $b$ bands of $r$ rows each, hash each band of each column into buckets, and treat two documents as a candidate pair if they land in the same bucket in at least one band.

Locality-Sensitive Hashing example.

We want to tune $b$ and $r$ to catch most similar pairs but few non-similar pairs: the probability that a pair with similarity $s$ becomes a candidate is $1 - (1 - s^r)^b$.

Plotting the probabilities with variable $s$, we get the S-curve:

S-Curve illustration.
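The candidate probability from above as a one-liner (the parameter values are just illustrative):

```python
def candidate_probability(s, b, r):
    """Probability that a pair with Jaccard similarity s shares a bucket in at least one of b bands of r rows."""
    return 1 - (1 - s ** r) ** b

candidate_probability(0.8, b=20, r=5)  # ~0.9996 -- similar pairs are almost surely caught
candidate_probability(0.3, b=20, r=5)  # ~0.047  -- most dissimilar pairs are skipped
```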

Association Rule Discovery

Goal (the market-basket model): identify items that are bought together by sufficiently many customers (if someone buys diaper and baby milk, they will also buy vodka since the baby is probably driving them crazy).

Approach: process the sales data to find dependencies among items

| TID | Items                      |
|-----|----------------------------|
| 1   | Bread, Coke, Milk          |
| 2   | Vodka, Bread               |
| 3   | Vodka, Coke, Diaper, Milk  |
| 4   | Vodka, Bread, Diaper, Milk |
| 5   | Coke, Diaper, Milk         |

Definition (frequent itemsets): sets of items that frequently appear together

Definition (association rule): an association rule $R$ has the form $\{i_1, i_2, \ldots, i_k\} \implies \{j_1, j_2, \ldots, j_m\}$ and essentially states that if a basket contains the set $I$, then it also contains the set $J$

Definition (confidence): of an association rule is the probability that it applies to a basket $B$ with $I \subseteq B$, namely $\mathrm{confidence}(I \rightarrow J) = \frac{\mathrm{support}(I \cup J)}{\mathrm{support}(I)}$

Definition (interest): of an association rule is the difference between its confidence and the fraction of baskets that contain $J$, namely $\mathrm{interest}(I \rightarrow J) = \mathrm{confidence}(I \rightarrow J) - \mathrm{Pr}[J \subseteq B]$
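A quick sketch computing support, confidence and interest for the example baskets above (the rule {Milk, Diaper} → {Vodka} is chosen just for illustration):

```python
baskets = [
    {"Bread", "Coke", "Milk"},
    {"Vodka", "Bread"},
    {"Vodka", "Coke", "Diaper", "Milk"},
    {"Vodka", "Bread", "Diaper", "Milk"},
    {"Coke", "Diaper", "Milk"},
]

def support(itemset):
    """Number of baskets containing the whole itemset."""
    return sum(itemset <= basket for basket in baskets)

I, J = {"Milk", "Diaper"}, {"Vodka"}
confidence = support(I | J) / support(I)           # 2/3
interest = confidence - support(J) / len(baskets)  # 2/3 - 3/5 ≈ 0.07
```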

Problem: we want to find all association rules with $\mathrm{support} \ge s$ and $\mathrm{confidence} \ge c$.

  1. find all frequent itemsets $I$ (those with $\mathrm{support} \ge s$)
    • baskets are usually stored on disk (they won’t fit into memory)
    • association-rule algorithms read the data in passes – this is the true cost
    • the hardest part is finding frequent pairs (the number of larger tuples drops off)
      • approach 1: count all pairs using a matrix $\rightarrow$ 4 bytes per pair
      • approach 2: count all pairs using a dictionary $\rightarrow$ 12 bytes per pair with count $> 0$
      • smarter approaches: A-Priori, PCY (see below)
  2. use them to generate rules with $\mathrm{confidence} \ge c$:
    • for every $A \subseteq I$, generate the rule $A \rightarrow I \setminus A$: since $I$ is frequent, $A$ is also frequent
    • for calculating confidences, we can do a few things:
      1. brute force go brrrrrr
      2. use the fact that if $A, B, C \rightarrow D$ is below confidence, so is $A, B \rightarrow C, D$

A-Priori Algorithm

Algorithm (A-Priori):

  1. pass: count individual items
  2. pass: count only pairs where both elements are frequent

A-Priori memory layout illustration.

Can be generalized for any $k$ by again constructing candidate $k$-tuples from the previous pass and then passing again to get the truly frequent $k$-tuples.
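A compact sketch of the first two A-Priori passes (frequent items and frequent pairs) over in-memory baskets, with an integer support threshold `s`:

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, s):
    """Frequent items (pass 1) and frequent pairs (pass 2) with support >= s."""
    # pass 1: count individual items
    item_counts = Counter(item for basket in baskets for item in basket)
    frequent_items = {i for i, c in item_counts.items() if c >= s}

    # pass 2: count only pairs where both elements are frequent
    pair_counts = Counter(
        pair
        for basket in baskets
        for pair in combinations(sorted(frequent_items & set(basket)), 2)
    )
    return frequent_items, {p for p, c in pair_counts.items() if c >= s}
```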

PCY Algorithm

Observation: in pass 1 of A-Priori, most memory is idle:

Algorithm (PCY (Park-Chen-Yu)):

  1. pass: count individual items + hash pairs to buckets, counting them too
    • between passes: convert the bucket counts into a bit-vector:
      • 1 if the bucket count exceeded the support $s$
      • 0 if it did not
  2. pass: count only pairs where
    • both elements are frequent (same as A-Priori) and
    • the pair hashes to a bucket whose bit is set (i.e. a frequent bucket)

PCY memory layout illustration.
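A sketch of PCY over in-memory baskets; the bucket table is a plain Counter here and the bit-vector a set of frequent bucket indices, which keeps the idea visible without the bit-level packing:

```python
from collections import Counter
from itertools import combinations

def pcy_pairs(baskets, s, num_buckets=1_000_003):
    """Frequent pairs with support >= s, using PCY's extra hash-bucket filter."""
    # pass 1: count items and hash every pair into a bucket counter
    item_counts, bucket_counts = Counter(), Counter()
    for basket in baskets:
        item_counts.update(basket)
        for pair in combinations(sorted(basket), 2):
            bucket_counts[hash(pair) % num_buckets] += 1

    # between passes: keep only one bit per bucket (here: the set of frequent buckets)
    frequent_items = {i for i, c in item_counts.items() if c >= s}
    frequent_buckets = {h for h, c in bucket_counts.items() if c >= s}

    # pass 2: count only pairs of frequent items that hash to a frequent bucket
    pair_counts = Counter(
        pair
        for basket in baskets
        for pair in combinations(sorted(frequent_items & set(basket)), 2)
        if hash(pair) % num_buckets in frequent_buckets
    )
    return {p for p, c in pair_counts.items() if c >= s}
```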

Online Advertising

Initial problem: find a maximum matching for a bipartite graph where we’re only given the left side and the right side is revealed one-by-one. The obvious first try is a greedy algorithm (match with first available)

Revised problem: the left side are advertisers, the right side are terms to advertise on; we know which terms each advertiser bids on and their (daily) budgets.

We want to respond to each search query with a set of advertisers such that: the set is no larger than the number of ad slots, every advertiser in it has bid on that query, and every advertiser in it still has budget left.

Greedy: pick the first available advertiser

BALANCE: pick the advertiser with the largest unspent budget (largest balance)
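A minimal sketch of the BALANCE selection rule (the `bids` and `budgets` mappings are made up for illustration):

```python
def balance_pick(query, budgets, bids):
    """BALANCE: among advertisers bidding on the query with budget left, pick the one with the largest unspent budget."""
    candidates = [a for a in bids if query in bids[a] and budgets[a] > 0]
    return max(candidates, key=lambda a: budgets[a]) if candidates else None

# hypothetical example: two advertisers bidding on "milk"
budgets = {"A": 4, "B": 2}
bids = {"A": {"milk", "bread"}, "B": {"milk"}}
balance_pick("milk", budgets, bids)  # "A" (larger remaining budget)
```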

Mining Data Streams

For our purposes, a stream is a long list of tuples of some values.

Stream Filtering

Problem: given a stream and a list of keys $S$, determine which stream elements are in $S$

First-cut solution

Use a bit array of $n$ bits and a single hash function $h$; hash each of the $m$ keys of $S$ and set the corresponding bits, then let a stream element pass the filter if its hash hits a set bit. The probability that a given bit gets at least one hash (which equals the false positive rate) is $$1 - \underbrace{\overbrace{(1 - 1/n)}^{\text{one hash doesn't hit}}{}^{m}}_{\text{none of the $m$ hashes hit}} = 1 - \left(1 - 1/n\right)^{n (m / n)} \approx 1 - e^{-m/n}$$

Bloom filter

Create $k$ independent hash functions and set the bits for all of an element’s $k$ hashes. The probability that a particular bit is set is now $$1 - (1 - 1/n)^{km} \approx 1 - e^{-km/n}$$

However, to get a false positive, all $k$ of the hash functions have to hit a set bit, so the false positive rate is $$\left(1 - (1 - 1/n)^{km}\right)^k \approx \left(1 - e^{-km / n}\right)^k$$

This is minimized (w.r.t. $k$) at $k = \frac{n}{m} \ln 2$:

Bloom filter graph.
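A small Bloom filter sketch; the $k$ hash functions are simulated by salting Python’s built-in hash, which is a simplification:

```python
class BloomFilter:
    def __init__(self, n_bits, k):
        self.bits = [False] * n_bits
        self.k = k

    def _positions(self, key):
        # k "independent" hash functions, simulated here by salting the key
        return [hash((i, key)) % len(self.bits) for i in range(self.k)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = True

    def might_contain(self, key):
        # no false negatives; false positive rate ~ (1 - e^(-km/n))^k
        return all(self.bits[pos] for pos in self._positions(key))
```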

Stream Sampling

Fixed portion

Goal: store a fixed portion of the stream (for ex. 1/10)

Naive solution: pick randomly and hope for the best

Better solution: pick by value, not by position (i.e. pick 1/10 of users)

Fixed size (Reservoir Sampling)

Goal: store a fixed number $|S| = s$ of elements of the stream

Algorithm (Reservoir Sampling): store the first $s$ elements; when the $n$-th element arrives (for $n > s$), keep it with probability $s/n$ and have it replace a uniformly random element of $S$ (see the sketch below).

This ensures that after $n$ elements, every element has an $s/n$ probability of being currently sampled³.
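A minimal sketch of reservoir sampling:

```python
import random

def reservoir_sample(stream, s):
    """Maintain a uniform sample of size s from a stream of unknown length."""
    sample = []
    for n, element in enumerate(stream, start=1):
        if n <= s:
            sample.append(element)
        elif random.random() < s / n:
            # with probability s/n, the new element replaces a uniformly random old one
            sample[random.randrange(s)] = element
    return sample
```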

Stream Counting

Goal: count the number of distinct elements in the stream

Flajolet-Martin

Algorithm (Flajolet-Martin): hash each element $a$ with a hash function $h$, track the maximum number $R$ of trailing zeroes seen in any $h(a)$, and estimate the number of distinct elements as $2^R$ (see the sketch below).

Intuitively, $h(a)$ hashes $a$ uniformly, so the probability of getting a hash with $r$ trailing zeroes is $2^{-r}$ (i.e. we expect to hash roughly $2^r$ distinct elements before seeing one).
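A rough single-hash sketch (a practical implementation combines estimates from many hash functions):

```python
def trailing_zeros(x):
    """Number of trailing zero bits of x (0 for x == 0, by convention here)."""
    return (x & -x).bit_length() - 1 if x else 0

def flajolet_martin(stream, hash_fn=hash):
    """Estimate the number of distinct elements as 2^R, R = max trailing zeros over all hashes."""
    R = 0
    for element in stream:
        R = max(R, trailing_zeros(hash_fn(element)))
    return 2 ** R
```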

We can generalize this concept to counting moments: for $m_a$ being the number of times element $a$ appears in the stream $S$, we define the $k$-th moment as $$\sum_{a \in S} m_a^k$$

AMS Algorithm

Algorithm (AMS (Alon-Matias-Szegedy)): pick random stream positions $t$; for each, remember the element at that position and the count $c_t$ of how many times it appears from then on. For the 2nd moment, each position gives the estimate $n(2c_t - 1)$, and we average these estimates.
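A sketch for the 2nd moment, assuming (unlike the true streaming setting) that the whole stream is available as a list so the positions can be drawn upfront; the fixups below deal with not knowing the stream length:

```python
import random

def ams_second_moment(stream, num_vars=100):
    """Estimate the 2nd moment as the average of n * (2 * c_t - 1) over random positions t."""
    n = len(stream)
    estimates = []
    for _ in range(num_vars):
        t = random.randrange(n)
        c_t = stream[t:].count(stream[t])  # occurrences of stream[t] from position t onward
        estimates.append(n * (2 * c_t - 1))
    return sum(estimates) / len(estimates)
```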

Fixups (we don’t know the size of the stream)

Let’s prove that this works for the 2nd moment with one variable. We want to prove that $\mathbb{E}[f(X)] = \sum_i m_i^2$. Let $c_t$ be the number of times the item at index $t$ appears from then on. We get $$\begin{aligned} \mathbb{E}\left[f(X)\right] &= \frac{1}{n} \sum_{t = 1}^{n} n(2 c_t - 1) && \text{(definition)} \\ &= \frac{1}{n} \sum_{\text{item}\ i} n\left(1 + 3 + 5 + \ldots + (2m_i - 1)\right) && \text{(group by $i$)} \\ &= \frac{1}{n} \sum_{\text{item}\ i} n \left(2 \frac{m_i (m_i + 1)}{2} - m_i\right) \\ &= \frac{1}{n} \sum_{\text{item}\ i} n\, m_i^2 \\ &= \sum_{\text{item}\ i} m_i^2 \end{aligned}$$

  1. let $L$ be the left side and $R$ the right. If $M_{\mathrm{greedy}} \neq M_{\mathrm{optimal}}$, consider the set $G \subseteq R$ matched in $M_{\mathrm{optimal}}$ but not in $M_{\mathrm{greedy}}$. Now consider $B \subseteq L$, the vertices adjacent to $G$: every one of those must be matched in $M_{\mathrm{greedy}}$ (for those in $G$ not to be), so $|B| \le |M_{\mathrm{greedy}}|$. Also, $|B| \ge |G|$, since otherwise the optimal algorithm couldn’t have matched all girls in $G$. Since $|M_{\mathrm{opt}}| \le |M_{\mathrm{greedy}}| + |G|$, we get the desired bound after substituting for $|G|$

  2. Here is the proof from the lecture: BALANCE proof. 

  3. Shown by induction – after element $n + 1$ arrives:

    • for an element $x$ in $S$, the probability that the algorithm keeps it this iteration is $$\underbrace{\left(1 - \frac{s}{n + 1}\right)}_{\text{new one is discarded}} + \underbrace{\left(\frac{s}{n + 1}\right) \left(\frac{s - 1}{s}\right)}_{\substack{\text{new one is not discarded} \\ \text{but replaces a different one}}} = \frac{n}{n + 1}$$
    • the probability that the element $x$ is in $S$ at time $n + 1$ is therefore $$\underbrace{\frac{s}{n}}_{\text{induction}} \cdot \frac{n}{n + 1} = \frac{s}{n + 1}$$