forked from danfengcao/binlog2sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbinlog2sql.py
More file actions
executable file
·137 lines (119 loc) · 6.76 KB
/
binlog2sql.py
File metadata and controls
executable file
·137 lines (119 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os, sys, datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines
class Binlog2sql(object):
    '''Translate MySQL binlog events into SQL statements (or rollback SQL).

    The constructor connects to the server, validates the requested binlog
    range and records the server's current binlog end and server_id;
    process_binlog() then streams the events and prints the generated SQL.
    '''

    def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None,
                 stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False):
        '''
        connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
            Passed as keyword arguments to pymysql.connect and to BinLogStreamReader;
            'host' and 'port' are also used in messages and the temp-file name.
        startFile: required; must appear in the server's SHOW MASTER LOGS output.
        startPos: start offset in startFile, defaults to 4.
        endFile: last binlog file to read, defaults to startFile.
        endPos: optional stop offset inside endFile.
        startTime / stopTime: '%Y-%m-%d %H:%M:%S' strings bounding event timestamps;
            default to 1970-01-01 00:00:00 and 2999-12-31 00:00:00.
        only_schemas / only_tables: filters passed to BinLogStreamReader (falsy -> None).
        nopk, flashback: forwarded to concat_sql_from_binlogevent; flashback also makes
            process_binlog print the row SQL in reverse order.
        stopnever: when true, process_binlog skips every start/stop bound check.

        Raises ValueError when startFile is missing or unknown to the server, when
        SHOW MASTER STATUS returns no row, or when the server has no server_id.
        '''
        if not startFile:
            raise ValueError('lack of parameter,startFile.')
        self.connectionSettings = connectionSettings
        self.startFile = startFile
        self.startPos = startPos if startPos else 4  # use binlog v4
        self.endFile = endFile if endFile else startFile
        self.endPos = endPos
        timeFormat = "%Y-%m-%d %H:%M:%S"
        self.startTime = datetime.datetime.strptime(startTime or '1970-01-01 00:00:00', timeFormat)
        self.stopTime = datetime.datetime.strptime(stopTime or '2999-12-31 00:00:00', timeFormat)
        self.only_schemas = only_schemas if only_schemas else None
        self.only_tables = only_tables if only_tables else None
        self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever)
        self.binlogList = []
        self.connection = pymysql.connect(**self.connectionSettings)
        # Create the cursor before `try` so `finally` never refers to an unbound name.
        cur = self.connection.cursor()
        try:
            cur.execute("SHOW MASTER STATUS")
            masterStatus = cur.fetchone()
            if not masterStatus:
                raise ValueError('SHOW MASTER STATUS returned no row on mysql server %s:%s; is binary logging enabled?'
                                 % (self.connectionSettings['host'], self.connectionSettings['port']))
            # Current end of the server's binlog; used as an upper bound by process_binlog.
            self.eofFile, self.eofPos = masterStatus[:2]
            cur.execute("SHOW MASTER LOGS")
            binIndex = [row[0] for row in cur.fetchall()]
            if self.startFile not in binIndex:
                raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile)
            # Compare sequence numbers numerically so ordering does not depend on
            # zero-padding width or on dots inside the base name.
            firstIdx = self._binlog_index(self.startFile)
            lastIdx = self._binlog_index(self.endFile)
            self.binlogList = [name for name in binIndex
                               if firstIdx <= self._binlog_index(name) <= lastIdx]
            cur.execute("SELECT @@server_id")
            self.serverId = cur.fetchone()[0]
            if not self.serverId:
                raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
        finally:
            cur.close()

    @staticmethod
    def _binlog_index(binlogName):
        '''Return the integer suffix after the last dot, e.g. 'mysql-bin.000012' -> 12.'''
        return int(binlogName.rsplit('.', 1)[1])

    def process_binlog(self):
        '''Stream binlog events from startFile/startPos and print the SQL they translate to.

        Query events are printed directly. Row events are printed directly or, when
        flashback is set, buffered in a temporary file and printed in reverse order
        by print_rollback_sql. The stream, cursor and temporary file are released
        even if processing fails.

        Returns True.
        '''
        stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId,
                                    log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas,
                                    only_tables=self.only_tables, resume_stream=True)
        cur = self.connection.cursor()
        try:
            # to simplify code, we do not use file lock for tmpFile.
            tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
            ftmp = open(tmpFile, "w")
            try:
                with ftmp:  # the temp file is closed even if event processing raises
                    self._emit_events(stream, cur, ftmp)
                if self.flashback:
                    self.print_rollback_sql(tmpFile)
            finally:
                os.remove(tmpFile)
        finally:
            cur.close()
            stream.close()
        return True

    def _emit_events(self, stream, cur, ftmp):
        '''Walk `stream`, printing SQL (or buffering row SQL into `ftmp` when flashback).

        Unless stopnever is set:
        - events earlier than startTime are skipped;
        - processing stops after the event that lands exactly at endFile/endPos or at
          the server end recorded in __init__;
        - processing stops before an event outside binlogList, past endPos in endFile,
          past eofPos in eofFile, or at/after stopTime.
        '''
        flagLastEvent = False
        eStartPos, lastPos = stream.log_pos, stream.log_pos
        for binlogevent in stream:
            if not self.stopnever:
                if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or \
                        (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
                    flagLastEvent = True
                elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
                    if not isinstance(binlogevent, (RotateEvent, FormatDescriptionEvent)):
                        lastPos = binlogevent.packet.log_pos
                    continue
                elif (stream.log_file not in self.binlogList) or \
                        (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or \
                        (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or \
                        (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime):
                    break
            if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
                # lastPos is the log_pos of the previous event; presumably the BEGIN event's
                # start offset. It is forwarded to row SQL as eStartPos.
                eStartPos = lastPos
            if isinstance(binlogevent, QueryEvent):
                sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent,
                                                  flashback=self.flashback, nopk=self.nopk)
                if sql:
                    print(sql)
            elif isinstance(binlogevent, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)):
                for row in binlogevent.rows:
                    sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row,
                                                      flashback=self.flashback, nopk=self.nopk,
                                                      eStartPos=eStartPos)
                    if self.flashback:
                        ftmp.write(sql + '\n')
                    else:
                        print(sql)
            if not isinstance(binlogevent, (RotateEvent, FormatDescriptionEvent)):
                lastPos = binlogevent.packet.log_pos
            if flagLastEvent:
                break

    def print_rollback_sql(self, fin):
        '''print rollback sql from tmpfile

        Lines of `fin` are printed in the order produced by reversed_lines (last
        line first), each right-stripped, with 'SELECT SLEEP(1);' after every
        1000 statements.
        '''
        with open(fin) as ftmp:
            for line in self._with_sleeps(reversed_lines(ftmp), 1000):
                print(line)

    @staticmethod
    def _with_sleeps(lines, sleepInterval):
        '''Yield each line right-stripped, plus 'SELECT SLEEP(1);' after every `sleepInterval` lines.'''
        for count, line in enumerate(lines, 1):
            yield line.rstrip()
            if count % sleepInterval == 0:
                yield 'SELECT SLEEP(1);'

    def __del__(self):
        '''Best-effort close of the MySQL connection opened in __init__.'''
        # __init__ may have raised before `connection` was assigned.
        connection = getattr(self, 'connection', None)
        if connection is not None:
            try:
                connection.close()
            except Exception:
                # Deliberately ignored (e.g. already closed): a finalizer has no caller to report to.
                pass
if __name__ == '__main__':
    # Parse the command line and run a single conversion pass.
    cliArgs = command_line_args(sys.argv[1:])
    # Passed through to Binlog2sql as connectionSettings; the --password value goes under 'passwd'.
    settings = {
        'host': cliArgs.host,
        'port': cliArgs.port,
        'user': cliArgs.user,
        'passwd': cliArgs.password,
    }
    converter = Binlog2sql(
        connectionSettings=settings,
        startFile=cliArgs.startFile,
        startPos=cliArgs.startPos,
        endFile=cliArgs.endFile,
        endPos=cliArgs.endPos,
        startTime=cliArgs.startTime,
        stopTime=cliArgs.stopTime,
        only_schemas=cliArgs.databases,
        only_tables=cliArgs.tables,
        nopk=cliArgs.nopk,
        flashback=cliArgs.flashback,
        stopnever=cliArgs.stopnever,
    )
    converter.process_binlog()