|
1 | 1 | #!/usr/bin/env python |
2 | 2 | # -*- coding: utf-8 -*- |
3 | 3 |
|
4 | | -import os, sys, datetime |
| 4 | +import sys |
| 5 | +import datetime |
5 | 6 | import pymysql |
6 | 7 | from pymysqlreplication import BinLogStreamReader |
7 | 8 | from pymysqlreplication.row_event import ( |
|
10 | 11 | DeleteRowsEvent, |
11 | 12 | ) |
12 | 13 | from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent |
13 | | -from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines |
| 14 | +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \ |
| 15 | + temp_open, print_rollback_sql |
| 16 | + |
14 | 17 |
|
15 | 18 | class Binlog2sql(object): |
16 | 19 |
|
17 | | - def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None, |
18 | | - stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False): |
19 | | - ''' |
20 | | - connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} |
21 | | - ''' |
22 | | - if not startFile: |
23 | | - raise ValueError('lack of parameter,startFile.') |
24 | | - |
25 | | - self.connectionSettings = connectionSettings |
26 | | - self.startFile = startFile |
27 | | - self.startPos = startPos if startPos else 4 # use binlog v4 |
28 | | - self.endFile = endFile if endFile else startFile |
29 | | - self.endPos = endPos |
30 | | - self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") |
31 | | - self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") |
| 20 | + def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, |
| 21 | + start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, |
| 22 | + flashback=False, stop_never=False): |
| 23 | + """ |
| 24 | + conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'} |
| 25 | + """ |
| 26 | + |
| 27 | + if not start_file: |
| 28 | + raise ValueError('Lack of parameter: start_file') |
| 29 | + |
| 30 | + self.conn_setting = connection_settings |
| 31 | + self.start_file = start_file |
| 32 | + self.start_pos = start_pos if start_pos else 4 # use binlog v4 |
| 33 | + self.end_file = end_file if end_file else start_file |
| 34 | + self.end_pos = end_pos |
| 35 | + if start_time: |
| 36 | + self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") |
| 37 | + else: |
| 38 | + self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") |
| 39 | + if stop_time: |
| 40 | + self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") |
| 41 | + else: |
| 42 | + self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") |
32 | 43 |
|
33 | 44 | self.only_schemas = only_schemas if only_schemas else None |
34 | 45 | self.only_tables = only_tables if only_tables else None |
35 | | - self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever) |
| 46 | + self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never) |
36 | 47 |
|
37 | 48 | self.binlogList = [] |
38 | | - self.connection = pymysql.connect(**self.connectionSettings) |
39 | | - try: |
40 | | - cur = self.connection.cursor() |
41 | | - cur.execute("SHOW MASTER STATUS") |
42 | | - self.eofFile, self.eofPos = cur.fetchone()[:2] |
43 | | - cur.execute("SHOW MASTER LOGS") |
44 | | - binIndex = [row[0] for row in cur.fetchall()] |
45 | | - if self.startFile not in binIndex: |
46 | | - raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile) |
| 49 | + self.connection = pymysql.connect(**self.conn_setting) |
| 50 | + with self.connection as cursor: |
| 51 | + cursor.execute("SHOW MASTER STATUS") |
| 52 | + self.eof_file, self.eof_pos = cursor.fetchone()[:2] |
| 53 | + cursor.execute("SHOW MASTER LOGS") |
| 54 | + bin_index = [row[0] for row in cursor.fetchall()] |
| 55 | + if self.start_file not in bin_index: |
| 56 | + raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file) |
47 | 57 | binlog2i = lambda x: x.split('.')[1] |
48 | | - for bin in binIndex: |
49 | | - if binlog2i(bin) >= binlog2i(self.startFile) and binlog2i(bin) <= binlog2i(self.endFile): |
50 | | - self.binlogList.append(bin) |
| 58 | + for binary in bin_index: |
| 59 | + if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file): |
| 60 | + self.binlogList.append(binary) |
51 | 61 |
|
52 | | - cur.execute("SELECT @@server_id") |
53 | | - self.serverId = cur.fetchone()[0] |
54 | | - if not self.serverId: |
55 | | - raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port'])) |
56 | | - finally: |
57 | | - cur.close() |
| 62 | + cursor.execute("SELECT @@server_id") |
| 63 | + self.server_id = cursor.fetchone()[0] |
| 64 | + if not self.server_id: |
| 65 | + raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port'])) |
58 | 66 |
|
59 | 67 | def process_binlog(self): |
60 | | - stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId, |
61 | | - log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas, |
| 68 | + stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id, |
| 69 | + log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, |
62 | 70 | only_tables=self.only_tables, resume_stream=True) |
63 | 71 |
|
64 | | - cur = self.connection.cursor() |
65 | | - tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile. |
66 | | - ftmp = open(tmpFile ,"w") |
67 | | - flagLastEvent = False |
68 | | - eStartPos, lastPos = stream.log_pos, stream.log_pos |
69 | | - try: |
70 | | - for binlogevent in stream: |
71 | | - if not self.stopnever: |
72 | | - if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos): |
73 | | - flagLastEvent = True |
74 | | - elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime: |
75 | | - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): |
76 | | - lastPos = binlogevent.packet.log_pos |
| 72 | + flag_last_event = False |
| 73 | + e_start_pos, last_pos = stream.log_pos, stream.log_pos |
| 74 | + # to simplify code, we do not use flock for tmp_file. |
| 75 | + tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) |
| 76 | + with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor: |
| 77 | + for binlog_event in stream: |
| 78 | + if not self.stop_never: |
| 79 | + try: |
| 80 | + event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp) |
| 81 | + except OSError: |
| 82 | + event_time = datetime.datetime(1980, 1, 1, 0, 0) |
| 83 | + if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ |
| 84 | + (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): |
| 85 | + flag_last_event = True |
| 86 | + elif event_time < self.start_time: |
| 87 | + if not (isinstance(binlog_event, RotateEvent) |
| 88 | + or isinstance(binlog_event, FormatDescriptionEvent)): |
| 89 | + last_pos = binlog_event.packet.log_pos |
77 | 90 | continue |
78 | | - elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime): |
| 91 | + elif (stream.log_file not in self.binlogList) or \ |
| 92 | + (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ |
| 93 | + (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ |
| 94 | + (event_time >= self.stop_time): |
79 | 95 | break |
80 | 96 | # else: |
81 | 97 | # raise ValueError('unknown binlog file or position') |
82 | 98 |
|
83 | | - if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN': |
84 | | - eStartPos = lastPos |
| 99 | + if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN': |
| 100 | + e_start_pos = last_pos |
85 | 101 |
|
86 | | - if isinstance(binlogevent, QueryEvent): |
87 | | - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk) |
| 102 | + if isinstance(binlog_event, QueryEvent): |
| 103 | + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, |
| 104 | + flashback=self.flashback, no_pk=self.no_pk) |
88 | 105 | if sql: |
89 | | - print sql |
90 | | - elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): |
91 | | - for row in binlogevent.rows: |
92 | | - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos) |
| 106 | + print(sql) |
| 107 | + elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\ |
| 108 | + isinstance(binlog_event, DeleteRowsEvent): |
| 109 | + for row in binlog_event.rows: |
| 110 | + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk, |
| 111 | + row=row, flashback=self.flashback, e_start_pos=e_start_pos) |
93 | 112 | if self.flashback: |
94 | | - ftmp.write(sql + '\n') |
| 113 | + f_tmp.write(sql + '\n') |
95 | 114 | else: |
96 | | - print sql |
| 115 | + print(sql) |
97 | 116 |
|
98 | | - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): |
99 | | - lastPos = binlogevent.packet.log_pos |
100 | | - if flagLastEvent: |
| 117 | + if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)): |
| 118 | + last_pos = binlog_event.packet.log_pos |
| 119 | + if flag_last_event: |
101 | 120 | break |
102 | | - ftmp.close() |
103 | 121 |
|
| 122 | + stream.close() |
| 123 | + f_tmp.close() |
104 | 124 | if self.flashback: |
105 | | - self.print_rollback_sql(tmpFile) |
106 | | - finally: |
107 | | - os.remove(tmpFile) |
108 | | - cur.close() |
109 | | - stream.close() |
| 125 | + print_rollback_sql(filename=tmp_file) |
110 | 126 | return True |
111 | 127 |
|
112 | | - def print_rollback_sql(self, fin): |
113 | | - '''print rollback sql from tmpfile''' |
114 | | - with open(fin) as ftmp: |
115 | | - sleepInterval = 1000 |
116 | | - i = 0 |
117 | | - for line in reversed_lines(ftmp): |
118 | | - print line.rstrip() |
119 | | - if i >= sleepInterval: |
120 | | - print 'SELECT SLEEP(1);' |
121 | | - i = 0 |
122 | | - else: |
123 | | - i += 1 |
124 | | - |
125 | 128 | def __del__(self): |
126 | 129 | pass |
127 | 130 |
|
128 | 131 |
|
129 | 132 | if __name__ == '__main__': |
130 | | - |
131 | 133 | args = command_line_args(sys.argv[1:]) |
132 | | - connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} |
133 | | - binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile, |
134 | | - startPos=args.startPos, endFile=args.endFile, endPos=args.endPos, |
135 | | - startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases, |
136 | | - only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever) |
| 134 | + conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} |
| 135 | + binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, |
| 136 | + end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, |
| 137 | + stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, |
| 138 | + no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never) |
137 | 139 | binlog2sql.process_binlog() |
0 commit comments