-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathtest_alock.py
More file actions
138 lines (130 loc) · 3.69 KB
/
test_alock.py
File metadata and controls
138 lines (130 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
##
# .test.test_alock - test .alock
##
import unittest
import threading
import time
from ..temporal import pg_tmp
from .. import alock
# SQL used by the tests below: counts the rows in pg_locks whose locktype
# is 'advisory'.
n_alocks = (
    "select count(*) FROM pg_locks "
    "WHERE locktype = 'advisory'"
)
class test_alock(unittest.TestCase):
    """
    Exercise the lock classes of the alock module.

    Every test counts advisory locks by running the n_alocks query and
    compares the count before, during and after holding locks.

    NOTE(review): `db`, `prepare` and `new` are not defined in this module;
    presumably the pg_tmp decorator makes them available while a test runs
    (`db` being the primary connection and `new()` opening a second one) --
    confirm against the temporal module.
    """

    @pg_tmp
    def testALockWait(self):
        # sadly, this is primarily used to exercise the code paths..
        ad = prepare(n_alocks).first
        self.assertEqual(ad(), 0)
        # Hand-shake flags shared with the worker thread:
        #   state[0] - set by the worker once it holds both of its locks
        #   state[1] - set by main to let the worker leave the (0,0) lock
        #   state[2] - set by main to let the worker leave the 1 lock
        state = [False, False, False]
        alt = new()
        first = alock.ExclusiveLock(db, (0, 0))
        second = alock.ExclusiveLock(db, 1)

        def concurrent_lock():
            try:
                with alock.ExclusiveLock(alt, 1):
                    with alock.ExclusiveLock(alt, (0, 0)):
                        # start it
                        state[0] = True
                        # Poll with a sleep rather than spinning so the
                        # main thread is not starved while it works.
                        while not state[1]:
                            time.sleep(0.01)
                        time.sleep(0.01)
                    while not state[2]:
                        time.sleep(0.01)
            except Exception:
                # Avoid dead lock in cases where advisory is not available.
                state[0] = state[1] = state[2] = True

        t = threading.Thread(target=concurrent_lock)
        t.start()
        try:
            while not state[0]:
                time.sleep(0.01)
            # The worker holds 1 and (0,0).
            self.assertEqual(ad(), 2)
            state[1] = True
            with first:
                # (0,0) is now held here; the worker still holds 1.
                self.assertEqual(ad(), 2)
                state[2] = True
                with second:
                    # Both locks are now held here.
                    self.assertEqual(ad(), 2)
        finally:
            # Release the worker even when an assertion above failed;
            # otherwise it would keep polling the flags forever and the
            # test run could never finish.
            state[1] = state[2] = True
            t.join(timeout=1)

    @pg_tmp
    def testALockNoWait(self):
        # A non-blocking acquire() of a lock that is already held
        # exclusively elsewhere must return False and leave nothing behind.
        alt = new()
        ad = prepare(n_alocks).first
        self.assertEqual(ad(), 0)
        with alock.ExclusiveLock(db, (0, 0)):
            contender = alock.ExclusiveLock(alt, (0, 0))
            # should fail to acquire
            self.assertEqual(contender.acquire(blocking=False), False)
        # no alocks should exist now
        self.assertEqual(ad(), 0)

    @pg_tmp
    def testALock(self):
        # Exclusive and share locks over a mix of single-integer and
        # integer-pair identifiers, including repeated identifiers.
        ad = prepare(n_alocks).first
        self.assertEqual(ad(), 0)
        # test a variety..
        lockids = [
            (1, 4),
            -32532, 0, 2,
            (7, -1232),
            4, 5, 232142423,
            (18, 7),
            2, (1, 4),
        ]
        alt = new()
        xal1 = alock.ExclusiveLock(db, *lockids)
        xal2 = alock.ExclusiveLock(db, *lockids)
        sal1 = alock.ShareLock(db, *lockids)
        with sal1:
            with xal1, xal2:
                self.assertGreater(ad(), 0)
                for x in lockids:
                    xl = alock.ExclusiveLock(alt, x)
                    self.assertEqual(xl.acquire(blocking=False), False)
                # main has exclusives on these, so this should fail.
                xl = alock.ShareLock(alt, *lockids)
                self.assertEqual(xl.acquire(blocking=False), False)
            for x in lockids:
                # sal1 still holds
                xl = alock.ExclusiveLock(alt, x)
                self.assertEqual(xl.acquire(blocking=False), False)
                # sal1 still holds, but we want a share lock too.
                xl = alock.ShareLock(alt, x)
                self.assertEqual(xl.acquire(blocking=False), True)
                xl.release()
        # no alocks should exist now
        self.assertEqual(ad(), 0)

    @pg_tmp
    def testPartialALock(self):
        # Validates that release is properly cleaning up
        ad = prepare(n_alocks).first
        self.assertEqual(ad(), 0)
        held = (0, -1234)
        wanted = [0, 324, -1232948, 7, held, 1, (2, 4), (834, 1)]
        alt = new()
        with alock.ExclusiveLock(db, held):
            partial = alock.ExclusiveLock(alt, *wanted)
            # should fail to acquire, db has held
            self.assertEqual(partial.acquire(blocking=False), False)
        # No alocks should exist now.
        # This *MUST* occur prior to alt being closed.
        # Otherwise, we won't be testing for the recovery
        # of a failed non-blocking acquire().
        self.assertEqual(ad(), 0)

    @pg_tmp
    def testALockParameterErrors(self):
        # Calling alock.ALock without arguments must raise TypeError, and
        # releasing a lock that was never acquired must raise RuntimeError.
        self.assertRaises(TypeError, alock.ALock)
        unacquired = alock.ExclusiveLock(db)
        self.assertRaises(RuntimeError, unacquired.release)

    @pg_tmp
    def testALockOnClosed(self):
        # Closing the connection that owns a lock while the lock's context
        # is still active: the lock must disappear and leaving the context
        # afterwards must not fail.
        ad = prepare(n_alocks).first
        self.assertEqual(ad(), 0)
        held = (0, -1234)
        alt = new()
        # __exit__ should only touch the count.
        with alock.ExclusiveLock(alt, held) as held_lock:
            self.assertEqual(ad(), 1)
            self.assertEqual(held_lock.locked(), True)
            alt.close()
            # Short pause before re-counting; presumably gives the server
            # time to drop the closed session's locks -- TODO confirm.
            time.sleep(0.005)
            self.assertEqual(ad(), 0)
            self.assertEqual(held_lock.locked(), False)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()