上一篇文章里设计的队列表, 在使用了一段时间后, 发现了以下不足:
handle 字段限制了事件执行者 (subscriber) 的数量
系统中的某个事件或者消息, 往往对应着多个接收者, 直接将处理程序写在 Handle 字段上, 限制了事件(消息)接收者的数量.
改进:
移除队列表中的 handle 字段, 将事件接收者(类)写到配置文件中:
队列表结构变更为:
1 2 3 4 5 6 7 8 9 | id: 事件id, 表的主键 code: 事件的 code, 同类型事件都是一样的 event_data: 一个json字符串, 包含处理事件事所需的数据 status: 事件的状态, 共有4种状态: pending, running, finished, error message: 事件处理中所产生的消息 scheduled_at: 计划处理该事件的时间 created_at: 该事件生成的时间 started_at: 处理开始时间 finished_at: 处理结束时间 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | // config.xml <event> <handle> <order_invoice> <event_data_initialization> <handle>OrderInvoiceInitHandle</handle> <sort_order>0</sort_order> </event_data_initialization> <reward_points_balance_update> <handle>IncreaseRewardPointsBalanceHandle</handle> <sort_order>10</sort_order> </reward_points_balance_update> <customer_upgrade_group> <handle>CustomerGroupUpdateHandle</handle> <sort_order>20</sort_order> </customer_upgrade_group> <reward_points_update_notification> <handle>SendRewardPointsNotificationHandle</handle> <sort_order>100</sort_order> </reward_points_update_notification> </order_invoice> </handle> </event> |
修改 EventDispatcher class:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | class EventDispatcher { const XML_PATH_EVENT_HANDLES_PREFIX = 'event/handle/'; protected $logger; protected $objectManager; protected $scopeConfig; public function __construct( LoggerInterface $logger, ScopeConfigInterface $scopeConfig, ObjectManagerInterface $objectManager ) { $this->logger = $logger; $this->objectManager = $objectManager; $this->scopeConfig = $scopeConfig; } public function execute() { try { /** @var EventQueue $eventModel */ $eventModel = $this->objectManager->create(EventQueue::class); $conn = $eventModel->getResource()->getConnection(); $select = $conn->select(); $select->reset() ->from($conn->getTableName('oneday_event_queue'), ['id']) ->where('status = ?', EventQueue::STATUS_PENDING) ->where('scheduled_at < ?', date('Y-m-d H:i:s', time() + 60)); $stmt = $select->query(); /** @var EventQueue $event */ while ($row = $stmt->fetch()) { try { $event = clone $eventModel; $event->load($row['id']); if ($event->getStatus() !== EventQueue::STATUS_PENDING) { continue; } // 设置 Event 为 running 状态 $event->setStatus(EventQueue::STATUS_RUNNING) ->setStartedAt(date('Y-m-d H:i:s')); $event->save(); $handles = $this->getHandlesByCode($event->getCode()); $messages = []; /** @var DataObject $eventData */ $eventData = $this->objectManager->create(DataObject::class); $eventData->addData(json_decode($event->getEventData(), true)); foreach ($handles as $handle) { // 开始处理 Event try { /** @var EventExecutorInterface $handleInstance */ $handleInstance = $this->objectManager->create($handle); if ($handleInstance instanceof EventExecutorInterface) { $handleInstance->execute($eventData); } else { $messages[] = 'Event Handle('. $handle. ') have not implements '. EventExecutorInterface::class. 'Skipped this event handle'; } } catch (\Exception $e) { $messages[] = 'Event execute exception: '. $e->getMessage(); } } // Event 处理完成, 设置为 finished 状态 $event->setFinishedAt(time()) ->setStatus(EventQueue::STATUS_FINISHED) ->setMessage(implode("\n", $messages)); $event->save(); } catch (\Exception $e) { // Event 处理失败, 设置为 error 状态 // todo 改进处理失败方式 $message = ''; if (isset($messages) && count($messages)) { $message = implode("\n", $messages); } $event->setMessage($message. "\n". $e->getMessage()) ->setStatus(EventQueue::STATUS_ERROR); $event->save(); $this->logger->critical(sprintf('processing event#%s failed, %s', $event->getId(), $e->getMessage())); } } } catch (\Exception $e) { $this->logger->critical('Event process error: '. $e->getMessage()); } } /** * 根据 code 获取配置的 event handle * @param $code * @return array */ protected function getHandlesByCode($code) { $configData = $this->scopeConfig->getValue(self::XML_PATH_EVENT_HANDLES_PREFIX. $code); $handles = []; if (is_array($configData)) { $_handles = []; foreach ($configData as $item) { if (isset($item['handle']) && $item['handle']) { $sortOrder = isset($item['sort_order']) ? (int) $item['sort_order'] : 1; $handle = $item['handle']; if (isset($_handles[$sortOrder])) { $_handles[$sortOrder][] = $handle; } else { $_handles[$sortOrder] = [$handle]; } } } ksort($_handles); foreach ($_handles as $items) { $handles = array_merge($handles, $items); } } return $handles; } } |
这样修改后, 同一个事件(消息)就可以对应多个事件处理程序(subscriber)了.