Pipeline

处理 item,清洗、入库

  • pipeline_in:数据进入时,一般用作清洗
  • pipeline_save:数据保存方法,一种保存写一个清晰明了
  • pipeline_error:数据保存出错时,默认重试次数 3 ,可通过 settings.PIPELINE_RETRY_TIMES 设置
  • pipeline_failed:数据保存出错超过最大次数
  • pipeline_close:无数据时,整个 pipeline 结束时运行!!!

注意:

  • 数据都是原地修改的,不需要传递
  • 默认启动的数据处理线程是 5,可通过 settings.ITEM_THREADS 调整
  • 默认是单条传递,有多条传递需求的可通过 settings.PIPELINE_ITEM_BUFFER 调整,这样传递 item 的就是列表
  • 不需要的 item 可以通过 DropItemException 进行丢弃,但如果是多条直接在 item 列表移除就行,丢弃的话整个都会被丢掉

结构

import palp
from loguru import logger


class Pipeline(palp.Pipeline):
    def pipeline_in(self, spider, item) -> None:
        """
        入库之前的操作,一般是清洗

        :param spider:
        :param item:
        :return:
        """

    def pipeline_save(self, spider, item) -> None:
        """
        入库

        :param spider:
        :param item: 启用 item_buffer 将会是 List[item] 反之为 item
        :return:
        """
        logger.info(item)

    def pipeline_error(self, spider, item, exception: Exception) -> None:
        """
        入库出错时的操作

        :param spider:
        :param item: 启用 item buffer 时是 List[item]
        :param exception: 错误的详细信息
        :return:
        """
        logger.error(exception)

    def pipeline_failed(self, spider, item) -> None:
        """
        超过最大重试次数时的操作

        :param spider:
        :param item: 启用 item buffer 时是 List[item]
        :return:
        """
        logger.warning(f"失败的 item:{item}")

    def pipeline_close(self, spider) -> None:
        """
        spider 结束时的操作

        :param spider:
        :return:
        """

设置

注意:这里的 1 代表顺序

PIPELINE = {
    1: "pipelines.pipeline.Pipeline",
}