I am very new to PySpark, and I have developed a program that performs NLTK processing on files stored in HDFS. I'm using Spark 2.3.1. The steps are as follows:
1. Get the file from HDFS.
2. Perform lemmatization.
3. Remove punctuation marks.
4. Convert the RDD to a DataFrame.
5. Perform tokenization.
6. Remove stop words.
7. Explode the column data to create a unique row for each record.
8. I want to keep all files' data in a single file, so I merge the output with the old file.
9. Write this entire merged output to HDFS.
10. Then delete the old file and rename the Spark-created file to a different name.
11. I do the same for the bigram and trigram files. (A condensed sketch of steps 2-7 is shown right after this list.)
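To make steps 2-7 concrete, here is a condensed sketch of the per-file transformation chain. It uses the same Spark and NLTK APIs as the full code below and follows the steps as listed; the input path is hypothetical, and it assumes Zeppelin has already provided `sc` and an active SparkSession, with the NLTK wordnet data available to the executors.

# Condensed sketch of steps 2-7 for a single file (hypothetical path; assumes `sc`,
# an active SparkSession, and NLTK wordnet data on the executors).
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import explode, regexp_replace
from nltk.stem import WordNetLemmatizer

def lemma(line):
    return WordNetLemmatizer().lemmatize(line)               # step 2: lemmatize each line

rdd = sc.textFile('hdfs:///path/to/abstract.txt')            # step 1: read from HDFS
df = rdd.map(lemma).map(lambda x: (x,)).toDF(['lem_words'])  # step 4: RDD -> DataFrame

tokenized = Tokenizer(inputCol='lem_words', outputCol='tokenized_data').transform(df)                # step 5
cleaned = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean').transform(tokenized)  # step 6

result = (cleaned
          .withColumn('exploded', explode('words_clean'))                # step 7: one token per row
          .select(regexp_replace('exploded', '[^a-zA-Z\\s]', '')))       # step 3: strip punctuation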
Here is my PySpark code:
%pyspark

import os
import pyspark
import csv
import nltk
import json
import string
import re

from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark import SparkContext, SparkConf as sc
from pyspark.sql.types import StringType
from nltk.corpus import stopwords
nltk.download('stopwords')
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode, regexp_replace
import pandas
import hdfs

nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')

from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("PySpark App")
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

hdfs_dst_dir = "/user/zeppelin/achyuttest.csv/"
counter = 0

# Lemmatization
def lemma(x):
    lemmatizer = WordNetLemmatizer()
    return lemmatizer.lemmatize(x)
for i in range(1, 50001):
    data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/Link {}/abstract.txt'.format(i), use_unicode=False)
    print(type(data))

    if data.isEmpty():
        continue
    else:
        lem_words = data.map(lemma)
        list_punct = list(string.punctuation)
        len_list = lem_words.collect()
        test_str = len_list[0]
        test_df = test_str.split(' ')

        data_df = data.map(lambda x: (x, )).toDF(['lem_words'])

        # Perform Tokenizer
        tokenizer = Tokenizer(inputCol="lem_words", outputCol="tokenized_data")
        outputdata = tokenizer.transform(data_df)
        outputdata = outputdata.select('tokenized_data')

        # Remove stop words
        remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')
        outputdata = remover.transform(outputdata).select('words_clean')

        # Explode one row into multiple rows, one token per row
        result_df = outputdata.withColumn("exploded", explode("words_clean")).select("exploded")
        result_df = result_df.select(regexp_replace('exploded', "[^a-zA-Z\s]", ""))

        print("Link ========>", i)

        # Merge with the old output
        if counter > 0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/unigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(['words_clean'])
            result_df = old_data_df.union(result_df)
        else:
            pass

        # Write the DataFrame to HDFS
        result_df.coalesce(1).write.mode('append').csv(hdfs_dst_dir)
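        # `spark` below is the SparkSession that Zeppelin's %pyspark interpreter provides;
        # it is used only to reach the Hadoop FileSystem API through the Py4J JVM gateway.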
        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

        # Rename the file
        # List the files in the directory
        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))
        # Pick the file whose name starts with "part-"
        print("Get FileName")
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]
        print(file_name)

        # Rename the file
        new_filename = "unigram.csv"
        # Remove the old file
        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name), spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        ## Bigrams
        bigram = NGram(n=2, inputCol="words_clean", outputCol="bigrams")
        bigramDataFrame = bigram.transform(outputdata)

        # Explode one row into multiple rows, one bigram per row
        result_df = bigramDataFrame.withColumn("exploded", explode("bigrams")).select("exploded")
        result_df = result_df.select(regexp_replace('exploded', "[^a-zA-Z\s]", ""))

        # Merge with the old output
        if counter > 0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/bigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])
            result_df = old_data_df.union(result_df)
        else:
            pass

        # Write the output to a file
        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

        # Rename the file
        # List the files in the directory
        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))
        # Pick the file whose name starts with "part-"
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]
        # Rename the file
        new_filename = "bigram.csv"
        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name), spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        ## Trigrams
        trigram = NGram(n=3, inputCol="words_clean", outputCol="trigrams")
        trigramDataFrame = trigram.transform(outputdata)

        # Explode one row into multiple rows, one trigram per row
        result_df = trigramDataFrame.withColumn("exploded", explode("trigrams")).select("exploded")
        result_df = result_df.select(regexp_replace('exploded', "[^a-zA-Z\s]", ""))

        # Merge with the old output
        if counter > 0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/trigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])
            result_df = old_data_df.union(result_df)
        else:
            pass

        # Save the DataFrame to HDFS
        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

        # Rename the file
        # List the files in the directory
        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))
        # Pick the file whose name starts with "part-"
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]
        # Rename the file
        new_filename = "trigram.csv"
        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name), spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))

        counter = counter + 1
I am running this code on 50K files, and Spark is taking far too long to finish (two days have passed and it is still running...).
I'm running HDP in a virtual machine (a single-node HDP Sandbox). Here is my system specification:
===> Guest OS:
Memory: 12930 MB
CPU: 6 CPUs

===> YARN specifications:
Memory: 4608 MB
Maximum container memory: 4608 MB
Maximum container size (vcores): 4
Number of virtual cores: 4

===> Zeppelin PySpark interpreter specification:
spark.executor.memory: blank (which means 1g, as specified in the documentation)
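For reference, this is roughly how such properties would look if set explicitly; the values below are placeholders for illustration only, not tuned recommendations, and in Zeppelin they would normally be entered as interpreter properties rather than in code:

# Hypothetical illustration only: setting executor resources explicitly via SparkConf.
# In Zeppelin these are usually configured as interpreter properties instead.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("PySpark App")
        .set("spark.executor.memory", "2g")   # left blank, this defaults to 1g
        .set("spark.executor.cores", "2")     # placeholder value
        .set("spark.driver.memory", "2g"))    # placeholder value
sc = SparkContext.getOrCreate(conf=conf)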
So I have two questions:
- Is my code proper or not?
- Which values should I specify in YARN and in the Zeppelin interpreter so that this runs quickly and efficiently?
Thank you.