Dec 11 2015


The Queue API in Drupal allows us to defer a number of tasks to a later stage: we place items into a queue that will run some time in the future, and each individual item is processed at that point, at least once. Usually this happens on cron runs, and Drupal 8 allows for a quick setup of cron-based queues. It doesn’t necessarily have to be cron, however.

In this article, we will look at using the Queue API in Drupal 8 by exploring two simple examples. The first will see the queue triggered by Cron while the second will allow us to manually do so ourselves. However, the actual processing will be handled by a similar worker. If you want to follow along, clone this git repository where you can find the npq module we will write in this article.

The module we’ll work with is called Node Publish Queue and it automatically adds newly created nodes that are saved unpublished to a queue, to be published later on. We will see how “later on” can mean the next cron run or a manual action triggered by the site administrator. First, let’s understand some basic concepts about queues in Drupal 8.

The theory

There are a few components that make up the Queue API in Drupal 8.

The most important role in this API is played by the QueueInterface implementation which represents the queue. The default queue type Drupal 8 ships with is currently the DatabaseQueue which is a type of reliable queue that makes sure all its items are processed at least once and in their original order (FIFO). This is in contrast to unreliable queues which only do their best to achieve this (something for which valid use cases do exist).

The typical role of the queue object is to create items, later claim them from the queue and delete them when they have been processed. In addition, it can release items if processing is either not finished or another worker needs to process them again before deletion.
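In code, that lifecycle looks roughly like this (a hedged sketch using the default queue factory service; the queue name and item data are arbitrary examples):

```php
// Get a queue (created on demand) from the queue factory service.
$queue = \Drupal::service('queue')->get('example_queue');

// Producer side: add an item of arbitrary serializable data.
$queue->createItem(['nid' => 42]);

// Consumer side: lease an item so no other worker can claim it meanwhile.
if ($item = $queue->claimItem()) {
  try {
    // ... process $item->data here ...
    $queue->deleteItem($item);
  }
  catch (\Exception $e) {
    // Put the item back so it can be processed again later.
    $queue->releaseItem($item);
  }
}
```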

The QueueInterface implementation is instantiated with the help of a general QueueFactory. In the case of the DatabaseQueue, the QueueFactory delegates to a dedicated DatabaseQueueFactory. Queues also need to be created before they can be used. However, the DatabaseQueue’s storage is already created when Drupal is first installed, so no additional setup is required.

The Queue Workers are responsible for processing queue items as they receive them. In Drupal 8 these are QueueWorker plugins that implement the QueueWorkerInterface. Using the QueueWorkerManager, we create instances of these plugins and process the items whenever the queue needs to be run.

The Node Publish Queue module

Now that we’ve covered the basic concepts of the Queue API in Drupal 8, let’s get our hands dirty and create the functionality described in the introduction. Our npq.info.yml file can be simple:

name: Node Publish Queue
description: Demo module illustrating the Queue API in Drupal 8
core: 8.x
type: module

Queue item creation

Inside the npq.module file we take care of the logic for creating queue items whenever a node is saved and not published:

use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;

/**
 * Implements hook_entity_insert().
 */
function npq_entity_insert(EntityInterface $entity) {
  if ($entity->getEntityTypeId() !== 'node') {
    return;
  }

  if ($entity->isPublished()) {
    return;
  }

  /** @var QueueFactory $queue_factory */
  $queue_factory = \Drupal::service('queue');
  /** @var QueueInterface $queue */
  $queue = $queue_factory->get('cron_node_publisher');
  $item = new \stdClass();
  $item->nid = $entity->id();
  $queue->createItem($item);
}

Inside this basic hook_entity_insert() implementation we do a very simple task. We first retrieve the QueueFactory object from the service container and use it to get a queue called cron_node_publisher. If we track things down, we notice that the get() method on the DatabaseQueueFactory simply creates a new DatabaseQueue instance with the name we pass to it.

Lastly, we create a small PHP object containing the node ID and create an item in the queue with that data. Simple.

The CRON queue worker

Next, let’s create a QueueWorker plugin that will process the queue items whenever Cron is run. However, because we know that we will also need one for manual processing that does the same thing, we will add most of the logic in a base abstract class. So inside the Plugin/QueueWorker namespace of our module we can have the NodePublishBase class:

/**
 * @file
 * Contains \Drupal\npq\Plugin\QueueWorker\NodePublishBase.
 */

namespace Drupal\npq\Plugin\QueueWorker;

use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\node\NodeInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;


/**
 * Provides base functionality for the NodePublish Queue Workers.
 */
abstract class NodePublishBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The node storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $nodeStorage;

  /**
   * Creates a new NodePublishBase object.
   *
   * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage
   *   The node storage.
   */
  public function __construct(EntityStorageInterface $node_storage) {
    $this->nodeStorage = $node_storage;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('entity.manager')->getStorage('node')
    );
  }

  /**
   * Publishes a node.
   *
   * @param \Drupal\node\NodeInterface $node
   *   The node to publish.
   *
   * @return int
   *   Either SAVED_NEW or SAVED_UPDATED, depending on the operation performed.
   */
  protected function publishNode(NodeInterface $node) {
    $node->setPublished(TRUE);
    return $node->save();
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    /** @var NodeInterface $node */
    $node = $this->nodeStorage->load($data->nid);
    // Check the instance first: load() returns NULL if the node no longer exists.
    if ($node instanceof NodeInterface && !$node->isPublished()) {
      return $this->publishNode($node);
    }
  }
}

Right off the bat we can see that we are using dependency injection to inject the NodeStorage into our class. For more information about dependency injection and the service container, feel free to check out my article on the topic.

In this base class we have two methods: publishNode() and the obligatory processItem(). The former publishes and saves a node that is passed to it. The latter loads the node using the node ID contained in the $data object and publishes it if it’s unpublished.

Now, let’s create a CronNodePublisher plugin that will use this logic on Cron runs:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes on CRON run.
 *
 * @QueueWorker(
 *   id = "cron_node_publisher",
 *   title = @Translation("Cron Node Publisher"),
 *   cron = {"time" = 10}
 * )
 */
class CronNodePublisher extends NodePublishBase {}

And that is all. We don’t need any other logic than what already is in our base class. Notice that, in the annotation, we are telling Drupal that this worker needs to be used by Cron to process as many items as it can within 10 seconds. How does this happen?

Whenever Cron runs, it uses the QueueWorkerManager to load all its plugin definitions. Then, if any of them have the cron key in their annotation, a Queue with the same name as the ID of the worker is loaded for processing. Lastly, each item in the queue is claimed and processed by the worker until the specified time has elapsed.
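Schematically, the logic core runs on each Cron invocation looks something like this (a simplified sketch, not the exact core code; error handling is omitted):

```php
$queue_manager = \Drupal::service('plugin.manager.queue_worker');

foreach ($queue_manager->getDefinitions() as $queue_name => $definition) {
  // Only workers with a "cron" key in their annotation are processed by Cron.
  if (!isset($definition['cron'])) {
    continue;
  }
  // The queue shares its name with the worker plugin ID.
  $queue = \Drupal::service('queue')->get($queue_name);
  $worker = $queue_manager->createInstance($queue_name);
  $end = time() + $definition['cron']['time'];

  // Claim and process items until the time budget is used up.
  while (time() < $end && ($item = $queue->claimItem())) {
    $worker->processItem($item->data);
    $queue->deleteItem($item);
  }
}
```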

If we now save an unpublished node, it will most likely become published at the next Cron run.

The manual worker

Let’s also create the possibility for the queue to be processed manually. First, let’s adapt the hook_entity_insert() implementation from before and change this line:

$queue = $queue_factory->get('cron_node_publisher');

to this:

$queue = $queue_factory->get('manual_node_publisher');

You can of course provide an admin screen for configuring which type of node publisher the application should use.
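For example, the queue name could come from configuration instead of being hard-coded (a sketch; the npq.settings config object and its queue key are hypothetical and would be populated by such an admin screen):

```php
// Hypothetical config: an npq.settings object with a "queue" key holding
// either "cron_node_publisher" or "manual_node_publisher".
$queue_name = \Drupal::config('npq.settings')->get('queue');
$queue = $queue_factory->get($queue_name);
```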

Second, let’s create our ManualNodePublisher plugin:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes via a manual action triggered by an admin.
 *
 * @QueueWorker(
 *   id = "manual_node_publisher",
 *   title = @Translation("Manual Node Publisher"),
 * )
 */
class ManualNodePublisher extends NodePublishBase {}

This is almost the same as with the CRON example but without the cron key.

Third, let’s create a form where we can see how many items are in the manual_node_publisher queue and process them all by the press of a button. Inside npq.routing.yml in the module root folder:

demo.form:
  path: '/npq'
  defaults:
    _form: '\Drupal\npq\Form\NodePublisherQueueForm'
    _title: 'Node Publisher'
  requirements:
    _permission: 'administer site configuration'

We define a path at /npq that uses the specified form, which lives in the given namespace and which we can define as follows:

/**
 * @file
 * Contains \Drupal\npq\Form\NodePublisherQueueForm.
 */

namespace Drupal\npq\Form;

use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;

class NodePublisherQueueForm extends FormBase {

  /**
   * @var QueueFactory
   */
  protected $queueFactory;

  /**
   * @var QueueWorkerManagerInterface
   */
  protected $queueManager;


  /**
   * {@inheritdoc}
   */
  public function __construct(QueueFactory $queue, QueueWorkerManagerInterface $queue_manager) {
    $this->queueFactory = $queue;
    $this->queueManager = $queue_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker')
    );
  }
  
  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'demo_form';
  }
  
  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');

    $form['help'] = array(
      '#type' => 'markup',
      '#markup' => $this->t('Submitting this form will process the Manual Queue which contains @number items.', array('@number' => $queue->numberOfItems())),
    );
    $form['actions']['#type'] = 'actions';
    $form['actions']['submit'] = array(
      '#type' => 'submit',
      '#value' => $this->t('Process queue'),
      '#button_type' => 'primary',
    );
    
    return $form;
  }
  
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');
    /** @var QueueWorkerInterface $queue_worker */
    $queue_worker = $this->queueManager->createInstance('manual_node_publisher');

    while ($item = $queue->claimItem()) {
      try {
        $queue_worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (SuspendQueueException $e) {
        $queue->releaseItem($item);
        break;
      }
      catch (\Exception $e) {
        watchdog_exception('npq', $e);
      }
    }
  }
}


We are again using dependency injection to inject the QueueFactory and the manager for QueueWorker plugins. Inside buildForm() we are creating a basic form structure and using the numberOfItems() method on the queue to tell the user how many items they are about to process. And finally, inside the submitForm() method we take care of the processing. But how do we do that?

First, we load the queue and instantiate a queue worker (in both cases we use the manual_node_publisher id). Then we run a while loop until all the items have been processed. The claimItem() method leases a queue item, blocking it from being claimed by another worker, and returns it for processing. After the worker processes it, we delete it. On the next iteration the next item is returned, and so on until no items are left.

Although we have not used it here, the SuspendQueueException indicates that while processing an item, the worker found a problem that would most likely make all the other items in the queue fail as well. Since it is pointless to continue to the next item, we break out of the loop; however, we also release the item so that it is available again when we try later. Other exceptions are caught and logged to the watchdog.

Now if we create a couple of nodes and don’t publish them, we’ll see their count inside the message if we navigate to /npq. By clicking the submit button we process (publish) them all one by one.

This has been a demonstration example only. It’s always important to take into account the potential load of processing a large number of items, and either limit it so your request doesn’t time out or use the Batch API to split the work across multiple requests.
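One simple way to cap the work done in a single request is to bound the processing loop from submitForm() (a sketch; the limit of 50 items is an arbitrary example):

```php
// Process at most 50 items per request; the rest stay queued for next time.
$limit = 50;
while ($limit-- > 0 && ($item = $queue->claimItem())) {
  try {
    $queue_worker->processItem($item->data);
    $queue->deleteItem($item);
  }
  catch (\Exception $e) {
    watchdog_exception('npq', $e);
  }
}
```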

Conclusion

In this article we’ve looked at the Queue API in Drupal 8. We’ve learned some basic concepts about how it is built and how it works, but we’ve also seen some examples of how we can work with it. Namely, we’ve played with two use cases by which we can publish unpublished nodes either during Cron runs or manually via an action executed by the user.

Have you tried out the Queue API in Drupal 8? Let us know how it went!

Daniel Sipos

Meet the author

Daniel Sipos is a Drupal developer who lives in Brussels, Belgium. He works professionally with Drupal but likes to use other PHP frameworks and technologies as well. He runs webomelette.com, a Drupal blog where he writes articles and tutorials about Drupal development, theming and site building.
Jun 27 2011

Here at Appnovation, we usually deploy Drupal sites on Linux servers. Recently, however, one of our clients needed Drupal to run off a Windows 2008 server. I had to set up a cron job on a Windows 2008 Server for Drupal and came across two methods for setting up a Scheduled Task.

1. Using the GUI

- Click on Start and then go to Administrative Tools, once there click the “Task Scheduler” item
- Once the Task Scheduler opens, click “Create Basic Task”
- Give the task a name and description and click next


Jan 10 2011

Following the release of Drush 4 and picking up on a couple of suggestions made by Moshe Weitzman and Nick Thompson on my previous Drush 3 Automated backup post, it was time for a revised post.

Drush 4 makes it really REALLY easy to back up all your sites. No more bash scripting etc.

Install / Update to Drush 4

Follow Drush installation guide

drushrc.php Setup

In the Drupal root directory add drushrc.php with the following setup:

/*
 * Customize this associative array with your own tables. This is the list of
 * tables whose *data* is skipped by the 'sql-dump' and 'sql-sync' commands when
 * a structure-tables-key is provided. You may add new tables to the existing
 * array or add a new element.
 */
$options['structure-tables'] = array(
  'common' => array(
    'cache', 
    'cache_filter', 
    'cache_menu', 
    'cache_page', 
    'history', 
    'sessions', 
    'watchdog',
    ),
  );

Note:
You may need to adjust the structure-tables array to suit your requirements.

Test Drush

drush --root=/var/www/html/drupal @sites st -y

If you see the status output, proceed to the next step.

Drush Backup Command

 drush --root=/var/www/html/drupal @sites sql-dump --result-file --gzip --structure-tables-key=common

This puts the backups in timestamp-named directories within the ~/.drush-backups folder.

Crontab

For the automated nightly backups, you can setup the cron jobs with the above mentioned command.
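For example, a root crontab entry along these lines would run the backup nightly (the 2:00 schedule is an arbitrary example; adjust the path if drush is not on cron's PATH):

```shell
# Nightly at 02:00: dump all sites, skipping data of the structure-only tables.
0 2 * * * drush --root=/var/www/html/drupal @sites sql-dump --result-file --gzip --structure-tables-key=common -y
```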

Dec 23 2010

Drush + Bash + Cron: Database Backup Goals

  • Scan sites directory for a given drupal install
  • Find all multisite folders/symlinks
  • For each multisite:
  • Use Drush to clear cache - we don't want the cache tables bloating up the MySQL dump file
  • Use Drush to delete watchdog logs - we don't want the watchdog table bloating up the MySQL dump file
  • Use Drush to backup the database to pre-assigned folder
  • Use tar to compress and timestamp the Drush generated sql file
  • Setup crontab to run periodically with the above commands as a bash file

Assumptions and Instructions

You will need to adjust the Bash file if any of these are not the same on your server

  • Drupal is installed in /var/www/html/drupal
  • Multisites are setup in the /var/www/html/drupal/sites folder
  • Backup folder exists in /var/www/backup/sqldumps
  • Drush is already installed in /root/drush/drush. If drush is not installed follow this Drush installation guide
  • AWK is already installed, if not, type: sudo yum install gawk

Drush Backup BASH file

Copy and paste the code below into a new bash file, ideally in your /root home folder. Make the bash file executable.

#!/bin/bash
#
 
# Adjust to match your system settings
DRUSH=/root/drush/drush
ECHO=/bin/echo
FIND=/usr/bin/find
AWK=/bin/awk
 
# Adjust to match your system settings
docroot=/var/www/html/drupal
backup_dir=/var/www/backup/sqldumps
 
multisites=$1
 
START_TIME=$(date +%Y%m%d%H%M);
 
# Add all multisites for a given docroot into a list: detect every sites/ entry that is not named all or default and does not end in .local.
if [ "${multisites}" = "all" ];then
        # If multisites are folders change -type d
        # If multisites are symlinks change -type l
        # Adjust $8 to match your docroot; it needs to be the name of the multisite folder/symlink
        multisites_list="`$FIND ${docroot}/sites/* -type l -prune | $AWK -F \/ '$8!="all" && $8!="default" && $8!~/\.local$/ { print $8 }'`"
else
        multisites_list=$multisites
fi
 
 
# Must be in the docroot directory before proceeding.
cd $docroot
 
for multisite in $multisites_list
do
        # Echo to the screen the current task.
        $ECHO
        $ECHO "##############################################################"
        $ECHO "Backing up ${multisite}"
        $ECHO
        $ECHO
 
        # Clear Drupal cache
        $DRUSH -y -u 1 -l "${multisite}" cc all
 
        # Truncate Watchdog
        $DRUSH -y -u 1 -l "${multisite}" wd-del all
 
        # SQL Dump DB
        $DRUSH -u 1 -l "${multisite}" sql-dump --result-file="${backup_dir}"/"${multisite}".sql
 
        # Compress the SQL Dump
        tar -czv -f "${backup_dir}"/"${START_TIME}"-"${multisite}".tar.gz -C "${backup_dir}"/ "${multisite}".sql
 
        # Delete original SQL Dump
        rm -f "${backup_dir}"/"${multisite}".sql
 
        $ECHO
        $ECHO
        $ECHO "Finished backing up ${multisite}"
        $ECHO
        $ECHO "##############################################################"
 
done

Setup Crontab

Assuming your bash file containing the code above is saved as /root/drush_backup.sh, you can set up a crontab for the root user.

crontab -e
1 1 * * * /root/drush_backup.sh

Aug 09 2010

One great feature that Drupal has is the ability to make modules run certain tasks, often heavy ones, in the background at preset intervals. This can be achieved by a module implementing hook_cron.
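A minimal implementation looks like this (a sketch for a hypothetical module named mymodule, in the Drupal 6 style of the era; the log table is made up for illustration):

```php
/**
 * Implements hook_cron().
 */
function mymodule_cron() {
  // Periodic housekeeping: delete log entries older than 30 days.
  // {mymodule_log} is a hypothetical table owned by this module.
  db_query("DELETE FROM {mymodule_log} WHERE created < %d", time() - 30 * 24 * 60 * 60);
}
```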

Core uses this feature to index new content for the search module, ping module to notify remote sites of new content, fetch new release information from drupal.org, poll other sites for RSS feeds, and more.

Many contributed modules use this for their own purposes, such as mailing out newsletters, cleaning up logs, synchronizing content with other servers/sites, and much more.

Core Cron: All or None

This powerful core feature has some limitations though, such as:

  • All hook cron implementations for all modules are run at the same time, in sequence alphabetically or according to module weight.
  • When cron for one module is stuck, all modules following it will not be executed, and cron will not run again until 1 hour has passed.
  • There is no way to know which module is the one that caused the entire cron to get stuck. Moreover, there is no instrumentation information to know which cron hook takes the most time.

2bits.com have proposed core patches to overcome the lack of instrumentation by logging the information to the watchdog. The patches are useful only to those who apply them. It is unlikely that they will get into core any time soon.

For a practical example, you can use Job Queue with our own queue mail module to improve end user response time, and avoid timeouts due to sending a lot of emails. This scheme defers sending to when cron is run, and not when a user submits a node or a comment.

This works well, but for core, all cron hooks run at the same time. If you set cron to run every hour, then email sending could be delayed by an hour or even more if job queue cannot send them all in one run. If you make cron run more frequently, e.g. every 15 minutes, then all the heavy hooks such as search indexing and log cleanup will also run every 15 minutes consuming lots of resources.

Enter Elysia Cron ...

With Elysia cron, you can now have the best of both worlds: you can set cron for job_queue to run every minute, and defer other heavy stuff to once a day during off hours, or once an hour. The email is delivered quickly, within minutes, and we don't incur the penalty of long running cron hooks.

Features and Benefits of Elysia Cron

Elysia cron offers many features. The important ones, with a focus on performance, are:

  • You can run different hook_cron implementations for different modules at a different frequency.
  • You are aware what the resource and performance impact of each hook_cron implementation is. This includes the time it took to run it last, the average, and maximum time. This information is very valuable in distributing different hooks across the day, and their frequencies.
  • Set a configurable maximum for cron invocations. Drupal core has a hard coded value of 240 seconds. You can adjust this up or down as per your needs.
  • Handles "stuck" crons better than core. In core, if cron is stuck, it takes one hour for it to automatically recover. In Elysia cron, the other hook invocations continue to run normally.
  • You can set the weight for each module, independent from the weight for the module in the system table. Using this, you can have a different order of execution for modules.
  • You can group modules in "contexts", assigning different run schedules for different contexts, or disable contexts globally.
  • The ability to assign a cron key, or a white list of allowed hosts that can execute cron.
  • Selectively disable cron for one or more modules, but not others, or all cron.
  • Selectively run cron for only one module.
  • Defining a cronapi that developers can use.
  • It requires no patching of core or contributed modules.

Examples of Elysia Cron in action

Here is captcha's cron, which has been configured to run only once a day in the early hours of the morning.

As well, dblog's cron runs once a day too. No need to trigger this every hour or twice an hour.

Search here is shown to be the heaviest of all cron hooks. But still, we run it twice every hour, so that the search index is always fresh.

Statistics cleanup is kind of heavy too, so we run it only once a day.

Finally, xmlsitemap is a useful module, yet it is also heavy on a site with lots of nodes. Therefore we run it only once a day.

The above is not cast in stone for these modules. The settings will vary from one site to another depending on the server configuration, available resources and data set sizes. Moreover, even for the same site, it is recommended to monitor regularly and adjust them on an ongoing basis.

Alternatives to Elysia Cron

Elysia cron is not alone though. There are other modules with overlapping functionality, such as Super Cron, Cron Plus, and even a Cron API module. Super Cron seems promising, but Elysia does everything we need so far, so the motivation to evaluate it is low on the list of priorities.

Here is an attempt to compare the various cron modules, but so far it is sparse on information.

A more powerful, but also more complex and heavyweight, solution is the use of tools like Hudson Continuous Integration. Since it runs within Java, it adds dependencies to the usual LAMP-only server as well as being more demanding on resources. You can read a full article on it here.

Mar 27 2010

This short video shows you how to create a cron job using the Dreamhost web panel that will allow you to execute cron on a Drupal site. As I have noted previously, execution of the cron file is an important task for any Drupal site. Many modules rely on cron for periodic processing of data, including the core search and aggregator modules. Running cron also prompts the system to check for module updates.

Here's what I entered in the video. Be sure to replace the URL with the URL of your own site: wget -qO /dev/null http://yoursite.com/cron.php

This video is one in a series of videos I created called Getting The Most Out Of Dreamhost. If you're a current Dreamhost customer you probably already know how to do things like set up a domain or create a MySQL database so it may not be relevant to you. If you're considering Dreamhost I think the series offers a very good preview of how the web panel functions make it easy to launch new Drupal sites. There's also a discount code (LEARNDRUPAL2010) on that page that will save you $50 on a new Dreamhost web hosting account.

Note: Click the 'full screen' icon (to the left of the volume control) in order to watch online at full 1280x720 resolution.

