Skip to content

Commit 5381d3d

Browse files
author
James William Pye
committed
Commit the NotificationManager.
Provides an event loop for receiving asynchronous notifications from a set of connections, or a single connection. The approach is iterative, but subsequent changes may provide a callback oriented method. Remaining items include the .notify method API. Fixes #11
1 parent e64f2a5 commit 5381d3d

7 files changed

Lines changed: 604 additions & 5 deletions

File tree

postgresql/api.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -964,9 +964,9 @@ def reset(self) -> None:
964964
"""
965965

966966
@abstractmethod
967-
def notify(self, channel, payload = None) -> None:
967+
def notify(self, *channels, **channel_and_payload) -> int:
968968
"""
969-
NOTIFY the channel with the given payload.
969+
NOTIFY the channels with the given payload.
970970
971971
Equivalent to issuing "NOTIFY <channel>" or "NOTIFY <channel>, <payload>"
972972
if a payload is given.
@@ -994,6 +994,22 @@ def listening_channels(self) -> ["channel name", ...]:
994994
Return an *iterator* to all the channels currently being listened to.
995995
"""
996996

997+
@abstractmethod
998+
def wait(self, timeout = None) -> collections.Iterator:
999+
"""
1000+
Return an iterator to the notifications received by the connection. The
1001+
iterator *must* produce triples in the form ``(channel, payload, pid)``.
1002+
1003+
If timeout is not `None`, `None` *must* be emitted at the specified
1004+
timeout interval. If the timeout is zero, all the pending notifications
1005+
*must* be yielded by the iterator and then `StopIteration` *must* be
1006+
raised.
1007+
1008+
If the connection is closed for any reason, the iterator *must* silently
1009+
stop by raising `StopIteration`. Further error control is then the
1010+
responsibility of the user.
1011+
"""
1012+
9971013
class SocketFactory(object):
9981014
@propertydoc
9991015
@abstractproperty

postgresql/documentation/changes.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@ Changes
2626
* Correct representation of PostgreSQL ARRAYs by properly recording
2727
lowerbounds and upperbounds. Internally, sub-ARRAYs have their own
2828
element lists.
29+
* Implement a NotificationManager for managing the NOTIFYs received
30+
by a connection. The class can manage NOTIFYs from multiple
31+
connections, whereas the db.wait() method is modified for single targets.

postgresql/documentation/driver.txt

Lines changed: 216 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ interfaces:
241241

242242
``unix``
243243
The unix domain socket to connect to. Exclusive with ``host`` and ``port``.
244-
Expects a string containing the absolute path to the unix domain socket to
244+
Expects a string containing the *absolute path* to the unix domain socket to
245245
connect to.
246246

247247
``settings``
@@ -366,6 +366,26 @@ The methods and properties on the connection object are ready for use:
366366
database derived object will be given to the callable. See the
367367
`Database Messages`_ section for more information.
368368

369+
``db.listen(*channels)``
370+
Start listening for asynchronous notifications in the specified channels.
371+
Sends a batch of ``LISTEN`` statements to the server.
372+
373+
``db.unlisten(*channels)``
374+
Stop listening for asynchronous notifications in the specified channels.
375+
Sends a batch of ``UNLISTEN`` statements to the server.
376+
377+
``db.listening_channels()``
378+
Return an iterator producing the channel names that are currently being
379+
listened to.
380+
381+
``db.wait(timeout = None)``
382+
Return an iterator to the NOTIFYs received on the connection. The iterator
383+
will yield notification triples consisting of ``(channel, payload, pid)``.
384+
While iterating, the connection should *not* be used in other threads.
385+
The optional timeout can be used to enable "idle" events in which `None`
386+
objects will be yielded by the iterator.
387+
See `Notification Management`_ for details.
388+
369389

370390
Connection Metadata
371391
-------------------
@@ -589,9 +609,9 @@ identifier, N+1: ``sql_parameter_types[0]`` -> ``'$1'``.
589609

590610
>>> ps = db.prepare("SELECT $1::integer AS intname, $2::varchar AS chardata")
591611
>>> ps.sql_parameter_types
592-
('integer','varchar')
612+
('INTEGER','VARCHAR')
593613
>>> ps.sql_column_types
594-
('integer','varchar')
614+
('INTEGER','VARCHAR')
595615
>>> ps.column_names
596616
('intname','chardata')
597617
>>> ps.column_types
@@ -1706,3 +1726,196 @@ details. The follow attributes are available on message objects:
17061726
``'line'``
17071727
The line of the file that emitted the message.
17081728
(*normally* server information)
1729+
1730+
1731+
.. _db_notify:
1732+
1733+
Notification Management
1734+
=======================
1735+
1736+
Relevant SQL commands: `NOTIFY <http://postgresql.org/docs/current/static/sql-notify.html>`_,
1737+
`LISTEN <http://postgresql.org/docs/current/static/sql-listen.html>`_,
1738+
`UNLISTEN <http://postgresql.org/docs/current/static/sql-unlisten.html>`_.
1739+
1740+
Asynchronous notifications offer a means for PostgreSQL to signal application
1741+
code. Often these notifications are used to signal cache invalidation. In 9.0
1742+
and greater, notifications may include a "payload" in which arbitrary data may
1743+
be delivered on a channel being listened to.
1744+
1745+
By default, received notifications will merely be appended to an internal
1746+
list on the connection object. This list will remain empty for the duration
1747+
of a connection *unless* the connection begins listening to a channel that
1748+
receives notifications.
1749+
1750+
The `postgresql.notifyman.NotificationManager` class is used to wait for
1751+
messages to come in on a set of connections, pick up the messages, and deliver
1752+
the messages to the object's user via the `collections.Iterator` protocol.
1753+
1754+
The notification manager is a simple event loop that services multiple
1755+
connections. In cases where only one connection needs to be serviced, the
1756+
`postgresql.api.Database.wait` method can be used to simplify the process.
1757+
1758+
1759+
Listening on a Single Connection
1760+
--------------------------------
1761+
1762+
The ``db.wait()`` method is a simplification of the notification manager. It
1763+
returns an iterator to the notifications received on the subject connection.
1764+
The iterator yields triples consisting of the ``channel`` being
1765+
notified, the ``payload`` sent with the notification, and the ``pid`` of the
1766+
backend that caused the notification::
1767+
1768+
>>> db.listen('for_rabbits')
1769+
>>> db.notify('for_rabbits')
1770+
>>> for x in db.wait():
1771+
... channel, payload, pid = x
1772+
... break
1773+
>>> assert channel == 'for_rabbits'
1774+
True
1775+
>>> assert payload == ''
1776+
True
1777+
>>> assert pid == db.backend_id
1778+
True
1779+
1780+
The iterator, by default, will continue listening forever unless the connection
1781+
is terminated--thus the immediate ``break`` statement in the above loop. In
1782+
cases where some additional activity is necessary, a timeout parameter may be
1783+
given to the ``wait`` method in order to allow "idle" events to occur at the
1784+
designated frequency::
1785+
1786+
>>> for x in db.wait(0.5):
1787+
... if x is None:
1788+
... break
1789+
1790+
The above example illustrates that idle events are represented using `None`
1791+
objects. Idle events are guaranteed to occur *approximately* at the
1792+
specified interval--the ``timeout`` keyword parameter. In addition to
1793+
providing a means to do other processing or polling, they also offer a safe
1794+
break point for the loop. Internally, the iterator produced by the ``wait``
1795+
method *is* a `NotificationManager`, which will localize the notifications
1796+
prior to emitting them via the iterator.
1797+
*It's not safe to break out of the loop, unless an idle event is being handled.*
1798+
If the loop is broken while a regular event is being processed, some events may
1799+
remain in the iterator. In order to consume those events, the iterator *must*
1800+
be accessible.
1801+
1802+
The iterator will be exhausted when the connection is closed, but if the
1803+
connection is closed during the loop, any remaining notifications *will*
1804+
be emitted prior to the loop ending, so it is important to be prepared to
1805+
handle exceptions or check for a closed connection.
1806+
1807+
In situations where multiple connections need to be watched, direct use of the
1808+
`NotificationManager` is necessary.
1809+
1810+
1811+
Listening on Multiple Connections
1812+
---------------------------------
1813+
1814+
The `postgresql.notifyman.NotificationManager` class is used to manage
1815+
*connections* that are expecting to receive notifications. Instances are
1816+
iterators that yield the connection object and notifications received on the
1817+
connection or `None` in the case of an idle event. The manager emits events as
1818+
a pair; the connection object that received notifications, and *all* the
1819+
notifications picked up on that connection::
1820+
1821+
>>> from postgresql.notifyman import NotificationManager
1822+
>>> # Using ``nm`` to reference the manager from here on.
1823+
>>> nm = NotificationManager(db1, db2, ..., dbN)
1824+
>>> nm.settimeout(2)
1825+
>>> for x in nm:
1826+
... if x is None:
1827+
... # idle
1828+
... break
1829+
...
1830+
... db, notifies = x
1831+
... for channel, payload, pid in notifies:
1832+
... ...
1833+
1834+
The manager will continue to wait for and emit events so long as there are
1835+
good connections available in the set; it is possible for connections to be
1836+
added and removed at any time. Although, in rare circumstances, discarded
1837+
connections may still have pending events if it not removed during an idle
1838+
event. The ``connections`` attribute on `NotificationManager` objects is a
1839+
set object that may be used directly in order to add and remove connections
1840+
from the manager::
1841+
1842+
>>> y = []
1843+
>>> for x in nm:
1844+
... if x is None:
1845+
... if y:
1846+
... nm.connections.add(y[0])
1847+
... del y[0]
1848+
...
1849+
1850+
The notification manager is resilient; if a connection dies, it will discard the
1851+
connection from the set, and add it to the set of bad connections, the
1852+
``garbage`` attribute. In these cases, the idle event *should* be leveraged to
1853+
check for these failures if that's a concern. It is the user's
1854+
responsibility to explicitly handle the failure cases, and remove the bad
1855+
connections from the ``garbage`` set::
1856+
1857+
>>> for x in nm:
1858+
... if x is None:
1859+
... if nm.garbage:
1860+
... recovered = take_out_trash(nm.garbage)
1861+
... nm.connections.update(recovered)
1862+
... nm.garbage.clear()
1863+
... db, notifies = x
1864+
... for channel, payload, pid in notifies:
1865+
... ...
1866+
1867+
Explicitly removing connections from the set can also be a means to gracefully
1868+
terminate the event loop::
1869+
1870+
>>> for x in nm:
1871+
... if x in None:
1872+
... if done_listening is True:
1873+
... nm.connections.clear()
1874+
1875+
However, doing so inside the loop is not a requirement; it is safe to remove a
1876+
connection from the set at any point.
1877+
1878+
1879+
Zero Timeout
1880+
------------
1881+
1882+
When a timeout of zero, ``0``, is configured, the notification manager will
1883+
terminate early. Specifically, each connection will be polled for any pending
1884+
notifications, and once all of the collected notifications have been emitted
1885+
by the iterator, `StopIteration` will be raised. Notably, *no* idle events will
1886+
occur when the timeout is configured to zero.
1887+
1888+
Zero timeouts offer a means for the notification "queue" to be polled. Often,
1889+
this is the appropriate way to collect pending notifications on active
1890+
connections where using the connection exclusively for waiting is not
1891+
practical::
1892+
1893+
>>> notifies = list(db.wait(0))
1894+
1895+
Or with a NotificationManager instance::
1896+
1897+
>>> nm.settimeout(0)
1898+
>>> db_notifies = list(nm)
1899+
1900+
In both cases of zero timeout, the iterator may be promptly discarded without
1901+
losing any events.
1902+
1903+
1904+
Summary of Characteristics
1905+
--------------------------
1906+
1907+
* The iterator will continue until the connections die.
1908+
* Objects yielded by the iterator are either `None`, an "idle event", or an
1909+
individual notification triple if using ``db.wait()``, or a
1910+
``(db, notifies)`` pair if using the base `NotificationManager`.
1911+
* When a connection dies or raises an exception, it will be removed from
1912+
the ``nm.connections`` set and added to the ``nm.garbage`` set.
1913+
* The NotificationManager instance will *not* hold any notifications
1914+
during an idle event.
1915+
(Idle events offer a break point in which the manager may be discarded.)
1916+
* A timeout of zero will cause the iterator to only yield the events
1917+
that are pending right now, and promptly end. However, the same manager
1918+
object may be used again.
1919+
* A notification triple is a tuple consisting of ``(channel, payload, pid)``.
1920+
* Connections may be added and removed from the ``nm.connections`` set at
1921+
any time.

postgresql/driver/pq3.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from ..protocol.message_types import message_types
3636

3737
from .pg_type import TypeIO
38+
from ..notifyman import NotificationManager
3839
from ..types import Row
3940

4041
# Map element3.Notice field identifiers
@@ -2275,6 +2276,15 @@ def unlisten(self, *channels, len = len):
22752276
qstr += '; UNLISTEN ' + x.replace('"', '""')
22762277
return self.execute(qstr)
22772278

2279+
def wait(self, timeout = None):
2280+
nm = NotificationManager(self, timeout = timeout)
2281+
for x in nm:
2282+
if x is None:
2283+
yield None
2284+
else:
2285+
for y in x[1]:
2286+
yield y
2287+
22782288
def __init__(self, connector, *args, **kw):
22792289
"""
22802290
Create a connection based on the given connector.

postgresql/lib/libsys.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,8 @@ SELECT channel FROM pg_catalog.pg_listening_channels() AS x(channel)
169169
-- listening_relations: old version of listening_channels.
170170
SELECT relname as channel FROM pg_catalog.pg_listener
171171
WHERE listenerpid = pg_catalog.pg_backend_pid();
172+
173+
[notify::first]
174+
-- 9.0 and greater
175+
SELECT COUNT(pg_catalog.pg_notify(($1::text[])[i][1], $1[i][2]) IS NULL)
176+
FROM generate_series(1, array_upper($1, 1)) AS g(i)

0 commit comments

Comments
 (0)