forked from danfengcao/binlog2sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbinlog2sql.py
More file actions
executable file
·125 lines (109 loc) · 6.45 KB
/
binlog2sql.py
File metadata and controls
executable file
·125 lines (109 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os, sys, datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines
class Binlog2sql(object):
    """Read a MySQL server's binary log and print it as SQL statements.

    Events are pulled with ``BinLogStreamReader`` and turned into SQL text by
    ``concat_sql_from_binlogevent``.  In flashback mode the statements built
    from row events are first written to a temporary file and then printed
    in reverse line order.
    """

    def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None,
                 stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False):
        '''
        connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
            Passed as keyword arguments to pymysql.connect and as
            connection_settings to BinLogStreamReader.
        startFile: binlog file to start from (required; must be listed by
            SHOW MASTER LOGS).
        startPos: position inside startFile to start from; defaults to 4.
        endFile: last binlog file to process; defaults to startFile.
        endPos: position inside endFile at which processing stops.
        startTime / stopTime: 'YYYY-MM-DD HH:MM:SS' strings; events older than
            startTime are skipped and the first event at or after stopTime
            ends processing.
        only_schemas / only_tables: forwarded to BinLogStreamReader.
        nopk / flashback: forwarded to concat_sql_from_binlogevent.
        stopnever: when true, none of the stop/skip conditions are evaluated.

        Raises ValueError when startFile is missing or unknown to the server,
        when a binlog file name has no numeric suffix, when SHOW MASTER STATUS
        returns no row, or when the server has no server_id.
        '''
        if not startFile:
            raise ValueError('lack of parameter,startFile.')

        self.connectionSettings = connectionSettings
        self.startFile = startFile
        self.startPos = startPos or 4  # use binlog v4
        self.endFile = endFile or startFile
        self.endPos = endPos
        timeFormat = "%Y-%m-%d %H:%M:%S"
        self.startTime = (datetime.datetime.strptime(startTime, timeFormat) if startTime
                          else datetime.datetime(1970, 1, 1))
        self.stopTime = (datetime.datetime.strptime(stopTime, timeFormat) if stopTime
                         else datetime.datetime(2999, 12, 31))
        self.only_schemas = only_schemas or None
        self.only_tables = only_tables or None
        self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever)

        self.binlogList = []
        self.connection = pymysql.connect(**self.connectionSettings)
        initialized = False
        try:
            self._load_server_state()
            initialized = True
        finally:
            # Do not leave the connection open when construction fails.
            if not initialized:
                self.connection.close()

    @staticmethod
    def _binlog_index(name):
        """Return the numeric suffix of a binlog file name as an int.

        The text after the last '.' is used, so base names that contain dots
        are handled, and the integer result orders files correctly even when
        their suffixes have different lengths.

        Raises ValueError when the suffix is missing or not a number.
        """
        suffix = name.rsplit('.', 1)[-1]
        try:
            return int(suffix)
        except ValueError:
            raise ValueError('parameter error: invalid binlog file name %s' % name)

    def _load_server_state(self):
        """Query the server for the end-of-log position, the binlog files to process and server_id."""
        cur = self.connection.cursor()
        try:
            cur.execute("SHOW MASTER STATUS")
            status = cur.fetchone()
            if status is None:
                # fetchone() yields None when the statement produced no row.
                raise ValueError('SHOW MASTER STATUS returned no row in mysql server %s:%s'
                                 % (self.connectionSettings['host'], self.connectionSettings['port']))
            self.eofFile, self.eofPos = status[:2]

            cur.execute("SHOW MASTER LOGS")
            binIndex = [row[0] for row in cur.fetchall()]
            if self.startFile not in binIndex:
                raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile)

            # Keep every server binlog whose sequence number lies in [startFile, endFile].
            first = self._binlog_index(self.startFile)
            last = self._binlog_index(self.endFile)
            self.binlogList = [name for name in binIndex if first <= self._binlog_index(name) <= last]

            cur.execute("SELECT @@server_id")
            self.serverId = cur.fetchone()[0]
            if not self.serverId:
                raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
        finally:
            cur.close()

    def process_binlog(self):
        """Stream the binlog and print the generated SQL; return True.

        The temporary file, the cursor and the stream are released even when
        an error interrupts processing.
        """
        stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId,
                                    log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas,
                                    only_tables=self.only_tables, resume_stream=True)
        cur = None
        tmpFile = None
        try:
            cur = self.connection.cursor()
            # to simplify code, we do not use file lock for tmpFile.
            tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
            with open(tmpFile, "w") as ftmp:
                self._write_events(stream, cur, ftmp)

            if self.flashback:
                # Rollback statements are emitted in reverse order of the original changes.
                with open(tmpFile) as ftmp:
                    for line in reversed_lines(ftmp):
                        print(line.rstrip())
        finally:
            try:
                if tmpFile is not None and os.path.exists(tmpFile):
                    os.remove(tmpFile)
            finally:
                try:
                    if cur is not None:
                        cur.close()
                finally:
                    stream.close()
        return True

    def _write_events(self, stream, cur, ftmp):
        """Walk the stream, printing SQL or (flashback row events) writing it to ftmp."""
        # These event types never update the remembered position.
        untrackedEvents = (RotateEvent, FormatDescriptionEvent)
        rowEvents = (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)

        flagLastEvent = False
        eStartPos, lastPos = stream.log_pos, stream.log_pos
        for binlogevent in stream:
            if not self.stopnever:
                atEnd = stream.log_file == self.endFile and stream.log_pos == self.endPos
                atEof = stream.log_file == self.eofFile and stream.log_pos == self.eofPos
                if atEnd or atEof:
                    # Handle this event, then stop.
                    flagLastEvent = True
                elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
                    # Too early: skip it, but keep the position up to date.
                    if not isinstance(binlogevent, untrackedEvents):
                        lastPos = binlogevent.packet.log_pos
                    continue
                elif (stream.log_file not in self.binlogList) \
                        or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) \
                        or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) \
                        or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime):
                    break

            if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
                # Remember the position recorded just before this BEGIN.
                eStartPos = lastPos

            if isinstance(binlogevent, QueryEvent):
                sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk)
                if sql:
                    print(sql)
            elif isinstance(binlogevent, rowEvents):
                for row in binlogevent.rows:
                    sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row, flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos)
                    if self.flashback:
                        ftmp.write(sql + '\n')
                    else:
                        print(sql)

            if not isinstance(binlogevent, untrackedEvents):
                lastPos = binlogevent.packet.log_pos
            if flagLastEvent:
                break

    def __del__(self):
        # Intentionally a no-op; kept so the class interface is unchanged.
        pass
if __name__ == '__main__':
    # Script entry point: parse the command line, then dump the requested binlog range.
    args = command_line_args(sys.argv[1:])

    # Connection parameters shared by the MySQL connection and the binlog stream.
    connectionSettings = dict(host=args.host, port=args.port, user=args.user, passwd=args.password)

    # Map each parsed option onto the matching constructor keyword.
    options = dict(
        startFile=args.startFile,
        startPos=args.startPos,
        endFile=args.endFile,
        endPos=args.endPos,
        startTime=args.startTime,
        stopTime=args.stopTime,
        only_schemas=args.databases,
        only_tables=args.tables,
        nopk=args.nopk,
        flashback=args.flashback,
        stopnever=args.stopnever,
    )
    binlog2sql = Binlog2sql(connectionSettings=connectionSettings, **options)
    binlog2sql.process_binlog()