Spark SQL⽀持通过JDBC直接读取数据库中的数据,这个特性是基于JdbcRDD实现。返回值作为DataFrame返回,这样可以直接使⽤Spark SQL并跟其他的数据源进⾏join操作。JDBC数据源可以很简单的通过Java或者Python,⽽不需要提供ClassTag。注意这与Spark SQL JDBC server不同,后者是基于Spark SQL执⾏查询。
要保证能使⽤需要把对应的jdbc驱动放到spark的classpath中。⽐如,想要连接postgres可以在启动命令中添加jars:
bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar
远程数据库的表可以加载成DataFrame或者注册成Spark SQL的临时表,⽤户可以在数据源选项中配置JDBC相关的连接参数。user和password⼀般是必须提供的参数,另外⼀些参数可以参考下⾯的列表:
partitionColumn, lowerBound, upperBound
指定时这三项需要同时存在,描述了worker如何并⾏读取数据库。其中partitionColumn必须是数字、dat
e、timestamp,lowerBound 和upperBound只是决定了分区的步长,⽽不会过滤数据,因此表中所有的数据都会被分区返回。该参数仅⽤于读。
numPartitions
读写时的最⼤分区数。这也决定了连接JDBC的最⼤连接数,如果并⾏度超过该配置,将会使⽤coalesce(partition)来降低并⾏度。
queryTimeout
driver执⾏statement的等待时间,0意味着没有限制。写⼊的时候这个选项依赖于底层是如何实现setQueryTimeout的,⽐如h2 driver 会检查每个query。默认是0
fetchSize
fetch的⼤⼩,决定了每⼀个fetch,拉取多少数据量。这个参数帮助针对默认⽐较⼩的驱动进⾏调优,⽐如oracle默认是10⾏。仅⽤于读操作。
batchSize
batch⼤⼩,决定插⼊时的并发⼤⼩,默认1000。
isolationLvel
事务隔离的等级,作⽤于当前连接。可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 依赖于底层jdbc提供的事务隔离,默认是READ_UNCOMMITTED。这个选项仅⽤于写操作。
sessionInitStatment
每个数据库session创建前执⾏的操作,⽤于初始化。如定义⼀些触发器操作。如 BEGIN execute immediate 'alter session set
"_serial_direct_read"=true'; END;
truncate
写操作选项,当使⽤SaveMode.Overwrite时,该选项⽤于是否直接删除并重建表。当表结构发现变化的时候会失效。默认是false。
cascadeTruncate
写操作选项,是否开启级联删除。
createTableOptions
写操作选项,⼀般⽤于配置特殊的分区或者数据库配置,⽐如 CREATE TABLE t (name string) ENGINE=InnoDB
createTableColumnTypes
配置数据库字段的类型,⽐如 name CHAR(64), comments VARCHAR(1024),仅⽀持spark sql中⽀持的数据类型。
customSchema
⾃定义读取的schema信息,⽐如 id DECIMAL(38, 0), name STRING 。可以配置部分字段,其他的使⽤默认的类型映射,⽐如 id DECIMAL(38, 0)。仅⽤于读操作。
pushDownPredicate
该选项⽤于开启或禁⽤jdbc数据源的谓词下推。默认是true。如果配置为false,那么所有的filter操作都会由spark来完成。当过滤操作⽤spark更快时,⼀般才会关闭下推功能。
// 加载jdbc
val jdbcDF = ad
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
// 使⽤propeties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = ad
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定⾃定义的schema信息
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = ad
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 保存jdbc
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.
option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定⾃定义schema映射
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
>session如何设置和读取

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。