Create a dataframe from a list of JSON strings
import json
jsonStrings = [json.dumps(x, ensure_ascii=False) for x in json_array]
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
Concatenate dataframe
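A minimal sketch, assuming two dataframes df1 and df2 (placeholder names) that share the same schema:
df_all = df1.union(df2)        # df1/df2 are placeholders; stacks rows by column position
df_all = df1.unionByName(df2)  # matches columns by name instead of position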
Print schema
Select columns
Create new columns
Print dataframe
Print descriptive statistics for dataframe
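A minimal sketch of the five items above, assuming a dataframe df with placeholder columns name, age and salary:
df.printSchema()                                   # print schema
df.select("name", "age").show()                    # select columns
df = df.withColumn("age_plus_one", df["age"] + 1)  # create a new column
df.show(5, truncate=False)                         # print dataframe
df.describe(["age", "salary"]).show()              # descriptive statistics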
Apply SQL to dataframe
Return a new row per each element in an array in column
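A sketch, assuming an array-typed column named items (placeholder); explode emits one output row per array element:
from pyspark.sql.functions import explode
exploded = df.withColumn("item", explode(df["items"]))  # items is a placeholder column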
Return a single string per row in groupby
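A sketch with placeholder columns group_col and value_col; collect_list gathers the values of each group and concat_ws joins them into a single string:
from pyspark.sql.functions import collect_list, concat_ws
joined = df.groupBy("group_col").agg(concat_ws(", ", collect_list("value_col")).alias("values_str"))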
Filter rows (the result will be a Spark dataframe)
Match rows with regex
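A sketch for the two filtering items above, assuming placeholder columns age and name; both calls return a new Spark dataframe:
from pyspark.sql.functions import col
adults = df.filter(col("age") >= 18)                # filter rows
matches = df.filter(col("name").rlike("(?i)^gon"))  # match rows with a regex (pattern is illustrative)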
Create empty columns with type
Detect null values in a column
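A sketch for the two items above, with placeholder column names:
from pyspark.sql.functions import lit, col
df = df.withColumn("new_empty_col", lit(None).cast("string"))  # empty column with an explicit type
null_rows = df.filter(col("city").isNull())                    # rows where the (placeholder) city column is null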
Fill null values
Read/write parquet files
Stratified sample
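A sketch assuming a stratum column named key with values "A" and "B" (all placeholders); sampleBy draws an approximate stratified sample with a fraction per stratum:
fractions = {"A": 0.1, "B": 0.5}  # placeholder strata and fractions
sample_df = df.sampleBy("key", fractions, seed=42)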
Transform to dict
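A sketch; collect() pulls every row to the driver, so this only makes sense for small dataframes:
rows_as_dicts = [row.asDict() for row in df.collect()]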
Filter columns with/without nulls
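A sketch that counts nulls per column and splits the column names accordingly (df is a placeholder):
from pyspark.sql.functions import col, count, when
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
cols_with_nulls = [c for c, n in null_counts.items() if n > 0]
cols_without_nulls = [c for c, n in null_counts.items() if n == 0]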
Group by
Group by column A and keep the row that is max according to column B
Group by (count nulls)
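Sketches for the three group-by items above, assuming placeholder columns A, B and C:
from pyspark.sql import functions as F
from pyspark.sql import Window
# Group by with aggregations
df.groupBy("A").agg(F.sum("B").alias("total_B"), F.countDistinct("C").alias("distinct_C")).show()
# Group by A and keep the row with the maximum B per group (window + row_number)
w = Window.partitionBy("A").orderBy(F.col("B").desc())
max_rows = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
# Group by A and count nulls of C per group
df.groupBy("A").agg(F.count(F.when(F.col("C").isNull(), 1)).alias("null_C")).show()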
Order by
Alias
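A sketch for ordering and aliasing, with placeholder column names:
from pyspark.sql.functions import col
df.orderBy(col("salary").desc(), col("age")).show()     # order by
df.select(col("salary").alias("annual_salary")).show()  # alias a column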
This helps us complement the withColumn functionality in cases where the computation is the result of a Spark transformation.
Null processing
Remove all rows with at least one null value
Remove rows with fewer than X non-null values
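A sketch of the two null-dropping variants above:
df_no_nulls = df.dropna(how="any")  # drop every row containing at least one null value
df_thresh = df.dropna(thresh=3)     # drop rows with fewer than 3 non-null values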
df.createOrReplaceTempView("associates") #Creating a view called associates
sql_result_1 = spark.sql("SELECT * FROM associates") #Applying SQL query on associates
print("Showing the results of the select query")
sql_result_1.show()
df.na.fill("")
#Replace Replace 0 for null on only population column
df.na.fill(value=0,subset=["population"])
#Conditional filling of na
df.na.fill({"city": "unknown", "type": ""})
#read
s3_path = "PATH"
prod_cat_df = spark.read.parquet(s3_path)
prod_cat_df.show(1, truncate=False, vertical=True)
#write
prod_cat_df.write.parquet(s3_path)
#write with partition
df.write.partitionBy("gender","salary") \
    .parquet("s3a://sparkbyexamples/parquet/people2.parquet")
#read with partition
parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people2.parquet")
parqDF.createOrReplaceTempView("Table2")
df = spark.sql("select * from Table2 where gender='M' and salary >= 4000")
from pyspark.ml.feature import VectorAssembler,VectorIndexer,StringIndexer,OneHotEncoder
#Formatting the categorical column - sex
#Creating a String Indexer - To convert every string into a unique number
sex_string_indexer_direct = StringIndexer(inputCol='sex',outputCol='sexIndexer')
sex_indexer_model = sex_string_indexer_direct.fit(data)
final_string_indexed_data = sex_indexer_model.transform(data)
# Male - 1 and Female 0 or vice versa
#Performing OneHotEncoding - convert the index into a one-hot vector
#In Spark 3.x OneHotEncoder is an estimator, so it must be fit before transform
sex_encoder_direct = OneHotEncoder(inputCol='sexIndexer',outputCol='sexVector')
sex_encoder_model = sex_encoder_direct.fit(final_string_indexed_data)
encoded_data = sex_encoder_model.transform(final_string_indexed_data)
# With the default dropLast=True the two categories map to a one-element vector,
# e.g. Male - [1.0] and Female - [0.0] or vice versa
print("Data after OneHotEncoding")
encoded_data.show(4)
assembler_direct = VectorAssembler(inputCols=['age','sexVector','tumor_size'],outputCol='features')
assembler_data = assembler_direct.transform(encoded_data)
final_data_direct = assembler_data.select('features','cancerous')
print("Consolidated Data with accepted features and labels")
final_data_direct.show(3)
#Step 3 - Training our Logistic Regression model
from pyspark.ml.classification import LogisticRegression
logreg_direct = LogisticRegression(featuresCol='features',labelCol='cancerous')
train_data_direct,test_data_direct = final_data_direct.randomSplit([0.6,0.4])
logreg_model_direct = logreg_direct.fit(train_data_direct)
#Step 4 - Evaluating and performing Predictions on our model
#Evaluating our model with testing data
#Direct Evaluation using Trivial method
predictions_labels = logreg_model_direct.evaluate(test_data_direct)
print("Prediction Data")
predictions_labels.predictions.select(['features','cancerous',
'prediction']).show(3)
#Evaluation using BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
direct_evaluation = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='cancerous')
AUC_direct = direct_evaluation.evaluate(predictions_labels.predictions)
print("Area Under the Curve value is {}".format(AUC_direct))
print("\nCoeffecients are {}".format(logreg_model_direct.coefficients))
print("\nIntercept is {}".format(logreg_model_direct.intercept))
from pyspark.ml.feature import Tokenizer,RegexTokenizer
#Applying Tokenizer class which splits text on whitespaces
simple_tokenizer = Tokenizer(inputCol='text_content',outputCol='tokens_words')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)
#Applying RegexTokenizer class which splits text on user defined patterns
# Special sequence \W splits on non-word characters, i.e. anything other than letters, digits and underscore (in our case it splits on '-')
regex_tokenizer = RegexTokenizer(inputCol='text_content',outputCol='tokens_words',pattern='\\W')
regex_tokens = regex_tokenizer.transform(data)
print("RegexTokenizer Output - Splitting text on special sequence \W")
regex_tokens.show(truncate=False)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('StopWordsRemover').getOrCreate()
from pyspark.ml.feature import StopWordsRemover, Tokenizer
data = spark.read.csv('stopwords.csv',header=True,inferSchema=True)
print("Initial Data")
data.show(truncate=False)
#Applying Tokenizer prior to StopWordsRemover as StopWordsRemover takes tokens as its input
simple_tokenizer = Tokenizer(inputCol='text_content',outputCol='tokens_words')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)
#Applying StopWordsRemover class
stopWords = StopWordsRemover(inputCol='tokens_words',outputCol='stopWordsRemoved')
stopWords_tokens = stopWords.transform(simple_tokens)
print("Data after Stop Words Removal")
stopWords_tokens.select('tokens_words','stopWordsRemoved').show(truncate=False)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('TFIDF_HashTF').getOrCreate()
from pyspark.ml.feature import Tokenizer,HashingTF,IDF
data = spark.read.csv('reviews_tfidf.csv',header=True,inferSchema=True)
print("Initial Data")
data.show(truncate=False)
#Applying Tokenizer class which splits text on whitespaces
simple_tokenizer = Tokenizer(inputCol='reviews',outputCol='review_tokens')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)
#Applying HashingTF
hashingtf_vectors = HashingTF(inputCol='review_tokens',outputCol='hashVec')
HashingTF_featurized_data = hashingtf_vectors.transform(simple_tokens)
print("HashingTF Data")
HashingTF_featurized_data.select('review_tokens','hashVec').show(truncate=40)
#Alternative to HashingTF - CountVectorizer converts tokens to vectors of token counts (needs: from pyspark.ml.feature import CountVectorizer)
# count_vectors = CountVectorizer(inputCol='review_tokens',outputCol='countVec')
# count_vectors_model = count_vectors.fit(simple_tokens)
# countVector_featurized_data = count_vectors_model.transform(simple_tokens)
# print("CountVectorizer Data")
# countVector_featurized_data.select('review_tokens','countVec').show(truncate=False)
#Applying IDF on vectors of token count output from HashingTF
idf = IDF(inputCol='hashVec',outputCol='features')
idf_model = idf.fit(HashingTF_featurized_data)
final_data = idf_model.transform(HashingTF_featurized_data)
print("Final Spark accepted Data - NLP Formatted Data ready to pass into any Machine Learning Model")
final_data.select('label','features').show(truncate=60)
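A minimal sketch of passing this data into a classifier, assuming the label column is already numeric (0/1); the estimator and split ratios here are illustrative only:
from pyspark.ml.classification import LogisticRegression
train_nlp, test_nlp = final_data.randomSplit([0.7, 0.3])  # illustrative split
nlp_model = LogisticRegression(featuresCol='features', labelCol='label').fit(train_nlp)
nlp_model.transform(test_nlp).select('label', 'prediction').show(5)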