Spark SQL Built-in Functions: Aggregates, Collections, Dates, Math, and Strings
Spark SQL ships with a rich set of built-in functions that can be used both through the DataFrame API and through the SQL interface.
The built-in functions fall into several categories: aggregate, collection (array), date/time, math, string, window, and miscellaneous.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("fig.option","some-value") \
.getOrCreate()
# Create an example DataFrame: customers
customers =[(1,'James',21,'M',100),(2,"Liz",25,"F",150),(3,"John",31,"M",120),\
(4,"Jennifer",45,"F",300),(5,"Robert",41,"M",120),(6,"Sandra",45,"F",200)]
df_customers = spark.createDataFrame(customers, ["cID","name","age","gender","income"])# list -> DF
Aggregate functions
approx_count_distinct / countDistinct: number of distinct values in one or more columns
avg: average; count: count; first: first value; last: last value; max/min, mean, sum, sumDistinct
var_pop: population variance; var_samp/variance: sample variance; stddev_pop, stddev_samp: standard deviation
covar_pop, covar_samp: covariance; corr: correlation coefficient
kurtosis: kurtosis; skewness: skewness
df_customers.agg(F.approx_count_distinct(df_customers.age).alias('distinct_ages')).collect()# [Row(distinct_ages=5)]
df_customers.select(F.countDistinct("age","name").alias('distinct_ages')).collect()# [Row(distinct_ages=6)]
df_customers.select(F.avg('age')).collect()
df_customers.select(F.mean('age')).collect()# [Row(avg(age)=34.666666666666664)]
df_customers.select(F.sum('age')).collect()# [Row(sum(age)=208)]
df_customers.select(F.max('age')).collect()# [Row(max(age)=45)]
df_customers.select(F.min('age').alias('min_age')).collect()# [Row(min_age=21)]
df_customers.groupby('gender').agg(F.first('age').alias('first_age')).collect()#  [Row(gender='F', first_age=25), Row(gender='M', first_age=21)]
df_customers.select(F.last('age').alias('last_age')).collect()# [Row(last_age=45)]
df_customers.select(F.sumDistinct('age').alias('sumdis_age')).collect()# [Row(sumdis_age=163)]
df_customers.select(F.var_samp('age').alias("variance")).collect()# [Row(variance=109.46666666666665)]
df_customers.var_samp("age","income")).collect()# [Row(covar_samp(age, income)=534.0000000000001)]
Collection functions
array_contains: whether the array contains a given element
array_distinct: remove duplicate elements
array_except(col1, col2): elements that are in the first array but not in the second
array_intersect(col1, col2): intersection
array_max, array_min; array_position(col, value): position of the first occurrence of value, 1-based
array_remove(col, element): remove all occurrences of an element
array_repeat(col, count): build an array with count copies of the value
array_sort(col): sort the array
array_union(col1, col2): union of two arrays
arrays_overlap(col1, col2): whether the two arrays share an element
arrays_zip(*cols): merge arrays element-wise, like Python's zip
reverse: reverse order; shuffle: random permutation; size, slice, sort_array (a sketch of several of these follows the examples below)
df = ateDataFrame([(["a","b","c"],),([],)],['data'])
df.show()
'''
+---------+
|     data|
+---------+
|[a, b, c]|
|       []|
+---------+
'''
df.select(F.array_contains(df.data,"a")).collect()## each value in df.data must be an array
# [Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]
df = spark.createDataFrame([([1,2,3,2],),([4,5,5,4],)],['data'])
df.select(F.array_distinct(df.data)).collect()
# [Row(array_distinct(data)=[1, 2, 3]), Row(array_distinct(data)=[4, 5])]
from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b","a","c"], c2=["c","d","a","f"])])
df.select(F.array_except(df.c1, df.c2)).collect()
# [Row(array_except(c1, c2)=['b'])]
df = ateDataFrame([(["c","b","a"],),([],)],['data'])
df.select(F.array_position(df.data,"a")).collect()
#[Row(array_position(data, a)=3), Row(array_position(data, a)=0)]
df = spark.createDataFrame([Row(c1=["b","a","c"], c2=["c","d","a","f"])])
df.select(F.array_union(df.c1, df.c2)).collect()
# [Row(array_union(c1, c2)=['b', 'a', 'c', 'd', 'f'])]
## flatten: flatten nested arrays into a single one-dimensional array
df = spark.createDataFrame([([[1,2,3],[4,5],[6]],),([None,[4,5]],)],['data'])
df.select(F.flatten(df.data).alias('r')).collect()
# [Row(r=[1, 2, 3, 4, 5, 6]), Row(r=None)]
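The remaining array functions from the list above (array_sort, size, slice, arrays_zip, arrays_overlap) follow the same pattern. A minimal sketch with made-up sample data:
df = spark.createDataFrame([([3,1,2],[2,4])], ['a','b'])
df.select(F.array_sort(df.a).alias('sorted')).collect()# [Row(sorted=[1, 2, 3])]
df.select(F.size(df.a).alias('n')).collect()# [Row(n=3)]
df.select(F.slice(df.a, 1, 2).alias('s')).collect()# two elements starting at 1-based position 1: [Row(s=[3, 1])]
df.select(F.arrays_zip(df.a, df.b).alias('z')).collect()# element-wise structs, like Python's zip
df.select(F.arrays_overlap(df.a, df.b).alias('ov')).collect()# [Row(ov=True)], since 2 appears in both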
Date functions
Conversion
# Convert a unix timestamp to a timestamp string
time_df = spark.createDataFrame([(1576425600,)],['unix_time'])
time_df.select(F.from_unixtime('unix_time').alias('ts')).collect()
# [Row(ts='2019-12-16 00:00:00')]
# Convert a timestamp string to a unix timestamp
time_df = spark.createDataFrame([('2019-12-16',)],['dt'])
time_df.select(F.unix_timestamp('dt','yyyy-MM-dd').alias('unix_time')).collect()
# [Row(unix_time=1576425600)]
## to_date: converts a Column of StringType or TimestampType into DateType
time_df.select(F.to_date(time_df.dt).alias('date')).collect()# [Row(date=datetime.date(2019, 12, 16))]
time_df.select(F.to_timestamp(time_df.dt).alias('date')).collect()# [Row(date=datetime.datetime(2019, 12, 16, 0, 0))]
time_df.select(F.to_utc_timestamp(time_df.dt,"PST").alias('utc_time')).collect()# [Row(utc_time=datetime.datetime(2019, 12, 16, 8, 0))]
## date_format(date, format)
time_df.select(F.date_format('dt','MM/dd/yyyy').alias('date')).collect()# [Row(date='12/16/2019')]
Extraction
# year, month, week
time_df = spark.createDataFrame([('2019-12-16 13:08:15',)],['dt'])
time_df.select(F.year(time_df.dt).alias('year')).collect()# [Row(year=2019)]
# Week of the year: weekofyear(col); also dayofyear(col), dayofmonth(col)
time_df.select(F.weekofyear(time_df.dt).alias('week')).collect()# [Row(week=51)]
time_df.select(F.dayofyear(time_df.dt).alias('day')).collect()# [Row(day=350)]
time_df.select(F.dayofmonth(time_df.dt).alias('day')).collect()# [Row(day=16)]
time_df.select(F.month(time_df.dt).alias('month')).collect()# [Row(month=12)]
time_df.select(F.hour(time_df.dt).alias('hour')).collect()# [Row(hour=13)]
time_df.select(F.minute(time_df.dt).alias('minute')).collect()#  [Row(minute=8)]
time_df.select(F.second(time_df.dt).alias('second')).collect()#  [Row(second=15)]
Date arithmetic
# The next day
time_df.select(F.date_add(time_df.dt,1).alias('next_date')).collect()# [Row(next_date=datetime.date(2019, 12, 17))]
# The previous day
time_df.select(F.date_sub(time_df.dt,1).alias('prev_date')).collect()# [Row(prev_date=datetime.date(2019, 12, 15))]
# Truncate a date
df = spark.createDataFrame([('1997-02-28 05:02:11',)],['t'])
df.select(F.date_trunc('year', df.t).alias('year')).collect()# [Row(year=datetime.datetime(1997, 1, 1, 0, 0))]
df.select(F.date_trunc('month', df.t).alias('month')).collect()# [Row(month=datetime.datetime(1997, 2, 1, 0, 0))]
# Difference in days between two dates
df = spark.createDataFrame([('2015-04-08','2015-05-10')],['d1','d2'])
df.select(F.datediff(df.d2, df.d1).alias('diff')).collect()# [Row(diff=32)]
## last_day, next_day, months_between
df = spark.createDataFrame([('1997-02-28 05:02:11',)],['dt'])
df.select(F.add_months(df.dt,1).alias('next_month')).collect()# [Row(next_month=datetime.date(1997, 3, 31))]
df.select(F.last_day(df.dt).alias('last_day')).collect()# [Row(last_day=datetime.date(1997, 2, 28))]
df.select(F.next_day(df.dt,'Sun').alias('next_day')).collect()# [Row(next_day=datetime.date(1997, 3, 2))]
df = spark.createDataFrame([('1997-02-28 10:30:00','1996-10-30')],['date1','date2'])
df.select(F.months_between(df.date1, df.date2).alias('months')).collect()# [Row(months=3.94959677)]
Math functions
Used with columns of numeric type.
'''
abs, ceil, floor
cos, acos, sin, asin, tan, atan, atan2
exp, factorial, hypot (square root of the sum of squares), log, log10, log2, pow, round, shiftLeft, sqrt
log1p(col): log(x + 1); hex: hexadecimal representation
'''
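A minimal sketch of a few of these, with made-up sample data:
df = spark.createDataFrame([(-1.5, 4)], ['x','n'])
df.select(F.abs(df.x).alias('abs'),      # 1.5
    F.ceil(df.x).alias('ceil'),          # -1
    F.floor(df.x).alias('floor'),        # -2
    F.sqrt(df.n).alias('sqrt'),          # 2.0
    F.pow(df.n, 2).alias('pow'),         # 16.0
    F.factorial(df.n).alias('fact'),     # 24
    F.hex(df.n).alias('hex')).collect()  # '4'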
String functions
Splitting, case conversion, trimming whitespace, and regular-expression matching.
'''
ltrim, rtrim, trim
ascii, base64, unbase64, decode, encode, format_string
repeat, reverse
instr(str, substr): position of the first occurrence; locate(substr, str, pos=1)
Concatenation: concat, concat_ws
Splitting and substrings: split, substring, substring_index
isnan, isnull; nanvl(col1, col2): returns col1 if col1 is not NaN, otherwise col2
lower, upper
'''
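A minimal sketch of the common string functions, with made-up sample data:
df = spark.createDataFrame([('  Hello,World  ',)], ['s'])
df.select(F.trim(df.s).alias('t')).collect()# [Row(t='Hello,World')]
df.select(F.lower(df.s).alias('l')).collect()# lower-cases, keeps the whitespace
df.select(F.split(F.trim(df.s), ',').alias('parts')).collect()# [Row(parts=['Hello', 'World'])]
df.select(F.substring(df.s, 3, 5).alias('sub')).collect()# [Row(sub='Hello')], 1-based start position
df.select(F.instr(df.s, 'World').alias('pos')).collect()# [Row(pos=9)], 0 if not found
df.select(F.concat_ws('-', F.lit('a'), F.lit('b')).alias('c')).collect()# [Row(c='a-b')]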
Other
lit(col): creates a Column of literal value
rand, randn: generate random numbers; sequence(start, stop, step=None): generate an array of numbers; shuffle: random permutation of an array
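A minimal sketch, with made-up sample data:
df = spark.createDataFrame([(1,),(2,)], ['id'])
df.select(F.lit(5).alias('five')).collect()# constant column: [Row(five=5), Row(five=5)]
df.select(F.rand(seed=42).alias('u')).collect()# uniform random in [0, 1)
df.select(F.randn(seed=42).alias('g')).collect()# standard normal random
df.select(F.sequence(F.lit(1), F.lit(5)).alias('seq')).collect()# [Row(seq=[1, 2, 3, 4, 5]), ...]
df.select(F.shuffle(F.sequence(F.lit(1), F.lit(3))).alias('sh')).collect()# random permutation of [1, 2, 3]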
