1: <?php
2:
3: namespace Charcoal\Queue;
4:
5: use \Exception;
6:
7: // PSR-3 (logger) dependencies
8: use \Psr\Log\LoggerAwareInterface;
9: use \Psr\Log\LoggerAwareTrait;
10:
11: // From `charcoal-core`
12: use \Charcoal\Loader\CollectionLoader;
13:
14: // From `charcoal-factory`
15: use \Charcoal\Factory\FactoryInterface;
16:
/**
 * Abstract Queue Manager
 *
 * The queue manager is used to load queued items and batch-process them.
 *
 * ## Loading queued items
 *
 * If a "queue_id" is specified, only the items of that specific queue are loaded.
 * Otherwise, all unprocessed queue items are loaded and processed.
 *
 * ## Type of queue items
 *
 * The type of queue items is defined in the extending concrete class by implementing
 * the `queueItemProto()` method. This method should return a QueueItemInterface instance.
 *
 * ## Callbacks
 *
 * There are 4 available callback routines that can be set:
 *
 * - `item_callback`
 *   - Called after an item has been processed (whether resolved or rejected).
 *   - Arguments: `QueueModelInterface $item`
 * - `item_success_callback`
 *   - Called when an item is resolved.
 * - `item_failure_callback`
 *   - Called when an item is rejected.
 * - `processed_callback`
 *   - Called when the entire queue has been processed.
 *   - Arguments: `array $success`, `array $failures`, `array $skipped`
 */
abstract class AbstractQueueManager implements
    QueueManagerInterface,
    LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * The queue ID.
     *
     * If set, only the items from this queue are loaded.
     * If NULL (or any other falsy value), *all* queued items are loaded.
     *
     * @var mixed $queueId
     */
    private $queueId;

    /**
     * The callback routine when an item is processed (whether resolved or rejected).
     *
     * @var callable|null $itemCallback
     */
    private $itemCallback;

    /**
     * The callback routine when the item is resolved.
     *
     * @var callable|null $itemSuccessCallback
     */
    private $itemSuccessCallback;

    /**
     * The callback routine when the item is rejected.
     *
     * @var callable|null $itemFailureCallback
     */
    private $itemFailureCallback;

    /**
     * The callback routine when the queue is processed.
     *
     * @var callable|null $processedCallback
     */
    private $processedCallback;

    /**
     * The factory handed to the collection loader to create queue items.
     *
     * @var FactoryInterface $queueItemFactory
     */
    private $queueItemFactory;

    /**
     * Construct new queue manager.
     *
     * Both the "logger" and "queue_item_factory" keys are read unconditionally,
     * so they are effectively required even though `$data` has a default value.
     *
     * @param array $data Dependencies and settings. Expected keys:
     *     - "logger": the PSR-3 logger passed to `setLogger()`.
     *     - "queue_item_factory": the FactoryInterface passed to `setQueueItemFactory()`.
     */
    public function __construct(array $data = [])
    {
        $this->setLogger($data['logger']);
        $this->setQueueItemFactory($data['queue_item_factory']);
    }

    /**
     * Set the factory used to create queue items.
     *
     * @param  FactoryInterface $factory The factory used to create queue items.
     * @return AbstractQueueManager Chainable
     */
    protected function setQueueItemFactory(FactoryInterface $factory)
    {
        $this->queueItemFactory = $factory;

        return $this;
    }

    /**
     * Retrieve the factory used to create queue items.
     *
     * @return FactoryInterface
     */
    protected function queueItemFactory()
    {
        return $this->queueItemFactory;
    }

    /**
     * Set the manager's data.
     *
     * Only the "queue_id" key is read; it is ignored when missing or falsy.
     *
     * @param  array $data The queue data to set.
     * @return AbstractQueueManager Chainable
     */
    public function setData(array $data)
    {
        if (!empty($data['queue_id'])) {
            $this->setQueueId($data['queue_id']);
        }

        return $this;
    }

    /**
     * Set the queue's ID.
     *
     * @param  mixed $id The unique queue identifier.
     * @return AbstractQueueManager Chainable
     */
    public function setQueueId($id)
    {
        $this->queueId = $id;

        return $this;
    }

    /**
     * Get the queue's ID.
     *
     * @return mixed
     */
    public function queueId()
    {
        return $this->queueId;
    }

    /**
     * Set the callback routine when an item is processed.
     *
     * @param  callable $callback A item callback routine.
     * @return QueueManagerInterface Chainable
     */
    public function setItemCallback(callable $callback)
    {
        $this->itemCallback = $callback;

        return $this;
    }

    /**
     * Set the callback routine when the item is resolved.
     *
     * @param  callable $callback A item callback routine.
     * @return QueueManagerInterface Chainable
     */
    public function setItemSuccessCallback(callable $callback)
    {
        $this->itemSuccessCallback = $callback;

        return $this;
    }

    /**
     * Set the callback routine when the item is rejected.
     *
     * @param  callable $callback A item callback routine.
     * @return QueueManagerInterface Chainable
     */
    public function setItemFailureCallback(callable $callback)
    {
        // Store in the failure slot; writing to the success slot here would
        // silently replace the success callback and leave failures unhandled.
        $this->itemFailureCallback = $callback;

        return $this;
    }

    /**
     * Set the callback routine when the queue is processed.
     *
     * @param  callable $callback A queue callback routine.
     * @return QueueManagerInterface Chainable
     */
    public function setProcessedCallback(callable $callback)
    {
        $this->processedCallback = $callback;

        return $this;
    }

    /**
     * Process the items of the queue.
     *
     * Every loaded item is processed with the item, success and failure callbacks
     * currently set on the manager, then sorted by the result of `process()`:
     *
     * - strictly `true`: success,
     * - strictly `false`: failure,
     * - anything else: skipped.
     *
     * An item whose processing throws an Exception is logged as an error and
     * counted as a failure; the remaining items are still processed.
     *
     * If no callback is passed and a self::$processedCallback is set, the latter is used.
     * The callback receives three arrays: successes, failures and skipped items.
     *
     * @param  callable $callback An optional alternative callback routine executed
     *     after all queue items are processed.
     * @return boolean Always TRUE once the queue has been iterated.
     */
    public function processQueue(callable $callback = null)
    {
        $queued = $this->loadQueueItems();

        if (!is_callable($callback)) {
            $callback = $this->processedCallback;
        }

        $success  = [];
        $failures = [];
        $skipped  = [];
        foreach ($queued as $q) {
            try {
                $res = $q->process(
                    $this->itemCallback,
                    $this->itemSuccessCallback,
                    $this->itemFailureCallback
                );

                if ($res === true) {
                    $success[] = $q;
                } elseif ($res === false) {
                    $failures[] = $q;
                } else {
                    $skipped[] = $q;
                }
            } catch (Exception $e) {
                // A failing item must not abort the whole batch.
                $this->logger->error(
                    sprintf('Could not process a queue item: %s', $e->getMessage())
                );
                $failures[] = $q;
            }
        }

        // The fallback self::$processedCallback may itself be unset.
        if (is_callable($callback)) {
            $callback($success, $failures, $skipped);
        }

        return true;
    }

    /**
     * Retrieve the items of the current queue.
     *
     * Loads the items that are not yet processed ("processed" = 0) and whose
     * "processing_date" is earlier than the current date/time (as formatted by
     * `date()`), optionally restricted to the current queue ID when one is set.
     * Items are ordered by ascending "queued_date".
     *
     * @return Collection The result of the collection loader's `load()` call.
     */
    public function loadQueueItems()
    {
        $loader = new CollectionLoader([
            'logger'  => $this->logger,
            'factory' => $this->queueItemFactory()
        ]);
        $loader->setModel($this->queueItemProto());

        // Only items still waiting to be processed...
        $loader->addFilter([
            'property' => 'processed',
            'val'      => 0
        ]);
        // ...and whose scheduled processing date has passed.
        $loader->addFilter([
            'property' => 'processing_date',
            'val'      => date('Y-m-d H:i:s'),
            'operator' => '<'
        ]);

        $queueId = $this->queueId();
        if ($queueId) {
            $loader->addFilter([
                'property' => 'queue_id',
                'val'      => $queueId
            ]);
        }

        $loader->addOrder([
            'property' => 'queued_date',
            'mode'     => 'asc'
        ]);

        return $loader->load();
    }

    /**
     * Retrieve the queue item's model.
     *
     * Used as the model of the collection loader in `loadQueueItems()`.
     *
     * @return QueueItemInterface
     */
    abstract public function queueItemProto();
}
300: