diff -r a30cdf366c02 Doc/library/io.rst
--- a/Doc/library/io.rst Tue Jan 10 11:30:44 2017 +0800
+++ b/Doc/library/io.rst Wed Jan 11 21:49:21 2017 +0900
@@ -833,21 +833,20 @@ Text I/O
Return the current stream position as an opaque number. The number
does not usually represent a number of bytes in the underlying
binary storage.
.. method:: write(s)
Write the string *s* to the stream and return the number of characters
written.
-
.. class:: TextIOWrapper(buffer, encoding=None, errors=None, newline=None, \
line_buffering=False, write_through=False)
A buffered text stream over a :class:`BufferedIOBase` binary stream.
It inherits :class:`TextIOBase`.
*encoding* gives the name of the encoding that the stream will be decoded or
encoded with. It defaults to
:func:`locale.getpreferredencoding(False) <locale.getpreferredencoding>`.
@@ -894,27 +893,40 @@ Text I/O
.. versionchanged:: 3.3
The *write_through* argument has been added.
.. versionchanged:: 3.3
The default *encoding* is now ``locale.getpreferredencoding(False)``
instead of ``locale.getpreferredencoding()``. Don't change temporary the
locale encoding using :func:`locale.setlocale`, use the current locale
encoding instead of the user preferred encoding.
- :class:`TextIOWrapper` provides one attribute in addition to those of
+ :class:`TextIOWrapper` provides these members in addition to those of
:class:`TextIOBase` and its parents:
.. attribute:: line_buffering
Whether line buffering is enabled.
+ .. method:: set_encoding(encoding=None, errors=None[, newline])
+
+ Change the encoding, error handler, and newline handler.
+ If *encoding* is ``None`` or *newline* is unspecified, the existing
+ setting is retained. If *errors* is ``None``, the default depends on
+ *encoding*: if *encoding* is also ``None``, the existing error handler
+ is retained, otherwise it is reset to ``'strict'``.
+
+ It is not possible to change the encoding if some data has already
+ been read from the stream.
+
+ .. versionadded:: 3.7
+
.. class:: StringIO(initial_value='', newline='\\n')
An in-memory stream for text I/O. The text buffer is discarded when the
:meth:`~IOBase.close` method is called.
The initial value of the buffer can be set by providing *initial_value*.
If newline translation is enabled, newlines will be encoded as if by
:meth:`~TextIOBase.write`. The stream is positioned at the start of
the buffer.
diff -r a30cdf366c02 Lib/_pyio.py
--- a/Lib/_pyio.py Tue Jan 10 11:30:44 2017 +0800
+++ b/Lib/_pyio.py Wed Jan 11 21:49:21 2017 +0900
@@ -1939,25 +1939,21 @@ class TextIOWrapper(TextIOBase):
if errors is None:
errors = "strict"
else:
if not isinstance(errors, str):
raise ValueError("invalid errors: %r" % errors)
self._buffer = buffer
self._line_buffering = line_buffering
self._encoding = encoding
self._errors = errors
- self._readuniversal = not newline
- self._readtranslate = newline is None
- self._readnl = newline
- self._writetranslate = newline != ''
- self._writenl = newline or os.linesep
+ self._set_newline(newline)
self._encoder = None
self._decoder = None
self._decoded_chars = '' # buffer for text returned from decoder
self._decoded_chars_used = 0 # offset into _decoded_chars for read()
self._snapshot = None # info for reconstructing decoder state
self._seekable = self._telling = self.buffer.seekable()
self._has_read1 = hasattr(self.buffer, 'read1')
self._b2cratio = 0.0
if self._seekable and self.writable():
@@ -1988,20 +1984,79 @@ class TextIOWrapper(TextIOBase):
else:
result += " name={0!r}".format(name)
try:
mode = self.mode
except Exception:
pass
else:
result += " mode={0!r}".format(mode)
return result + " encoding={0!r}>".format(self.encoding)
+    def set_encoding(self, encoding=None, errors=None, newline=Ellipsis):
+        """Change the encoding, error handler and newline handling.
+
+        *encoding* and *errors* default to None and *newline* to Ellipsis,
+        each meaning "not specified".  An unspecified *encoding* or
+        *newline* keeps the current setting.  An unspecified *errors*
+        keeps the current handler when *encoding* is unspecified as well,
+        and is reset to 'strict' otherwise.
+
+        It is not possible to change the encoding if some data has already
+        been read from the stream; UnsupportedOperation is raised then.
+        ValueError is raised for an invalid argument.  All argument checks
+        and codec lookups happen before the stream is flushed or modified.
+        """
+        if encoding is None:
+            # Keep the spelling the stream was created with, so that the
+            # `encoding` property does not silently turn into the
+            # canonical codec name when only errors/newline change.
+            encoding = self._encoding
+            if errors is None:
+                errors = self._errors
+        else:
+            if not isinstance(encoding, str):
+                raise ValueError("invalid encoding: %r" % encoding)
+            if errors is None:
+                errors = 'strict'
+        # Same check as the constructor applies to *errors*.
+        if not isinstance(errors, str):
+            raise ValueError("invalid errors: %r" % errors)
+
+        if newline is Ellipsis:
+            newline = self._readnl
+        elif newline not in (None, '', '\n', '\r', '\r\n'):
+            raise ValueError("illegal newline value: %r" % (newline,))
+
+        # Compare canonical codec names so that an alias of the current
+        # codec (e.g. 'ISO-8859-1' for 'latin1') is not seen as a change.
+        old_name = codecs.lookup(self._encoding).name
+        new_name = codecs.lookup(encoding).name
+        if (new_name == old_name and errors == self._errors
+                and newline == self._readnl):
+            # no change
+            return
+
+        if self._decoder is not None:
+            raise UnsupportedOperation(
+                "It is not possible to set the encoding of stream after "
+                "the first read")
+
+        # flush write buffer
+        self.flush()
+
+        # reset attributes
+        self._encoding = encoding
+        self._errors = errors
+        self._encoder = None
+        self._decoder = None
+        self._b2cratio = 0.0
+        self._set_newline(newline)
+
+        # don't write a BOM in the middle of a file
+        if self._seekable and self.writable():
+            position = self.buffer.tell()
+            if position != 0:
+                try:
+                    self._get_encoder().setstate(0)
+                except LookupError:
+                    # Sometimes the encoder doesn't exist
+                    pass
+
+    def _set_newline(self, newline):
+        """Store *newline* and derive the read/write translation flags."""
+        self._readnl = newline
+        # None and '' both select universal newlines when reading, but
+        # only None translates the line endings that were read.
+        self._readuniversal = not newline
+        self._readtranslate = newline is None
+        # '' is the only value that disables translation on write.
+        self._writetranslate = newline != ''
+        self._writenl = newline if newline else os.linesep
+
@property
def encoding(self):
return self._encoding
@property
def errors(self):
return self._errors
@property
def line_buffering(self):
diff -r a30cdf366c02 Lib/test/test_io.py
--- a/Lib/test/test_io.py Tue Jan 10 11:30:44 2017 +0800
+++ b/Lib/test/test_io.py Wed Jan 11 21:49:21 2017 +0900
@@ -3215,20 +3215,148 @@ class TextIOWrapperTest(unittest.TestCas
for i in range(10):
try:
self.TextIOWrapper(F(), encoding='utf-8')
except Exception:
pass
F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8')
+ def test_set_encoding_same_codec(self):
+ # set_encoding() with an alias of the current codec must leave the
+ # reported encoding alone; a genuinely different codec replaces it.
+ data = 'foobar\n'.encode('latin1')
+ raw = self.BytesIO(data)
+ txt = self.TextIOWrapper(raw, encoding='latin1')
+ self.assertEqual(txt.encoding, 'latin1')
+
+ # Just an alias, shouldn't change anything
+ txt.set_encoding('ISO-8859-1')
+ self.assertEqual(txt.encoding, 'latin1')
+
+ # This is an actual change
+ txt.set_encoding('iso8859-15')
+ self.assertEqual(txt.encoding, 'iso8859-15')
+
+ def test_set_encoding_read(self):
+ # latin1 -> utf8
+ # (latin1 can decode utf-8 encoded string)
+ data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+ raw = self.BytesIO(data)
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ self.assertEqual(txt.readline(), 'abc\xe9\n')
+ # A line has been read, so switching the codec must be refused.
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.set_encoding('utf-8')
+
+ def test_set_encoding_write_fromascii(self):
+ # ascii has a specific encodefunc in the C implementation,
+ # but utf-8-sig has not. Make sure that we get rid of the
+ # cached encodefunc when we switch encoders.
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('foo\n')
+ txt.set_encoding('utf-8-sig')
+ txt.write('\xe9\n')
+ txt.flush()
+ # '\xe9' must come out UTF-8 encoded, and without a BOM in front of it.
+ self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
+ def test_set_encoding_write(self):
+ # latin -> utf8
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ txt.write('abc\xe9\n')
+ txt.set_encoding('utf-8')
+ # The text written before the switch is already in the raw stream,
+ # encoded with the old codec, without an explicit flush() call.
+ self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+ # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
+ # the file
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.set_encoding('utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+ def test_set_encoding_write_non_seekable(self):
+ # Same ascii -> utf-8-sig switch as in test_set_encoding_write, but on
+ # a raw stream that reports itself as non-seekable.
+ raw = self.BytesIO()
+ raw.seekable = lambda: False
+ raw.seek = None
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.set_encoding('utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+
+ # If the raw stream is not seekable, there'll be a BOM
+ self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
+ def test_set_encoding_defaults(self):
+ # Arguments that are omitted or None keep the current settings.
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
+ txt.set_encoding(None, None)
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+ txt.write('LF\n')
+
+ # Changing only the newline keeps encoding and errors.
+ txt.set_encoding(newline='\r\n')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+
+ # Changing only errors keeps the encoding; the final byte check shows
+ # that the '\r\n' newline set above is kept as well.
+ txt.set_encoding(errors='ignore')
+ self.assertEqual(txt.encoding, 'ascii')
+ txt.write('CRLF\n')
+
+ # A new encoding given without errors resets the handler to 'strict'.
+ txt.set_encoding(encoding='utf-8', newline=None)
+ self.assertEqual(txt.errors, 'strict')
+ txt.seek(0)
+ self.assertEqual(txt.read(), 'LF\nCRLF\n')
+
+ self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
+ def test_set_encoding_newline(self):
+ # Reading: the newline mode passed to set_encoding() replaces the one
+ # given to the constructor before the first readline().
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.set_encoding(newline=None)
+ self.assertEqual(txt.readline(), 'CR\n')
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.set_encoding(newline='')
+ self.assertEqual(txt.readline(), 'CR\r')
+ raw = self.BytesIO(b'CR\rLF\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.set_encoding(newline='\n')
+ self.assertEqual(txt.readline(), 'CR\rLF\n')
+ raw = self.BytesIO(b'LF\nCR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.set_encoding(newline='\r')
+ self.assertEqual(txt.readline(), 'LF\nCR\r')
+ raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.set_encoding(newline='\r\n')
+ self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+ # Writing: each write() uses the newline mode set just before it.
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+ txt.set_encoding(newline=None)
+ txt.write('linesep\n')
+ txt.set_encoding(newline='')
+ txt.write('LF\n')
+ txt.set_encoding(newline='\n')
+ txt.write('LF\n')
+ txt.set_encoding(newline='\r')
+ txt.write('CR\n')
+ txt.set_encoding(newline='\r\n')
+ txt.write('CRLF\n')
+ expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+ self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews
rather than bytes'''
def read1(self, len_):
return _to_memoryview(super().read1(len_))
def read(self, len_):
return _to_memoryview(super().read(len_))
diff -r a30cdf366c02 Modules/_io/textio.c
--- a/Modules/_io/textio.c Tue Jan 10 11:30:44 2017 +0800
+++ b/Modules/_io/textio.c Wed Jan 11 21:49:21 2017 +0900
@@ -632,21 +632,21 @@ typedef struct
PyObject_HEAD
int ok; /* initialized? */
int detached;
Py_ssize_t chunk_size;
PyObject *buffer;
PyObject *encoding;
PyObject *encoder;
PyObject *decoder;
PyObject *readnl;
PyObject *errors;
- const char *writenl; /* utf-8 encoded, NULL stands for \n */
+ const char *writenl; /* ASCII-encoded; NULL stands for \n */
char line_buffering;
char write_through;
char readuniversal;
char readtranslate;
char writetranslate;
char seekable;
char has_read1;
char telling;
char finalizing;
/* Specialized encoding func (see below) */
@@ -778,20 +778,171 @@ static const encodefuncentry encodefuncs
{"utf-8", (encodefunc_t) utf8_encode},
{"utf-16-be", (encodefunc_t) utf16be_encode},
{"utf-16-le", (encodefunc_t) utf16le_encode},
{"utf-16", (encodefunc_t) utf16_encode},
{"utf-32-be", (encodefunc_t) utf32be_encode},
{"utf-32-le", (encodefunc_t) utf32le_encode},
{"utf-32", (encodefunc_t) utf32_encode},
{NULL, NULL}
};
+/* Check a *newline* argument.  NULL, "", "\n", "\r" and "\r\n" are
+   accepted and yield 0; any other string sets ValueError and yields -1. */
+static int
+validate_newline(const char *newline)
+{
+    if (newline == NULL || newline[0] == '\0')
+        return 0;
+    if (strcmp(newline, "\n") == 0
+        || strcmp(newline, "\r") == 0
+        || strcmp(newline, "\r\n") == 0)
+        return 0;
+
+    PyErr_Format(PyExc_ValueError,
+                 "illegal newline value: %s", newline);
+    return -1;
+}
+
+/* Install *newline* (NULL, "", "\n", "\r" or "\r\n") as the newline mode:
+   replaces self->readnl and recomputes readuniversal, readtranslate,
+   writetranslate and writenl.  Returns 0 on success; returns -1 with the
+   previous readnl restored if the new string object cannot be created. */
+static int
+set_newline(textio *self, const char *newline)
+{
+ /* Keep the previous object alive until the end: set_encoding() may pass
+ a pointer into its character data as *newline*, and *newline* is
+ still dereferenced after readnl has been replaced. */
+ PyObject *old = self->readnl;
+ if (newline == NULL) {
+ self->readnl = NULL;
+ }
+ else {
+ self->readnl = PyUnicode_FromString(newline);
+ if (self->readnl == NULL) {
+ self->readnl = old;
+ return -1;
+ }
+ }
+ self->readuniversal = (newline == NULL || newline[0] == '\0');
+ self->readtranslate = (newline == NULL);
+ self->writetranslate = (newline == NULL || newline[0] != '\0');
+ if (!self->readuniversal && self->readnl != NULL) {
+ /* writenl borrows the character data owned by readnl. */
+ assert(PyUnicode_KIND(self->readnl) == PyUnicode_1BYTE_KIND);
+ self->writenl = (const char *)PyUnicode_1BYTE_DATA(self->readnl);
+ if (strcmp(self->writenl, "\n") == 0) {
+ /* NULL stands for "\n" (see the textio struct). */
+ self->writenl = NULL;
+ }
+ }
+ else {
+ /* Universal mode: fall back to the platform line separator. */
+#ifdef MS_WINDOWS
+ self->writenl = "\r\n";
+#else
+ self->writenl = NULL;
+#endif
+ }
+ Py_XDECREF(old);
+ return 0;
+}
+
+/* (Re)create self->decoder from *codec_info* when self->buffer is
+   readable, wrapping it in an IncrementalNewlineDecoder if universal
+   newlines are enabled.  A buffer that is not readable leaves the decoder
+   untouched.  Returns 0 on success, -1 with an exception set on error. */
+static int
+_textiowrapper_set_decoder(textio *self, PyObject *codec_info,
+                           const char *errors)
+{
+    PyObject *readable;
+    int is_readable;
+
+    readable = _PyObject_CallMethodId(self->buffer, &PyId_readable, NULL);
+    if (readable == NULL)
+        return -1;
+    is_readable = PyObject_IsTrue(readable);
+    Py_DECREF(readable);
+    if (is_readable == -1)
+        return -1;
+    if (is_readable != 1)
+        return 0;
+
+    Py_CLEAR(self->decoder);
+    self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info, errors);
+    if (self->decoder == NULL)
+        return -1;
+
+    if (!self->readuniversal)
+        return 0;
+
+    {
+        /* The wrapper takes its own reference to the raw decoder, so the
+           reference held in self->decoder can be dropped afterwards. */
+        PyObject *wrapped = PyObject_CallFunction(
+            (PyObject *)&PyIncrementalNewlineDecoder_Type,
+            "Oi", self->decoder, (int)self->readtranslate);
+        if (wrapped == NULL)
+            return -1;
+        Py_CLEAR(self->decoder);
+        self->decoder = wrapped;
+    }
+    return 0;
+}
+
+/* Decode *bytes* with *decoder*; *eof* is passed on as the decoder's
+   "final" flag.  An IncrementalNewlineDecoder is called directly, any
+   other decoder through its decode() method.  Returns the result, or NULL
+   when check_decoded() rejects it. */
+static PyObject*
+_textiowrapper_decode(PyObject *decoder, PyObject *bytes, int eof)
+{
+    PyObject *decoded;
+
+    if (Py_TYPE(decoder) != &PyIncrementalNewlineDecoder_Type) {
+        decoded = PyObject_CallMethodObjArgs(decoder, _PyIO_str_decode, bytes,
+                                             eof ? Py_True : Py_False, NULL);
+    }
+    else {
+        decoded = _PyIncrementalNewlineDecoder_decode(decoder, bytes, eof);
+    }
+
+    /* check_decoded already decreases refcount on failure */
+    return check_decoded(decoded) < 0 ? NULL : decoded;
+}
+
+/* (Re)create self->encoder from *codec_info* when self->buffer is
+   writable, and select the matching specialized encode function, if
+   any.  A buffer that is not writable leaves the encoder untouched.
+   Returns 0 on success, -1 with an exception set on error. */
+static int
+_textiowrapper_set_encoder(textio *self, PyObject *codec_info,
+                           const char *errors)
+{
+    PyObject *writable, *name;
+    int is_writable;
+
+    writable = _PyObject_CallMethodId(self->buffer, &PyId_writable, NULL);
+    if (writable == NULL)
+        return -1;
+    is_writable = PyObject_IsTrue(writable);
+    Py_DECREF(writable);
+    if (is_writable == -1)
+        return -1;
+    if (is_writable != 1)
+        return 0;
+
+    /* Forget the function cached for the previous codec. */
+    Py_CLEAR(self->encoder);
+    self->encodefunc = NULL;
+    self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info, errors);
+    if (self->encoder == NULL)
+        return -1;
+
+    /* Get the normalized name of the codec */
+    name = _PyObject_GetAttrId(codec_info, &PyId_name);
+    if (name == NULL) {
+        /* A codec without a name simply gets no specialized function. */
+        if (!PyErr_ExceptionMatches(PyExc_AttributeError))
+            return -1;
+        PyErr_Clear();
+        return 0;
+    }
+    if (PyUnicode_Check(name)) {
+        const encodefuncentry *entry;
+        for (entry = encodefuncs; entry->name != NULL; entry++) {
+            if (_PyUnicode_EqualToASCIIString(name, entry->name)) {
+                self->encodefunc = entry->encodefunc;
+                break;
+            }
+        }
+    }
+    Py_DECREF(name);
+
+    return 0;
+}
/*[clinic input]
_io.TextIOWrapper.__init__
buffer: object
encoding: str(accept={str, NoneType}) = NULL
errors: str(accept={str, NoneType}) = NULL
newline: str(accept={str, NoneType}) = NULL
line_buffering: int(c_default="0") = False
write_through: int(c_default="0") = False
@@ -833,26 +984,21 @@ static int
/*[clinic end generated code: output=56a83402ce2a8381 input=3126cb3101a2c99b]*/
{
PyObject *raw, *codec_info = NULL;
_PyIO_State *state = NULL;
PyObject *res;
int r;
self->ok = 0;
self->detached = 0;
- if (newline && newline[0] != '\0'
- && !(newline[0] == '\n' && newline[1] == '\0')
- && !(newline[0] == '\r' && newline[1] == '\0')
- && !(newline[0] == '\r' && newline[1] == '\n' && newline[2] == '\0')) {
- PyErr_Format(PyExc_ValueError,
- "illegal newline value: %s", newline);
+ if (validate_newline(newline) < 0) {
return -1;
}
Py_CLEAR(self->buffer);
Py_CLEAR(self->encoding);
Py_CLEAR(self->encoder);
Py_CLEAR(self->decoder);
Py_CLEAR(self->readnl);
Py_CLEAR(self->decoded_chars);
Py_CLEAR(self->pending_bytes);
@@ -946,106 +1092,40 @@ static int
* of the partially constructed object (like self->encoding)
*/
if (errors == NULL)
errors = "strict";
self->errors = PyBytes_FromString(errors);
if (self->errors == NULL)
goto error;
self->chunk_size = 8192;
- self->readuniversal = (newline == NULL || newline[0] == '\0');
self->line_buffering = line_buffering;
self->write_through = write_through;
- self->readtranslate = (newline == NULL);
- if (newline) {
- self->readnl = PyUnicode_FromString(newline);
- if (self->readnl == NULL)
- goto error;
+ if (set_newline(self, newline) < 0) {
+ goto error;
}
- self->writetranslate = (newline == NULL || newline[0] != '\0');
- if (!self->readuniversal && self->readnl) {
- self->writenl = PyUnicode_AsUTF8(self->readnl);
- if (self->writenl == NULL)
- goto error;
- if (!strcmp(self->writenl, "\n"))
- self->writenl = NULL;
- }
-#ifdef MS_WINDOWS
- else
- self->writenl = "\r\n";
-#endif
-
+
+ self->buffer = buffer;
+ Py_INCREF(buffer);
+
/* Build the decoder object */
- res = _PyObject_CallMethodId(buffer, &PyId_readable, NULL);
- if (res == NULL)
- goto error;
- r = PyObject_IsTrue(res);
- Py_DECREF(res);
- if (r == -1)
+ if (_textiowrapper_set_decoder(self, codec_info, errors) != 0)
goto error;
- if (r == 1) {
- self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info,
- errors);
- if (self->decoder == NULL)
- goto error;
-
- if (self->readuniversal) {
- PyObject *incrementalDecoder = PyObject_CallFunction(
- (PyObject *)&PyIncrementalNewlineDecoder_Type,
- "Oi", self->decoder, (int)self->readtranslate);
- if (incrementalDecoder == NULL)
- goto error;
- Py_XSETREF(self->decoder, incrementalDecoder);
- }
- }
/* Build the encoder object */
- res = _PyObject_CallMethodId(buffer, &PyId_writable, NULL);
- if (res == NULL)
- goto error;
- r = PyObject_IsTrue(res);
- Py_DECREF(res);
- if (r == -1)
+ if (_textiowrapper_set_encoder(self, codec_info, errors) != 0)
goto error;
- if (r == 1) {
- self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info,
- errors);
- if (self->encoder == NULL)
- goto error;
- /* Get the normalized named of the codec */
- res = _PyObject_GetAttrId(codec_info, &PyId_name);
- if (res == NULL) {
- if (PyErr_ExceptionMatches(PyExc_AttributeError))
- PyErr_Clear();
- else
- goto error;
- }
- else if (PyUnicode_Check(res)) {
- const encodefuncentry *e = encodefuncs;
- while (e->name != NULL) {
- if (_PyUnicode_EqualToASCIIString(res, e->name)) {
- self->encodefunc = e->encodefunc;
- break;
- }
- e++;
- }
- }
- Py_XDECREF(res);
- }
/* Finished sorting out the codec details */
Py_CLEAR(codec_info);
- self->buffer = buffer;
- Py_INCREF(buffer);
-
if (Py_TYPE(buffer) == &PyBufferedReader_Type ||
Py_TYPE(buffer) == &PyBufferedWriter_Type ||
Py_TYPE(buffer) == &PyBufferedRandom_Type) {
raw = _PyObject_GetAttrId(buffer, &PyId_raw);
/* Cache the raw FileIO object to speed up 'closed' checks */
if (raw == NULL) {
if (PyErr_ExceptionMatches(PyExc_AttributeError))
PyErr_Clear();
else
goto error;
@@ -1363,20 +1443,214 @@ static PyObject *
/* Steal a reference to chars and store it in the decoded_char buffer;
*/
static void
textiowrapper_set_decoded_chars(textio *self, PyObject *chars)
{
Py_XSETREF(self->decoded_chars, chars);
self->decoded_chars_used = 0;
}
+/* Return the "name" attribute of the codec object that _PyCodec_Lookup()
+   finds for *codec_name*, i.e. the codec's canonical name.  Returns NULL
+   with an exception set if the lookup or the attribute access fails. */
+static PyObject*
+_textiowrapper_canonical_codec_name(PyObject *codec_name)
+{
+    const char *utf8_name;
+    PyObject *codec, *name;
+
+    utf8_name = PyUnicode_AsUTF8(codec_name);
+    if (utf8_name == NULL)
+        return NULL;
+
+    codec = _PyCodec_Lookup(utf8_name);
+    if (codec == NULL)
+        return NULL;
+
+    /* The codec object is only needed for its name. */
+    name = PyObject_GetAttrString(codec, "name");
+    Py_DECREF(codec);
+    return name;
+}
+
+PyDoc_STRVAR(set_encoding_doc,
+"set_encoding(encoding=None, errors=None[, newline])\n"
+"\n"
+"Change the encoding of the stream.\n"
+"\n"
+" encoding\n"
+" Name of new encoding to use.\n"
+" errors\n"
+" New error handler to use.\n"
+" newline\n"
+" New newline handler.\n"
+"\n"
+"It is not possible to change the encoding if some data has already\n"
+"been read from the stream.");
+
+/* TextIOWrapper.set_encoding(encoding=None, errors=None[, newline])
+
+   Switch the codec, error handler and newline handling of the stream.
+   An omitted (or None) *encoding* and an omitted *newline* keep their
+   current values; an omitted *errors* keeps the current handler only when
+   *encoding* is omitted too and becomes "strict" otherwise.
+
+   Returns None, or NULL with an exception set.  A call that changes
+   nothing returns early; a real change is refused once decoded text has
+   been buffered.  The new codec is looked up before any state is
+   modified, so an unknown encoding leaves the wrapper as it was. */
+static PyObject *
+set_encoding(PyObject *selfobj, PyObject *posargs, PyObject *kwargs)
+{
+    static char *keywords[] = {"encoding", "errors", "newline", NULL};
+    textio *self = (textio *)selfobj;
+    PyObject *encoding = Py_None;
+    const char *errors = NULL;
+    /* The variable's own address is a unique non-NULL value marking
+       "newline not passed"; NULL itself means newline=None. */
+    const char *newline = (const char *)&newline;
+    const char *encoding_utf8;
+    PyObject *encoding_cname, *old_encoding_cname;  /* canonical names */
+    PyObject *codec_info = NULL;
+    PyObject *res;
+    int same_codec;
+    int r;  /* int, not char: must be able to hold the -1 error result */
+
+    if (!PyArg_ParseTupleAndKeywords(
+            posargs, kwargs, "|Ozz:set_encoding", keywords,
+            &encoding, &errors, &newline)) {
+        return NULL;
+    }
+
+    CHECK_INITIALIZED(self);
+
+    /* Use existing settings where new settings are not specified */
+    if (encoding == Py_None) {
+        encoding = self->encoding;
+        if (errors == NULL) {
+            errors = PyBytes_AS_STRING(self->errors);
+        }
+    }
+    else if (errors == NULL) {
+        errors = "strict";
+    }
+    if (newline == (const char *)&newline) {
+        if (self->readnl == NULL) {
+            newline = NULL;
+        }
+        else {
+            assert(PyUnicode_KIND(self->readnl) == PyUnicode_1BYTE_KIND);
+            newline = (const char *)PyUnicode_1BYTE_DATA(self->readnl);
+        }
+    }
+    else if (validate_newline(newline) < 0) {
+        return NULL;
+    }
+
+    /* Get the normalized name of the old and new codec */
+    encoding_cname = _textiowrapper_canonical_codec_name(encoding);
+    if (encoding_cname == NULL)
+        return NULL;
+    old_encoding_cname = _textiowrapper_canonical_codec_name(self->encoding);
+    if (old_encoding_cname == NULL) {
+        Py_DECREF(encoding_cname);
+        return NULL;
+    }
+
+    /* Compare with current codec, error handler and newline */
+    same_codec = (PyUnicode_Compare(encoding_cname, old_encoding_cname) == 0);
+    Py_DECREF(encoding_cname);
+    Py_DECREF(old_encoding_cname);
+    /* PyUnicode_Compare() reports failure as -1 plus an exception. */
+    if (!same_codec && PyErr_Occurred())
+        return NULL;
+    if (same_codec && strcmp(PyBytes_AS_STRING(self->errors), errors) == 0 && (
+            (newline == NULL && self->readnl == NULL)
+            || (newline != NULL && self->readnl != NULL
+                && PyUnicode_CompareWithASCIIString(self->readnl, newline) == 0)
+        )) {
+        /* No change */
+        Py_RETURN_NONE;
+    }
+
+    /* Check if something is in the read buffer */
+    if (self->decoded_chars != NULL) {
+        _unsupported("It is not possible to set the encoding of stream after "
+                     "the first read");
+        return NULL;
+    }
+    assert(self->decoded_chars_used == 0);
+
+    /* Resolve the new codec before touching any state. */
+    encoding_utf8 = PyUnicode_AsUTF8(encoding);
+    if (encoding_utf8 == NULL)
+        return NULL;
+    codec_info = _PyCodec_LookupTextEncoding(encoding_utf8, "codecs.open()");
+    if (codec_info == NULL)
+        return NULL;
+
+    /* Flush write buffer */
+    if (_textiowrapper_writeflush(self) != 0)
+        goto error;
+
+    self->b2cratio = 0;
+
+    /* Take the new reference first: *encoding* may be self->encoding. */
+    Py_INCREF(encoding);
+    Py_SETREF(self->encoding, encoding);
+
+    if (errors != PyBytes_AS_STRING(self->errors)) {
+        PyObject *new_errors = PyBytes_FromString(errors);
+        if (new_errors == NULL)
+            goto error;
+        /* Py_SETREF releases the previous handler name, which a plain
+           assignment would leak. */
+        Py_SETREF(self->errors, new_errors);
+    }
+
+    if (set_newline(self, newline) < 0)
+        goto error;
+
+    /* Create new encoder & decoder */
+    if (_textiowrapper_set_decoder(self, codec_info, errors) != 0 ||
+        _textiowrapper_set_encoder(self, codec_info, errors) != 0) {
+        goto error;
+    }
+    Py_CLEAR(codec_info);
+
+    if (self->seekable) {
+        res = _PyObject_CallMethodId(self->buffer, &PyId_writable, NULL);
+        if (res == NULL)
+            goto error;
+        r = PyObject_IsTrue(res);
+        Py_DECREF(res);
+        if (r < 0)
+            goto error;
+
+        if (r) {
+            PyObject *posobj = _PyObject_CallMethodId(self->buffer,
+                                                      &PyId_tell, NULL);
+            if (posobj == NULL)
+                goto error;
+
+            /* We have a writable, seekable stream. Check if we're at the
+               beginning */
+            r = PyObject_RichCompareBool(posobj, _PyIO_zero, Py_EQ);
+            Py_DECREF(posobj);
+            if (r < 0)
+                goto error;
+
+            if (r) {
+                self->encoding_start_of_stream = 1;
+            }
+            else {
+                /* Don't write a BOM in the middle of a file */
+                self->encoding_start_of_stream = 0;
+                res = PyObject_CallMethodObjArgs(self->encoder,
+                                                 _PyIO_str_setstate,
+                                                 _PyIO_zero, NULL);
+                if (res == NULL)
+                    goto error;
+                Py_DECREF(res);
+            }
+        } /* writable */
+    } /* seekable */
+
+    Py_RETURN_NONE;
+
+error:
+    Py_XDECREF(codec_info);
+    return NULL;
+}
+
static PyObject *
textiowrapper_get_decoded_chars(textio *self, Py_ssize_t n)
{
PyObject *chars;
Py_ssize_t avail;
if (self->decoded_chars == NULL)
return PyUnicode_FromStringAndSize(NULL, 0);
/* decoded_chars is guaranteed to be "ready". */
@@ -1476,32 +1750,26 @@ textiowrapper_read_chunk(textio *self, P
if (PyObject_GetBuffer(input_chunk, &input_chunk_buf, 0) != 0) {
PyErr_Format(PyExc_TypeError,
"underlying %s() should have returned a bytes-like object, "
"not '%.200s'", (self->has_read1 ? "read1": "read"),
Py_TYPE(input_chunk)->tp_name);
goto fail;
}
nbytes = input_chunk_buf.len;
eof = (nbytes == 0);
- if (Py_TYPE(self->decoder) == &PyIncrementalNewlineDecoder_Type) {
- decoded_chars = _PyIncrementalNewlineDecoder_decode(
- self->decoder, input_chunk, eof);
- }
- else {
- decoded_chars = PyObject_CallMethodObjArgs(self->decoder,
- _PyIO_str_decode, input_chunk, eof ? Py_True : Py_False, NULL);
- }
+
+ decoded_chars = _textiowrapper_decode(self->decoder, input_chunk, eof);
PyBuffer_Release(&input_chunk_buf);
-
- if (check_decoded(decoded_chars) < 0)
+ if (decoded_chars == NULL)
goto fail;
+
textiowrapper_set_decoded_chars(self, decoded_chars);
nchars = PyUnicode_GET_LENGTH(decoded_chars);
if (nchars > 0)
self->b2cratio = (double) nbytes / nchars;
else
self->b2cratio = 0.0;
if (nchars > 0)
eof = 0;
if (self->telling) {
@@ -2836,20 +3104,22 @@ static PyMethodDef textiowrapper_methods
_IO_TEXTIOWRAPPER_CLOSE_METHODDEF
_IO_TEXTIOWRAPPER_FILENO_METHODDEF
_IO_TEXTIOWRAPPER_SEEKABLE_METHODDEF
_IO_TEXTIOWRAPPER_READABLE_METHODDEF
_IO_TEXTIOWRAPPER_WRITABLE_METHODDEF
_IO_TEXTIOWRAPPER_ISATTY_METHODDEF
{"__getstate__", (PyCFunction)textiowrapper_getstate, METH_NOARGS},
_IO_TEXTIOWRAPPER_SEEK_METHODDEF
+ {"set_encoding", (PyCFunction)set_encoding, METH_KEYWORDS | METH_VARARGS,
+ set_encoding_doc},
_IO_TEXTIOWRAPPER_TELL_METHODDEF
_IO_TEXTIOWRAPPER_TRUNCATE_METHODDEF
{NULL, NULL}
};
static PyMemberDef textiowrapper_members[] = {
{"encoding", T_OBJECT, offsetof(textio, encoding), READONLY},
{"buffer", T_OBJECT, offsetof(textio, buffer), READONLY},
{"line_buffering", T_BOOL, offsetof(textio, line_buffering), READONLY},
{"_finalizing", T_BOOL, offsetof(textio, finalizing), 0},