HiveMetastoreThrift客户端
上篇⽂章介绍了Metastore Thrift服务端,接下来浅析⼀下Metastore Thrift客户端。
1、IMetaStoreClient接⼝
IMetaStoreClient接⼝定义了Metastore的thrift api,该接⼝中定义了操作元数据的各种⽅法,如下图所⽰。
IMetaStoreClient
2、HiveMetaStoreClient
Hive中IMetaStoreClient的实现类是HiveMetaStoreClient
↳ Proxy()//动态代理类创建代理对象
↳RetryingMetaStoreClient.RetryingMetaStoreClient()
↳wInstance()//反射实例化对象
↳SessionHiveMetaStoreClient.SessionHiveMetaStoreClient()
↳HiveMetaStoreClient.HiveMetaStoreClient()
↳HiveMetaStoreClient.open()
3、建⽴Thrift连接
private void open() throws MetaException {
isConnected = false;
TTransportException tte = null;
boolean useSSL = BoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
boolean useSasl = BoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
boolean useFramedTransport = BoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
boolean useCompactProtocol = BoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
int clientSocketTimeout = (int) TimeVar(
ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
for (URI store : metastoreUris) {
LOG.info("Trying to connect to metastore with URI " + store);
try {
if (useSasl) {
// Wrap thrift connection with SASL for secure connection.
try {
HadoopThriftAuthBridge.Client authBridge =
// check if we should use delegation tokens to authenticate
// the call below gets hold of the tokens if they are set up by hadoop
// this should happen on the map/reduce tasks if the client added the
/
/ tokens into hadoop's credential store in the front end during job
// submission.
String tokenSig = ("ken.signature");
// tokenSig could be null
tokenStrForm = TokenStrForm(tokenSig);
transport = new Host(), Port(), clientSocketTimeout);
if(tokenStrForm != null) {
// authenticate using delegation tokens via the "DIGEST" mechanism
transport = ateClientTransport(null, Host(),
"DIGEST", tokenStrForm, transport,
} else {
String principalConfig =
transport = ateClientTransport(
principalConfig, Host(), "KERBEROS", null,
transport, MetaStoreSaslProperties(conf));
}
} catch (IOException ioe) {
<("Couldn't create client transport", ioe);
throw new String());
}
} else {
if (useSSL) {
......
}
thriftfinal TProtocol protocol;
if (useCompactProtocol) {
protocol = new TCompactProtocol(transport);
} else {
protocol = new TBinaryProtocol(transport);
}
client = new ThriftHiveMetastore.Client(protocol);
try {
if (!transport.isOpen()) {
transport.open();
LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet()); }
isConnected = true;
} catch (TTransportException e) {
tte = e;
if (LOG.isDebugEnabled()) {
LOG.warn("Failed to connect to the ", e);
} else {
// Don't print full exception trace if DEBUG is not on.
LOG.warn("Failed to connect to the ");
}
}
if (isConnected && !useSasl && BoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
......
}
} catch (MetaException e) {
<("Unable to connect to metastore with URI " + store
+ " in attempt " + attempt, e);
}
if (isConnected) {
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论