Python zeep soap request signature fails: cannot load cert

808 Views Asked by At

I am trying to make a SOAP request using Python zeep, which requires a sha256 signature with RSA 2048. For some reason I am unable to load the signature, and I get 'signature fails: cannot load cert' error upon trying to send the request.

sample code bellow:

from zeep import Client as cl
from requests import Session
from zeep.transports import Transport
from zeep.plugins import HistoryPlugin
from lxml import etree
import zeep
from zeep.wsse.signature import Signature
import uuid
import OpenSSL
import base64
from zeep.wsse import utils
from datetime import datetime, timedelta


# Helper Class
class SingnatureOverride(Signature):
    def apply(self, envelope, headers):
        security = utils.get_security_header(envelope)

        created = datetime.utcnow()
        expired = created + timedelta(seconds=1 * 60)

        timestamp = utils.WSU('Timestamp')
        timestamp.append(utils.WSU('Created', created.replace(microsecond=0).isoformat()+'Z'))
        timestamp.append(utils.WSU('Expires', expired.replace(microsecond=0).isoformat()+'Z'))

        security.append(timestamp)

        super().apply(envelope, headers)
        return envelope, headers

# Override response verification and skip response verification for now...
# Zeep does not supprt Signature verification with different certificate...
# Ref. https://github.com/mvantellingen/python-zeep/pull/822/  "Add support for different signing and verification certificates #822"
    def verify(self, envelope):
        return envelope


# Funtion to generete ecrtificates and keys
def create_csr(common_name, country=None, state=None, city=None,
               organization=None, organizational_unit=None,
               email_address=None):

    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    req = OpenSSL.crypto.X509()
    req.get_subject().CN = common_name
    if organizational_unit:
        req.get_subject().OU = organizational_unit
    if organization:
        req.get_subject().O = organization
    if city:
        req.get_subject().L = city
    if state:
        req.get_subject().ST = state
    if country:
        req.get_subject().C = country
    if email_address:
        req.get_subject().emailAddress = email_address

    req.set_pubkey(key)
    req.sign(key, 'sha256')

    private_key = OpenSSL.crypto.dump_privatekey(
        OpenSSL.crypto.FILETYPE_PEM, key)
    public_key = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_PEM, key)

    csr = OpenSSL.crypto.dump_certificate(
               OpenSSL.crypto.FILETYPE_PEM, req)
    
    out_keys = {
        "private key" : private_key,
        "public key": public_key,
        "certificate": csr,
        "reqest": req
    }
    return out_keys


certificate = create_csr(common_name="somename",
                         organizational_unit="unit",
                         organization="unit",
                         city="test",
                         state="test",
                         country="GB"
                         )

# Write certificate and keys to files
with open('cert.pem', "wb") as f:
    f.write(certificate['certificate'])

with open('public.pem', "wb") as f:
    f.write(certificate['public key'])

with open('private.pem', "wb") as f:
    f.write(certificate['private key'])



# Create Client
history = HistoryPlugin()
session = Session()
session.verify = False  # For some reason SSL validation fails and I had to ignore it. This is unacceptable and needs to be addressed, but it is outside of the scope of this question.
transport = Transport(session=session)
client = cl(url, transport=transport, plugins=[history],
            wsse=SingnatureOverride("private.pem", "cert.pem"))

# url
url="https://stest.bankconnect.dk/2019/04/04/services/CorporateService?wsdl"

# build headers
now = datetime.now()
unique_id = uuid.uuid4().hex
headers = {
        'serviceHeader': {
        'organisationIdentification':  {
            'mainRegistrationNumber': "randint",
            'isoCountryCode': 'GB',
            
        },
        'format': 'ISO20022',
        'functionIdentification': "randint",
        'erpInformation': 'randstring',
        'endToEndMessageId': unique_id,
        'createDateTime': now.strftime("%Y-%m-%dT%H:%M:%S"),
        
    },
    }

# Send request
# InternalError: (-1, 'cannot load cert') here
client.service.getCustomerStatement(_soapheaders=headers, )

# debug
sent = etree.tostring(history.last_sent["envelope"], encoding="unicode", pretty_print=True)
received = etree.tostring(history.last_received["envelope"], encoding="unicode", pretty_print=True)

print(sent)
print(received)

I had to anonymize some confidential information, but if it is critical to solving the issue, please feel free to ask for details.

Edit: Added a test url

Edit2: added correct headers for easier replicability

1

There are 1 best solutions below

1
On BEST ANSWER

I think I figured it out. Adding the following lines to the 'create_csr' function, right before signing the certificate seems to solve the issue. It resolves the specific 'load cert' issue and the request is sent, but the signature is still regarded as invalid by the server, so I am not sure if the helper function generates an invalid certificate or this is a server issue.

def create_csr(*args):
....
    req.set_serial_number(1000)
    req.gmtime_adj_notBefore(0)
    req.gmtime_adj_notAfter(10*365*24*60*60)
    req.set_issuer(req.get_subject())

    req.sign(key, 'sha256')
....