# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import threading
from pathlib import Path
from typing import Dict, Generator, Optional, cast

import OpenSSL.crypto
import OpenSSL.SSL
import pytest
from twisted.internet import reactor as _twisted_reactor
from twisted.internet import ssl
from twisted.internet.selectreactor import SelectReactor
from twisted.web import resource, server
from twisted.web.http import Request

from playwright.async_api import Browser, BrowserType, Playwright, expect

# NOTE(review): bare attribute access with no visible effect. Kept because its
# intent cannot be determined from this file -- confirm and delete if unneeded.
ssl.optionsForClientTLS
# The imported reactor is a module-level singleton; the cast only narrows its
# static type so that ``listenSSL`` type-checks.
reactor = cast(SelectReactor, _twisted_reactor)


@pytest.fixture(scope="function", autouse=True)
def _skip_webkit_darwin(browser_name: str) -> None:
    """Skip every test in this module when running WebKit on macOS."""
    if browser_name == "webkit" and sys.platform == "darwin":
        pytest.skip("WebKit does not proxy localhost on macOS")


class HttpsResource(resource.Resource):
    """Leaf resource whose GET response depends on the peer's TLS certificate.

    ``serverCertificate`` has no default and must be assigned before a request
    is rendered; it is the only certificate added to the verification store.
    """

    serverCertificate: ssl.PrivateCertificate
    isLeaf = True

    def _verify_cert_chain(self, cert: Optional[OpenSSL.crypto.X509]) -> bool:
        """Return True if *cert* verifies against the server certificate.

        Args:
            cert: The peer certificate, or ``None`` when none was presented.

        Returns:
            ``False`` when *cert* is missing or verification raises
            ``X509StoreContextError``; ``True`` otherwise.
        """
        if cert is None:
            return False
        store = OpenSSL.crypto.X509Store()
        store.add_cert(self.serverCertificate.original)
        store_ctx = OpenSSL.crypto.X509StoreContext(store, cert)
        try:
            store_ctx.verify_certificate()
        except OpenSSL.crypto.X509StoreContextError:
            return False
        return True

    def render_GET(self, request: Request) -> bytes:
        """Render a small HTML body describing the client-certificate check.

        The response code is 200 when the peer certificate verifies, 403 when
        a certificate with a subject was presented but did not verify, and 401
        otherwise. Each message is wrapped in a ``<div>`` whose ``data-testid``
        is the part's ``key`` so that tests can locate it by test id.
        """
        tls_socket: OpenSSL.SSL.Connection = request.transport.getHandle()  # type: ignore
        cert = tls_socket.get_peer_certificate()

        if self._verify_cert_chain(cert):
            status = 200
            message = f"Hello {cert.get_subject().CN}, your certificate was issued by {cert.get_issuer().CN}!"  # type: ignore
        elif cert is not None and cert.get_subject():
            status = 403
            message = f"Sorry {cert.get_subject().CN}, certificates from {cert.get_issuer().CN} are not welcome here."
        else:
            status = 401
            message = "Sorry, but you need to provide a client certificate to continue."

        request.setResponseCode(status)
        parts = [{"key": "message", "value": message}]
        # The markup must carry data-testid: the browser tests look the
        # message up with page.get_by_test_id("message").
        return b"".join(
            f'<div data-testid="{part["key"]}">{part["value"]}</div>'.encode()
            for part in parts
        )


@pytest.fixture(scope="session", autouse=True)
def _client_certificate_server(assetdir: Path) -> Generator[None, None, None]:
    """Register a TLS listener on port 8000 that serves ``HttpsResource``.

    The server certificate doubles as the certificate authority handed to the
    TLS context options. The reactor is not started here.
    NOTE(review): presumably another fixture runs the reactor -- confirm.

    Args:
        assetdir: Directory containing the ``client-certificates`` assets.
    """
    server_dir = assetdir / "client-certificates/server"
    cert_pem = (server_dir / "server_cert.pem").read_text()
    key_pem = (server_dir / "server_key.pem").read_text()

    ca_cert = ssl.Certificate.loadPEM(cert_pem)
    server_cert = ssl.PrivateCertificate.loadPEM(key_pem + cert_pem)
    context_factory = server_cert.options(ca_cert)
    # Presumably disabled so that clients without a certificate still reach
    # render_GET and get the "you need to provide a client certificate" reply.
    context_factory.requireCertificate = False

    # Named so that it does not shadow the imported ``twisted.web.resource``.
    https_resource = HttpsResource()
    https_resource.serverCertificate = server_cert
    site = server.Site(https_resource)

    def _run() -> None:
        reactor.listenSSL(8000, site, context_factory)

    thread = threading.Thread(target=_run)
    thread.start()
    # Join before yielding so the listenSSL call has returned before the first
    # test runs; the thread does nothing else, so there is nothing to join
    # at teardown.
    thread.join()
    yield


async def test_should_throw_with_untrusted_client_certs(
    playwright: Playwright, assetdir: Path
) -> None:
    """A self-signed client certificate must be rejected with "unknown ca"."""
    serverURL = "https://localhost:8000/"
    request = await playwright.request.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": serverURL,
                "certPath": assetdir
                / "client-certificates/client/self-signed/cert.pem",
                "keyPath": assetdir
                / "client-certificates/client/self-signed/key.pem",
            }
        ],
    )
    try:
        with pytest.raises(Exception, match="alert unknown ca"):
            await request.get(serverURL)
    finally:
        # Dispose even when the expectation above fails.
        await request.dispose()


async def test_should_work_with_new_context(browser: Browser, assetdir: Path) -> None:
    """The certificate is sent only to its configured origin in a new context.

    The certificate is registered for ``https://127.0.0.1:8000``; requests to
    ``https://localhost:8000`` are expected to get the "no certificate" reply.
    """
    context = await browser.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        page = await context.new_page()
        await page.goto("https://localhost:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Sorry, but you need to provide a client certificate to continue."
        )
        await page.goto("https://127.0.0.1:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Hello Alice, your certificate was issued by localhost!"
        )

        # The context's own API request object must behave the same way.
        response = await page.context.request.get("https://localhost:8000")
        assert (
            "Sorry, but you need to provide a client certificate to continue."
            in await response.text()
        )
        response = await page.context.request.get("https://127.0.0.1:8000")
        assert (
            "Hello Alice, your certificate was issued by localhost!"
            in await response.text()
        )
    finally:
        # Close even when an assertion above fails.
        await context.close()


async def test_should_work_with_new_persistent_context(
    browser_type: BrowserType, assetdir: Path, launch_arguments: Dict
) -> None:
    """Same origin-scoped certificate check, using a persistent context."""
    context = await browser_type.launch_persistent_context(
        "",
        **launch_arguments,
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        page = await context.new_page()
        await page.goto("https://localhost:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Sorry, but you need to provide a client certificate to continue."
        )
        await page.goto("https://127.0.0.1:8000")
        await expect(page.get_by_test_id("message")).to_have_text(
            "Hello Alice, your certificate was issued by localhost!"
        )
    finally:
        # Close even when an assertion above fails.
        await context.close()


async def test_should_work_with_global_api_request_context(
    playwright: Playwright, assetdir: Path
) -> None:
    """Same origin-scoped certificate check, using a standalone request context."""
    request = await playwright.request.new_context(
        # TODO: Remove this once we can pass a custom CA.
        ignore_https_errors=True,
        client_certificates=[
            {
                "origin": "https://127.0.0.1:8000",
                "certPath": assetdir / "client-certificates/client/trusted/cert.pem",
                "keyPath": assetdir / "client-certificates/client/trusted/key.pem",
            }
        ],
    )
    try:
        response = await request.get("https://localhost:8000")
        assert (
            "Sorry, but you need to provide a client certificate to continue."
            in await response.text()
        )
        response = await request.get("https://127.0.0.1:8000")
        assert (
            "Hello Alice, your certificate was issued by localhost!"
            in await response.text()
        )
    finally:
        # Dispose even when an assertion above fails.
        await request.dispose()