-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlexer.py
More file actions
169 lines (146 loc) · 5.86 KB
/
lexer.py
File metadata and controls
169 lines (146 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""SQL Lexer"""
import re
from threading import Lock
from io import TextIOBase
from sqlparse import tokens, keywords
from sqlparse.utils import consume
class Lexer:
    """SQL tokenizer with a configurable keyword set.

    Keyword dictionaries are registered with :meth:`add_keywords` and are
    consulted, in registration order, by :meth:`is_keyword`. A lazily
    created shared instance is available via :meth:`get_default_instance`.
    """

    # Shared instance handed out by ``get_default_instance``; ``_lock``
    # serializes its lazy creation.
    _default_instance = None
    _lock = Lock()

    # Character classes used by ``get_tokens``.
    _QUOTE_CHARS = ('"', "'")
    _OPERATOR_CHARS = '+-*/%<>=!|&~^'
    _PUNCTUATION_CHARS = '()[]{},;.'

    def __init__(self):
        """Create a lexer with an empty syntax configuration.

        Starting from the cleared state lets ``add_keywords`` and
        ``is_keyword`` be used on a freshly constructed instance instead of
        failing with ``AttributeError`` on the missing attributes.
        """
        self.clear()

    @classmethod
    def get_default_instance(cls):
        """Return the shared lexer instance, creating it on first use.

        The instance is created and given the default initialization while
        holding the class lock; later calls return the same object.
        """
        with cls._lock:
            if cls._default_instance is None:
                cls._default_instance = cls()
                cls._default_instance.default_initialization()
        return cls._default_instance

    def default_initialization(self):
        """Reset the lexer and load the default keyword dictionaries.

        Useful to revert custom syntax settings.
        """
        self.clear()
        self.add_keywords(keywords.KEYWORDS)
        self.add_keywords(keywords.KEYWORDS_COMMON)
        self.add_keywords(keywords.KEYWORDS_ORACLE)
        self.add_keywords(keywords.KEYWORDS_PLPGSQL)
        self.add_keywords(keywords.KEYWORDS_HQL)
        self.add_keywords(keywords.KEYWORDS_MSACCESS)

    def clear(self):
        """Drop every registered keyword dictionary and the stored regex list.

        Useful for loading a reduced syntax configuration. Until keyword
        dictionaries are added again, ``is_keyword`` classifies every word
        as ``tokens.Name``.
        """
        self._keywords = []
        self._SQL_REGEX = []

    def set_SQL_REGEX(self, SQL_REGEX):
        """Store ``SQL_REGEX`` on the instance.

        The value is kept as given. ``get_tokens`` in this class scans the
        text character by character and does not read this list.
        """
        self._SQL_REGEX = SQL_REGEX

    def add_keywords(self, keywords):
        """Register a keyword dictionary.

        Dictionaries are searched in the order they were added, so an
        earlier dictionary wins when the same keyword appears in several.
        """
        self._keywords.append(keywords)

    def is_keyword(self, value):
        """Return the token type registered for ``value``.

        The lookup is case-insensitive (``value`` is upper-cased first).
        The first registered dictionary containing the word decides the
        result; ``tokens.Name`` is returned when none contains it.
        """
        val = value.upper()
        for kwdict in self._keywords:
            if val in kwdict:
                return kwdict[val]
        return tokens.Name

    def get_tokens(self, text, encoding=None):
        """Lazily split ``text`` into ``(tokentype, value)`` pairs.

        :param text: the SQL to tokenize: a ``str``, a ``bytes`` or
            ``bytearray`` object, or a text stream (``io.TextIOBase``),
            which is read to the end first.
        :param encoding: codec used to decode byte input. When omitted,
            bytes are decoded as UTF-8, falling back to ``unicode-escape``
            if that fails. Ignored for input that is already ``str``.
        :returns: a generator of 2-tuples; concatenating the values
            reproduces the (decoded) input text.
        :raises TypeError: if ``text`` is none of the supported types. As
            this is a generator, the error surfaces on first iteration.
        """
        if isinstance(text, TextIOBase):
            text = text.read()

        if isinstance(text, (bytes, bytearray)):
            # Iterating bytes yields ints, so decode before scanning.
            if encoding:
                text = text.decode(encoding)
            else:
                try:
                    text = text.decode('utf-8')
                except UnicodeDecodeError:
                    text = text.decode('unicode-escape')
        elif not isinstance(text, str):
            raise TypeError(
                "Expected text or file-like object, got {!r}".format(
                    type(text)))

        length = len(text)
        iterable = enumerate(text)
        for pos, char in iterable:
            # Each branch decides the token type and ``end``, the index one
            # past the token's last character.
            if char.isspace():
                end = pos + 1
                while end < length and text[end].isspace():
                    end += 1
                ttype = tokens.Whitespace
            elif text.startswith('--', pos):
                # ``startswith`` is bounds-safe: a lone '-' as the final
                # character falls through to the operator branch.
                end = text.find('\n', pos)
                if end == -1:
                    end = length
                ttype = tokens.Comment.Single
            elif char in self._QUOTE_CHARS:
                end = pos + 1
                escaped = False
                while end < length:
                    if text[end] == char and not escaped:
                        break
                    # A backslash escapes the next character; a second
                    # backslash cancels the first.
                    escaped = text[end] == '\\' and not escaped
                    end += 1
                if end < length:
                    # Include the closing quote; an unterminated literal
                    # simply runs to the end of the text.
                    end += 1
                ttype = tokens.String
            elif char.isdigit():
                end = pos + 1
                while end < length and (text[end].isdigit()
                                        or text[end] == '.'):
                    end += 1
                ttype = tokens.Number
            elif char.isalpha() or char == '_' or char == '$':
                end = pos + 1
                while end < length and (text[end].isalnum()
                                        or text[end] in '_$'):
                    end += 1
                word = text[pos:end]
                if word.upper() in ('ASC', 'DESC'):
                    ttype = tokens.Keyword.Order
                else:
                    ttype = self.is_keyword(word)
            elif char in self._OPERATOR_CHARS:
                end = pos + 1
                while end < length and text[end] in self._OPERATOR_CHARS:
                    end += 1
                ttype = tokens.Operator
            elif char in self._PUNCTUATION_CHARS:
                end = pos + 1
                ttype = tokens.Punctuation
            else:
                # Anything unrecognized is reported one character at a time.
                end = pos + 1
                ttype = tokens.Error

            # Skip the characters already folded into this token so the
            # enumeration resumes right after it.
            consume(iterable, end - pos - 1)
            yield ttype, text[pos:end]
def tokenize(sql, encoding=None):
    """Tokenize *sql* with the shared default :class:`Lexer`.

    Returns the stream of ``(token type, value)`` 2-tuples produced by the
    lexer's ``get_tokens`` for *sql* and *encoding*.
    """
    return Lexer.get_default_instance().get_tokens(sql, encoding)