Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/sparkdevelop.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions 08-RowDemo/RowDemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4j
##from lib.logger import Log4j


def to_date_df(df, fmt, fld):
Expand All @@ -16,7 +16,7 @@ def to_date_df(df, fmt, fld):
.appName("RowDemo") \
.getOrCreate()

logger = Log4j(spark)
##logger = Log4j(spark)

my_schema = StructType([
StructField("ID", StringType()),
Expand Down
44 changes: 44 additions & 0 deletions 21-DataFrames/lib/DatabricksSample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.sql.functions import spark_partition_id

if __name__ == "__main__":

spark = SparkSession \
.builder \
.appName("DatabricksSample") \
.master("local[2]") \
.getOrCreate()

logDF = spark.read\
.option("header",True)\
.csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv")\
##.sample(withReplacement=False, fraction=0.3, seed=3)

from pyspark.sql.functions import col

serverErrorDF = logDF.filter((col("code")>=500)& (col("code")<600)).select("date", "time", "extention", "code")
logDF.show()
logDF.createOrReplaceTempView("total")
sqlDF = spark.sql("select ip,count(ip) from total group by ip order by count")

#ipCountDF = spark.sql("select ip,count(ip) as count from total group by ip order by count desc")
sqlDF.show()

from pyspark.sql.functions import from_utc_timestamp, hour, col
countvalue=serverErrorDF.count()
print(countvalue)

serverErrorDF.show(100)

countsDF = (serverErrorDF
.select(f.minute(from_utc_timestamp(col("time"), "GMT")).alias("hour"))
.groupBy("hour")
.count()
.orderBy("hour")
)

countsDF.show()

spark.stop()
121 changes: 121 additions & 0 deletions 21-DataFrames/lib/Dataframe_Creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":

spark = SparkSession \
.builder \
.appName("dateFormatting") \
.master("local[2]") \
.getOrCreate()


my_schema_date = StructType([
StructField("ID", StringType()),
StructField("EventDatestr", StringType())
])

my_schema_timestamp= StructType([
StructField("ID", StringType()),
StructField("EventTimestamp", StringType())
])

my_rows_date = [Row("123", "04/05/2020"),
Row("124", "3/05/2020" ),
Row("125", "04/05/2020"),
Row("126", "04/05/2020"),
Row("127", "04/05/2020"),
Row("128", "04/05/2020"),
Row("129", "04/05/2020"),
Row("130", "8/3/2020"),
Row("131", "11/12/2020"),
Row("132", "04/13/2020")]

my_rows_timestamp = [Row("123", "2020-02-28 12:30:00"),
Row("234", "2020-02-28 12:30:00") ,
Row("233", "2020-02-28 10:30:00") ,
Row("343", "2020-02-28 11:30:00") ,
Row("434", "2020-02-28 14:30:00") ,
Row("343", "2020-02-28 10:30:00") ,
Row("353", "2020-02-28 10:30:00") ,
Row("453", "2020-02-28 23:30:00") ,
Row("137", "2020-02-28 14:30:00") ]

my_schema_list = StructType([
StructField("ID", IntegerType())
])

my_row_list = [Row(123)]

my_rdd_list = spark.sparkContext.parallelize(my_row_list)

my_df_list= spark.createDataFrame(my_rdd_list, my_schema_list)

my_df_list.show()

print(type(my_df_list))

mvv = my_df_list.select(max("ID")).rdd.flatMap(lambda x: x).collect()

print(type(mvv))
print(mvv)

for i in mvv:
xy=i
print(xy)
print(type(xy))

#list(my_df_list.select('ID').toPandas()['ID']) # =>

#print(list)

my_rdd_date = spark.sparkContext.parallelize(my_rows_date)

my_df_date = spark.createDataFrame(my_rdd_date, my_schema_date)

my_rdd2 = spark.sparkContext.parallelize(my_rows_timestamp)

my_df_timestamp = spark.createDataFrame(my_rdd2, my_schema_timestamp)

from pyspark.sql.functions import col

dfwithDate = my_df_date.withColumn("EventDatetype",to_date(col("EventDatestr"), 'M/d/yyyy') )

dfwithDate.show()

dfwithTimestamp=my_df_timestamp.withColumn("EventTimestamptype", to_timestamp(col("EventTimestamp"), 'yyyy-MM-dd HH:mm:ss'))

dfwithTimestamp2 = my_df_timestamp.select("ID",to_timestamp("EventTimestamp",'yyyy-MM-dd HH:mm:ss').alias("new"))

my_df_timestamp.show()

dfwithTimestamp2.show()


dfwithTimestamp3 = my_df_timestamp.withColumn("Timeintimestamp", to_timestamp("EventTimestamp", 'yyyy-MM-dd HH:mm:ss')) \
.withColumn("ID2", col("ID").cast(IntegerType()))\
.withColumn("ID3", col("ID2")*2)\
.withColumnRenamed("ID3","Transformed_Integer_column")\
.drop("ID2")

dfwithTimestamp3.show()

cols = set(dfwithTimestamp3.columns)
print(cols)

print("dfwithTimestamp2")
minmax = dfwithTimestamp3.select(min("Transformed_Integer_column")).collect().map(_(0)).toList


print(type(minmax))

dfwithTimestamp3.agg({'Transformed_Integer_column': 'max'}).show()

newdf5=dfwithTimestamp3.select(hour(from_utc_timestamp(col("Timeintimestamp"), "GMT")).alias("hour"))\
.groupBy("hour").count().orderBy("hour")
newdf5.show()


spark.stop()
61 changes: 61 additions & 0 deletions 21-DataFrames/lib/JSON_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as f

if __name__ == "__main__":

spark = SparkSession \
.builder \
.appName("jsonread") \
.master("local[2]") \
.getOrCreate()


smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
smartphoneDF.show()

smartphoneDF.printSchema()
smartphoneDFschema = smartphoneDF.schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType

schemaSMS = StructType([
StructField("SMS", StringType(), True)
])




# Here is the full schema as well
fullSchema = StructType([
StructField("SMS", StructType([
StructField("Address",StringType(),True),
StructField("date",StringType(),True),
StructField("metadata", StructType([
StructField("name",StringType(), True)
]), True),
]), True)
])

## Filter using SQL expression
SMSDF = (spark.read
.schema(schemaSMS)
.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
.filter("SMS is not null")
)
# Filter using column
SMSDF2 = (spark.read
.schema(fullSchema)
.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
.filter(f.col("SMS").isNotNull())
)


SMSDF.show()
SMSDF2.show()


SMSDF2.select('SMS.Address','SMS.date','SMS.metadata.name').show(truncate=False)

spark.stop()
52 changes: 52 additions & 0 deletions 21-DataFrames/lib/SampleUDF.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":

spark = SparkSession \
.builder \
.appName("dateFormatting") \
.master("local[2]") \
.getOrCreate()

# Creating a function
def cube(x):
return x*x*x

def strlen_nullsafe(x):
return len(x)

# Register the function as udf
spark.udf.register("udfcube" ,cube,LongType())

spark.range(1, 20).createOrReplaceTempView("test1")
sqlDF=spark.sql("select id,udfcube(id) as qubedID from test1")
sqlDF.show()

spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")

#spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")

my_schema = StructType([
StructField("ID", StringType()),
StructField("EventDate", StringType())])

my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row(None, "04/5/2020"), Row("126", "4/05/2020")]
my_rdd = spark.sparkContext.parallelize(my_rows, 2)
my_df = spark.createDataFrame(my_rdd, my_schema)

my_df.createOrReplaceTempView("test2")

sqlDF = spark.sql("select id,strlen_nullsafe(id) as strlen_nullsafe from test2")

print("lambdadf :")
sqlDF.show()

sqldf = spark.sql("select id ,strlen_nullsafe(id) from test2 where id is not null and strlen_nullsafe(id) > 1")
print("sqldf :")
sqldf.show()

spark.stop()

19 changes: 19 additions & 0 deletions 21-DataFrames/lib/SelectingColumns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

from pyspark.sql import *


spark = SparkSession \
.builder \
.appName("DatabricksSample") \
.master("local[2]") \
.getOrCreate()

logDF = spark.read\
.option("header",True)\
.csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv")\
.sample(withReplacement=False, fraction=0.3, seed=3)

logDF.show(50)


spark.stop()
Empty file added 21-DataFrames/lib/__init__.py
Empty file.
Empty file.
Loading