Merge remote-tracking branch 'origin/pakjoy' into pakjoy
This commit is contained in:
commit
4c36433d36
|
|
@ -24,6 +24,7 @@ from qutebrowser.browser.webengine import (spell, webenginequtescheme, cookies,
|
|||
webenginedownloads, notification)
|
||||
from qutebrowser.config import config, websettings
|
||||
from qutebrowser.config.websettings import AttributeInfo as Attr
|
||||
from qutebrowser.misc import pakjoy
|
||||
from qutebrowser.utils import (standarddir, qtutils, message, log,
|
||||
urlmatch, usertypes, objreg, version)
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -546,6 +547,10 @@ def init():
|
|||
_global_settings = WebEngineSettings(_SettingsWrapper())
|
||||
|
||||
log.init.debug("Initializing profiles...")
|
||||
|
||||
# Apply potential resource patches before initializing profiles.
|
||||
pakjoy.patch()
|
||||
|
||||
_init_default_profile()
|
||||
init_private_profile()
|
||||
config.instance.changed.connect(_update_settings)
|
||||
|
|
|
|||
|
|
@ -41,4 +41,3 @@ def safe_seek(fobj: IO[bytes], pos: int) -> None:
|
|||
fobj.seek(pos)
|
||||
except (OSError, OverflowError) as e:
|
||||
raise ParseError(e)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org>
|
||||
#
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
|
@ -26,10 +25,14 @@ This is a "best effort" parser. If it errors out, we don't apply the workaround
|
|||
instead of crashing.
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import pathlib
|
||||
import dataclasses
|
||||
from typing import ClassVar, IO, Optional, Dict, Tuple
|
||||
|
||||
from qutebrowser.misc import binparsing
|
||||
from qutebrowser.utils import qtutils, standarddir, version, utils, log
|
||||
|
||||
HANGOUTS_MARKER = b"// Extension ID: nkeimhogjdpnpccoofpliimaahmaaome"
|
||||
HANGOUTS_ID = 36197 # as found by toofar
|
||||
|
|
@ -47,7 +50,7 @@ class PakHeader:
|
|||
|
||||
encoding: int # uint32
|
||||
resource_count: int # uint16
|
||||
alias_count: int # uint16
|
||||
_alias_count: int # uint16
|
||||
|
||||
_FORMAT: ClassVar[str] = '<IHH'
|
||||
|
||||
|
|
@ -60,7 +63,7 @@ class PakHeader:
|
|||
@dataclasses.dataclass
|
||||
class PakEntry:
|
||||
|
||||
"""Entry description in a .pak file"""
|
||||
"""Entry description in a .pak file."""
|
||||
|
||||
resource_id: int # uint16
|
||||
file_offset: int # uint32
|
||||
|
|
@ -75,18 +78,20 @@ class PakEntry:
|
|||
|
||||
|
||||
class PakParser:
|
||||
"""Parse webengine pak and find patch location to disable Google Meet extension."""
|
||||
|
||||
def __init__(self, fobj: IO[bytes]) -> None:
|
||||
"""Parse the .pak file from the given file object."""
|
||||
version = binparsing.unpack("<I", fobj)[0]
|
||||
if version != PAK_VERSION:
|
||||
raise binparsing.ParseError(f"Unsupported .pak version {version}")
|
||||
pak_version = binparsing.unpack("<I", fobj)[0]
|
||||
if pak_version != PAK_VERSION:
|
||||
raise binparsing.ParseError(f"Unsupported .pak version {pak_version}")
|
||||
|
||||
self.fobj = fobj
|
||||
entries = self._read_header()
|
||||
self.manifest_entry, self.manifest = self._find_manifest(entries)
|
||||
|
||||
def find_patch_offset(self) -> int:
|
||||
"""Return byte offset of TARGET_URL into the pak file."""
|
||||
try:
|
||||
return self.manifest_entry.file_offset + self.manifest.index(TARGET_URL)
|
||||
except ValueError:
|
||||
|
|
@ -135,25 +140,79 @@ class PakParser:
|
|||
for entry in entries.values():
|
||||
manifest = self._maybe_get_hangouts_manifest(entry)
|
||||
if manifest is not None:
|
||||
return entry, manifest
|
||||
return entries[id_], manifest
|
||||
|
||||
raise binparsing.ParseError("Couldn't find hangouts manifest")
|
||||
|
||||
|
||||
def copy_webengine_resources() -> pathlib.Path:
|
||||
"""Copy qtwebengine resources to local dir for patching."""
|
||||
resources_dir = qtutils.library_path(qtutils.LibraryPath.data)
|
||||
if utils.is_mac:
|
||||
# I'm not sure how to arrive at this path without hardcoding it
|
||||
# ourselves. importlib_resources("PyQt6.Qt6") can serve as a
|
||||
# replacement for the qtutils bit but it doesn't seem to help find the
|
||||
# actually Resources folder.
|
||||
resources_dir /= pathlib.Path("lib", "QtWebEngineCore.framework", "Resources")
|
||||
else:
|
||||
resources_dir /= "resources"
|
||||
work_dir = pathlib.Path(standarddir.cache()) / "webengine_resources_pak_quirk"
|
||||
|
||||
log.misc.debug(
|
||||
"Copying webengine resources for quirk patching: "
|
||||
f"{resources_dir} -> {work_dir}"
|
||||
)
|
||||
|
||||
if work_dir.exists():
|
||||
# TODO: make backup?
|
||||
shutil.rmtree(work_dir)
|
||||
|
||||
shutil.copytree(resources_dir, work_dir)
|
||||
|
||||
os.environ["QTWEBENGINE_RESOURCES_PATH"] = str(work_dir)
|
||||
|
||||
return work_dir
|
||||
|
||||
|
||||
def patch(file_to_patch: pathlib.Path = None) -> None:
|
||||
"""Apply any patches to webengine resource pak files."""
|
||||
versions = version.qtwebengine_versions(avoid_init=True)
|
||||
if versions.webengine != utils.VersionNumber(6, 6):
|
||||
return
|
||||
|
||||
if not file_to_patch:
|
||||
try:
|
||||
file_to_patch = copy_webengine_resources() / "qtwebengine_resources.pak"
|
||||
except OSError:
|
||||
log.misc.exception("Failed to copy webengine resources, not applying quirk")
|
||||
return
|
||||
|
||||
if not file_to_patch.exists():
|
||||
log.misc.error(
|
||||
"Resource pak doesn't exist at expected location! "
|
||||
f"Not applying quirks. Expected location: {file_to_patch}"
|
||||
)
|
||||
return
|
||||
|
||||
with open(file_to_patch, "r+b") as f:
|
||||
try:
|
||||
parser = PakParser(f)
|
||||
log.misc.debug(f"Patching pak entry: {parser.manifest_entry}")
|
||||
offset = parser.find_patch_offset()
|
||||
binparsing.safe_seek(f, offset)
|
||||
f.write(REPLACEMENT_URL)
|
||||
except binparsing.ParseError:
|
||||
log.misc.exception("Failed to apply quirk to resources pak.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import shutil
|
||||
shutil.copy("/usr/share/qt6/resources/qtwebengine_resources.pak", "/tmp/test.pak")
|
||||
output_test_file = pathlib.Path("/tmp/test.pak")
|
||||
#shutil.copy("/opt/google/chrome/resources.pak", output_test_file)
|
||||
shutil.copy("/usr/share/qt6/resources/qtwebengine_resources.pak", output_test_file)
|
||||
patch(output_test_file)
|
||||
|
||||
with open("/tmp/test.pak", "r+b") as f:
|
||||
parser = PakParser(f)
|
||||
print(parser.manifest_entry)
|
||||
print(parser.manifest)
|
||||
offset = parser.find_patch_offset()
|
||||
f.seek(offset)
|
||||
f.write(REPLACEMENT_URL)
|
||||
with open(output_test_file, "rb") as fd:
|
||||
reparsed = PakParser(fd)
|
||||
|
||||
with open("/tmp/test.pak", "rb") as f:
|
||||
parser = PakParser(f)
|
||||
|
||||
print(parser.manifest_entry)
|
||||
print(parser.manifest)
|
||||
print(reparsed.manifest_entry)
|
||||
print(reparsed.manifest)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,277 @@
|
|||
# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org>
|
||||
#
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import os
|
||||
import io
|
||||
import json
|
||||
import struct
|
||||
import pathlib
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from qutebrowser.misc import pakjoy, binparsing
|
||||
from qutebrowser.utils import utils, version, standarddir
|
||||
|
||||
|
||||
pytest.importorskip("qutebrowser.qt.webenginecore")
|
||||
|
||||
|
||||
versions = version.qtwebengine_versions(avoid_init=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def skipifneeded():
|
||||
"""Used to skip happy path tests with the real resources file.
|
||||
|
||||
Since we don't know how reliably the Google Meet hangouts extensions is
|
||||
reliably in the resource files, and this quirk is only targeting 6.6
|
||||
anyway.
|
||||
"""
|
||||
if versions.webengine != utils.VersionNumber(6, 6):
|
||||
raise pytest.skip("Code under test only runs on 6.6")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clean_env():
|
||||
yield
|
||||
if "QTWEBENGINE_RESOURCES_PATH" in os.environ:
|
||||
del os.environ["QTWEBENGINE_RESOURCES_PATH"]
|
||||
|
||||
|
||||
def patch_version(monkeypatch, *args):
|
||||
monkeypatch.setattr(
|
||||
pakjoy.version,
|
||||
"qtwebengine_versions",
|
||||
lambda **kwargs: version.WebEngineVersions(
|
||||
webengine=utils.VersionNumber(*args),
|
||||
chromium=None,
|
||||
source="unittest",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unaffected_version(monkeypatch):
|
||||
patch_version(monkeypatch, 6, 6, 1)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def affected_version(monkeypatch):
|
||||
patch_version(monkeypatch, 6, 6)
|
||||
|
||||
|
||||
def test_version_gate(unaffected_version, mocker):
|
||||
|
||||
fake_open = mocker.patch("qutebrowser.misc.pakjoy.open")
|
||||
pakjoy.patch()
|
||||
assert not fake_open.called
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def tmp_cache(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(pakjoy.standarddir, "cache", lambda: tmp_path)
|
||||
return str(tmp_path)
|
||||
|
||||
|
||||
def json_without_comments(bytestring):
|
||||
str_without_comments = "\n".join(
|
||||
[
|
||||
line
|
||||
for line in
|
||||
bytestring.decode("utf-8").split("\n")
|
||||
if not line.strip().startswith("//")
|
||||
]
|
||||
)
|
||||
return json.loads(str_without_comments)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("affected_version")
|
||||
class TestWithRealResourcesFile:
|
||||
"""Tests that use the real pak file form the Qt installation."""
|
||||
|
||||
def test_happy_path(self, skipifneeded):
|
||||
# Go through the full patching processes with the real resources file from
|
||||
# the current installation. Make sure our replacement string is in it
|
||||
# afterwards.
|
||||
pakjoy.patch()
|
||||
|
||||
patched_resources = pathlib.Path(os.environ["QTWEBENGINE_RESOURCES_PATH"])
|
||||
|
||||
with open(patched_resources / "qtwebengine_resources.pak", "rb") as fd:
|
||||
reparsed = pakjoy.PakParser(fd)
|
||||
|
||||
json_manifest = json_without_comments(reparsed.manifest)
|
||||
|
||||
assert pakjoy.REPLACEMENT_URL.decode("utf-8") in json_manifest[
|
||||
"externally_connectable"
|
||||
]["matches"]
|
||||
|
||||
def test_copying_resources(self):
|
||||
# Test we managed to copy some files over
|
||||
work_dir = pakjoy.copy_webengine_resources()
|
||||
|
||||
assert work_dir.exists()
|
||||
assert work_dir == standarddir.cache() / "webengine_resources_pak_quirk"
|
||||
assert (work_dir / "qtwebengine_resources.pak").exists()
|
||||
assert len(list(work_dir.glob("*"))) > 1
|
||||
|
||||
def test_copying_resources_overwrites(self):
|
||||
work_dir = pakjoy.copy_webengine_resources()
|
||||
tmpfile = work_dir / "tmp.txt"
|
||||
tmpfile.touch()
|
||||
|
||||
pakjoy.copy_webengine_resources()
|
||||
assert not tmpfile.exists()
|
||||
|
||||
@pytest.mark.parametrize("osfunc", ["copytree", "rmtree"])
|
||||
def test_copying_resources_oserror(self, monkeypatch, caplog, osfunc):
|
||||
# Test errors from the calls to shutil are handled
|
||||
pakjoy.copy_webengine_resources() # run twice so we hit rmtree too
|
||||
caplog.clear()
|
||||
|
||||
def raiseme(err):
|
||||
raise err
|
||||
|
||||
monkeypatch.setattr(pakjoy.shutil, osfunc, lambda *_args: raiseme(PermissionError(osfunc)))
|
||||
with caplog.at_level(logging.ERROR, "misc"):
|
||||
pakjoy.patch()
|
||||
assert caplog.messages == ["Failed to copy webengine resources, not applying quirk"]
|
||||
|
||||
def test_expected_file_not_found(self, tmp_cache, monkeypatch, caplog):
|
||||
with caplog.at_level(logging.ERROR, "misc"):
|
||||
pakjoy.patch(pathlib.Path(tmp_cache) / "doesntexist")
|
||||
assert caplog.messages[-1].startswith(
|
||||
"Resource pak doesn't exist at expected location! "
|
||||
"Not applying quirks. Expected location: "
|
||||
)
|
||||
|
||||
|
||||
def json_manifest_factory(extension_id=pakjoy.HANGOUTS_MARKER, url=pakjoy.TARGET_URL):
|
||||
assert isinstance(extension_id, bytes)
|
||||
assert isinstance(url, bytes)
|
||||
|
||||
return f"""
|
||||
{{
|
||||
{extension_id.decode("utf-8")}
|
||||
"key": "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAQt2ZDdPfoSe/JI6ID5bgLHRCnCu9T36aYczmhw/tnv6QZB2I6WnOCMZXJZlRdqWc7w9jo4BWhYS50Vb4weMfh/I0On7VcRwJUgfAxW2cHB+EkmtI1v4v/OU24OqIa1Nmv9uRVeX0GjhQukdLNhAE6ACWooaf5kqKlCeK+1GOkQIDAQAB",
|
||||
"name": "Google Hangouts",
|
||||
// Note: Always update the version number when this file is updated. Chrome
|
||||
// triggers extension preferences update on the version increase.
|
||||
"version": "1.3.21",
|
||||
"manifest_version": 2,
|
||||
"externally_connectable": {{
|
||||
"matches": [
|
||||
"{url.decode("utf-8")}",
|
||||
"http://localhost:*/*"
|
||||
]
|
||||
}}
|
||||
}}
|
||||
""".strip().encode("utf-8")
|
||||
|
||||
|
||||
def pak_factory(version=5, entries=None, encoding=1, sentinel_position=-1):
|
||||
if entries is None:
|
||||
entries = [json_manifest_factory()]
|
||||
|
||||
buffer = io.BytesIO()
|
||||
buffer.write(struct.pack("<I", version))
|
||||
buffer.write(struct.pack(pakjoy.Pak5Header._FORMAT, encoding, len(entries), 0))
|
||||
|
||||
entry_headers_size = (len(entries) + 1) * 6
|
||||
start_of_data = buffer.tell() + entry_headers_size
|
||||
|
||||
# Normally the sentinel sits between the headers and the data. But to get
|
||||
# full coverage we want to insert it in other positions.
|
||||
with_indices = list(enumerate(entries, 1))
|
||||
if sentinel_position == -1:
|
||||
with_indices.append((0, b""))
|
||||
elif sentinel_position is not None:
|
||||
with_indices.insert(sentinel_position, (0, b""))
|
||||
|
||||
accumulated_data_offset = start_of_data
|
||||
for idx, entry in with_indices:
|
||||
buffer.write(struct.pack(pakjoy.PakEntry._FORMAT, idx, accumulated_data_offset))
|
||||
accumulated_data_offset += len(entry)
|
||||
|
||||
for entry in entries:
|
||||
assert isinstance(entry, bytes)
|
||||
buffer.write(entry)
|
||||
|
||||
buffer.seek(0)
|
||||
return buffer
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("affected_version")
|
||||
class TestWithConstructedResourcesFile:
|
||||
"""Tests that use a constructed pak file to give us more control over it."""
|
||||
|
||||
def test_happy_path(self):
|
||||
buffer = pak_factory()
|
||||
|
||||
parser = pakjoy.PakParser(buffer)
|
||||
|
||||
json_manifest = json_without_comments(parser.manifest)
|
||||
|
||||
assert pakjoy.TARGET_URL.decode("utf-8") in json_manifest[
|
||||
"externally_connectable"
|
||||
]["matches"]
|
||||
|
||||
def test_bad_version(self):
|
||||
buffer = pak_factory(version=99)
|
||||
|
||||
with pytest.raises(
|
||||
binparsing.ParseError,
|
||||
match="Unsupported .pak version 99",
|
||||
):
|
||||
pakjoy.PakParser(buffer)
|
||||
|
||||
@pytest.mark.parametrize("position, error", [
|
||||
(0, "Unexpected sentinel entry"),
|
||||
(None, "Missing sentinel entry"),
|
||||
])
|
||||
def test_bad_sentinal_position(self, position, error):
|
||||
buffer = pak_factory(sentinel_position=position)
|
||||
|
||||
with pytest.raises(binparsing.ParseError):
|
||||
pakjoy.PakParser(buffer)
|
||||
|
||||
@pytest.mark.parametrize("entry", [
|
||||
b"{foo}",
|
||||
b"V2VsbCBoZWxsbyB0aGVyZQo=",
|
||||
])
|
||||
def test_marker_not_found(self, entry):
|
||||
buffer = pak_factory(entries=[entry])
|
||||
|
||||
with pytest.raises(
|
||||
binparsing.ParseError,
|
||||
match="Couldn't find hangouts manifest",
|
||||
):
|
||||
pakjoy.PakParser(buffer)
|
||||
|
||||
def test_url_not_found(self):
|
||||
buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")])
|
||||
|
||||
parser = pakjoy.PakParser(buffer)
|
||||
with pytest.raises(
|
||||
binparsing.ParseError,
|
||||
match="Couldn't find URL in manifest",
|
||||
):
|
||||
parser.find_patch_offset()
|
||||
|
||||
def test_url_not_found_high_level(self, tmp_cache, caplog,
|
||||
affected_version):
|
||||
buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")])
|
||||
|
||||
# Write bytes to file so we can test pakjoy.patch()
|
||||
tmpfile = pathlib.Path(tmp_cache) / "bad.pak"
|
||||
with open(tmpfile, "wb") as fd:
|
||||
fd.write(buffer.read())
|
||||
|
||||
with caplog.at_level(logging.ERROR, "misc"):
|
||||
pakjoy.patch(tmpfile)
|
||||
|
||||
assert caplog.messages == [
|
||||
"Failed to apply quirk to resources pak."
|
||||
]
|
||||
Loading…
Reference in New Issue