# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the fake clock exposed as ``page.clock`` (async API).

Conventions visible in the assertions below: numeric arguments to
``install``/``pause_at``/``set_system_time``/``set_fixed_time`` behave as
seconds (``pause_at(1)`` yields ``Date.now() == 1000``), while numeric
arguments to ``run_for``/``fast_forward`` behave as milliseconds.
"""

import asyncio
import datetime
from typing import Any, AsyncGenerator, List

import pytest

from playwright.async_api import Error, Page
from tests.server import Server


@pytest.fixture(autouse=True)
async def calls(page: Page) -> List[Any]:
    """Expose ``window.stub`` to the page and return the list recording its calls.

    Every invocation of ``window.stub(...)`` from page script appends the
    call's arguments, as a list, to the returned list.
    """
    recorded: List[Any] = []
    await page.expose_function("stub", lambda *args: recorded.append(list(args)))
    return recorded


class TestRunFor:
    """``page.clock.run_for`` on a clock installed at 0 and then paused."""

    @pytest.fixture(autouse=True)
    async def before_each(self, page: Page) -> AsyncGenerator[None, None]:
        """Install the fake clock at time 0 and pause it before each test."""
        await page.clock.install(time=0)
        await page.clock.pause_at(1000)
        yield

    async def test_run_for_triggers_immediately_without_specified_delay(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A timer without a delay fires on ``run_for(0)``."""
        await page.evaluate("setTimeout(window.stub)")
        await page.clock.run_for(0)
        assert len(calls) == 1

    async def test_run_for_does_not_trigger_without_sufficient_delay(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A 100 ms timer does not fire after only 10 ms."""
        await page.evaluate("setTimeout(window.stub, 100)")
        await page.clock.run_for(10)
        assert len(calls) == 0

    async def test_run_for_triggers_after_sufficient_delay(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A 100 ms timer fires once 100 ms have been run."""
        await page.evaluate("setTimeout(window.stub, 100)")
        await page.clock.run_for(100)
        assert len(calls) == 1

    async def test_run_for_triggers_simultaneous_timers(
        self, page: Page, calls: List[Any]
    ) -> None:
        """Two timers due at the same time both fire."""
        await page.evaluate(
            "setTimeout(window.stub, 100); setTimeout(window.stub, 100)"
        )
        await page.clock.run_for(100)
        assert len(calls) == 2

    async def test_run_for_triggers_multiple_simultaneous_timers(
        self, page: Page, calls: List[Any]
    ) -> None:
        """All four timers due within the window fire."""
        await page.evaluate(
            "setTimeout(window.stub, 100); setTimeout(window.stub, 100); "
            "setTimeout(window.stub, 99); setTimeout(window.stub, 100)"
        )
        await page.clock.run_for(100)
        assert len(calls) == 4

    async def test_run_for_waits_after_setTimeout_was_called(
        self, page: Page, calls: List[Any]
    ) -> None:
        """Consecutive ``run_for`` calls accumulate towards a timer's deadline."""
        await page.evaluate("setTimeout(window.stub, 150)")
        await page.clock.run_for(50)
        assert len(calls) == 0
        await page.clock.run_for(100)
        assert len(calls) == 1

    async def test_run_for_triggers_event_when_some_throw(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A throwing timer makes ``run_for`` raise, yet the later timer fires."""
        await page.evaluate(
            "setTimeout(() => { throw new Error(); }, 100); setTimeout(window.stub, 120)"
        )
        with pytest.raises(Error):
            await page.clock.run_for(120)
        assert len(calls) == 1

    async def test_run_for_creates_updated_Date_while_ticking(
        self, page: Page, calls: List[Any]
    ) -> None:
        """Each interval tick observes the fake time at which it was due."""
        await page.clock.set_system_time(0)
        await page.evaluate(
            "setInterval(() => { window.stub(new Date().getTime()); }, 10)"
        )
        await page.clock.run_for(100)
        assert calls == [
            [10],
            [20],
            [30],
            [40],
            [50],
            [60],
            [70],
            [80],
            [90],
            [100],
        ]

    async def test_run_for_passes_8_seconds(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``"08"`` runs 8 seconds: a 4 s interval fires twice."""
        await page.evaluate("setInterval(window.stub, 4000)")
        await page.clock.run_for("08")
        assert len(calls) == 2

    async def test_run_for_passes_1_minute(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``"01:00"`` runs one minute: a 6 s interval fires ten times."""
        await page.evaluate("setInterval(window.stub, 6000)")
        await page.clock.run_for("01:00")
        assert len(calls) == 10

    async def test_run_for_passes_2_hours_34_minutes_and_10_seconds(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``"02:34:10"`` runs 9250 s: a 10 s interval fires 925 times."""
        await page.evaluate("setInterval(window.stub, 10000)")
        await page.clock.run_for("02:34:10")
        assert len(calls) == 925

    async def test_run_for_throws_for_invalid_format(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A four-component time string is rejected and no timer runs."""
        await page.evaluate("setInterval(window.stub, 10000)")
        with pytest.raises(Error):
            await page.clock.run_for("12:02:34:10")
        assert len(calls) == 0

    async def test_run_for_returns_the_current_now_value(self, page: Page) -> None:
        """After ``run_for(value)`` from time 0, ``Date.now()`` equals ``value``."""
        await page.clock.set_system_time(0)
        value = 200
        await page.clock.run_for(value)
        assert await page.evaluate("Date.now()") == value


class TestFastForward:
    """``page.clock.fast_forward`` on a clock paused at 1000 ms."""

    @pytest.fixture(autouse=True)
    async def before_each(self, page: Page) -> AsyncGenerator[None, None]:
        """Install the fake clock at time 0 and pause it before each test."""
        await page.clock.install(time=0)
        await page.clock.pause_at(1)
        yield

    async def test_ignores_timers_which_wouldnt_be_run(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A timer due after the fast-forward target does not fire."""
        await page.evaluate(
            "setTimeout(() => { window.stub('should not be logged'); }, 1000)"
        )
        await page.clock.fast_forward(500)
        assert len(calls) == 0

    async def test_pushes_back_execution_time_for_skipped_timers(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A timer that was jumped over fires once and sees the post-jump time."""
        await page.evaluate("setTimeout(() => { window.stub(Date.now()); }, 1000)")
        await page.clock.fast_forward(2000)
        assert calls == [[1000 + 2000]]

    async def test_supports_string_time_arguments(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``"01:50"`` fast-forwards 110 s, past the 100 s timer."""
        await page.evaluate(
            "setTimeout(() => { window.stub(Date.now()); }, 100000)"
        )  # 100000 = 1:40
        await page.clock.fast_forward("01:50")
        assert calls == [[1000 + 110000]]


class TestStubTimers:
    """Timer and time globals are replaced once the clock is installed."""

    @pytest.fixture(autouse=True)
    async def before_each(self, page: Page) -> AsyncGenerator[None, None]:
        """Install the fake clock at time 0 and pause it before each test."""
        await page.clock.install(time=0)
        await page.clock.pause_at(1)
        yield

    async def test_sets_initial_timestamp(self, page: Page) -> None:
        """``set_system_time(1.4)`` makes ``Date.now()`` report 1400."""
        await page.clock.set_system_time(1.4)
        assert await page.evaluate("Date.now()") == 1400

    async def test_replaces_global_setTimeout(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``setTimeout`` callbacks are driven by the fake clock."""
        await page.evaluate("setTimeout(window.stub, 1000)")
        await page.clock.run_for(1000)
        assert len(calls) == 1

    async def test_global_fake_setTimeout_should_return_id(self, page: Page) -> None:
        """The faked ``setTimeout`` returns an integer id."""
        to = await page.evaluate("setTimeout(window.stub, 1000)")
        assert isinstance(to, int)

    async def test_replaces_global_clearTimeout(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A cleared timeout never fires."""
        await page.evaluate(
            """
            const to = setTimeout(window.stub, 1000);
            clearTimeout(to);
        """
        )
        await page.clock.run_for(1000)
        assert len(calls) == 0

    async def test_replaces_global_setInterval(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A 500 ms interval fires twice within 1000 ms."""
        await page.evaluate("setInterval(window.stub, 500)")
        await page.clock.run_for(1000)
        assert len(calls) == 2

    async def test_replaces_global_clearInterval(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A cleared interval never fires."""
        await page.evaluate(
            """
            const to = setInterval(window.stub, 500);
            clearInterval(to);
        """
        )
        await page.clock.run_for(1000)
        assert len(calls) == 0

    async def test_replaces_global_performance_now(self, page: Page) -> None:
        """``performance.now()`` advances with the fake clock."""
        # The evaluate call blocks on a fake timer, so it must run as a task
        # while this coroutine advances the clock.
        promise = asyncio.create_task(
            page.evaluate(
                """async () => {
                const prev = performance.now();
                await new Promise(f => setTimeout(f, 1000));
                const next = performance.now();
                return { prev, next };
            }"""
            )
        )
        await asyncio.sleep(0)  # Make sure the promise is scheduled.
        await page.clock.run_for(1000)
        assert await promise == {"prev": 1000, "next": 2000}

    async def test_fakes_Date_constructor(self, page: Page) -> None:
        """``new Date()`` reports the paused fake time."""
        now = await page.evaluate("new Date().getTime()")
        assert now == 1000


class TestStubTimersPerformance:
    """``performance.timeOrigin`` follows the installed clock time."""

    async def test_replaces_global_performance_time_origin(self, page: Page) -> None:
        """Installing at 1 s gives ``timeOrigin == 1000``; ``now()`` still ticks."""
        await page.clock.install(time=1)
        await page.clock.pause_at(2)
        # The evaluate call blocks on a fake timer, so it must run as a task
        # while this coroutine advances the clock.
        promise = asyncio.create_task(
            page.evaluate(
                """async () => {
                const prev = performance.now();
                await new Promise(f => setTimeout(f, 1000));
                const next = performance.now();
                return { prev, next };
            }"""
            )
        )
        await asyncio.sleep(0)  # Make sure the promise is scheduled.
        await page.clock.run_for(1000)
        assert await page.evaluate("performance.timeOrigin") == 1000
        assert await promise == {"prev": 1000, "next": 2000}


class TestPopup:
    """Clock state as observed from popups opened by the page."""

    async def test_should_tick_after_popup(self, page: Page) -> None:
        """Advancing the opener's clock is visible in an already-open popup."""
        await page.clock.install(time=0)
        now = datetime.datetime.fromisoformat("2015-09-25")
        await page.clock.pause_at(now)
        popup, _ = await asyncio.gather(
            page.wait_for_event("popup"), page.evaluate("window.open('about:blank')")
        )
        popup_time = await popup.evaluate("Date.now()")
        assert popup_time == now.timestamp() * 1000
        await page.clock.run_for(1000)
        popup_time_after = await popup.evaluate("Date.now()")
        assert popup_time_after == now.timestamp() * 1000 + 1000

    async def test_should_tick_before_popup(self, page: Page) -> None:
        """Time advanced before the popup opens is visible inside the popup."""
        await page.clock.install(time=0)
        now = datetime.datetime.fromisoformat("2015-09-25")
        await page.clock.pause_at(now)
        await page.clock.run_for(1000)
        popup, _ = await asyncio.gather(
            page.wait_for_event("popup"), page.evaluate("window.open('about:blank')")
        )
        popup_time = await popup.evaluate("Date.now()")
        assert popup_time == int(now.timestamp() * 1000 + 1000)
        assert datetime.datetime.fromtimestamp(popup_time / 1_000).year == 2015

    async def test_should_run_time_before_popup(
        self, page: Page, server: Server
    ) -> None:
        """The popup's load-time ``Date.now()`` reflects time already elapsed."""
        server.set_route(
            "/popup.html",
            lambda res: (
                res.setHeader("Content-Type", "text/html"),
                # The popup must record its own load time; the assertion
                # below reads it back as ``window.time``.
                res.write(b"<script>window.time = Date.now()</script>"),
                res.finish(),
            ),
        )
        # NOTE(review): no clock is installed in this test, so the popup
        # presumably reads the real wall clock and the assertion below holds
        # trivially - confirm whether `page.clock.install(time=0)` was intended.
        await page.goto(server.EMPTY_PAGE)
        # Wait for 2 seconds of real time so that we can check they have
        # elapsed by the time the popup loads.
        await page.wait_for_timeout(2000)
        popup, _ = await asyncio.gather(
            page.wait_for_event("popup"),
            page.evaluate(f"window.open('{server.PREFIX}/popup.html')"),
        )
        popup_time = await popup.evaluate("window.time")
        assert popup_time >= 2000

    async def test_should_not_run_time_before_popup_on_pause(
        self, page: Page, server: Server
    ) -> None:
        """With the clock paused, the popup loads at the paused time."""
        server.set_route(
            "/popup.html",
            lambda res: (
                res.setHeader("Content-Type", "text/html"),
                # The popup must record its own load time; the assertion
                # below reads it back as ``window.time``.
                res.write(b"<script>window.time = Date.now()</script>"),
                res.finish(),
            ),
        )
        await page.clock.install(time=0)
        await page.clock.pause_at(1)
        await page.goto(server.EMPTY_PAGE)
        # Wait for 2 seconds of real time so that we can check they have NOT
        # elapsed in the popup while the clock is paused.
        await page.wait_for_timeout(2000)
        popup, _ = await asyncio.gather(
            page.wait_for_event("popup"),
            page.evaluate(f"window.open('{server.PREFIX}/popup.html')"),
        )
        popup_time = await popup.evaluate("window.time")
        assert popup_time == 1000


class TestSetFixedTime:
    """``page.clock.set_fixed_time`` pins ``Date.now()``."""

    async def test_does_not_fake_methods(self, page: Page) -> None:
        """Timers keep running for real after ``set_fixed_time``."""
        await page.clock.set_fixed_time(0)
        # Should not stall.
        await page.evaluate("new Promise(f => setTimeout(f, 1))")

    async def test_allows_setting_time_multiple_times(self, page: Page) -> None:
        """A later ``set_fixed_time`` overrides an earlier one."""
        await page.clock.set_fixed_time(0.1)
        assert await page.evaluate("Date.now()") == 100
        await page.clock.set_fixed_time(0.2)
        assert await page.evaluate("Date.now()") == 200

    async def test_fixed_time_is_not_affected_by_clock_manipulation(
        self, page: Page
    ) -> None:
        """``fast_forward`` does not move a fixed time."""
        await page.clock.set_fixed_time(0.1)
        assert await page.evaluate("Date.now()") == 100
        await page.clock.fast_forward(20)
        assert await page.evaluate("Date.now()") == 100

    async def test_allows_installing_fake_timers_after_setting_time(
        self, page: Page, calls: List[Any]
    ) -> None:
        """A timer run after re-fixing the time observes the latest fixed time."""
        await page.clock.set_fixed_time(0.1)
        assert await page.evaluate("Date.now()") == 100
        await page.clock.set_fixed_time(0.2)
        await page.evaluate("setTimeout(() => window.stub(Date.now()))")
        await page.clock.run_for(0)
        assert calls == [[200]]


class TestWhileRunning:
    """Clock operations on an installed clock that starts out running."""

    async def test_should_progress_time(self, page: Page) -> None:
        """About one second of real waiting moves the page clock by 1-2 s."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.wait_for_timeout(1000)
        now = await page.evaluate("Date.now()")
        assert 1000 <= now <= 2000

    async def test_should_run_for(self, page: Page) -> None:
        """``run_for(10000)`` advances a running clock by about 10 s."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.run_for(10000)
        now = await page.evaluate("Date.now()")
        assert 10000 <= now <= 11000

    async def test_should_fast_forward(self, page: Page) -> None:
        """``fast_forward(10000)`` advances a running clock by about 10 s."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.fast_forward(10000)
        now = await page.evaluate("Date.now()")
        assert 10000 <= now <= 11000

    async def test_should_fast_forward_to(self, page: Page) -> None:
        """``fast_forward(10000)`` advances a running clock by about 10 s."""
        # NOTE(review): the body is identical to test_should_fast_forward.
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.fast_forward(10000)
        now = await page.evaluate("Date.now()")
        assert 10000 <= now <= 11000

    async def test_should_pause(self, page: Page) -> None:
        """Real time spent while paused leaves ``Date.now()`` within [0, 1000]."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1)
        await page.wait_for_timeout(1000)
        await page.clock.resume()
        now = await page.evaluate("Date.now()")
        assert 0 <= now <= 1000

    async def test_should_pause_and_fast_forward(self, page: Page) -> None:
        """Fast-forwarding 1000 ms from a pause at 1000 ms lands on 2000 ms."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1)
        await page.clock.fast_forward(1000)
        now = await page.evaluate("Date.now()")
        assert now == 2000

    async def test_should_set_system_time_on_pause(self, page: Page) -> None:
        """``pause_at(1)`` sets ``Date.now()`` to 1000."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1)
        now = await page.evaluate("Date.now()")
        assert now == 1000


class TestWhileOnPause:
    """Zero-delay timers scheduled from a firing timer wait for the next advance."""

    async def test_fast_forward_should_not_run_nested_immediate(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``fast_forward`` runs the outer timer only; the inner needs another tick."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1000)
        await page.evaluate(
            """
            setTimeout(() => {
                window.stub('outer');
                setTimeout(() => window.stub('inner'), 0);
            }, 1000);
        """
        )
        await page.clock.fast_forward(1000)
        assert calls == [["outer"]]
        await page.clock.fast_forward(1)
        assert calls == [["outer"], ["inner"]]

    async def test_run_for_should_not_run_nested_immediate(
        self, page: Page, calls: List[Any]
    ) -> None:
        """``run_for`` runs the outer timer only; the inner needs another tick."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1000)
        await page.evaluate(
            """
            setTimeout(() => {
                window.stub('outer');
                setTimeout(() => window.stub('inner'), 0);
            }, 1000);
        """
        )
        await page.clock.run_for(1000)
        assert calls == [["outer"]]
        await page.clock.run_for(1)
        assert calls == [["outer"], ["inner"]]

    async def test_run_for_should_not_run_nested_immediate_from_microtask(
        self, page: Page, calls: List[Any]
    ) -> None:
        """Same as above when the inner timer is scheduled from a microtask."""
        await page.clock.install(time=0)
        await page.goto("data:text/html,")
        await page.clock.pause_at(1000)
        await page.evaluate(
            """
            setTimeout(() => {
                window.stub('outer');
                void Promise.resolve().then(() => setTimeout(() => window.stub('inner'), 0));
            }, 1000);
        """
        )
        await page.clock.run_for(1000)
        assert calls == [["outer"]]
        await page.clock.run_for(1)
        assert calls == [["outer"], ["inner"]]