| описан в core.transport
Транспорт (связность между различными модулями сервера и внешними устройствами (периферийными устройствами или клиентами), управляемыми через MQTT) реализован через набор очередей в RabbitMQ и воркеров, которые стартуют в отдельных docker-контейнерах и являются паблишерами\подписчиками этих очередей.
Роутинг между очередями описан внутри кода воркеров.
Документация по роутингу находится здесь
Базовая сущность - сообщения внутри AMQP.
Пересылаются как пара message, body
message.delivery_info
- содержит данные адресацииmessage.headers
- содержит опциональные мета-данные сообщенияbody
содержит полезную нагрузкуСообщения и Пакеты описанные ниже - являются частными случаями Сообщений.
| описаны в core.transport.packets
Используются для передачи информации от и к периферийным устройствам.
Конвертируются в MQTT автоматически в exchange mqtt
.
Передаваемый Пакет не может содержать заголовков - только
routing_key
(он жеdelivery_info
) и полезную нагрузкуbody
.
Любой Пакет терминируется на соответствующем обработчике очереди:
Обратная отправка Пакета осуществляется через метод core.transport.publisher.send_mqtt
| базовое событие описано в core.transport.events
Описывают внутренние серверные взаимодействия, реакции и изменения состояния.
Отправка События осуществляется через метод core.transport.publisher.send_event
| описаны в core.scheduler
модуль tasks
- описывает рутинные задачи
модуль service
- описывает сбор и запуск тасок
Scheduler запускается в одноименном контейнере, представляет собой один async loop. Синхронные задачи выполняются через sync_to_async
.
| описано в core.transport.config
Происходит при старте контейнеров воркеров через management команду Django (описано в core.management.commands
)
В этом процессе создается очередь с определенным именем, привязывается к существующему exchange в RabbitMQ, и на очередь назначается обработчик.
Все обработчики очередей описаны в core.worker_queue_handlers
| описан в core.worker_queue_handlers.ask_mqtt_handler
Обрабатывает входящие MQTT-пакеты, маркированные ask. в начале топика. RabbitMQ настроен таким образом, чтобы фильтровать такие события в отдельный exchange + очередь, где они обрабатываются воркером. MQTT пакет распаковывается, роутинг-ключ приводится к внутреннему формату (заменяются /
на .
).
Исходящие пакеты отправляются в очередь internal
| описан в core.worker_queue_handlers.state_update
Сохраняет конфигурацию от клиентов в БД. Simple-устройства игнорируются.
| описан в core.worker_queue_handlers.client_update
Отправляет конфигурации из БД клиентам, включая simple-устройства.
Обрабатывает события, связанные с изменениями состояния конфигурации клиентов или состоянием сервера. Многие серверные операции (изменение уровня тревоги) - создают собственные события в этой очереди.
В этом же обработчике применяются дополнительные правила, путем дописывания инструкций в метод handle_event
(см. Сообщения)
Роутинг выполняется воркером internal_worker
.
!! Важно не путать метки очередей с метками пакетов
Метки пакетов (core.transport.packets.SkabenPacketTypes
) используются для определения типа данных, пришедших от внешних устройств и назначения топиков при отправке данных обратно в MQTT.
Когда данные из MQTT декодируются и попадают в internal обработчик - метка пакета отрывается, и в соответствии с правилами обработчика назначается метка очереди.
Метки очередей (core.transport.config.SkabenQueue
) используются для внутреннего роутинга.