Skip to content

Commit b94b1aa

Browse files
author
James William Pye
committed
Implement an Advisory Lock context manager.
Adds the postgresql.alock module that provide an ABC, ALock and two subclasses that represent to the two lock modes supported by PostgreSQL's advisory locks. Fixes #5
1 parent 21b0c31 commit b94b1aa

5 files changed

Lines changed: 430 additions & 14 deletions

File tree

postgresql/alock.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
##
2+
# .alock - Advisory Locks
3+
##
4+
"""
5+
Tools for Advisory Locks
6+
"""
7+
from abc import abstractmethod, abstractproperty
8+
from .python.element import Element
9+
10+
class ALock(Element):
11+
"""
12+
Advisory Lock class for managing the acquisition and release of a sequence
13+
of PostgreSQL advisory locks.
14+
15+
ALock()'s are fairly consistent with threading.RLock()'s. They can be
16+
acquired multiple times, and they must be released the same number of times
17+
for the lock to actually be released.
18+
19+
A notably difference is that ALock's manage a sequence of lock identifiers.
20+
This means that a given ALock() may represent multiple advisory locks.
21+
"""
22+
_e_factors = ('database', 'identifiers',)
23+
_e_label = 'ALOCK'
24+
def _e_metas(self,
25+
headfmt = "{1} [{0}]".format
26+
):
27+
yield None, headfmt(self.state, self.mode)
28+
29+
@abstractproperty
30+
def mode(self):
31+
"""
32+
The mode of the lock class.
33+
"""
34+
35+
@abstractmethod
36+
def __select_statements__(self):
37+
"""
38+
ALock subclasses need to implement this in order to specify
39+
the actual statements that try, acquire, and release the locks.
40+
"""
41+
42+
@staticmethod
43+
def _split_lock_identifiers(idseq):
44+
# lame O(2)
45+
id_pairs = [
46+
list(x) if x.__class__ is not int else [None,None]
47+
for x in idseq
48+
]
49+
ids = [
50+
x if x.__class__ is int else None
51+
for x in idseq
52+
]
53+
return (id_pairs, ids)
54+
55+
def acquire(self, blocking = True, len = len):
56+
"""
57+
Acquire the locks using the configured identifiers.
58+
"""
59+
if self._count == 0:
60+
# _count is zero, so the locks need to be acquired.
61+
wait = bool(blocking)
62+
if wait:
63+
self._acquire_stmt(self._id_pairs, self._ids)
64+
else:
65+
# grab the success of each lock id. if some were
66+
# unsuccessful, then the ones that were successful need to be
67+
# released.
68+
r = self._try_stmt(self._id_pairs, self._ids)
69+
# accumulate the identifiers that *did* lock
70+
release_seq = [
71+
id for didlock, id in zip(r, self.identifiers) if didlock[0]
72+
]
73+
if len(release_seq) != len(self.identifiers):
74+
# some failed, so release the acquired and return False
75+
#
76+
# reverse in case there is another waiting for all.
77+
# that is, release last-to-first so that if another is waiting
78+
# on the same seq that it should be able to acquire all of
79+
# them once the contended lock is released.
80+
release_seq.reverse()
81+
self._release_stmt(*self._split_lock_identifiers(release_seq))
82+
# unable to acquire all.
83+
return False
84+
self._count = self._count + 1
85+
return True
86+
87+
def __enter__(self):
88+
self.acquire()
89+
return self
90+
91+
def release(self):
92+
"""
93+
Release the locks using the configured identifiers.
94+
"""
95+
if self._count < 1:
96+
raise RuntimeError("cannot release un-acquired lock")
97+
if not self.database.closed and self._count > 0:
98+
# if the database has been closed, or the count will
99+
# remain non-zero, there is no need to release.
100+
self._release_stmt(reversed(self._id_pairs), reversed(self._ids))
101+
# decrement the count nonetheless.
102+
self._count = self._count - 1
103+
104+
def __exit__(self, typ, val, tb):
105+
self.release()
106+
107+
def locked(self):
108+
"""
109+
Whether the locks have been acquired. This method is sensitive to the
110+
connection's state. If the connection is closed, it will return False.
111+
"""
112+
return (self._count > 0) and (not self.database.closed)
113+
114+
@property
115+
def state(self):
116+
return 'locked' if self.locked() else 'unlocked'
117+
118+
def __init__(self, database, *identifiers):
119+
"""
120+
Initialize the lock object to manage a sequence of advisory locks
121+
for use with the given database.
122+
"""
123+
self._count = 0
124+
self.connection = self.database = database
125+
self.identifiers = identifiers
126+
self._id_pairs, self._ids = self._split_lock_identifiers(identifiers)
127+
self._try_stmt, self._acquire_stmt, self._release_stmt = \
128+
self.__select_statements__()
129+
130+
class ShareLock(ALock):
131+
mode = 'share'
132+
def __select_statements__(self):
133+
sys = self.database.sys
134+
return (
135+
sys.try_advisory_shared,
136+
sys.acquire_advisory_shared,
137+
sys.release_advisory_shared
138+
)
139+
140+
class ExclusiveLock(ALock):
141+
mode = 'exclusive'
142+
def __select_statements__(self):
143+
sys = self.database.sys
144+
return (
145+
sys.try_advisory_exclusive,
146+
sys.acquire_advisory_exclusive,
147+
sys.release_advisory_exclusive
148+
)

postgresql/documentation/changes.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ Changes
2222
* Alter Statement.chunks() to return chunks of builtins.tuple. Being
2323
an interface intended for speed, types.Row() impedes its performance.
2424
* Fix handling of infinity values with timestamptz, timestamp, and date.
25-
Bug reported by Axel Rau.
25+
[Bug reported by Axel Rau.]
2626
* Correct representation of PostgreSQL ARRAYs by properly recording
2727
lowerbounds and upperbounds. Internally, sub-ARRAYs have their own
2828
element lists.
2929
* Implement a NotificationManager for managing the NOTIFYs received
3030
by a connection. The class can manage NOTIFYs from multiple
3131
connections, whereas the db.wait() method is modified for single targets.
32+
* Implement an ALock class for managing advisory locks using the
33+
threading.Lock APIs. [Feedback from Valentine Gogichashvili]

postgresql/documentation/driver.txt

Lines changed: 110 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -407,11 +407,6 @@ the connection is made:
407407
``db.backend_id``
408408
The process-id of the backend process.
409409

410-
``db.fileno()``
411-
Method to get the file descriptor number of the connection's socket. This
412-
attribute will not exist if the socket object does not have a ``fileno``
413-
attribute. Under normal circumstances, it will be available.
414-
415410
``db.backend_start``
416411
When backend was started. ``datetime.datetime`` instance.
417412

@@ -421,8 +416,14 @@ the connection is made:
421416
``db.client_port``
422417
The port of the client that the backend is communicating with.
423418

424-
The latter three are collected from pg_stat_activity. If this information is
425-
unavailable, the attributes will be `None`.
419+
``db.fileno()``
420+
Method to get the file descriptor number of the connection's socket. This
421+
method will return `None` if the socket object does not have a ``fileno``
422+
attribute. Under normal circumstances, it will return an `int`.
423+
424+
The ``backend_start``, ``client_address``, and ``client_port`` are collected
425+
from pg_stat_activity. If this information is unavailable, the attributes will
426+
be `None`.
426427

427428

428429
Prepared Statements
@@ -461,8 +462,8 @@ be explicitly closed if the statement is to be discarded.
461462
Statement objects are one-time objects. Once closed, they can no longer be used.
462463

463464

464-
Prepared Statement Interface Points
465-
-----------------------------------
465+
Statement Interface Points
466+
--------------------------
466467

467468
Prepared statements can be executed just like functions:
468469

@@ -1911,11 +1912,109 @@ Summary of Characteristics
19111912
* When a connection dies or raises an exception, it will be removed from
19121913
the ``nm.connections`` set and added to the ``nm.garbage`` set.
19131914
* The NotificationManager instance will *not* hold any notifications
1914-
during an idle event.
1915-
(Idle events offer a break point in which the manager may be discarded.)
1915+
during an idle event. Idle events offer a break point in which the manager
1916+
may be discarded.
19161917
* A timeout of zero will cause the iterator to only yield the events
19171918
that are pending right now, and promptly end. However, the same manager
19181919
object may be used again.
19191920
* A notification triple is a tuple consisting of ``(channel, payload, pid)``.
19201921
* Connections may be added and removed from the ``nm.connections`` set at
19211922
any time.
1923+
1924+
1925+
Advisory Locks
1926+
==============
1927+
1928+
`Explicit Locking in PostgreSQL <http://www.postgresql.org/docs/current/static/explicit-locking.html#ADVISORY-LOCKS>`_.
1929+
1930+
PostgreSQL's advisory locks offers a cooperative synchronization primitive.
1931+
These are used in cases where an application needs access to a resource, but
1932+
using table locks may cause interference with other operations that can be
1933+
safely performed alongside the application-level, exclusive operation.
1934+
1935+
Advisory locks can be used by directly executing the stored procedures in the
1936+
database or by using the :class:`postgresql.alock.ALock` subclasses, which
1937+
provides a context manager that uses those stored procedures.
1938+
1939+
Currently, only two subclasses exists, each representing the lock mode
1940+
supported by PostgreSQL's advisory locks:
1941+
1942+
* :class:`postgresql.alock.ShareLock`
1943+
* :class:`postgresql.alock.ExclusiveLock`
1944+
1945+
1946+
Acquiring ALocks
1947+
----------------
1948+
1949+
An ALock instance represents a sequence of advisory locks. A single ALock can
1950+
acquire and release multiple advisory locks by creating the instance with
1951+
multiple lock identifiers::
1952+
1953+
>>> from postgresql import alock
1954+
>>> table1_oid = 192842
1955+
>>> table2_oid = 192849
1956+
>>> l = alock.ExclusiveLock(db, (table1_oid, 0), (table2_oid, 0))
1957+
>>> l.acquire()
1958+
>>> ...
1959+
>>> l.release()
1960+
1961+
:class:`postgresql.alock.ALock` is similar to :class:`threading.RLock`; in
1962+
order for an ALock to be released, it must be released the number of times it
1963+
has been acquired. ALocks are associated with and survived by their session.
1964+
Much like how RLocks are associated with the thread they are acquired in:
1965+
acquiring an ALock again will merely increment its count.
1966+
1967+
PostgreSQL allows advisory locks to be identified using a pair of `int4` or a
1968+
single `int8`. ALock instances represent a *sequence* of those identifiers::
1969+
1970+
>>> from postgresql import alock
1971+
>>> ids = [(0,0), 0, 1]
1972+
>>> with alock.ShareLock(db, *ids):
1973+
... ...
1974+
1975+
Both types of identifiers may be used within the same ALock, and, regardless of
1976+
their type, will be aquired in the order that they were given to the class'
1977+
constructor. In the above example, ``(0,0)`` is acquired first, then ``0``, and
1978+
lastly ``1``.
1979+
1980+
`postgresql.alock.ALock` subclasses:
1981+
1982+
``postgresql.alock.ExclusiveLock(database, *identifiers)``
1983+
Instantiate an ALock object representing the `identifiers` for use with the
1984+
`database`. Exclusive locks will conflict with other exclusive locks and share
1985+
locks.
1986+
1987+
``postgresql.alock.ShareLock(database, *identifiers)``
1988+
Instantiate an ALock object representing the `identifiers` for use with the
1989+
`database`. Share locks can be acquired when a share lock with the same
1990+
identifier has been acquired by another backend. However, an exclusive lock
1991+
with the same identifier will conflict.
1992+
1993+
1994+
Advisory Lock Interface Points
1995+
------------------------------
1996+
1997+
Methods and properties available on :class:`postgresql.alock.ALock` instances:
1998+
1999+
``alock.acquire(blocking = True)``
2000+
Acquire the advisory locks represented by the ``alock`` object. If blocking is
2001+
`True`, the default, the method will block until locks on *all* the
2002+
identifiers have been acquired.
2003+
2004+
If blocking is `False`, acquisition may not block, and success will be
2005+
indicated by the returned object: `True` if *all* lock identifiers were
2006+
acquired and `False` if any of the lock identifiers could not be acquired.
2007+
2008+
``alock.release()``
2009+
Release the advisory locks represented by the ``alock`` object. If the lock
2010+
has not been acquired, a `RuntimeError` will be raised.
2011+
2012+
``alock.locked()``
2013+
Returns a boolean describing whether the locks are held or not. This will
2014+
return `False` if the lock connection has been closed.
2015+
2016+
``alock.__enter__()``
2017+
Alias to ``acquire``; context manager protocol. Always blocking.
2018+
2019+
``alock.__exit__(typ, val, tb)``
2020+
Alias to ``release``; context manager protocol.

postgresql/lib/libsys.sql

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,5 +172,75 @@ WHERE listenerpid = pg_catalog.pg_backend_pid();
172172

173173
[notify::first]
174174
-- 9.0 and greater
175-
SELECT COUNT(pg_catalog.pg_notify(($1::text[])[i][1], $1[i][2]) IS NULL)
176-
FROM generate_series(1, array_upper($1, 1)) AS g(i)
175+
SELECT
176+
COUNT(pg_catalog.pg_notify(($1::text[])[i][1], $1[i][2]) IS NULL)
177+
FROM
178+
pg_catalog.generate_series(1, array_upper($1, 1)) AS g(i)
179+
180+
[release_advisory_shared]
181+
SELECT
182+
CASE WHEN ($2::int8[])[i] IS NULL
183+
THEN
184+
pg_catalog.pg_advisory_unlock_shared(($1::int4[])[i][1], $1[i][2])
185+
ELSE
186+
pg_catalog.pg_advisory_unlock_shared($2[i])
187+
END AS released
188+
FROM
189+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
190+
191+
[acquire_advisory_shared]
192+
SELECT COUNT((
193+
CASE WHEN ($2::int8[])[i] IS NULL
194+
THEN
195+
pg_catalog.pg_advisory_lock_shared(($1::int4[])[i][1], $1[i][2])
196+
ELSE
197+
pg_catalog.pg_advisory_lock_shared($2[i])
198+
END
199+
) IS NULL) AS acquired
200+
FROM
201+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
202+
203+
[try_advisory_shared]
204+
SELECT
205+
CASE WHEN ($2::int8[])[i] IS NULL
206+
THEN
207+
pg_catalog.pg_try_advisory_lock_shared(($1::int4[])[i][1], $1[i][2])
208+
ELSE
209+
pg_catalog.pg_try_advisory_lock_shared($2[i])
210+
END AS acquired
211+
FROM
212+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
213+
214+
[release_advisory_exclusive]
215+
SELECT
216+
CASE WHEN ($2::int8[])[i] IS NULL
217+
THEN
218+
pg_catalog.pg_advisory_unlock(($1::int4[])[i][1], $1[i][2])
219+
ELSE
220+
pg_catalog.pg_advisory_unlock($2[i])
221+
END AS released
222+
FROM
223+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
224+
225+
[acquire_advisory_exclusive]
226+
SELECT COUNT((
227+
CASE WHEN ($2::int8[])[i] IS NULL
228+
THEN
229+
pg_catalog.pg_advisory_lock(($1::int4[])[i][1], $1[i][2])
230+
ELSE
231+
pg_catalog.pg_advisory_lock($2[i])
232+
END
233+
) IS NULL) AS acquired -- Guaranteed to be acquired once complete.
234+
FROM
235+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
236+
237+
[try_advisory_exclusive]
238+
SELECT
239+
CASE WHEN ($2::int8[])[i] IS NULL
240+
THEN
241+
pg_catalog.pg_try_advisory_lock(($1::int4[])[i][1], $1[i][2])
242+
ELSE
243+
pg_catalog.pg_try_advisory_lock($2[i])
244+
END AS acquired
245+
FROM
246+
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)

0 commit comments

Comments
 (0)