# Copyright (c) Microsoft Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import os import re import shutil import sys from pathlib import Path from typing import Any import pytest from flaky import flaky from playwright._impl._path_utils import get_file_dirname from playwright.async_api import Error, FilePayload, Page from tests.server import Server from tests.utils import chromium_version_less_than, must _dirname = get_file_dirname() FILE_TO_UPLOAD = _dirname / ".." / "assets/file-to-upload.txt" @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_upload_the_file(page: Page, server: Server) -> None: await page.goto(server.PREFIX + "/input/fileupload.html") file_path = os.path.relpath(FILE_TO_UPLOAD, os.getcwd()) input = await page.query_selector("input") assert input await input.set_input_files(file_path) assert await page.evaluate("e => e.files[0].name", input) == "file-to-upload.txt" assert ( await page.evaluate( """e => { reader = new FileReader() promise = new Promise(fulfill => reader.onload = fulfill) reader.readAsText(e.files[0]) return promise.then(() => reader.result) }""", input, ) == "contents of the file\n" ) async def test_should_work(page: Page, assetdir: Path) -> None: await page.set_content("") await page.set_input_files("input", assetdir / "file-to-upload.txt") assert await page.eval_on_selector("input", "input => input.files.length") == 1 assert ( await page.eval_on_selector("input", "input => input.files[0].name") == "file-to-upload.txt" ) async def test_should_set_from_memory(page: Page) -> None: await page.set_content("") file: FilePayload = { "name": "test.txt", "mimeType": "text/plain", "buffer": b"this is a test", } await page.set_input_files( "input", files=[file], ) assert await page.eval_on_selector("input", "input => input.files.length") == 1 assert await page.eval_on_selector("input", "input => input.files[0].name") == "test.txt" async def test_should_emit_event(page: Page) -> None: await page.set_content("") fc_done: asyncio.Future = asyncio.Future() page.once("filechooser", lambda file_chooser: fc_done.set_result(file_chooser)) await page.click("input") file_chooser = await fc_done assert file_chooser assert ( repr(file_chooser) == f"" ) async def test_should_work_when_file_input_is_attached_to_dom(page: Page) -> None: await page.set_content("") async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value assert file_chooser async def test_should_work_when_file_input_is_not_attached_to_DOM(page: Page) -> None: async with page.expect_file_chooser() as fc_info: await page.evaluate( """() => { el = document.createElement('input') el.type = 'file' el.click() }""" ) file_chooser = await fc_info.value assert file_chooser async def test_should_return_the_same_file_chooser_when_there_are_many_watchdogs_simultaneously( page: Page, ) -> None: await page.set_content("") results = await asyncio.gather( page.wait_for_event("filechooser"), page.wait_for_event("filechooser"), page.eval_on_selector("input", "input => input.click()"), ) assert results[0] == results[1] @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_accept_single_file(page: Page) -> None: await page.set_content('') async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value assert file_chooser.page == page assert file_chooser.element await file_chooser.set_files(FILE_TO_UPLOAD) assert await page.eval_on_selector("input", "input => input.files.length") == 1 assert ( await page.eval_on_selector("input", "input => input.files[0].name") == "file-to-upload.txt" ) @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_be_able_to_read_selected_file(page: Page) -> None: page.once("filechooser", lambda file_chooser: file_chooser.set_files(FILE_TO_UPLOAD)) await page.set_content("") content = await page.eval_on_selector( "input", """async picker => { picker.click(); await new Promise(x => picker.oninput = x); const reader = new FileReader(); const promise = new Promise(fulfill => reader.onload = fulfill); reader.readAsText(picker.files[0]); return promise.then(() => reader.result); }""", ) assert content == "contents of the file\n" @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_be_able_to_reset_selected_files_with_empty_file_list( page: Page, ) -> None: await page.set_content("") page.once("filechooser", lambda file_chooser: file_chooser.set_files(FILE_TO_UPLOAD)) file_length = 0 async with page.expect_file_chooser(): file_length = await page.eval_on_selector( "input", """async picker => { picker.click(); await new Promise(x => picker.oninput = x); return picker.files.length; }""", ) assert file_length == 1 page.once("filechooser", lambda file_chooser: file_chooser.set_files([])) async with page.expect_file_chooser(): file_length = await page.eval_on_selector( "input", """async picker => { picker.click(); await new Promise(x => picker.oninput = x); return picker.files.length; }""", ) assert file_length == 0 @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_not_accept_multiple_files_for_single_file_input( page: Page, assetdir: Path ) -> None: await page.set_content("") async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value with pytest.raises(Exception) as exc_info: await file_chooser.set_files( [ os.path.realpath(assetdir / "file-to-upload.txt"), os.path.realpath(assetdir / "pptr.png"), ] ) assert exc_info.value @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_emit_input_and_change_events(page: Page) -> None: events = [] await page.expose_function("eventHandled", lambda e: events.append(e)) await page.set_content( """ """ ) await must(await page.query_selector("input")).set_input_files(FILE_TO_UPLOAD) assert len(events) == 2 assert events[0]["type"] == "input" assert events[1]["type"] == "change" async def test_should_work_for_single_file_pick(page: Page) -> None: await page.set_content("") async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value assert file_chooser.is_multiple() is False async def test_should_work_for_multiple(page: Page) -> None: await page.set_content("") async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value assert file_chooser.is_multiple() async def test_should_work_for_webkitdirectory(page: Page) -> None: await page.set_content("") async with page.expect_file_chooser() as fc_info: await page.click("input") file_chooser = await fc_info.value assert file_chooser.is_multiple() def _assert_wheel_event(expected: Any, received: Any, browser_name: str) -> None: # Chromium reports deltaX/deltaY scaled by host device scale factor. # https://bugs.chromium.org/p/chromium/issues/detail?id=1324819 # https://github.com/microsoft/playwright/issues/7362 # Different bots have different scale factors (usually 1 or 2), so we just ignore the values # instead of guessing the host scale factor. if sys.platform == "darwin" and browser_name == "chromium": del expected["deltaX"] del expected["deltaY"] del received["deltaX"] del received["deltaY"] assert received == expected @pytest.mark.skip(reason="Not supported by Camoufox") async def test_wheel_should_work(page: Page, browser_name: str) -> None: await page.set_content( """
""" ) await page.mouse.move(50, 60) await _listen_for_wheel_events(page, "div") await page.mouse.wheel(0, 100) _assert_wheel_event( await page.evaluate("window.lastEvent"), { "deltaX": 0, "deltaY": 100, "clientX": 50, "clientY": 60, "deltaMode": 0, "ctrlKey": False, "shiftKey": False, "altKey": False, "metaKey": False, }, browser_name, ) await page.wait_for_function("window.scrollY === 100") async def _listen_for_wheel_events(page: Page, selector: str) -> None: await page.evaluate( """ selector => { document.querySelector(selector).addEventListener('wheel', (e) => { window['lastEvent'] = { deltaX: e.deltaX, deltaY: e.deltaY, clientX: e.clientX, clientY: e.clientY, deltaMode: e.deltaMode, ctrlKey: e.ctrlKey, shiftKey: e.shiftKey, altKey: e.altKey, metaKey: e.metaKey, }; }, { passive: false }); } """, selector, ) @pytest.mark.skip(reason="Not supported by Camoufox") @flaky async def test_should_upload_large_file(page: Page, server: Server, tmp_path: Path) -> None: await page.goto(server.PREFIX + "/input/fileupload.html") large_file_path = tmp_path / "200MB.zip" data = b"A" * 1024 with large_file_path.open("wb") as f: for i in range(0, 200 * 1024 * 1024, len(data)): f.write(data) input = page.locator('input[type="file"]') events = await input.evaluate_handle( """ e => { const events = []; e.addEventListener('input', () => events.push('input')); e.addEventListener('change', () => events.push('change')); return events; } """ ) await input.set_input_files(large_file_path) assert await input.evaluate("e => e.files[0].name") == "200MB.zip" assert await events.evaluate("e => e") == ["input", "change"] [request, _] = await asyncio.gather( server.wait_for_request("/upload"), page.click("input[type=submit]"), ) contents = request.args[b"file1"][0] assert len(contents) == 200 * 1024 * 1024 assert contents[:1024] == data # flake8: noqa: E203 assert contents[len(contents) - 1024 :] == data assert request.post_body match = re.search( rb'^.*Content-Disposition: form-data; name="(?P.*)"; filename="(?P.*)".*$', request.post_body, re.MULTILINE, ) assert match assert match.group("name") == b"file1" assert match.group("filename") == b"200MB.zip" async def test_set_input_files_should_preserve_last_modified_timestamp( page: Page, assetdir: Path, ) -> None: await page.set_content("") input = page.locator("input") files = ["file-to-upload.txt", "file-to-upload-2.txt"] await input.set_input_files([assetdir / file for file in files]) assert await input.evaluate("input => [...input.files].map(f => f.name)") == files timestamps = await input.evaluate("input => [...input.files].map(f => f.lastModified)") expected_timestamps = [os.path.getmtime(assetdir / file) * 1000 for file in files] # On Linux browser sometimes reduces the timestamp by 1ms: 1696272058110.0715 -> 1696272058109 or even # rounds it to seconds in WebKit: 1696272058110 -> 1696272058000. for i in range(len(timestamps)): assert abs(timestamps[i] - expected_timestamps[i]) < 1000 @flaky async def test_should_upload_multiple_large_file( page: Page, server: Server, tmp_path: Path ) -> None: files_count = 10 await page.goto(server.PREFIX + "/input/fileupload-multi.html") upload_file = tmp_path / "50MB_1.zip" data = b"A" * 1024 with upload_file.open("wb") as f: # 49 is close to the actual limit for i in range(0, 49 * 1024): f.write(data) input = page.locator('input[type="file"]') upload_files = [upload_file] for i in range(2, files_count + 1): dst_file = tmp_path / f"50MB_{i}.zip" shutil.copy(upload_file, dst_file) upload_files.append(dst_file) async with page.expect_file_chooser() as fc_info: await input.click() file_chooser = await fc_info.value await file_chooser.set_files(upload_files) files_len = await page.evaluate('document.getElementsByTagName("input")[0].files.length') assert file_chooser.is_multiple() assert files_len == files_count for path in upload_files: path.unlink() @pytest.mark.skip(reason="Not supported by Camoufox") async def test_should_upload_a_folder( page: Page, server: Server, tmp_path: Path, browser_name: str, browser_version: str, headless: bool, ) -> None: await page.goto(server.PREFIX + "/input/folderupload.html") input = await page.query_selector("input") assert input dir = tmp_path / "file-upload-test" dir.mkdir() (dir / "file1.txt").write_text("file1 content") (dir / "file2").write_text("file2 content") (dir / "sub-dir").mkdir() (dir / "sub-dir" / "really.txt").write_text("sub-dir file content") await input.set_input_files(dir) assert set(await input.evaluate("e => [...e.files].map(f => f.webkitRelativePath)")) == set( [ "file-upload-test/file1.txt", "file-upload-test/file2", # https://issues.chromium.org/issues/345393164 *( [] if browser_name == "chromium" and headless and chromium_version_less_than(browser_version, "127.0.6533.0") else ["file-upload-test/sub-dir/really.txt"] ), ] ) webkit_relative_paths = await input.evaluate("e => [...e.files].map(f => f.webkitRelativePath)") for i, webkit_relative_path in enumerate(webkit_relative_paths): content = await input.evaluate( """(e, i) => { const reader = new FileReader(); const promise = new Promise(fulfill => reader.onload = fulfill); reader.readAsText(e.files[i]); return promise.then(() => reader.result); }""", i, ) assert content == (dir / ".." / webkit_relative_path).read_text() async def test_should_upload_a_folder_and_throw_for_multiple_directories( page: Page, server: Server, tmp_path: Path ) -> None: await page.goto(server.PREFIX + "/input/folderupload.html") input = page.locator("input") dir = tmp_path / "file-upload-test" dir.mkdir() (dir / "folder1").mkdir() (dir / "folder1" / "file1.txt").write_text("file1 content") (dir / "folder2").mkdir() (dir / "folder2" / "file2.txt").write_text("file2 content") with pytest.raises(Error) as exc_info: await input.set_input_files([dir / "folder1", dir / "folder2"]) assert "Multiple directories are not supported" in exc_info.value.message async def test_should_throw_if_a_directory_and_files_are_passed( page: Page, server: Server, tmp_path: Path ) -> None: await page.goto(server.PREFIX + "/input/folderupload.html") input = page.locator("input") dir = tmp_path / "file-upload-test" dir.mkdir() (dir / "file1.txt").write_text("file1 content") with pytest.raises(Error) as exc_info: await input.set_input_files([dir, dir / "file1.txt"]) assert "File paths must be all files or a single directory" in exc_info.value.message async def test_should_throw_when_upload_a_folder_in_a_normal_file_upload_input( page: Page, server: Server, tmp_path: Path ) -> None: await page.goto(server.PREFIX + "/input/fileupload.html") input = await page.query_selector("input") assert input dir = tmp_path / "file-upload-test" dir.mkdir() (dir / "file1.txt").write_text("file1 content") with pytest.raises(Error) as exc_info: await input.set_input_files(dir) assert ( "File input does not support directories, pass individual files instead" in exc_info.value.message )