Commit ccfbedd0 authored by alexandre burton's avatar alexandre burton
Browse files

Cleanup of post()s, auto-connect, and general formatting

parent 482e277a
......@@ -8,7 +8,7 @@ This object provides a Max interface to a treaded instance of the [libmosquitto]
v20210101 was developped, tested and signed on macOS 10.14.6 with a mosquitto broker running on debian in a well-connected north american data center. It covers the features our application required; more work to follow (see below).
It bundles a static libmosquitto binary (TLS support was giving errors so it was simply disabled through CMake). You can probably swap the .a in the .mxo with your own signed library if you wish. (Not sure how the Xcode project will present itself to someone else vs signing identities).
It bundles a static libmosquitto binary (TLS stuff was raising errors so it was disabled through CMake). You can probably swap the .a in the .mxo with your own signed library if you wish. (Not sure how the Xcode project will present itself to someone else vs signing identities).
## TODO
......@@ -19,7 +19,7 @@ Max interface:
- ~~complete attribute support~~
- re-connect on attribute change when required
- complete max type support on publish (how to allow arbitrary length vs QOS/persistence?)
- auto-connection on object creation if attributes are set
- ~~auto-connection on object creation if attributes are set~~
- "enable" toggle (to "mute" traffic at the max level)
Bundled library:
......
......@@ -39,23 +39,23 @@ static long MOSQUITTO_CLIENT_ID = 0;
void mqtt_change_state(t_mqtt *x, int state) {
switch (state) {
case 1:
if (x->a_verbose) post("[mqtt] <%s> connection up",x->a_id->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> connection up",x->a_id->s_name);
x->a_state =1;
break;
default:
if (x->a_verbose) post("[mqtt] <%s> connecction down",x->a_id->s_name);
if (x->a_verbose) object_warn((t_object *)x,"<%s> connecction down",x->a_id->s_name);
x->a_state =0;
}
outlet_int(x->p_status_outlet, x->a_state);
}
void publish_callback(struct mosquitto *mosq, void *obj, int mid)
void mosquitto_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
t_mqtt * x = obj;
if (x->a_verbose) post("[mqtt] <%s> published topic id#%05i",x->a_id->s_name,mid);
if (x->a_verbose) object_post((t_object *)x,"<%s> published topic id#%05i",x->a_id->s_name,mid);
}
void connect_callback(struct mosquitto *mosq, void *obj, int result)
void mosquitto_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
t_mqtt * x = obj;
switch (result) {
......@@ -63,24 +63,24 @@ void connect_callback(struct mosquitto *mosq, void *obj, int result)
mqtt_change_state(x, 1);
break;
default:
error("[mqtt] <%s> Connection Refused, [%s]", x->a_id->s_name, mosquitto_reason_string(result));
object_warn((t_object *)x,"<%s> Connection Refused, [%s]", x->a_id->s_name, mosquitto_reason_string(result));
mqtt_change_state(x, 0);
mosquitto_disconnect(mosq);
}
}
void disconnect_callback(struct mosquitto *mosq, void *obj, int result)
void mosquitto_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
t_mqtt * x = obj;
if (result ==0) {
if (x->a_verbose) post("[mqtt] <%s> disconnected as requested",x->a_id->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> disconnected as requested",x->a_id->s_name);
} else {
post("[mqtt] <%s> unexpectedly disconnected from broker",x->a_id->s_name);
object_warn((t_object *)x,"<%s> unexpectedly disconnected from broker",x->a_id->s_name);
}
mqtt_change_state(x, 0);
}
void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
void mosquitto_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
t_mqtt * x = obj;
t_atom a[1];
......@@ -91,7 +91,7 @@ void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_
}
// methods
void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s);
void mqtt_assist_method(t_mqtt *x, void *b, long m, long a, char *s);
void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
......@@ -107,19 +107,12 @@ void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv);
void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv);
// lifecycle
void *mqtt_new(t_symbol *s, long argc, t_atom *argv);
void mqtt_free(t_mqtt *x);
void mosquitto_free_client(t_mqtt *x);
void mosquitto_free_client(t_mqtt *x) {
if (x->a_verbose) post("[mqtt] freeing underlying client <%s>", x->a_id->s_name);
mosquitto_disconnect(x->mosq);
mosquitto_loop_stop(x->mosq, true);
mosquitto_destroy(x->mosq);
}
void mqtt_free(t_mqtt *x)
{
mosquitto_free_client(x);
}
void actually_connect(t_mqtt *x);
t_class *mqtt_class;
......@@ -129,8 +122,8 @@ void ext_main(void *r)
c = class_new("mqtt", (method)mqtt_new, (method)mqtt_free, sizeof(t_mqtt), 0L, A_GIMME, 0);
class_addmethod(c, (method)mqtt_loop_method, "bang", 0);
class_addmethod(c, (method)mqtt_assist, "assist", A_CANT, 0);
// class_addmethod(c, (method)mqtt_loop_method, "bang", 0);
class_addmethod(c, (method)mqtt_assist_method, "assist", A_CANT, 0);
class_addmethod(c, (method)mqtt_subscribe_method, "subscribe", A_DEFSYM, 0);
class_addmethod(c, (method)mqtt_unsubscribe_method, "unsubscribe",A_DEFSYM, 0);
class_addmethod(c, (method)mqtt_publish_method, "publish", A_GIMME, 0);
......@@ -139,7 +132,7 @@ void ext_main(void *r)
CLASS_ATTR_SYM(c, "id", 0, t_mqtt, a_id);
CLASS_ATTR_ACCESSORS(c, "id", NULL, mqtt_set_id);
CLASS_ATTR_CHAR(c, "clean", 0, t_mqtt, a_clean);
CLASS_ATTR_CHAR(c, "clean", 0, t_mqtt, a_clean);
CLASS_ATTR_STYLE_LABEL(c, "clean", 0, "onoff", "Clean Session");
CLASS_ATTR_ACCESSORS(c, "clean", NULL, mqtt_set_clean);
......@@ -165,14 +158,15 @@ void ext_main(void *r)
class_register(CLASS_BOX, c);
mqtt_class = c;
int res = mosquitto_lib_init();
post("MQTT Client for Max ©2021 https://gitlab.artificiel.org/max/mqtt",0);
int x,y,z;
mosquitto_lib_version(&x,&y,&z);
int res = mosquitto_lib_init();
if (res==MOSQ_ERR_SUCCESS) {
int x,y,z;
mosquitto_lib_version(&x,&y,&z);
post("Mosquitto MQTT Library %i.%i.%i initialized.",x ,y ,z);
} else {
error("Mosquitto MQTT Library %i.%i.%i did not correctly initialize!",x ,y ,z);
error("Mosquitto MQTT Library did not correctly initialize!",0);
}
}
......@@ -240,7 +234,7 @@ void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv) {
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
if ((argc < 0 && argc > 4) || argc ==3) {
error("[mqtt] connect requires 0, 1, 2 or 4 arguments: (host (port (username password)))");
object_error((t_object *)x,"connect requires 0, 1, 2 or 4 arguments: (host (port (username password)))");
} else {
if (argv>0) {
......@@ -258,7 +252,7 @@ void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
if (argv[1].a_type==A_LONG) {
x->a_port = argv[1].a_w.w_long;
} else {
error("[mqtt] connect port (arg#2) must be int -- using %i", x->a_port);
object_error((t_object *)x,"connect port (arg#2) must be int -- using %i", x->a_port);
}
if (argc ==4) {
......@@ -266,67 +260,69 @@ void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
x->a_password = gensym(argv[3].a_w.w_sym->s_name);
x->a_user = argv[2].a_w.w_sym;
} else {
error("[mqtt] connect username & password (arg#2-3) must be symbols");
object_error((t_object *)x,"connect username & password (arg#2-3) must be symbols");
}
}
}
}
}
if (strlen(x->a_host->s_name)) {
if (x->mosq != NULL) {
mosquitto_free_client(x);
}
x->mosq = mosquitto_new(x->a_id->s_name, false, x); // pass this object into mosquitto for callbacks
if (x->a_verbose) post("[mqtt] instance for cliend id <%s> created",x->a_id->s_name);
mosquitto_disconnect_callback_set(x->mosq, disconnect_callback);
mosquitto_connect_callback_set(x->mosq, connect_callback);
mosquitto_message_callback_set(x->mosq, message_callback);
mosquitto_publish_callback_set(x->mosq, publish_callback);
mosquitto_user_data_set(x->mosq, x);
if (strlen(x->a_user->s_name) && strlen(x->a_password->s_name)) {
mosquitto_username_pw_set(x->mosq, x->a_user->s_name, x->a_password->s_name);
if (x->a_verbose) post("[mqtt] <%s> attempting connection to %s:%s@%s:%i",
x->a_id->s_name, x->a_user->s_name, x->a_password->s_name, x->a_host->s_name, x->a_port);
} else {
if (x->a_verbose) post("[mqtt] <%s> attempting connection to %s:%i",
x->a_id->s_name, x->a_host->s_name, x->a_port);
}
int res= mosquitto_connect(x->mosq, x->a_host->s_name, x->a_port, 60);
int rloop;
switch (res) {
case MOSQ_ERR_SUCCESS:
rloop = mosquitto_loop_start(x->mosq);
if (rloop==MOSQ_ERR_INVAL) {
error("[mqtt] <%s> could not start thread [%02i]:%s", x->a_id->s_name, rloop, mosquitto_reason_string(rloop));
} else if (rloop==MOSQ_ERR_NOT_SUPPORTED) {
error("[mqtt] <%s> could not start thread [MOSQ_ERR_NOT_SUPPORTED]", x->a_id->s_name);
} else {
if (x->a_verbose) post("[mqtt] <%s> started event thread",x->a_id->s_name);
}
// }
break;
case MOSQ_ERR_INVAL: error("[mqtt] connect: <%s> invalid parameters [MOSQ_ERR_INVAL]", x->a_id->s_name); break;
case MOSQ_ERR_ERRNO: error("[mqtt] connect: <%s> out of memory [MOSQ_ERR_NOMEM]", x->a_id->s_name); break;
}
if (x->a_verbose) post("[mqtt] <%s> connection proocess finished.",x->a_id->s_name);
actually_connect(x);
}
}
void actually_connect(t_mqtt *x) {
if (strlen(x->a_host->s_name)) {
if (x->mosq != NULL) {
mosquitto_free_client(x);
}
x->mosq = mosquitto_new(x->a_id->s_name, false, x); // pass this object into mosquitto for callbacks
if (x->a_verbose) object_post((t_object *)x,"instance for cliend id <%s> created",x->a_id->s_name);
mosquitto_disconnect_callback_set(x->mosq, mosquitto_disconnect_callback);
mosquitto_connect_callback_set(x->mosq, mosquitto_connect_callback);
mosquitto_message_callback_set(x->mosq, mosquitto_message_callback);
mosquitto_publish_callback_set(x->mosq, mosquitto_publish_callback);
mosquitto_user_data_set(x->mosq, x);
if (strlen(x->a_user->s_name) && strlen(x->a_password->s_name)) {
mosquitto_username_pw_set(x->mosq, x->a_user->s_name, x->a_password->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> attempting connection to %s:%s@%s:%i",
x->a_id->s_name, x->a_user->s_name, x->a_password->s_name, x->a_host->s_name, x->a_port);
} else {
error("[mqtt] host not set");
if (x->a_verbose) object_post((t_object *)x,"<%s> attempting connection to %s:%i",
x->a_id->s_name, x->a_host->s_name, x->a_port);
}
int res= mosquitto_connect(x->mosq, x->a_host->s_name, x->a_port, 60);
int rloop;
switch (res) {
case MOSQ_ERR_SUCCESS:
rloop = mosquitto_loop_start(x->mosq);
switch (rloop) {
case MOSQ_ERR_SUCCESS:
if (x->a_verbose) object_post((t_object *)x,"<%s> started event thread",x->a_id->s_name);
break;
default:
object_error((t_object *)x,"<%s> could not start thread [%02i]:%s", x->a_id->s_name, rloop, mosquitto_reason_string(rloop));
}
break;
default:
object_error((t_object *)x,"<%s> could not connect [%02i]:%s", x->a_id->s_name, res,mosquitto_reason_string(res));
}
} else {
object_error((t_object *)x,"need to set host prior to connect");
}
}
void mqtt_loop_method(t_mqtt * x) {
post("loop...");
int res = mosquitto_loop(x->mosq, -1, 1);
post("loop %i", res);
}
//void mqtt_loop_method(t_mqtt * x) {
//int res = mosquitto_loop(x->mosq, -1, 1);
//}
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
t_symbol topic;
......@@ -335,7 +331,7 @@ void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
bool retain = false;
if (argc < 2 || argc > 4) {
error("[mqtt] publish requires 2, 3 or 4 arguments: topic payload (qos (retain))");
object_error((t_object *)x,"publish requires 2, 3 or 4 arguments: topic payload (qos (retain))");
} else {
if (argv[0].a_type==A_SYM) {
topic = *argv[0].a_w.w_sym;
......@@ -368,13 +364,13 @@ void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
int res = mosquitto_publish(x->mosq, NULL, topic.s_name, strlen(x->payload), x->payload, qos, retain);
switch (res) {
case MOSQ_ERR_SUCCESS: /* silent success */
if (x->a_verbose) post("[mqtt] <%s> published %s with %s",x->a_id->s_name, topic.s_name, x->payload);
if (x->a_verbose) object_post((t_object *)x,"<%s> published %s with %s",x->a_id->s_name, topic.s_name, x->payload);
break;
default:
error("[mqtt] <%s> can't publish %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res));
object_error((t_object *)x,"<%s> can't publish %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res));
}
} else {
error("[mqtt] publish topic (arg#1) must be string");
object_error((t_object *)x,"publish topic (arg#1) must be string");
}
}
}
......@@ -383,10 +379,10 @@ void mqtt_subscribe_method(t_mqtt *x, t_symbol * topic) {
int sres = mosquitto_subscribe(x->mosq, NULL, topic->s_name, 0);
switch (sres) {
case MOSQ_ERR_SUCCESS:
if (x->a_verbose) post("[mqtt] <%s> susbscribed %s", x->a_id->s_name, topic->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> susbscribed %s", x->a_id->s_name, topic->s_name);
break;
default:
error("[mqtt] subscribe <%s> failed [%02i: %s]", x->a_id->s_name, sres, mosquitto_reason_string(sres));
object_error((t_object *)x,"subscribe <%s> failed [%02i: %s]", x->a_id->s_name, sres, mosquitto_reason_string(sres));
}
}
......@@ -394,10 +390,10 @@ void mqtt_unsubscribe_method(t_mqtt *x, t_symbol * topic) {
int sres = mosquitto_unsubscribe(x->mosq, NULL, topic->s_name);
switch (sres) {
case MOSQ_ERR_SUCCESS:
if (x->a_verbose) post("[mqtt] <%s> unsusbscribed %s", x->a_id->s_name, topic->s_name);
if (x->a_verbose) object_post((t_object *)x,"<%s> unsusbscribed %s", x->a_id->s_name, topic->s_name);
break;
default:
error("[mqtt] unsubscribe <%s> failed [%s]]", x->a_id->s_name, mosquitto_reason_string(sres));
object_error((t_object *)x,"unsubscribe <%s> failed [%s]]", x->a_id->s_name, mosquitto_reason_string(sres));
}
}
......@@ -412,6 +408,7 @@ void *mqtt_new(t_symbol *s, long argc, t_atom *argv)
gethostname(hostname, 1023);
snprintf(random_id, 99, "%s-%03ld", hostname, MOSQUITTO_CLIENT_ID++);
x->a_id = gensym(random_id);
x->a_verbose = 0;
x->a_clean = 0;
......@@ -426,10 +423,14 @@ void *mqtt_new(t_symbol *s, long argc, t_atom *argv)
x->p_status_outlet = outlet_new(x, NULL);
x->p_main_outlet = outlet_new(x, NULL);
if (x->a_autoconnect) {
actually_connect(x);
}
return(x);
}
void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s) // 4 final arguments are always the same for the assistance method
void mqtt_assist_method(t_mqtt *x, void *b, long m, long a, char *s) // 4 final arguments are always the same for the assistance method
{
if (m == ASSIST_OUTLET)
switch (a) {
......@@ -451,3 +452,15 @@ void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s) // 4 final argumen
}
}
}
void mosquitto_free_client(t_mqtt *x) {
if (x->a_verbose) object_post((t_object *)x,"freeing underlying client <%s>", x->a_id->s_name);
mosquitto_disconnect(x->mosq);
mosquitto_loop_stop(x->mosq, true);
mosquitto_destroy(x->mosq);
}
void mqtt_free(t_mqtt *x)
{
mosquitto_free_client(x);
}
......@@ -46,7 +46,6 @@
"numinlets" : 1,
"numoutlets" : 0,
"patching_rect" : [ 366.0, 631.0, 211.0, 48.0 ],
"presentation_linecount" : 3,
"text" : "some NTP methods could be devised to find concrete diff/latency"
}
......@@ -1615,8 +1614,8 @@
"numinlets" : 1,
"numoutlets" : 2,
"outlettype" : [ "", "" ],
"patching_rect" : [ 216.0, 379.0, 413.0, 22.0 ],
"text" : "mqtt @host w.artificiel.org @user forum @password maxmsp"
"patching_rect" : [ 216.0, 379.0, 521.0, 22.0 ],
"text" : "mqtt @host w.artificiel.org @user forum @password maxmsp @autoconnect 1"
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment