Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - Optimize a Spark job that calculates pairwise similarity between all entries and outputs the top N similar items for each

I have a Spark job that needs to compute movie content-based similarities. There are 46k movies. Each movie is represented by a set of SparseVectors (each vector is a feature vector for one of the movie's fields such as Title, Plot, Genres, Actors, etc.). For Actors and Genres, for example, the vector shows whether a given actor is present (1) or absent (0) in the movie.

The task is to find the top 10 similar movies for each movie. I managed to write a Scala script that performs all those computations and does the job. It works for smaller sets, such as 1,000 movies, but not for the whole dataset (out-of-memory errors, etc.).

I do this computation with a cross join of the movies dataset with itself, then reduce the problem by keeping only rows where movie1_id < movie2_id. Even so, the dataset at this point contains roughly 46000^2/2 rows, which is about 1,058,000,000, and each row holds a significant amount of data.

Then I calculate a similarity score for each row. Once the similarity is calculated, I group the rows that share the same movie1_id, sort each group by similarity score in descending order using a Window function, and take the top N items (similar to what is described in: Spark get top N highest score results for each (item1, item2, score)).
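
One detail of the grouping step is worth illustrating: if only rows with movie1_id < movie2_id are kept, grouping by movie1_id alone never lists a movie under its higher-numbered neighbours. The sketch below uses plain Scala collections rather than Spark and mirrors each pair before taking the top N. ScoredPair and topNPerMovie are hypothetical names, and mirroring assumes the score is symmetric (which may not hold for every similarity function, in which case both directions would have to be scored).

```scala
// Hypothetical record for one scored pair with movie1Id < movie2Id.
final case class ScoredPair(movie1Id: Int, movie2Id: Int, score: Double)

// For every movie that appears in any pair, return its n best neighbours.
def topNPerMovie(pairs: Seq[ScoredPair], n: Int): Map[Int, Seq[(Int, Double)]] =
  pairs
    // Emit each pair in both directions so that both movies see each other.
    .flatMap(p => Seq(
      (p.movie1Id, (p.movie2Id, p.score)),
      (p.movie2Id, (p.movie1Id, p.score))
    ))
    .groupBy(_._1)
    .map { case (movieId, rows) =>
      // Sort neighbours by descending score and keep the first n.
      movieId -> rows.map(_._2).sortBy { case (_, score) => -score }.take(n)
    }

val pairs = Seq(
  ScoredPair(1, 2, 0.9),
  ScoredPair(1, 3, 0.4),
  ScoredPair(2, 3, 0.7)
)

val top1 = topNPerMovie(pairs, 1)
```

Without the mirroring step, movie 3 would get no neighbours at all here, because it never appears as movie1Id.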

The question is - can it be done more efficiently in Spark? E.g. without having to perform a crossJoin?

And another question: how does Spark deal with such huge DataFrames (1,058,000,000 rows consisting of multiple SparseVectors)? Does it have to keep all of this in memory at once, or does it process such DataFrames piece by piece somehow?


I'm using the following function to calculate similarity between movie vectors:

// Assumed import: BSV is the usual alias for Breeze's sparse vector.
// toBreeze and magnitude are helper functions that are not shown in the question.
import breeze.linalg.{SparseVector => BSV}

def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
  val a: BSV[Double] = toBreeze(movie1Vec)
  val b: BSV[Double] = toBreeze(movie2Vec)

  // Dot product over the active entries of a
  var dot: Double = 0
  var offset: Int = 0
  while (offset < a.activeSize) {
    val index: Int = a.indexAt(offset)
    val value: Double = a.valueAt(offset)

    dot += value * b(index)
    offset += 1
  }

  // b restricted to the indices that are active in a
  val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
  val maga: Double = magnitude(a)
  val magb: Double = magnitude(bReduced)

  if (maga == 0 || magb == 0) 0.0
  else dot / (maga * magb)
}
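
To make the behaviour of this function easier to see, here is a dependency-free sketch of the same idea in plain Scala. It is not the code from the question: sparse vectors are modelled as an index -> value Map, the names Sparse, l2Norm and intersectionCosineSketch are made up for illustration, and the magnitude helper is assumed to be the Euclidean (L2) norm.

```scala
// Sparse vector modelled as index -> value; absent indices count as 0.
type Sparse = Map[Int, Double]

// Assumed meaning of the question's `magnitude` helper: Euclidean (L2) norm.
def l2Norm(values: Iterable[Double]): Double =
  math.sqrt(values.map(v => v * v).sum)

def intersectionCosineSketch(a: Sparse, b: Sparse): Double = {
  // b restricted to the indices that are active in a (toSeq avoids set de-duplication).
  val bReduced = a.keys.toSeq.map(i => b.getOrElse(i, 0.0))
  // Dot product over the active entries of a.
  val dot = a.iterator.map { case (i, v) => v * b.getOrElse(i, 0.0) }.sum
  val magA = l2Norm(a.values)
  val magB = l2Norm(bReduced)
  if (magA == 0 || magB == 0) 0.0 else dot / (magA * magB)
}

val m1: Sparse = Map(1 -> 1.0, 3 -> 1.0, 5 -> 1.0)
val m3: Sparse = Map(3 -> 1.0, 5 -> 1.0)

val forward = intersectionCosineSketch(m3, m1)  // m1 covers every index of m3
val backward = intersectionCosineSketch(m1, m3) // m3 lacks index 1
```

Under these assumptions the score depends on argument order: forward is (up to rounding) 1.0, while backward is 2 / sqrt(6), about 0.816. That matters if only one ordering of each pair is scored.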

Each row in the DataFrame consists of two joined instances of the following case class:

final case class MovieVecData(imdbID: Int,
                          Title: SparseVector,
                          Decade: SparseVector,
                          Plot: SparseVector,
                          Genres: SparseVector,
                          Actors: SparseVector,
                          Countries: SparseVector,
                          Writers: SparseVector,
                          Directors: SparseVector,
                          Productions: SparseVector,
                          Rating: Double
                         )


1 Reply


It can be done more efficiently, as long as you are fine with approximations and don't require exact results (or an exact number of results).

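Similarly to my answer to Efficient string matching in Apache Spark, you can use LSH (locality-sensitive hashing), with:

  • BucketedRandomProjectionLSH to approximate Euclidean distance.
  • MinHashLSH to approximate Jaccard distance.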
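
As a rough sketch of the LSH route, the snippet below uses Spark ML's MinHashLSH and its approxSimilarityJoin on toy binary vectors. The number of hash tables, the seed, and the distance threshold are arbitrary values picked for illustration, and the local SparkSession is only there to keep the example self-contained (in spark-shell, `spark` already exists). The join is approximate, so it may miss some pairs that are within the threshold.

```scala
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("lsh-sketch").getOrCreate()
import spark.implicits._

// Toy binary feature vectors, similar in shape to the Actors / Genres vectors.
val movies = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

val lsh = new MinHashLSH()
  .setNumHashTables(5) // arbitrary; more tables find more candidates at higher cost
  .setSeed(42L)        // arbitrary; fixed only for reproducibility
  .setInputCol("features")
  .setOutputCol("hashes")

val model = lsh.fit(movies)

// Arbitrary illustrative threshold on the Jaccard distance.
val maxDistance = 0.9

// Only rows that collide in at least one hash table are compared exactly.
val candidates = model
  .approxSimilarityJoin(movies, movies, maxDistance, "jaccardDistance")
  .filter(col("datasetA.id") < col("datasetB.id")) // drop self-pairs and mirrored duplicates
  .select(col("datasetA.id").as("id1"), col("datasetB.id").as("id2"), col("jaccardDistance"))
```

The resulting candidates can then be fed to the existing top-N step in place of the cross-join output.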

If the feature space is small (or can be reasonably reduced) and each category is relatively small, you can also optimize your code by hand:

  • Explode the feature array to generate #features records from a single record.
  • Self-join the result by feature, compute the distance, and filter out candidates (each pair of records will be compared if and only if they share a specific categorical feature).
  • Take the top records using your current code.

A minimal example would be (treat it as pseudocode):

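import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.{explode, udf}
// `spark` is the active SparkSession (predefined in spark-shell);
// the implicits provide toDF and the $"..." column syntax
import spark.implicits._

// This is oversimplified. In practice, don't assume every vector is sparse
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

// One row per (record, active feature index), then a self-join on that index
val possibleMatches = df
  .withColumn("key", explode(indices($"features")))
  .transform(exploded => exploded.alias("left").join(exploded.alias("right"), Seq("key")))

// Builds a UDF that keeps pairs whose similarity exceeds the given threshold
def closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)

// 0.5 is an arbitrary threshold chosen only for illustration
possibleMatches.filter(closeEnough(0.5)($"left.features", $"right.features")).select($"left.id", $"right.id").distinct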

Note that both solutions are worth the overhead only if the hashing / features are selective enough (and ideally sparse). In the example shown above, you'd compare only rows within the sets {1, 2, 3} and {4, 5}, never across sets.

However, in the worst-case scenario (M records, N features) we can make N * M^2 comparisons instead of M^2.
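
The trade-off can be checked with a small counting sketch in plain Scala (no Spark). Each record is modelled as the set of its active feature indices; joinComparisons and crossJoinComparisons are hypothetical helper names. The counts include self-pairs and both orderings of each pair, as a plain self-join would produce before any filtering or distinct.

```scala
// Rows produced by the explode + self-join approach.
def joinComparisons(records: Seq[Set[Int]]): Long =
  records
    .flatMap(_.toSeq)   // one row per (record, feature), like explode
    .groupBy(identity)  // rows that share the same join key
    .values
    .map(rows => rows.size.toLong * rows.size) // k rows on one key yield k * k pairs
    .sum

// Rows produced by a plain cross join of M records.
def crossJoinComparisons(records: Seq[Set[Int]]): Long =
  records.size.toLong * records.size

// Selective features: the five toy records from the example above.
val selective = Seq(Set(1, 3, 5), Set(3, 8, 12), Set(3, 5), Set(11, 21), Set(21, 32))

// Worst case: M = 3 records that all share the same N = 2 features.
val dense = Seq.fill(3)(Set(0, 1))

val selectiveJoin = joinComparisons(selective)       // 22
val selectiveCross = crossJoinComparisons(selective) // 25
val denseJoin = joinComparisons(dense)               // 18 = N * M^2
val denseCross = crossJoinComparisons(dense)         // 9 = M^2
```

With only five records the saving is small, but it grows as features become more selective relative to the number of records; in the dense case the feature join does twice the work of the cross join.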

