FLINKNotebook混合编程:PYTHON(⼀)
本⽂介绍了 Py4j的使⽤以及 Flink官⽅如何使⽤ Py4j进⾏混合语⾔编程,最后会介绍下我们会应⽤这种技术在我们的 Flink
Notebook 服务,来创建⼀个混合语⾔编程环境。
Flink Notebook 服务是我司⾃研的基于Notebook⽅式的Flink 开发平台,他⽀持⽤户通过SQL⽅式和JAR包⽅式进⾏混合编程,并通过⼀些配置,既可完全的在页⾯上完成FLINK任务的开发⼯作,如图:
通过不同的Notebook Type我们可以加载不同类型的组件,通过table结果集流转⽅式,承接上下游,以完成相应的功能。⽬前的插件类型主要是主要分:1.SQL组建:可以⾃由撰写SQL,2.格式化组建:sink或者source,有具体的格式,标准的前端组建对应,3.JAR包⾃定义组件,通过⽤户上传⾃⼰开发的jar包完成对应的逻辑。
对于Jar包⾃定义组件来说,他是为了解决1%的特异性需求的,但问题是其代码不可见,逻辑也相对⾃由,有违Notebook的初衷,因此,我们想设计⼀种Notebook的Type,⽀持可视化的Python编写,可以直接将代码在页⾯上进⾏开发。
Flink 本⾝来说,就有PyFlink 和 Python UDF support,因此python和 flink的耦合度应该很⾼,所以我们要了解Flink是怎么做的,从⽽研究我们应该如何去做,所以本⽂会分成以下3个部分来介绍整个混编逻辑:
1. Java与Python 通信:Py4J
2. Py4j in Flink
3. Notebook with Python
Py4j 介绍
Py4j可以使运⾏于python解释器的python程序动态的访问java虚拟机中的java对象。Java⽅法可以像java对象就在python解释器⾥⼀样被调⽤, java collection也可以通过标准python collection⽅法调⽤。Py4j也可以使java程序回调python对象。
安装以及基本使⽤也可以参考官⽹
Py4j可以在系统中创建⼀个 java和python 之间通信的socket管道。
我们可以通过⼀个例⼦来看整个Py4j是如何⼯作的。
我们先创建⼀个想让python负责具体实现的Java 接⼝:
public interface TestEnterPoint {
String gift(HashMap<String,String> a, String b);
}
在java 服务端,我们通过以下代码可以启动⼀个简单的Py4j监听:
public static void main(String[] args){
ListenerApplication application =new ListenerApplication();
GatewayServer server =new GatewayServer(application);
server.start(true);
}
ListenerApplication 表⽰⼀个允许共享给python的类,她可以是任意java类,包括Map,List等复杂结构化数据:
public class ListenerApplication {
TestEnterPoint enterPoint =new TestEnterPoint();
public void setListener(TestEnterPoint enterPoint){
}
public void notifyAllListeners(){
HashMap<String,String> map =new HashMap<>();
map.put("a","aaaa");
Object returnValue = listener.gift(map,"a");
System.out.println(returnValue);
}
}
⽽在Python端,我们可以通过以下代码运⾏⼀个python程序:
from py4j.java_gateway import JavaGateway, CallbackServerParameters
class TestEnterPoint(object):
def gift(self,map, key):
(key)
class Java:
implements =["st.py.TestEnterPoint"]
if __name__ =="__main__":
gateway = JavaGateway(
callback_server_parameters=CallbackServerParameters())
listener = TestEnterPoint()
<_point.setListener(listener)
<_ifyAllListeners()
gateway.shutdown()
这样我们就通过Python来实现了⼀个 (key) 的⽅法
整个过程中,我们看出⼏点对于python来说⽐较基本的使⽤⽅式,那就是,第⼀,通过Python 中implements =
["st.py.TestEnterPoint"]的使⽤⽅式,我们可以实现⼀个Java的Interface,第⼆,通过_point的⽅式,我们可以拿到java中设置的可共享变量,第三个我们在例⼦中并没有呈现,但也是⾮常基础的使⽤,就是通过在python中使⽤
xx.test.py.TestServer的⽅式,允许python使⽤任何java的class,允许初始化,允许调⽤⽅法,但是他们如果想和java 端进⾏数据通信,则必须通过entry_point来实现。
Py4j in Flink
讲完Py4j并且如果把上⾯的代码⾃⼰拿来试下,应该已经对整个python和java互通有⼀定理解了,那么我们Flink中如何使⽤Py4J来进⾏混编,也就顺理成章的很好理解了,在Flink中,有很多地⽅使⽤到了这种技术,包括PyFlink,以及Python UDF support,PyFlink 属于Pyton为主,也⽐较复杂,这边就先就以简单的Python UDF为例,梳理下Flink的执⾏逻辑。
在Flink Java中如何使⽤Python UDF
在Flink 中使⽤Python的UDF相对来说⾮常简单,创建⼀个Python代码,⽐如:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s:str):
place('hello','ni hao')
在Flink Java中,需要配置Python环境变量,⾸先将Python⽂件加到环境中去,如果是集提交,需要加到依赖中去(使⽤-pyfs 提交Python⽂件),或者远程的Hdfs⽂件。其次需要配置Python的程序依赖环境路径:
configuration.setString("python.files","/Users/yourName/test.py");
configuration.setString("utable","python3");
configuration.setString("utable","/usr/bin/python3");
最后,我们在使⽤过程中,⽐如通过SQL使⽤时候,只需要如下SQL语句即可:
create temporary system function func1 as'test1.func1' language python
其中test1是python的⽂件名"test1.py"⽽ func1就是上⽂中的那个python 的function name,如此既可以在java中使⽤python实现的UDF
Flink是如何实现这些的
在追踪Flink Sql是如何执⾏create function过程中,我们发现整个Flink的执⾏流程⼤致如图:
Flink会通过语法解析后的通过create function的后缀“ language python”判断是否是Python fuction,如
果是,会调⽤PythonFunctionUtils来获取function,⽽PythonFunctionUtil最终通过动态加载的PythonFunctionFactory来最终调⽤Py4j。这⾥可以看见他的逻辑其实也⽐较简单,⾸先就是启动Py4j的Java端server,然后主要就是通过环境变量,以及configture ⾥的各种参数,最终拼接出python的cmd 执⾏命令,运⾏命令并通过entryPoint获取其中的贡献类。最终⽣成我们在java端可以⽤的function。
这块如果有兴趣,在Flink源码中搜索 PythonFunctionFactory 可以直接看见相关代码。
Notebook 混编Python
我们平台是类似Zeeplin的可视化Notebook编程页⾯,对于我们来说,要在页⾯上⽀持Python编程,有⼏种⽅案:
只⽀持Python UDF
java调用python模型以PyFlink为基础,配置混合编程⽅案
以Java版为基础,配置混合编程⽅案
⽅案⼀对于我们来说并不难,可以看到Flink官⽅既是⽀持Python UDF的,我们只需要将这个Noteboo
k Part⾥的内容,⽣成Python⽂件并添加到环境中⼀起提交即可,但这种⽅案没法解决我们上⾯提出的⼀⼤痛点,⽤户的1%需要Jar包开发的⾮标任务,不是单单可以通过UDF来实现的。
⽅案⼆对于我们来说,最⼤的问题是所有的优化,整个程序体系都是建⽴在Java 基础上的,改动会⾮常巨⼤。
如此,只能采取⽅案三,⽽⽅案三的问题是,Flink的原版PyFlink只创建了 PythonFunctionFactory 和⼀个 ⼼跳2个 entryPoint,这对我们来说⽐较局限。所以我们会采取模仿 PythonFunctionFactory 的⽅式,⾃⼰创建Py4j进程,来完成Notebook的混编实现
这⾥的详细设计以及Demo 我们会在下篇⽂章(⼆)中放出。谢谢各位。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论