-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathPopulateProcessor.php
More file actions
114 lines (93 loc) · 4.36 KB
/
PopulateProcessor.php
File metadata and controls
114 lines (93 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
<?php
namespace Enqueue\ElasticaBundle\Queue;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Enqueue\Util\JSON;
/**
 * Queue processor for the populate command.
 *
 * For each message it resolves a pager through the pager provider registry,
 * restricts it to the single page named in the message, hands it to the
 * in-place pager persister and replies with the number of objects on that page.
 * Every failure path also produces a reply, carrying a description of the
 * problem in the "fos-populate-error" property.
 */
final class PopulateProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
    /**
     * @var PagerProviderRegistry
     */
    private $pagerProviderRegistry;

    /**
     * @var PagerPersisterRegistry
     */
    private $pagerPersisterRegistry;

    public function __construct(
        PagerProviderRegistry $pagerProviderRegistry,
        PagerPersisterRegistry $pagerPersisterRegistry
    ) {
        $this->pagerPersisterRegistry = $pagerPersisterRegistry;
        $this->pagerProviderRegistry = $pagerProviderRegistry;
    }

    /**
     * Processes one populate message and always answers with a reply result.
     *
     * - Redelivered messages are not processed again; the reply is sent with Result::REJECT.
     * - Messages whose body lacks a required field are answered with an error reply
     *   using the default status of Result::reply().
     * - Any \Throwable raised while processing is converted into an error reply
     *   sent with Result::REJECT.
     *
     * @param Message $message message whose JSON body carries "page" and "options"
     * @param Context $context context used to create the reply message
     *
     * @return Result reply result wrapping the reply message
     */
    public function process(Message $message, Context $context): Result
    {
        if ($message->isRedelivered()) {
            $replyMessage = $this->createReplyMessage($context, $message, 0, 'The message was redelivered. Chances are that something has gone wrong.');

            return Result::reply($replyMessage, Result::REJECT);
        }

        // Declared outside the try block so the catch branch can report whatever was counted so far.
        $objectsCount = 0;

        try {
            $data = JSON::decode($message->getBody());

            $validationError = $this->findValidationError($data);
            if (null !== $validationError) {
                return Result::reply($this->createReplyMessage($context, $message, 0, $validationError));
            }

            $options = $data['options'];
            // A message addresses exactly one page, so the first and the last page are the same.
            $options['first_page'] = $data['page'];
            $options['last_page'] = $data['page'];

            $provider = $this->pagerProviderRegistry->getProvider($options['indexName'], $options['typeName']);

            $pager = $provider->provide($options);
            $pager->setMaxPerPage($options['max_per_page']);
            $pager->setCurrentPage($options['first_page']);

            // Count before persisting so the number is available even if the insert throws.
            $objectsCount = count($pager->getCurrentPageResults());

            $pagerPersister = $this->pagerPersisterRegistry->getPagerPersister(InPlacePagerPersister::NAME);
            $pagerPersister->insert($pager, $options);

            return Result::reply($this->createReplyMessage($context, $message, $objectsCount));
        } catch (\Throwable $e) {
            return Result::reply($this->createExceptionReplyMessage($context, $message, $objectsCount, $e), Result::REJECT);
        }
    }

    /**
     * Checks that the decoded message body has every field process() reads.
     *
     * The checks run in a fixed order and the first failure wins.
     *
     * @param mixed $data decoded message body; anything that is not an array is
     *                    reported as missing its options
     *
     * @return string|null description of the first problem found, null when the body is usable
     */
    private function findValidationError($data): ?string
    {
        if (!is_array($data) || !isset($data['options'])) {
            return 'The message is invalid. Missing options.';
        }

        if (!isset($data['page'])) {
            return 'The message is invalid. Missing page.';
        }

        if (!isset($data['options']['indexName'])) {
            return 'The message is invalid. Missing indexName option.';
        }

        if (!isset($data['options']['typeName'])) {
            return 'The message is invalid. Missing typeName option.';
        }

        // process() passes this value straight to the pager, so it has to be present as well.
        if (!isset($data['options']['max_per_page'])) {
            return 'The message is invalid. Missing max_per_page option.';
        }

        return null;
    }

    /**
     * Builds a reply message describing the given exception (class, message, file and line).
     *
     * @param int $objectsCount number of objects counted before the failure
     */
    private function createExceptionReplyMessage(Context $context, Message $message, int $objectsCount, \Throwable $e): Message
    {
        $errorMessage = sprintf(
            '<error>The queue processor has failed to process the message with exception: </error><comment>%s: %s in file %s at line %s.</comment>',
            get_class($e),
            $e->getMessage(),
            $e->getFile(),
            $e->getLine()
        );

        return $this->createReplyMessage($context, $message, $objectsCount, $errorMessage);
    }

    /**
     * Builds a reply that copies the body, properties and headers of the incoming
     * message and adds the populate outcome as message properties.
     *
     * @param int         $objectsCount stored in the "fos-populate-objects-count" property
     * @param string|null $error        stored in the "fos-populate-error" property when not null
     */
    private function createReplyMessage(Context $context, Message $message, int $objectsCount, ?string $error = null): Message
    {
        $replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
        $replyMessage->setProperty('fos-populate-objects-count', $objectsCount);

        // Strict null check: an error text must never be dropped just because it is falsy (e.g. '0').
        if (null !== $error) {
            $replyMessage->setProperty('fos-populate-error', $error);
        }

        return $replyMessage;
    }

    /**
     * Returns the command subscription: Commands::POPULATE serves as both the
     * command and the queue name, with the "prefix_queue" and "exclusive" flags enabled.
     */
    public static function getSubscribedCommand(): array
    {
        return [
            'command' => Commands::POPULATE,
            'queue' => Commands::POPULATE,
            'prefix_queue' => true,
            'exclusive' => true,
        ];
    }

    /**
     * Returns the queue names this processor subscribes to.
     */
    public static function getSubscribedQueues(): array
    {
        return [Commands::POPULATE];
    }
}