Skip to content

Python async

As a developer you may be interested in developing custom Cloud Connect integrations such as agents, workers or monitors using Python.

This document steps though some tips and tricks the Upswell team has learned when developing async integrations for MQTT via Twisted and asyncio.

Async Compatible MQTT Clients

During the development of Liminal (Twisted) and the Experience Agent a number of async MQTT clients were evaluated. Many open-source clients are supported by a single individual and many did not perform as documented or were missing features.

None of the existing Python async clients were determined to be complete, or reliable enough for integration into the primary Atlas software services.

Syncronous Paho MQTT

A number of Cloud Connect syncronous integrations have been developed for Cloud Connect/Experiecnce Manager using paho-mqtt. As a reliable and foundation supported open-source library, Paho was the ideal choice.

Paho is a syncronous library and in starting the library with loop_forever(), Paho spawns is own event loop taking over the entire process/thread. While this works to take action on incoming messages, it does not provide a mechanism for scheduling tasks and sending messages outside of this loop.

Working within the syncronous parameters of is challenging without implementing threading, or multiprocessing which bring their own development challenges.

Asyncronous(ish) Paho MQTT

To work around this limitaion, an asyncronous(ish) approach has been developed for async applications. This is compatible with asyncio, Twisted or any approach where you plug into an existing event loop, or spawn and manage the loop within your application.

Paho exposes a loop() method which is intended to be called at a regular inteval checking the TCP socket buffer for data to be read and pushing queued data to the socket to be sent.

While not truly async, when plugged into an async event loop, other work may be performed within the process in-between loop calls to Paho allowing for other work to be performed within the process without spawning additional sub-processes or threads.

It is recommended in-between reading from the client to sleep the async process for at-least 0.1 seconds.

WARNING

This minimal example is intended as demonstrate the basic contept of making Paho aync(ish). It is not intended to be a functional code sample for use in your application.

python
import asyncio
import json
import ssl
import paho.mqtt.client as mqtt

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id)
client.username_pw_set(username, password)
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)

def on_message(self, client, userdata, msg):
    try:
        payload: dict = json.loads(msg.payload)
    except json.JSONDecodeError as e:
        log.warning(f'MQTT :: On Message Error => Cannot process message, JSONDecodeError: {e}')
        return

async run():
    while True:
        client.loop()
        await asyncio.sleep(0.1)

client.on_message = self.on_message
client.connect(host, port, 60)

asyncio.create_task(run())

Atlas, Hybrid cloud, on-premesis platform for large scale media program development, delivery and operation.