Pipeline
处理 item,清洗、入库
- pipeline_in:数据进入时,一般用作清洗
- pipeline_save:数据保存方法,一种保存写一个清晰明了
- pipeline_error:数据保存出错时,默认重试次数 3 ,可通过 settings.PIPELINE_RETRY_TIMES 设置
- pipeline_failed:数据保存出错超过最大次数
- pipeline_close:无数据时,整个 pipeline 结束时运行!!!
注意:
- 数据都是原地修改的,不需要传递
- 默认启动的数据处理线程是 5,可通过 settings.ITEM_THREADS 调整
- 默认是单条传递,有多条传递需求的可通过 settings.PIPELINE_ITEM_BUFFER 调整,这样传递 item 的就是列表
- 不需要的 item 可以通过 DropItemException 进行丢弃,但如果是多条直接在 item 列表移除就行,丢弃的话整个都会被丢掉
结构
import palp
from loguru import logger
class Pipeline(palp.Pipeline):
def pipeline_in(self, spider, item) -> None:
"""
入库之前的操作,一般是清洗
:param spider:
:param item:
:return:
"""
def pipeline_save(self, spider, item) -> None:
"""
入库
:param spider:
:param item: 启用 item_buffer 将会是 List[item] 反之为 item
:return:
"""
logger.info(item)
def pipeline_error(self, spider, item, exception: Exception) -> None:
"""
入库出错时的操作
:param spider:
:param item: 启用 item buffer 时是 List[item]
:param exception: 错误的详细信息
:return:
"""
logger.error(exception)
def pipeline_failed(self, spider, item) -> None:
"""
超过最大重试次数时的操作
:param spider:
:param item: 启用 item buffer 时是 List[item]
:return:
"""
logger.warning(f"失败的 item:{item}")
def pipeline_close(self, spider) -> None:
"""
spider 结束时的操作
:param spider:
:return:
"""
设置
注意:这里的 1 代表顺序
PIPELINE = {
1: "pipelines.pipeline.Pipeline",
}