Brute-force approach in Scala. It does not work across line boundaries and treats everything as lowercase; both could be addressed, but that is for another day. The idea is not to examine raw strings, but to model the problem as what it really is — n-grams matched against n-grams: generate the n-grams on both sides, then JOIN (only the inner join is relevant) and count. Some extra data is added to prove the matching works.
import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType,ArrayType,LongType,StringType}
import spark.implicits._
// Sample postings: includes duplicates and edge cases to prove the matching works.
val dfPostsInit = Seq(
  "Hello!!, Stack overflow users, Do you know spark scala users.",
  "Spark scala is very fast,",
  "Users in stack are good in spark",
  "Users in stack are good in spark",
  "xy z",
  "x yz",
  "ABC",
  "abc",
  "XYZ,!!YYY@#$ Hello Bob..."
).toDF("posting")
// Search terms to look for inside the postings.
val dfWordsInit = Seq("Stack", "Stack Overflow", "users", "spark scala", "xyz", "xy", "not found", "abc").toDF("words")
// Multi-word terms get their spaces replaced by '^' (words_perm) so they can be
// compared one-to-one against concatenated n-grams; lower_words_perm is the
// lower-cased form actually used in the join.
val dfWords = dfWordsInit
  .withColumn("words_perm", regexp_replace(dfWordsInit("words"), " ", "^"))
  .withColumn("lower_words_perm", lower(regexp_replace(dfWordsInit("words"), " ", "^")))
// Tokenise each posting on runs of non-word characters.
// NOTE: the regex must be written "\\W+" — a bare "\W" is an invalid escape
// sequence in a Scala string literal and fails to compile.
// split already returns an Array, so no .toArray conversion is needed.
val dfPostsTemp = dfPostsInit.map(r => (r.getString(0), r.getString(0).split("\\W+")))
// Tidy Up: give the tokenised dataset proper column names.
val columnsRenamed = Seq("posting", "posting_array")
val dfPosts = dfPostsTemp.toDF(columnsRenamed: _*)
// Generate n-grams up to some limit N (which needs to be set) so that we can count matches via a direct JOIN comparison. The limit can be parametrized in the calls below.
// String matching over an Array column is not easy to do, and no other answer presented a way.
/** Builds a Pipeline of NGram stages over the tokenised input column.
  *
  * @param inputCol column holding the token array (default "posting_array")
  * @param n        highest n-gram size to generate (default 3)
  * @return a Pipeline whose stages emit columns "1_grams" through s"${n}_grams"
  */
def buildNgrams(inputCol: String = "posting_array", n: Int = 3) = {
  val stages = for (size <- 1 to n) yield
    new NGram()
      .setN(size)
      .setInputCol(inputCol)
      .setOutputCol(s"${size}_grams")
  new Pipeline().setStages(stages.toArray)
}
// Names of the columns produced by buildNgrams: "1_grams", "2_grams", "3_grams".
// Built immutably instead of via a var and an appending loop (List :+ / ::: in a
// loop is both non-idiomatic and O(n^2)); the 3 must match buildNgrams' default n.
val suffix: String = "_grams"
val i_grams_Cols: List[String] = (1 to 3).map(i => s"$i$suffix").toList
// Generate the data to check against. This works row-wise rather than column-wise: positional dependency matters for the counts, hence the '^'-joined permutations.
// Run the n-gram pipeline over the postings, producing one column per n-gram size.
val dfPostsNGrams = buildNgrams().fit(dfPosts).transform(dfPosts)
// Collapse every i-gram column into a single "phrase" column, one row per
// n-gram occurrence, by exploding each column and unioning the results.
val dfPostsNGrams2 = i_grams_Cols
  .map(gramCol => dfPostsNGrams.select(explode(col(gramCol)).as("phrase")))
  .reduce(_ union _)
// Normalise the phrases exactly like dfWords: lower-case and join the words
// of each n-gram with '^' so they line up with lower_words_perm.
val dfPostsNGrams3 = dfPostsNGrams2.withColumn(
  "lower_phrase_concatenated",
  lower(regexp_replace(dfPostsNGrams2("phrase"), " ", "^")))
// Inner join: only phrases that actually match a search term survive; then
// count the occurrences per term.
val result = dfPostsNGrams3
  .join(dfWords, col("lower_phrase_concatenated") === col("lower_words_perm"), "inner")
  .groupBy("words_perm", "words")
  .agg(count("*").as("match_count"))
result.select("words", "match_count").show(false)
returns:
+--------------+-----------+
|words |match_count|
+--------------+-----------+
|spark scala |2 |
|users |4 |
|abc |2 |
|Stack Overflow|1 |
|xy |1 |
|Stack |3 |
|xyz |1 |
+--------------+-----------+