Skip to content

Commit 6e00f4d

Browse files
authored
Merge pull request danfengcao#22 from danfengcao/danfengcao-patch
Danfengcao patch
2 parents bb09b8f + 9a14642 commit 6e00f4d

5 files changed

Lines changed: 271 additions & 210 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
*~
2+
.idea/
3+
14
# Byte-compiled / optimized / DLL files
25
__pycache__/
36
*.py[cod]

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ binlog2sql
1616
正常维护。应用于大众点评线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行
1717

1818
* 已测试环境
19-
* Python 2.6, 2.7
19+
* Python 2.6, 2.7, 3.4
2020
* MySQL 5.6
2121

2222

binlog2sql/binlog2sql.py

Lines changed: 92 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4-
import os, sys, datetime
4+
import sys
5+
import datetime
56
import pymysql
67
from pymysqlreplication import BinLogStreamReader
78
from pymysqlreplication.row_event import (
@@ -10,128 +11,129 @@
1011
DeleteRowsEvent,
1112
)
1213
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
13-
from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines
14+
from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \
15+
temp_open, print_rollback_sql
16+
1417

1518
class Binlog2sql(object):
1619

17-
def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None,
18-
stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False):
19-
'''
20-
connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
21-
'''
22-
if not startFile:
23-
raise ValueError('lack of parameter,startFile.')
24-
25-
self.connectionSettings = connectionSettings
26-
self.startFile = startFile
27-
self.startPos = startPos if startPos else 4 # use binlog v4
28-
self.endFile = endFile if endFile else startFile
29-
self.endPos = endPos
30-
self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
31-
self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")
20+
def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
21+
start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
22+
flashback=False, stop_never=False):
23+
"""
24+
conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}
25+
"""
26+
27+
if not start_file:
28+
raise ValueError('Lack of parameter: start_file')
29+
30+
self.conn_setting = connection_settings
31+
self.start_file = start_file
32+
self.start_pos = start_pos if start_pos else 4 # use binlog v4
33+
self.end_file = end_file if end_file else start_file
34+
self.end_pos = end_pos
35+
if start_time:
36+
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
37+
else:
38+
self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
39+
if stop_time:
40+
self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
41+
else:
42+
self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")
3243

3344
self.only_schemas = only_schemas if only_schemas else None
3445
self.only_tables = only_tables if only_tables else None
35-
self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever)
46+
self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never)
3647

3748
self.binlogList = []
38-
self.connection = pymysql.connect(**self.connectionSettings)
39-
try:
40-
cur = self.connection.cursor()
41-
cur.execute("SHOW MASTER STATUS")
42-
self.eofFile, self.eofPos = cur.fetchone()[:2]
43-
cur.execute("SHOW MASTER LOGS")
44-
binIndex = [row[0] for row in cur.fetchall()]
45-
if self.startFile not in binIndex:
46-
raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile)
49+
self.connection = pymysql.connect(**self.conn_setting)
50+
with self.connection as cursor:
51+
cursor.execute("SHOW MASTER STATUS")
52+
self.eof_file, self.eof_pos = cursor.fetchone()[:2]
53+
cursor.execute("SHOW MASTER LOGS")
54+
bin_index = [row[0] for row in cursor.fetchall()]
55+
if self.start_file not in bin_index:
56+
raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)
4757
binlog2i = lambda x: x.split('.')[1]
48-
for bin in binIndex:
49-
if binlog2i(bin) >= binlog2i(self.startFile) and binlog2i(bin) <= binlog2i(self.endFile):
50-
self.binlogList.append(bin)
58+
for binary in bin_index:
59+
if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file):
60+
self.binlogList.append(binary)
5161

52-
cur.execute("SELECT @@server_id")
53-
self.serverId = cur.fetchone()[0]
54-
if not self.serverId:
55-
raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
56-
finally:
57-
cur.close()
62+
cursor.execute("SELECT @@server_id")
63+
self.server_id = cursor.fetchone()[0]
64+
if not self.server_id:
65+
raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))
5866

5967
def process_binlog(self):
60-
stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId,
61-
log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas,
68+
stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
69+
log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
6270
only_tables=self.only_tables, resume_stream=True)
6371

64-
cur = self.connection.cursor()
65-
tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile.
66-
ftmp = open(tmpFile ,"w")
67-
flagLastEvent = False
68-
eStartPos, lastPos = stream.log_pos, stream.log_pos
69-
try:
70-
for binlogevent in stream:
71-
if not self.stopnever:
72-
if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
73-
flagLastEvent = True
74-
elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
75-
if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
76-
lastPos = binlogevent.packet.log_pos
72+
flag_last_event = False
73+
e_start_pos, last_pos = stream.log_pos, stream.log_pos
74+
# to simplify code, we do not use flock for tmp_file.
75+
tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
76+
with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
77+
for binlog_event in stream:
78+
if not self.stop_never:
79+
try:
80+
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
81+
except OSError:
82+
event_time = datetime.datetime(1980, 1, 1, 0, 0)
83+
if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
84+
(stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
85+
flag_last_event = True
86+
elif event_time < self.start_time:
87+
if not (isinstance(binlog_event, RotateEvent)
88+
or isinstance(binlog_event, FormatDescriptionEvent)):
89+
last_pos = binlog_event.packet.log_pos
7790
continue
78-
elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime):
91+
elif (stream.log_file not in self.binlogList) or \
92+
(self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \
93+
(stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
94+
(event_time >= self.stop_time):
7995
break
8096
# else:
8197
# raise ValueError('unknown binlog file or position')
8298

83-
if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
84-
eStartPos = lastPos
99+
if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
100+
e_start_pos = last_pos
85101

86-
if isinstance(binlogevent, QueryEvent):
87-
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk)
102+
if isinstance(binlog_event, QueryEvent):
103+
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
104+
flashback=self.flashback, no_pk=self.no_pk)
88105
if sql:
89-
print sql
90-
elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent):
91-
for row in binlogevent.rows:
92-
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos)
106+
print(sql)
107+
elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\
108+
isinstance(binlog_event, DeleteRowsEvent):
109+
for row in binlog_event.rows:
110+
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk,
111+
row=row, flashback=self.flashback, e_start_pos=e_start_pos)
93112
if self.flashback:
94-
ftmp.write(sql + '\n')
113+
f_tmp.write(sql + '\n')
95114
else:
96-
print sql
115+
print(sql)
97116

98-
if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
99-
lastPos = binlogevent.packet.log_pos
100-
if flagLastEvent:
117+
if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):
118+
last_pos = binlog_event.packet.log_pos
119+
if flag_last_event:
101120
break
102-
ftmp.close()
103121

122+
stream.close()
123+
f_tmp.close()
104124
if self.flashback:
105-
self.print_rollback_sql(tmpFile)
106-
finally:
107-
os.remove(tmpFile)
108-
cur.close()
109-
stream.close()
125+
print_rollback_sql(filename=tmp_file)
110126
return True
111127

112-
def print_rollback_sql(self, fin):
113-
'''print rollback sql from tmpfile'''
114-
with open(fin) as ftmp:
115-
sleepInterval = 1000
116-
i = 0
117-
for line in reversed_lines(ftmp):
118-
print line.rstrip()
119-
if i >= sleepInterval:
120-
print 'SELECT SLEEP(1);'
121-
i = 0
122-
else:
123-
i += 1
124-
125128
def __del__(self):
126129
pass
127130

128131

129132
if __name__ == '__main__':
130-
131133
args = command_line_args(sys.argv[1:])
132-
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
133-
binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile,
134-
startPos=args.startPos, endFile=args.endFile, endPos=args.endPos,
135-
startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases,
136-
only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever)
134+
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
135+
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,
136+
end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
137+
stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
138+
no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never)
137139
binlog2sql.process_binlog()

0 commit comments

Comments
 (0)