If you don't need anything, even authentication, then the broker can be written like this;
(import (rnrs) (net mq mqtt broker)) (define broker (make-mqtt-broker "5000")) (mqtt-broker-start! broker)With this, broker runs on port 5000. When broker is ready then next step is client.
The basic functions for client are subscribing and publishing. Subscribing would be like this;
(import (rnrs) (net mq mqtt client)) (let ((conn (open-mqtt-connection "localhost" "5000"))) (mqtt-subscribe conn "topic" +qos-exactly-once+ (lambda (topic payload) (get-bytevector-all payload))) (let loop () (let ((r (mqtt-receive-message conn))) (display r) (newline) (unless (eof-object? r) (loop)))) (mqtt-unsubscribe conn "topic") (close-mqtt-connection! conn))Subscribe procedure, currently, takes 4 arguments, MQTT connection, topic filter, QoS level and callback procedure. The callback procedure takes 2 arguments, topic name and payload. Payload is a binary input port. For now, we don't provide daemon thread for callback so users need to explicitly receive messages.
Publishing messages would be like this;
(import (rnrs) (net mq mqtt client)) (let ((conn (open-mqtt-connection "localhost" "5000"))) (mqtt-publish conn "topic" (string->utf8 "Hello MQTT") :qos +qos-at-least-once+) (mqtt-publish conn "topic" #vu8()) (close-mqtt-connection! conn))Publish procedure, currently, requires 3 arguments and also can take some keyword arguments to specify how to publish such as QoS and retain. The application message must be a bytevector so that MQTT requires it to be binary data. Publishing empty bytevector would send empty payload.
Followings are some of design rationale (please add 'currently' before read).
[Socket connection]
Broker creates a thread per connection instead of dispatching with
select
(this is sort of limitation of underlying (net server)
library). By default, max connection number is 10. If this is 2 then you can do private conversation and if it's 1 then you can be alone...[Session control]
Managing session is done by one daemon thread which is created when broker is created. Default interval period it 10 second. So even if client keep-alive is 5 seconds and it idled for 6 seconds then send something, it can still be treated as a live session. Session could have had own timer however I don't have any lightweight timer implementation other then using thread and making thread is sort of expensive on Sagittarius. So I've decided to manage it by one thread.
[Client packet control]
Even though client needs to receive message explicitly however there is an exception. That is when server published a message to client and right after that client send control packet like subscribe. In that case client first consume the published message handling with given callback then sends control packet.
[QoS control for exactly once]
Broker publishes received message after PUBCOMP is sent. MQTT spec says it can initiate delivering after receiving PUBLISH.
[Miscellaneous]
When client subscribes a topic and publishes a message to the same topic, then it would receive own message. Not sure if this is correct behaviour...
Pointing a bug/posting an opinion would be grateful!
No comments:
Post a Comment