1291 lines
42 KiB
Python
1291 lines
42 KiB
Python
# Copyright (c) Microsoft Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import asyncio
|
|
import base64
|
|
import inspect
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Pattern,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from playwright._impl._accessibility import Accessibility
|
|
from playwright._impl._api_structures import (
|
|
AriaRole,
|
|
FilePayload,
|
|
FloatRect,
|
|
PdfMargins,
|
|
Position,
|
|
ViewportSize,
|
|
)
|
|
from playwright._impl._artifact import Artifact
|
|
from playwright._impl._connection import (
|
|
ChannelOwner,
|
|
from_channel,
|
|
from_nullable_channel,
|
|
)
|
|
from playwright._impl._console_message import ConsoleMessage
|
|
from playwright._impl._download import Download
|
|
from playwright._impl._element_handle import ElementHandle
|
|
from playwright._impl._errors import Error, TargetClosedError, is_target_closed_error
|
|
from playwright._impl._event_context_manager import EventContextManagerImpl
|
|
from playwright._impl._file_chooser import FileChooser
|
|
from playwright._impl._frame import Frame
|
|
from playwright._impl._har_router import HarRouter
|
|
from playwright._impl._helper import (
|
|
ColorScheme,
|
|
DocumentLoadState,
|
|
ForcedColors,
|
|
HarMode,
|
|
KeyboardModifier,
|
|
MouseButton,
|
|
ReducedMotion,
|
|
RouteFromHarNotFoundPolicy,
|
|
RouteHandler,
|
|
RouteHandlerCallback,
|
|
TimeoutSettings,
|
|
URLMatch,
|
|
URLMatcher,
|
|
URLMatchRequest,
|
|
URLMatchResponse,
|
|
async_readfile,
|
|
async_writefile,
|
|
locals_to_params,
|
|
make_dirs_for_file,
|
|
serialize_error,
|
|
)
|
|
from playwright._impl._input import Keyboard, Mouse, Touchscreen
|
|
from playwright._impl._js_handle import (
|
|
JSHandle,
|
|
Serializable,
|
|
parse_result,
|
|
serialize_argument,
|
|
)
|
|
from playwright._impl._network import Request, Response, Route, serialize_headers
|
|
from playwright._impl._video import Video
|
|
from playwright._impl._waiter import Waiter
|
|
|
|
if sys.version_info >= (3, 8): # pragma: no cover
|
|
from typing import Literal
|
|
else: # pragma: no cover
|
|
from typing_extensions import Literal
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from playwright._impl._browser_context import BrowserContext
|
|
from playwright._impl._fetch import APIRequestContext
|
|
from playwright._impl._locator import FrameLocator, Locator
|
|
from playwright._impl._network import WebSocket
|
|
|
|
|
|
class Page(ChannelOwner):
|
|
Events = SimpleNamespace(
|
|
Close="close",
|
|
Crash="crash",
|
|
Console="console",
|
|
Dialog="dialog",
|
|
Download="download",
|
|
FileChooser="filechooser",
|
|
DOMContentLoaded="domcontentloaded",
|
|
PageError="pageerror",
|
|
Request="request",
|
|
Response="response",
|
|
RequestFailed="requestfailed",
|
|
RequestFinished="requestfinished",
|
|
FrameAttached="frameattached",
|
|
FrameDetached="framedetached",
|
|
FrameNavigated="framenavigated",
|
|
Load="load",
|
|
Popup="popup",
|
|
WebSocket="websocket",
|
|
Worker="worker",
|
|
)
|
|
accessibility: Accessibility
|
|
keyboard: Keyboard
|
|
mouse: Mouse
|
|
touchscreen: Touchscreen
|
|
|
|
def __init__(
|
|
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
|
|
) -> None:
|
|
super().__init__(parent, type, guid, initializer)
|
|
self._browser_context = cast("BrowserContext", parent)
|
|
self.accessibility = Accessibility(self._channel)
|
|
self.keyboard = Keyboard(self._channel)
|
|
self.mouse = Mouse(self._channel)
|
|
self.touchscreen = Touchscreen(self._channel)
|
|
|
|
self._main_frame: Frame = from_channel(initializer["mainFrame"])
|
|
self._main_frame._page = self
|
|
self._frames = [self._main_frame]
|
|
self._viewport_size: Optional[ViewportSize] = initializer.get("viewportSize")
|
|
self._is_closed = False
|
|
self._workers: List["Worker"] = []
|
|
self._bindings: Dict[str, Any] = {}
|
|
self._routes: List[RouteHandler] = []
|
|
self._owned_context: Optional["BrowserContext"] = None
|
|
self._timeout_settings: TimeoutSettings = TimeoutSettings(
|
|
self._browser_context._timeout_settings
|
|
)
|
|
self._video: Optional[Video] = None
|
|
self._opener = cast("Page", from_nullable_channel(initializer.get("opener")))
|
|
self._close_reason: Optional[str] = None
|
|
|
|
self._channel.on(
|
|
"bindingCall",
|
|
lambda params: self._on_binding(from_channel(params["binding"])),
|
|
)
|
|
self._channel.on("close", lambda _: self._on_close())
|
|
self._channel.on("crash", lambda _: self._on_crash())
|
|
self._channel.on("download", lambda params: self._on_download(params))
|
|
self._channel.on(
|
|
"fileChooser",
|
|
lambda params: self.emit(
|
|
Page.Events.FileChooser,
|
|
FileChooser(
|
|
self, from_channel(params["element"]), params["isMultiple"]
|
|
),
|
|
),
|
|
)
|
|
self._channel.on(
|
|
"frameAttached",
|
|
lambda params: self._on_frame_attached(from_channel(params["frame"])),
|
|
)
|
|
self._channel.on(
|
|
"frameDetached",
|
|
lambda params: self._on_frame_detached(from_channel(params["frame"])),
|
|
)
|
|
self._channel.on(
|
|
"route",
|
|
lambda params: asyncio.create_task(
|
|
self._on_route(from_channel(params["route"]))
|
|
),
|
|
)
|
|
self._channel.on("video", lambda params: self._on_video(params))
|
|
self._channel.on(
|
|
"webSocket",
|
|
lambda params: self.emit(
|
|
Page.Events.WebSocket, from_channel(params["webSocket"])
|
|
),
|
|
)
|
|
self._channel.on(
|
|
"worker", lambda params: self._on_worker(from_channel(params["worker"]))
|
|
)
|
|
self._closed_or_crashed_future: asyncio.Future = asyncio.Future()
|
|
self.on(
|
|
Page.Events.Close,
|
|
lambda _: self._closed_or_crashed_future.set_result(
|
|
self._close_error_with_reason()
|
|
)
|
|
if not self._closed_or_crashed_future.done()
|
|
else None,
|
|
)
|
|
self.on(
|
|
Page.Events.Crash,
|
|
lambda _: self._closed_or_crashed_future.set_result(TargetClosedError())
|
|
if not self._closed_or_crashed_future.done()
|
|
else None,
|
|
)
|
|
|
|
self._set_event_to_subscription_mapping(
|
|
{
|
|
Page.Events.Console: "console",
|
|
Page.Events.Dialog: "dialog",
|
|
Page.Events.Request: "request",
|
|
Page.Events.Response: "response",
|
|
Page.Events.RequestFinished: "requestFinished",
|
|
Page.Events.RequestFailed: "requestFailed",
|
|
Page.Events.FileChooser: "fileChooser",
|
|
}
|
|
)
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<Page url={self.url!r}>"
|
|
|
|
def _on_frame_attached(self, frame: Frame) -> None:
|
|
frame._page = self
|
|
self._frames.append(frame)
|
|
self.emit(Page.Events.FrameAttached, frame)
|
|
|
|
def _on_frame_detached(self, frame: Frame) -> None:
|
|
self._frames.remove(frame)
|
|
frame._detached = True
|
|
self.emit(Page.Events.FrameDetached, frame)
|
|
|
|
async def _on_route(self, route: Route) -> None:
|
|
route._context = self.context
|
|
route_handlers = self._routes.copy()
|
|
for route_handler in route_handlers:
|
|
if not route_handler.matches(route.request.url):
|
|
continue
|
|
if route_handler.will_expire:
|
|
self._routes.remove(route_handler)
|
|
try:
|
|
handled = await route_handler.handle(route)
|
|
finally:
|
|
if len(self._routes) == 0:
|
|
asyncio.create_task(
|
|
self._connection.wrap_api_call(
|
|
lambda: self._update_interception_patterns(), True
|
|
)
|
|
)
|
|
if handled:
|
|
return
|
|
await self._browser_context._on_route(route)
|
|
|
|
def _on_binding(self, binding_call: "BindingCall") -> None:
|
|
func = self._bindings.get(binding_call._initializer["name"])
|
|
if func:
|
|
asyncio.create_task(binding_call.call(func))
|
|
self._browser_context._on_binding(binding_call)
|
|
|
|
def _on_worker(self, worker: "Worker") -> None:
|
|
self._workers.append(worker)
|
|
worker._page = self
|
|
self.emit(Page.Events.Worker, worker)
|
|
|
|
def _on_close(self) -> None:
|
|
self._is_closed = True
|
|
if self in self._browser_context._pages:
|
|
self._browser_context._pages.remove(self)
|
|
if self in self._browser_context._background_pages:
|
|
self._browser_context._background_pages.remove(self)
|
|
self.emit(Page.Events.Close, self)
|
|
|
|
def _on_crash(self) -> None:
|
|
self.emit(Page.Events.Crash, self)
|
|
|
|
def _on_download(self, params: Any) -> None:
|
|
url = params["url"]
|
|
suggested_filename = params["suggestedFilename"]
|
|
artifact = cast(Artifact, from_channel(params["artifact"]))
|
|
self.emit(
|
|
Page.Events.Download, Download(self, url, suggested_filename, artifact)
|
|
)
|
|
|
|
def _on_video(self, params: Any) -> None:
|
|
artifact = from_channel(params["artifact"])
|
|
cast(Video, self.video)._artifact_ready(artifact)
|
|
|
|
@property
|
|
def context(self) -> "BrowserContext":
|
|
return self._browser_context
|
|
|
|
async def opener(self) -> Optional["Page"]:
|
|
if self._opener and self._opener.is_closed():
|
|
return None
|
|
return self._opener
|
|
|
|
@property
|
|
def main_frame(self) -> Frame:
|
|
return self._main_frame
|
|
|
|
def frame(self, name: str = None, url: URLMatch = None) -> Optional[Frame]:
|
|
matcher = (
|
|
URLMatcher(self._browser_context._options.get("baseURL"), url)
|
|
if url
|
|
else None
|
|
)
|
|
for frame in self._frames:
|
|
if name and frame.name == name:
|
|
return frame
|
|
if url and matcher and matcher.matches(frame.url):
|
|
return frame
|
|
return None
|
|
|
|
@property
|
|
def frames(self) -> List[Frame]:
|
|
return self._frames.copy()
|
|
|
|
def set_default_navigation_timeout(self, timeout: float) -> None:
|
|
self._timeout_settings.set_default_navigation_timeout(timeout)
|
|
self._channel.send_no_reply(
|
|
"setDefaultNavigationTimeoutNoReply", dict(timeout=timeout)
|
|
)
|
|
|
|
def set_default_timeout(self, timeout: float) -> None:
|
|
self._timeout_settings.set_default_timeout(timeout)
|
|
self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))
|
|
|
|
async def query_selector(
|
|
self,
|
|
selector: str,
|
|
strict: bool = None,
|
|
) -> Optional[ElementHandle]:
|
|
return await self._main_frame.query_selector(selector, strict)
|
|
|
|
async def query_selector_all(self, selector: str) -> List[ElementHandle]:
|
|
return await self._main_frame.query_selector_all(selector)
|
|
|
|
async def wait_for_selector(
|
|
self,
|
|
selector: str,
|
|
timeout: float = None,
|
|
state: Literal["attached", "detached", "hidden", "visible"] = None,
|
|
strict: bool = None,
|
|
) -> Optional[ElementHandle]:
|
|
return await self._main_frame.wait_for_selector(**locals_to_params(locals()))
|
|
|
|
async def is_checked(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_checked(**locals_to_params(locals()))
|
|
|
|
async def is_disabled(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_disabled(**locals_to_params(locals()))
|
|
|
|
async def is_editable(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_editable(**locals_to_params(locals()))
|
|
|
|
async def is_enabled(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_enabled(**locals_to_params(locals()))
|
|
|
|
async def is_hidden(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_hidden(**locals_to_params(locals()))
|
|
|
|
async def is_visible(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> bool:
|
|
return await self._main_frame.is_visible(**locals_to_params(locals()))
|
|
|
|
async def dispatch_event(
|
|
self,
|
|
selector: str,
|
|
type: str,
|
|
eventInit: Dict = None,
|
|
timeout: float = None,
|
|
strict: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.dispatch_event(**locals_to_params(locals()))
|
|
|
|
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
|
|
return await self._main_frame.evaluate(expression, arg)
|
|
|
|
async def evaluate_handle(
|
|
self, expression: str, arg: Serializable = None
|
|
) -> JSHandle:
|
|
return await self._main_frame.evaluate_handle(expression, arg)
|
|
|
|
async def eval_on_selector(
|
|
self,
|
|
selector: str,
|
|
expression: str,
|
|
arg: Serializable = None,
|
|
strict: bool = None,
|
|
) -> Any:
|
|
return await self._main_frame.eval_on_selector(
|
|
selector, expression, arg, strict
|
|
)
|
|
|
|
async def eval_on_selector_all(
|
|
self,
|
|
selector: str,
|
|
expression: str,
|
|
arg: Serializable = None,
|
|
) -> Any:
|
|
return await self._main_frame.eval_on_selector_all(selector, expression, arg)
|
|
|
|
async def add_script_tag(
|
|
self,
|
|
url: str = None,
|
|
path: Union[str, Path] = None,
|
|
content: str = None,
|
|
type: str = None,
|
|
) -> ElementHandle:
|
|
return await self._main_frame.add_script_tag(**locals_to_params(locals()))
|
|
|
|
async def add_style_tag(
|
|
self, url: str = None, path: Union[str, Path] = None, content: str = None
|
|
) -> ElementHandle:
|
|
return await self._main_frame.add_style_tag(**locals_to_params(locals()))
|
|
|
|
async def expose_function(self, name: str, callback: Callable) -> None:
|
|
await self.expose_binding(name, lambda source, *args: callback(*args))
|
|
|
|
async def expose_binding(
|
|
self, name: str, callback: Callable, handle: bool = None
|
|
) -> None:
|
|
if name in self._bindings:
|
|
raise Error(f'Function "{name}" has been already registered')
|
|
if name in self._browser_context._bindings:
|
|
raise Error(
|
|
f'Function "{name}" has been already registered in the browser context'
|
|
)
|
|
self._bindings[name] = callback
|
|
await self._channel.send(
|
|
"exposeBinding", dict(name=name, needsHandle=handle or False)
|
|
)
|
|
|
|
async def set_extra_http_headers(self, headers: Dict[str, str]) -> None:
|
|
await self._channel.send(
|
|
"setExtraHTTPHeaders", dict(headers=serialize_headers(headers))
|
|
)
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
return self._main_frame.url
|
|
|
|
async def content(self) -> str:
|
|
return await self._main_frame.content()
|
|
|
|
async def set_content(
|
|
self,
|
|
html: str,
|
|
timeout: float = None,
|
|
waitUntil: DocumentLoadState = None,
|
|
) -> None:
|
|
return await self._main_frame.set_content(**locals_to_params(locals()))
|
|
|
|
async def goto(
|
|
self,
|
|
url: str,
|
|
timeout: float = None,
|
|
waitUntil: DocumentLoadState = None,
|
|
referer: str = None,
|
|
) -> Optional[Response]:
|
|
return await self._main_frame.goto(**locals_to_params(locals()))
|
|
|
|
async def reload(
|
|
self,
|
|
timeout: float = None,
|
|
waitUntil: DocumentLoadState = None,
|
|
) -> Optional[Response]:
|
|
return from_nullable_channel(
|
|
await self._channel.send("reload", locals_to_params(locals()))
|
|
)
|
|
|
|
async def wait_for_load_state(
|
|
self,
|
|
state: Literal["domcontentloaded", "load", "networkidle"] = None,
|
|
timeout: float = None,
|
|
) -> None:
|
|
return await self._main_frame.wait_for_load_state(**locals_to_params(locals()))
|
|
|
|
async def wait_for_url(
|
|
self,
|
|
url: URLMatch,
|
|
wait_until: DocumentLoadState = None,
|
|
timeout: float = None,
|
|
) -> None:
|
|
return await self._main_frame.wait_for_url(**locals_to_params(locals()))
|
|
|
|
async def wait_for_event(
|
|
self, event: str, predicate: Callable = None, timeout: float = None
|
|
) -> Any:
|
|
async with self.expect_event(event, predicate, timeout) as event_info:
|
|
pass
|
|
return await event_info
|
|
|
|
async def go_back(
|
|
self,
|
|
timeout: float = None,
|
|
waitUntil: DocumentLoadState = None,
|
|
) -> Optional[Response]:
|
|
return from_nullable_channel(
|
|
await self._channel.send("goBack", locals_to_params(locals()))
|
|
)
|
|
|
|
async def go_forward(
|
|
self,
|
|
timeout: float = None,
|
|
waitUntil: DocumentLoadState = None,
|
|
) -> Optional[Response]:
|
|
return from_nullable_channel(
|
|
await self._channel.send("goForward", locals_to_params(locals()))
|
|
)
|
|
|
|
async def emulate_media(
|
|
self,
|
|
media: Literal["null", "print", "screen"] = None,
|
|
colorScheme: ColorScheme = None,
|
|
reducedMotion: ReducedMotion = None,
|
|
forcedColors: ForcedColors = None,
|
|
) -> None:
|
|
params = locals_to_params(locals())
|
|
if "media" in params:
|
|
params["media"] = "no-override" if params["media"] == "null" else media
|
|
if "colorScheme" in params:
|
|
params["colorScheme"] = (
|
|
"no-override" if params["colorScheme"] == "null" else colorScheme
|
|
)
|
|
if "reducedMotion" in params:
|
|
params["reducedMotion"] = (
|
|
"no-override" if params["reducedMotion"] == "null" else reducedMotion
|
|
)
|
|
if "forcedColors" in params:
|
|
params["forcedColors"] = (
|
|
"no-override" if params["forcedColors"] == "null" else forcedColors
|
|
)
|
|
await self._channel.send("emulateMedia", params)
|
|
|
|
async def set_viewport_size(self, viewportSize: ViewportSize) -> None:
|
|
self._viewport_size = viewportSize
|
|
await self._channel.send("setViewportSize", locals_to_params(locals()))
|
|
|
|
@property
|
|
def viewport_size(self) -> Optional[ViewportSize]:
|
|
return self._viewport_size
|
|
|
|
async def bring_to_front(self) -> None:
|
|
await self._channel.send("bringToFront")
|
|
|
|
async def add_init_script(
|
|
self, script: str = None, path: Union[str, Path] = None
|
|
) -> None:
|
|
if path:
|
|
script = (await async_readfile(path)).decode()
|
|
if not isinstance(script, str):
|
|
raise Error("Either path or script parameter must be specified")
|
|
await self._channel.send("addInitScript", dict(source=script))
|
|
|
|
async def route(
|
|
self, url: URLMatch, handler: RouteHandlerCallback, times: int = None
|
|
) -> None:
|
|
self._routes.insert(
|
|
0,
|
|
RouteHandler(
|
|
URLMatcher(self._browser_context._options.get("baseURL"), url),
|
|
handler,
|
|
True if self._dispatcher_fiber else False,
|
|
times,
|
|
),
|
|
)
|
|
await self._update_interception_patterns()
|
|
|
|
async def unroute(
|
|
self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None
|
|
) -> None:
|
|
self._routes = list(
|
|
filter(
|
|
lambda r: r.matcher.match != url or (handler and r.handler != handler),
|
|
self._routes,
|
|
)
|
|
)
|
|
await self._update_interception_patterns()
|
|
|
|
async def route_from_har(
|
|
self,
|
|
har: Union[Path, str],
|
|
url: Union[Pattern[str], str] = None,
|
|
not_found: RouteFromHarNotFoundPolicy = None,
|
|
update: bool = None,
|
|
update_content: Literal["attach", "embed"] = None,
|
|
update_mode: HarMode = None,
|
|
) -> None:
|
|
if update:
|
|
await self._browser_context._record_into_har(
|
|
har=har,
|
|
page=self,
|
|
url=url,
|
|
update_content=update_content,
|
|
update_mode=update_mode,
|
|
)
|
|
return
|
|
router = await HarRouter.create(
|
|
local_utils=self._connection.local_utils,
|
|
file=str(har),
|
|
not_found_action=not_found or "abort",
|
|
url_matcher=url,
|
|
)
|
|
await router.add_page_route(self)
|
|
|
|
async def _update_interception_patterns(self) -> None:
|
|
patterns = RouteHandler.prepare_interception_patterns(self._routes)
|
|
await self._channel.send(
|
|
"setNetworkInterceptionPatterns", {"patterns": patterns}
|
|
)
|
|
|
|
async def screenshot(
|
|
self,
|
|
timeout: float = None,
|
|
type: Literal["jpeg", "png"] = None,
|
|
path: Union[str, Path] = None,
|
|
quality: int = None,
|
|
omitBackground: bool = None,
|
|
fullPage: bool = None,
|
|
clip: FloatRect = None,
|
|
animations: Literal["allow", "disabled"] = None,
|
|
caret: Literal["hide", "initial"] = None,
|
|
scale: Literal["css", "device"] = None,
|
|
mask: List["Locator"] = None,
|
|
mask_color: str = None,
|
|
) -> bytes:
|
|
params = locals_to_params(locals())
|
|
if "path" in params:
|
|
del params["path"]
|
|
if "mask" in params:
|
|
params["mask"] = list(
|
|
map(
|
|
lambda locator: (
|
|
{
|
|
"frame": locator._frame._channel,
|
|
"selector": locator._selector,
|
|
}
|
|
),
|
|
params["mask"],
|
|
)
|
|
)
|
|
encoded_binary = await self._channel.send("screenshot", params)
|
|
decoded_binary = base64.b64decode(encoded_binary)
|
|
if path:
|
|
make_dirs_for_file(path)
|
|
await async_writefile(path, decoded_binary)
|
|
return decoded_binary
|
|
|
|
async def title(self) -> str:
|
|
return await self._main_frame.title()
|
|
|
|
async def close(self, runBeforeUnload: bool = None, reason: str = None) -> None:
|
|
self._close_reason = reason
|
|
try:
|
|
await self._channel.send("close", locals_to_params(locals()))
|
|
if self._owned_context:
|
|
await self._owned_context.close()
|
|
except Exception as e:
|
|
if not is_target_closed_error(e) and not runBeforeUnload:
|
|
raise e
|
|
|
|
def is_closed(self) -> bool:
|
|
return self._is_closed
|
|
|
|
async def click(
|
|
self,
|
|
selector: str,
|
|
modifiers: List[KeyboardModifier] = None,
|
|
position: Position = None,
|
|
delay: float = None,
|
|
button: MouseButton = None,
|
|
clickCount: int = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
trial: bool = None,
|
|
strict: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.click(**locals_to_params(locals()))
|
|
|
|
async def dblclick(
|
|
self,
|
|
selector: str,
|
|
modifiers: List[KeyboardModifier] = None,
|
|
position: Position = None,
|
|
delay: float = None,
|
|
button: MouseButton = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.dblclick(**locals_to_params(locals()))
|
|
|
|
async def tap(
|
|
self,
|
|
selector: str,
|
|
modifiers: List[KeyboardModifier] = None,
|
|
position: Position = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.tap(**locals_to_params(locals()))
|
|
|
|
async def fill(
|
|
self,
|
|
selector: str,
|
|
value: str,
|
|
timeout: float = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
force: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.fill(**locals_to_params(locals()))
|
|
|
|
def locator(
|
|
self,
|
|
selector: str,
|
|
has_text: Union[str, Pattern[str]] = None,
|
|
has_not_text: Union[str, Pattern[str]] = None,
|
|
has: "Locator" = None,
|
|
has_not: "Locator" = None,
|
|
) -> "Locator":
|
|
return self._main_frame.locator(
|
|
selector,
|
|
has_text=has_text,
|
|
has_not_text=has_not_text,
|
|
has=has,
|
|
has_not=has_not,
|
|
)
|
|
|
|
def get_by_alt_text(
|
|
self, text: Union[str, Pattern[str]], exact: bool = None
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_alt_text(text, exact=exact)
|
|
|
|
def get_by_label(
|
|
self, text: Union[str, Pattern[str]], exact: bool = None
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_label(text, exact=exact)
|
|
|
|
def get_by_placeholder(
|
|
self, text: Union[str, Pattern[str]], exact: bool = None
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_placeholder(text, exact=exact)
|
|
|
|
def get_by_role(
|
|
self,
|
|
role: AriaRole,
|
|
checked: bool = None,
|
|
disabled: bool = None,
|
|
expanded: bool = None,
|
|
includeHidden: bool = None,
|
|
level: int = None,
|
|
name: Union[str, Pattern[str]] = None,
|
|
pressed: bool = None,
|
|
selected: bool = None,
|
|
exact: bool = None,
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_role(
|
|
role,
|
|
checked=checked,
|
|
disabled=disabled,
|
|
expanded=expanded,
|
|
includeHidden=includeHidden,
|
|
level=level,
|
|
name=name,
|
|
pressed=pressed,
|
|
selected=selected,
|
|
exact=exact,
|
|
)
|
|
|
|
def get_by_test_id(self, testId: Union[str, Pattern[str]]) -> "Locator":
|
|
return self._main_frame.get_by_test_id(testId)
|
|
|
|
def get_by_text(
|
|
self, text: Union[str, Pattern[str]], exact: bool = None
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_text(text, exact=exact)
|
|
|
|
def get_by_title(
|
|
self, text: Union[str, Pattern[str]], exact: bool = None
|
|
) -> "Locator":
|
|
return self._main_frame.get_by_title(text, exact=exact)
|
|
|
|
def frame_locator(self, selector: str) -> "FrameLocator":
|
|
return self.main_frame.frame_locator(selector)
|
|
|
|
async def focus(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> None:
|
|
return await self._main_frame.focus(**locals_to_params(locals()))
|
|
|
|
async def text_content(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> Optional[str]:
|
|
return await self._main_frame.text_content(**locals_to_params(locals()))
|
|
|
|
async def inner_text(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> str:
|
|
return await self._main_frame.inner_text(**locals_to_params(locals()))
|
|
|
|
async def inner_html(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> str:
|
|
return await self._main_frame.inner_html(**locals_to_params(locals()))
|
|
|
|
async def get_attribute(
|
|
self, selector: str, name: str, strict: bool = None, timeout: float = None
|
|
) -> Optional[str]:
|
|
return await self._main_frame.get_attribute(**locals_to_params(locals()))
|
|
|
|
async def hover(
|
|
self,
|
|
selector: str,
|
|
modifiers: List[KeyboardModifier] = None,
|
|
position: Position = None,
|
|
timeout: float = None,
|
|
noWaitAfter: bool = None,
|
|
force: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.hover(**locals_to_params(locals()))
|
|
|
|
async def drag_and_drop(
|
|
self,
|
|
source: str,
|
|
target: str,
|
|
sourcePosition: Position = None,
|
|
targetPosition: Position = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
timeout: float = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.drag_and_drop(**locals_to_params(locals()))
|
|
|
|
async def select_option(
|
|
self,
|
|
selector: str,
|
|
value: Union[str, List[str]] = None,
|
|
index: Union[int, List[int]] = None,
|
|
label: Union[str, List[str]] = None,
|
|
element: Union["ElementHandle", List["ElementHandle"]] = None,
|
|
timeout: float = None,
|
|
noWaitAfter: bool = None,
|
|
force: bool = None,
|
|
strict: bool = None,
|
|
) -> List[str]:
|
|
params = locals_to_params(locals())
|
|
return await self._main_frame.select_option(**params)
|
|
|
|
async def input_value(
|
|
self, selector: str, strict: bool = None, timeout: float = None
|
|
) -> str:
|
|
params = locals_to_params(locals())
|
|
return await self._main_frame.input_value(**params)
|
|
|
|
async def set_input_files(
|
|
self,
|
|
selector: str,
|
|
files: Union[str, Path, FilePayload, List[Union[str, Path]], List[FilePayload]],
|
|
timeout: float = None,
|
|
strict: bool = None,
|
|
noWaitAfter: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.set_input_files(**locals_to_params(locals()))
|
|
|
|
async def type(
|
|
self,
|
|
selector: str,
|
|
text: str,
|
|
delay: float = None,
|
|
timeout: float = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.type(**locals_to_params(locals()))
|
|
|
|
async def press(
|
|
self,
|
|
selector: str,
|
|
key: str,
|
|
delay: float = None,
|
|
timeout: float = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.press(**locals_to_params(locals()))
|
|
|
|
async def check(
|
|
self,
|
|
selector: str,
|
|
position: Position = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.check(**locals_to_params(locals()))
|
|
|
|
async def uncheck(
|
|
self,
|
|
selector: str,
|
|
position: Position = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
return await self._main_frame.uncheck(**locals_to_params(locals()))
|
|
|
|
async def wait_for_timeout(self, timeout: float) -> None:
|
|
await self._main_frame.wait_for_timeout(timeout)
|
|
|
|
async def wait_for_function(
|
|
self,
|
|
expression: str,
|
|
arg: Serializable = None,
|
|
timeout: float = None,
|
|
polling: Union[float, Literal["raf"]] = None,
|
|
) -> JSHandle:
|
|
return await self._main_frame.wait_for_function(**locals_to_params(locals()))
|
|
|
|
@property
|
|
def workers(self) -> List["Worker"]:
|
|
return self._workers.copy()
|
|
|
|
@property
|
|
def request(self) -> "APIRequestContext":
|
|
return self.context.request
|
|
|
|
async def pause(self) -> None:
|
|
default_navigation_timeout = (
|
|
self._browser_context._timeout_settings.default_navigation_timeout()
|
|
)
|
|
default_timeout = self._browser_context._timeout_settings.default_timeout()
|
|
self._browser_context.set_default_navigation_timeout(0)
|
|
self._browser_context.set_default_timeout(0)
|
|
try:
|
|
await asyncio.wait(
|
|
[
|
|
asyncio.create_task(self._browser_context._channel.send("pause")),
|
|
self._closed_or_crashed_future,
|
|
],
|
|
return_when=asyncio.FIRST_COMPLETED,
|
|
)
|
|
finally:
|
|
self._browser_context._set_default_navigation_timeout_impl(
|
|
default_navigation_timeout
|
|
)
|
|
self._browser_context._set_default_timeout_impl(default_timeout)
|
|
|
|
async def pdf(
|
|
self,
|
|
scale: float = None,
|
|
displayHeaderFooter: bool = None,
|
|
headerTemplate: str = None,
|
|
footerTemplate: str = None,
|
|
printBackground: bool = None,
|
|
landscape: bool = None,
|
|
pageRanges: str = None,
|
|
format: str = None,
|
|
width: Union[str, float] = None,
|
|
height: Union[str, float] = None,
|
|
preferCSSPageSize: bool = None,
|
|
margin: PdfMargins = None,
|
|
path: Union[str, Path] = None,
|
|
) -> bytes:
|
|
params = locals_to_params(locals())
|
|
if "path" in params:
|
|
del params["path"]
|
|
encoded_binary = await self._channel.send("pdf", params)
|
|
decoded_binary = base64.b64decode(encoded_binary)
|
|
if path:
|
|
make_dirs_for_file(path)
|
|
await async_writefile(path, decoded_binary)
|
|
return decoded_binary
|
|
|
|
@property
|
|
def video(
|
|
self,
|
|
) -> Optional[Video]:
|
|
if not self._video:
|
|
self._video = Video(self)
|
|
return self._video
|
|
|
|
def _close_error_with_reason(self) -> TargetClosedError:
|
|
return TargetClosedError(
|
|
self._close_reason or self._browser_context._effective_close_reason()
|
|
)
|
|
|
|
def expect_event(
|
|
self,
|
|
event: str,
|
|
predicate: Callable = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl:
|
|
return self._expect_event(
|
|
event, predicate, timeout, f'waiting for event "{event}"'
|
|
)
|
|
|
|
def _expect_event(
|
|
self,
|
|
event: str,
|
|
predicate: Callable = None,
|
|
timeout: float = None,
|
|
log_line: str = None,
|
|
) -> EventContextManagerImpl:
|
|
if timeout is None:
|
|
timeout = self._timeout_settings.timeout()
|
|
waiter = Waiter(self, f"page.expect_event({event})")
|
|
waiter.reject_on_timeout(
|
|
timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'
|
|
)
|
|
if log_line:
|
|
waiter.log(log_line)
|
|
if event != Page.Events.Crash:
|
|
waiter.reject_on_event(self, Page.Events.Crash, Error("Page crashed"))
|
|
if event != Page.Events.Close:
|
|
waiter.reject_on_event(
|
|
self, Page.Events.Close, lambda: self._close_error_with_reason()
|
|
)
|
|
waiter.wait_for_event(self, event, predicate)
|
|
return EventContextManagerImpl(waiter.result())
|
|
|
|
def expect_console_message(
|
|
self,
|
|
predicate: Callable[[ConsoleMessage], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[ConsoleMessage]:
|
|
return self.expect_event(Page.Events.Console, predicate, timeout)
|
|
|
|
def expect_download(
|
|
self,
|
|
predicate: Callable[[Download], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[Download]:
|
|
return self.expect_event(Page.Events.Download, predicate, timeout)
|
|
|
|
def expect_file_chooser(
|
|
self,
|
|
predicate: Callable[[FileChooser], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[FileChooser]:
|
|
return self.expect_event(Page.Events.FileChooser, predicate, timeout)
|
|
|
|
def expect_navigation(
|
|
self,
|
|
url: URLMatch = None,
|
|
wait_until: DocumentLoadState = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[Response]:
|
|
return self.main_frame.expect_navigation(url, wait_until, timeout)
|
|
|
|
def expect_popup(
|
|
self,
|
|
predicate: Callable[["Page"], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl["Page"]:
|
|
return self.expect_event(Page.Events.Popup, predicate, timeout)
|
|
|
|
def expect_request(
|
|
self,
|
|
url_or_predicate: URLMatchRequest,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[Request]:
|
|
matcher = (
|
|
None
|
|
if callable(url_or_predicate)
|
|
else URLMatcher(
|
|
self._browser_context._options.get("baseURL"), url_or_predicate
|
|
)
|
|
)
|
|
predicate = url_or_predicate if callable(url_or_predicate) else None
|
|
|
|
def my_predicate(request: Request) -> bool:
|
|
if matcher:
|
|
return matcher.matches(request.url)
|
|
if predicate:
|
|
return predicate(request)
|
|
return True
|
|
|
|
trimmed_url = trim_url(url_or_predicate)
|
|
log_line = f"waiting for request {trimmed_url}" if trimmed_url else None
|
|
return self._expect_event(
|
|
Page.Events.Request,
|
|
predicate=my_predicate,
|
|
timeout=timeout,
|
|
log_line=log_line,
|
|
)
|
|
|
|
def expect_request_finished(
|
|
self,
|
|
predicate: Callable[["Request"], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[Request]:
|
|
return self.expect_event(
|
|
Page.Events.RequestFinished, predicate=predicate, timeout=timeout
|
|
)
|
|
|
|
def expect_response(
|
|
self,
|
|
url_or_predicate: URLMatchResponse,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl[Response]:
|
|
matcher = (
|
|
None
|
|
if callable(url_or_predicate)
|
|
else URLMatcher(
|
|
self._browser_context._options.get("baseURL"), url_or_predicate
|
|
)
|
|
)
|
|
predicate = url_or_predicate if callable(url_or_predicate) else None
|
|
|
|
def my_predicate(response: Response) -> bool:
|
|
if matcher:
|
|
return matcher.matches(response.url)
|
|
if predicate:
|
|
return predicate(response)
|
|
return True
|
|
|
|
trimmed_url = trim_url(url_or_predicate)
|
|
log_line = f"waiting for response {trimmed_url}" if trimmed_url else None
|
|
return self._expect_event(
|
|
Page.Events.Response,
|
|
predicate=my_predicate,
|
|
timeout=timeout,
|
|
log_line=log_line,
|
|
)
|
|
|
|
def expect_websocket(
|
|
self,
|
|
predicate: Callable[["WebSocket"], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl["WebSocket"]:
|
|
return self.expect_event("websocket", predicate, timeout)
|
|
|
|
def expect_worker(
|
|
self,
|
|
predicate: Callable[["Worker"], bool] = None,
|
|
timeout: float = None,
|
|
) -> EventContextManagerImpl["Worker"]:
|
|
return self.expect_event("worker", predicate, timeout)
|
|
|
|
async def set_checked(
|
|
self,
|
|
selector: str,
|
|
checked: bool,
|
|
position: Position = None,
|
|
timeout: float = None,
|
|
force: bool = None,
|
|
noWaitAfter: bool = None,
|
|
strict: bool = None,
|
|
trial: bool = None,
|
|
) -> None:
|
|
if checked:
|
|
await self.check(
|
|
selector=selector,
|
|
position=position,
|
|
timeout=timeout,
|
|
force=force,
|
|
noWaitAfter=noWaitAfter,
|
|
strict=strict,
|
|
trial=trial,
|
|
)
|
|
else:
|
|
await self.uncheck(
|
|
selector=selector,
|
|
position=position,
|
|
timeout=timeout,
|
|
force=force,
|
|
noWaitAfter=noWaitAfter,
|
|
strict=strict,
|
|
trial=trial,
|
|
)
|
|
|
|
|
|
class Worker(ChannelOwner):
|
|
Events = SimpleNamespace(Close="close")
|
|
|
|
def __init__(
|
|
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
|
|
) -> None:
|
|
super().__init__(parent, type, guid, initializer)
|
|
self._channel.on("close", lambda _: self._on_close())
|
|
self._page: Optional[Page] = None
|
|
self._context: Optional["BrowserContext"] = None
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<Worker url={self.url!r}>"
|
|
|
|
def _on_close(self) -> None:
|
|
if self._page:
|
|
self._page._workers.remove(self)
|
|
if self._context:
|
|
self._context._service_workers.remove(self)
|
|
self.emit(Worker.Events.Close, self)
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
return self._initializer["url"]
|
|
|
|
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
|
|
return parse_result(
|
|
await self._channel.send(
|
|
"evaluateExpression",
|
|
dict(
|
|
expression=expression,
|
|
arg=serialize_argument(arg),
|
|
),
|
|
)
|
|
)
|
|
|
|
async def evaluate_handle(
|
|
self, expression: str, arg: Serializable = None
|
|
) -> JSHandle:
|
|
return from_channel(
|
|
await self._channel.send(
|
|
"evaluateExpressionHandle",
|
|
dict(
|
|
expression=expression,
|
|
arg=serialize_argument(arg),
|
|
),
|
|
)
|
|
)
|
|
|
|
|
|
class BindingCall(ChannelOwner):
|
|
def __init__(
|
|
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
|
|
) -> None:
|
|
super().__init__(parent, type, guid, initializer)
|
|
|
|
async def call(self, func: Callable) -> None:
|
|
try:
|
|
frame = from_channel(self._initializer["frame"])
|
|
source = dict(context=frame._page.context, page=frame._page, frame=frame)
|
|
if self._initializer.get("handle"):
|
|
result = func(source, from_channel(self._initializer["handle"]))
|
|
else:
|
|
func_args = list(map(parse_result, self._initializer["args"]))
|
|
result = func(source, *func_args)
|
|
if inspect.iscoroutine(result):
|
|
result = await result
|
|
await self._channel.send("resolve", dict(result=serialize_argument(result)))
|
|
except Exception as e:
|
|
tb = sys.exc_info()[2]
|
|
asyncio.create_task(
|
|
self._channel.send(
|
|
"reject", dict(error=dict(error=serialize_error(e, tb)))
|
|
)
|
|
)
|
|
|
|
|
|
def trim_url(param: Union[URLMatchRequest, URLMatchResponse]) -> Optional[str]:
|
|
if isinstance(param, re.Pattern):
|
|
return trim_end(param.pattern)
|
|
if isinstance(param, str):
|
|
return trim_end(param)
|
|
return None
|
|
|
|
|
|
def trim_end(s: str) -> str:
|
|
if len(s) > 50:
|
|
return s[:50] + "\u2026"
|
|
return s
|