The hard road of Spark reading and writing Oracle and Hive (Part 2): Oracle's DATE type
Recently another requirement came up: import Oracle tables into a Hive database.
Two points are worth noting about reading from Oracle into Hive with Spark:
 1. When the data volume is small, read directly with spark.read.jdbc(orclUrl, table_name, orclProperties); efficiency is not a concern and the job finishes quickly.
 2. When the data volume is large, use the spark.read.jdbc(orclUrl, table_name, predicates, orclProperties) overload, which reads the partitions concurrently, one per predicate. Internally, each predicate is appended to the Oracle query as a WHERE condition; for example, if id in Oracle runs from 1 to 10, splitting it yields where id >= 1 and id <= 5 and where id >= 6 and id <= 10, read by two threads at once, as sketched below.
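For example, here is a minimal sketch of both call styles, assuming a SparkSession named spark; the URL, credentials, table names, and the id split points are placeholders:

import java.util.Properties

val orclUrl = "jdbc:oracle:thin:@//dbhost:1521/orcl"  // placeholder connection string
val orclProperties = new Properties()
orclProperties.setProperty("user", "scott")           // placeholder credentials
orclProperties.setProperty("password", "tiger")
orclProperties.setProperty("fetchsize", "1000")       // rows fetched per round trip

// 1. Small table: single-partition read
val small = spark.read.jdbc(orclUrl, "SMALL_TABLE", orclProperties)

// 2. Large table: one WHERE predicate per partition, read in parallel
val predicates = Array("id >= 1 and id <= 5", "id >= 6 and id <= 10")
val large = spark.read.jdbc(orclUrl, "BIG_TABLE", predicates, orclProperties)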
The source (Spark 2.2.0) is shown below; note the official comment:
/**
 * Construct a `DataFrame` representing the database table accessible via JDBC URL
 * url named table using connection properties. The `predicates` parameter gives a list of
 * expressions suitable for inclusion in WHERE clauses; each one defines one partition
 * of the `DataFrame`.
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param predicates Condition in the where clause for each partition.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included. "fetchsize" can be used to control the
 *                             number of rows per fetch.
 * @since 1.4.0
 */
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame = {
  assertNoSpecifiedSchema("jdbc")
  // connectionProperties should override settings in extraOptions.
  val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
  val options = new JDBCOptions(url, table, params)
  val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
    JDBCPartition(part, i) : Partition
  }
  val relation = JDBCRelation(parts, options)(sparkSession)
  sparkSession.baseRelationToDataFrame(relation)
}
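As the source shows, each predicate string becomes one JDBCPartition, so the number of predicates is the number of concurrent read tasks. A hypothetical helper that slices a numeric key range into such predicates might look like this; the id column name and the step size are assumptions:

// Split [min, max] on a numeric id column into step-sized WHERE predicates
def rangePredicates(min: Long, max: Long, step: Long): Array[String] =
  (min to max by step).map { lo =>
    val hi = math.min(lo + step - 1, max)
    s"id >= $lo and id <= $hi"
  }.toArray

// rangePredicates(1, 10, 5) == Array("id >= 1 and id <= 5", "id >= 6 and id <= 10")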
In practice, it turns out that Spark does not handle Oracle's DATE type well when reading: a value such as 2018-10-10 23:00 ends up in the Hive table as just 2018-10-10, with the hours and minutes gone. The reason is that Oracle's DATE carries a time-of-day component, but Spark maps java.sql.Types.DATE to its DateType, which keeps only the date.
A workable fix is to register a custom JDBC dialect from Java, as follows:
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import scala.Option;

import java.sql.Types;

public class OracleDateTypeInit {
    public static void oracleInit() {
        JdbcDialect dialect = new JdbcDialect() {
            // Decide whether this dialect applies (Oracle URLs only)
            @Override
            public boolean canHandle(String url) {
                return url.startsWith("jdbc:oracle");
            }

            // Type conversion used when reading from Oracle
            @Override
            public Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
                if (sqlType == Types.DATE && typeName.equals("DATE") && size == 0) {
                    // Map Oracle DATE to TimestampType so the time component survives
                    return Option.apply(DataTypes.TimestampType);
                }
                return Option.empty();
            }

            // Type conversion used when writing to Oracle
            @Override
            public Option<JdbcType> getJDBCType(DataType dt) {
                if (DataTypes.StringType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("VARCHAR2(255)", Types.VARCHAR));
                } else if (DataTypes.BooleanType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(1)", Types.NUMERIC));
                } else if (DataTypes.IntegerType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(10)", Types.NUMERIC));
                } else if (DataTypes.LongType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19)", Types.NUMERIC));
                } else if (DataTypes.DoubleType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.FloatType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.ShortType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(5)", Types.NUMERIC));
                } else if (DataTypes.ByteType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(3)", Types.NUMERIC));
                } else if (DataTypes.BinaryType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("BLOB", Types.BLOB));
                } else if (DataTypes.TimestampType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.DateType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.createDecimalType()
                        .sameType(dt)) { // Oracle decimal type, unlimited precision
                    /* return DecimalType.Fixed(precision, scale)
                       => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) */
                    return Option.apply(
                            new JdbcType("NUMBER(38,4)", Types.NUMERIC));
                }
                return Option.empty();
            }
        };
        // Register this dialect
        JdbcDialects.registerDialect(dialect);
    }
}
To use it, just call OracleDateTypeInit.oracleInit() before the JDBC read; without it, Oracle DATE values read into Hive keep only the year-month-day part.
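A minimal end-to-end sketch, assuming a SparkSession named spark and the connection settings from earlier; the table and Hive database names are placeholders:

// Register the dialect once per JVM, before any Oracle read
OracleDateTypeInit.oracleInit()

// Oracle DATE columns now arrive as TimestampType, keeping hours and minutes
val df = spark.read.jdbc(orclUrl, "T_WITH_DATE", orclProperties)
df.write.mode("overwrite").saveAsTable("hive_db.t_with_date")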
