Overview

Namespaces

  • Charcoal
    • Queue

Classes

  • Charcoal\Queue\AbstractQueueManager

Interfaces

  • Charcoal\Queue\QueueableInterface
  • Charcoal\Queue\QueueItemInterface
  • Charcoal\Queue\QueueManagerInterface

Traits

  • Charcoal\Queue\QueueableTrait
  • Charcoal\Queue\QueueItemTrait
  • Overview
  • Namespace
  • Class
  1: <?php
  2: 
  3: namespace Charcoal\Queue;
  4: 
  5: use \Exception;
  6: 
  7: // PSR-3 (logger) dependencies
  8: use \Psr\Log\LoggerAwareInterface;
  9: use \Psr\Log\LoggerAwareTrait;
 10: 
 11: // From `charcoal-core`
 12: use \Charcoal\Loader\CollectionLoader;
 13: 
 14: // From `charcoal-factory`
 15: use \Charcoal\Factory\FactoryInterface;
 16: 
 17: /**
 18:  * Abstract Queue Manager
 19:  *
 20:  * The queue manager is used to load queued items and batch-process them.
 21:  *
 22:  * ## Loading queued items
 23:  *
 24:  * If a "queue_id" is specified, only the item for this specific queue will be loaded.
 25:  * Otherwise, all unprocessed queue items will be processed.
 26:  *
 27:  * ## Type of queue items
 28:  *
 29:  * The type of queue items can be set in extended concrete class with the
 30:  * `queue_item_proto()` method. This method should return a QueueItemInterface instance.
 31:  *
 32:  * ## Callbacks
 33:  *
 34:  * There are 4 available callback methods that can be set:
 35:  *
 36:  * - `item_callback`
 37:  *   - Called after an item has been processed.
 38:  *   - Arguments: `QueueModelInterface $item`
 39:  * - `item_success_callback`
 40:  * - `item_failure_callback`
 41:  * - `processed_callback`
 42:  *   - Called when the entire queue has been processed
 43:  *   - Arguments: `array $success`, `array $failures`
 44:  */
 45: abstract class AbstractQueueManager implements
 46:     QueueManagerInterface,
 47:     LoggerAwareInterface
 48: {
 49:     use LoggerAwareTrait;
 50: 
 51:     /**
 52:      * The queue ID.
 53:      *
 54:      * If set, then it will load only the items from this queue.
 55:      * If NULL, load *all* queued items.
 56:      *
 57:      * @var mixed $queueId
 58:      */
 59:     private $queueId;
 60: 
 61:     /**
 62:      * The callback routine when an item is processed (whether resolved or rejected).
 63:      *
 64:      * @var callable $itemCallback
 65:      */
 66:     private $itemCallback;
 67: 
 68:     /**
 69:      * The callback routine when the item is resolved.
 70:      *
 71:      * @var callable $itemSuccessCallback
 72:      */
 73:     private $itemSuccessCallback;
 74: 
 75:     /**
 76:      * The callback routine when the item is rejected.
 77:      *
 78:      * @var callable $itemFailureCallback
 79:      */
 80:     private $itemFailureCallback;
 81: 
 82:     /**
 83:      * The callback routine when the queue is processed.
 84:      *
 85:      * @var callable $processedCallback
 86:      */
 87:     private $processedCallback;
 88: 
 89:     /**
 90:      * @var FactoryInterface $queueItemFactory
 91:      */
 92:     private $queueItemFactory;
 93: 
 94:     /**
 95:      * Construct new queue manager.
 96:      *
 97:      * @param array $data Dependencies and settings.
 98:      */
 99:     public function __construct(array $data = [])
100:     {
101:         $this->setLogger($data['logger']);
102:         $this->setQueueItemFactory($data['queue_item_factory']);
103:     }
104: 
105:     /**
106:      * @param FactoryInterface $factory The factory used to create queue items.
107:      * @return QueueItemInterface Chainable
108:      */
109:     protected function setQueueItemFactory(FactoryInterface $factory)
110:     {
111:         $this->queueItemFactory = $factory;
112:         return $this;
113:     }
114: 
115:     /**
116:      * @return FactoryInterface
117:      */
118:     protected function queueItemFactory()
119:     {
120:         return $this->queueItemFactory;
121:     }
122: 
123:     /**
124:      * Set the manager's data.
125:      *
126:      * @param array $data The queue data to set.
127:      * @return AbstractQueueManager Chainable
128:      */
129:     public function setData(array $data)
130:     {
131:         if (isset($data['queue_id']) && $data['queue_id']) {
132:             $this->setQueueId($data['queue_id']);
133:         }
134: 
135:         return $this;
136:     }
137: 
138:     /**
139:      * Set the queue's ID.
140:      *
141:      * @param mixed $id The unique queue identifier.
142:      * @return AbstractQueueManager Chainable
143:      */
144:     public function setQueueId($id)
145:     {
146:         $this->queueId = $id;
147:         return $this;
148:     }
149: 
150:     /**
151:      * Get the queue's ID.
152:      *
153:      * @return mixed
154:      */
155:     public function queueId()
156:     {
157:         return $this->queueId;
158:     }
159: 
160:     /**
161:      * Set the callback routine when an item is processed.
162:      *
163:      * @param callable $callback A item callback routine.
164:      * @return QueueManagerInterface Chainable
165:      */
166:     public function setItemCallback(callable $callback)
167:     {
168:         $this->itemCallback = $callback;
169:         return $this;
170:     }
171: 
172:     /**
173:      * Set the callback routine when the item is resolved.
174:      *
175:      * @param callable $callback A item callback routine.
176:      * @return QueueManagerInterface Chainable
177:      */
178:     public function setItemSuccessCallback(callable $callback)
179:     {
180:         $this->itemSuccessCallback = $callback;
181:         return $this;
182:     }
183: 
184:     /**
185:      * Set the callback routine when the item is rejected.
186:      *
187:      * @param callable $callback A item callback routine.
188:      * @return QueueManagerInterface Chainable
189:      */
190:     public function setItemFailureCallback(callable $callback)
191:     {
192:         $this->itemSuccessCallback = $callback;
193:         return $this;
194:     }
195: 
196:     /**
197:      * Set the callback routine when the queue is processed.
198:      *
199:      * @param callable $callback A queue callback routine.
200:      * @return QueueManagerInterface Chainable
201:      */
202:     public function setProcessedCallback(callable $callback)
203:     {
204:         $this->processedCallback = $callback;
205:         return $this;
206:     }
207: 
208:     /**
209:      * Process the items of the queue.
210:      *
211:      * If no callback is passed and a self::$processedCallback is set, the latter is used.
212:      *
213:      * @param  callable $callback An optional alternative callback routine executed
214:      *                            after all queue items are processed.
215:      * @return boolean  Success / Failure
216:      */
217:     public function processQueue(callable $callback = null)
218:     {
219:         $queued = $this->loadQueueItems();
220: 
221:         if (!is_callable($callback)) {
222:             $callback = $this->processedCallback;
223:         }
224: 
225:         $success  = [];
226:         $failures = [];
227:         $skipped  = [];
228:         foreach ($queued as $q) {
229:             try {
230:                 $res = $q->process($this->itemCallback, $this->itemSuccessCallback, $this->itemFailureCallback);
231:                 if ($res === true) {
232:                     $success[] = $q;
233:                 } elseif ($res === false) {
234:                     $failures[] = $q;
235:                 } else {
236:                     $skipped[] = $q;
237:                 }
238:             } catch (Exception $e) {
239:                 $this->logger->error(
240:                     sprintf('Could not process a queue item: %s', $e->getMessage())
241:                 );
242:                 $failures[] = $q;
243:                 continue;
244:             }
245:         }
246: 
247:         if (is_callable($callback)) {
248:             $callback($success, $failures, $skipped);
249:         }
250: 
251:         return true;
252:     }
253: 
254:     /**
255:      * Retrieve the items of the current queue.
256:      *
257:      * @return Collection
258:      */
259:     public function loadQueueItems()
260:     {
261:         $loader = new CollectionLoader([
262:             'logger' => $this->logger,
263:             'factory' => $this->queueItemFactory()
264:         ]);
265:         $loader->setModel($this->queueItemProto());
266:         $loader->addFilter([
267:             'property' => 'processed',
268:             'val'      => 0
269:         ]);
270:         $loader->addFilter([
271:              'property' => 'processing_date',
272:              'val'      => date('Y-m-d H:i:s'),
273:              'operator' => '<'
274:         ]);
275: 
276:         $queueId = $this->queueId();
277:         if ($queueId) {
278:             $loader->addFilter([
279:                 'property' => 'queue_id',
280:                 'val'      => $queueId
281:             ]);
282:         }
283: 
284:         $loader->addOrder([
285:             'property' => 'queued_date',
286:             'mode'     => 'asc'
287:         ]);
288:         $queued = $loader->load();
289: 
290:         return $queued;
291:     }
292: 
293:     /**
294:      * Retrieve the queue item's model.
295:      *
296:      * @return QueueItemInterface
297:      */
298:     abstract public function queueItemProto();
299: }
300: 
API documentation generated by ApiGen