-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathtest_cluster.py
More file actions
79 lines (70 loc) · 1.89 KB
/
test_cluster.py
File metadata and controls
79 lines (70 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
##
# .test.test_cluster
##
import sys
import os
import time
import unittest
import tempfile
from .. import installation
from ..cluster import Cluster, ClusterStartupError
# Installation that every test in this module builds its cluster from.
default_install = installation.default()
if default_install is None:
    # No default installation was found: explain on stderr and exit with status 1.
    sys.stderr.writelines((
        "ERROR: cannot find 'default' pg_config\n",
        "HINT: set the PGINSTALLATION environment variable to the `pg_config` path\n",
    ))
    sys.exit(1)
class test_cluster(unittest.TestCase):
    """
    Tests that initialize, start, and connect to a Cluster created from
    the module-level `default_install`.

    A new Cluster object named 'test_cluster' is created before each test
    and dropped after it.
    """

    def setUp(self):
        # One Cluster object per test, always under the same name.
        self.cluster = Cluster(default_install, 'test_cluster',)

    def tearDown(self):
        # Drop the cluster after each test and release the reference.
        self.cluster.drop()
        self.cluster = None

    def start_cluster(self, logfile = None):
        """
        Start the cluster, passing `logfile` through to Cluster.start, and
        wait until it reports started (timeout = 10).
        """
        self.cluster.start(logfile = logfile)
        self.cluster.wait_until_started(timeout = 10)

    def init(self, *args, **kw):
        """
        Initialize the cluster, forwarding all arguments to Cluster.init,
        then apply the settings shared by every test.
        """
        self.cluster.init(*args, **kw)
        self.cluster.settings.update({
            'max_connections' : '8',
            'listen_addresses' : 'localhost',
            'port' : '6543',
            'silent_mode' : 'off',
        })

    def testSilentMode(self):
        """
        Start the cluster with silent_mode switched on.

        A ClusterStartupError is tolerated only on Windows; on Windows a
        successful start is reported as a failure.
        """
        self.init()
        self.cluster.settings['silent_mode'] = 'on'
        # Evaluated once; both branches below depend on the same check.
        on_windows = sys.platform in ('win32','win64')
        # if it fails to start (ClusterStartupError), silent_mode is not
        # working properly.
        try:
            self.start_cluster(logfile = sys.stdout)
        except ClusterStartupError:
            # silent_mode is not supported on windows by PG.
            if not on_windows:
                raise
        else:
            if on_windows:
                self.fail("silent_mode supported on windows")

    def testSuperPassword(self):
        """
        Initialize with an explicit user and password, then connect with
        those credentials and run a trivial query.
        """
        self.init(
            user = 'test',
            password = 'secret',
            logfile = sys.stdout,
        )
        self.start_cluster()
        c = self.cluster.connection(
            user='test',
            password='secret',
            database='template1',
        )
        with c:
            # assertEqual instead of failUnless: the failUnless alias was
            # removed from unittest in Python 3.12, and assertEqual shows
            # both values when the check fails.
            self.assertEqual(c.prepare('select 1').first(), 1)

    def testNoParameters(self):
        'simple init and drop'
        self.init()
        self.start_cluster()
if __name__ == '__main__':
    # When run as a script, copy this module's globals into a module
    # object named "this" and hand that object to the unittest runner.
    from types import ModuleType
    this = ModuleType("this")
    vars(this).update(globals())
    unittest.main(module = this)