使⽤Python和Oracle数据库实现⾼并发性
了解如何借助线程和并发性提升⽀持 Oracle 数据库的 Python 应⽤程序的吞吐量和响应性。
随着趋势发展的核⼼转向更多⽽不是更快发展,最⼤限度地提⾼并发性的重要性⽇益凸显。并发性使得编程模式发⽣了新的转变,可以编写异步代码,从⽽将多个任务分散到⼀组线程或进程中并⾏⼯作。如果您不是编程新⼿并且很熟悉 C 或 C++,您可能已经对线程和进程有所了解,并且知道它们之间的区别。在进⾏并发编程时,线程提供了进程的轻量级替代物,在⼤多数情况下多线程较多进程更受青睐。因此,本⽂将讨论如何通过多线程来实现并发性。
与很多其他编程语⾔⼀样,在使⽤多 CPU 计算机时将占⽤⼤量 CPU 的任务分散到 Python 中的多个线程中(可以使⽤ Python 标准库中的多进程模块实现)可以提⾼性能。对于单处理器计算机,这样确实可以并⾏运⾏多个操作,⽽不是只能在任务间切换且在任何指定时间只能执⾏⼀个任务。相反,在将多线程的 Python 程序移到⼀个多 CPU 计算机时,由于全局解释器锁 (GIL) 的原因您不会注意到任何性能提
升,Python 使⽤ GIL 保护内部数据结构,确保在⼀次只有⼀个线程运⾏ CPython 虚拟机。
但是,您可能仍然有兴趣向⽀持数据库的 Python 程序中添加线程以加快其速度。关键是 Python 与之交
互的底层数据库很可能安装在并⾏处理提交的查询的⾼性能服务器上。这意味着您可以从提交多个查询到数据库服务器并在单独的线程中并⾏进⾏的操作中受益,⽽不是在⼀个线程中⼀个接⼀个地按顺序发出查询。
要注意的是:尽管利⽤任务⾃⾝的并⾏性可以显著提升应⽤程序性能,但是我们必须认识到,不是所有任务都可并⾏执⾏。例如,在客户请求的操作(例如转账)完成之前,您⽆法向客户发出确认电⼦邮件。很显然,此类任务必须按特定顺序执⾏。
另外,构建多线程代码时还要记住,某些并⾏运⾏的线程可能同时尝试更改共享的对象,这可能导致数据丢失、数据残缺,甚⾄损坏正在更改的对象。要避免此问题,应该控制对共享对象的访问,使得⼀个线程⼀次只能使⽤⼀个此类对象。幸运的是,利⽤ Python 可以实施⼀个锁定机制来同步对共享对象的访问(利⽤线程模块中的锁定⼯具)。
使⽤锁定的缺点是损失了可扩展性。设计可扩展性时,不要忘记,对⼀个线程内的某个资源进⾏锁定将使该资源在所有其他正在运⾏的线程和进程中不可⽤,直⾄该锁定被释放为⽌。因此,要确保⾼效的资源管理,不应过多地使⽤锁定,尽可能避免锁定,如果需要使⽤锁定也要尽可能早地释放该锁定。
幸运的是,当您处理存储在 Oracle 数据库中的资源时不必担⼼锁定。这是因为,在并发环境中对共享
数据提供访问时,Oracle 数据库将使⽤其⾃⾝的后台锁定机制。因此,通常较好的做法是将共享数据存储在 Oracle 数据库中,从⽽由 Oracle 数据库处理并发性问题。
异步执⾏操作也是实现可扩展性和受益于并发性的较好⽅式。在异步编程中,阻塞代码排队等待稍后单独的线程完成,从⽽确保您的应⽤程序可以继续执⾏其他任务。使⽤异步框架(如 Twisted)可以极⼤地简化构建异步应⽤程序的任务。
本⽂将简单介绍如何使⽤ Python 和 Oracle 数据库构建并发应⽤程序,描述如何使⽤ Python 代码利⽤线程与 Oracle 数据库交互,并解释如何将 SQL 查询并⾏提交到数据库服务器⽽不是依次处理。您还将了解如何让 Oracle 数据库处理并发性问题以及如何利⽤ Python 事件驱动的框架 Twisted。
Python 中的多线程编程
线程是并⾏处理中的⼀个⾮常有⽤的特性。如果您的⼀个程序正在执⾏耗时的操作并且可以将其分成若⼲个独⽴的任务并⾏执⾏,那么使⽤线程可以帮助您构建更加⾼效、快速的代码。多线程的另⼀个有趣的⽤处是可以提⾼应⽤程序的响应能⼒ — 在后台执⾏耗时操作的同时,主程序仍然可以做出响应。
当长时间运⾏的 SQL 语句彼此并⽆关联并且可以并⾏执⾏时,将这些语句封装到 Python 的不同线程
中是不错的做法。例如,如果 Web 页⾯将初始的 SQL 查询并⾏提交到数据库服务器⽽不是按顺序处理它们(使它们⼀个接⼀个地排队等待),则可显著减少 Web 页⾯的加载时间。
当您需要将某些⼤对象 (LOB) 上载到数据库时,也会发现线程很有⽤。以并⾏⽅式执⾏此操作不仅可以减少将 LOB 上载到数据库所需的整体时间,还可以在后台进⾏并⾏上载的同时保持程序主线程的响应能⼒。
假设您需要将⼏个⼆进制⼤对象 (BLOB) 上载到数据库并将其保存到 blob_tab 表(您可能已经在⾃定义数据库模式中创建了该表),如下所⽰:
CREATE TABLE blob_tab(
id NUMBER PRIMARY KEY,
blobdoc BLOB
);
CREATE SEQUENCE blob_seq;
CREATE SEQUENCE blob_seq;
⾸先,我们来了解⼀下如何不利⽤线程将 BLOB ⼀个接⼀个地存储到 blob_tab 表中。以下 Python 脚本可以完成该任务,永久保存分别使⽤⽂件名和 URL 获得的两个输⼊图像。该⽰例假设您已经在 usr/pswd ⾃定义数据库模式中创建了 blob_tab 表和 blob_seq 序列:
#File: singlethread.py
#Storing BLOBs in a single thread sequentially, one after another
import cx_Oracle
from urllib import urlopen
inputs = []
#if you?¯re a Windows user, the path could be 'c:/temp/figure1.bmp'
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen('localhost/mypictures/figure2.bmp', 'rb'))
#obtaining a connection and predefining a memory area for a BLOB
dbconn = t('usr', 'pswd', '127.0.0.1/XE')
dbconn.autocommit = True
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
#executing INSERT statements saving BLOBs to the database
for input in inputs:
blobdoc = ad()
input.close()
dbconn.close()
尽管获取和存储 figure1.bmp 和 figure2.bmp 的任务在此处⼀个接⼀个地进⾏,但是,您可能已经猜到,这些任务实际上并不存在顺序上的先后关联性。因此,您可以重构上述代码,使其在单个线程中读取和存储每个图像,从⽽通过并⾏处理提升性能。在这种特殊的情况下值得⼀提的是,您不必协调并⾏运⾏的线程,从⽽可以极⼤地简化编码。
以下⽰例显⽰了如何利⽤⾯向对象的⽅法重新编写上述脚本以使⽤线程。具体来说,该⽰例说明了如何从 threading 模块扩展 Thread 类,针对特定任务对其进⾏⾃定义。
#File: multithread.py
#Storing BLOBs in separate threads in parallel
import cx_Oracle
import threading
from urllib import urlopen
#subclass of threading.Thread
class AsyncBlobInsert(threading.Thread):
def __init__(self, cur, input):
threading.Thread.__init__(self)
self.cur = cur
self.cur = cur
self.input = input
def run(self):
blobdoc = ad()
ute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
self.input.close()
self.cur.close()
#main thread starts here
inputs = []
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen('localhost/_figure2.bmp', 'rb'))
dbconn = t('usr', 'pswd', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = True
for input in inputs:
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
th = AsyncBlobInsert(cur, input)
th.start()
在上述代码中,注意 threaded 属性的使⽤,该属性作为参数传递到 t ⽅法。通过将
其设置为 true,您指⽰ Oracle 数据库使⽤ OCI_THREADED 模式(⼜称为 threaded 模式),从⽽指明应⽤程序正在多线程环境中运⾏。请注意,在此处针对单线程应⽤程序使⽤ threaded 模式并不是⼀种好的做法。根据 cx_Oracle ⽂档,在单线程应⽤程序中将 threaded 参数设置为 true 将使性能下降 10% 到15%。
在本⽰例中,您将在两个线程间共享⼀个连接,但是将为每个线程创建⼀个单独的游标对象。此处,读取 BLOB 然后将其插⼊数据库的操作是在 threading.Thread 标准 Python 类中 AsyncBlobInsert ⾃定义⼦类的改写的 run ⽅法中实现的。因此,要在单独的线程中开始上载BLOB,您只需创建⼀个 AsyncBlobInsert 实例,然后调⽤其 start ⽅法。
这⾥要讨论⼀个与脚本有关的问题。执⾏时,它不会等到正在启动的线程完成 — 启动⼦线程后主线程将结束,不会等到⼦线程完成。如果您并不希望这样⽽是希望程序仅在所有线程都完成后再结束,那么您可以在脚本末尾调⽤每个 AsyncBlobInsert 实例的 join ⽅法。这将阻塞主线程,使其等待⼦线程的完成。对前⾯的脚本进⾏修改,使其等待 for 循环中启动的所有线程完成,如下所⽰:
...
th = []
for i, input in enumerate(inputs):
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
th.append(AsyncBlobInsert(cur, input))
th[i].start()
#main thread waits until all child threads are done
for t in th:
t.join()
下⼀节中提供了需要强制主线程等待⼦线程完成的⽰例。
同步对共享资源的访问
前⾯的⽰例显⽰了⼀个多线程的 Python 应⽤程序,该程序处理⼏个彼此并⽆关联的任务,因此很容易分离并放到不同的线程中进⾏并⾏处理。但是在实际中,您经常需要处理彼此相互关联的操作,并且需要在某个时刻进⾏同步。
作为单个进程的⼀部分,线程共享相同的全局内存,因此可以通过共享资源(如变量、类实例、流和⽂件)在彼此之间传递信息。但是,这种在线程间交换信息的简单⽅法是有条件的 — 当修改的对象可以同时在另⼀线程中访问和/或修改时,您确实要⾮常谨慎。因此,如果能够避免冲突,使⽤⼀个机制来同步对共享数据的访问,这将是很有⽤的。
为帮助解决这⼀问题,Python 允许您指定锁定,然后可以由某个线程取得该锁定以确保对该线程中您所使⽤的数据结构进⾏独占访问。Threading 模块附带有 Lock ⽅法,您可以使⽤该⽅法指定锁定。但是请注意,使⽤ threading.Lock ⽅法指定的锁定最初处于未锁定状态。要锁定⼀个分配的锁,您需要显式调⽤该锁定对象的 acquire ⽅法。之后,可以对需要锁定的对象执⾏操作。例如,当向线程中的 stdout 标准输出流进⾏写⼊时,您可能需要使⽤锁,以免其他使⽤ stdout 的线程发⽣重叠。进⾏此操作后,您需要使⽤锁定对象的 release ⽅法释放该锁,以使释放的数据结构可⽤于其他线程中的进⼀步处理。
关于锁定要注意的是,它们并不绑定到单个线程。在⼀个线程中指定的锁,可以由另⼀个线程获得,并由第三个线程释放。以下脚本例举了实际操作中的⼀个简单的锁。此处,为在⼦线程中进⾏使⽤,您在主线程中指定了⼀个锁,在向 DOM ⽂档写⼊之前获得它,然后⽴即释放。
#File: synchmultithread.py
connect和join的区别
#Using locks for synchronization in a multithreaded script
import sys
import cx_Oracle
import threading
from xml.dom.minidom import parseString
from urllib import urlopen
#subclass of threading.Thread
class SynchThread(threading.Thread):
def __init__(self, cur, query, dom):
threading.Thread.__init__(self)
self.cur = cur
self.query = query[1]
self.tag = query[0]
self.dom = dom
def run(self):
ute(self.query)
rslt = self.cur.fetchone()[0]
self.cur.close()
mutex.acquire()
sal = ElementsByTagName('salary')[0]
newtag = ateElement(self.tag)
newtext = ateTextNode('%s'%rslt)
newtag.appendChild(newtext)
sal.appendChild(newtag)
#main thread starts here
domdoc = parseString('<employees><salary/></employees>') dbconn = t('hr', 'hr', '127.0.0.1/XE',threaded=True) mutex = threading.Lock()
queries = {}
queries['avg'] = "SELECT AVG(salary) FROM employees"
queries['max'] = "SELECT MAX(salary) FROM employees"
th = []
for i, query in enumerate(queries.items()):
cur = dbconn.cursor()
th.append(SynchThread(cur, query, domdoc))
th[i].start()
#forcing the main thread to wait until all child threads are done
for t in th:
t.join()
#printing out the result xml document
domdoc.writexml(sys.stdout)
在上⾯的脚本中,您⾸先在主线程中创建了⼀个⽂档对象模型 (DOM) ⽂档对象,然后在并⾏运⾏的⼦线程中修改该⽂档,添加包含从数据库获取的信息的标签。此处,您将针对 HR 演⽰模式中的 employees 表使⽤了两个简单的查询。为避免在向 DOM 对象并⾏写⼊期间发⽣冲突,您需要在每个⼦线程中获取在主线程中指定的锁。⼀个⼦线程获得该锁后,另⼀个⼦线程将⽆法修改此处处理的 DOM 对象,直⾄第⼀个线程释放该锁。
然后,您可以使⽤主线程同步在各⼦线程中对 DOM 对象所做的更新,在主线程中调⽤每个⼦线程对象的 join ⽅法。之后,您可以在主流中对 DOM ⽂档对象进⾏进⼀步处理。在该特定⽰例中,您只是将其写⼊ stdout 标准输出流。
因此,您可能已经注意到,此处展⽰的⽰例实际上并没有讨论如何锁定数据库访问操作,例如,发出查询或针对并⾏线程中的同⼀数据库表进⾏更新。实际上,Oracle 数据库有⾃⼰的强⼤锁定机制,可确保并发环境中的数据完整性。⽽您的任务是正确使⽤这些机制。下⼀节中,我们将讨论如何利⽤ Oracle 数据库特性控制对共享数据的并发访问,从⽽让数据库处理并发性问题。
使 Oracle 数据库管理并发性
如上所述,当对存储在 Oracle 数据库中的共享数据进⾏访问或操作时,您不必在 Python 代码中⼿动实施资源锁定。为解决并发性问
题,Oracle 数据库根据事务概念在后台使⽤不同类型的锁和多版本并发性控制系统。在实际操作中,这意味着,您只需考虑如何正确利⽤事务以确保正确访问、更新或更改数据库数据。具体来说,您必须谨慎地在⾃动提交事务模式和⼿动提交事务模式之间做出选择,将多个 SQL 语句组合到⼀个事务中时也需⼩⼼仔细。最后,必须避免发⽣并发事务间的破坏性交互。
在这⾥,需要记住的是,您在 Python 代码中使⽤的事务与连接⽽⾮游标相关联,这意味着您可以轻松地按照逻辑将使⽤不同游标但通过相同连接执⾏的语句组合到⼀个事务中。但是,如果您希望实施两个并发事务,则需要创建两个不同的连接对象。
在前⾯的“Python 中的多线程编程”⼀节中讨论的多线程⽰例中,您将连接对象的 autocommit 模式设置为 true,从⽽指⽰ cx_Oracle 模块在每个 INSERT 语句后隐式执⾏ COMMIT。在这种特定情况下,使⽤⾃动提交模式是合理的,因为这样可以避免⼦线程和主线程间的同步,从⽽可以在主线程中⼿动执⾏ COMMIT,如下所⽰:
...
#main thread waits until all child threads are done
for t in th:
t.join()
#and then issues a commit
dbconnmit()
但是,在有些情况下,您需要⽤到上述⽅案。考虑以下⽰例。假设您在两个并⾏线程中分别执⾏以下两个操作。在⼀个线程中,您将采购订单⽂档保存到数据库中,包括订单详细信息。在另⼀个线程中,您对包含该订单中涉及产品的相关信息的表进⾏修改,更新可供购买的产品数量。
很显然,上述两个操作必须封装到⼀个事务中。为此,您必须关闭 autocommit 模式,该模式为默认模式。此外,您还将需要使⽤主线程同步并⾏线程,然后显式执⾏ COMMIT,如上述代码段所⽰。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。