# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""pytest configuration: command-line options, hooks and shared fixtures.

Defines the ``--driver``/``--remote``/``--bidi``/... options, a Rich-based
traceback printer, the ``Driver`` helper that builds browser options and
driver instances, and the fixtures (``driver``, ``server``, ``webserver``,
``pages``, ...) built on top of it.
"""

import http.server
import os
import socketserver
import sys
import threading
import types
from dataclasses import dataclass
from pathlib import Path

import pytest
import rich.console
import rich.traceback
from python.runfiles import Runfiles

from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.utils import free_port
from selenium.webdriver.remote.server import Server
from test.selenium.webdriver.common.network import get_lan_ip
from test.selenium.webdriver.common.webserver import SimpleWebServer

# Values accepted by the ``--driver`` command-line option.
drivers = (
    "chrome",
    "edge",
    "firefox",
    "ie",
    "safari",
    "webkitgtk",
    "wpewebkit",
)

TRACEBACK_WIDTH = 130
# don't force colors on RBE since errors get redirected to a log file
force_terminal = "REMOTE_BUILD" not in os.environ
console = rich.console.Console(force_terminal=force_terminal, width=TRACEBACK_WIDTH)


def extract_traceback_frames(tb):
    """Extract frames from a traceback object.

    Walks the ``tb_next`` chain and returns a list of
    ``(frame, lineno, lasti)`` tuples in chain order. Frames whose code
    object's filename does not exist on disk are skipped.
    """
    frames = []
    while tb:
        if hasattr(tb, "tb_frame") and hasattr(tb, "tb_lineno"):
            # Skip frames without source files
            if Path(tb.tb_frame.f_code.co_filename).exists():
                frames.append((tb.tb_frame, tb.tb_lineno, getattr(tb, "tb_lasti", 0)))
        tb = getattr(tb, "tb_next", None)
    return frames


def filter_frames(frames):
    """Filter out frames from pytest internals.

    Returns the remaining ``(frame, lineno, lasti)`` tuples in *reversed*
    order, which is the order ``rebuild_traceback`` consumes them in.
    A frame is dropped when its module ``__name__`` contains any of
    ``pytest``, ``_pytest`` or ``pluggy`` as a substring.
    """
    skip_modules = ("pytest", "_pytest", "pluggy")
    filtered = []
    for frame, lineno, lasti in reversed(frames):
        mod_name = frame.f_globals.get("__name__", "")
        if not any(skip in mod_name for skip in skip_modules):
            filtered.append((frame, lineno, lasti))
    return filtered


def rebuild_traceback(frames):
    """Rebuild a traceback object from frames list.

    Each tuple becomes a new ``types.TracebackType`` whose ``tb_next`` is the
    traceback built from the previous tuple, so the last tuple in ``frames``
    ends up as the head of the chain. Returns ``None`` for an empty list.
    """
    new_tb = None
    for frame, lineno, lasti in frames:
        new_tb = types.TracebackType(new_tb, frame, lasti, lineno)
    return new_tb


def pytest_runtest_makereport(item, call):
    """Hook to print Rich traceback for test failures.

    Does nothing when the call phase recorded no exception. Otherwise the
    traceback is stripped of pytest-internal frames and printed to the
    module-level ``console``.
    """
    # NOTE(review): this fires for any recorded exception; presumably that
    # includes skip/xfail outcomes as well as real failures - confirm.
    if call.excinfo is None:
        return

    exc_type = call.excinfo.type
    exc_value = call.excinfo.value
    exc_tb = call.excinfo.tb

    frames = extract_traceback_frames(exc_tb)
    filtered_frames = filter_frames(frames)
    new_tb = rebuild_traceback(filtered_frames)

    tb = rich.traceback.Traceback.from_exception(
        exc_type,
        exc_value,
        new_tb,
        show_locals=False,
        max_frames=5,
        width=TRACEBACK_WIDTH,
    )
    console.print("\n", tb)


def pytest_addoption(parser):
    """Register the command-line options used by this test suite."""
    parser.addoption(
        "--driver",
        action="append",
        choices=drivers,
        dest="drivers",
        metavar="DRIVER",
        help="Driver to run tests against ({})".format(", ".join(drivers)),
    )
    parser.addoption(
        "--browser-binary",
        action="store",
        dest="binary",
        help="Location of the browser binary",
    )
    parser.addoption(
        "--driver-binary",
        action="store",
        dest="executable",
        help="Location of the service executable binary",
    )
    parser.addoption(
        "--browser-args",
        action="store",
        dest="args",
        help="Arguments to start the browser with",
    )
    parser.addoption(
        "--headless",
        action="store_true",
        dest="headless",
        help="Run tests in headless mode",
    )
    parser.addoption(
        "--use-lan-ip",
        action="store_true",
        dest="use_lan_ip",
        help="Start test server with lan ip instead of localhost",
    )
    parser.addoption(
        "--bidi",
        action="store_true",
        dest="bidi",
        help="Enable BiDi support",
    )
    parser.addoption(
        "--remote",
        action="store_true",
        dest="remote",
        help="Run tests against a remote Grid server",
    )


def pytest_ignore_collect(collection_path, config):
    """Ignore paths that belong to drivers which were not requested.

    A path is ignored when one of its components equals the name of a known
    driver that is absent from ``--driver``. When at least one ``--driver``
    was given, paths containing a ``unit`` component are ignored as well.
    Returns ``True`` to ignore the path and ``None`` to leave the decision
    to other hooks.
    """
    drivers_opt = config.getoption("drivers")
    _drivers = set(drivers).difference(drivers_opt or drivers)
    if drivers_opt:
        _drivers.add("unit")
    if any(d.lower() in collection_path.parts for d in _drivers):
        return True
    return None


def pytest_generate_tests(metafunc):
    """Parametrize the ``driver`` fixture over every requested ``--driver``."""
    if "driver" in metafunc.fixturenames and metafunc.config.option.drivers:
        metafunc.parametrize("driver", metafunc.config.option.drivers, indirect=True)


# Shared ``Driver`` wrapper reused across tests; reset to ``None`` whenever the
# underlying browser session is torn down.
selenium_driver = None


class ContainerProtocol:
    """Mixin providing case-insensitive ``in`` checks against instance attributes."""

    def __contains__(self, name):
        return name.lower() in self.__dict__


@dataclass
class SupportedDrivers(ContainerProtocol):
    """Maps lower-case driver names to ``selenium.webdriver`` class names."""

    chrome: str = "Chrome"
    firefox: str = "Firefox"
    safari: str = "Safari"
    edge: str = "Edge"
    ie: str = "Ie"
    webkitgtk: str = "WebKitGTK"
    wpewebkit: str = "WPEWebKit"


@dataclass
class SupportedOptions(ContainerProtocol):
    """Maps lower-case driver names to ``selenium.webdriver`` options class names."""

    chrome: str = "ChromeOptions"
    firefox: str = "FirefoxOptions"
    edge: str = "EdgeOptions"
    safari: str = "SafariOptions"
    ie: str = "IeOptions"
    webkitgtk: str = "WebKitGTKOptions"
    wpewebkit: str = "WPEWebKitOptions"


@dataclass
class SupportedBidiDrivers(ContainerProtocol):
    """Drivers for which tests are run when ``--bidi`` is enabled."""

    chrome: str = "Chrome"
    firefox: str = "Firefox"
    edge: str = "Edge"


class Driver:
    """Builds options, service and driver objects for one driver class.

    The ``driver_class``, ``options``, ``headless`` and ``bidi`` properties
    all have setters that take the driver *name*; assigning the name
    triggers the corresponding configuration from the pytest command-line
    options carried by ``request``.
    """

    def __init__(self, driver_class, request):
        # Order matters: ``options`` reads ``driver_class`` and ``_request``;
        # ``headless`` and ``bidi`` mutate the options object it creates.
        self.driver_class = driver_class
        self._request = request
        self._driver = None
        self._service = None
        self._server = None
        self.options = driver_class
        self.headless = driver_class
        self.bidi = driver_class

    @classmethod
    def clean_options(cls, driver_class, request):
        """Return a freshly configured options object for ``driver_class``."""
        return cls(driver_class, request).options

    @property
    def supported_drivers(self):
        return SupportedDrivers()

    @property
    def supported_options(self):
        return SupportedOptions()

    @property
    def supported_bidi_drivers(self):
        return SupportedBidiDrivers()

    @property
    def driver_class(self):
        """Name of the ``selenium.webdriver`` class, e.g. ``"Chrome"``."""
        return self._driver_class

    @driver_class.setter
    def driver_class(self, cls_name):
        name = cls_name.lower()
        if name not in self.supported_drivers:
            raise AttributeError(f"Invalid driver class {name}")
        self._driver_class = getattr(self.supported_drivers, name)

    @property
    def exe_platform(self):
        """Platform name derived from ``sys.platform`` (Windows/Darwin/Linux/...)."""
        known = {"win32": "Windows", "darwin": "Darwin", "linux": "Linux"}
        return known.get(sys.platform, sys.platform.title())

    @property
    def browser_path(self):
        """Value of ``--browser-binary``, or ``None`` when not given."""
        return self._request.config.option.binary or None

    @property
    def browser_args(self):
        """Value of ``--browser-args``, or ``None`` when not given."""
        return self._request.config.option.args or None

    @property
    def driver_path(self):
        """Value of ``--driver-binary``, or ``None`` when not given."""
        return self._request.config.option.executable or None

    @property
    def headless(self):
        return self._headless

    @headless.setter
    def headless(self, cls_name):
        self._headless = self._request.config.option.headless
        if not self._headless:
            return
        name = cls_name.lower()
        if name in ("chrome", "edge"):
            self._options.add_argument("--headless")
        elif name == "firefox":
            self._options.add_argument("-headless")

    @property
    def bidi(self):
        return self._bidi

    @bidi.setter
    def bidi(self, cls_name):
        self._bidi = self._request.config.option.bidi
        if self._bidi:
            self._options.web_socket_url = True
            self._options.unhandled_prompt_behavior = "ignore"

    @property
    def options(self):
        return self._options

    @options.setter
    def options(self, cls_name):
        name = cls_name.lower()
        if name not in self.supported_options:
            raise AttributeError(f"Invalid Options class {name}")

        if self.driver_class == self.supported_drivers.firefox:
            self._options = getattr(webdriver, self.supported_options.firefox)()
            if self.exe_platform == "Linux":
                # There are issues with window size/position when running Firefox
                # under Wayland, so we use XWayland instead.
                os.environ["MOZ_ENABLE_WAYLAND"] = "0"
        else:
            opts_cls = getattr(self.supported_options, name)
            self._options = getattr(webdriver, opts_cls)()
            if name in ("chrome", "edge"):
                self._options.add_argument("--disable-dev-shm-usage")

        if self.is_remote:
            self._options.enable_downloads = True

        if self.browser_path or self.browser_args:
            if self.driver_class == self.supported_drivers.webkitgtk:
                self._options.overlay_scrollbars_enabled = False
            if self.browser_path is not None:
                self._options.binary_location = self.browser_path.strip("'")
            if self.browser_args is not None:
                for arg in self.browser_args.split():
                    self._options.add_argument(arg)

    @property
    def service(self):
        """Service built from ``--driver-binary``, or ``None`` when not given."""
        executable = self.driver_path
        if executable:
            module = getattr(webdriver, self.driver_class.lower())
            self._service = module.service.Service(executable_path=executable)
            return self._service
        return None

    @property
    def driver(self):
        """The driver instance, created on first access."""
        if self._driver is None:
            self._driver = self._initialize_driver()
        return self._driver

    @property
    def is_platform_valid(self):
        """Whether the selected driver can run on the current platform."""
        name = self.driver_class.lower()
        if name == "safari" and self.exe_platform != "Darwin":
            return False
        if name == "ie" and self.exe_platform != "Windows":
            return False
        if "webkit" in name and self.exe_platform == "Windows":
            return False
        return True

    @property
    def is_remote(self):
        return self._request.config.getoption("remote")

    def _initialize_driver(self):
        """Create a ``webdriver.Remote`` or a local driver from the current settings."""
        kwargs = {}
        if self.options is not None:
            kwargs["options"] = self.options
        if self.is_remote:
            kwargs["command_executor"] = self._server.status_url.removesuffix("/status")
            return webdriver.Remote(**kwargs)
        if self.driver_path is not None:
            kwargs["service"] = self.service
        return getattr(webdriver, self.driver_class)(**kwargs)

    def stop_driver(self):
        """Quit the current driver, if any, and forget it."""
        # Clear the reference first so a failing quit() does not leave a
        # half-dead driver behind for the next test.
        driver_to_stop = self._driver
        self._driver = None
        if driver_to_stop is not None:
            driver_to_stop.quit()


def _apply_xfail_marker(marker):
    """Skip or xfail the running test according to an ``xfail_*`` marker.

    Recognised marker kwargs:
      * ``condition`` - value or zero-argument callable; when falsy the
        marker is ignored and the function returns normally.
      * ``run`` - when present and falsy the test is skipped.
      * ``reason`` - forwarded to ``pytest.xfail``.

    Any other kwargs (``raises``, ``strict``, ...) are ignored because
    ``pytest.xfail`` only accepts ``reason``.
    """
    kwargs = dict(marker.kwargs)
    condition = kwargs.pop("condition", True)
    if callable(condition):
        condition = condition()
    if not condition:
        return
    if not kwargs.get("run", True):
        pytest.skip()
    pytest.xfail(kwargs.get("reason", ""))


@pytest.fixture
def driver(request, server):
    """Yield a driver for the requested class, reusing the shared instance.

    Skips or xfails the test first when the platform, the ``--remote`` /
    ``--bidi`` flags or an ``xfail_<driver>`` / ``xfail_remote`` marker
    require it.
    """
    global selenium_driver
    driver_class = getattr(request, "param", "Chrome").lower()

    if selenium_driver is None:
        selenium_driver = Driver(driver_class, request)

    if server:
        selenium_driver._server = server

    # skip tests if not available on the platform
    if not selenium_driver.is_platform_valid:
        pytest.skip(f"{driver_class} tests can only run on {selenium_driver.exe_platform}")

    # skip tests in the 'remote' directory if not running with --remote flag
    if request.node.path.parts[-2] == "remote" and not selenium_driver.is_remote:
        pytest.skip("Remote tests require the --remote flag")

    # skip tests for drivers that don't support BiDi when --bidi is enabled
    if selenium_driver.bidi:
        if driver_class not in selenium_driver.supported_bidi_drivers:
            pytest.skip(f"{driver_class} does not support BiDi")

    # conditionally mark tests as expected to fail based on driver
    marker = request.node.get_closest_marker(f"xfail_{driver_class}")
    # Also check for xfail_remote when running with --remote
    if marker is None and selenium_driver.is_remote:
        marker = request.node.get_closest_marker("xfail_remote")

    if marker is not None:
        _apply_xfail_marker(marker)

    # For BiDi tests, only restart driver when explicitly marked as needing fresh driver.
    # Tests marked with @pytest.mark.needs_fresh_driver get full driver restart for test isolation.
    # Cleanup after every test is recommended.
    if selenium_driver is not None and selenium_driver.bidi:
        if request.node.get_closest_marker("needs_fresh_driver"):
            request.addfinalizer(selenium_driver.stop_driver)
        else:

            def ensure_valid_window():
                try:
                    driver = selenium_driver._driver
                    if driver:
                        try:
                            # Check if current window is still valid
                            driver.current_window_handle
                        except Exception:
                            # restart driver
                            selenium_driver.stop_driver()
                except Exception:
                    # Best-effort cleanup: never fail a test from the finalizer.
                    pass

            request.addfinalizer(ensure_valid_window)  # noqa: PT021

    yield selenium_driver.driver

    if request.node.get_closest_marker("no_driver_after_test"):
        if selenium_driver is not None:
            try:
                selenium_driver.stop_driver()
            except WebDriverException:
                pass
            selenium_driver = None


@pytest.fixture(scope="session", autouse=True)
def stop_driver(request):
    """Register a session finalizer that stops the shared driver."""

    def fin():
        global selenium_driver
        if selenium_driver is not None:
            selenium_driver.stop_driver()
        selenium_driver = None

    request.addfinalizer(fin)  # noqa: PT021


def pytest_exception_interact(node, call, report):
    """Stop and discard the shared driver when a report is marked failed."""
    if report.failed:
        global selenium_driver
        if selenium_driver is not None:
            selenium_driver.stop_driver()
        selenium_driver = None


@pytest.fixture
def pages(driver, webserver):
    """Return a helper that resolves and loads pages from the test web server."""

    class Pages:
        def url(self, name, localhost=False):
            return webserver.where_is(name, localhost)

        def load(self, name):
            driver.get(self.url(name))

    return Pages()


@pytest.fixture(autouse=True, scope="session")
def server(request):
    """Start a ``Server`` for the session when ``--remote`` is given.

    Yields ``None`` without starting anything when ``--remote`` is absent.
    The Java binary and server jar are looked up through runfiles; when
    either cannot be resolved the ``Server`` object's own defaults are left
    untouched.
    """
    is_remote = request.config.getoption("remote")
    if not is_remote:
        yield None
        return

    r = Runfiles.Create()

    # Resolve the Java binary from the location file named by the
    # environment. Every failure mode (variable unset, file missing,
    # runfiles lookup error) leaves java_path as None.
    java_path = None
    java_location = os.environ.get("SE_BAZEL_JAVA_LOCATION")
    if java_location:
        try:
            java_location_txt = r.Rlocation("_main/" + java_location)
            with open(java_location_txt, encoding="utf-8") as handle:
                read = handle.read().strip()
            rel_path = read.removeprefix("external/")
            java_path = r.Rlocation(rel_path)
        except Exception:
            java_path = None

    built_jar = "selenium/java/src/org/openqa/selenium/grid/selenium_server_deploy.jar"
    jar_path = r.Rlocation(built_jar)

    remote_env = os.environ.copy()
    if sys.platform == "linux":
        # There are issues with window size/position when running Firefox
        # under Wayland, so we use XWayland instead.
        remote_env["MOZ_ENABLE_WAYLAND"] = "0"

    server = Server(env=remote_env, startup_timeout=60)
    # Path(None) raises TypeError, so check for a value before probing the disk.
    if java_path and Path(java_path).exists():
        server.java_path = java_path
    if jar_path and Path(jar_path).exists():
        server.path = jar_path
    server.port = free_port()
    server.start()
    yield server
    server.stop()


@pytest.fixture(autouse=True, scope="session")
def webserver(request):
    """Start the ``SimpleWebServer`` for the session and stop it afterwards.

    The server is given the LAN IP as host when ``--use-lan-ip`` is set,
    otherwise ``host=None``.
    """
    host = get_lan_ip() if request.config.getoption("use_lan_ip") else None
    webserver = SimpleWebServer(host=host)
    webserver.start()
    yield webserver
    webserver.stop()


@pytest.fixture
def edge_service():
    """Return the Edge ``Service`` class (not an instance)."""
    from selenium.webdriver.edge.service import Service as EdgeService

    return EdgeService


@pytest.fixture
def driver_executable(request):
    """Return the value of ``--driver-binary``."""
    return request.config.option.executable


@pytest.fixture
def clean_driver(request):
    """Yield the ``selenium.webdriver`` class for the first ``--driver``.

    Raises:
        Exception: when no ``--driver`` was specified.
    """
    _supported_drivers = SupportedDrivers()
    try:
        driver_class = getattr(_supported_drivers, request.config.option.drivers[0].lower())
    except (AttributeError, TypeError) as err:
        raise Exception("This test requires a --driver to be specified.") from err
    driver_reference = getattr(webdriver, driver_class)

    # conditionally mark tests as expected to fail based on driver
    marker = request.node.get_closest_marker(f"xfail_{driver_class.lower()}")
    # Also check for xfail_remote when running with --remote
    if marker is None and request.config.getoption("remote"):
        marker = request.node.get_closest_marker("xfail_remote")

    if marker is not None:
        _apply_xfail_marker(marker)

    yield driver_reference


@pytest.fixture
def clean_service(request):
    """Return a service for the first ``--driver`` (``None`` without ``--driver-binary``)."""
    driver_class = request.config.option.drivers[0].lower()
    selenium_driver = Driver(driver_class, request)
    return selenium_driver.service


@pytest.fixture
def clean_options(request):
    """Return freshly configured options for the first ``--driver``."""
    driver_class = request.config.option.drivers[0].lower()
    return Driver.clean_options(driver_class, request)


@pytest.fixture
def firefox_options(request):
    """Return fresh Firefox options; skip unless the first ``--driver`` is firefox.

    Raises:
        Exception: when no ``--driver`` was specified.
    """
    try:
        driver_class = request.config.option.drivers[0].lower()
    except (AttributeError, TypeError) as err:
        raise Exception("This test requires a --driver to be specified") from err

    # skip if not Firefox
    if driver_class != "firefox":
        pytest.skip(f"This test requires Firefox. Got {driver_class}")

    # skip tests in the 'remote' directory if not running with --remote flag
    is_remote = request.config.getoption("remote")
    if request.node.path.parts[-2] == "remote" and not is_remote:
        pytest.skip("Remote tests require the --remote flag")

    options = Driver.clean_options("firefox", request)
    return options


@pytest.fixture
def chromium_options(request):
    """Return fresh Chrome/Edge options; skip for any other first ``--driver``.

    Raises:
        Exception: when no ``--driver`` was specified.
    """
    try:
        driver_class = request.config.option.drivers[0].lower()
    except (AttributeError, TypeError) as err:
        raise Exception("This test requires a --driver to be specified") from err

    # skip if not Chrome or Edge
    if driver_class not in ("chrome", "edge"):
        pytest.skip(f"This test requires Chrome or Edge. Got {driver_class}")

    # skip tests in the 'remote' directory if not running with --remote flag
    is_remote = request.config.getoption("remote")
    if request.node.path.parts[-2] == "remote" and not is_remote:
        pytest.skip("Remote tests require the --remote flag")

    options = Driver.clean_options(driver_class, request)
    return options


@pytest.fixture
def proxy_server():
    """Creates HTTP proxy servers with custom response content, cleans up after the test."""
    servers = []

    def create_server(response_content=b"test response"):
        port = free_port()

        class CustomHandler(http.server.SimpleHTTPRequestHandler):
            def do_GET(self):
                self.send_response(200)
                self.end_headers()
                self.wfile.write(response_content)

        server = socketserver.TCPServer(("localhost", port), CustomHandler)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()

        servers.append(server)
        return {"port": port, "server": server}

    yield create_server

    for server in servers:
        server.shutdown()
        server.server_close()