API documentation

BlurHashPipeline

Calculate the BlurHashes of the downloaded images.

Source code in src/scrapy_extensions/pipelines.py
class BlurHashPipeline:
    """Calculate the BlurHashes of the downloaded images."""

    images_store: Path
    source_field: str
    target_field: str
    x_components: int
    y_components: int

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> BlurHashPipeline:
        """Init from crawler."""

        images_store = crawler.settings.get("IMAGES_STORE")
        source_field = crawler.settings.get("IMAGES_RESULT_FIELD")
        target_field = crawler.settings.get("BLURHASH_FIELD")

        if not images_store or not source_field or not target_field:
            raise NotConfigured

        if not find_spec("scrapy_extensions.utils", "calculate_blurhash"):
            LOGGER.error(
                "Unable to import libraries required for BlurHash, "
                "install with `blurhash` option",
            )
            raise NotConfigured

        x_components = crawler.settings.getint("BLURHASH_X_COMPONENTS", 4)
        y_components = crawler.settings.getint("BLURHASH_Y_COMPONENTS", 4)

        return cls(
            images_store=images_store,
            source_field=source_field,
            target_field=target_field,
            x_components=x_components,
            y_components=y_components,
        )

    def __init__(
        self,
        *,
        images_store: str | Path,
        source_field: str,
        target_field: str,
        x_components: int = 4,
        y_components: int = 4,
    ) -> None:
        self.images_store = Path(images_store).resolve()
        self.source_field = source_field
        self.target_field = target_field
        self.x_components = x_components
        self.y_components = y_components

    def process_image_obj(
        self,
        image_obj: dict[str, Any],
        x_components: int = 4,
        y_components: int = 4,
    ) -> dict[str, Any]:
        """Calculate the BlurHash of a given image."""

        image_path = image_obj.get("path")
        if not image_path:
            return image_obj

        image_full_path = (self.images_store / image_path).resolve()
        if not image_full_path or not image_full_path.is_file():
            LOGGER.warning("Unable to locate image file <%s>", image_full_path)
            return image_obj

        # Don't modify the original object
        image_obj = image_obj.copy()

        image_obj["blurhash"] = _calculate_blurhash(
            path=image_full_path,
            x_components=x_components,
            y_components=y_components,
        )

        return image_obj

    def process_item(self, item: Any, spider: Spider) -> Any:  # noqa: ARG002
        """Calculate the BlurHashes of the downloaded images."""

        adapter = ItemAdapter(item)

        image_objs = tuple(arg_to_iter(adapter.get(self.source_field)))
        if not image_objs:
            return item

        try:
            adapter[self.target_field] = [
                self.process_image_obj(image_obj) for image_obj in image_objs
            ]
        except Exception:
            LOGGER.exception("Unable to add field <%s> to the item", self.target_field)

        return item
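
The pipeline is driven entirely by Scrapy settings: it raises NotConfigured unless IMAGES_STORE, IMAGES_RESULT_FIELD and BLURHASH_FIELD are all set and the optional BlurHash dependencies (the `blurhash` extra) are installed. Below is a minimal sketch of a settings.py that runs it after the standard ImagesPipeline; the pipeline priorities and field values are illustrative assumptions, only the setting keys come from the code above.

# settings.py -- illustrative sketch; priorities and field values are assumptions
ITEM_PIPELINES = {
    "scrapy.pipelines.images.ImagesPipeline": 300,
    "scrapy_extensions.pipelines.BlurHashPipeline": 310,  # must run after ImagesPipeline
}
IMAGES_STORE = "images"             # directory ImagesPipeline saves files to
IMAGES_RESULT_FIELD = "images"      # item field holding the download results
BLURHASH_FIELD = "blurhashes"       # item field the pipeline writes to
BLURHASH_X_COMPONENTS = 4           # optional, defaults to 4
BLURHASH_Y_COMPONENTS = 4           # optional, defaults to 4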

from_crawler(crawler: Crawler) -> BlurHashPipeline (classmethod)

Init from crawler.

Source code in src/scrapy_extensions/pipelines.py
@classmethod
def from_crawler(cls, crawler: Crawler) -> BlurHashPipeline:
    """Init from crawler."""

    images_store = crawler.settings.get("IMAGES_STORE")
    source_field = crawler.settings.get("IMAGES_RESULT_FIELD")
    target_field = crawler.settings.get("BLURHASH_FIELD")

    if not images_store or not source_field or not target_field:
        raise NotConfigured

    if not find_spec("scrapy_extensions.utils", "calculate_blurhash"):
        LOGGER.error(
            "Unable to import libraries required for BlurHash, "
            "install with `blurhash` option",
        )
        raise NotConfigured

    x_components = crawler.settings.getint("BLURHASH_X_COMPONENTS", 4)
    y_components = crawler.settings.getint("BLURHASH_Y_COMPONENTS", 4)

    return cls(
        images_store=images_store,
        source_field=source_field,
        target_field=target_field,
        x_components=x_components,
        y_components=y_components,
    )

process_image_obj(image_obj: dict[str, Any], x_components: int = 4, y_components: int = 4) -> dict[str, Any]

Calculate the BlurHash of a given image.

Source code in src/scrapy_extensions/pipelines.py
def process_image_obj(
    self,
    image_obj: dict[str, Any],
    x_components: int = 4,
    y_components: int = 4,
) -> dict[str, Any]:
    """Calculate the BlurHash of a given image."""

    image_path = image_obj.get("path")
    if not image_path:
        return image_obj

    image_full_path = (self.images_store / image_path).resolve()
    if not image_full_path or not image_full_path.is_file():
        LOGGER.warning("Unable to locate image file <%s>", image_full_path)
        return image_obj

    # Don't modify the original object
    image_obj = image_obj.copy()

    image_obj["blurhash"] = _calculate_blurhash(
        path=image_full_path,
        x_components=x_components,
        y_components=y_components,
    )

    return image_obj
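
For illustration, assume a result dict shaped like the ones Scrapy's ImagesPipeline produces (only the "path" key is actually used by the method; the other keys and all values are made up) and `pipeline` is a configured BlurHashPipeline instance:

image_obj = {
    "url": "https://example.com/cover.jpg",          # illustrative
    "path": "full/0a1b2c3d4e5f.jpg",                  # relative to IMAGES_STORE
    "checksum": "d41d8cd98f00b204e9800998ecf8427e",   # illustrative
}
enriched = pipeline.process_image_obj(image_obj)
# `enriched` is a copy of image_obj with an extra "blurhash" key; the
# original dict is left unmodified. If the file cannot be located, the
# dict is returned unchanged.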

process_item(item: Any, spider: Spider) -> Any

Calculate the BlurHashes of the downloaded images.

Source code in src/scrapy_extensions/pipelines.py
def process_item(self, item: Any, spider: Spider) -> Any:  # noqa: ARG002
    """Calculate the BlurHashes of the downloaded images."""

    adapter = ItemAdapter(item)

    image_objs = tuple(arg_to_iter(adapter.get(self.source_field)))
    if not image_objs:
        return item

    try:
        adapter[self.target_field] = [
            self.process_image_obj(image_obj) for image_obj in image_objs
        ]
    except Exception:
        LOGGER.exception("Unable to add field <%s> to the item", self.target_field)

    return item

DelayedRetryMiddleware

Bases: RetryMiddleware

retry requests with a delay (async/await version)

Notes

  • Uses asyncio.sleep to implement the delay.
  • process_response is an async coroutine; Scrapy accepts coroutines from middleware methods and will await them appropriately when using an asyncio-compatible reactor.
  • Behaviour and configuration keys are kept compatible with the original implementation.
Source code in src/scrapy_extensions/downloadermiddlewares.py
class DelayedRetryMiddleware(RetryMiddleware):
    """retry requests with a delay (async/await version)

    Notes
    -----
    - Uses `asyncio.sleep` to implement the delay.
    - `process_response` is an async coroutine; Scrapy accepts coroutines from
      middleware methods and will await them appropriately when using an
      asyncio-compatible reactor.
    - Behaviour and configuration keys are kept compatible with the original
      implementation.
    """

    def __init__(
        self,
        settings: Settings,
    ):
        super().__init__(settings)

        delayed_retry_http_codes_settings = settings.getlist("DELAYED_RETRY_HTTP_CODES")
        try:
            delayed_retry_http_codes = (
                int(http_code) for http_code in delayed_retry_http_codes_settings
            )
        except ValueError as exc:
            LOGGER.exception(
                "Invalid http code(s) in DELAYED_RETRY_HTTP_CODES: %s",
                delayed_retry_http_codes_settings,
            )
            raise NotConfigured from exc
        self.delayed_retry_http_codes = frozenset(
            filter(None, delayed_retry_http_codes),
        )

        self.delayed_retry_max_retry_times = settings.getint("DELAYED_RETRY_TIMES", -1)
        self.delayed_retry_priority_adjust = settings.getint(
            "DELAYED_RETRY_PRIORITY_ADJUST",
            self.priority_adjust,
        )
        self.delayed_retry_delay = settings.getfloat("DELAYED_RETRY_DELAY", 1)
        self.delayed_retry_backoff = settings.getbool("DELAYED_RETRY_BACKOFF")
        self.delayed_retry_backoff_max_delay = settings.getfloat(
            "DELAYED_RETRY_BACKOFF_MAX_DELAY",
            10 * self.delayed_retry_delay,
        )

    async def process_response(  # type: ignore[override]
        self,
        request: Request,
        response: Response,
        spider: Spider,
    ) -> Request | Response:
        """retry certain requests with delay

        This method is now a coroutine. If the response status matches a
        delayed-retry code, we await the computed delay and then return the
        retry Request (or None, in which case the original response is
        returned). Otherwise we delegate to the parent implementation.
        """

        if request.meta.get("dont_retry"):
            return response

        if response.status in self.delayed_retry_http_codes:
            reason = response_status_message(response.status)
            req = await self._delayed_retry(request, reason, spider)
            return req or response

        # Delegate to parent. The parent may return a value or a Deferred/coroutine.
        parent_result = super().process_response(request, response, spider)
        if asyncio.iscoroutine(parent_result):
            return await parent_result  # type: ignore[no-any-return]
        return parent_result

    async def _delayed_retry(
        self,
        request: Request,
        reason: str,
        spider: Spider,
    ) -> Request | None:
        """Compute retry Request and await the configured delay before returning it."""

        max_retry_times = request.meta.get(
            "max_retry_times",
            self.delayed_retry_max_retry_times,
        )
        if max_retry_times < 0:
            max_retry_times = sys.maxsize
        priority_adjust = request.meta.get(
            "priority_adjust",
            self.delayed_retry_priority_adjust,
        )

        req = get_retry_request(
            request=request,
            spider=spider,
            reason=reason,
            max_retry_times=max_retry_times,
            priority_adjust=priority_adjust,
        )

        if req is None:
            return None

        delay = request.meta.get("retry_delay", self.delayed_retry_delay)
        req.meta["retry_delay"] = (
            min(2 * delay, self.delayed_retry_backoff_max_delay)
            if self.delayed_retry_backoff
            else delay
        )

        LOGGER.debug("Retry request %r in %.1f second(s)", req, delay)

        # Non-blocking sleep — preserves reactor responsiveness in asyncio mode.
        await asyncio.sleep(delay)
        return req
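
A sketch of the settings needed to enable the middleware: the priority value is an assumption (550 is the slot normally occupied by the built-in RetryMiddleware), while the DELAYED_RETRY_* keys are exactly the ones read in __init__ above. Because process_response awaits asyncio.sleep, the crawl should run on an asyncio-compatible reactor (see the module note further below).

# settings.py -- illustrative sketch; the priority is an assumption
DOWNLOADER_MIDDLEWARES = {
    "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
    "scrapy_extensions.downloadermiddlewares.DelayedRetryMiddleware": 550,
}
DELAYED_RETRY_HTTP_CODES = [429, 503]   # statuses retried with a delay
DELAYED_RETRY_TIMES = 5                 # default -1 means effectively unlimited
DELAYED_RETRY_DELAY = 2.0               # seconds to wait before returning the retry
DELAYED_RETRY_BACKOFF = True            # double the delay on each retry ...
DELAYED_RETRY_BACKOFF_MAX_DELAY = 60.0  # ... capped here (default: 10x the delay)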

process_response(request: Request, response: Response, spider: Spider) -> Request | Response (async)

retry certain requests with delay

This method is now a coroutine. If the response status matches a delayed-retry code, we await the computed delay and then return the retry Request (or None, in which case the original response is returned). Otherwise we delegate to the parent implementation.

Source code in src/scrapy_extensions/downloadermiddlewares.py
async def process_response(  # type: ignore[override]
    self,
    request: Request,
    response: Response,
    spider: Spider,
) -> Request | Response:
    """retry certain requests with delay

    This method is now a coroutine. If the response status matches a
    delayed-retry code, we await the computed delay and then return the
    retry Request (or None, in which case the original response is
    returned). Otherwise we delegate to the parent implementation.
    """

    if request.meta.get("dont_retry"):
        return response

    if response.status in self.delayed_retry_http_codes:
        reason = response_status_message(response.status)
        req = await self._delayed_retry(request, reason, spider)
        return req or response

    # Delegate to parent. The parent may return a value or a Deferred/coroutine.
    parent_result = super().process_response(request, response, spider)
    if asyncio.iscoroutine(parent_result):
        return await parent_result  # type: ignore[no-any-return]
    return parent_result

LoopingExtension

Run a task in a loop.

Source code in src/scrapy_extensions/extensions.py
class LoopingExtension:
    """Run a task in a loop."""

    task: Callable[..., object]
    _task: LoopingCall | None = None
    _interval: float

    def setup_looping_task(
        self,
        task: Callable[..., object],
        crawler: Crawler,
        interval: float,
    ) -> None:
        """Setup task to run periodically at a given interval."""

        self.task = task
        self._interval = interval
        crawler.signals.connect(
            self._spider_opened,
            signal=spider_opened,
        )
        crawler.signals.connect(
            self._spider_closed,
            signal=spider_closed,
        )

    def _spider_opened(self, spider: Spider) -> None:
        if self._task is None:
            self._task = LoopingCall(self.task, spider=spider)
        self._task.start(self._interval, now=False)

    def _spider_closed(self) -> None:
        if self._task is None:
            LOGGER.warning("No task was started")
            return

        if self._task.running:
            self._task.stop()
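
The class is meant to be used as a base for concrete extensions. A hypothetical extension that logs crawler stats periodically might look like the sketch below; the class name, setting key and logging behaviour are inventions for illustration, only setup_looping_task and its call signature come from the code above. It would then be enabled through the EXTENSIONS setting like any other extension.

import logging

from scrapy import Spider
from scrapy.crawler import Crawler
from scrapy.exceptions import NotConfigured

from scrapy_extensions.extensions import LoopingExtension

LOGGER = logging.getLogger(__name__)


class StatsLoggerExtension(LoopingExtension):
    """Hypothetical example: log the crawler stats at a fixed interval."""

    def __init__(self, crawler: Crawler, interval: float) -> None:
        self.crawler = crawler
        # The task is called with spider=<spider> by the underlying LoopingCall.
        self.setup_looping_task(self._log_stats, crawler, interval)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> "StatsLoggerExtension":
        interval = crawler.settings.getfloat("STATS_LOGGER_INTERVAL", 60.0)
        if interval <= 0:
            raise NotConfigured
        return cls(crawler, interval)

    def _log_stats(self, spider: Spider) -> None:
        LOGGER.info("Stats for %s: %s", spider.name, self.crawler.stats.get_stats())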

setup_looping_task(task: Callable[..., object], crawler: Crawler, interval: float) -> None

Setup task to run periodically at a given interval.

Source code in src/scrapy_extensions/extensions.py
def setup_looping_task(
    self,
    task: Callable[..., object],
    crawler: Crawler,
    interval: float,
) -> None:
    """Setup task to run periodically at a given interval."""

    self.task = task
    self._interval = interval
    crawler.signals.connect(
        self._spider_opened,
        signal=spider_opened,
    )
    crawler.signals.connect(
        self._spider_closed,
        signal=spider_closed,
    )

NicerAutoThrottle

Bases: AutoThrottle

Autothrottling with exponential backoff depending on status codes.

Source code in src/scrapy_extensions/extensions.py
class NicerAutoThrottle(AutoThrottle):
    """Autothrottling with exponential backoff depending on status codes."""

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> NicerAutoThrottle:
        http_codes_settings = crawler.settings.getlist("AUTOTHROTTLE_HTTP_CODES")

        try:
            http_codes = (
                int(http_code) for http_code in arg_to_iter(http_codes_settings)
            )

        except ValueError:
            LOGGER.exception("Invalid HTTP code: %s", http_codes_settings)
            http_codes = None

        return cls(crawler, http_codes)

    def __init__(
        self,
        crawler: Crawler,
        http_codes: Iterable[int] | None = None,
    ):
        super().__init__(crawler)
        self.http_codes: frozenset[int] = frozenset(
            filter(None, arg_to_iter(http_codes)),
        )
        LOGGER.info("Throttle requests on status codes: %s", sorted(self.http_codes))

    def _adjust_delay(
        self,
        slot: Slot,
        latency: float,
        response: Response,
    ) -> None:
        super()._adjust_delay(slot, latency, response)

        if response.status not in self.http_codes:
            return

        new_delay = (
            min(2 * slot.delay, self.maxdelay) if self.maxdelay else 2 * slot.delay
        )

        LOGGER.debug(
            "Status <%d> throttled from %.1fs to %.1fs: %r",
            response.status,
            slot.delay,
            new_delay,
            response,
        )

        slot.delay = new_delay
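
Since the class derives from the stock AutoThrottle extension, it is configured through the usual AUTOTHROTTLE_* settings plus AUTOTHROTTLE_HTTP_CODES. The sketch below assumes you disable the built-in extension and register this one in its place; the priority value is arbitrary.

# settings.py -- illustrative sketch
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1.0
AUTOTHROTTLE_MAX_DELAY = 60.0            # also caps the exponential backoff
AUTOTHROTTLE_HTTP_CODES = [429, 503]     # statuses that double the slot delay
EXTENSIONS = {
    "scrapy.extensions.throttle.AutoThrottle": None,        # disable the built-in
    "scrapy_extensions.extensions.NicerAutoThrottle": 0,
}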

QuietLogFormatter

Bases: LogFormatter

Be quieter about scraped items.

Source code in src/scrapy_extensions/loggers.py
class QuietLogFormatter(LogFormatter):
    """Be quieter about scraped items."""

    def scraped(  # type: ignore[override]
        self,
        item: Any,
        response: Response,
        spider: Spider,
    ) -> LogFormatterResult | None:
        return (
            super().scraped(item, response, spider)
            if spider.settings.getbool("LOG_SCRAPED_ITEMS")
            else None
        )
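
To use it, point Scrapy's LOG_FORMATTER setting at the class; scraped items are then only logged when LOG_SCRAPED_ITEMS is enabled. A minimal sketch:

# settings.py -- illustrative sketch
LOG_FORMATTER = "scrapy_extensions.loggers.QuietLogFormatter"
LOG_SCRAPED_ITEMS = False   # suppress the per-item "Scraped from ..." log lines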

downloadermiddlewares

Scrapy downloader middleware (async/await rewrite)

This middleware preserves the same behaviour as the original Deferred-based implementation but uses Python coroutines (async/await) and asyncio.sleep for the delay. The public behaviour (delayed retries, backoff, priority adjust, config keys) is unchanged.
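
Coroutine middleware methods require Scrapy to run on an asyncio-compatible reactor. A sketch of the usual way to select one (TWISTED_REACTOR is a standard Scrapy setting; the value shown is Twisted's asyncio reactor):

# settings.py -- illustrative sketch
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"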

extensions

Extensions.

loggers

Logging classes.

pipelines

Scrapy item pipelines

utils

Utility functions.

calculate_blurhash(image: str | Path | PIL.Image.Image, x_components: int = 4, y_components: int = 4) -> str

Calculate the blurhash of a given image.

Source code in src/scrapy_extensions/utils.py
def calculate_blurhash(
    image: str | Path | PIL.Image.Image,
    x_components: int = 4,
    y_components: int = 4,
) -> str:
    """Calculate the blurhash of a given image."""

    import numpy as np
    from blurhash_numba import encode
    from PIL import Image, ImageOps

    image = image if isinstance(image, Image.Image) else Image.open(image)
    image = ImageOps.fit(
        image=image,
        size=(32 * x_components, 32 * y_components),
        centering=(0.5, 0),
    )
    image_array = np.array(image.convert("RGB"), dtype=float)

    blurhash = encode(
        image=image_array,
        x_components=x_components,
        y_components=y_components,
    )
    assert isinstance(blurhash, str)
    return blurhash
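
A usage sketch; the file path is hypothetical, and the function needs the optional dependencies it imports lazily (numpy, Pillow and blurhash_numba, i.e. the `blurhash` extra) to be installed at call time:

from PIL import Image

from scrapy_extensions.utils import calculate_blurhash

# Either a path ...
blurhash_from_path = calculate_blurhash("images/full/cover.jpg")

# ... or an already opened PIL image works.
blurhash_from_image = calculate_blurhash(
    Image.open("images/full/cover.jpg"),
    x_components=4,
    y_components=4,
)
print(blurhash_from_path)  # a compact ASCII string encoding a blurred preview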