Zeppelin (pyspark) Guide
When the data is very large, we need to use Zeppelin (pyspark) to speed up data preprocessing.
Read Data
Before starting, we need to put the data on HDFS or S3.
%pyspark
from pyspark.sql.types import StructType, StructField, StringType

#read a single file with a header.
df = spark.read.csv("hdfs://[ip]:9000/filename.csv", header = True, sep = '\t', encoding = 'gbk')

#read by wildcard with an explicit schema.
schema = StructType([
    StructField("af_channel", StringType(), True),
    StructField("af_siteid", StringType(), True)])
df = spark.read.option("basePath", "s3a://datavisor-appsflyer/").csv("s3a://datavisor-appsflyer/*_v1.txt.gz", schema = schema, sep = '\t')

#show the top 10 rows; the parameter False forces it to print the full content of each column.
df.show(10, False)
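After reading, a quick sanity check with standard DataFrame calls helps confirm that the separator and schema were picked up correctly; this is a minimal sketch, not part of the original workflow:

#print the supplied or inferred schema and the row count.
df.printSchema()
print(df.count())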
Output
#csv
df.write.mode('overwrite').option('sep', '\001').csv("hdfs://[ip]:9000/[filename]", compression = "gzip")

#json
df.write.mode('overwrite').json("hdfs://[ip]:9000/[filename]", compression = "gzip")
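If the data will be read back by Spark repeatedly, writing Parquet partitioned by a date column can be cheaper than csv/json; a sketch, where [filename] is a placeholder as above and 'date' is a hypothetical column name:

#parquet output, partitioned by a (hypothetical) date column; each date becomes its own sub-directory.
df.write.mode('overwrite').partitionBy('date').parquet("hdfs://[ip]:9000/[filename]")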
Add constant column
import pyspark.sql.functions as f

df = df.withColumn("newCol", f.lit("value"))
Rename a column
#withColumn cannot rename a column directly; use withColumnRenamed.
df = df.withColumnRenamed("oldName", "newName")
Select and Filter
Sometimes we need to select specific columns and filter rows by a column's value.
import pyspark.sql.functions as f

#select specific columns.
df = df.select(['col1', 'col2'])

#filter rows where a column is equal to a value.
df = df.filter(f.col('colName') == 'value')

#filter rows where a column's value is in a set.
df = df.filter(f.col('colName').isin(set(['value1', 'value2'])))
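Filter conditions can also be combined with & (and), | (or) and ~ (not), each wrapped in its own parentheses; a short sketch with hypothetical column names:

import pyspark.sql.functions as f

#combine conditions; each condition needs its own parentheses.
df = df.filter((f.col('col1') == 'value') & (f.col('col2') > 10))

#keep rows where a column is not null.
df = df.filter(f.col('col3').isNotNull())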
Join
df = df1.join(df2, on=['join_key1', 'join_key2'], how = 'inner')
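When the join keys have different names in the two DataFrames, we can pass an explicit join condition instead of a column list; a sketch, where user_id and uid are hypothetical column names:

#join on differently named keys, then drop the duplicated key column from the right side.
df = df1.join(df2, df1['user_id'] == df2['uid'], how = 'left').drop(df2['uid'])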
Concat Two DataFrames
Returns a new DataFrame containing the union of rows in this DataFrame and another DataFrame. Note that union matches columns by position, not by name.
df = df1.union(df2)
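If the two DataFrames have the same columns but in a different order, unionByName (available since Spark 2.3) avoids position-based mismatches; a minimal sketch:

#union by column name instead of position (Spark 2.3+).
df = df1.unionByName(df2)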
Persist
In pyspark, a DataFrame is not automatically kept in memory after it has been computed; if it is reused later, it will be recomputed each time.
So when we need to use a DataFrame multiple times, we should call persist() to keep it in memory.
#if we generate df through a complex procedure and will reuse it, we need to persist it.
df = ...
df.persist()

#after persist, we use it multiple times; after that we need to unpersist it.
df.unpersist()
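If the DataFrame is too large to fit entirely in memory, persist can take an explicit storage level that spills to disk; a sketch using the standard StorageLevel enum:

from pyspark import StorageLevel

#keep partitions in memory and spill to disk when memory is not enough.
df.persist(StorageLevel.MEMORY_AND_DISK)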
Count Distinct
import pyspark.sql.functions as f

df.agg(f.countDistinct(f.col('colName')).alias('count')).show()
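When an exact count is not required on very large data, an approximate count is much cheaper; a sketch using the built-in approx_count_distinct, where rsd is the allowed relative standard deviation:

import pyspark.sql.functions as f

#approximate distinct count with a 5% maximum relative standard deviation.
df.agg(f.approx_count_distinct(f.col('colName'), rsd = 0.05).alias('approx_count')).show()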
UDF
A UDF (user-defined function) applies a custom Python function to each row of one or more columns and generates a new column.
import pyspark.sql.functions as f
from pyspark.sql.types import StringType, DoubleType
import datetime

timeFormat = '%Y/%m/%d %H:%M:%S'
targetFormat = '%Y-%m-%d %H:%M:%S'

def formatTime(timeStr):
    dt = datetime.datetime.strptime(timeStr, timeFormat)
    return dt.strftime(targetFormat)

formatTimeUDF = f.udf(formatTime, StringType())
df = df.withColumn("time", formatTimeUDF("originTime"))

#we can also assign the new column to the same column, overwriting it.
df = df.withColumn("time", formatTimeUDF("time"))

#a udf can also take multiple parameters; the declared return type must match what the function returns.
def product(col1, col2):
    return col1 * col2

productUDF = f.udf(product, DoubleType())
df = df.withColumn("amount", productUDF("number", "price"))
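For common operations like time formatting, built-in Spark SQL functions are usually faster than a Python UDF because rows do not have to be shipped to Python workers. A minimal sketch of the same reformatting with to_timestamp and date_format (built-ins since Spark 2.2); note these take Java date patterns, not Python ones:

import pyspark.sql.functions as f

#parse with the source pattern, then reformat with the target pattern.
df = df.withColumn("time", f.date_format(f.to_timestamp(f.col("originTime"), "yyyy/MM/dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss"))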
GroupBy
Sometimes we need to group by some dimension and compute aggregations.
import pyspark.sql.functions as f

dfNew = df.groupby('userId').agg(
    f.collect_list(f.col('item')).alias('item_list'),
    f.sum(f.col('amount')).alias('amount_sum'))
Sort by a Column
Here is an example that shows the users with the most distinct items.
import pyspark.sql.functions as f

dfCount = df.groupby(f.col("userId")).agg(f.countDistinct("item").alias("item_count")).sort(f.desc("item_count"))
dfCount.show(100, False)
Repartition by Date
When we run a trial, we need to split the data into small files by date:
PS: you need to change "fileType" to HADOOP_PART in the rawlogconverter config.
import datetime
import pyspark.sql.functions as sf

def format_date(x):
    return datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')

format_date_udf = sf.udf(format_date)

def outputByDate(raw_df):
    raw_df = raw_df.withColumn('jyrq_date', format_date_udf('jyrq'))
    raw_df.createOrReplaceTempView("raw_tv")
    raw_df.persist()
    days = spark.sql("select distinct jyrq_date from raw_tv").collect()
    for day in days:
        if day['jyrq_date']:
            output_path = 'hdfs://10.1.94.98:9000/rawdata/%s/rawlog.%s_235959.event.json' % (day['jyrq_date'], day['jyrq_date'])
            print('handling day:%s to path:%s' % (day['jyrq_date'], output_path))
            #quote the date so it is compared as a string.
            repartition_df = spark.sql("select * from raw_tv where jyrq_date = '%s'" % (day['jyrq_date'])).coalesce(1)
            repartition_df.write.option("compression", "gzip").json(output_path)
    raw_df.unpersist()
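An alternative sketch lets Spark write all dates in one pass with partitionBy instead of looping over days. Note the output layout differs from the loop above (Hive-style jyrq_date=YYYYMMDD sub-directories rather than the rawlog.* file names), and the output root below is a hypothetical path, so check whether the downstream rawlogconverter can consume this layout:

#write one sub-directory per date in a single job; raw_df and format_date_udf as defined above.
raw_df = raw_df.withColumn('jyrq_date', format_date_udf('jyrq'))
raw_df.write.mode('overwrite').partitionBy('jyrq_date').option('compression', 'gzip').json('hdfs://10.1.94.98:9000/rawdata_by_date/')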