mirror of
https://forge.fsky.io/oneflux/omegafox.git
synced 2026-02-11 04:42:05 -08:00
227 lines
7.9 KiB
Text
227 lines
7.9 KiB
Text
# Copyright (c) Microsoft Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Dict, Generator, Optional, cast
|
|
|
|
import OpenSSL.crypto
|
|
import OpenSSL.SSL
|
|
import pytest
|
|
from twisted.internet import reactor as _twisted_reactor
|
|
from twisted.internet import ssl
|
|
from twisted.internet.selectreactor import SelectReactor
|
|
from twisted.web import resource, server
|
|
from twisted.web.http import Request
|
|
|
|
from playwright.async_api import Browser, BrowserType, Playwright, expect
|
|
|
|
# NOTE(review): bare attribute access whose result is discarded; it has no
# visible effect in this file. Presumably kept so the module fails fast when
# Twisted's TLS support is unavailable -- confirm before removing.
ssl.optionsForClientTLS
|
|
# Re-export the global Twisted reactor typed as SelectReactor. typing.cast()
# returns its argument unchanged at runtime; presumably this is only here so
# type checkers accept calls such as reactor.listenSSL below.
reactor = cast(SelectReactor, _twisted_reactor)
|
|
|
|
|
|
@pytest.fixture(scope="function", autouse=True)
def _skip_webkit_darwin(browser_name: str) -> None:
    """Skip the current test when it runs against WebKit on macOS.

    The fixture is autouse and function-scoped, so it is evaluated before
    every test in this module.
    """
    on_macos = sys.platform == "darwin"
    if on_macos and browser_name == "webkit":
        pytest.skip("WebKit does not proxy localhost on macOS")
|
|
|
|
|
|
class HttpsResource(resource.Resource):
    """Leaf resource that answers GET requests based on the TLS client certificate.

    The response is a single ``<div data-testid="message">`` element whose
    text, together with the HTTP status code, tells the caller whether its
    client certificate was accepted, rejected, or missing.
    """

    # Certificate (with private key) assigned by whoever builds the resource;
    # its ``original`` X509 is the only entry in the verification store.
    serverCertificate: ssl.PrivateCertificate
    isLeaf = True

    def _verify_cert_chain(self, cert: Optional[OpenSSL.crypto.X509]) -> bool:
        """Return True if *cert* verifies against the server certificate.

        Args:
            cert: Peer certificate taken from the TLS connection, or ``None``
                when the client did not present one.

        Returns:
            ``False`` when *cert* is missing or verification raises
            ``X509StoreContextError``; ``True`` otherwise.
        """
        if not cert:
            return False
        store = OpenSSL.crypto.X509Store()
        store.add_cert(self.serverCertificate.original)
        store_ctx = OpenSSL.crypto.X509StoreContext(store, cert)
        try:
            store_ctx.verify_certificate()
        except OpenSSL.crypto.X509StoreContextError:
            return False
        return True

    def render_GET(self, request: Request) -> bytes:
        """Build the HTML response for a GET request.

        Status codes:
            200: the peer certificate verifies via ``_verify_cert_chain``.
            403: a certificate with a subject was presented but did not verify.
            401: no usable client certificate was presented.

        Args:
            request: Incoming Twisted request; its transport handle is used
                as the pyOpenSSL connection to read the peer certificate.

        Returns:
            The UTF-8 encoded ``<div data-testid="message">`` element.
        """
        # Imported locally so this block does not depend on a new file-level import.
        from html import escape

        tls_socket: OpenSSL.SSL.Connection = request.transport.getHandle()  # type: ignore
        cert = tls_socket.get_peer_certificate()

        if self._verify_cert_chain(cert):
            status = 200
            message = f"Hello {cert.get_subject().CN}, your certificate was issued by {cert.get_issuer().CN}!"  # type: ignore
        elif cert and cert.get_subject():
            status = 403
            message = f"Sorry {cert.get_subject().CN}, certificates from {cert.get_issuer().CN} are not welcome here."
        else:
            status = 401
            message = "Sorry, but you need to provide a client certificate to continue."

        request.setResponseCode(status)
        # The CN values come from a client-supplied certificate (untrusted in
        # the 403 branch), so escape the whole message before embedding it in
        # markup. Escaping the formatted string also copes with a missing CN.
        return f'<div data-testid="message">{escape(message)}</div>'.encode()
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
def _client_certificate_server(assetdir: Path) -> Generator[None, None, None]:
    """Register a TLS listener on port 8000 serving ``HttpsResource``.

    The server certificate is loaded from the asset directory and used both
    as the server's identity and as the authority passed to ``options()``.
    The fixture is session-scoped and autouse, so the listener is set up once
    for the test session.
    """
    cert_pem = (assetdir / "client-certificates/server/server_cert.pem").read_text()
    key_pem = (assetdir / "client-certificates/server/server_key.pem").read_text()

    trust_root = ssl.Certificate.loadPEM(cert_pem)
    server_identity = ssl.PrivateCertificate.loadPEM(key_pem + cert_pem)

    tls_options = server_identity.options(trust_root)
    # Do not require a client certificate at the TLS layer; HttpsResource
    # answers 401 itself when none is presented.
    tls_options.requireCertificate = False

    root = HttpsResource()
    root.serverCertificate = server_identity
    site = server.Site(root)

    # The thread only registers the listening port on the shared reactor; it
    # does not run the reactor, and nothing here stops the listener afterwards.
    listener = threading.Thread(
        target=reactor.listenSSL, args=(8000, site, tls_options)
    )
    listener.start()
    yield
    listener.join()
|
|
|
|
|
|
async def test_should_throw_with_untrusted_client_certs(
    playwright: Playwright, assetdir: Path
) -> None:
    """A request using the self-signed client certificate must fail.

    The GET is expected to raise an error whose message matches
    ``alert unknown ca``.
    """
    serverURL = "https://localhost:8000/"
    request = await playwright.request.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": serverURL,
                "certPath": assetdir
                / "client-certificates/client/self-signed/cert.pem",
                "keyPath": assetdir / "client-certificates/client/self-signed/key.pem",
            }
        ],
    )
    try:
        with pytest.raises(Exception, match="alert unknown ca"):
            await request.get(serverURL)
    finally:
        # Dispose even when the expectation above fails, so the request
        # context is not leaked into later tests.
        await request.dispose()
|
|
|
|
|
|
async def test_should_work_with_new_context(browser: Browser, assetdir: Path) -> None:
    """Client certificates on a new browser context apply only to their origin.

    The trusted certificate is registered for ``https://127.0.0.1:8000``.
    Page navigations and context-bound API requests to that origin must be
    greeted as Alice, while the same server reached via ``localhost`` must
    report that no client certificate was provided.
    """
    context = await browser.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        page = await context.new_page()

        # Browser navigation: origin without / with a configured certificate.
        await page.goto("https://localhost:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Sorry, but you need to provide a client certificate to continue."
        )
        await page.goto("https://127.0.0.1:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Hello Alice, your certificate was issued by localhost!"
        )

        # The context's API request client must behave the same way.
        response = await page.context.request.get("https://localhost:8000")
        assert (
            "Sorry, but you need to provide a client certificate to continue."
            in await response.text()
        )
        response = await page.context.request.get("https://127.0.0.1:8000")
        assert (
            "Hello Alice, your certificate was issued by localhost!"
            in await response.text()
        )
    finally:
        # Close the context even when an expectation fails.
        await context.close()
|
|
|
|
|
|
async def test_should_work_with_new_persistent_context(
    browser_type: BrowserType, assetdir: Path, launch_arguments: Dict
) -> None:
    """Client certificates on a persistent context apply only to their origin.

    The trusted certificate is registered for ``https://127.0.0.1:8000``.
    Navigating there must greet Alice, while ``localhost`` must report that
    no client certificate was provided.
    """
    context = await browser_type.launch_persistent_context(
        "",
        **launch_arguments,
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        page = await context.new_page()
        await page.goto("https://localhost:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Sorry, but you need to provide a client certificate to continue."
        )
        await page.goto("https://127.0.0.1:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Hello Alice, your certificate was issued by localhost!"
        )
    finally:
        # Close the persistent context even when an expectation fails, so
        # the browser it launched does not outlive the test.
        await context.close()
|
|
|
|
|
|
async def test_should_work_with_global_api_request_context(
    playwright: Playwright, assetdir: Path
) -> None:
    """Client certificates on a standalone API request context are per-origin.

    The trusted certificate is registered for ``https://127.0.0.1:8000``.
    Requests to that origin must be greeted as Alice, while requests to
    ``localhost`` must report that no client certificate was provided.
    """
    request = await playwright.request.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        response = await request.get("https://localhost:8000")
        assert (
            "Sorry, but you need to provide a client certificate to continue."
            in await response.text()
        )
        response = await request.get("https://127.0.0.1:8000")
        assert (
            "Hello Alice, your certificate was issued by localhost!"
            in await response.text()
        )
    finally:
        # Dispose even when an assertion fails, so the request context is
        # not leaked into later tests.
        await request.dispose()
|