remove playwright wrapper

This commit is contained in:
oneflux 2025-04-21 20:14:23 -07:00
parent 8237cf4bcc
commit bca4090cd3
28 changed files with 0 additions and 5238 deletions

View file

@ -1,74 +0,0 @@
<div align="center">
# Camoufox Python Interface
#### Lightweight wrapper around the Playwright API to help launch Camoufox.
</div>
> [!NOTE]
> All the the latest documentation is avaliable [here](https://camoufox.com/python).
---
## What is this?
This Python library wraps around Playwright's API to help automatically generate & inject unique device characteristics (OS, CPU info, navigator, fonts, headers, screen dimensions, viewport size, WebGL, addons, etc.) into Camoufox.
It uses [BrowserForge](https://github.com/daijro/browserforge) under the hood to generate fingerprints that mimic the statistical distribution of device characteristics in real-world traffic.
In addition, it will also calculate your target geolocation, timezone, and locale to avoid proxy protection ([see demo](https://i.imgur.com/UhSHfaV.png)).
---
## Installation
First, install the `camoufox` package:
```bash
pip install -U camoufox[geoip]
```
The `geoip` parameter is optional, but heavily recommended if you are using proxies. It will download an extra dataset to determine the user's longitude, latitude, timezone, country, & locale.
Next, download the Camoufox browser:
**Windows**
```bash
camoufox fetch
```
**MacOS & Linux**
```bash
python3 -m camoufox fetch
```
To uninstall, run `camoufox remove`.
<details>
<summary>CLI options</summary>
```
Usage: python -m camoufox [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
fetch Fetch the latest version of Camoufox
path Display the path to the Camoufox executable
remove Remove all downloaded files
server Launch a Playwright server
test Open the Playwright inspector
version Display the current version
```
</details>
<hr width=50>
## Usage
All of the latest documentation is avaliable at [camoufox.com/python](https://camoufox.com/python).

View file

@ -1,13 +0,0 @@
from .addons import DefaultAddons
from .async_api import AsyncCamoufox, AsyncNewBrowser
from .sync_api import Camoufox, NewBrowser
from .utils import launch_options
__all__ = [
"Camoufox",
"NewBrowser",
"AsyncCamoufox",
"AsyncNewBrowser",
"DefaultAddons",
"launch_options",
]

View file

@ -1,171 +0,0 @@
"""
CLI package manager for Camoufox.
Adapted from https://github.com/daijro/hrequests/blob/main/hrequests/__main__.py
"""
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
from os import environ
from typing import Optional
import click
from .addons import DefaultAddons, maybe_download_addons
from .locale import ALLOW_GEOIP, download_mmdb, remove_mmdb
from .pkgman import INSTALL_DIR, CamoufoxFetcher, installed_verstr, rprint
try:
from browserforge.download import download as update_browserforge
except ImportError:
# Account for other Browserforge versions
from browserforge.download import Download as update_browserforge
class CamoufoxUpdate(CamoufoxFetcher):
"""
Checks & updates Camoufox
"""
def __init__(self) -> None:
"""
Initializes the CamoufoxUpdate class
"""
super().__init__()
self.current_verstr: Optional[str]
try:
self.current_verstr = installed_verstr()
except FileNotFoundError:
self.current_verstr = None
def is_updated_needed(self) -> bool:
# Camoufox is not installed
if self.current_verstr is None:
return True
# If the installed version is not the latest version
if self.current_verstr != self.verstr:
return True
return False
def update(self) -> None:
"""
Updates Camoufox if needed
"""
# Check if the version is the same as the latest available version
if not self.is_updated_needed():
rprint("Camoufox binaries up to date!", fg="green")
rprint(f"Current version: v{self.current_verstr}", fg="green")
return
# Download updated file
if self.current_verstr is not None:
# Display an updating message
rprint(
f"Updating Camoufox binaries from v{self.current_verstr} => v{self.verstr}",
fg="yellow",
)
else:
rprint(f"Fetching Camoufox binaries v{self.verstr}...", fg="yellow")
# Install the new version
self.install()
@click.group()
def cli() -> None:
pass
@cli.command(name='fetch')
@click.option(
'--browserforge', is_flag=True, help='Update browserforge\'s header and fingerprint definitions'
)
def fetch(browserforge=False) -> None:
"""
Fetch the latest version of Camoufox and optionally update Browserforge's database
"""
CamoufoxUpdate().update()
# Fetch the GeoIP database
if ALLOW_GEOIP:
download_mmdb()
# Download default addons
maybe_download_addons(list(DefaultAddons))
if browserforge:
update_browserforge(headers=True, fingerprints=True)
@cli.command(name='remove')
def remove() -> None:
"""
Remove all downloaded files
"""
if not CamoufoxUpdate().cleanup():
rprint("Camoufox binaries not found!", fg="red")
# Remove the GeoIP database
remove_mmdb()
@cli.command(name='test')
@click.argument('url', default=None, required=False)
def test(url: Optional[str] = None) -> None:
"""
Open the Playwright inspector
"""
from .sync_api import Camoufox
with Camoufox(headless=False, env=environ, config={'showcursor': False}) as browser:
page = browser.new_page()
if url:
page.goto(url)
page.pause() # Open the Playwright inspector
@cli.command(name='server')
def server() -> None:
"""
Launch a Playwright server
"""
from .server import launch_server
launch_server()
@cli.command(name='path')
def path() -> None:
"""
Display the path to the Camoufox executable
"""
rprint(INSTALL_DIR, fg="green")
@cli.command(name='version')
def version() -> None:
"""
Display the current version
"""
# python package version
try:
rprint(f"Pip package:\tv{pkg_version('camoufox')}", fg="green")
except PackageNotFoundError:
rprint("Pip package:\tNot installed!", fg="red")
updater = CamoufoxUpdate()
bin_ver = updater.current_verstr
# If binaries are not downloaded
if not bin_ver:
rprint("Camoufox:\tNot downloaded!", fg="red")
return
# Print the base version
rprint(f"Camoufox:\tv{bin_ver} ", fg="green", nl=False)
# Check for Camoufox updates
if updater.is_updated_needed():
rprint(f"(Latest supported: v{updater.verstr})", fg="red")
else:
rprint("(Up to date!)", fg="yellow")
if __name__ == '__main__':
cli()

View file

@ -1,19 +0,0 @@
"""
Camoufox version constants.
"""
class CONSTRAINTS:
"""
The minimum and maximum supported versions of the Camoufox browser.
"""
MIN_VERSION = 'beta.19'
MAX_VERSION = '1'
@staticmethod
def as_range() -> str:
"""
Returns the version range as a string.
"""
return f">={CONSTRAINTS.MIN_VERSION}, <{CONSTRAINTS.MAX_VERSION}"

View file

@ -1,89 +0,0 @@
import os
from enum import Enum
from multiprocessing import Lock
from typing import List, Optional
from .exceptions import InvalidAddonPath
from .pkgman import get_path, unzip, webdl
class DefaultAddons(Enum):
"""
Default addons to be downloaded
"""
UBO = "https://addons.mozilla.org/firefox/downloads/latest/ublock-origin/latest.xpi"
def confirm_paths(paths: List[str]) -> None:
"""
Confirms that the addon paths are valid
"""
for path in paths:
if not os.path.isdir(path):
raise InvalidAddonPath(path)
if not os.path.exists(os.path.join(path, 'manifest.json')):
raise InvalidAddonPath(
'manifest.json is missing. Addon path must be a path to an extracted addon.'
)
def add_default_addons(
addons_list: List[str], exclude_list: Optional[List[DefaultAddons]] = None
) -> None:
"""
Adds default addons, minus any specified in exclude_list, to addons_list
"""
# Build a dictionary from DefaultAddons, excluding keys found in exclude_list
if exclude_list is None:
exclude_list = []
addons = [addon for addon in DefaultAddons if addon not in exclude_list]
with Lock():
maybe_download_addons(addons, addons_list)
def download_and_extract(url: str, extract_path: str, name: str) -> None:
"""
Downloads and extracts an addon from a given URL to a specified path
"""
# Create a temporary file to store the downloaded zip
buffer = webdl(url, desc=f"Downloading addon ({name})", bar=False)
unzip(buffer, extract_path, f"Extracting addon ({name})", bar=False)
def get_addon_path(addon_name: str) -> str:
"""
Returns a path to the addon
"""
return get_path(os.path.join("addons", addon_name))
def maybe_download_addons(
addons: List[DefaultAddons], addons_list: Optional[List[str]] = None
) -> None:
"""
Downloads and extracts addons from a given dictionary to a specified list
Skips downloading if the addon is already downloaded
"""
for addon in addons:
# Get the addon path
addon_path = get_addon_path(addon.name)
# Check if the addon is already extracted
if os.path.exists(addon_path):
# Add the existing addon path to addons_list
if addons_list is not None:
addons_list.append(addon_path)
continue
# Addon doesn't exist, create directory and download
try:
os.makedirs(addon_path, exist_ok=True)
download_and_extract(addon.value, addon_path, addon.name)
# Add the new addon directory path to addons_list
if addons_list is not None:
addons_list.append(addon_path)
except Exception as e:
print(f"Failed to download and extract {addon.name}: {e}")

View file

@ -1,100 +0,0 @@
import asyncio
from functools import partial
from typing import Any, Dict, Optional, Union, overload
from playwright.async_api import (
Browser,
BrowserContext,
Playwright,
PlaywrightContextManager,
)
from typing_extensions import Literal
from camoufox.virtdisplay import VirtualDisplay
from .utils import async_attach_vd, launch_options
class AsyncCamoufox(PlaywrightContextManager):
"""
Wrapper around playwright.async_api.PlaywrightContextManager that automatically
launches a browser and closes it when the context manager is exited.
"""
def __init__(self, **launch_options):
super().__init__()
self.launch_options = launch_options
self.browser: Optional[Union[Browser, BrowserContext]] = None
async def __aenter__(self) -> Union[Browser, BrowserContext]:
_playwright = await super().__aenter__()
self.browser = await AsyncNewBrowser(_playwright, **self.launch_options)
return self.browser
async def __aexit__(self, *args: Any):
if self.browser:
await self.browser.close()
await super().__aexit__(*args)
@overload
async def AsyncNewBrowser(
playwright: Playwright,
*,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: Literal[False] = False,
**kwargs,
) -> Browser: ...
@overload
async def AsyncNewBrowser(
playwright: Playwright,
*,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: Literal[True],
**kwargs,
) -> BrowserContext: ...
async def AsyncNewBrowser(
playwright: Playwright,
*,
headless: Optional[Union[bool, Literal['virtual']]] = None,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: bool = False,
debug: Optional[bool] = None,
**kwargs,
) -> Union[Browser, BrowserContext]:
"""
Launches a new browser instance for Camoufox given a set of launch options.
Parameters:
from_options (Dict[str, Any]):
A set of launch options generated by `launch_options()` to use
persistent_context (bool):
Whether to use a persistent context.
**kwargs:
All other keyword arugments passed to `launch_options()`.
"""
if headless == 'virtual':
virtual_display = VirtualDisplay(debug=debug)
kwargs['virtual_display'] = virtual_display.get()
headless = False
else:
virtual_display = None
if not from_options:
from_options = await asyncio.get_event_loop().run_in_executor(
None,
partial(launch_options, headless=headless, debug=debug, **kwargs),
)
# Persistent context
if persistent_context:
context = await playwright.firefox.launch_persistent_context(**from_options)
return await async_attach_vd(context, virtual_display)
# Browser
browser = await playwright.firefox.launch(**from_options)
return await async_attach_vd(browser, virtual_display)

View file

@ -1,68 +0,0 @@
# Mappings of Browserforge fingerprints to Camoufox config properties.
navigator:
# Note: Browserforge tends to have outdated UAs.
# The version will be replaced in Camoufox.
userAgent: navigator.userAgent
# userAgentData not in Firefox
doNotTrack: navigator.doNotTrack
appCodeName: navigator.appCodeName
appName: navigator.appName
appVersion: navigator.appVersion
oscpu: navigator.oscpu
# webdriver is always True
# Locale is now implemented separately:
# language: navigator.language
# languages: navigator.languages
platform: navigator.platform
# deviceMemory not in Firefox
hardwareConcurrency: navigator.hardwareConcurrency
product: navigator.product
# Never override productSub #105
# productSub: navigator.productSub
# vendor is not necessary
# vendorSub is not necessary
maxTouchPoints: navigator.maxTouchPoints
extraProperties:
# Note: Changing pdfViewerEnabled is not recommended. This will be kept to True.
globalPrivacyControl: navigator.globalPrivacyControl
screen:
# hasHDR is not implemented in Camoufox
availLeft: screen.availLeft
availTop: screen.availTop
availWidth: screen.availWidth
availHeight: screen.availHeight
height: screen.height
width: screen.width
colorDepth: screen.colorDepth
pixelDepth: screen.pixelDepth
# devicePixelRatio is not recommended. Any value other than 1.0 is suspicious.
pageXOffset: screen.pageXOffset
pageYOffset: screen.pageYOffset
outerHeight: window.outerHeight
outerWidth: window.outerWidth
innerHeight: window.innerHeight
innerWidth: window.innerWidth
screenX: window.screenX
screenY: window.screenY
# Tends to generate out of bounds (network inconsistencies):
# clientWidth: document.body.clientWidth
# clientHeight: document.body.clientHeight
# videoCard:
# renderer: webgl:renderer
# vendor: webgl:vendor
headers:
# headers.User-Agent is redundant with navigator.userAgent
# headers.Accept-Language is redundant with locale:*
Accept-Encoding: headers.Accept-Encoding
battery:
charging: battery:charging
chargingTime: battery:chargingTime
dischargingTime: battery:dischargingTime
# Unsupported: videoCodecs, audioCodecs, pluginsData, multimediaDevices
# Fonts are listed through the launcher.

View file

@ -1,196 +0,0 @@
class UnsupportedVersion(Exception):
"""
Raised when the Camoufox executable is outdated.
"""
...
class MissingRelease(Exception):
"""
Raised when a required GitHub release asset is missing.
"""
...
class UnsupportedArchitecture(Exception):
"""
Raised when the architecture is not supported.
"""
...
class UnsupportedOS(Exception):
"""
Raised when the OS is not supported.
"""
...
class UnknownProperty(Exception):
"""
Raised when the property is unknown.
"""
...
class InvalidPropertyType(Exception):
"""
Raised when the property type is invalid.
"""
...
class InvalidAddonPath(FileNotFoundError):
"""
Raised when the addon path is invalid.
"""
...
class InvalidDebugPort(ValueError):
"""
Raised when the debug port is invalid.
"""
...
class MissingDebugPort(ValueError):
"""
Raised when the debug port is missing.
"""
...
class LocaleError(Exception):
"""
Raised when the locale is invalid.
"""
...
class InvalidIP(Exception):
"""
Raised when an IP address is invalid.
"""
...
class InvalidProxy(Exception):
"""
Raised when a proxy is invalid.
"""
...
class UnknownIPLocation(LocaleError):
"""
Raised when the location of an IP is unknown.
"""
...
class InvalidLocale(LocaleError):
"""
Raised when the locale input is invalid.
"""
@classmethod
def invalid_input(cls, locale: str) -> 'InvalidLocale':
return cls(
f"Invalid locale: '{locale}'. Must be either a region, language, "
"language-region, or language-script-region."
)
class UnknownTerritory(InvalidLocale):
"""
Raised when the territory is unknown.
"""
...
class UnknownLanguage(InvalidLocale):
"""
Raised when the language is unknown.
"""
...
class NotInstalledGeoIPExtra(ImportError):
"""
Raised when the geoip2 module is not installed.
"""
...
class NonFirefoxFingerprint(Exception):
"""
Raised when a passed Browserforge fingerprint is invalid.
"""
...
class InvalidOS(ValueError):
"""
Raised when the target OS is invalid.
"""
...
class VirtualDisplayError(Exception):
"""
Raised when there is an error with the virtual display.
"""
...
class CannotFindXvfb(VirtualDisplayError):
"""
Raised when Xvfb cannot be found.
"""
...
pass
class CannotExecuteXvfb(VirtualDisplayError):
"""
Raised when Xvfb cannot be executed.
"""
...
class VirtualDisplayNotSupported(VirtualDisplayError):
"""
Raised when the user tried to use a virtual display on a non-Linux OS.
"""
...
class CamoufoxNotInstalled(FileNotFoundError):
"""
Raised when camoufox is not installed.
"""
...

View file

@ -1,142 +0,0 @@
import re
from dataclasses import asdict, dataclass
from random import randrange
from typing import Any, Dict, Optional, Tuple
from browserforge.fingerprints import (
Fingerprint,
FingerprintGenerator,
ScreenFingerprint,
)
from camoufox.pkgman import load_yaml
# Load the browserforge.yaml file
BROWSERFORGE_DATA = load_yaml('browserforge.yml')
FP_GENERATOR = FingerprintGenerator(browser='firefox', os=('linux', 'macos', 'windows'))
@dataclass
class ExtendedScreen(ScreenFingerprint):
"""
An extended version of Browserforge's ScreenFingerprint class
"""
screenY: Optional[int] = None
def _cast_to_properties(
camoufox_data: Dict[str, Any],
cast_enum: Dict[str, Any],
bf_dict: Dict[str, Any],
ff_version: Optional[str] = None,
) -> None:
"""
Casts Browserforge fingerprints to Camoufox config properties.
"""
for key, data in bf_dict.items():
# Ignore non-truthy values
if not data:
continue
# Get the associated Camoufox property
type_key = cast_enum.get(key)
if not type_key:
continue
# If the value is a dictionary, recursively recall
if isinstance(data, dict):
_cast_to_properties(camoufox_data, type_key, data, ff_version)
continue
# Fix values that are out of bounds
if type_key.startswith("screen.") and isinstance(data, int) and data < 0:
data = 0
# Replace the Firefox versions with ff_version
if ff_version and isinstance(data, str):
data = re.sub(r'(?<!\d)(1[0-9]{2})(\.0)(?!\d)', rf'{ff_version}\2', data)
camoufox_data[type_key] = data
def handle_screenXY(camoufox_data: Dict[str, Any], fp_screen: ScreenFingerprint) -> None:
"""
Helper method to set window.screenY based on Browserforge's screenX value.
"""
# Skip if manually provided
if 'window.screenY' in camoufox_data:
return
# Default screenX to 0 if not provided
screenX = fp_screen.screenX
if not screenX:
camoufox_data['window.screenX'] = 0
camoufox_data['window.screenY'] = 0
return
# If screenX is within [-50, 50], use the same value for screenY
if screenX in range(-50, 51):
camoufox_data['window.screenY'] = screenX
return
# Browserforge thinks the browser is windowed. # Randomly generate a screenY value.
screenY = fp_screen.availHeight - fp_screen.outerHeight
if screenY == 0:
camoufox_data['window.screenY'] = 0
elif screenY > 0:
camoufox_data['window.screenY'] = randrange(0, screenY) # nosec
else:
camoufox_data['window.screenY'] = randrange(screenY, 0) # nosec
def from_browserforge(fingerprint: Fingerprint, ff_version: Optional[str] = None) -> Dict[str, Any]:
"""
Converts a Browserforge fingerprint to a Camoufox config.
"""
camoufox_data: Dict[str, Any] = {}
_cast_to_properties(
camoufox_data,
cast_enum=BROWSERFORGE_DATA,
bf_dict=asdict(fingerprint),
ff_version=ff_version,
)
handle_screenXY(camoufox_data, fingerprint.screen)
return camoufox_data
def handle_window_size(fp: Fingerprint, outer_width: int, outer_height: int) -> None:
"""
Helper method to set a custom outer window size, and center it in the screen
"""
# Cast the screen to an ExtendedScreen
fp.screen = ExtendedScreen(**asdict(fp.screen))
sc = fp.screen
# Center the window on the screen
sc.screenX += (sc.width - outer_width) // 2
sc.screenY = (sc.height - outer_height) // 2
# Update inner dimensions if set
if sc.innerWidth:
sc.innerWidth = max(outer_width - sc.outerWidth + sc.innerWidth, 0)
if sc.innerHeight:
sc.innerHeight = max(outer_height - sc.outerHeight + sc.innerHeight, 0)
# Set outer dimensions
sc.outerWidth = outer_width
sc.outerHeight = outer_height
def generate_fingerprint(window: Optional[Tuple[int, int]] = None, **config) -> Fingerprint:
"""
Generates a Firefox fingerprint with Browserforge.
"""
if window: # User-specified outer window size
fingerprint = FP_GENERATOR.generate(**config)
handle_window_size(fingerprint, *window)
return fingerprint
return FP_GENERATOR.generate(**config)
if __name__ == "__main__":
from pprint import pprint
fp = generate_fingerprint()
pprint(from_browserforge(fp))

File diff suppressed because one or more lines are too long

View file

@ -1,120 +0,0 @@
import re
import warnings
from contextlib import contextmanager
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, Optional, Tuple
import requests
from urllib3.exceptions import InsecureRequestWarning
from .exceptions import InvalidIP, InvalidProxy
"""
Helpers to find the user's public IP address for geolocation.
"""
@dataclass
class Proxy:
"""
Stores proxy information.
"""
server: str
username: Optional[str] = None
password: Optional[str] = None
bypass: Optional[str] = None
@staticmethod
def parse_server(server: str) -> Tuple[str, str, Optional[str]]:
"""
Parses the proxy server string.
"""
proxy_match = re.match(r'^(?:(?P<schema>\w+)://)?(?P<url>.*?)(?:\:(?P<port>\d+))?$', server)
if not proxy_match:
raise InvalidProxy(f"Invalid proxy server: {server}")
return proxy_match['schema'], proxy_match['url'], proxy_match['port']
def as_string(self) -> str:
schema, url, port = self.parse_server(self.server)
if not schema:
schema = 'http'
result = f"{schema}://"
if self.username:
result += f"{self.username}"
if self.password:
result += f":{self.password}"
result += "@"
result += url
if port:
result += f":{port}"
return result
@staticmethod
def as_requests_proxy(proxy_string: str) -> Dict[str, str]:
"""
Converts the proxy to a requests proxy dictionary.
"""
return {
'http': proxy_string,
'https': proxy_string,
}
@lru_cache(128, typed=True)
def valid_ipv4(ip: str) -> bool:
return bool(re.match(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$', ip))
@lru_cache(128, typed=True)
def valid_ipv6(ip: str) -> bool:
return bool(re.match(r'^(([0-9a-fA-F]{0,4}:){1,7}[0-9a-fA-F]{0,4})$', ip))
def validate_ip(ip: str) -> None:
if not valid_ipv4(ip) and not valid_ipv6(ip):
raise InvalidIP(f"Invalid IP address: {ip}")
@contextmanager
def _suppress_insecure_warning():
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
yield
@lru_cache(maxsize=None)
def public_ip(proxy: Optional[str] = None) -> str:
"""
Sends a request to a public IP api
"""
URLS = [
# Prefers IPv4
"https://api.ipify.org",
"https://checkip.amazonaws.com",
"https://ipinfo.io/ip",
# IPv4 & IPv6
"https://icanhazip.com",
"https://ifconfig.co/ip",
"https://ipecho.net/plain",
]
exception = None
for url in URLS:
try:
with _suppress_insecure_warning():
resp = requests.get( # nosec
url,
proxies=Proxy.as_requests_proxy(proxy) if proxy else None,
timeout=5,
verify=False,
)
resp.raise_for_status()
ip = resp.text.strip()
validate_ip(ip)
return ip
except (requests.exceptions.ProxyError, requests.RequestException, InvalidIP) as exception:
pass
raise InvalidIP(f"Failed to get IP address: {exception}")

View file

@ -1,40 +0,0 @@
// Workaround that accesses Playwright's undocumented `launchServer` method in Python
// Without having to use the Node.js Playwright library.
const { BrowserServerLauncherImpl } = require(`${process.cwd()}/lib/browserServerImpl.js`)
function collectData() {
return new Promise((resolve) => {
let data = '';
process.stdin.setEncoding('utf8');
process.stdin.on('data', (chunk) => {
data += chunk;
});
process.stdin.on('end', () => {
resolve(JSON.parse(Buffer.from(data, "base64").toString()));
});
});
}
collectData().then((options) => {
console.time('Server launched');
console.info('Launching server...');
const server = new BrowserServerLauncherImpl('firefox')
// Call Playwright's `launchServer` method
server.launchServer(options).then(browserServer => {
console.timeEnd('Server launched');
console.log('Websocket endpoint:\x1b[93m', browserServer.wsEndpoint(), '\x1b[0m');
// Continue forever
process.stdin.resume();
}).catch(error => {
console.error('Error launching server:', error.message);
process.exit(1);
});
}).catch((error) => {
console.error('Error collecting data:', error.message);
process.exit(1); // Exit with error code
});

View file

@ -1,392 +0,0 @@
import xml.etree.ElementTree as ET # nosec
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
import numpy as np
from language_tags import tags
from camoufox.pkgman import LOCAL_DATA, GitHubDownloader, rprint, webdl
from camoufox.warnings import LeakWarning
from .exceptions import (
InvalidLocale,
MissingRelease,
NotInstalledGeoIPExtra,
UnknownIPLocation,
UnknownLanguage,
UnknownTerritory,
)
from .ip import validate_ip
try:
import geoip2.database # type: ignore
except ImportError:
ALLOW_GEOIP = False
else:
ALLOW_GEOIP = True
"""
Data structures for locale and geolocation info
"""
@dataclass
class Locale:
"""
Stores locale, region, and script information.
"""
language: str
region: Optional[str] = None
script: Optional[str] = None
@property
def as_string(self) -> str:
if self.region:
return f"{self.language}-{self.region}"
return self.language
def as_config(self) -> Dict[str, str]:
"""
Converts the locale to a intl config dictionary.
"""
assert self.region
data = {
'locale:region': self.region,
'locale:language': self.language,
}
if self.script:
data['locale:script'] = self.script
return data
@dataclass(frozen=True)
class Geolocation:
"""
Stores geolocation information.
"""
locale: Locale
longitude: float
latitude: float
timezone: str
accuracy: Optional[float] = None
def as_config(self) -> Dict[str, Any]:
"""
Converts the geolocation to a config dictionary.
"""
data = {
'geolocation:longitude': self.longitude,
'geolocation:latitude': self.latitude,
'timezone': self.timezone,
**self.locale.as_config(),
}
if self.accuracy:
data['geolocation:accuracy'] = self.accuracy
return data
"""
Helpers to validate and normalize locales
"""
def verify_locale(loc: str) -> None:
"""
Verifies that a locale is valid.
Takes either language-region or language.
"""
if tags.check(loc):
return
raise InvalidLocale.invalid_input(loc)
def normalize_locale(locale: str) -> Locale:
"""
Normalizes and validates a locale code.
"""
verify_locale(locale)
# Parse the locale
parser = tags.tag(locale)
if not parser.region:
raise InvalidLocale.invalid_input(locale)
record = parser.language.data['record']
# Return a formatted locale object
return Locale(
language=record['Subtag'],
region=parser.region.data['record']['Subtag'],
script=record.get('Suppress-Script'),
)
def handle_locale(locale: str, ignore_region: bool = False) -> Locale:
"""
Handles a locale input, normalizing it if necessary.
"""
# If the user passed in `language-region` or `language-script-region`, normalize it.
if len(locale) > 3:
return normalize_locale(locale)
# Case: user passed in `region` and needs a full locale
try:
return SELECTOR.from_region(locale)
except UnknownTerritory:
pass
# Case: user passed in `language`, and doesn't care about the region
if ignore_region:
verify_locale(locale)
return Locale(language=locale)
# Case: user passed in `language` and wants a region
try:
language = SELECTOR.from_language(locale)
except UnknownLanguage:
pass
else:
LeakWarning.warn('no_region')
return language
# Locale is not in a valid format.
raise InvalidLocale.invalid_input(locale)
def handle_locales(locales: Union[str, List[str]], config: Dict[str, Any]) -> None:
"""
Handles a list of locales.
"""
if isinstance(locales, str):
locales = [loc.strip() for loc in locales.split(',')]
# First, handle the first locale. This will be used for the intl api.
intl_locale = handle_locale(locales[0])
config.update(intl_locale.as_config())
if len(locales) < 2:
return
# If additional locales were passed, validate them.
# Note: in this case, we do not need the region.
config['locale:all'] = _join_unique(
handle_locale(locale, ignore_region=True).as_string for locale in locales
)
def _join_unique(seq: Iterable[str]) -> str:
"""
Joins a sequence of strings without duplicates
"""
seen: Set[str] = set()
return ', '.join(x for x in seq if not (x in seen or seen.add(x)))
"""
Helpers to fetch geolocation, timezone, and locale data given an IP.
"""
MMDB_FILE = LOCAL_DATA / 'GeoLite2-City.mmdb'
MMDB_REPO = "P3TERX/GeoLite.mmdb"
class MaxMindDownloader(GitHubDownloader):
"""
MaxMind database downloader from a GitHub repository.
"""
def check_asset(self, asset: Dict) -> Optional[str]:
# Check for the first -City.mmdb file
if asset['name'].endswith('-City.mmdb'):
return asset['browser_download_url']
return None
def missing_asset_error(self) -> None:
raise MissingRelease('Failed to find GeoIP database release asset')
def geoip_allowed() -> None:
"""
Checks if the geoip2 module is available.
"""
if not ALLOW_GEOIP:
raise NotInstalledGeoIPExtra(
'Please install the geoip extra to use this feature: pip install camoufox[geoip]'
)
def download_mmdb() -> None:
"""
Downloads the MaxMind GeoIP2 database.
"""
geoip_allowed()
asset_url = MaxMindDownloader(MMDB_REPO).get_asset()
with open(MMDB_FILE, 'wb') as f:
webdl(
asset_url,
desc='Downloading GeoIP database',
buffer=f,
)
def remove_mmdb() -> None:
"""
Removes the MaxMind GeoIP2 database.
"""
if not MMDB_FILE.exists():
rprint("GeoIP database not found.")
return
MMDB_FILE.unlink()
rprint("GeoIP database removed.")
def get_geolocation(ip: str) -> Geolocation:
"""
Gets the geolocation for an IP address.
"""
# Check if the database is downloaded
if not MMDB_FILE.exists():
download_mmdb()
# Validate the IP address
validate_ip(ip)
with geoip2.database.Reader(str(MMDB_FILE)) as reader:
resp = reader.city(ip)
iso_code = cast(str, resp.registered_country.iso_code).upper()
location = resp.location
# Check if any required attributes are missing
if any(not getattr(location, attr) for attr in ('longitude', 'latitude', 'time_zone')):
raise UnknownIPLocation(f"Unknown IP location: {ip}")
# Get a statistically correct locale based on the country code
locale = SELECTOR.from_region(iso_code)
return Geolocation(
locale=locale,
longitude=cast(float, resp.location.longitude),
latitude=cast(float, resp.location.latitude),
timezone=cast(str, resp.location.time_zone),
)
"""
Gets a random language based on the territory code.
"""
def get_unicode_info() -> ET.Element:
"""
Fetches supplemental data from the territoryInfo.xml file.
Source: https://raw.githubusercontent.com/unicode-org/cldr/master/common/supplemental/supplementalData.xml
"""
with open(LOCAL_DATA / 'territoryInfo.xml', 'rb') as f:
data = ET.XML(f.read())
assert data is not None, 'Failed to load territoryInfo.xml'
return data
def _as_float(element: ET.Element, attr: str) -> float:
"""
Converts an attribute to a float.
"""
return float(element.get(attr, 0))
class StatisticalLocaleSelector:
"""
Selects a random locale based on statistical data.
Takes either a territory code or a language code, and generates a Locale object.
"""
def __init__(self):
self.root = get_unicode_info()
def _load_territory_data(self, iso_code: str) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculates a random language based on the territory code,
based on the probability that a person speaks the language in the territory.
"""
territory = self.root.find(f"territory[@type='{iso_code}']")
if territory is None:
raise UnknownTerritory(f"Unknown territory: {iso_code}")
lang_populations = territory.findall('languagePopulation')
if not lang_populations:
raise ValueError(f"No language data found for region: {iso_code}")
languages = np.array([lang.get('type') for lang in lang_populations])
percentages = np.array([_as_float(lang, 'populationPercent') for lang in lang_populations])
return self.normalize_probabilities(languages, percentages)
def _load_language_data(self, language: str) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculates a random region for a language
based on the total speakers of the language in that region.
"""
territories = self.root.findall(f'.//territory/languagePopulation[@type="{language}"]/..')
if not territories:
raise UnknownLanguage(f"No region data found for language: {language}")
regions = []
percentages = []
for terr in territories:
region = terr.get('type')
if region is None:
continue # Skip if region is not found
lang_pop = terr.find(f'languagePopulation[@type="{language}"]')
if lang_pop is None:
continue # This shouldn't happen due to our XPath, but just in case
regions.append(region)
percentages.append(
_as_float(lang_pop, 'populationPercent')
* _as_float(terr, 'literacyPercent')
/ 10_000
* _as_float(terr, 'population')
)
if not regions:
raise ValueError(f"No valid region data found for language: {language}")
return self.normalize_probabilities(np.array(regions), np.array(percentages))
def normalize_probabilities(
self, languages: np.ndarray, freq: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Normalize probabilities.
"""
total = np.sum(freq)
return languages, freq / total
def from_region(self, region: str) -> Locale:
"""
Get a random locale based on the territory ISO code.
Returns as a Locale object.
"""
languages, probabilities = self._load_territory_data(region)
language = np.random.choice(languages, p=probabilities).replace('_', '-')
return normalize_locale(f"{language}-{region}")
def from_language(self, language: str) -> Locale:
"""
Get a random locale based on the language.
Returns as a Locale object.
"""
regions, probabilities = self._load_language_data(language)
region = np.random.choice(regions, p=probabilities)
return normalize_locale(f"{language}-{region}")
SELECTOR = StatisticalLocaleSelector()

View file

@ -1,529 +0,0 @@
import os
import platform
import re
import shlex
import shutil
import sys
import tempfile
from dataclasses import dataclass
from functools import total_ordering
from io import BufferedWriter, BytesIO
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from zipfile import ZipFile
import click
import orjson
import requests
from platformdirs import user_cache_dir
from tqdm import tqdm
from typing_extensions import TypeAlias
from yaml import CLoader, load
from .__version__ import CONSTRAINTS
from .exceptions import (
CamoufoxNotInstalled,
MissingRelease,
UnsupportedArchitecture,
UnsupportedOS,
UnsupportedVersion,
)
DownloadBuffer: TypeAlias = Union[BytesIO, tempfile._TemporaryFileWrapper, BufferedWriter]
# Map machine architecture to Camoufox binary name
ARCH_MAP: Dict[str, str] = {
'amd64': 'x86_64',
'x86_64': 'x86_64',
'x86': 'x86_64',
'i686': 'i686',
'i386': 'i686',
'arm64': 'arm64',
'aarch64': 'arm64',
'armv5l': 'arm64',
'armv6l': 'arm64',
'armv7l': 'arm64',
}
OS_MAP: Dict[str, Literal['mac', 'win', 'lin']] = {'darwin': 'mac', 'linux': 'lin', 'win32': 'win'}
if sys.platform not in OS_MAP:
raise UnsupportedOS(f"OS {sys.platform} is not supported")
OS_NAME: Literal['mac', 'win', 'lin'] = OS_MAP[sys.platform]
INSTALL_DIR: Path = Path(user_cache_dir("camoufox"))
LOCAL_DATA: Path = Path(os.path.abspath(__file__)).parent
# The supported architectures for each OS
OS_ARCH_MATRIX: Dict[str, List[str]] = {
'win': ['x86_64', 'i686'],
'mac': ['x86_64', 'arm64'],
'lin': ['x86_64', 'arm64', 'i686'],
}
# The relative path to the camoufox executable
LAUNCH_FILE = {
'win': 'camoufox.exe',
'mac': '../MacOS/camoufox',
'lin': 'camoufox-bin',
}
def rprint(*a, **k):
click.secho(*a, **k, bold=True)
@total_ordering
@dataclass
class Version:
"""
A version string that can be compared to other version strings.
Stores versions up to 5 parts.
"""
release: str
version: Optional[str] = None
def __post_init__(self) -> None:
# Build an internal sortable structure
self.sorted_rel = tuple(
[
*(int(x) if x.isdigit() else ord(x[0]) - 1024 for x in self.release.split('.')),
*(0 for _ in range(5 - self.release.count('.'))),
]
)
@property
def full_string(self) -> str:
return f"{self.version}-{self.release}"
def __eq__(self, other) -> bool:
return self.sorted_rel == other.sorted_rel
def __lt__(self, other) -> bool:
return self.sorted_rel < other.sorted_rel
def is_supported(self) -> bool:
return VERSION_MIN <= self < VERSION_MAX
@staticmethod
def from_path(path: Optional[Path] = None) -> 'Version':
"""
Get the version from the given path.
"""
version_path = (path or INSTALL_DIR) / 'version.json'
if not os.path.exists(version_path):
raise FileNotFoundError(
f"Version information not found at {version_path}. "
"Please run `camoufox fetch` to install."
)
with open(version_path, 'rb') as f:
version_data = orjson.loads(f.read())
return Version(**version_data)
@staticmethod
def is_supported_path(path: Path) -> bool:
"""
Check if the version at the given path is supported.
"""
return Version.from_path(path) >= VERSION_MIN
@staticmethod
def build_minmax() -> Tuple['Version', 'Version']:
return Version(release=CONSTRAINTS.MIN_VERSION), Version(release=CONSTRAINTS.MAX_VERSION)
# The minimum and maximum supported versions
VERSION_MIN, VERSION_MAX = Version.build_minmax()
class GitHubDownloader:
"""
Manages fetching and installing GitHub releases.
"""
def __init__(self, github_repo: str) -> None:
self.github_repo = github_repo
self.api_url = f"https://api.github.com/repos/{github_repo}/releases"
def check_asset(self, asset: Dict) -> Any:
"""
Compare the asset to determine if it's the desired asset.
Args:
asset: Asset information from GitHub API
Returns:
Any: Data to be returned if this is the desired asset, or None/False if not
"""
return asset.get('browser_download_url')
def missing_asset_error(self) -> None:
"""
Raise a MissingRelease exception if no release is found.
"""
raise MissingRelease(f"Could not find a release asset in {self.github_repo}.")
def get_asset(self) -> Any:
"""
Fetch the latest release from the GitHub API.
Gets the first asset that returns a truthy value from check_asset.
"""
resp = requests.get(self.api_url, timeout=20)
resp.raise_for_status()
releases = resp.json()
for release in releases:
for asset in release['assets']:
if data := self.check_asset(asset):
return data
self.missing_asset_error()
class CamoufoxFetcher(GitHubDownloader):
"""
Handles fetching and installing the latest version of Camoufox.
"""
def __init__(self) -> None:
super().__init__("daijro/camoufox")
self.arch = self.get_platform_arch()
self._version_obj: Optional[Version] = None
self.pattern: re.Pattern = re.compile(
rf'camoufox-(?P<version>.+)-(?P<release>.+)-{OS_NAME}\.{self.arch}\.zip'
)
self.fetch_latest()
def check_asset(self, asset: Dict) -> Optional[Tuple[Version, str]]:
"""
Finds the latest release from a GitHub releases API response that
supports the Camoufox version constraints, the OS, and architecture.
Returns:
Optional[Tuple[Version, str]]: The version and URL of a release
"""
# Search through releases for the first supported version
match = self.pattern.match(asset['name'])
if not match:
return None
# Check if the version is supported
version = Version(release=match['release'], version=match['version'])
if not version.is_supported():
return None
# Asset was found. Return data
return version, asset['browser_download_url']
def missing_asset_error(self) -> None:
"""
Raise a MissingRelease exception if no release is found.
"""
raise MissingRelease(
f"No matching release found for {OS_NAME} {self.arch} in the "
f"supported range: ({CONSTRAINTS.as_range()}). "
"Please update the Python library."
)
@staticmethod
def get_platform_arch() -> str:
"""
Get the current platform and architecture information.
Returns:
str: The architecture of the current platform
Raises:
UnsupportedArchitecture: If the current architecture is not supported
"""
# Check if the architecture is supported for the OS
plat_arch = platform.machine().lower()
if plat_arch not in ARCH_MAP:
raise UnsupportedArchitecture(f"Architecture {plat_arch} is not supported")
arch = ARCH_MAP[plat_arch]
# Check if the architecture is supported for the OS
if arch not in OS_ARCH_MATRIX[OS_NAME]:
raise UnsupportedArchitecture(f"Architecture {arch} is not supported for {OS_NAME}")
return arch
def fetch_latest(self) -> None:
"""
Fetch the URL of the latest camoufox release for the current platform.
Sets the version, release, and url properties.
Raises:
requests.RequestException: If there's an error fetching release data
ValueError: If no matching release is found for the current platform
"""
release_data = self.get_asset()
# Set the version and URL
self._version_obj, self._url = release_data
@staticmethod
def download_file(file: DownloadBuffer, url: str) -> DownloadBuffer:
"""
Download a file from the given URL and return it as BytesIO.
Args:
file (DownloadBuffer): The buffer to download to
url (str): The URL to download the file from
Returns:
DownloadBuffer: The downloaded file content as a BytesIO object
"""
rprint(f'Downloading package: {url}')
return webdl(url, buffer=file)
def extract_zip(self, zip_file: DownloadBuffer) -> None:
"""
Extract the contents of a zip file to the installation directory.
Args:
zip_file (DownloadBuffer): The zip file content as a BytesIO object
"""
rprint(f'Extracting Camoufox: {INSTALL_DIR}')
unzip(zip_file, str(INSTALL_DIR))
@staticmethod
def cleanup() -> bool:
"""
Clean up the old installation.
"""
if INSTALL_DIR.exists():
rprint(f'Cleaning up cache: {INSTALL_DIR}')
shutil.rmtree(INSTALL_DIR)
return True
return False
def set_version(self) -> None:
"""
Set the version in the INSTALL_DIR/version.json file
"""
with open(INSTALL_DIR / 'version.json', 'wb') as f:
f.write(orjson.dumps({'version': self.version, 'release': self.release}))
def install(self) -> None:
"""
Download and install the latest version of camoufox.
Raises:
Exception: If any error occurs during the installation process
"""
# Clean up old installation
self.cleanup()
try:
# Install to directory
INSTALL_DIR.mkdir(parents=True, exist_ok=True)
# Fetch the latest zip
with tempfile.NamedTemporaryFile() as temp_file:
self.download_file(temp_file, self.url)
self.extract_zip(temp_file)
self.set_version()
# Set permissions on INSTALL_DIR
if OS_NAME != 'win':
os.system(f'chmod -R 755 {shlex.quote(str(INSTALL_DIR))}') # nosec
rprint('\nCamoufox successfully installed.', fg="yellow")
except Exception as e:
rprint(f"Error installing Camoufox: {str(e)}")
self.cleanup()
raise
@property
def url(self) -> str:
"""
Url of the fetched latest version of camoufox.
Returns:
str: The version of the installed camoufox
Raises:
ValueError: If the version is not available (fetch_latest not ran)
"""
if self._url is None:
raise ValueError("Url is not available. Make sure to run fetch_latest first.")
return self._url
@property
def version(self) -> str:
"""
Version of the fetched latest version of camoufox.
Returns:
str: The version of the installed camoufox
Raises:
ValueError: If the version is not available (fetch_latest not ran)
"""
if self._version_obj is None or not self._version_obj.version:
raise ValueError("Version is not available. Make sure to run the fetch_latest first.")
return self._version_obj.version
@property
def release(self) -> str:
"""
Release of the fetched latest version of camoufox.
Returns:
str: The release of the installed camoufox
Raises:
ValueError: If the release information is not available (fetch_latest not ran)
"""
if self._version_obj is None:
raise ValueError(
"Release information is not available. Make sure to run the installation first."
)
return self._version_obj.release
@property
def verstr(self) -> str:
"""
Fetches the version and release in version-release format
Returns:
str: The version of the installed camoufox
"""
if self._version_obj is None:
raise ValueError("Version is not available. Make sure to run the installation first.")
return self._version_obj.full_string
def installed_verstr() -> str:
"""
Get the full version string of the installed camoufox.
"""
return Version.from_path().full_string
def camoufox_path(download_if_missing: bool = True) -> Path:
"""
Full path to the camoufox folder.
"""
# Ensure the directory exists and is not empty
if not os.path.exists(INSTALL_DIR) or not os.listdir(INSTALL_DIR):
if not download_if_missing:
raise FileNotFoundError(f"Camoufox executable not found at {INSTALL_DIR}")
# Camoufox exists and the the version is supported
elif os.path.exists(INSTALL_DIR) and Version.from_path().is_supported():
return INSTALL_DIR
# Ensure the version is supported
else:
if not download_if_missing:
raise UnsupportedVersion("Camoufox executable is outdated.")
# Install and recheck
CamoufoxFetcher().install()
return camoufox_path()
def get_path(file: str) -> str:
"""
Get the path to the camoufox executable.
"""
if OS_NAME == 'mac':
return os.path.abspath(camoufox_path() / 'Camoufox.app' / 'Contents' / 'Resources' / file)
return str(camoufox_path() / file)
def launch_path() -> str:
"""
Get the path to the camoufox executable.
"""
launch_path = get_path(LAUNCH_FILE[OS_NAME])
if not os.path.exists(launch_path):
# Not installed error
raise CamoufoxNotInstalled(
f"Camoufox is not installed at {camoufox_path()}. Please run `camoufox fetch` to install."
)
return launch_path
def webdl(
url: str,
desc: Optional[str] = None,
buffer: Optional[DownloadBuffer] = None,
bar: bool = True,
) -> DownloadBuffer:
"""
Download a file from the given URL and return it as BytesIO.
Args:
url (str): The URL to download the file from
buffer (Optional[BytesIO]): A BytesIO object to store the downloaded file
bar (bool): Whether to show the progress bar
Returns:
DownloadBuffer: The downloaded file content as a BytesIO object
Raises:
requests.RequestException: If there's an error downloading the file
"""
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
block_size = 8192
if buffer is None:
buffer = BytesIO()
with tqdm(
total=total_size,
unit='iB',
bar_format=None if bar else '{desc}: {percentage:3.0f}%',
unit_scale=True,
desc=desc,
) as progress_bar:
for data in response.iter_content(block_size):
size = buffer.write(data)
progress_bar.update(size)
buffer.seek(0)
return buffer
def unzip(
zip_file: DownloadBuffer,
extract_path: str,
desc: Optional[str] = None,
bar: bool = True,
) -> None:
"""
Extract the contents of a zip file to the installation directory.
Args:
zip_file (BytesIO): The zip file content as a BytesIO object
Raises:
zipfile.BadZipFile: If the zip file is invalid or corrupted
OSError: If there's an error creating directories or writing files
"""
with ZipFile(zip_file) as zf:
for member in tqdm(
zf.infolist(), desc=desc, bar_format=None if bar else '{desc}: {percentage:3.0f}%'
):
zf.extract(member, extract_path)
def load_yaml(file: str) -> Dict[str, Any]:
"""
Loads a local YAML file and returns it as a dictionary.
"""
with open(LOCAL_DATA / file, 'r') as f:
return load(f, Loader=CLoader)

View file

@ -1 +0,0 @@

View file

@ -1,71 +0,0 @@
import subprocess
from pathlib import Path
from typing import Any, Dict, NoReturn, Tuple, Union
import base64
import orjson
from playwright._impl._driver import compute_driver_executable
from camoufox.pkgman import LOCAL_DATA
from camoufox.utils import launch_options
LAUNCH_SCRIPT: Path = LOCAL_DATA / "launchServer.js"
def camel_case(snake_str: str) -> str:
"""
Convert a string to camelCase
"""
if len(snake_str) < 2:
return snake_str
camel_case_str = ''.join(x.capitalize() for x in snake_str.lower().split('_'))
return camel_case_str[0].lower() + camel_case_str[1:]
def to_camel_case_dict(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert a dictionary to camelCase
"""
return {camel_case(key): value for key, value in data.items()}
def get_nodejs() -> str:
"""
Get the bundled Node.js executable
"""
# Note: Older versions of Playwright return a string rather than a tuple.
_nodejs: Union[str, Tuple[str, ...]] = compute_driver_executable()[0]
if isinstance(_nodejs, tuple):
return _nodejs[0]
return _nodejs
def launch_server(**kwargs) -> NoReturn:
"""
Launch a Playwright server. Takes the same arguments as `Camoufox()`.
Prints the websocket endpoint to the console.
"""
config = launch_options(**kwargs)
nodejs = get_nodejs()
data = orjson.dumps(to_camel_case_dict(config))
process = subprocess.Popen( # nosec
[
nodejs,
str(LAUNCH_SCRIPT),
],
cwd=Path(nodejs).parent / "package",
stdin=subprocess.PIPE,
text=True,
)
# Write data to stdin and close the stream
if process.stdin:
process.stdin.write(base64.b64encode(data).decode())
process.stdin.close()
# Wait forever
process.wait()
# Add an explicit return statement to satisfy the NoReturn type hint
raise RuntimeError("Server process terminated unexpectedly")

View file

@ -1,3 +0,0 @@
[vermin]
verbose = 1
processes = 4

View file

@ -1,95 +0,0 @@
from typing import Any, Dict, Optional, Union, overload
from playwright.sync_api import (
Browser,
BrowserContext,
Playwright,
PlaywrightContextManager,
)
from typing_extensions import Literal
from camoufox.virtdisplay import VirtualDisplay
from .utils import launch_options, sync_attach_vd
class Camoufox(PlaywrightContextManager):
"""
Wrapper around playwright.sync_api.PlaywrightContextManager that automatically
launches a browser and closes it when the context manager is exited.
"""
def __init__(self, **launch_options):
super().__init__()
self.launch_options = launch_options
self.browser: Optional[Union[Browser, BrowserContext]] = None
def __enter__(self) -> Union[Browser, BrowserContext]:
super().__enter__()
self.browser = NewBrowser(self._playwright, **self.launch_options)
return self.browser
def __exit__(self, *args: Any):
if self.browser:
self.browser.close()
super().__exit__(*args)
@overload
def NewBrowser(
playwright: Playwright,
*,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: Literal[False] = False,
**kwargs,
) -> Browser: ...
@overload
def NewBrowser(
playwright: Playwright,
*,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: Literal[True],
**kwargs,
) -> BrowserContext: ...
def NewBrowser(
playwright: Playwright,
*,
headless: Optional[Union[bool, Literal['virtual']]] = None,
from_options: Optional[Dict[str, Any]] = None,
persistent_context: bool = False,
debug: Optional[bool] = None,
**kwargs,
) -> Union[Browser, BrowserContext]:
"""
Launches a new browser instance for Camoufox given a set of launch options.
Parameters:
from_options (Dict[str, Any]):
A set of launch options generated by `launch_options()` to use
persistent_context (bool):
Whether to use a persistent context.
**kwargs:
All other keyword arugments passed to `launch_options()`.
"""
if headless == 'virtual':
virtual_display = VirtualDisplay(debug=debug)
kwargs['virtual_display'] = virtual_display.get()
headless = False
else:
virtual_display = None
if not from_options:
from_options = launch_options(headless=headless, debug=debug, **kwargs)
# Persistent context
if persistent_context:
context = playwright.firefox.launch_persistent_context(**from_options)
return sync_attach_vd(context, virtual_display)
# Browser
browser = playwright.firefox.launch(**from_options)
return sync_attach_vd(browser, virtual_display)

File diff suppressed because it is too large Load diff

View file

@ -1,662 +0,0 @@
import os
import sys
from os import environ
from os.path import abspath
from pathlib import Path
from pprint import pprint
from random import randint, randrange
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast
import numpy as np
import orjson
from browserforge.fingerprints import Fingerprint, Screen
from screeninfo import get_monitors
from typing_extensions import TypeAlias
from ua_parser import user_agent_parser
from .addons import DefaultAddons, add_default_addons, confirm_paths
from .exceptions import (
InvalidOS,
InvalidPropertyType,
NonFirefoxFingerprint,
UnknownProperty,
)
from .fingerprints import from_browserforge, generate_fingerprint
from .ip import Proxy, public_ip, valid_ipv4, valid_ipv6
from .locale import geoip_allowed, get_geolocation, handle_locales
from .pkgman import OS_NAME, get_path, installed_verstr, launch_path
from .virtdisplay import VirtualDisplay
from .warnings import LeakWarning
from .webgl import sample_webgl
ListOrString: TypeAlias = Union[Tuple[str, ...], List[str], str]
# Camoufox preferences to cache previous pages and requests
CACHE_PREFS = {
'browser.sessionhistory.max_entries': 10,
'browser.sessionhistory.max_total_viewers': -1,
'browser.cache.memory.enable': True,
'browser.cache.disk_cache_ssl': True,
'browser.cache.disk.smart_size.enabled': True,
}
def get_env_vars(
config_map: Dict[str, str], user_agent_os: str
) -> Dict[str, Union[str, float, bool]]:
"""
Gets a dictionary of environment variables for Camoufox.
"""
env_vars: Dict[str, Union[str, float, bool]] = {}
try:
updated_config_data = orjson.dumps(config_map)
except orjson.JSONEncodeError as e:
print(f"Error updating config: {e}")
sys.exit(1)
# Split the config into chunks
chunk_size = 2047 if OS_NAME == 'win' else 32767
config_str = updated_config_data.decode('utf-8')
for i in range(0, len(config_str), chunk_size):
chunk = config_str[i : i + chunk_size]
env_name = f"CAMOU_CONFIG_{(i // chunk_size) + 1}"
try:
env_vars[env_name] = chunk
except Exception as e:
print(f"Error setting {env_name}: {e}")
sys.exit(1)
if OS_NAME == 'lin':
fontconfig_path = get_path(os.path.join("fontconfig", user_agent_os))
env_vars['FONTCONFIG_PATH'] = fontconfig_path
return env_vars
def _load_properties(path: Optional[Path] = None) -> Dict[str, str]:
"""
Loads the properties.json file.
"""
if path:
prop_file = str(path.parent / "properties.json")
else:
prop_file = get_path("properties.json")
with open(prop_file, "rb") as f:
prop_dict = orjson.loads(f.read())
return {prop['property']: prop['type'] for prop in prop_dict}
def validate_config(config_map: Dict[str, str], path: Optional[Path] = None) -> None:
"""
Validates the config map.
"""
property_types = _load_properties(path=path)
for key, value in config_map.items():
expected_type = property_types.get(key)
if not expected_type:
raise UnknownProperty(f"Unknown property {key} in config")
if not validate_type(value, expected_type):
raise InvalidPropertyType(
f"Invalid type for property {key}. Expected {expected_type}, got {type(value).__name__}"
)
def validate_type(value: Any, expected_type: str) -> bool:
"""
Validates the type of the value.
"""
if expected_type == "str":
return isinstance(value, str)
elif expected_type == "int":
return isinstance(value, int) or (isinstance(value, float) and value.is_integer())
elif expected_type == "uint":
return (
isinstance(value, int) or (isinstance(value, float) and value.is_integer())
) and value >= 0
elif expected_type == "double":
return isinstance(value, (float, int))
elif expected_type == "bool":
return isinstance(value, bool)
elif expected_type == "array":
return isinstance(value, list)
elif expected_type == "dict":
return isinstance(value, dict)
else:
return False
def get_target_os(config: Dict[str, Any]) -> Literal['mac', 'win', 'lin']:
"""
Gets the OS from the config if the user agent is set,
otherwise returns the OS of the current system.
"""
if config.get("navigator.userAgent"):
return determine_ua_os(config["navigator.userAgent"])
return OS_NAME
def determine_ua_os(user_agent: str) -> Literal['mac', 'win', 'lin']:
"""
Determines the OS from the user agent string.
"""
parsed_ua = user_agent_parser.ParseOS(user_agent).get('family')
if not parsed_ua:
raise ValueError("Could not determine OS from user agent")
if parsed_ua.startswith("Mac"):
return "mac"
if parsed_ua.startswith("Windows"):
return "win"
return "lin"
def get_screen_cons(headless: Optional[bool] = None) -> Optional[Screen]:
"""
Determines a sane viewport size for Camoufox if being ran in headful mode.
"""
if headless is False:
return None # Skip if headless
try:
monitors = get_monitors()
except Exception:
return None # Skip if there's an error getting the monitors
if not monitors:
return None # Skip if there are no monitors
# Use the dimensions from the monitor with greatest screen real estate
monitor = max(monitors, key=lambda m: m.width * m.height)
return Screen(max_width=monitor.width, max_height=monitor.height)
def update_fonts(config: Dict[str, Any], target_os: str) -> None:
"""
Updates the fonts for the target OS.
"""
with open(os.path.join(os.path.dirname(__file__), "fonts.json"), "rb") as f:
fonts = orjson.loads(f.read())[target_os]
# Merge with existing fonts
if 'fonts' in config:
config['fonts'] = np.unique(fonts + config['fonts']).tolist()
else:
config['fonts'] = fonts
def check_custom_fingerprint(fingerprint: Fingerprint) -> None:
"""
Asserts that the passed BrowserForge fingerprint is a valid Firefox fingerprint.
and warns the user that passing their own fingerprint is not recommended.
"""
# Check what the browser is
browser_name = user_agent_parser.ParseUserAgent(fingerprint.navigator.userAgent).get(
'family', 'Non-Firefox'
)
if browser_name != 'Firefox':
raise NonFirefoxFingerprint(
f'"{browser_name}" fingerprints are not supported in Camoufox. '
'Using fingerprints from a browser other than Firefox WILL lead to detection. '
'If this is intentional, pass `i_know_what_im_doing=True`.'
)
LeakWarning.warn('custom_fingerprint', False)
def check_valid_os(os: ListOrString) -> None:
"""
Checks if the target OS is valid.
"""
if not isinstance(os, str):
for os_name in os:
check_valid_os(os_name)
return
# Assert that the OS is lowercase
if not os.islower():
raise InvalidOS(f"OS values must be lowercase: '{os}'")
# Assert that the OS is supported by Camoufox
if os not in ('windows', 'macos', 'linux'):
raise InvalidOS(f"Camoufox does not support the OS: '{os}'")
def _clean_locals(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Gets the launch options from the locals of the function.
"""
del data['playwright']
del data['persistent_context']
return data
def merge_into(target: Dict[str, Any], source: Dict[str, Any]) -> None:
"""
Merges new keys/values from the source dictionary into the target dictionary.
Given that the key does not exist in the target dictionary.
"""
for key, value in source.items():
if key not in target:
target[key] = value
def set_into(target: Dict[str, Any], key: str, value: Any) -> None:
"""
Sets a new key/value into the target dictionary.
Given that the key does not exist in the target dictionary.
"""
if key not in target:
target[key] = value
def is_domain_set(
config: Dict[str, Any],
*properties: str,
) -> bool:
"""
Checks if a domain is set in the config.
"""
for prop in properties:
# If the . prefix exists, check if the domain is a prefix of any key in the config
if prop[-1] in ('.', ':'):
if any(key.startswith(prop) for key in config):
return True
# Otherwise, check if the domain is a direct key in the config
else:
if prop in config:
return True
return False
def warn_manual_config(config: Dict[str, Any]) -> None:
"""
Warns the user if they are manually setting properties that Camoufox already sets internally.
"""
# Manual locale setting
if is_domain_set(
config, 'navigator.language', 'navigator.languages', 'headers.Accept-Language', 'locale:'
):
LeakWarning.warn('locale', False)
# Manual geolocation and timezone setting
if is_domain_set(config, 'geolocation:', 'timezone'):
LeakWarning.warn('geolocation', False)
# Manual User-Agent setting
if is_domain_set(config, 'headers.User-Agent'):
LeakWarning.warn('header-ua', False)
# Manual navigator setting
if is_domain_set(config, 'navigator.'):
LeakWarning.warn('navigator', False)
# Manual screen/window setting
if is_domain_set(config, 'screen.', 'window.', 'document.body.'):
LeakWarning.warn('viewport', False)
async def async_attach_vd(
browser: Any, virtual_display: Optional[VirtualDisplay] = None
) -> Any: # type: ignore
"""
Attaches the virtual display to the async browser cleanup
"""
if not virtual_display: # Skip if no virtual display is provided
return browser
_close = browser.close
async def new_close(*args: Any, **kwargs: Any):
await _close(*args, **kwargs)
if virtual_display:
virtual_display.kill()
browser.close = new_close
browser._virtual_display = virtual_display
return browser
def sync_attach_vd(
browser: Any, virtual_display: Optional[VirtualDisplay] = None
) -> Any: # type: ignore
"""
Attaches the virtual display to the sync browser cleanup
"""
if not virtual_display: # Skip if no virtual display is provided
return browser
_close = browser.close
def new_close(*args: Any, **kwargs: Any):
_close(*args, **kwargs)
if virtual_display:
virtual_display.kill()
browser.close = new_close
browser._virtual_display = virtual_display
return browser
def launch_options(
*,
config: Optional[Dict[str, Any]] = None,
os: Optional[ListOrString] = None,
block_images: Optional[bool] = None,
block_webrtc: Optional[bool] = None,
block_webgl: Optional[bool] = None,
disable_coop: Optional[bool] = None,
webgl_config: Optional[Tuple[str, str]] = None,
geoip: Optional[Union[str, bool]] = None,
humanize: Optional[Union[bool, float]] = None,
locale: Optional[Union[str, List[str]]] = None,
addons: Optional[List[str]] = None,
fonts: Optional[List[str]] = None,
custom_fonts_only: Optional[bool] = None,
exclude_addons: Optional[List[DefaultAddons]] = None,
screen: Optional[Screen] = None,
window: Optional[Tuple[int, int]] = None,
fingerprint: Optional[Fingerprint] = None,
ff_version: Optional[int] = None,
headless: Optional[bool] = None,
main_world_eval: Optional[bool] = None,
executable_path: Optional[Union[str, Path]] = None,
firefox_user_prefs: Optional[Dict[str, Any]] = None,
proxy: Optional[Dict[str, str]] = None,
enable_cache: Optional[bool] = None,
args: Optional[List[str]] = None,
env: Optional[Dict[str, Union[str, float, bool]]] = None,
i_know_what_im_doing: Optional[bool] = None,
debug: Optional[bool] = None,
virtual_display: Optional[str] = None,
**launch_options: Dict[str, Any],
) -> Dict[str, Any]:
"""
Launches a new browser instance for Camoufox.
Accepts all Playwright Firefox launch options, along with the following:
Parameters:
config (Optional[Dict[str, Any]]):
Camoufox properties to use. (read https://github.com/daijro/camoufox/blob/main/README.md)
os (Optional[ListOrString]):
Operating system to use for the fingerprint generation.
Can be "windows", "macos", "linux", or a list to randomly choose from.
Default: ["windows", "macos", "linux"]
block_images (Optional[bool]):
Whether to block all images.
block_webrtc (Optional[bool]):
Whether to block WebRTC entirely.
block_webgl (Optional[bool]):
Whether to block WebGL. To prevent leaks, only use this for special cases.
disable_coop (Optional[bool]):
Disables the Cross-Origin-Opener-Policy, allowing elements in cross-origin iframes,
such as the Turnstile checkbox, to be clicked.
geoip (Optional[Union[str, bool]]):
Calculate longitude, latitude, timezone, country, & locale based on the IP address.
Pass the target IP address to use, or `True` to find the IP address automatically.
humanize (Optional[Union[bool, float]]):
Humanize the cursor movement.
Takes either `True`, or the MAX duration in seconds of the cursor movement.
The cursor typically takes up to 1.5 seconds to move across the window.
locale (Optional[Union[str, List[str]]]):
Locale(s) to use in Camoufox. The first listed locale will be used for the Intl API.
addons (Optional[List[str]]):
List of Firefox addons to use.
fonts (Optional[List[str]]):
Fonts to load into Camoufox (in addition to the default fonts for the target `os`).
Takes a list of font family names that are installed on the system.
custom_fonts_only (Optional[bool]):
If enabled, OS-specific system fonts will be not be passed to Camoufox.
exclude_addons (Optional[List[DefaultAddons]]):
Default addons to exclude. Passed as a list of camoufox.DefaultAddons enums.
screen (Optional[Screen]):
Constrains the screen dimensions of the generated fingerprint.
Takes a browserforge.fingerprints.Screen instance.
window (Optional[Tuple[int, int]]):
Set a fixed window size instead of generating a random one
fingerprint (Optional[Fingerprint]):
Use a custom BrowserForge fingerprint. Note: Not all values will be implemented.
If not provided, a random fingerprint will be generated based on the provided
`os` & `screen` constraints.
ff_version (Optional[int]):
Firefox version to use. Defaults to the current Camoufox version.
To prevent leaks, only use this for special cases.
headless (Optional[bool]):
Whether to run the browser in headless mode. Defaults to False.
Note: If you are running linux, passing headless='virtual' to Camoufox & AsyncCamoufox
will use Xvfb.
main_world_eval (Optional[bool]):
Whether to enable running scripts in the main world.
To use this, prepend "mw:" to the script: page.evaluate("mw:" + script).
executable_path (Optional[Union[str, Path]]):
Custom Camoufox browser executable path.
firefox_user_prefs (Optional[Dict[str, Any]]):
Firefox user preferences to set.
proxy (Optional[Dict[str, str]]):
Proxy to use for the browser.
Note: If geoip is True, a request will be sent through this proxy to find the target IP.
enable_cache (Optional[bool]):
Cache previous pages, requests, etc (uses more memory).
args (Optional[List[str]]):
Arguments to pass to the browser.
env (Optional[Dict[str, Union[str, float, bool]]]):
Environment variables to set.
debug (Optional[bool]):
Prints the config being sent to Camoufox.
virtual_display (Optional[str]):
Virtual display number. Ex: ':99'. This is handled by Camoufox & AsyncCamoufox.
webgl_config (Optional[Tuple[str, str]]):
Use a specific WebGL vendor/renderer pair. Passed as a tuple of (vendor, renderer).
**launch_options (Dict[str, Any]):
Additional Firefox launch options.
"""
# Build the config
if config is None:
config = {}
# Set default values for optional arguments
if headless is None:
headless = False
if addons is None:
addons = []
if args is None:
args = []
if firefox_user_prefs is None:
firefox_user_prefs = {}
if custom_fonts_only is None:
custom_fonts_only = False
if i_know_what_im_doing is None:
i_know_what_im_doing = False
if env is None:
env = cast(Dict[str, Union[str, float, bool]], environ)
if isinstance(executable_path, str):
# Convert executable path to a Path object
executable_path = Path(abspath(executable_path))
# Handle virtual display
if virtual_display:
env['DISPLAY'] = virtual_display
# Warn the user for manual config settings
if not i_know_what_im_doing:
warn_manual_config(config)
# Assert the target OS is valid
if os:
check_valid_os(os)
# webgl_config requires OS to be set
elif webgl_config:
raise ValueError('OS must be set when using webgl_config')
# Add the default addons
add_default_addons(addons, exclude_addons)
# Confirm all addon paths are valid
if addons:
confirm_paths(addons)
config['addons'] = addons
# Get the Firefox version
if ff_version:
ff_version_str = str(ff_version)
LeakWarning.warn('ff_version', i_know_what_im_doing)
else:
ff_version_str = installed_verstr().split('.', 1)[0]
# Generate a fingerprint
if fingerprint is None:
fingerprint = generate_fingerprint(
screen=screen or get_screen_cons(headless or 'DISPLAY' in env),
window=window,
os=os,
)
else:
# Or use the one passed by the user
if not i_know_what_im_doing:
check_custom_fingerprint(fingerprint)
# Inject the fingerprint into the config
merge_into(
config,
from_browserforge(fingerprint, ff_version_str),
)
target_os = get_target_os(config)
# Set a random window.history.length
set_into(config, 'window.history.length', randrange(1, 6)) # nosec
# Update fonts list
if fonts:
config['fonts'] = fonts
if custom_fonts_only:
firefox_user_prefs['gfx.bundled-fonts.activate'] = 0
if fonts:
# The user has passed their own fonts, and OS fonts are disabled.
LeakWarning.warn('custom_fonts_only')
else:
# OS fonts are disabled, and the user has not passed their own fonts either.
raise ValueError('No custom fonts were passed, but `custom_fonts_only` is enabled.')
else:
update_fonts(config, target_os)
# Set a fixed font spacing seed
set_into(config, 'fonts:spacing_seed', randint(0, 1_073_741_823)) # nosec
# Set geolocation
if geoip:
geoip_allowed() # Assert that geoip is allowed
if geoip is True:
# Find the user's IP address
if proxy:
geoip = public_ip(Proxy(**proxy).as_string())
else:
geoip = public_ip()
# Spoof WebRTC if not blocked
if not block_webrtc:
if valid_ipv4(geoip):
set_into(config, 'webrtc:ipv4', geoip)
firefox_user_prefs['network.dns.disableIPv6'] = True
elif valid_ipv6(geoip):
set_into(config, 'webrtc:ipv6', geoip)
geolocation = get_geolocation(geoip)
config.update(geolocation.as_config())
# Raise a warning when a proxy is being used without spoofing geolocation.
# This is a very bad idea; the warning cannot be ignored with i_know_what_im_doing.
elif (
proxy
and 'localhost' not in proxy.get('server', '')
and not is_domain_set(config, 'geolocation')
):
LeakWarning.warn('proxy_without_geoip')
# Set locale
if locale:
handle_locales(locale, config)
# Pass the humanize option
if humanize:
set_into(config, 'humanize', True)
if isinstance(humanize, (int, float)):
set_into(config, 'humanize:maxTime', humanize)
# Enable the main world context creation
if main_world_eval:
set_into(config, 'allowMainWorld', True)
# Set Firefox user preferences
if block_images:
LeakWarning.warn('block_images', i_know_what_im_doing)
firefox_user_prefs['permissions.default.image'] = 2
if block_webrtc:
firefox_user_prefs['media.peerconnection.enabled'] = False
if disable_coop:
LeakWarning.warn('disable_coop', i_know_what_im_doing)
firefox_user_prefs['browser.tabs.remote.useCrossOriginOpenerPolicy'] = False
# Allow allow_webgl parameter for backwards compatibility
if block_webgl or launch_options.pop('allow_webgl', True) is False:
firefox_user_prefs['webgl.disabled'] = True
LeakWarning.warn('block_webgl', i_know_what_im_doing)
else:
# If the user has provided a specific WebGL vendor/renderer pair, use it
if webgl_config:
webgl_fp = sample_webgl(target_os, *webgl_config)
else:
webgl_fp = sample_webgl(target_os)
enable_webgl2 = webgl_fp.pop('webGl2Enabled')
# Merge the WebGL fingerprint into the config
merge_into(config, webgl_fp)
# Set the WebGL preferences
merge_into(
firefox_user_prefs,
{
'webgl.enable-webgl2': enable_webgl2,
'webgl.force-enabled': True,
},
)
# Canvas anti-fingerprinting
merge_into(
config,
{
'canvas:aaOffset': randint(-50, 50), # nosec
'canvas:aaCapOffset': True,
},
)
# Cache previous pages, requests, etc (uses more memory)
if enable_cache:
merge_into(firefox_user_prefs, CACHE_PREFS)
# Print the config if debug is enabled
if debug:
print('[DEBUG] Config:')
pprint(config)
# Validate the config
validate_config(config, path=executable_path)
# Prepare environment variables to pass to Camoufox
env_vars = {
**get_env_vars(config, target_os),
**env,
}
# Prepare the executable path
if executable_path:
executable_path = str(executable_path)
else:
executable_path = launch_path()
return {
"executable_path": executable_path,
"args": args,
"env": env_vars,
"firefox_user_prefs": firefox_user_prefs,
"proxy": proxy,
"headless": headless,
**(launch_options if launch_options is not None else {}),
}

View file

@ -1,146 +0,0 @@
import os
import subprocess # nosec
from glob import glob
from multiprocessing import Lock
from random import randrange
from shutil import which
from typing import List, Optional
from camoufox.exceptions import (
CannotExecuteXvfb,
CannotFindXvfb,
VirtualDisplayNotSupported,
)
from camoufox.pkgman import OS_NAME
class VirtualDisplay:
"""
A minimal virtual display implementation for Linux.
"""
def __init__(self, debug: Optional[bool] = False) -> None:
"""
Constructor for the VirtualDisplay class (singleton object).
"""
self.debug = debug
self.proc: Optional[subprocess.Popen] = None
self._display: Optional[int] = None
self._lock = Lock()
xvfb_args = (
# fmt: off
"-screen", "0", "1x1x24",
"-ac",
"-nolisten", "tcp",
"-extension", "RENDER",
"+extension", "GLX",
"-extension", "COMPOSITE",
"-extension", "XVideo",
"-extension", "XVideo-MotionCompensation",
"-extension", "XINERAMA",
"-shmem",
"-fp", "built-ins",
"-nocursor",
"-br",
# fmt: on
)
@property
def xvfb_path(self) -> str:
"""
Get the path to the xvfb executable
"""
path = which("Xvfb")
if not path:
raise CannotFindXvfb("Please install Xvfb to use headless mode.")
if not os.access(path, os.X_OK):
raise CannotExecuteXvfb(f"I do not have permission to execute Xvfb: {path}")
return path
@property
def xvfb_cmd(self) -> List[str]:
"""
Get the xvfb command
"""
return [self.xvfb_path, f':{self.display}', *self.xvfb_args]
def execute_xvfb(self):
"""
Spawn a detatched process
"""
if self.debug:
print('Starting virtual display:', ' '.join(self.xvfb_cmd))
self.proc = subprocess.Popen( # nosec
self.xvfb_cmd,
stdout=None if self.debug else subprocess.DEVNULL,
stderr=None if self.debug else subprocess.DEVNULL,
)
def get(self) -> str:
"""
Get the display number
"""
self.assert_linux()
with self._lock:
if self.proc is None:
self.execute_xvfb()
elif self.debug:
print(f'Using virtual display: {self.display}')
return f':{self.display}'
def kill(self):
"""
Terminate the xvfb process
"""
with self._lock:
if self.proc and self.proc.poll() is None:
if self.debug:
print('Terminating virtual display:', self.display)
self.proc.terminate()
def __del__(self):
"""
Kill and delete the VirtualDisplay object
"""
self.kill()
@staticmethod
def _get_lock_files() -> List[str]:
"""
Get list of lock files in /tmp
"""
tmpd = os.environ.get('TMPDIR', '/tmp') # nosec
try:
lock_files = glob(os.path.join(tmpd, ".X*-lock"))
except FileNotFoundError:
return []
return [p for p in lock_files if os.path.isfile(p)]
@staticmethod
def _free_display() -> int:
"""
Search for free display
"""
ls = list(
map(lambda x: int(x.split("X")[1].split("-")[0]), VirtualDisplay._get_lock_files())
)
return max(99, max(ls) + randrange(3, 20)) if ls else 99 # nosec
@property
def display(self) -> int:
"""
Get the display number
"""
if self._display is None:
self._display = self._free_display()
return self._display
@staticmethod
def assert_linux():
"""
Assert that the current OS is Linux
"""
if OS_NAME != 'lin':
raise VirtualDisplayNotSupported("Virtual display is only supported on Linux.")

View file

@ -1,44 +0,0 @@
import inspect
import warnings
from pathlib import Path
from typing import Optional
from camoufox.pkgman import load_yaml
WARNINGS_DATA = load_yaml('warnings.yml')
class LeakWarning(RuntimeWarning):
"""
Raised when a the user has a setting enabled that can cause detection.
"""
@staticmethod
def warn(warning_key: str, i_know_what_im_doing: Optional[bool] = None) -> None:
"""
Warns the user if a passed parameter can cause leaks.
"""
warning = WARNINGS_DATA[warning_key]
if i_know_what_im_doing:
return
if i_know_what_im_doing is not None:
warning += '\nIf this is intentional, pass `i_know_what_im_doing=True`.'
# Get caller information
current_module = Path(__file__).parent
frame = inspect.currentframe()
while frame:
if not Path(frame.f_code.co_filename).is_relative_to(current_module):
break
frame = frame.f_back
if frame:
warnings.warn_explicit(
warning,
category=LeakWarning,
filename=frame.f_code.co_filename,
lineno=frame.f_lineno,
)
return
warnings.warn(warning, category=LeakWarning)

View file

@ -1,51 +0,0 @@
navigator: >-
Manually setting navigator properties is not recommended.
Device information is automatically generated within Camoufox
based on the provided `os`.
locale: >-
Use the `locale` parameter in Camoufox instead of setting the config manually.
geolocation: >-
Please use the `geoip` parameter in Camoufox instead of setting your geolocation manually.
This can lead to detection if your target geolocation does not match your IP.
Pass `geoip=True` or a target IP (ex: geoip='123.45.67.89') to let Camoufox populate this data for you.
header-ua: >-
Do not set the header.User-Agent manually. Camoufox will generate a User-Agent for you.
viewport: >-
Manually setting screen & window properties is not recommended.
Screen dimensions are randomly generated within Camoufox
based on the provided screen constraints. See here:
https://github.com/daijro/camoufox/tree/main/pythonlib#browserforge-integration.
custom_fingerprint: >-
Passing your own fingerprint is not recommended.
BrowserForge fingerprints are automatically generated within Camoufox
based on the provided `os` and `screen` constraints.
proxy_without_geoip: >-
When using a proxy, it is heavily recommended that you pass `geoip=True`.
ff_version: >-
Spoofing the Firefox version will likely lead to detection.
If rotating the Firefox version is absolutely necessary, it would be more advisable to
rotate between older versions of Camoufox instead.
no_region: >-
Because you did not pass in a locale region, Camoufox will generate one for you.
This can cause suspicion if your IP does not match your locale region.
block_webgl: >-
Disabling WebGL is not recommended. Many WAFs will check if WebGL is enabled.
block_images: >-
Blocking image requests has been reported to cause detection issues on major WAFs.
custom_fonts_only: >-
Disabling OS-specific fonts while spoofing your OS will make your browser fingerprint inconsistent.
WAFs can detect this mismatch between your claimed OS and available system fonts.
disable_coop: >-
Disabling Cross-Origin-Opener-Policy (COOP) handling can potentially be detected by sophisticated WAFs.

View file

@ -1,3 +0,0 @@
from .sample import sample_webgl
__all__ = ['sample_webgl']

View file

@ -1,108 +0,0 @@
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import orjson
from camoufox.pkgman import OS_ARCH_MATRIX
# Get database path relative to this file
DB_PATH = Path(__file__).parent / 'webgl_data.db'
def sample_webgl(
os: str, vendor: Optional[str] = None, renderer: Optional[str] = None
) -> Dict[str, str]:
"""
Sample a random WebGL vendor/renderer combination and its data based on OS probabilities.
Optionally use a specific vendor/renderer pair.
Args:
os: Operating system ('win', 'mac', or 'lin')
vendor: Optional specific vendor to use
renderer: Optional specific renderer to use (requires vendor to be set)
Returns:
Dict containing WebGL data including vendor, renderer and additional parameters
Raises:
ValueError: If invalid OS provided or no data found for OS/vendor/renderer
"""
# Check that the OS is valid (avoid SQL injection)
if os not in OS_ARCH_MATRIX:
raise ValueError(f'Invalid OS: {os}. Must be one of: win, mac, lin')
# Connect to database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
if vendor and renderer:
# Get specific vendor/renderer pair and verify it exists for this OS
cursor.execute(
f'SELECT vendor, renderer, data, {os} FROM webgl_fingerprints ' # nosec
'WHERE vendor = ? AND renderer = ?',
(vendor, renderer),
)
result = cursor.fetchone()
if not result:
raise ValueError(f'No WebGL data found for vendor "{vendor}" and renderer "{renderer}"')
if result[3] <= 0: # Check OS-specific probability
# Get a list of possible (vendor, renderer) pairs for this OS
cursor.execute(
f'SELECT DISTINCT vendor, renderer FROM webgl_fingerprints WHERE {os} > 0' # nosec
)
possible_pairs = cursor.fetchall()
raise ValueError(
f'Vendor "{vendor}" and renderer "{renderer}" combination not valid for {os.title()}.\n'
f'Possible pairs: {", ".join(str(pair) for pair in possible_pairs)}'
)
conn.close()
return orjson.loads(result[2])
# Get all vendor/renderer pairs and their probabilities for this OS
cursor.execute(
f'SELECT vendor, renderer, data, {os} FROM webgl_fingerprints WHERE {os} > 0' # nosec
)
results = cursor.fetchall()
conn.close()
if not results:
raise ValueError(f'No WebGL data found for OS: {os}')
# Split into separate arrays
_, _, data_strs, probs = map(list, zip(*results))
# Convert probabilities to numpy array and normalize
probs_array = np.array(probs, dtype=np.float64)
probs_array = probs_array / probs_array.sum()
# Sample based on probabilities
idx = np.random.choice(len(probs_array), p=probs_array)
# Parse the JSON data string
return orjson.loads(data_strs[idx])
def get_possible_pairs() -> Dict[str, List[Tuple[str, str]]]:
"""
Get all possible (vendor, renderer) pairs for all OS, where the probability is greater than 0.
"""
# Connect to database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get all vendor/renderer pairs for each OS where probability > 0
result: Dict[str, List[Tuple[str, str]]] = {}
for os_type in OS_ARCH_MATRIX:
cursor.execute(
'SELECT DISTINCT vendor, renderer FROM webgl_fingerprints '
f'WHERE {os_type} > 0 ORDER BY {os_type} DESC', # nosec
)
result[os_type] = cursor.fetchall()
conn.close()
return result

View file

@ -1,14 +0,0 @@
rm -rf ./dist
rm -rf ./camoufox/*.mmdb
rm -rf ./camoufox/*.png
vermin . --eval-annotations --target=3.8 --violations camoufox/ || exit 1
python -m build
twine check dist/*
read -p "Confirm publish? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
twine upload dist/*
fi

View file

@ -1,52 +0,0 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "camoufox"
version = "0.4.11"
description = "Wrapper around Playwright to help launch Camoufox"
authors = ["daijro <daijro.dev@gmail.com>"]
license = "MIT"
repository = "https://github.com/daijro/camoufox"
homepage = "https://camoufox.com/python"
readme = "README.md"
keywords = [
"client",
"fingerprint",
"browser",
"scraping",
"injector",
"firefox",
"playwright",
]
classifiers = [
"Topic :: Internet :: WWW/HTTP",
"Topic :: Internet :: WWW/HTTP :: Browsers",
"Topic :: Software Development :: Libraries :: Python Modules",
]
[tool.poetry.dependencies]
python = "^3.8"
click = "*"
requests = "*"
orjson = "*"
browserforge = "^1.2.1"
playwright = "*"
pyyaml = "*"
platformdirs = "*"
tqdm = "*"
numpy = "*"
ua_parser = "*"
typing_extensions = "*"
screeninfo = "*"
lxml = "*"
language-tags = "*"
pysocks = "*"
geoip2 = {version = "*", optional = true}
[tool.poetry.extras]
geoip = ["geoip2"]
[tool.poetry.scripts]
camoufox = "camoufox.__main__:cli"