Zeppelin (pyspark) Guide

When data is very large we need to use Zeppelin(pyspark) to speed up the data preprocess.

Read Data

Before start we need to put data to hdfs or s3.

%pyspark
#read single file with header.
df = spark.read.csv("hdfs://[ip]:9000/filename.csv", header = True, sep = '\t', encoding = 'gbk')
#read by wild card and schema example
schema = StructType([
    StructField("af_channel", StringType(), True),
    StructField("af_siteid", StringType(), True)])
df = spark.read.option("basePath","s3a://datavisor-appsflyer/").csv("s3a://datavisor-appsflyer/*_v1.txt.gz", schema = schema, sep = '\t')
#show the top 10 rows, here parameter False will force it to print full content of each column.
df.show(10, False)

Output

#csv
df.write.mode('overwrite').option('sep','\001').csv("hdfs://[ip]:9000/[filename]", compression = "gzip")
#json
df.write.mode('overwirte').json("hdfs://[ip]:9000/[filename]", compression = "gzip")

Add constant column

import pyspark.sql.functions as f
df = df.withColumn("newCol", f.lit("value"))

Rename a column

import pyspark.sql.functions as f
df = df.withColumn(f.col("oldName").alias("newName"))

Select and Filter

Some times we need to select specific columns, and filter by some column

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.filter

import pyspark.sql.functions as f
#select
df = df.select(['col1', 'col2'])
#filter by a column is equal to a value.
df = df.filter(f.col('colName') =='value')
#filter by check if a column is in a set.
df = df.filter(f.col('colName').isin(set(['value1','value2'])))

Join

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

df = df1.join(df2, on=['join_key1', 'join_key2'], how = 'inner')

Concat to DataFrame

Return a new DataFrame containing union of rows in this and another frame.

df = df1.union(df2)

Persist

In pyspark, it will not save all dataframe in memory automatically after we computed it, when it is used multiple times lately, it will recompute multiple times.

So when we need to use a dataframe multiple times, we need to call persist to keep it in memory.

#if we generate df in a complex procedure, and will reuse it, we need to persist it.
df = ... 
df.persist()
#after persist, we use it multiple times. after that we need to unpersist()
df.unpersist()

Count Distinct

import pyspark.sql.functions as f
df.agg(f.countDistinct(f.col('colName')).alias('count')).show()

UDF

UDF is used to do some use defined function to each row of some columns and generate new column.

import pyspark.sql.functions as f
from pyspark.sql.types import StringType
import datetime
timeFormat = '%Y/%m/%d %H:%M:%S'
targetFormat = '%Y-%m-%d %H:%M:%S'
def formatTime(timeStr):
  dt = datetime.datetime.strptime(timeStr, timeFormat)
  return dt.strftime(targetFormat)
formatTimeUDF = f.udf(formatTime, StringType())
df = df.withColumn("time", formatTimeUDF("originTime"))
#we can also assign the new column to the some column.
df = df.withColumn("time", formatTimeUDF("time"))
#udf can also have multiple parameter.
def product(col1, col2):
  return col1*col2
productUDF = f.udf(product, StringType())
df = df.withColumn("amount", productUDF("number", "price"))

GroupBy

Sometime, we need to group by some dimension and do some aggregation.

import pyspark.sql.functions as f
dfNew = df.groupby('userId').agg(f.collect_list(f.col('item')).alias('item_list'), f.sum(f.col('amount')).alias('amount_sum'))

Sort by a Column

Here is an example to show the user with top count items.

dfCount = df.groupby(f.col("userId")).agg(f.countDistinct("item").alias("item_count")).sort(f.desc("item_count"))
dfCount.show(100, False)

Repartition by Date

When we do trial we need to split data into small files by date:

ps: you need to change “fileType” to HADOOP_PART in rawlogconverter config

def format_date(x):
    return datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
format_date_udf = sf.udf(format_date)

def outputByDate(raw_df):
    raw_df = raw_df.withColumn('jyrq_date', format_date_udf('jyrq'))
    raw_df.createOrReplaceTempView("raw_tv")
    raw_df.persist()
    days = spark.sql("select distinct jyrq_date from raw_tv").collect()
    for day in days:
        if day['jyrq_date']:
            output_path = 'hdfs://10.1.94.98:9000/rawdata/%s/rawlog.%s_235959.event.json'%(day['jyrq_date'],day['jyrq_date'])
            print('handling day:%s to path:%s'%(day['jyrq_date'], output_path))
            repartition_df = spark.sql("select * from raw_tv where jyrq_date = %s"%(day['jyrq_date'])).coalesce(1)
            repartition_df.write.option("compression", "gzip").json(output_path)
    raw_df.unpersist()

Add a Comment

您的邮箱地址不会被公开。 必填项已用 * 标注