Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Lib/ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
import _ssl # if we can't import it, let the error propagate

from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
from _ssl import _SSLContext, MemoryBIO, SSLSession
from _ssl import _SSLContext, MemoryBIO, SSLSession, Certificate, PrivateKey
from _ssl import (
SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
SSLSyscallError, SSLEOFError, SSLCertVerificationError
Expand Down
18 changes: 18 additions & 0 deletions Lib/test/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,24 @@ def getpass(self):
# Make sure the password function isn't called if it isn't needed
ctx.load_cert_chain(CERTFILE, password=getpass_exception)

def test_load_cert_privkey(self):
chain = ssl.Certificate.chain_from_file(ONLYCERT)
self.assertEqual(len(chain), 1)
cas = ssl.Certificate.bundle_from_file(SIGNING_CA)
self.assertEqual(len(cas), 1)
pkey = ssl.PrivateKey.from_file(ONLYKEY)

ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(chain, pkey)
ctx.load_verify_locations(cadata=cas)
self.assertEqual(len(ctx.get_ca_certs()), 1)

pkey = ssl.PrivateKey.from_file(
ONLYKEY_PROTECTED, password=KEY_PASSWORD
)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(chain, pkey)

def test_load_verify_locations(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_verify_locations(CERTFILE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Certificate and PrivateKey classes were added to the ssl module.
Certificates and keys can now be loaded from memory buffer, too.
217 changes: 207 additions & 10 deletions Modules/_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,25 @@ typedef struct {
PySSLContext *ctx;
} PySSLSession;

<<<<<<< HEAD
||||||| parent of 27501a5ce86 (Work on trust store)
static PyTypeObject *PySSLContext_Type;
static PyTypeObject *PySSLSocket_Type;
static PyTypeObject *PySSLMemoryBIO_Type;
static PyTypeObject *PySSLSession_Type;
static PyTypeObject *PySSLPrivateKey_Type;
static PyTypeObject *PySSLCertificate_Type;

=======
static PyTypeObject *PySSLContext_Type;
static PyTypeObject *PySSLSocket_Type;
static PyTypeObject *PySSLMemoryBIO_Type;
static PyTypeObject *PySSLSession_Type;
static PyTypeObject *PySSLPrivateKey_Type;
static PyTypeObject *PySSLCertificate_Type;
static PyTypeObject *PySSLTrustStore_Type;

>>>>>>> 27501a5ce86 (Work on trust store)
static inline _PySSLError _PySSL_errno(int failed, const SSL *ssl, int retcode)
{
_PySSLError err = { 0 };
Expand Down Expand Up @@ -1692,6 +1711,11 @@ _certificate_to_der(_sslmodulestate *state, X509 *certificate)
return retval;
}

#include "_ssl/misc.c"
#include "_ssl/cert.c"
#include "_ssl/pkey.c"
#include "_ssl/truststore.c"

/*[clinic input]
_ssl._test_decode_cert
path: object(converter="PyUnicode_FSConverter")
Expand Down Expand Up @@ -1784,6 +1808,26 @@ _ssl__SSLSocket_getpeercert_impl(PySSLSocket *self, int binary_mode)
return result;
}

#if OPENSSL_VERSION_1_1
/*[clinic input]
_ssl._SSLSocket.verified_chain

[clinic start generated code]*/

static PyObject *
_ssl__SSLSocket_verified_chain_impl(PySSLSocket *self)
/*[clinic end generated code: output=3068b09c06b0a1eb input=d49eda1fa8963989]*/
{
STACK_OF(X509) *chain;
/* borrowed reference */
chain = SSL_get0_verified_chain(self->ssl);
if (chain == NULL) {
Py_RETURN_NONE;
}
return _PySSL_CertificateFromX509Stack(chain, 1);
}
#endif

static PyObject *
cipher_to_tuple(const SSL_CIPHER *cipher)
{
Expand Down Expand Up @@ -2799,6 +2843,7 @@ static PyMethodDef PySSLMethods[] = {
_SSL__SSLSOCKET_COMPRESSION_METHODDEF
_SSL__SSLSOCKET_SHUTDOWN_METHODDEF
_SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF
_SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF
{NULL, NULL}
};

Expand Down Expand Up @@ -3521,9 +3566,9 @@ typedef struct {
int error;
} _PySSLPasswordInfo;

static int
_pwinfo_set(_PySSLPasswordInfo *pw_info, PyObject* password,
const char *bad_type_error)
int
PySSL_pwinfo_set(PySSLPasswordInfo *pw_info, PyObject* password,
const char *bad_type_error)
{
/* Set the password and size fields of a _PySSLPasswordInfo struct
from a unicode, bytes, or byte array object.
Expand Down Expand Up @@ -3575,13 +3620,14 @@ _pwinfo_set(_PySSLPasswordInfo *pw_info, PyObject* password,
return 0;
}

static int
_password_callback(char *buf, int size, int rwflag, void *userdata)
int
PySSL_password_cb(char *buf, int size, int rwflag, void *userdata)
{
_PySSLPasswordInfo *pw_info = (_PySSLPasswordInfo*) userdata;
PySSLPasswordInfo *pw_info = (PySSLPasswordInfo*) userdata;
PyObject *fn_ret = NULL;

PySSL_END_ALLOW_THREADS_S(pw_info->thread_state);
assert(pw_info);

if (pw_info->error) {
/* already failed previously. OpenSSL 3.0.0-alpha14 invokes the
Expand All @@ -3598,7 +3644,7 @@ _password_callback(char *buf, int size, int rwflag, void *userdata)
goto error;
}

if (!_pwinfo_set(pw_info, fn_ret,
if (!PySSL_pwinfo_set(pw_info, fn_ret,
"password callback must return a string")) {
goto error;
}
Expand All @@ -3622,6 +3668,105 @@ _password_callback(char *buf, int size, int rwflag, void *userdata)
return -1;
}


static PyObject *
_load_cert_privkey(PySSLContext *self, PyObject *certs, PyObject *key)
{
PyObject *seq = NULL;
Py_ssize_t i;
int ret;

/* Let's start picky and only accept tuples */
if (!PyTuple_Check(certs)) {
PyErr_Format(
PyExc_TypeError,
"Expected a tuple of Certificates, got '%.200s'.",
Py_TYPE(certs)->tp_name
);
return NULL;
}
if (Py_TYPE(key) != PySSLPrivateKey_Type) {
PyErr_Format(
PyExc_TypeError,
"Expected PrivateKey, got '%.200s'.",
Py_TYPE(key)->tp_name
);
return NULL;
}

seq = PySequence_Fast(certs, "expected a tuple of Certificates");
if (seq == NULL)
return NULL;
if (PySequence_Fast_GET_SIZE(seq) == 0) {
PyErr_SetString(PyExc_ValueError, "cert list is empty");
goto error;
}

/* validate certs */
for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) {
PyObject *ob = PySequence_Fast_GET_ITEM(seq, i);
if (ob == NULL) {
goto error;
}
if (Py_TYPE(ob) != PySSLCertificate_Type) {
PyErr_Format(
PyExc_TypeError,
"Expected Certificate, got '%.200s'.",
Py_TYPE(ob)->tp_name
);
goto error;
}
}
/* add certs */
for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) {
/* borrowed reference */
PySSLCertificate *cert = (PySSLCertificate *)PySequence_Fast_GET_ITEM(seq, i);
if (cert == NULL) {
goto error;
}
if (i == 0) {
PySSL_BEGIN_ALLOW_THREADS
ret = SSL_CTX_use_certificate(self->ctx, cert->cert);
PySSL_END_ALLOW_THREADS
if (ERR_peek_error() != 0) {
_setSSLError(NULL, 0, __FILE__, __LINE__);
goto error;
}
ret = SSL_CTX_clear_chain_certs(self->ctx);
if (ret == 0) {
_setSSLError(NULL, 0, __FILE__, __LINE__);
goto error;
}
}
else {
/* certs 1..n are chain certs */
PySSL_BEGIN_ALLOW_THREADS
ret = SSL_CTX_add1_chain_cert(self->ctx, cert->cert);
PySSL_END_ALLOW_THREADS
if (ret == 0) {
_setSSLError(NULL, 0, __FILE__, __LINE__);
goto error;
}
}
}
/* add and verify private key */
PySSL_BEGIN_ALLOW_THREADS
ret = SSL_CTX_use_PrivateKey(self->ctx, ((PySSLPrivateKey*)key)->pkey);
if (ret == 1) {
/* OK, now match key to cert */
ret = SSL_CTX_check_private_key(self->ctx);
}
PySSL_END_ALLOW_THREADS
if (ret != 1) {
_setSSLError(NULL, 0, __FILE__, __LINE__);
goto error;
}
Py_RETURN_NONE;
error:
Py_DECREF(seq);
return NULL;
}

/*[clinic input]
_ssl._SSLContext.load_cert_chain
certfile: object
Expand All @@ -3638,13 +3783,21 @@ _ssl__SSLContext_load_cert_chain_impl(PySSLContext *self, PyObject *certfile,
PyObject *certfile_bytes = NULL, *keyfile_bytes = NULL;
pem_password_cb *orig_passwd_cb = SSL_CTX_get_default_passwd_cb(self->ctx);
void *orig_passwd_userdata = SSL_CTX_get_default_passwd_cb_userdata(self->ctx);
_PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 };
PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 };
int r;

errno = 0;
ERR_clear_error();
if (keyfile == Py_None)
keyfile = NULL;
if ((keyfile != NULL) && (Py_TYPE(keyfile) == PySSLPrivateKey_Type)) {
if (password && password != Py_None) {
PyErr_SetString(PyExc_ValueError,
"Cannot use password arg with PrivateKey");
return NULL;
}
return _load_cert_privkey(self, certfile, keyfile);
}
if (!PyUnicode_FSConverter(certfile, &certfile_bytes)) {
if (PyErr_ExceptionMatches(PyExc_TypeError)) {
PyErr_SetString(PyExc_TypeError,
Expand All @@ -3662,11 +3815,11 @@ _ssl__SSLContext_load_cert_chain_impl(PySSLContext *self, PyObject *certfile,
if (password != Py_None) {
if (PyCallable_Check(password)) {
pw_info.callable = password;
} else if (!_pwinfo_set(&pw_info, password,
} else if (!PySSL_pwinfo_set(&pw_info, password,
"password should be a string or callable")) {
goto error;
}
SSL_CTX_set_default_passwd_cb(self->ctx, _password_callback);
SSL_CTX_set_default_passwd_cb(self->ctx, PySSL_password_cb);
SSL_CTX_set_default_passwd_cb_userdata(self->ctx, &pw_info);
}
PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
Expand Down Expand Up @@ -3831,6 +3984,7 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self,
/*[clinic end generated code: output=454c7e41230ca551 input=42ecfe258233e194]*/
{
PyObject *cafile_bytes = NULL, *capath_bytes = NULL;
PyObject *seq = NULL;
const char *cafile_buf = NULL, *capath_buf = NULL;
int r = 0, ok = 1;

Expand Down Expand Up @@ -3899,6 +4053,48 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self,
goto error;
}
}
else if (PyList_Check(cadata)) {
/* List of Certificates instances */
X509_STORE *store;
Py_ssize_t i;

seq = PySequence_Fast(cadata, "expected a tuple of Certificates");
if (seq == NULL)
goto error;
if (PySequence_Fast_GET_SIZE(seq) == 0) {
PyErr_SetString(PyExc_ValueError,
"cadata certificate list is empty");
goto error;
}
store = SSL_CTX_get_cert_store(self->ctx);
/* validate and add certs */
for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) {
PySSLCertificate *ob = (PySSLCertificate *)PySequence_Fast_GET_ITEM(seq, i);
if (ob == NULL) {
goto error;
}
if (Py_TYPE(ob) != PySSLCertificate_Type) {
PyErr_Format(
PyExc_TypeError,
"Expected Certificate in cadata, got '%.200s'.",
Py_TYPE(ob)->tp_name
);
goto error;
}
r = X509_STORE_add_cert(store, ob->cert);
if (!r) {
int err = ERR_peek_last_error();
if ((ERR_GET_LIB(err) == ERR_LIB_X509) &&
(ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE)) {
/* cert already in hash table, not an error */
ERR_clear_error();
} else {
_setSSLError(NULL, 0, __FILE__, __LINE__);
goto error;
}
}
}
}
else {
invalid_cadata:
PyErr_SetString(PyExc_TypeError,
Expand Down Expand Up @@ -3935,6 +4131,7 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self,
end:
Py_XDECREF(cafile_bytes);
Py_XDECREF(capath_bytes);
Py_XDECREF(seq);
if (ok) {
Py_RETURN_NONE;
} else {
Expand Down
Loading