How to use events (Chrome-Developer-Tools) using Selenium with Python as a thread?

372 Views Asked by At

My current code looks like this:


# Note: the driver variable is already initialized (Chrome)

import trio # async library that selenium uses
import requests
from PIL import Image
from io import BytesIO

def show_image(url):
    """Download the image at *url* and display it via ``Image.show()``.

    Any exception raised while downloading, decoding, or displaying the
    image is printed instead of being propagated to the caller.
    """
    try:
        payload = BytesIO(requests.get(url).content)
        Image.open(payload).show()
    except Exception as err:
        print(err)

def intercept(event, devtools):
    """Build the command that fails the paused request as CONNECTION_REFUSED.

    The command is only constructed here (from ``event.request_id``) and
    returned; it is not executed by this function.
    """
    refused = devtools.network.ErrorReason.CONNECTION_REFUSED
    return devtools.fetch.fail_request(
        request_id=event.request_id,
        error_reason=refused,
    )

async def at_event(listener, connection, intercept):
    """Handle every event yielded by *listener* using *intercept*.

    For each event, the frame id and request URL are printed, then the
    command built by ``intercept(event=..., devtools=...)`` is executed on
    the connection's session. Any exception raised while building or
    executing that command is printed and the loop continues with the next
    event.
    """
    cdp_session, cdp = connection.session, connection.devtools
    async for paused in listener:
        summary = {"frame_id": paused.frame_id, "url": paused.request.url}
        print(summary)
        # Debug aid: show_image(paused.request.url) can be called here.
        await trio.sleep(0.0001)
        try:
            command = intercept(event=paused, devtools=cdp)
            await cdp_session.execute(command)
            # Alternatives to the intercept callback that were tried here:
            #   devtools.fetch.fulfill_request(request_id=..., response_code=200)
            #   devtools.fetch.continue_request(request_id=...)
        except Exception as err:
            print(err)

async def async_interceptor(pattern, main_func, intercept):
    """Enable Fetch interception for *pattern* and run *main_func* next to the handler.

    Opens a BiDi connection on the global ``driver``, converts *pattern*
    (a JSON-style dict) into a ``RequestPattern``, enables the Fetch domain
    with it, and then starts two tasks in one nursery: ``at_event`` (which
    consumes ``RequestPaused`` events and applies *intercept*) and
    *main_func* (called with no arguments).
    """
    async with driver.bidi_connection() as connection:
        cdp_session, cdp = connection.session, connection.devtools

        request_pattern = cdp.fetch.RequestPattern.from_json(pattern)
        enable_command = cdp.fetch.enable(patterns=[request_pattern])
        await cdp_session.execute(enable_command)

        paused_events = cdp_session.listen(cdp.fetch.RequestPaused)
        async with trio.open_nursery() as nursery:
            nursery.start_soon(at_event, paused_events, connection, intercept)
            nursery.start_soon(main_func)

async def interceptor():
    """Run ``main`` while applying ``intercept`` to requests of resource type ``Image``."""
    image_pattern = {"resourceType": "Image"}
    await async_interceptor(image_pattern, main, intercept)
async def main():
    """Navigate the browser to YouTube without blocking the trio event loop.

    ``driver.get`` is a synchronous, blocking call. Invoking it directly
    inside a coroutine stalls trio's scheduler for the whole page load, so
    the listener task cannot handle paused requests during that time.
    Running it in a worker thread keeps the event loop free to service them.
    """
    await trio.to_thread.run_sync(driver.get, "https://www.youtube.com/")

# Script entry point: hand interceptor() to trio as the top-level async function.
trio.run(interceptor)

What I want to change:

  • I want the interceptor to be non-blocking, like a thread, and not run inside an async function
  • At the moment, the listener doesn't always get triggered, maybe because of something in the async code?

In my example code, I want to intercept and fail all requests for images.

0

There are 0 best solutions below