# HG changeset patch
# User Kristján Valur Jónsson
# Date 1372286804 0
# Node ID 76ed20dd96b4a8dd58f2fcd685fd3203c78c59ea
# Parent 8f22e03f5f07ea8ba81b5537a01baa73d2d313f5
[mq]: ssl
diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -148,6 +148,9 @@
contain a certificate to be used to identify the local side of the
connection. See the discussion of :ref:`ssl-certificates` for more
information on how the certificate is stored in the ``certfile``.
+ Both ``keyfile`` and ``certfile`` can be either filenames or file-like
+ objects in binary mode. In the latter case, they only need to support
+ the ``read`` method.
The parameter ``server_side`` is a boolean which identifies whether
server-side or client-side behavior is desired from this socket.
@@ -165,6 +168,8 @@
the other end of the connection. See the discussion of
:ref:`ssl-certificates` for more information about how to arrange the
certificates in this file.
+ Alternatively, ``ca_certs`` can be a file-like object opened in binary
+ mode similar to ``keyfile`` and ``certfile`` above.
The parameter ``ssl_version`` specifies which version of the SSL protocol to
use. Typically, the server chooses a particular protocol version, and the
@@ -814,6 +819,8 @@
key will be taken from *certfile* as well. See the discussion of
:ref:`ssl-certificates` for more information on how the certificate
is stored in the *certfile*.
+ *certfile* and/or *keyfile* can also file-like objects opened in binary
+ mode, supporting the `read` method.
The *password* argument may be a function to call to get the password for
decrypting the private key. It will only be called if the private key is
@@ -833,6 +840,8 @@
.. versionchanged:: 3.3
New optional argument *password*.
+ .. versionchanged:: 3.4
+ File-like objects supported for *certfile* and *keyfile*
.. method:: SSLContext.load_verify_locations(cafile=None, capath=None)
@@ -861,6 +870,9 @@
..versionadded:: 3.4
+ The *cafile* parameter can be a file-like object opened in binary mode.
+ When provided, the *capath* argument must be None.
+
.. method:: SSLContext.set_default_verify_paths()
Load a set of default "certification authority" (CA) certificates from
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -310,7 +310,7 @@
self._context = SSLContext(ssl_version)
self._context.verify_mode = cert_reqs
if ca_certs:
- self._context.load_verify_locations(ca_certs)
+ self.context.load_verify_locations(ca_certs)
if certfile:
self._context.load_cert_chain(certfile, keyfile)
if npn_protocols:
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -18,6 +18,7 @@
import weakref
import platform
import functools
+import io
from unittest import mock
ssl = support.import_module("ssl")
@@ -504,106 +505,156 @@
ctx.verify_mode = 42
def test_load_cert_chain(self):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- # Combined key and cert in a single file
- ctx.load_cert_chain(CERTFILE)
- ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
- self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
- with self.assertRaises(OSError) as cm:
- ctx.load_cert_chain(WRONGCERT)
- self.assertEqual(cm.exception.errno, errno.ENOENT)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_cert_chain(BADCERT)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_cert_chain(EMPTYCERT)
- # Separate key and cert
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx.load_cert_chain(ONLYCERT, ONLYKEY)
- ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
- ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_cert_chain(ONLYCERT)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_cert_chain(ONLYKEY)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
- # Mismatching key and cert
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
- ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
- # Password protected key and cert
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
- ctx.load_cert_chain(CERTFILE_PROTECTED,
- password=bytearray(KEY_PASSWORD.encode()))
- ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
- ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
- ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
- bytearray(KEY_PASSWORD.encode()))
- with self.assertRaisesRegex(TypeError, "should be a string"):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
- with self.assertRaises(ssl.SSLError):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
- with self.assertRaisesRegex(ValueError, "cannot be longer"):
- # openssl has a fixed limit on the password buffer.
- # PEM_BUFSIZE is generally set to 1kb.
- # Return a string larger than this.
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
- # Password callback
- def getpass_unicode():
- return KEY_PASSWORD
- def getpass_bytes():
- return KEY_PASSWORD.encode()
- def getpass_bytearray():
- return bytearray(KEY_PASSWORD.encode())
- def getpass_badpass():
- return "badpass"
- def getpass_huge():
- return b'a' * (1024 * 1024)
- def getpass_bad_type():
- return 9
- def getpass_exception():
- raise Exception('getpass error')
- class GetPassCallable:
- def __call__(self):
+ #try both file and data mode for both arguments, do that by delegation
+ class CTX:
+ def __init__(self, ctx, mode):
+ self.ctx = ctx
+ self.mode = mode
+ def load_cert_chain(self, certfile, keyfile=None, password=None):
+ args = {"password" : password}
+ if (self.mode & 1) == 0:
+ cf = certfile
+ else:
+ with open(certfile, "rb") as f:
+ cf = io.BytesIO(f.read())
+ if keyfile:
+ if (self.mode & 2) == 0:
+ kf = keyfile
+ else:
+ with open(keyfile, "rb") as f:
+ kf = io.BytesIO(f.read())
+ return self.ctx.load_cert_chain(cf, kf)
+ return self.ctx.load_cert_chain(cf)
+ for mode in range(-1, 4):
+ print('a', mode)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ if mode >= 0:
+ ctx = CTX(ctx, mode)
+
+ # Combined key and cert in a single file
+ ctx.load_cert_chain(CERTFILE)
+ ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
+ self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
+ with self.assertRaises(OSError) as cm:
+ ctx.load_cert_chain(WRONGCERT)
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_cert_chain(BADCERT)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_cert_chain(EMPTYCERT)
+ # Separate key and cert
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ ctx.load_cert_chain(ONLYCERT, ONLYKEY)
+ ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
+ ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_cert_chain(ONLYCERT)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_cert_chain(ONLYKEY)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
+ # Mismatching key and cert
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
+ ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
+ # Password protected key and cert
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
+ ctx.load_cert_chain(CERTFILE_PROTECTED,
+ password=bytearray(KEY_PASSWORD.encode()))
+ ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
+ ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
+ ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
+ bytearray(KEY_PASSWORD.encode()))
+ with self.assertRaisesRegex(TypeError, "should be a string"):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
+ with self.assertRaises(ssl.SSLError):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
+ with self.assertRaisesRegex(ValueError, "cannot be longer"):
+ # openssl has a fixed limit on the password buffer.
+ # PEM_BUFSIZE is generally set to 1kb.
+ # Return a string larger than this.
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
+ # Password callback
+ def getpass_unicode():
return KEY_PASSWORD
- def getpass(self):
- return KEY_PASSWORD
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
- ctx.load_cert_chain(CERTFILE_PROTECTED,
- password=GetPassCallable().getpass)
- with self.assertRaises(ssl.SSLError):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
- with self.assertRaisesRegex(ValueError, "cannot be longer"):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
- with self.assertRaisesRegex(TypeError, "must return a string"):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
- with self.assertRaisesRegex(Exception, "getpass error"):
- ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
- # Make sure the password function isn't called if it isn't needed
- ctx.load_cert_chain(CERTFILE, password=getpass_exception)
+ def getpass_bytes():
+ return KEY_PASSWORD.encode()
+ def getpass_bytearray():
+ return bytearray(KEY_PASSWORD.encode())
+ def getpass_badpass():
+ return "badpass"
+ def getpass_huge():
+ return b'a' * (1024 * 1024)
+ def getpass_bad_type():
+ return 9
+ def getpass_exception():
+ raise Exception('getpass error')
+ class GetPassCallable:
+ def __call__(self):
+ return KEY_PASSWORD
+ def getpass(self):
+ return KEY_PASSWORD
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
+ ctx.load_cert_chain(CERTFILE_PROTECTED,
+ password=GetPassCallable().getpass)
+ with self.assertRaises(ssl.SSLError):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
+ with self.assertRaisesRegex(ValueError, "cannot be longer"):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
+ with self.assertRaisesRegex(TypeError, "must return a string"):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
+ with self.assertRaisesRegex(Exception, "getpass error"):
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
+ # Make sure the password function isn't called if it isn't needed
+ ctx.load_cert_chain(CERTFILE, password=getpass_exception)
def test_load_verify_locations(self):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx.load_verify_locations(CERTFILE)
- ctx.load_verify_locations(cafile=CERTFILE, capath=None)
- ctx.load_verify_locations(BYTES_CERTFILE)
- ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
- self.assertRaises(TypeError, ctx.load_verify_locations)
- self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
- with self.assertRaises(OSError) as cm:
- ctx.load_verify_locations(WRONGCERT)
- self.assertEqual(cm.exception.errno, errno.ENOENT)
- with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
- ctx.load_verify_locations(BADCERT)
- ctx.load_verify_locations(CERTFILE, CAPATH)
- ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
+ #try both file and data mode for both arguments, do that by delegation
+ class CTX:
+ def __init__(self, ctx, mode, override=False):
+ self.ctx = ctx
+ self.mode = mode
+ self.override = override
+ def load_verify_locations(self, cafile, capath=None):
+ if self.mode != 0 and not self.override:
+ capath = None # ignore capath when we want to use file-like obj
+ if self.mode == 0:
+ cf = cafile
+ else:
+ with open(cafile, "rb") as f:
+ cf = io.BytesIO(f.read())
+ return self.ctx.load_verify_locations(cf, capath)
+ for mode in range(-1, 2):
+ print('b', mode)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ if mode >= 0:
+ ctx = CTX(ctx, mode)
- # Issue #10989: crash if the second argument type is invalid
- self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
+ ctx.load_verify_locations(CERTFILE)
+ ctx.load_verify_locations(cafile=CERTFILE, capath=None)
+ ctx.load_verify_locations(BYTES_CERTFILE)
+ ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
+ self.assertRaises(TypeError, ctx.load_verify_locations)
+ self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
+ with self.assertRaises(OSError) as cm:
+ ctx.load_verify_locations(WRONGCERT)
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+ ctx.load_verify_locations(BADCERT)
+ ctx.load_verify_locations(CERTFILE, CAPATH)
+ ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
+
+ # Issue #10989: crash if the second argument type is invalid
+ self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
+
+ # assert that capath can't be speficied with a file like object
+ ctx = CTX(ssl.SSLContext(ssl.PROTOCOL_TLSv1), 1, override=True)
+ self.assertRaises(TypeError, ctx.load_verify_locations, CERTFILE, BYTES_CAPATH)
+
def test_load_dh_params(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
@@ -792,6 +843,17 @@
finally:
s.close()
+ def test_connect_listcert(self):
+ global SVN_PYTHON_ORG_ROOT_CERT
+ with open(SVN_PYTHON_ORG_ROOT_CERT, "rb") as f:
+ data = [f.read()]
+ old = SVN_PYTHON_ORG_ROOT_CERT
+ SVN_PYTHON_ORG_ROOT_CERT = data
+ try:
+ self.test_connect()
+ finally:
+ SVN_PYTHON_ORG_ROOT_CERT = old
+
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
with support.transient_internet("svn.python.org"):
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -247,6 +247,23 @@
#define ERRSTR1(x,y,z) (x ":" y ": " z)
#define ERRSTR(x) ERRSTR1("_ssl.c", STRINGIFY2(__LINE__), x)
+/* Forward declare some custom SSL functions */
+static int
+PySSL_CTX_use_PrivateKey_mem(SSL_CTX *ctx, void *data, int len);
+static int
+PySSL_CTX_use_certificate_chain_mem(SSL_CTX *ctx, void *data, int len);
+static int
+PySSL_CTX_load_verify_certs_mem(SSL_CTX *ctx, void *data, int len);
+static int
+PySSL_UseCertificateChainFromFile(SSL_CTX *ctx,
+ struct _PySSLPasswordInfo *pw_info, int file_like, Py_buffer *buf, PyObject *file_bytes);
+static int
+PySSL_UsePrivateKeyFromFile(SSL_CTX *ctx,
+ struct _PySSLPasswordInfo *pw_info, int file_like, Py_buffer *buf, PyObject *file_bytes);
+static int
+PrepareFile(PyObject *file, const char *fname, Py_buffer *buf, PyObject **file_bytes);
+static void
+UnprepareFile(int file_like, Py_buffer *buf, PyObject *file_bytes);
/*
* SSL errors.
@@ -2026,7 +2043,7 @@
return 0;
}
-typedef struct {
+typedef struct _PySSLPasswordInfo{
PyThreadState *thread_state;
PyObject *callable;
char *password;
@@ -2133,11 +2150,12 @@
{
char *kwlist[] = {"certfile", "keyfile", "password", NULL};
PyObject *certfile, *keyfile = NULL, *password = NULL;
- PyObject *certfile_bytes = NULL, *keyfile_bytes = NULL;
pem_password_cb *orig_passwd_cb = self->ctx->default_passwd_callback;
void *orig_passwd_userdata = self->ctx->default_passwd_callback_userdata;
_PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 };
- int r;
+ int c_file_like;
+ Py_buffer c_buf;
+ PyObject *c_bytes;
errno = 0;
ERR_clear_error();
@@ -2147,16 +2165,6 @@
return NULL;
if (keyfile == Py_None)
keyfile = NULL;
- if (!PyUnicode_FSConverter(certfile, &certfile_bytes)) {
- PyErr_SetString(PyExc_TypeError,
- "certfile should be a valid filesystem path");
- return NULL;
- }
- if (keyfile && !PyUnicode_FSConverter(keyfile, &keyfile_bytes)) {
- PyErr_SetString(PyExc_TypeError,
- "keyfile should be a valid filesystem path");
- goto error;
- }
if (password && password != Py_None) {
if (PyCallable_Check(password)) {
pw_info.callable = password;
@@ -2167,63 +2175,40 @@
SSL_CTX_set_default_passwd_cb(self->ctx, _password_callback);
SSL_CTX_set_default_passwd_cb_userdata(self->ctx, &pw_info);
}
- PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
- r = SSL_CTX_use_certificate_chain_file(self->ctx,
- PyBytes_AS_STRING(certfile_bytes));
- PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
- if (r != 1) {
- if (pw_info.error) {
- ERR_clear_error();
- /* the password callback has already set the error information */
- }
- else if (errno != 0) {
- ERR_clear_error();
- PyErr_SetFromErrno(PyExc_IOError);
- }
- else {
- _setSSLError(NULL, 0, __FILE__, __LINE__);
- }
+ c_file_like = PrepareFile(certfile, "certfile", &c_buf, &c_bytes);
+ if (c_file_like == -1)
goto error;
+ if (PySSL_UseCertificateChainFromFile(self->ctx, &pw_info, c_file_like,
+ &c_buf, c_bytes))
+ goto error;
+ if (keyfile) {
+ int fail, k_file_like;
+ Py_buffer k_buf;
+ PyObject *k_bytes;
+ k_file_like = PrepareFile(keyfile, "keyfile", &k_buf, &k_bytes);
+ if (k_file_like == -1)
+ goto error;
+ fail = PySSL_UsePrivateKeyFromFile(self->ctx, &pw_info, k_file_like,
+ &k_buf, k_bytes);
+ UnprepareFile(k_file_like, &k_buf, k_bytes);
+ if (fail)
+ goto error;
}
- PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
- r = SSL_CTX_use_PrivateKey_file(self->ctx,
- PyBytes_AS_STRING(keyfile ? keyfile_bytes : certfile_bytes),
- SSL_FILETYPE_PEM);
- PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
- Py_CLEAR(keyfile_bytes);
- Py_CLEAR(certfile_bytes);
- if (r != 1) {
- if (pw_info.error) {
- ERR_clear_error();
- /* the password callback has already set the error information */
- }
- else if (errno != 0) {
- ERR_clear_error();
- PyErr_SetFromErrno(PyExc_IOError);
- }
- else {
- _setSSLError(NULL, 0, __FILE__, __LINE__);
- }
- goto error;
- }
- PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
- r = SSL_CTX_check_private_key(self->ctx);
- PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
- if (r != 1) {
- _setSSLError(NULL, 0, __FILE__, __LINE__);
- goto error;
+ else {
+ if (PySSL_UsePrivateKeyFromFile(self->ctx, &pw_info, c_file_like, &c_buf, c_bytes))
+ goto error;
}
SSL_CTX_set_default_passwd_cb(self->ctx, orig_passwd_cb);
SSL_CTX_set_default_passwd_cb_userdata(self->ctx, orig_passwd_userdata);
free(pw_info.password);
+ UnprepareFile(c_file_like, &c_buf, c_bytes);
Py_RETURN_NONE;
error:
SSL_CTX_set_default_passwd_cb(self->ctx, orig_passwd_cb);
SSL_CTX_set_default_passwd_cb_userdata(self->ctx, orig_passwd_userdata);
free(pw_info.password);
- Py_XDECREF(keyfile_bytes);
- Py_XDECREF(certfile_bytes);
+ UnprepareFile(c_file_like, &c_buf, c_bytes);
return NULL;
}
@@ -2234,6 +2219,8 @@
PyObject *cafile = NULL, *capath = NULL;
PyObject *cafile_bytes = NULL, *capath_bytes = NULL;
const char *cafile_buf = NULL, *capath_buf = NULL;
+ Py_buffer cadata;
+ int file_like;
int r;
errno = 0;
@@ -2250,28 +2237,40 @@
"cafile and capath cannot be both omitted");
return NULL;
}
- if (cafile && !PyUnicode_FSConverter(cafile, &cafile_bytes)) {
- PyErr_SetString(PyExc_TypeError,
- "cafile should be a valid filesystem path");
- return NULL;
+ if (cafile) {
+ file_like = PrepareFile(cafile, "cafile", &cadata, &cafile_bytes);
+ if (file_like < 0)
+ return NULL;
}
- if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) {
- Py_XDECREF(cafile_bytes);
- PyErr_SetString(PyExc_TypeError,
- "capath should be a valid filesystem path");
- return NULL;
+ if (capath) {
+ if (file_like) {
+ PyErr_SetString(PyExc_TypeError,
+ "capath cannot be specified when cafile is a file-like object");
+ UnprepareFile(file_like, &cadata, cafile_bytes);
+ return NULL;
+ }
+ if (!PyUnicode_FSConverter(capath, &capath_bytes)) {
+ Py_XDECREF(cafile_bytes);
+ PyErr_SetString(PyExc_TypeError,
+ "capath should be a valid filesystem path");
+ UnprepareFile(file_like, &cadata, cafile_bytes);
+ return NULL;
+ }
}
- if (cafile)
+ if (cafile && !file_like)
cafile_buf = PyBytes_AS_STRING(cafile_bytes);
if (capath)
capath_buf = PyBytes_AS_STRING(capath_bytes);
PySSL_BEGIN_ALLOW_THREADS
- r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
+ if (!file_like)
+ r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
+ else
+ r = PySSL_CTX_load_verify_certs_mem(self->ctx, cadata.buf, (int)cadata.len);
PySSL_END_ALLOW_THREADS
- Py_XDECREF(cafile_bytes);
+ UnprepareFile(file_like, &cadata, cafile_bytes);
Py_XDECREF(capath_bytes);
if (r != 1) {
- if (errno != 0) {
+ if (!file_like && errno != 0) {
ERR_clear_error();
PyErr_SetFromErrno(PyExc_IOError);
}
@@ -3479,3 +3478,243 @@
return m;
}
+
+/* Custom additions to read certificates and keyfiles from memory rather than files */
+/* this one based on SSL_CTX_use_PrivateKey_file */
+static int
+PySSL_CTX_use_PrivateKey_mem(SSL_CTX *ctx, void *data, int len)
+{
+ int ret=0;
+ BIO *in;
+ EVP_PKEY *pkey=NULL;
+
+ in=BIO_new_mem_buf(data,len);
+ if (in == NULL)
+ {
+ SSLerr(SSL_F_SSL_CTX_USE_PRIVATEKEY_FILE,ERR_R_BUF_LIB);
+ goto end;
+ }
+
+ pkey=PEM_read_bio_PrivateKey(in,NULL,
+ ctx->default_passwd_callback,ctx->default_passwd_callback_userdata);
+ if (pkey == NULL)
+ {
+ SSLerr(SSL_F_SSL_CTX_USE_PRIVATEKEY_FILE,ERR_R_PEM_LIB);
+ goto end;
+ }
+ ret=SSL_CTX_use_PrivateKey(ctx,pkey);
+ EVP_PKEY_free(pkey);
+end:
+ if (in != NULL) BIO_free(in);
+ return(ret);
+}
+
+static int
+PySSL_CTX_use_certificate_chain_mem(SSL_CTX *ctx, void *data, int len)
+ {
+ BIO *in;
+ int ret=0;
+ X509 *x=NULL;
+
+ ERR_clear_error(); /* clear error stack for SSL_CTX_use_certificate() */
+
+ in=BIO_new_mem_buf(data,len);
+ if (in == NULL)
+ {
+ SSLerr(SSL_F_SSL_CTX_USE_CERTIFICATE_CHAIN_FILE,ERR_R_BUF_LIB);
+ goto end;
+ }
+
+ x=PEM_read_bio_X509(in,NULL,ctx->default_passwd_callback,ctx->default_passwd_callback_userdata);
+ if (x == NULL)
+ {
+ SSLerr(SSL_F_SSL_CTX_USE_CERTIFICATE_CHAIN_FILE,ERR_R_PEM_LIB);
+ goto end;
+ }
+
+ ret=SSL_CTX_use_certificate(ctx,x);
+ if (ERR_peek_error() != 0)
+ ret = 0; /* Key/certificate mismatch doesn't imply ret==0 ... */
+ if (ret)
+ {
+ /* If we could set up our certificate, now proceed to
+ * the CA certificates.
+ */
+ X509 *ca;
+ int r;
+ unsigned long err;
+
+ if (ctx->extra_certs != NULL)
+ {
+ sk_X509_pop_free(ctx->extra_certs, X509_free);
+ ctx->extra_certs = NULL;
+ }
+
+ while ((ca = PEM_read_bio_X509(in,NULL,ctx->default_passwd_callback,ctx->default_passwd_callback_userdata))
+ != NULL)
+ {
+ r = SSL_CTX_add_extra_chain_cert(ctx, ca);
+ if (!r)
+ {
+ X509_free(ca);
+ ret = 0;
+ goto end;
+ }
+ /* Note that we must not free r if it was successfully
+ * added to the chain (while we must free the main
+ * certificate, since its reference count is increased
+ * by SSL_CTX_use_certificate). */
+ }
+ /* When the while loop ends, it's usually just EOF. */
+ err = ERR_peek_last_error();
+ if (ERR_GET_LIB(err) == ERR_LIB_PEM && ERR_GET_REASON(err) == PEM_R_NO_START_LINE)
+ ERR_clear_error();
+ else
+ ret = 0; /* some real error */
+ }
+
+end:
+ if (x != NULL) X509_free(x);
+ if (in != NULL) BIO_free(in);
+ return(ret);
+}
+
+/* this one is custom, based on info from the net */
+static int
+PySSL_CTX_load_verify_certs_mem(SSL_CTX *ctx, void *data, int len)
+{
+ BIO *in;
+ int ret=1, err;
+ X509 *ca=NULL;
+
+ in=BIO_new_mem_buf(data,len);
+ if (in == NULL)
+ {
+ SSLerr(SSL_F_SSL_CTX_USE_CERTIFICATE_CHAIN_FILE,ERR_R_BUF_LIB);
+ goto end;
+ }
+
+ while ((ca = PEM_read_bio_X509(in,NULL,ctx->default_passwd_callback,ctx->default_passwd_callback_userdata))
+ != NULL)
+ {
+ ret = X509_STORE_add_cert(SSL_CTX_get_cert_store(ctx), ca);
+ X509_free(ca);
+ if (!ret) {
+ /* allow a certificate to be specified multiple times */
+ err = ERR_peek_last_error();
+ if (ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE) {
+ ERR_clear_error();
+ continue;
+ }
+ goto end;
+ }
+ }
+ /* When the while loop ends, it's usually just EOF. */
+ err = ERR_peek_last_error();
+ if (ERR_GET_LIB(err) == ERR_LIB_PEM && ERR_GET_REASON(err) == PEM_R_NO_START_LINE) {
+ ERR_clear_error();
+ ret = 1;
+ } else {
+ /* compatibility with the "file" reading function which overwrites
+ * the true error with a generic one
+ */
+ if (ERR_GET_REASON(err) == PEM_R_BAD_BASE64_DECODE)
+ X509err(X509_F_X509_LOAD_CERT_FILE,
+ ERR_R_PEM_LIB);
+ ret = 0; /* some real error */
+ }
+
+end:
+ if (in != NULL) BIO_free(in);
+ return(ret);
+}
+
+/* a function to help us share code for reading PrivateKey and certificate_chain */
+static int PySSL_UseGenericFromFile(SSL_CTX *ctx, _PySSLPasswordInfo *pw_info,
+ int file_like, Py_buffer *buf, PyObject *file_bytes,
+ int (*memfunc)(SSL_CTX *, void *, int),
+ int (*filefunc)(SSL_CTX *, char*))
+{
+ int r;
+ char *file_str;
+ if (!file_like)
+ file_str = PyBytes_AS_STRING(file_bytes);
+ PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state);
+ if (file_like)
+ r = memfunc(ctx, buf->buf, (int)buf->len);
+ else
+ r = filefunc(ctx, file_str);
+ PySSL_END_ALLOW_THREADS_S(pw_info->thread_state);
+ if (r != 1) {
+ if (pw_info->error) {
+ ERR_clear_error();
+ /* the password callback has already set the error information */
+ }
+ else if (!file_like && errno != 0) {
+ ERR_clear_error();
+ PyErr_SetFromErrno(PyExc_IOError);
+ }
+ else {
+ _setSSLError(NULL, 0, __FILE__, __LINE__);
+ }
+ return -1;
+ }
+ return 0;
+}
+
+/* thunker to pass the third argument */
+static int thunk_pem(SSL_CTX *ctx, const char *file)
+{
+ return SSL_CTX_use_PrivateKey_file(ctx, file, SSL_FILETYPE_PEM);
+}
+
+static int PySSL_UsePrivateKeyFromFile(SSL_CTX *ctx, _PySSLPasswordInfo *pw_info,
+ int file_like, Py_buffer *buf, PyObject *file_bytes)
+{
+ return PySSL_UseGenericFromFile(ctx, pw_info, file_like, buf, file_bytes,
+ &PySSL_CTX_use_PrivateKey_mem, &thunk_pem);
+}
+
+static int PySSL_UseCertificateChainFromFile(SSL_CTX *ctx, _PySSLPasswordInfo *pw_info,
+ int file_like, Py_buffer *buf, PyObject *file_bytes)
+{
+ return PySSL_UseGenericFromFile(ctx, pw_info, file_like, buf, file_bytes,
+ &PySSL_CTX_use_certificate_chain_mem, &SSL_CTX_use_certificate_chain_file);
+}
+
+/* decode a file as either a file-like object or a file system path.
+ * return 1 if the data has been read into buf, 0 if the file_bytes is valid,
+ * and -1 on error.
+ */
+static int PrepareFile(PyObject *file, const char *fname, Py_buffer *buf,
+ PyObject **file_bytes)
+{
+ int result;
+ PyObject *data = PyObject_CallMethod(file, "read", "");
+ *file_bytes = NULL;
+ if (data) {
+ result = PyObject_GetBuffer(data, buf, PyBUF_SIMPLE);
+ Py_DECREF(data);
+ return result ? -1 : 1; /* return -1 or 1 */
+ }
+ if (!PyErr_ExceptionMatches(PyExc_AttributeError))
+ return -1;
+ /* no read method, we assume. So this was not a file-like obj */
+ PyErr_Clear();
+
+ if (!PyUnicode_FSConverter(file, file_bytes)) {
+ PyErr_Format(PyExc_TypeError,
+ "%s should be a file-like object or a valid filesystem path", fname);
+ return -1;
+ }
+ return 0;
+}
+
+/* release any references claimed in the previous function */
+static void UnprepareFile(int file_like, Py_buffer *buf, PyObject *file_bytes)
+{
+ if (file_like == 1)
+ PyBuffer_Release(buf);
+ if (file_like == 0)
+ Py_DECREF(file_bytes);
+}
\ No newline at end of file