I've implemented an MQTT client and broker on Sagittarius. As far as I can see, the broker implementation now complies with the specification (though there are probably bugs), so let me introduce it briefly. The APIs may change in 0.5.9, and I probably won't write the documentation until I think they are settled (0.5.10 at the earliest).
If you don't need anything extra, not even authentication, the broker can be written like this:
(import (rnrs) (net mq mqtt broker))
(define broker (make-mqtt-broker "5000"))
(mqtt-broker-start! broker)
With this, the broker runs on port 5000. Once the broker is ready, the next step is the client.
The basic client operations are subscribing and publishing. Subscribing looks like this:
(import (rnrs) (net mq mqtt client))
(let ((conn (open-mqtt-connection "localhost" "5000")))
  ;; The callback receives the topic name and the payload (a binary input port).
  (mqtt-subscribe conn "topic" +qos-exactly-once+
                  (lambda (topic payload)
                    (get-bytevector-all payload)))
  ;; No daemon thread delivers messages, so receive them explicitly until EOF.
  (let loop ()
    (let ((r (mqtt-receive-message conn)))
      (display r) (newline)
      (unless (eof-object? r)
        (loop))))
  (mqtt-unsubscribe conn "topic")
  (close-mqtt-connection! conn))
The subscribe procedure currently takes 4 arguments: an MQTT connection, a topic filter, a QoS level and a callback procedure. The callback procedure takes 2 arguments: the topic name and the payload. The payload is a binary input port. For now, there is no daemon thread that invokes callbacks, so users need to receive messages explicitly.
Publishing messages looks like this:
(import (rnrs) (net mq mqtt client))
(let ((conn (open-mqtt-connection "localhost" "5000")))
  (mqtt-publish conn "topic" (string->utf8 "Hello MQTT")
                :qos +qos-at-least-once+)
  (mqtt-publish conn "topic" #vu8())
  (close-mqtt-connection! conn))
The publish procedure currently requires 3 arguments and also accepts keyword arguments, such as QoS and retain, that specify how to publish. The application message must be a bytevector because MQTT requires it to be binary data. Publishing an empty bytevector sends an empty payload.
The following are some design rationales (please read each one with 'currently' in front).
[Socket connection]
The broker creates one thread per connection instead of dispatching with select (this is more or less a limitation of the underlying (net server) library). By default, the maximum number of connections is 10. If you set it to 2 you can have a private conversation, and if it's 1 you can be alone...
[Session control]
Sessions are managed by a single daemon thread, which is created when the broker is created. The default check interval is 10 seconds. So even if a client's keep-alive is 5 seconds and it sends something after idling for 6 seconds, it can still be treated as a live session. Each session could have had its own timer, but I don't have any lightweight timer implementation other than one based on threads, and creating a thread is fairly expensive on Sagittarius. So I decided to manage all sessions with one thread.
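To make the keep-alive example above concrete, here is a toy model in plain R6RS. It is not the broker's code: the procedure names, the representation of packet arrival times as a list of seconds, and the assumption that the sweeper ticks at t=10 are all made up for illustration.

```scheme
(import (rnrs))

;; Hypothetical helper: #t if nothing has been heard from the client
;; for longer than its keep-alive, as seen at time NOW (seconds).
(define (idle-too-long? last-activity keep-alive now)
  (> (- now last-activity) keep-alive))

;; Hypothetical helper: the latest packet arrival time that is not
;; after NOW. Assumes at least one packet (e.g. CONNECT) arrived by then.
(define (last-activity-at packet-times now)
  (apply max (filter (lambda (t) (<= t now)) packet-times)))

;; Keep-alive is 5 seconds; packets arrive at t=0 and t=6.
(define packet-times '(0 6))

;; A per-session timer looking at t=5.5 would see an expired session.
(display (idle-too-long? (last-activity-at packet-times 5.5) 5 5.5))
(newline) ; prints #t

;; The single sweeper only looks at t=10, after the packet at t=6.
(display (idle-too-long? (last-activity-at packet-times 10) 5 10))
(newline) ; prints #f
```

In this model the session only looks dead between t=5 and t=6, and the sweeper never observes that window.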
[Client packet control]
Although the client needs to receive messages explicitly, there is one exception: when the server has published a message to the client and, right after that, the client sends a control packet such as SUBSCRIBE. In that case the client first consumes the published message, handling it with the given callback, and then sends the control packet.
[QoS control for exactly once]
The broker publishes a received message only after it has sent PUBCOMP. The MQTT spec says the broker may start delivering it as soon as it has received PUBLISH.
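The difference between the two timings can be sketched as an ordering of events on the broker side of a QoS 2 exchange. This is only an illustrative model in plain R6RS: the symbol names and the deliver-after procedure are invented here and are not part of the library.

```scheme
(import (rnrs))

;; Packet order seen by the receiver of a QoS 2 PUBLISH.
(define handshake
  '(recv-PUBLISH send-PUBREC recv-PUBREL send-PUBCOMP))

;; Hypothetical helper: returns the handshake with the symbol
;; `deliver` (onward delivery to subscribers) inserted after STEP.
(define (deliver-after step)
  (let loop ((steps handshake))
    (cond ((null? steps) '())
          ((eq? (car steps) step)
           (cons step (cons 'deliver (cdr steps))))
          (else (cons (car steps) (loop (cdr steps)))))))

;; What this broker does: deliver once PUBCOMP has been sent.
(display (deliver-after 'send-PUBCOMP)) (newline)

;; The earlier timing the spec permits: deliver right after PUBLISH.
(display (deliver-after 'recv-PUBLISH)) (newline)
```

Delivering last means subscribers wait for the full handshake with the publisher before they see the message.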
[Miscellaneous]
When a client subscribes to a topic and publishes a message to the same topic, it receives its own message. I'm not sure whether this is the correct behaviour...
Bug reports and opinions would be greatly appreciated!