KoD 0.6
-Nuova ricerca globale -migliorie prestazionali in generale -fix vari ai server
This commit is contained in:
@@ -66,22 +66,18 @@ def _cf_string_to_unicode(value):
|
||||
value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
|
||||
|
||||
string = CoreFoundation.CFStringGetCStringPtr(
|
||||
value_as_void_p,
|
||||
CFConst.kCFStringEncodingUTF8
|
||||
value_as_void_p, CFConst.kCFStringEncodingUTF8
|
||||
)
|
||||
if string is None:
|
||||
buffer = ctypes.create_string_buffer(1024)
|
||||
result = CoreFoundation.CFStringGetCString(
|
||||
value_as_void_p,
|
||||
buffer,
|
||||
1024,
|
||||
CFConst.kCFStringEncodingUTF8
|
||||
value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8
|
||||
)
|
||||
if not result:
|
||||
raise OSError('Error copying C string from CFStringRef')
|
||||
raise OSError("Error copying C string from CFStringRef")
|
||||
string = buffer.value
|
||||
if string is not None:
|
||||
string = string.decode('utf-8')
|
||||
string = string.decode("utf-8")
|
||||
return string
|
||||
|
||||
|
||||
@@ -97,8 +93,8 @@ def _assert_no_error(error, exception_class=None):
|
||||
output = _cf_string_to_unicode(cf_error_string)
|
||||
CoreFoundation.CFRelease(cf_error_string)
|
||||
|
||||
if output is None or output == u'':
|
||||
output = u'OSStatus %s' % error
|
||||
if output is None or output == u"":
|
||||
output = u"OSStatus %s" % error
|
||||
|
||||
if exception_class is None:
|
||||
exception_class = ssl.SSLError
|
||||
@@ -111,9 +107,11 @@ def _cert_array_from_pem(pem_bundle):
|
||||
Given a bundle of certs in PEM format, turns them into a CFArray of certs
|
||||
that can be used to validate a cert chain.
|
||||
"""
|
||||
# Normalize the PEM bundle's line endings.
|
||||
pem_bundle = pem_bundle.replace(b"\r\n", b"\n")
|
||||
|
||||
der_certs = [
|
||||
base64.b64decode(match.group(1))
|
||||
for match in _PEM_CERTS_RE.finditer(pem_bundle)
|
||||
base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle)
|
||||
]
|
||||
if not der_certs:
|
||||
raise ssl.SSLError("No root certificates specified")
|
||||
@@ -121,7 +119,7 @@ def _cert_array_from_pem(pem_bundle):
|
||||
cert_array = CoreFoundation.CFArrayCreateMutable(
|
||||
CoreFoundation.kCFAllocatorDefault,
|
||||
0,
|
||||
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks)
|
||||
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
|
||||
)
|
||||
if not cert_array:
|
||||
raise ssl.SSLError("Unable to allocate memory!")
|
||||
@@ -183,21 +181,16 @@ def _temporary_keychain():
|
||||
# some random bytes to password-protect the keychain we're creating, so we
|
||||
# ask for 40 random bytes.
|
||||
random_bytes = os.urandom(40)
|
||||
filename = base64.b64encode(random_bytes[:8]).decode('utf-8')
|
||||
password = base64.b64encode(random_bytes[8:]) # Must be valid UTF-8
|
||||
filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
|
||||
password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8
|
||||
tempdirectory = tempfile.mkdtemp()
|
||||
|
||||
keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')
|
||||
keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")
|
||||
|
||||
# We now want to create the keychain itself.
|
||||
keychain = Security.SecKeychainRef()
|
||||
status = Security.SecKeychainCreate(
|
||||
keychain_path,
|
||||
len(password),
|
||||
password,
|
||||
False,
|
||||
None,
|
||||
ctypes.byref(keychain)
|
||||
keychain_path, len(password), password, False, None, ctypes.byref(keychain)
|
||||
)
|
||||
_assert_no_error(status)
|
||||
|
||||
@@ -216,14 +209,12 @@ def _load_items_from_file(keychain, path):
|
||||
identities = []
|
||||
result_array = None
|
||||
|
||||
with open(path, 'rb') as f:
|
||||
with open(path, "rb") as f:
|
||||
raw_filedata = f.read()
|
||||
|
||||
try:
|
||||
filedata = CoreFoundation.CFDataCreate(
|
||||
CoreFoundation.kCFAllocatorDefault,
|
||||
raw_filedata,
|
||||
len(raw_filedata)
|
||||
CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
|
||||
)
|
||||
result_array = CoreFoundation.CFArrayRef()
|
||||
result = Security.SecItemImport(
|
||||
@@ -234,7 +225,7 @@ def _load_items_from_file(keychain, path):
|
||||
0, # import flags
|
||||
None, # key params, can include passphrase in the future
|
||||
keychain, # The keychain to insert into
|
||||
ctypes.byref(result_array) # Results
|
||||
ctypes.byref(result_array), # Results
|
||||
)
|
||||
_assert_no_error(result)
|
||||
|
||||
@@ -244,9 +235,7 @@ def _load_items_from_file(keychain, path):
|
||||
# keychain already has them!
|
||||
result_count = CoreFoundation.CFArrayGetCount(result_array)
|
||||
for index in range(result_count):
|
||||
item = CoreFoundation.CFArrayGetValueAtIndex(
|
||||
result_array, index
|
||||
)
|
||||
item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
|
||||
item = ctypes.cast(item, CoreFoundation.CFTypeRef)
|
||||
|
||||
if _is_cert(item):
|
||||
@@ -304,9 +293,7 @@ def _load_client_cert_chain(keychain, *paths):
|
||||
|
||||
try:
|
||||
for file_path in paths:
|
||||
new_identities, new_certs = _load_items_from_file(
|
||||
keychain, file_path
|
||||
)
|
||||
new_identities, new_certs = _load_items_from_file(keychain, file_path)
|
||||
identities.extend(new_identities)
|
||||
certificates.extend(new_certs)
|
||||
|
||||
@@ -315,9 +302,7 @@ def _load_client_cert_chain(keychain, *paths):
|
||||
if not identities:
|
||||
new_identity = Security.SecIdentityRef()
|
||||
status = Security.SecIdentityCreateWithCertificate(
|
||||
keychain,
|
||||
certificates[0],
|
||||
ctypes.byref(new_identity)
|
||||
keychain, certificates[0], ctypes.byref(new_identity)
|
||||
)
|
||||
_assert_no_error(status)
|
||||
identities.append(new_identity)
|
||||
|
||||
Reference in New Issue
Block a user