Commit f0cc58bc authored by alexandre burton's avatar alexandre burton
Browse files

Will, attributes, auto-connect

parent ccfbedd0
......@@ -2,8 +2,6 @@
**MQTT client based on libmosquitto**
In Memoriam W protocol
This object provides a Max interface to a treaded instance of the [libmosquitto](http://mosquitto.org/man/libmosquitto-3.html) client.
v20210101 was developped, tested and signed on macOS 10.14.6 with a mosquitto broker running on debian in a well-connected north american data center. It covers the features our application required; more work to follow (see below).
......@@ -15,10 +13,9 @@ It bundles a static libmosquitto binary (TLS stuff was raising errors so it was
Max interface:
- ~~QOS, persistence~~
- will
- ~~MQTT will~~
- ~~complete attribute support~~
- re-connect on attribute change when required
- complete max type support on publish (how to allow arbitrary length vs QOS/persistence?)
- ~~re-connect on attribute change when required~~
- ~~auto-connection on object creation if attributes are set~~
- "enable" toggle (to "mute" traffic at the max level)
......@@ -33,7 +30,7 @@ Other features:
- max package; better examples/docs
- scan payloads for binary or structured data (jitter matrices, msp vectors, json dicts)
- ... otherwise preprocess as a list (string, float, int) to spare large gensym()s and reduce pressure on [fromsymbol]
- "setup" of higher-level strategy (topic structure for parameters)
- "setup" of higher-level strategy (topic metastructure for "smart" parameters)
- optimize lifecycle of the mosquitto instance? (re-use vs re-create)
Windows version:
......
......@@ -16,37 +16,74 @@ typedef struct _mqtt {
int a_state; // MQTT connection state
long a_port; // MQTT Broker port
char a_clean; // Clean session
char a_verbose; // verbose chatter in Max windows
char a_autoconnect; // auto-connect on object instanciation
long a_port; // MQTT Broker port
char a_clean; // Clean session
char a_verbose; // verbose chatter in Max windows
char a_autoconnect; // auto-connect on object instanciation
t_symbol *a_user; // MQTT username
t_symbol *a_subscribe_topic;// MQTT topic auto-subscription
t_symbol *a_host; // MQTT host
t_symbol *a_password;; // MQTT password
t_symbol *a_id; // Client ID
t_symbol *a_will_topic; // Client ID
char a_will_payload[1000]; // Will payload
long a_will_qos; // MQTT Broker port
long a_will_retain; // MQTT Broker port
long a_num_will;
char payload[1000]; // reusable memory for received payload
t_atom *a_will;
char payload[1000]; // reusable memory for received payload
} t_mqtt;
#define MQTT_DEFAULT_PORT 1883
static long MOSQUITTO_CLIENT_ID = 0;
// methods
void mqtt_assist_method(t_mqtt *x, void *b, long m, long a, char *s);
void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_will_set_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_will_clear_method(t_mqtt *x);
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_loop_method(t_mqtt *x);
void actually_connect(t_mqtt *x);
// attribute setters
void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_user(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_port(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_will(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv);
// lifecycle
void *mqtt_new(t_symbol *s, long argc, t_atom *argv);
void mqtt_free(t_mqtt *x);
void mosquitto_free_client(t_mqtt *x);
// download & compile static mosquitto lib (did not fight TLS/https; disabled in cmake)
// /usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none/usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none /Users/brtn/Downloads/max-sdk-8.0.3/externals/mqtt.mxo/Contents/Frameworks/libmosquitto_static.a
// add to max project
void mqtt_change_state(t_mqtt *x, int state) {
switch (state) {
case 1:
if (x->a_verbose) object_post((t_object *)x,"<%s> connection up",x->a_id->s_name);
x->a_state =1;
break;
default:
if (x->a_verbose) object_warn((t_object *)x,"<%s> connecction down",x->a_id->s_name);
x->a_state =0;
if (state != x->a_state) {
switch (state) {
case 1:
if (x->a_verbose) object_post((t_object *)x,"<%s> connection up",x->a_id->s_name);
if (strlen(x->a_subscribe_topic->s_name)) mqtt_subscribe_method(x, x->a_subscribe_topic);
break;
default:
if (x->a_verbose) object_warn((t_object *)x,"<%s> connecction down",x->a_id->s_name);
}
x->a_state = state;
outlet_int(x->p_status_outlet, x->a_state);
}
outlet_int(x->p_status_outlet, x->a_state);
}
void mosquitto_publish_callback(struct mosquitto *mosq, void *obj, int mid)
......@@ -90,29 +127,6 @@ void mosquitto_message_callback(struct mosquitto *mosq, void *obj, const struct
outlet_anything(x->p_main_outlet, gensym(message->topic), 1,a);
}
// methods
void mqtt_assist_method(t_mqtt *x, void *b, long m, long a, char *s);
void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_loop_method(t_mqtt *x);
// attribute setters
void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_user(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_port(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv);
// lifecycle
void *mqtt_new(t_symbol *s, long argc, t_atom *argv);
void mqtt_free(t_mqtt *x);
void mosquitto_free_client(t_mqtt *x);
void actually_connect(t_mqtt *x);
t_class *mqtt_class;
......@@ -122,13 +136,15 @@ void ext_main(void *r)
c = class_new("mqtt", (method)mqtt_new, (method)mqtt_free, sizeof(t_mqtt), 0L, A_GIMME, 0);
// class_addmethod(c, (method)mqtt_loop_method, "bang", 0);
// class_addmethod(c, (method)mqtt_loop_method, "bang", 0);
class_addmethod(c, (method)mqtt_assist_method, "assist", A_CANT, 0);
class_addmethod(c, (method)mqtt_subscribe_method, "subscribe", A_DEFSYM, 0);
class_addmethod(c, (method)mqtt_unsubscribe_method, "unsubscribe",A_DEFSYM, 0);
class_addmethod(c, (method)mqtt_publish_method, "publish", A_GIMME, 0);
class_addmethod(c, (method)mqtt_connect_method, "connect", A_GIMME, 0);
class_addmethod(c, (method)mqtt_will_set_method, "will_set", A_GIMME, 0);
class_addmethod(c, (method)mqtt_will_clear_method, "will_clear", 0);
CLASS_ATTR_SYM(c, "id", 0, t_mqtt, a_id);
CLASS_ATTR_ACCESSORS(c, "id", NULL, mqtt_set_id);
......@@ -146,9 +162,14 @@ void ext_main(void *r)
CLASS_ATTR_SYM(c, "user", 0, t_mqtt, a_user);
CLASS_ATTR_ACCESSORS(c, "user", NULL, mqtt_set_user);
CLASS_ATTR_SYM(c, "autosubscribe",0, t_mqtt, a_subscribe_topic);
CLASS_ATTR_SYM(c, "password", 0, t_mqtt, a_password);
CLASS_ATTR_ACCESSORS(c, "password", NULL, mqtt_set_password);
CLASS_ATTR_ATOM(c, "will", 0, t_mqtt, a_will);
CLASS_ATTR_ACCESSORS(c, "will", NULL, mqtt_set_will);
CLASS_ATTR_SYM(c, "host", 0, t_mqtt, a_host);
CLASS_ATTR_ACCESSORS(c, "host", NULL, mqtt_set_host);
......@@ -174,67 +195,90 @@ void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv)
{
if (atom_getsym(argv)->s_name != x->a_id->s_name) {
x->a_id = atom_getsym(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with id %s", x->a_id->s_name);
}
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_set_user(t_mqtt *x, void *attr, long argc, t_atom *argv) {
if (atom_getsym(argv)->s_name != x->a_user->s_name) {
x->a_user = atom_getsym(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with user %s", x->a_user->s_name);
}
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_set_port(t_mqtt *x, void *attr, long argc, t_atom *argv) {
if (atom_getlong(argv) != x->a_port) {
x->a_port = atom_getlong(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with port %i", x->a_port);
}
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv) {
if (atom_getsym(argv)->s_name != x->a_password->s_name) {
x->a_password = atom_getsym(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with passwoord %s", x->a_password->s_name);
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_set_will(t_mqtt *x, void *attr, long argc, t_atom *argv) {
bool should_reconnect = false;
if (argc > 1) {
if (argv[0].a_type==A_SYM) {
should_reconnect = true;
x->a_will_topic = argv[0].a_w.w_sym;
if (argv[1].a_type == A_SYM) {
sprintf(x->a_will_payload, "%s",argv[1].a_w.w_sym->s_name);
} else if (argv[1].a_type==A_LONG) {
sprintf(x->a_will_payload, "%lli", argv[1].a_w.w_long);
} else if (argv[1].a_type==A_FLOAT) {
sprintf(x->a_will_payload, "%f", argv[1].a_w.w_float);
}
if (argc > 2) {
x->a_will_qos = argv[2].a_w.w_long;
if (argc == 4) {
x->a_will_retain = argv[3].a_w.w_long;
}
}
}
else {
object_error((t_object *)x,"<%s> will topic (arg#0) must be string", x->a_id->s_name);
}
} else {
object_error((t_object *)x,"<%s> will needs 2, 3 or 4 arguments", x->a_id->s_name);
}
if (x->a_state == 1 && should_reconnect) actually_connect(x);
}
void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv) {
if (atom_getsym(argv)->s_name != x->a_host->s_name) {
x->a_host = atom_getsym(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with host %s", x->a_host->s_name);
}
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv) {
x->a_autoconnect = atom_getlong(argv);
if (x->a_state == 0 && x->a_autoconnect == 1) {
post("TO IMPLEMENT: should connect when autoconnect is turned on",0);
}
if (x->a_state == 0 && x->a_autoconnect == 1) actually_connect(x);
}
void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv) {
if (atom_getlong(argv) != x->a_clean) {
x->a_clean = atom_getlong(argv);
if (x->a_state == 1) {
post("TO IMPLEMENT: should reconnect with cleaniness %i", x->a_clean);
}
if (x->a_state == 1) actually_connect(x);
}
}
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
if ((argc < 0 && argc > 4) || argc ==3) {
object_error((t_object *)x,"connect requires 0, 1, 2 or 4 arguments: (host (port (username password)))");
object_error((t_object *)x,"connect requires 0, 1, 2 or 4 arguments: (host (port (username password)))");
} else {
if (argv>0) {
......@@ -271,7 +315,7 @@ void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
}
void actually_connect(t_mqtt *x) {
if (strlen(x->a_host->s_name)) {
if (x->mosq != NULL) {
......@@ -287,6 +331,8 @@ void actually_connect(t_mqtt *x) {
mosquitto_publish_callback_set(x->mosq, mosquitto_publish_callback);
mosquitto_user_data_set(x->mosq, x);
mqtt_change_state(x, 0);
if (strlen(x->a_user->s_name) && strlen(x->a_password->s_name)) {
mosquitto_username_pw_set(x->mosq, x->a_user->s_name, x->a_password->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> attempting connection to %s:%s@%s:%i",
......@@ -296,6 +342,12 @@ void actually_connect(t_mqtt *x) {
x->a_id->s_name, x->a_host->s_name, x->a_port);
}
if (strlen(x->a_will_topic->s_name) && strlen(x->a_will_payload)) {
if (x->a_verbose) object_post((t_object *)x,"<%s> setting up a will %s = %s (%i, %i)",
x->a_id->s_name, x->a_will_topic->s_name,x->a_will_payload, x->a_will_qos, x->a_will_retain);
mosquitto_will_set(x->mosq, x->a_will_topic->s_name, strlen(x->a_will_payload), x->a_will_payload, x->a_will_qos, x->a_will_retain);
}
int res= mosquitto_connect(x->mosq, x->a_host->s_name, x->a_port, 60);
......@@ -319,21 +371,17 @@ void actually_connect(t_mqtt *x) {
}
}
//void mqtt_loop_method(t_mqtt * x) {
//int res = mosquitto_loop(x->mosq, -1, 1);
//}
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
t_symbol topic;
long qos = 0;
bool retain = false;
if (argc < 2 || argc > 4) {
object_error((t_object *)x,"publish requires 2, 3 or 4 arguments: topic payload (qos (retain))");
object_error((t_object *)x,"publish requires 2, 3 or 4 arguments: topic payload (qos (retain))");
} else {
if (argv[0].a_type==A_SYM) {
t_symbol topic;
long qos = 0;
bool retain = false;
topic = *argv[0].a_w.w_sym;
if (argv[1].a_type==A_LONG) {
......@@ -352,8 +400,8 @@ void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
qos = q;
}
if (argc==4) {
if (argv[4].a_type==A_LONG) {
long q = argv[2].a_w.w_long;
if (argv[3].a_type==A_LONG) {
long q = argv[3].a_w.w_long;
if (q <0) q = 0;
if (q > 1) q = 1;
retain = q;
......@@ -362,20 +410,63 @@ void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
}
int res = mosquitto_publish(x->mosq, NULL, topic.s_name, strlen(x->payload), x->payload, qos, retain);
switch (res) {
case MOSQ_ERR_SUCCESS: /* silent success */
if (x->a_verbose) object_post((t_object *)x,"<%s> published %s with %s",x->a_id->s_name, topic.s_name, x->payload);
case MOSQ_ERR_SUCCESS:
if (x->a_verbose) object_post((t_object *)x,"<%s> published %s = %s",x->a_id->s_name, topic.s_name, x->payload);
break;
default:
object_error((t_object *)x,"<%s> can't publish %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res));
object_error((t_object *)x,"<%s> can't publish %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res));
}
} else {
object_error((t_object *)x,"publish topic (arg#1) must be string");
object_error((t_object *)x,"<%s> publish topic (arg#1) must be string", x->a_id->s_name);
}
}
}
void mqtt_will_set_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
if (argc < 2 || argc > 4) {
object_error((t_object *)x,"will_set requires 2, 3 or 4 arguments: topic payload (qos (retain))");
} else {
if (argv[0].a_type==A_SYM) {
x->a_will_topic = argv[0].a_w.w_sym;
if (argv[1].a_type==A_LONG) {
sprintf(x->a_will_payload, "%lli", argv[1].a_w.w_long);
} else if (argv[1].a_type==A_FLOAT) {
sprintf(x->a_will_payload, "%f", argv[1].a_w.w_float);
} else if (argv[1].a_type==A_SYM) {
sprintf(x->a_will_payload, "%s", argv[1].a_w.w_sym->s_name);
}
if (argc>2) {
if (argv[2].a_type==A_LONG) {
long q = argv[2].a_w.w_long;
if (q <0) q = 0;
if (q > 2) q = 2;
x->a_will_qos = q;
}
if (argc==4) {
if (argv[3].a_type==A_LONG) {
long q = argv[3].a_w.w_long;
if (q <0) q = 0;
if (q > 1) q = 1;
x->a_will_retain = q;
}
}
}
actually_connect(x);
}
}
}
void mqtt_will_clear_method(t_mqtt *x) {
mosquitto_will_clear(x->mosq);
}
void mqtt_subscribe_method(t_mqtt *x, t_symbol * topic) {
int sres = mosquitto_subscribe(x->mosq, NULL, topic->s_name, 0);
switch (sres) {
case MOSQ_ERR_SUCCESS:
......@@ -393,7 +484,7 @@ void mqtt_unsubscribe_method(t_mqtt *x, t_symbol * topic) {
if (x->a_verbose) object_post((t_object *)x,"<%s> unsusbscribed %s", x->a_id->s_name, topic->s_name);
break;
default:
object_error((t_object *)x,"unsubscribe <%s> failed [%s]]", x->a_id->s_name, mosquitto_reason_string(sres));
object_error((t_object *)x,"unsubscribe <%s> failed [%s]]", x->a_id->s_name, mosquitto_reason_string(sres));
}
}
......@@ -406,9 +497,8 @@ void *mqtt_new(t_symbol *s, long argc, t_atom *argv)
char random_id[100];
hostname[99] = '\0';
gethostname(hostname, 1023);
snprintf(random_id, 99, "%s-%03ld", hostname, MOSQUITTO_CLIENT_ID++);
snprintf(random_id, 99, "%s-%05d", hostname, rand()%99999);
x->a_id = gensym(random_id);
x->a_verbose = 0;
x->a_clean = 0;
......@@ -417,6 +507,12 @@ void *mqtt_new(t_symbol *s, long argc, t_atom *argv)
x->a_host = gensym("");
x->a_user = gensym("");
x->a_password = gensym("");
x->a_subscribe_topic = gensym("");
x->a_will_topic = gensym("");
sprintf(x->a_will_payload,"");
x->a_will_qos = 0;
x->a_will_retain = 1;
attr_args_process(x, argc, argv);
......@@ -454,7 +550,7 @@ void mqtt_assist_method(t_mqtt *x, void *b, long m, long a, char *s) // 4 final
}
void mosquitto_free_client(t_mqtt *x) {
if (x->a_verbose) object_post((t_object *)x,"freeing underlying client <%s>", x->a_id->s_name);
if (x->a_verbose) object_post((t_object *)x,"disconnecting and freeing underlying client <%s>", x->a_id->s_name);
mosquitto_disconnect(x->mosq);
mosquitto_loop_stop(x->mosq, true);
mosquitto_destroy(x->mosq);
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment