Mastering Distributed Scrapy Spiders: Streamlining Workloads with Scrapyd and Postgres


Distributing load between scrapyd instances.

💡 Don't miss the second installment in our 3-part series on mastering distributed scraping—catch up on the first article here if you haven't already: How we manage 100s of scrapy spiders

In the last article, we dockerized our scraping application, so we can now run it in isolated containers. Our next challenge is to efficiently distribute the workload between the different Scrapyd containers.

Before going further, let's talk about Scrapyd. Scrapyd is a web service designed for managing the execution of Scrapy spiders. It provides a centralized platform where you can deploy, schedule, and monitor your spiders remotely. Scrapyd operates by maintaining an internal queue (built on SQLite) that holds pending spider jobs. When you schedule a spider, it is added to this queue, and the Scrapyd scheduler removes it from the queue once it goes into execution. Scrapyd exposes an HTTP API to interact with its different components and also supports concurrent spider execution. Overall, Scrapyd simplifies deploying Scrapy spiders and managing their execution.
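For instance, scheduling a spider and checking on running jobs are just HTTP calls to a Scrapyd instance. Here is a minimal sketch using Python's requests library (the project and spider names are placeholders; 6800 is Scrapyd's default port):

import requests

# Schedule a spider run on a Scrapyd instance
resp = requests.post(
    "http://localhost:6800/schedule.json",
    data={"project": "my_project", "spider": "my_spider"},
)
print(resp.json())  # {"status": "ok", "jobid": "..."}

# List pending, running and finished jobs for the same project
jobs = requests.get(
    "http://localhost:6800/listjobs.json",
    params={"project": "my_project"},
).json()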

To efficiently distribute the workload between the different Scrapyd containers, one initial idea was to implement an orchestration node responsible for monitoring pending spiders and the execution status across the different containers. If a container had bandwidth for new tasks, the orchestrator would assign it additional work. However, upon reflection, this approach closely resembles what Scrapyd already does internally, which made our proposed solution redundant.

After going through the Scrapyd documentation, we found out that we have the option to create our own custom scheduler and a custom queue to go with it. This gave us the following idea: why not set up a shared queue that all the Scrapyd instances can use? That way, rather than containers pulling spider jobs from their local SQLite queue, each container can get jobs straight from this shared queue.

When it came to building this queue, we chose to use a Postgres database. Sure, there might be better options out there, like Redis, but we already had a Postgres database set up and ready to go. Plus, we're not running hundreds of instances, so we don't foresee any big concurrency issues as long as we handle them correctly. It's all about keeping it simple.
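All of the classes shown below receive a SQLAlchemy session pointing at this shared Postgres database. As a rough sketch (the connection string and names here are illustrative, not our actual configuration), the session factory could be set up like this:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Hypothetical connection string; in practice it comes from configuration
engine = create_engine("postgresql+psycopg2://user:password@db-host:5432/scraping")
Session = sessionmaker(bind=engine)

session = Session()  # handed to the queue and storage classes below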

Our queue jobs will be saved in a table called SpiderJob. Let's break down how a spider job is represented in our database. We use SQLAlchemy to define our table, which looks like this:

import datetime
from dataclasses import dataclass, field

from sqlalchemy import Column, DateTime, Float, Integer, LargeBinary, String
from sqlalchemy.orm import registry

mapper_registry = registry()


@mapper_registry.mapped
@dataclass
class SpiderJob:
    __tablename__ = "spider_job"
    __sa_dataclass_metadata_key__ = "sa"

    id: int = field(init=False, metadata={"sa": Column(Integer, primary_key=True)})
    priority: float = field(default=None, metadata={"sa": Column(Float)})
    message: bytes = field(default=None, metadata={"sa": Column(LargeBinary)})

    status: str = field(default=None, metadata={"sa": Column(String)})

    project: str = field(default=None, metadata={"sa": Column(String)})
    spider: str = field(default=None, metadata={"sa": Column(String)})
    job: str = field(default=None, metadata={"sa": Column(String)})
    # Identifies which deployment this job belongs to (used by the classes below)
    execution_ref: str = field(default=None, metadata={"sa": Column(String)})
    start_time: datetime.datetime = field(default=None, metadata={"sa": Column(DateTime)})
    end_time: datetime.datetime = field(default=None, metadata={"sa": Column(DateTime)})

The fields priority, message, start_time, end_time, spider, and project are required for Scrapyd to function correctly. For our use case we added the status field, which stores the execution status of the spider. The status is initialized with the value pending. When a Scrapyd container polls the job, the status is set to ongoing, and when Scrapyd finishes executing the spider, the status changes to terminated. Now let's give the code a spin:

class PostgresPriorityQueue(object):
    def __init__(self, session=None):
        self.session = session
        self._table_name = SpiderJob.__tablename__
        self.execution_ref = config.DB_EXECUTION_NAME

    @class_session_handler
    def put(self, message, priority=0.0):
        # Insert a new pending job; the serialized scheduling message is stored
        # so it can be handed back to Scrapyd when the job is popped.
        record = SpiderJob(
            spider=message.get("name"),
            job=message.get("_job"),
            execution_ref=self.execution_ref,
            priority=priority,
            status="pending",
            message=self.encode(message),
        )
        self.session.add(record)

    @class_session_handler
    def pop(self):
        # Take a table-level lock so two containers cannot pop the same job.
        self.session.execute(f"LOCK TABLE {self._table_name} IN SHARE ROW EXCLUSIVE MODE")
        # Pick the highest-priority pending job, if any.
        record = (
            self.session.query(SpiderJob)
            .filter(SpiderJob.status == "pending")
            .order_by(SpiderJob.priority.desc())
            .first()
        )
        if not record:
            return None
        record.status = "ongoing"
        self.session.add(record)
        return self.decode(record.message)

The put method creates a new SpiderJob row in the database with the status pending. The pop method checks whether there are any pending jobs; if there are, it returns the first one and changes its status to ongoing. Notice that we are using a database lock here: it prevents two containers from popping the same SpiderJob simultaneously. If there are two concurrent requests, one gets access to the table while the other one waits.
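The class_session_handler decorator used in these snippets is not shown; roughly speaking, it wraps each method with session handling. A minimal sketch of what such a decorator might look like (the exact commit/rollback policy is an assumption, not our actual implementation):

from functools import wraps

def class_session_handler(method):
    # Commit the session when the wrapped method succeeds,
    # roll it back when it raises.
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            result = method(self, *args, **kwargs)
            self.session.commit()
            return result
        except Exception:
            self.session.rollback()
            raise
    return wrapper

Committing or rolling back ends the transaction, which also releases the table lock taken in pop so the other containers can proceed.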

Now, moving on to handling finished spider jobs: this is managed by another interface. Check it out:

class PostgresFinishedJobs(object):
    def __init__(self, session=None):
        self.session = session
        self.execution_ref = config.DB_EXECUTION_NAME

    @class_session_handler
    def add(self, job):
        # Look up the queue entry matching the finished Scrapyd job.
        record = (
            self.session.query(SpiderJob)
            .filter(
                SpiderJob.spider == job.spider,
                SpiderJob.job == job.job,
                SpiderJob.execution_ref == self.execution_ref,
            )
            .first()
        )
        if record:
            # Mark the existing entry as terminated and record its timings.
            record.project = job.project
            record.start_time = job.start_time
            record.end_time = job.end_time
            record.status = "terminated"
        else:
            # No matching entry was found; store the finished job anyway.
            record = SpiderJob(
                project=job.project,
                spider=job.spider,
                job=job.job,
                start_time=job.start_time,
                end_time=job.end_time,
                status="terminated",
                execution_ref=self.execution_ref,
            )
        self.session.add(record)

When a spider finishes executing, Scrapyd hands the job storage a job object exposing, among other attributes, project, spider, job, start_time, and end_time. The add method looks the job up in the SpiderJob table and sets its status to terminated; if it is not found, it creates a new SpiderJob record.

Next up, we need to put our newly created classes to work by plugging them into the spider queue and job storage classes:


@implementer(ISpiderQueue)
class PostgresSpiderQueue(object):
    def __init__(self, config, collection, session=None):
        self.q = PostgresPriorityQueue(session=session)

    ....

@implementer(IJobStorage)
class PostgresJobStorage(object):
    def __init__(self, config, session=None):
        self.jstorage = PostgresFinishedJobs(session=session)
        self.finished_to_keep = config.getint("finished_to_keep", 100)
    ...
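The elided methods are mostly thin delegations to the Postgres-backed classes above. Purely as an illustration (the bodies below are simplified, not our exact implementation, and the add signature can vary between Scrapyd versions), the queue side could look roughly like this:

@implementer(ISpiderQueue)
class PostgresSpiderQueue(object):
    def __init__(self, config, collection, session=None):
        self.q = PostgresPriorityQueue(session=session)

    def add(self, name, priority=0.0, **spider_args):
        # Build the scheduling message and push it onto the shared queue
        message = spider_args.copy()
        message["name"] = name
        self.q.put(message, priority=priority)

    def pop(self):
        # Hand the next pending job to the Scrapyd scheduler
        return self.q.pop()

Scrapyd's ISpiderQueue also expects methods such as count(), list(), remove(), and clear(), which delegate in the same way to queries on the SpiderJob table.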

Lastly, to get Scrapyd to utilize the spiderqueue and jobstorage classes that we created, we need to update the scrapyd.cfg:

[scrapyd]
spiderqueue = libs.scrapy_queue.spider_queue.PostgresSpiderQueue
jobstorage  = libs.scrapy_queue.job_storage.PostgresJobStorage

With these changes, whenever we run a new Scrapyd container, it connects to the shared queue and starts polling it for pending spiders. After executing a spider, it changes the spider's status to terminated in the shared queue so that the other containers do not attempt to execute it.

In the next article, we will discuss how we set up the infrastructure to orchestrate and manage the Scrapyd containers using ECS and Terraform.

Amin SAFFAR

Backend Engineer @Stackadoc