Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
ProcessCronQueueObserver.php
Go to the documentation of this file.
1 <?php
10 namespace Magento\Cron\Observer;
11 
17 use Magento\Framework\Profiler\Driver\Standard\StatFactory;
18 
25 {
/**
 * Cache key prefix for the timestamp of the last schedule generation; the cron group id is appended.
 */
const CACHE_KEY_LAST_SCHEDULE_GENERATE_AT = 'cron_last_schedule_generate_at';

/**
 * Cache key prefix for the timestamp of the last history cleanup; the cron group id is appended.
 */
const CACHE_KEY_LAST_HISTORY_CLEANUP_AT = 'cron_last_history_cleanup_at';

/**
 * Request parameter that is set to '1' on the command line of a spawned per-group cron process.
 */
const STANDALONE_PROCESS_STARTED = 'standaloneProcessStarted';

/**#@+
 * Last path segment of a cron group setting, resolved under 'system/cron/<group id>/'.
 * The values are multiplied by SECONDS_IN_MINUTE wherever they are used as durations.
 */
const XML_PATH_SCHEDULE_GENERATE_EVERY = 'schedule_generate_every';

const XML_PATH_SCHEDULE_AHEAD_FOR = 'schedule_ahead_for';

const XML_PATH_SCHEDULE_LIFETIME = 'schedule_lifetime';

const XML_PATH_HISTORY_CLEANUP_EVERY = 'history_cleanup_every';

const XML_PATH_HISTORY_SUCCESS = 'history_success_lifetime';

const XML_PATH_HISTORY_FAILURE = 'history_failure_lifetime';
/**#@-*/

/**
 * Multiplier used to convert configured minute values to seconds.
 */
const SECONDS_IN_MINUTE = 60;

/**
 * Timeout passed to the lock manager when acquiring a group lock (presumably seconds; confirm).
 */
const LOCK_TIMEOUT = 5;

/**
 * Prefix of the lock name used per cron group; the group id is appended.
 */
const LOCK_PREFIX = 'CRON_GROUP_';

/**
 * Not referenced by the methods visible in this class.
 *
 * @var mixed
 */
protected $_pendingSchedules;

/**
 * @var \Magento\Cron\Model\ConfigInterface
 */
protected $_config;

/**
 * @var \Magento\Framework\ObjectManagerInterface
 */
protected $_objectManager;

/**
 * @var \Magento\Framework\App\CacheInterface
 */
protected $_cache;

/**
 * @var \Magento\Framework\App\Config\ScopeConfigInterface
 */
protected $_scopeConfig;

/**
 * @var \Magento\Cron\Model\ScheduleFactory
 */
protected $_scheduleFactory;

/**
 * @var \Magento\Framework\App\Console\Request
 */
protected $_request;

/**
 * @var \Magento\Framework\ShellInterface
 */
protected $_shell;

/**
 * @var \Magento\Framework\Stdlib\DateTime\DateTime
 */
protected $dateTime;

/**
 * Finder produced by PhpExecutableFinderFactory::create(); its find() result is used as the PHP binary path.
 *
 * Declared explicitly so the constructor assignment does not create a dynamic property.
 *
 * @var object
 */
protected $phpExecutableFinder;

/**
 * @var \Psr\Log\LoggerInterface
 */
private $logger;

/**
 * @var \Magento\Framework\App\State
 */
private $state;

/**
 * @var \Magento\Framework\Lock\LockManagerInterface
 */
private $lockManager;

/**
 * Pending schedules that no longer match their cron expression: job code => list of scheduled_at strings.
 *
 * @var array
 */
private $invalid = [];

/**
 * Profiler statistics collector produced by StatFactory::create().
 *
 * @var object
 */
private $statProfiler;

147 
/**
 * Store the injected collaborators and build the two factory-created helpers.
 *
 * @param \Magento\Framework\ObjectManagerInterface $objectManager
 * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
 * @param \Magento\Framework\App\CacheInterface $cache
 * @param \Magento\Cron\Model\ConfigInterface $config
 * @param \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig
 * @param \Magento\Framework\App\Console\Request $request
 * @param \Magento\Framework\ShellInterface $shell
 * @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
 * @param \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory
 * @param \Psr\Log\LoggerInterface $logger
 * @param \Magento\Framework\App\State $state
 * @param StatFactory $statFactory
 * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
 */
public function __construct(
    \Magento\Framework\ObjectManagerInterface $objectManager,
    \Magento\Cron\Model\ScheduleFactory $scheduleFactory,
    \Magento\Framework\App\CacheInterface $cache,
    \Magento\Cron\Model\ConfigInterface $config,
    \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig,
    \Magento\Framework\App\Console\Request $request,
    \Magento\Framework\ShellInterface $shell,
    \Magento\Framework\Stdlib\DateTime\DateTime $dateTime,
    \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory,
    \Psr\Log\LoggerInterface $logger,
    \Magento\Framework\App\State $state,
    StatFactory $statFactory,
    \Magento\Framework\Lock\LockManagerInterface $lockManager
) {
    // Configuration and environment.
    $this->_config = $config;
    $this->_scopeConfig = $scopeConfig;
    $this->_request = $request;
    $this->state = $state;

    // Infrastructure services.
    $this->_objectManager = $objectManager;
    $this->_cache = $cache;
    $this->_shell = $shell;
    $this->dateTime = $dateTime;
    $this->logger = $logger;
    $this->lockManager = $lockManager;
    $this->_scheduleFactory = $scheduleFactory;

    // Helpers that are instantiated through their factories.
    $this->phpExecutableFinder = $phpExecutableFinderFactory->create();
    $this->statProfiler = $statFactory->create();
}
193 
/**
 * Run the cron queue for every configured job group.
 *
 * Groups are visited with the ones flagged 'use_separate_process' first. A group that passes the
 * optional 'group' request filter is either handed off to a separate `bin/magento cron:run` process
 * or, under a per-group lock, cleaned up, (re)scheduled and processed in the current process.
 *
 * @param \Magento\Framework\Event\Observer $observer not used by this method
 * @return void
 */
public function execute(\Magento\Framework\Event\Observer $observer)
{
    $currentTime = $this->dateTime->gmtTimestamp();
    $jobGroupsRoot = $this->_config->getJobs();

    // Sort job groups so that the ones run in a separate process are started first.
    // The raw configuration values are cast to int before comparing, so an unset (null)
    // or non-numeric setting is ordered as 0 instead of being used in arithmetic.
    uksort(
        $jobGroupsRoot,
        function ($a, $b) {
            return (int)$this->getCronGroupConfigurationValue($b, 'use_separate_process')
                <=> (int)$this->getCronGroupConfigurationValue($a, 'use_separate_process');
        }
    );

    // Fall back to whatever "php" resolves to when the finder returns nothing.
    $phpPath = $this->phpExecutableFinder->find() ?: 'php';

    foreach ($jobGroupsRoot as $groupId => $jobsRoot) {
        if (!$this->isGroupInFilter($groupId)) {
            continue;
        }

        // Spawn a dedicated process for the group unless this process already is such a child
        // (the child is started with the STANDALONE_PROCESS_STARTED parameter set to 1).
        if ($this->_request->getParam(self::STANDALONE_PROCESS_STARTED) !== '1'
            && $this->getCronGroupConfigurationValue($groupId, 'use_separate_process') == 1
        ) {
            // NOTE(review): $groupId is concatenated into the command line as-is; only the
            // bin/magento path goes through the %s placeholder. Presumably fine because group
            // ids come from cron configuration - confirm.
            $this->_shell->execute(
                $phpPath . ' %s cron:run --group=' . $groupId . ' --' . Cli::INPUT_KEY_BOOTSTRAP . '='
                . self::STANDALONE_PROCESS_STARTED . '=1',
                [
                    BP . '/bin/magento'
                ]
            );
            continue;
        }

        $this->lockGroup(
            $groupId,
            function ($groupId) use ($currentTime, $jobsRoot) {
                $this->cleanupJobs($groupId, $currentTime);
                $this->generateSchedules($groupId);
                $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
            }
        );
    }
}
248 
/**
 * Run the callback while holding the lock of the given cron group.
 *
 * When the lock cannot be acquired a warning is logged and the callback is not invoked.
 * The lock is released even if the callback throws.
 *
 * @param string $groupId
 * @param callable $callback invoked with the group id as its only argument
 * @return void
 */
private function lockGroup($groupId, callable $callback)
{
    $lockName = self::LOCK_PREFIX . $groupId;

    if ($this->lockManager->lock($lockName, self::LOCK_TIMEOUT)) {
        try {
            $callback($groupId);
        } finally {
            $this->lockManager->unlock($lockName);
        }

        return;
    }

    $message = sprintf("Could not acquire lock for cron group: %s, skipping run", $groupId);
    $this->logger->warning($message);
}
277 
/**
 * Execute one scheduled job and record the outcome on the schedule object.
 *
 * The schedule is saved once, right after its executed-at time is set; the final status and
 * finished-at time are only set on the object and are not saved by this method.
 *
 * @param int $scheduledTime timestamp the job was scheduled for
 * @param int $currentTime timestamp of the current cron run
 * @param array $jobConfig job definition; 'instance' and 'method' are required
 * @param \Magento\Cron\Model\Schedule $schedule
 * @param string $groupId
 * @return void
 * @throws \Exception when the schedule is older than the group's schedule lifetime, when the
 *                    callback is missing or not callable, or when the job itself fails
 */
protected function _runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
{
    $jobCode = $schedule->getJobCode();
    $scheduleLifetime = $this->getCronGroupConfigurationValue($groupId, self::XML_PATH_SCHEDULE_LIFETIME);
    $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
    if ($scheduledTime < $currentTime - $scheduleLifetime) {
        $schedule->setStatus(Schedule::STATUS_MISSED);
        throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
    }

    if (!isset($jobConfig['instance'], $jobConfig['method'])) {
        $schedule->setStatus(Schedule::STATUS_ERROR);
        throw new \Exception('No callbacks found');
    }
    $model = $this->_objectManager->create($jobConfig['instance']);
    $callback = [$model, $jobConfig['method']];
    if (!is_callable($callback)) {
        $schedule->setStatus(Schedule::STATUS_ERROR);
        throw new \Exception(
            sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
        );
    }

    // date() replaces strftime(), which is deprecated as of PHP 8.1; the output format is the same.
    $schedule->setExecutedAt(date('Y-m-d H:i:s', $this->dateTime->gmtTimestamp()))->save();

    $this->startProfiling();
    try {
        $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
        call_user_func_array($callback, [$schedule]);
    } catch (\Throwable $e) {
        $schedule->setStatus(Schedule::STATUS_ERROR);
        $this->logger->error(sprintf(
            'Cron Job %s has an error: %s. Statistics: %s',
            $jobCode,
            $e->getMessage(),
            $this->getProfilingStat()
        ));
        // Callers catch \Exception only, so wrap engine errors (\Error) before rethrowing.
        if (!$e instanceof \Exception) {
            $e = new \RuntimeException(
                'Error when running a cron job',
                0,
                $e
            );
        }
        throw $e;
    } finally {
        $this->stopProfiling();
    }

    $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(date(
        'Y-m-d H:i:s',
        $this->dateTime->gmtTimestamp()
    ));

    $this->logger->info(sprintf(
        'Cron Job %s is successfully finished. Statistics: %s',
        $jobCode,
        $this->getProfilingStat()
    ));
}
349 
/**
 * Reset the profiler and open the 'job' timer with the current time and memory readings.
 *
 * @return void
 */
private function startProfiling()
{
    $this->statProfiler->clear();

    $startedAt = microtime(true);
    $realMemory = memory_get_usage(true);
    $emallocMemory = memory_get_usage();
    $this->statProfiler->start('job', $startedAt, $realMemory, $emallocMemory);
}
360 
/**
 * Close the 'job' timer with the current time and memory readings.
 *
 * @return void
 */
private function stopProfiling()
{
    $stoppedAt = microtime(true);
    $realMemory = memory_get_usage(true);
    $emallocMemory = memory_get_usage();
    $this->statProfiler->stop('job', $stoppedAt, $realMemory, $emallocMemory);
}
370 
/**
 * Return the 'job' timer statistics, without the start entry, encoded as JSON.
 *
 * @return string
 */
private function getProfilingStat()
{
    $jobStatistics = $this->statProfiler->get('job');
    unset($jobStatistics[Stat::START]);

    return json_encode($jobStatistics);
}
382 
/**
 * Build the collection of pending schedules whose job code belongs to the given group.
 *
 * @param string $groupId
 * @return mixed schedule collection obtained from a new schedule model
 */
private function getPendingSchedules($groupId)
{
    $groupJobs = $this->_config->getJobs()[$groupId];

    $collection = $this->_scheduleFactory->create()->getCollection();
    $collection->addFieldToFilter('status', Schedule::STATUS_PENDING);
    $collection->addFieldToFilter('job_code', ['in' => array_keys($groupJobs)]);

    return $collection;
}
397 
/**
 * Generate schedules for the jobs of a group, at most once per configured generation period.
 *
 * @param string $groupId
 * @return $this
 */
private function generateSchedules($groupId)
{
    $cacheKey = self::CACHE_KEY_LAST_SCHEDULE_GENERATE_AT . $groupId;

    // Nothing to do while the previous generation is still within the configured period.
    $lastGeneratedAt = (int)$this->_cache->load($cacheKey);
    $periodInMinutes = (int)$this->getCronGroupConfigurationValue(
        $groupId,
        self::XML_PATH_SCHEDULE_GENERATE_EVERY
    );
    $periodInSeconds = $periodInMinutes * self::SECONDS_IN_MINUTE;
    if ($lastGeneratedAt > $this->dateTime->gmtTimestamp() - $periodInSeconds) {
        return $this;
    }

    // Remember the time of this generation; null is passed as the cache lifetime.
    $this->_cache->save($this->dateTime->gmtTimestamp(), $cacheKey, ['crontab'], null);

    // Index the pending schedules by "<job code>/<scheduled at>" for quick lookups.
    $alreadyScheduled = [];
    foreach ($this->getPendingSchedules($groupId) as $pendingSchedule) {
        $lookupKey = $pendingSchedule->getJobCode() . '/' . $pendingSchedule->getScheduledAt();
        $alreadyScheduled[$lookupKey] = 1;
    }

    // Create missing schedules, then drop pending ones that no longer match their expression.
    $allJobs = $this->_config->getJobs();
    $this->invalid = [];
    $this->_generateJobs($allJobs[$groupId], $alreadyScheduled, $groupId);
    $this->cleanupScheduleMismatches();

    return $this;
}
446 
/**
 * Save schedules for every job of the group that has a cron expression.
 *
 * @param array $jobs job code => job configuration
 * @param array $exists already scheduled entries, keyed by "<job code>/<scheduled at>"
 * @param string $groupId
 * @return void
 */
protected function _generateJobs($jobs, $exists, $groupId)
{
    // The interval depends only on the group, so resolve it once instead of once per job.
    $timeInterval = $this->getScheduleTimeInterval($groupId);

    foreach ($jobs as $jobCode => $jobConfig) {
        $cronExpression = $this->getCronExpression($jobConfig);
        if (!$cronExpression) {
            continue;
        }

        $this->saveSchedule($jobCode, $cronExpression, $timeInterval, $exists);
    }
}
467 
/**
 * Purge old schedule records of a group, at most once per configured cleanup period.
 *
 * Pending schedules of jobs without a cron expression are removed first; afterwards records
 * older than the lifetime configured for their status are deleted.
 *
 * @param string $groupId
 * @param int $currentTime timestamp of the current cron run
 * @return $this
 */
private function cleanupJobs($groupId, $currentTime)
{
    // check if history cleanup is needed
    $lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId);
    $historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY);
    if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) {
        return $this;
    }
    // save time history cleanup was ran with no expiration
    $this->_cache->save(
        $this->dateTime->gmtTimestamp(),
        self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId,
        ['crontab'],
        null
    );

    $this->cleanupDisabledJobs($groupId);

    $historySuccess = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_SUCCESS);
    $historyFailure = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_FAILURE);
    // Lifetime in seconds per schedule status. Successful runs use the success lifetime;
    // missed runs are retained like failures. Without these two entries finished and missed
    // records would never be purged and the configured success lifetime would have no effect.
    $historyLifetimes = [
        Schedule::STATUS_SUCCESS => $historySuccess * self::SECONDS_IN_MINUTE,
        Schedule::STATUS_MISSED => $historyFailure * self::SECONDS_IN_MINUTE,
        Schedule::STATUS_ERROR => $historyFailure * self::SECONDS_IN_MINUTE,
        Schedule::STATUS_PENDING => max($historyFailure, $historySuccess) * self::SECONDS_IN_MINUTE,
    ];

    $jobs = $this->_config->getJobs()[$groupId];
    $scheduleResource = $this->_scheduleFactory->create()->getResource();
    $connection = $scheduleResource->getConnection();
    $count = 0;
    foreach ($historyLifetimes as $status => $time) {
        $count += $connection->delete(
            $scheduleResource->getMainTable(),
            [
                'status = ?' => $status,
                'job_code in (?)' => array_keys($jobs),
                'created_at < ?' => $connection->formatDate($currentTime - $time)
            ]
        );
    }

    if ($count) {
        $this->logger->info(sprintf('%d cron jobs were cleaned', $count));
    }

    // Return the same value on every path (the early exit above already returns $this).
    return $this;
}
521 
/**
 * Read the cron expression stored in configuration under the job's 'config_path'.
 *
 * @param array $jobConfig job configuration containing a 'config_path' entry
 * @return mixed configuration value found at that path, in store scope
 */
protected function getConfigSchedule($jobConfig)
{
    $configPath = $jobConfig['config_path'];

    return $this->_scopeConfig->getValue($configPath, \Magento\Store\Model\ScopeInterface::SCOPE_STORE);
}
537 
/**
 * Create the missing schedules of one job for every minute of the look-ahead window.
 *
 * For each minute from now up to (but excluding) now + $timeInterval a schedule is built and
 * checked against the cron expression. Matching minutes that are not scheduled yet are saved;
 * already scheduled minutes that no longer match are recorded in $this->invalid.
 *
 * @param string $jobCode
 * @param string $cronExpression
 * @param int $timeInterval length of the look-ahead window in seconds
 * @param array $exists already scheduled entries, keyed by "<job code>/<scheduled at>"
 * @return void
 */
protected function saveSchedule($jobCode, $cronExpression, $timeInterval, $exists)
{
    $currentTime = $this->dateTime->gmtTimestamp();
    $timeAhead = $currentTime + $timeInterval;
    for ($time = $currentTime; $time < $timeAhead; $time += self::SECONDS_IN_MINUTE) {
        // date() replaces strftime(), which is deprecated as of PHP 8.1; same output format.
        $scheduledAt = date('Y-m-d H:i:00', $time);
        $alreadyScheduled = !empty($exists[$jobCode . '/' . $scheduledAt]);
        $schedule = $this->createSchedule($jobCode, $cronExpression, $time);
        $valid = $schedule->trySchedule();
        if (!$valid) {
            if ($alreadyScheduled) {
                // Existing pending schedule no longer matches the expression: mark for removal.
                if (!isset($this->invalid[$jobCode])) {
                    $this->invalid[$jobCode] = [];
                }
                $this->invalid[$jobCode][] = $scheduledAt;
            }
            continue;
        }
        if (!$alreadyScheduled) {
            // time matches cron expression
            $schedule->save();
        }
    }
}
571 
/**
 * Build (without saving) a pending schedule for the given job and time.
 *
 * @param string $jobCode
 * @param string $cronExpression
 * @param int $time timestamp the job is to be scheduled at
 * @return mixed schedule model created by the schedule factory
 */
protected function createSchedule($jobCode, $cronExpression, $time)
{
    // date() replaces strftime(), which is deprecated as of PHP 8.1; same output formats.
    $schedule = $this->_scheduleFactory->create()
        ->setCronExpr($cronExpression)
        ->setJobCode($jobCode)
        ->setStatus(Schedule::STATUS_PENDING)
        ->setCreatedAt(date('Y-m-d H:i:s', $this->dateTime->gmtTimestamp()))
        ->setScheduledAt(date('Y-m-d H:i', $time));

    return $schedule;
}
591 
/**
 * Return the group's 'schedule_ahead_for' setting converted from minutes to seconds.
 *
 * @param string $groupId
 * @return int
 */
protected function getScheduleTimeInterval($groupId)
{
    $minutesAhead = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_SCHEDULE_AHEAD_FOR);

    return $minutesAhead * self::SECONDS_IN_MINUTE;
}
605 
/**
 * Delete the pending schedules of every job in the group that has no cron expression.
 *
 * @param string $groupId
 * @return void
 */
private function cleanupDisabledJobs($groupId)
{
    $allJobs = $this->_config->getJobs();

    // Collect the codes of jobs for which no cron expression can be resolved.
    $disabledJobCodes = [];
    foreach ($allJobs[$groupId] as $jobCode => $jobConfig) {
        if ($this->getCronExpression($jobConfig)) {
            continue;
        }
        $disabledJobCodes[] = $jobCode;
    }

    if (count($disabledJobCodes) <= 0) {
        return;
    }

    $scheduleResource = $this->_scheduleFactory->create()->getResource();
    $conditions = [
        'status = ?' => Schedule::STATUS_PENDING,
        'job_code in (?)' => $disabledJobCodes,
    ];
    $count = $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), $conditions);

    $this->logger->info(sprintf('%d cron jobs were cleaned', $count));
}
638 
/**
 * Resolve the cron expression of a job.
 *
 * A non-empty value found via the job's 'config_path' wins; otherwise the job's own
 * 'schedule' entry is used, and null is returned when neither is available.
 *
 * @param array $jobConfig
 * @return mixed|null
 */
private function getCronExpression($jobConfig)
{
    if (isset($jobConfig['config_path'])) {
        $configured = $this->getConfigSchedule($jobConfig);
        if ($configured) {
            return $configured;
        }
    }

    return isset($jobConfig['schedule']) ? $jobConfig['schedule'] : null;
}
659 
/**
 * Delete the pending schedules recorded in $this->invalid, one delete statement per job code.
 *
 * @return $this
 */
private function cleanupScheduleMismatches()
{
    $scheduleResource = $this->_scheduleFactory->create()->getResource();

    foreach ($this->invalid as $jobCode => $scheduledAtList) {
        $conditions = [
            'status = ?' => Schedule::STATUS_PENDING,
            'job_code = ?' => $jobCode,
            'scheduled_at in (?)' => $scheduledAtList,
        ];
        $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), $conditions);
    }

    return $this;
}
680 
/**
 * Read one setting of a cron group from 'system/cron/<group id>/<path>' in store scope.
 *
 * @param string $groupId
 * @param string $path last segment of the configuration path
 * @return mixed
 */
private function getCronGroupConfigurationValue($groupId, $path)
{
    $configPath = 'system/cron/' . $groupId . '/' . $path;

    return $this->_scopeConfig->getValue($configPath, \Magento\Store\Model\ScopeInterface::SCOPE_STORE);
}
695 
/**
 * Tell whether the group may run given the optional 'group' request parameter.
 *
 * Every group passes when the parameter is absent; otherwise only the group whose id equals
 * the parameter value (with surrounding single quotes stripped) passes.
 *
 * @param string $groupId
 * @return bool
 */
private function isGroupInFilter($groupId): bool
{
    $requestedGroup = $this->_request->getParam('group');
    if ($requestedGroup === null) {
        return true;
    }

    return trim($requestedGroup, "'") === $groupId;
}
707 
/**
 * Run the due pending schedules of a group.
 *
 * Once a job code has finished successfully, its remaining schedules are skipped for this run.
 * Schedules of unknown jobs and schedules that lie in the future are skipped as well.
 *
 * @param string $groupId
 * @param array $jobsRoot job code => job configuration of the group
 * @param int $currentTime timestamp of the current cron run
 * @return void
 */
private function processPendingJobs($groupId, $jobsRoot, $currentTime)
{
    $succeededJobCodes = [];

    foreach ($this->getPendingSchedules($groupId) as $schedule) {
        $jobCode = $schedule->getJobCode();

        // process only one successful job per run
        if (isset($succeededJobCodes[$jobCode])) {
            continue;
        }

        $jobConfig = $jobsRoot[$jobCode] ?? null;
        if (!$jobConfig) {
            continue;
        }

        $scheduledTime = strtotime($schedule->getScheduledAt());
        if ($scheduledTime > $currentTime) {
            continue;
        }

        try {
            // Only run the job when this process managed to lock the schedule.
            if ($schedule->tryLockJob()) {
                $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
            }
        } catch (\Exception $e) {
            $this->processError($schedule, $e);
        }

        if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
            $succeededJobCodes[$schedule->getJobCode()] = true;
        }
        $schedule->save();
    }
}
748 
/**
 * Store the exception message on the schedule and log it according to the schedule status.
 *
 * Errors are logged as critical; missed schedules are logged at info level in developer mode only.
 *
 * @param \Magento\Cron\Model\Schedule $schedule
 * @param \Exception $exception
 * @return void
 */
private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
{
    $schedule->setMessages($exception->getMessage());

    if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
        $this->logger->critical($exception);
    }

    $isMissed = $schedule->getStatus() === Schedule::STATUS_MISSED;
    if ($isMissed && $this->state->getMode() === State::MODE_DEVELOPER) {
        $this->logger->info($schedule->getMessages());
    }
}
768 }
execute(\Magento\Framework\Event\Observer $observer)
$objectManager
Definition: bootstrap.php:17
$config
Definition: fraud_order.php:17
$count
Definition: recent.phtml:13
if(!file_exists($installConfigFile)) if(!defined('TESTS_INSTALLATION_DB_CONFIG_FILE')) $shell
Definition: bootstrap.php:46
$logger
saveSchedule($jobCode, $cronExpression, $timeInterval, $exists)
__construct(\Magento\Framework\ObjectManagerInterface $objectManager, \Magento\Cron\Model\ScheduleFactory $scheduleFactory, \Magento\Framework\App\CacheInterface $cache, \Magento\Cron\Model\ConfigInterface $config, \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig, \Magento\Framework\App\Console\Request $request, \Magento\Framework\ShellInterface $shell, \Magento\Framework\Stdlib\DateTime\DateTime $dateTime, \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory, \Psr\Log\LoggerInterface $logger, \Magento\Framework\App\State $state, StatFactory $statFactory, \Magento\Framework\Lock\LockManagerInterface $lockManager)
$status
Definition: order_status.php:8
const BP
Definition: autoload.php:14
$connection
Definition: bulk.php:13
_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)