API documentation

BlurHashPipeline

Calculate the BlurHashes of the downloaded images.

Source code in src/scrapy_extensions/pipelines.py
class BlurHashPipeline:
    """Calculate the BlurHashes of the downloaded images."""

    images_store: Path
    source_field: str
    target_field: str
    x_components: int
    y_components: int

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> BlurHashPipeline:
        """Init from crawler."""

        images_store = crawler.settings.get("IMAGES_STORE")
        source_field = crawler.settings.get("IMAGES_RESULT_FIELD")
        target_field = crawler.settings.get("BLURHASH_FIELD")

        if not images_store or not source_field or not target_field:
            raise NotConfigured

        if not find_spec("scrapy_extensions.utils", "calculate_blurhash"):
            LOGGER.error(
                "Unable to import libraries required for BlurHash, "
                "install with `blurhash` option",
            )
            raise NotConfigured

        x_components = crawler.settings.getint("BLURHASH_X_COMPONENTS", 4)
        y_components = crawler.settings.getint("BLURHASH_Y_COMPONENTS", 4)

        return cls(
            images_store=images_store,
            source_field=source_field,
            target_field=target_field,
            x_components=x_components,
            y_components=y_components,
        )

    def __init__(
        self,
        *,
        images_store: str | Path,
        source_field: str,
        target_field: str,
        x_components: int = 4,
        y_components: int = 4,
    ) -> None:
        self.images_store = Path(images_store).resolve()
        self.source_field = source_field
        self.target_field = target_field
        self.x_components = x_components
        self.y_components = y_components

    def process_image_obj(
        self,
        image_obj: dict[str, Any],
        x_components: int = 4,
        y_components: int = 4,
    ) -> dict[str, Any]:
        """Calculate the BlurHash of a given image."""

        image_path = image_obj.get("path")
        if not image_path:
            return image_obj

        image_full_path = (self.images_store / image_path).resolve()
        if not image_full_path or not image_full_path.is_file():
            LOGGER.warning("Unable to locate image file <%s>", image_full_path)
            return image_obj

        # Don't modify the original object
        image_obj = image_obj.copy()

        image_obj["blurhash"] = _calculate_blurhash(
            path=image_full_path,
            x_components=x_components,
            y_components=y_components,
        )

        return image_obj

    def process_item(self, item: Any, spider: Spider) -> Any:  # noqa: ARG002
        """Calculate the BlurHashes of the downloaded images."""

        adapter = ItemAdapter(item)

        image_objs = tuple(arg_to_iter(adapter.get(self.source_field)))
        if not image_objs:
            return item

        try:
            adapter[self.target_field] = [
                self.process_image_obj(image_obj) for image_obj in image_objs
            ]
        except Exception:
            LOGGER.exception("Unable to add field <%s> to the item", self.target_field)

        return item
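
The pipeline is configured entirely through Scrapy settings: IMAGES_STORE, IMAGES_RESULT_FIELD and BLURHASH_FIELD are required (otherwise NotConfigured is raised), and the component counts default to 4. A minimal sketch of a settings.py, assuming the images pipeline is enabled and that the pipeline is importable from scrapy_extensions.pipelines; the pipeline priorities are illustrative:

# settings.py -- hypothetical project configuration
ITEM_PIPELINES = {
    "scrapy.pipelines.images.ImagesPipeline": 300,       # downloads the images first
    "scrapy_extensions.pipelines.BlurHashPipeline": 400,  # then calculates BlurHashes
}

IMAGES_STORE = "images"          # directory where ImagesPipeline stores files
IMAGES_RESULT_FIELD = "images"   # item field holding the image result dicts
BLURHASH_FIELD = "blurhashes"    # item field the pipeline writes to
BLURHASH_X_COMPONENTS = 4        # optional, defaults to 4
BLURHASH_Y_COMPONENTS = 4        # optional, defaults to 4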

from_crawler(crawler: Crawler) -> BlurHashPipeline (classmethod)

Init from crawler.

Source code in src/scrapy_extensions/pipelines.py
@classmethod
def from_crawler(cls, crawler: Crawler) -> BlurHashPipeline:
    """Init from crawler."""

    images_store = crawler.settings.get("IMAGES_STORE")
    source_field = crawler.settings.get("IMAGES_RESULT_FIELD")
    target_field = crawler.settings.get("BLURHASH_FIELD")

    if not images_store or not source_field or not target_field:
        raise NotConfigured

    if not find_spec("scrapy_extensions.utils", "calculate_blurhash"):
        LOGGER.error(
            "Unable to import libraries required for BlurHash, "
            "install with `blurhash` option",
        )
        raise NotConfigured

    x_components = crawler.settings.getint("BLURHASH_X_COMPONENTS", 4)
    y_components = crawler.settings.getint("BLURHASH_Y_COMPONENTS", 4)

    return cls(
        images_store=images_store,
        source_field=source_field,
        target_field=target_field,
        x_components=x_components,
        y_components=y_components,
    )

process_image_obj(image_obj: dict[str, Any], x_components: int = 4, y_components: int = 4) -> dict[str, Any]

Calculate the BlurHash of a given image.

Source code in src/scrapy_extensions/pipelines.py
def process_image_obj(
    self,
    image_obj: dict[str, Any],
    x_components: int = 4,
    y_components: int = 4,
) -> dict[str, Any]:
    """Calculate the BlurHash of a given image."""

    image_path = image_obj.get("path")
    if not image_path:
        return image_obj

    image_full_path = (self.images_store / image_path).resolve()
    if not image_full_path or not image_full_path.is_file():
        LOGGER.warning("Unable to locate image file <%s>", image_full_path)
        return image_obj

    # Don't modify the original object
    image_obj = image_obj.copy()

    image_obj["blurhash"] = _calculate_blurhash(
        path=image_full_path,
        x_components=x_components,
        y_components=y_components,
    )

    return image_obj
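
The image_obj dictionaries are the per-image result entries produced by Scrapy's ImagesPipeline, with path relative to IMAGES_STORE; the method returns a copy with a blurhash key added. A sketch with hypothetical file names and an illustrative hash value, assuming the file exists under the images store:

from scrapy_extensions.pipelines import BlurHashPipeline

pipeline = BlurHashPipeline(
    images_store="images",       # same directory as IMAGES_STORE
    source_field="images",
    target_field="blurhashes",
)

image_obj = {"url": "https://example.com/cover.jpg", "path": "full/0a1b2c.jpg"}
result = pipeline.process_image_obj(image_obj)

# result is a copy of image_obj with an extra "blurhash" key, e.g.
# {"url": "...", "path": "full/0a1b2c.jpg", "blurhash": "LEHV6nWB2yk8pyo0adR*.7kCMdnj"}
# image_obj itself is left unmodified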

process_item(item: Any, spider: Spider) -> Any

Calculate the BlurHashes of the downloaded images.

Source code in src/scrapy_extensions/pipelines.py
def process_item(self, item: Any, spider: Spider) -> Any:  # noqa: ARG002
    """Calculate the BlurHashes of the downloaded images."""

    adapter = ItemAdapter(item)

    image_objs = tuple(arg_to_iter(adapter.get(self.source_field)))
    if not image_objs:
        return item

    try:
        adapter[self.target_field] = [
            self.process_image_obj(image_obj) for image_obj in image_objs
        ]
    except Exception:
        LOGGER.exception("Unable to add field <%s> to the item", self.target_field)

    return item
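
Outside a crawl the pipeline can also be exercised directly. A minimal sketch, assuming an item shaped like the ImagesPipeline output and an image file present under the images store:

from scrapy_extensions.pipelines import BlurHashPipeline

pipeline = BlurHashPipeline(
    images_store="images",
    source_field="images",
    target_field="blurhashes",
)

item = {"title": "Example", "images": [{"path": "full/0a1b2c.jpg"}]}
item = pipeline.process_item(item, spider=None)  # the spider argument is unused
# item["blurhashes"] now mirrors item["images"], each entry gaining a "blurhash" key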

DelayedRetryMiddleware

Bases: RetryMiddleware

Retry requests with a delay.

Source code in src/scrapy_extensions/downloadermiddlewares.py
class DelayedRetryMiddleware(RetryMiddleware):
    """retry requests with a delay"""

    def __init__(
        self,
        settings: Settings,
    ):
        super().__init__(settings)

        delayed_retry_http_codes_settings = settings.getlist("DELAYED_RETRY_HTTP_CODES")
        try:
            delayed_retry_http_codes = (
                int(http_code) for http_code in delayed_retry_http_codes_settings
            )
        except ValueError as exc:
            LOGGER.exception(
                "Invalid http code(s) in DELAYED_RETRY_HTTP_CODES: %s",
                delayed_retry_http_codes_settings,
            )
            raise NotConfigured from exc
        self.delayed_retry_http_codes = frozenset(
            filter(None, delayed_retry_http_codes),
        )

        self.delayed_retry_max_retry_times = settings.getint("DELAYED_RETRY_TIMES", -1)
        self.delayed_retry_priority_adjust = settings.getint(
            "DELAYED_RETRY_PRIORITY_ADJUST",
            self.priority_adjust,
        )
        self.delayed_retry_delay = settings.getfloat("DELAYED_RETRY_DELAY", 1)
        self.delayed_retry_backoff = settings.getbool("DELAYED_RETRY_BACKOFF")
        self.delayed_retry_backoff_max_delay = settings.getfloat(
            "DELAYED_RETRY_BACKOFF_MAX_DELAY",
            10 * self.delayed_retry_delay,
        )

    def process_response(
        self,
        request: Request,
        response: Response,
        spider: Spider,
    ) -> Response | defer.Deferred[Callable[..., Response]]:
        """retry certain requests with delay"""

        if request.meta.get("dont_retry"):
            return response

        if response.status in self.delayed_retry_http_codes:
            reason = response_status_message(response.status)
            return self._delayed_retry(request, reason, spider) or response

        return super().process_response(request, response, spider)

    def _delayed_retry(
        self,
        request: Request,
        reason: str,
        spider: Spider,
    ) -> defer.Deferred[Callable[..., Response]] | None:
        from twisted.internet import defer, reactor

        max_retry_times = request.meta.get(
            "max_retry_times",
            self.delayed_retry_max_retry_times,
        )
        if max_retry_times < 0:
            max_retry_times = sys.maxsize
        priority_adjust = request.meta.get(
            "priority_adjust",
            self.delayed_retry_priority_adjust,
        )

        req = get_retry_request(
            request=request,
            spider=spider,
            reason=reason,
            max_retry_times=max_retry_times,
            priority_adjust=priority_adjust,
        )

        if req is None:
            return None

        delay = request.meta.get("retry_delay", self.delayed_retry_delay)
        req.meta["retry_delay"] = (
            min(2 * delay, self.delayed_retry_backoff_max_delay)
            if self.delayed_retry_backoff
            else delay
        )

        LOGGER.debug("Retry request %r in %.1f second(s)", req, delay)

        deferred: defer.Deferred[Callable[..., Response]] = defer.Deferred()
        deferred.addCallback(lambda req: req)
        reactor.callLater(delay, deferred.callback, req)  # type: ignore[attr-defined]

        return deferred
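
All behaviour is driven by the settings read in __init__: DELAYED_RETRY_TIMES of -1 means effectively unlimited retries, and with DELAYED_RETRY_BACKOFF enabled the per-request delay doubles on every retry up to DELAYED_RETRY_BACKOFF_MAX_DELAY. A minimal settings sketch, with hypothetical values and middleware priority:

# settings.py
DOWNLOADER_MIDDLEWARES = {
    "scrapy_extensions.downloadermiddlewares.DelayedRetryMiddleware": 555,
}

DELAYED_RETRY_HTTP_CODES = [429, 503]   # statuses retried with a delay
DELAYED_RETRY_TIMES = -1                # -1 means effectively unlimited retries
DELAYED_RETRY_DELAY = 1.0               # initial delay in seconds
DELAYED_RETRY_BACKOFF = True            # double the delay on each retry ...
DELAYED_RETRY_BACKOFF_MAX_DELAY = 30.0  # ... capped at this many seconds
DELAYED_RETRY_PRIORITY_ADJUST = -1      # optional, defaults to RETRY_PRIORITY_ADJUST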

process_response(request: Request, response: Response, spider: Spider) -> Response | defer.Deferred[Callable[..., Response]]

Retry certain requests with a delay.

Source code in src/scrapy_extensions/downloadermiddlewares.py
def process_response(
    self,
    request: Request,
    response: Response,
    spider: Spider,
) -> Response | defer.Deferred[Callable[..., Response]]:
    """retry certain requests with delay"""

    if request.meta.get("dont_retry"):
        return response

    if response.status in self.delayed_retry_http_codes:
        reason = response_status_message(response.status)
        return self._delayed_retry(request, reason, spider) or response

    return super().process_response(request, response, spider)
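
Per-request behaviour can be overridden through request.meta, using the same keys the middleware reads: dont_retry, max_retry_times, priority_adjust and retry_delay. A short sketch:

from scrapy import Request

request = Request(
    "https://example.com/rate-limited",
    meta={
        "max_retry_times": 5,   # overrides DELAYED_RETRY_TIMES for this request
        "retry_delay": 2.0,     # overrides DELAYED_RETRY_DELAY for this request
        "priority_adjust": 0,   # overrides DELAYED_RETRY_PRIORITY_ADJUST
        # "dont_retry": True,   # would disable retrying for this request
    },
)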

LoopingExtension

Run a task in a loop.

Source code in src/scrapy_extensions/extensions.py
class LoopingExtension:
    """Run a task in a loop."""

    task: Callable[..., object]
    _task: LoopingCall | None = None
    _interval: float

    def setup_looping_task(
        self,
        task: Callable[..., object],
        crawler: Crawler,
        interval: float,
    ) -> None:
        """Setup task to run periodically at a given interval."""

        self.task = task
        self._interval = interval
        crawler.signals.connect(
            self._spider_opened,
            signal=spider_opened,
        )
        crawler.signals.connect(
            self._spider_closed,
            signal=spider_closed,
        )

    def _spider_opened(self, spider: Spider) -> None:
        if self._task is None:
            self._task = LoopingCall(self.task, spider=spider)
        self._task.start(self._interval, now=False)

    def _spider_closed(self) -> None:
        if self._task is None:
            LOGGER.warning("No task was started")
            return

        if self._task.running:
            self._task.stop()
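
LoopingExtension is meant to be used as a base class (or mixin) for a concrete extension: the subclass wires itself up in from_crawler and hands its periodic task to setup_looping_task; the task is called with the spider as a keyword argument. A minimal sketch with a hypothetical DumpStatsExtension and a hypothetical LOOPING_INTERVAL setting:

from scrapy import Spider
from scrapy.crawler import Crawler
from scrapy.exceptions import NotConfigured

from scrapy_extensions.extensions import LoopingExtension


class DumpStatsExtension(LoopingExtension):
    """Hypothetical extension that logs crawler stats at a fixed interval."""

    def __init__(self, crawler: Crawler, interval: float) -> None:
        self.stats = crawler.stats
        self.setup_looping_task(self._dump_stats, crawler, interval)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> "DumpStatsExtension":
        interval = crawler.settings.getfloat("LOOPING_INTERVAL", 60.0)
        if interval <= 0:
            raise NotConfigured
        return cls(crawler, interval)

    def _dump_stats(self, spider: Spider) -> None:
        spider.logger.info("Stats so far: %s", self.stats.get_stats())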

setup_looping_task(task: Callable[..., object], crawler: Crawler, interval: float) -> None

Setup task to run periodically at a given interval.

Source code in src/scrapy_extensions/extensions.py
def setup_looping_task(
    self,
    task: Callable[..., object],
    crawler: Crawler,
    interval: float,
) -> None:
    """Setup task to run periodically at a given interval."""

    self.task = task
    self._interval = interval
    crawler.signals.connect(
        self._spider_opened,
        signal=spider_opened,
    )
    crawler.signals.connect(
        self._spider_closed,
        signal=spider_closed,
    )

NicerAutoThrottle

Bases: AutoThrottle

Autothrottling with exponential backoff depending on status codes.

Source code in src/scrapy_extensions/extensions.py
class NicerAutoThrottle(AutoThrottle):
    """Autothrottling with exponential backoff depending on status codes."""

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> NicerAutoThrottle:
        http_codes_settings = crawler.settings.getlist("AUTOTHROTTLE_HTTP_CODES")

        try:
            http_codes = (
                int(http_code) for http_code in arg_to_iter(http_codes_settings)
            )

        except ValueError:
            LOGGER.exception("Invalid HTTP code: %s", http_codes_settings)
            http_codes = None

        return cls(crawler, http_codes)

    def __init__(
        self,
        crawler: Crawler,
        http_codes: Iterable[int] | None = None,
    ):
        super().__init__(crawler)
        self.http_codes: frozenset[int] = frozenset(
            filter(None, arg_to_iter(http_codes)),
        )
        LOGGER.info("Throttle requests on status codes: %s", sorted(self.http_codes))

    def _adjust_delay(
        self,
        slot: Slot,
        latency: float,
        response: Response,
    ) -> None:
        super()._adjust_delay(slot, latency, response)

        if response.status not in self.http_codes:
            return

        new_delay = (
            min(2 * slot.delay, self.maxdelay) if self.maxdelay else 2 * slot.delay
        )

        LOGGER.debug(
            "Status <%d> throttled from %.1fs to %.1fs: %r",
            response.status,
            slot.delay,
            new_delay,
            response,
        )

        slot.delay = new_delay
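
To use this in place of the stock AutoThrottle extension, disable the built-in one and register this class instead; AUTOTHROTTLE_HTTP_CODES lists the statuses that trigger the extra backoff, and the doubled delay is capped by AUTOTHROTTLE_MAX_DELAY. A minimal settings sketch, with hypothetical status codes and extension priority:

# settings.py
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_HTTP_CODES = [429, 503]   # back off exponentially on these statuses
AUTOTHROTTLE_MAX_DELAY = 60.0          # upper bound for the doubled delay

EXTENSIONS = {
    "scrapy.extensions.throttle.AutoThrottle": None,        # disable the default
    "scrapy_extensions.extensions.NicerAutoThrottle": 0,
}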

QuietLogFormatter

Bases: LogFormatter

Be quieter about scraped items.

Source code in src/scrapy_extensions/loggers.py
class QuietLogFormatter(LogFormatter):
    """Be quieter about scraped items."""

    def scraped(
        self,
        item: Any,
        response: Response,
        spider: Spider,
    ) -> dict[str, Any] | None:
        return (
            super().scraped(item, response, spider)
            if spider.settings.getbool("LOG_SCRAPED_ITEMS")
            else None
        )
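
The formatter is enabled through Scrapy's LOG_FORMATTER setting; scraped-item log lines are emitted only when LOG_SCRAPED_ITEMS is true. A minimal sketch:

# settings.py
LOG_FORMATTER = "scrapy_extensions.loggers.QuietLogFormatter"
LOG_SCRAPED_ITEMS = False  # suppress the per-item "Scraped from ..." debug lines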

utils

Utility functions.

calculate_blurhash(image: str | Path | PIL.Image.Image, x_components: int = 4, y_components: int = 4) -> str

Calculate the blurhash of a given image.

Source code in src/scrapy_extensions/utils.py
def calculate_blurhash(
    image: str | Path | PIL.Image.Image,
    x_components: int = 4,
    y_components: int = 4,
) -> str:
    """Calculate the blurhash of a given image."""

    import numpy as np
    from blurhash_numba import encode
    from PIL import Image, ImageOps

    image = image if isinstance(image, Image.Image) else Image.open(image)
    image = ImageOps.fit(
        image=image,
        size=(32 * x_components, 32 * y_components),
        centering=(0.5, 0),
    )
    image_array = np.array(image.convert("RGB"), dtype=float)

    blurhash = encode(
        image=image_array,
        x_components=x_components,
        y_components=y_components,
    )
    assert isinstance(blurhash, str)
    return blurhash
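
A short usage sketch, assuming the package was installed with its blurhash extra (so Pillow, numpy and the blurhash encoder are available) and that cover.jpg is a hypothetical local file:

from PIL import Image

from scrapy_extensions.utils import calculate_blurhash

# either pass a path ...
blurhash = calculate_blurhash("cover.jpg", x_components=4, y_components=3)

# ... or an already opened PIL image
with Image.open("cover.jpg") as image:
    blurhash = calculate_blurhash(image, x_components=4, y_components=3)

print(blurhash)  # compact string such as "LEHV6nWB2yk8pyo0adR*.7kCMdnj"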