Spark

Frequently used PySpark code snippets.

Create Spark session:

#!pip install pyspark
#!pip install findspark

#import os
#os.environ["SPARK_HOME"] = "/opt/spark-3.0.0" # might be required
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()
sc = spark.sparkContext # Sometimes required
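
Extra configuration can be chained on the builder if needed; a minimal sketch (the memory and shuffle-partition values are assumptions, adjust to your machine):

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()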

Create dataframe from json:

import json
jsonStrings = [json.dumps(x, ensure_ascii=False) for x in json_array] # json_array is a list of dicts
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
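
If json_array is already a list of flat Python dicts, the RDD round-trip can be skipped; a minimal sketch under that assumption:

from pyspark.sql import Row
otherPeople = spark.createDataFrame([Row(**x) for x in json_array])
otherPeople.show()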

Concatenate dataframes

result = df_1.union(df_2) # union matches columns by position, so both dataframes need the same schema
#result.show()

Print schema

df.printSchema()

Select columns

df.select(['colname'])

Create a new column

df.withColumn('new_col', col_value) # col_value must be a Column expression, e.g. F.lit(...)
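
A concrete sketch of the two common cases, assuming F is pyspark.sql.functions ('price' and 'qty' are hypothetical column names):

import pyspark.sql.functions as F
df = df.withColumn('new_col', F.lit(0))                     # constant column
df = df.withColumn('total', F.col('price') * F.col('qty'))  # column derived from existing (hypothetical) columns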

Print dataframe

df.show()

Print descriptive statistics for dataframe

df.describe().show()

Apply SQL to dataframe

df.createOrReplaceTempView("associates") #Creating a view called associates
sql_result_1 = spark.sql("SELECT * FROM associates") #Applying SQL query on associates
print("Showing the results of the select query")
sql_result_1.show()

Return a new row per each element in an array in column

from pyspark.sql import Row
from pyspark.sql.functions import explode
df = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
df.select(explode(df.intlist).alias("anInt")).collect()

Return a single concatenated string per group in a groupby

import pyspark.sql.functions as f
df.groupby("col1").agg(f.concat_ws(", ", f.collect_list(df.col2)))

Filter rows

The result of filter() is a Spark dataframe; adding .collect() returns a list of Row objects.

df.filter("total_amount > 1000").collect()
# or df.filter(df["total_amount"] > 1000).collect()

Match rows with regex

df.where(F.col("text_col").rlike(r'hello world'))

Create empty columns with type

import pyspark.sql.types as T
df.withColumn('new_column', F.lit(None).cast(T.StringType()))

Detect null values in a column

df.where(F.col("col").isNull())
#df.where(F.col("col").isNotNull())

Fill null values

df.na.fill("")

# Replace null with 0, but only in the population column
df.na.fill(value=0,subset=["population"])

# Fill a different value per column
df.na.fill({"city": "unknown", "type": ""})

Read/write parquet files

#read
s3_path = "PATH"
prod_cat_df = spark.read.parquet(s3_path)
prod_cat_df.show(1, truncate=False, vertical=True)
#write
prod_cat_df.write.parquet(s3_path)
#write with partition
df.write.partitionBy("gender","salary")
        .parquet("s3a://sparkbyexamples/parquet/people2.parquet")
#read with partition
parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people2.parquet")
parqDF.createOrReplaceTempView("Table2")
df = spark.sql("select * from Table2  where gender='M' and salary >= 4000")

Stratified sample

fractions = df.select("strat_column").distinct().withColumn("fraction", F.lit(prop)).rdd.collectAsMap() # prop like 0.2
print(fractions)
# e.g. {2147481832: 0.2, 214748183: 0.2}
df_sampled = df.stat.sampleBy("strat_column", fractions, seed) # seed: any integer
df_sampled.count()
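
Different fractions per stratum can also be given explicitly; a sketch assuming strat_column contains the (hypothetical) labels "A" and "B":

fractions = {"A": 0.1, "B": 0.5} # keep 10% of the "A" rows and 50% of the "B" rows
df_sampled = df.stat.sampleBy("strat_column", fractions, seed=42)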

Transform a Row to a dict

result_data[0].asDict() # result_data is a list of Rows, e.g. from df.collect()
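
To turn a whole (small) dataframe into a list of dicts, collect it first; a minimal sketch:

rows = df.collect() # brings all rows to the driver, only safe for small dataframes
dicts = [row.asDict() for row in rows]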

Filter rows with/without nulls in a column

from pyspark.sql.functions import col
df.where(col("A").isNull())

df.where(col("A").isNotNull())

Group by

df.groupBy("colname").sum().show()
# alternative to sum:
# count()
# mean()
# max()
# min()
# sum()
# avg()
# agg()
# agg({"revenue_sales":"max"}) it's also valid
# order:
# orderBy("revenue_sales")

Group by column A keep the rows that are max according to column B

from pyspark.sql import Window
import pyspark.sql.functions as f
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()

Group by (count nulls)

# sum a 0/1 null indicator inside the aggregation ("group_col" is a placeholder grouping column)
df.groupBy("group_col").agg(F.sum(F.col("col").isNull().cast("int")).alias("null_count"))

Order by

df.orderBy("revenue_sales").show(5)
# or df.orderBy(df["revenue_sales"].desc()).show(5)

Alias

This complements withColumn when the new column is the result of a Spark aggregation or transformation.

from pyspark.sql.functions import avg
df.select(avg("revenue_sales").alias("Average")).show()

Null processing

Remove rows containing null values

df.na.drop(how='any') # drop rows with at least one null; how='all' drops only rows where every value is null

Keep only rows with at least X non-null values

df.na.drop(thresh=2).show() # keep rows with at least 2 non-null values

Check only given columns

df.na.drop(subset=['col1','col2']).show()

Fill na values

df.na.fill(new_value,subset=['col1', 'col2'])

Example classification

from pyspark.ml.feature import VectorAssembler,VectorIndexer,StringIndexer,OneHotEncoder
#Formatting the categorical column - sex
#Creating a String Indexer - To convert every string into a unique number
sex_string_indexer_direct = StringIndexer(inputCol='sex',outputCol='sexIndexer')
sex_indexer_model = sex_string_indexer_direct.fit(data)
final_string_indexed_data = sex_indexer_model.transform(data)
# Male - 1 and Female 0 or vice versa
#Performing OneHotEncoding - convert this value into an array form
sex_encoder_direct = OneHotEncoder(inputCol='sexIndexer',outputCol='sexVector')
sex_encoder_model = sex_encoder_direct.fit(final_string_indexed_data) # OneHotEncoder is an estimator in Spark 3.x, so fit before transform
encoded_data = sex_encoder_model.transform(final_string_indexed_data)
# the index becomes a (sparse) one-hot vector; the last category is dropped by default
print("Data after OneHotEncoding")
encoded_data.show(4)
assembler_direct = VectorAssembler(inputCols=['age','sexVector','tumor_size'],outputCol='features')
assembler_data = assembler_direct.transform(encoded_data)
final_data_direct = assembler_data.select('features','cancerous')
print("Consolidated Data with accepted features and labels")
final_data_direct.show(3)
#Step 3 - Training our Logistic Regression model
from pyspark.ml.classification import LogisticRegression
logreg_direct = LogisticRegression(featuresCol='features',labelCol='cancerous')
train_data_direct,test_data_direct = final_data_direct.randomSplit([0.6,0.4])
logreg_model_direct = logreg_direct.fit(train_data_direct)
#Step 4 - Evaluating and performing Predictions on our model
#Evaluating our model with testing data
#Direct Evaluation using Trivial method
predictions_labels = logreg_model_direct.evaluate(test_data_direct)
print("Prediction Data")
predictions_labels.predictions.select(['features','cancerous','prediction']).show(3)

#Evaluation using BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
direct_evaluation = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='cancerous') # passing 'rawPrediction' instead would give a ranking-based AUC
AUC_direct = direct_evaluation.evaluate(predictions_labels.predictions)
print("Area Under the Curve value is {}".format(AUC_direct))
print("\nCoeffecients are {}".format(logreg_model_direct.coefficients))
print("\nIntercept is {}".format(logreg_model_direct.intercept))

NLP tools

Tokenization

from pyspark.ml.feature import Tokenizer,RegexTokenizer
#Applying Tokenizer class which splits text on whitespaces
simple_tokenizer = Tokenizer(inputCol='text_content',outputCol='tokens_words')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)

#Applying RegexTokenizer class which splits text on user defined patterns
# Special sequence \W splits on non-alphanumeric characters (in our case it splits on '-')
regex_tokenizer = RegexTokenizer(inputCol='text_content',outputCol='tokens_words',pattern='\\W')
regex_tokens = regex_tokenizer.transform(data)
print("RegexTokenizer Output - Splitting text on special sequence \W")
regex_tokens.show(truncate=False)

Stop words

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('StopWordsRemover').getOrCreate()
from pyspark.ml.feature import StopWordsRemover, Tokenizer
data = spark.read.csv('stopwords.csv',header=True,inferSchema=True)
print("Initial Data")
data.show(truncate=False)
#Applying Tokenizer prior to StopWordsRemover as StopWords takes tokens as its input
simple_tokenizer = Tokenizer(inputCol='text_content',outputCol='tokens_words')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)
#Applying StopWordsRemover class
stopWords = StopWordsRemover(inputCol='tokens_words',outputCol='stopWordsRemoved')
stopWords_tokens = stopWords.transform(simple_tokens)
print("Data after Stop Words Removal")
stopWords_tokens.select('tokens_words','stopWordsRemoved').show(truncate=False)

TFIDF

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('TFIDF_HashTF').getOrCreate()

from pyspark.ml.feature import Tokenizer,HashingTF,IDF
data = spark.read.csv('reviews_tfidf.csv',header=True,inferSchema=True)
print("Initial Data")
data.show(truncate=False)
#Applying Tokenizer class which splits text on whitespaces
simple_tokenizer = Tokenizer(inputCol='reviews',outputCol='review_tokens')
simple_tokens = simple_tokenizer.transform(data)
print("Tokenizer Output - Splitting text on Whitespaces")
simple_tokens.show(truncate=False)
#Applying HashingTF
hashingtf_vectors = HashingTF(inputCol='review_tokens',outputCol='hashVec')
HashingTF_featurized_data = hashingtf_vectors.transform(simple_tokens)
print("HashingTF Data")
HashingTF_featurized_data.select('review_tokens','hashVec').show(truncate=40)
#Applying CountVectorizer to convert tokens to vectors of token count
# count_vectors = CountVectorizer(inputCol='review_tokens',outputCol='countVec')
# count_vectors_model = count_vectors.fit(simple_tokens)
# countVector_featurized_data = count_vectors_model.transform(simple_tokens)
# print("CountVectorizer Data")
# countVector_featurized_data.select('review_tokens','countVec').show(truncate=False)

#Applying IDF on vectors of token count output from HashingTF
idf = IDF(inputCol='hashVec',outputCol='features')
idf_model = idf.fit(HashingTF_featurized_data)
final_data = idf_model.transform(HashingTF_featurized_data)
print("Final Spark accepted Data - NLP Formatted Data ready to pass into any Machine Learning Model")
final_data.select('label','features').show(truncate=60)
