MQTT client and broker

I've implemented an MQTT client and broker on Sagittarius. I now feel that the broker implementation is compliant with the specification (as far as I can see; there are probably bugs, though), so let me introduce it a bit. The APIs may still change in 0.5.9, and I probably won't write documentation until I think they can be fixed (the earliest would be after 0.5.10).

If you don't need anything extra, not even authentication, then the broker can be written like this:
(import (rnrs) (net mq mqtt broker))

(define broker (make-mqtt-broker "5000"))

(mqtt-broker-start! broker)
With this, the broker runs on port 5000. Once the broker is ready, the next step is the client.

The basic functions of the client are subscribing and publishing. Subscribing looks like this:
(import (rnrs) (net mq mqtt client))

(let ((conn (open-mqtt-connection "localhost" "5000")))
  ;; The callback receives the topic name and the payload (a binary input port).
  (mqtt-subscribe conn "topic" +qos-exactly-once+
                  (lambda (topic payload)
                    (get-bytevector-all payload)))
  ;; There is no daemon thread, so receive messages explicitly until EOF.
  (let loop ()
    (let ((r (mqtt-receive-message conn)))
      (display r) (newline)
      (unless (eof-object? r)
        (loop))))
  (mqtt-unsubscribe conn "topic")
  (close-mqtt-connection! conn))

The subscribe procedure currently takes 4 arguments: an MQTT connection, a topic filter, a QoS level, and a callback procedure. The callback procedure takes 2 arguments: the topic name and the payload. The payload is a binary input port. For now, we don't provide a daemon thread for callbacks, so users need to receive messages explicitly.

Publishing messages looks like this:
(import (rnrs) (net mq mqtt client))

(let ((conn (open-mqtt-connection "localhost" "5000")))
  (mqtt-publish conn "topic" (string->utf8 "Hello MQTT")
                :qos +qos-at-least-once+)
  (mqtt-publish conn "topic" #vu8())
  (close-mqtt-connection! conn))
The publish procedure currently requires 3 arguments and can also take keyword arguments, such as QoS and retain, to specify how to publish. The application message must be a bytevector because MQTT requires it to be binary data. Publishing an empty bytevector sends an empty payload.

The following are some of the design rationales (please add 'currently' before each one as you read).

[Socket connection]
The broker creates a thread per connection instead of dispatching with select (this is sort of a limitation of the underlying (net server) library). By default, the maximum number of connections is 10. If it is 2, you can have a private conversation, and if it's 1, you can be alone...

[Session control]
Session management is done by one daemon thread, which is created when the broker is created. The default check interval is 10 seconds. So even if a client's keep-alive is 5 seconds and it idles for 6 seconds before sending something, it can still be treated as a live session. Each session could have had its own timer; however, I don't have any lightweight timer implementation other than using a thread, and creating a thread is sort of expensive on Sagittarius. So I've decided to manage all sessions with one thread.
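
To make the timing concrete, here is a minimal sketch of one sweep over the sessions, written in plain R6RS. It is not the broker's actual code: the session record, session-expired?, sweep and touch! are hypothetical names, times are plain seconds, and the real expiry rule may differ.

```scheme
(import (rnrs))

;; Hypothetical session record: client id, keep-alive in seconds,
;; and the time (in seconds) at which the last packet arrived.
(define-record-type session
  (fields id keep-alive (mutable last-seen)))

;; Assumed rule: a session is expired once it has been silent
;; for longer than its keep-alive.
(define (session-expired? s now)
  (> (- now (session-last-seen s)) (session-keep-alive s)))

;; One pass of the sweeper thread: keep only the live sessions.
(define (sweep sessions now)
  (filter (lambda (s) (not (session-expired? s now))) sessions))

;; Record that a packet arrived from the client at time `now`.
(define (touch! s now)
  (session-last-seen-set! s now))

;; Keep-alive is 5 seconds. The client sends at t=1, idles for
;; 6 seconds, and sends again at t=7. The sweeper looks only at t=10.
(define client (make-session "c1" 5 1))

(touch! client 7)
(display (map session-id (sweep (list client) 10)))
(newline)
```

The sweep at t=10 sees only 3 seconds of silence, so the session is kept even though the client exceeded its keep-alive between t=1 and t=7.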

[Client packet control]
Although the client needs to receive messages explicitly, there is one exception: when the server has published a message to the client and, right after that, the client sends a control packet such as subscribe. In that case, the client first consumes the published message, handling it with the given callback, and then sends the control packet.
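
The ordering described here can be sketched in plain R6RS as follows. This is only a model under simplifying assumptions, not the client's real code: pending, broker-pushes!, drain-pending! and send-control! are hypothetical names, and the payload is a bytevector rather than a port.

```scheme
(import (rnrs))

;; `pending` holds (topic . payload) pairs the broker has already pushed.
;; `events` records what happened, in order, so the sequence can be inspected.
(define pending '())
(define events '())

(define (note! e)
  (set! events (append events (list e))))

;; Simulate the broker publishing a message to this client.
(define (broker-pushes! topic payload)
  (set! pending (append pending (list (cons topic payload)))))

;; Hand every pending message to the callback, oldest first.
(define (drain-pending! callback)
  (for-each (lambda (m) (callback (car m) (cdr m))) pending)
  (set! pending '()))

;; Sending a control packet first consumes whatever the broker pushed.
(define (send-control! packet callback)
  (drain-pending! callback)
  (note! (list 'sent packet)))

(define (on-message topic payload)
  (note! (list 'callback topic)))

(broker-pushes! "topic" (string->utf8 "hello"))
(send-control! 'subscribe on-message)
(display events)
(newline)
```

In this model, the callback entry is recorded before the sent entry, which mirrors the rule that the published message is consumed before the control packet goes out.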

[QoS control for exactly once]
The broker publishes a received message after PUBCOMP is sent. The MQTT spec says it can also initiate delivery right after receiving PUBLISH.
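
As a rough illustration of that ordering, the broker side of a QoS 2 exchange can be modelled in plain R6RS like this. The names stored and on-packet are hypothetical, and the model returns a list of actions instead of touching any socket.

```scheme
(import (rnrs))

;; `stored` maps packet ids to messages that are waiting for PUBREL.
(define stored '())

;; Return the list of actions the broker takes for one incoming packet.
(define (on-packet type packet-id message)
  (case type
    ((publish)
     ;; Keep the message and acknowledge it; nothing is forwarded yet.
     (set! stored (cons (cons packet-id message) stored))
     (list (list 'send 'pubrec packet-id)))
    ((pubrel)
     (let ((entry (assv packet-id stored)))
       (set! stored (remp (lambda (e) (eqv? (car e) packet-id)) stored))
       (if entry
           ;; Complete the handshake first, then deliver to subscribers.
           (list (list 'send 'pubcomp packet-id)
                 (list 'forward (cdr entry)))
           ;; Unknown or repeated PUBREL: answer with PUBCOMP only.
           (list (list 'send 'pubcomp packet-id)))))
    (else '())))

(display (on-packet 'publish 1 "hello")) (newline)
(display (on-packet 'pubrel 1 #f)) (newline)
```

The alternative that the spec permits would move the forward action into the publish branch, so subscribers would see the message before the handshake completes.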

When a client subscribes to a topic and publishes a message to the same topic, it receives its own message. Not sure if this is correct behaviour...

Pointing out bugs or posting opinions would be much appreciated!
