#include #include "/usr/local/include/mosquitto.h" #include #include #include #include "ext.h" #include "ext_obex.h" typedef struct _mqtt { t_object p_ob; void *p_status_outlet; void *p_main_outlet; struct mosquitto *mosq; // instance of mosquitto client int a_state; // MQTT connection state long a_port; // MQTT Broker port char a_clean; // Clean session char a_verbose; // verbose chatter in Max windows char a_autoconnect; // auto-connect on object instanciation t_symbol *a_user; // MQTT username t_symbol *a_host; // MQTT host t_symbol *a_password;; // MQTT password t_symbol *a_id; // Client ID char payload[1000]; // reusable memory for received payload } t_mqtt; #define MQTT_DEFAULT_PORT 1883 static long MOSQUITTO_CLIENT_ID = 0; // download & compile static mosquitto lib (did not fight TLS/https; disabled in cmake) // /usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none/usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none /Users/brtn/Downloads/max-sdk-8.0.3/externals/mqtt.mxo/Contents/Frameworks/libmosquitto_static.a // add to max project void mqtt_change_state(t_mqtt *x, int state) { switch (state) { case 1: if (x->a_verbose) post("[mqtt] <%s> connection up",x->a_id->s_name); x->a_state =1; break; default: if (x->a_verbose) post("[mqtt] <%s> connecction down",x->a_id->s_name); x->a_state =0; } outlet_int(x->p_status_outlet, x->a_state); } void publish_callback(struct mosquitto *mosq, void *obj, int mid) { t_mqtt * x = obj; if (x->a_verbose) post("[mqtt] <%s> published topic id#%05i",x->a_id->s_name,mid); } void connect_callback(struct mosquitto *mosq, void *obj, int result) { t_mqtt * x = obj; switch (result) { case 0: mqtt_change_state(x, 1); break; default: error("[mqtt] <%s> Connection Refused, [%s]", x->a_id->s_name, mosquitto_reason_string(result)); mqtt_change_state(x, 0); mosquitto_disconnect(mosq); } } void disconnect_callback(struct mosquitto *mosq, void *obj, int result) { t_mqtt * x = obj; if (result ==0) { if (x->a_verbose) post("[mqtt] <%s> disconnected as requested",x->a_id->s_name); } else { post("[mqtt] <%s> unexpectedly disconnected from broker",x->a_id->s_name); } mqtt_change_state(x, 0); } void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { t_mqtt * x = obj; t_atom a[1]; // asssumes something that fits in a max symbol -- should check for cues or magic numbers and generates json dicts, jitter matrices, or vectors of audio atom_setsym(a+0, gensym((char*) message->payload)); outlet_anything(x->p_main_outlet, gensym(message->topic), 1,a); } // methods void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s); void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic); void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic); void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv); void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv); void mqtt_loop_method(t_mqtt *x); // attribute setters void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_user(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_port(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv); void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv); void *mqtt_new(t_symbol *s, long argc, t_atom *argv); void mosquitto_free_client(t_mqtt *x) { if (x->a_verbose) post("[mqtt] freeing underlying client <%s>", x->a_id->s_name); mosquitto_disconnect(x->mosq); mosquitto_loop_stop(x->mosq, true); mosquitto_destroy(x->mosq); } void mqtt_free(t_mqtt *x) { mosquitto_free_client(x); } t_class *mqtt_class; void ext_main(void *r) { t_class *c; c = class_new("mqtt", (method)mqtt_new, (method)mqtt_free, sizeof(t_mqtt), 0L, A_GIMME, 0); class_addmethod(c, (method)mqtt_loop_method, "bang", 0); class_addmethod(c, (method)mqtt_assist, "assist", A_CANT, 0); class_addmethod(c, (method)mqtt_subscribe_method, "subscribe", A_DEFSYM, 0); class_addmethod(c, (method)mqtt_unsubscribe_method, "unsubscribe",A_DEFSYM, 0); class_addmethod(c, (method)mqtt_publish_method, "publish", A_GIMME, 0); class_addmethod(c, (method)mqtt_connect_method, "connect", A_GIMME, 0); CLASS_ATTR_SYM(c, "id", 0, t_mqtt, a_id); CLASS_ATTR_ACCESSORS(c, "id", NULL, mqtt_set_id); CLASS_ATTR_CHAR(c, "clean", 0, t_mqtt, a_clean); CLASS_ATTR_STYLE_LABEL(c, "clean", 0, "onoff", "Clean Session"); CLASS_ATTR_ACCESSORS(c, "clean", NULL, mqtt_set_clean); CLASS_ATTR_CHAR(c, "verbose", 0, t_mqtt, a_verbose); CLASS_ATTR_STYLE_LABEL(c, "verbose", 0, "onoff", "Verbose"); CLASS_ATTR_CHAR(c, "autoconnect", 0, t_mqtt, a_autoconnect); CLASS_ATTR_STYLE_LABEL(c, "autoconnect", 0, "onoff", "Auto-connect"); CLASS_ATTR_ACCESSORS(c, "autoconnect", NULL, mqtt_set_autoconnect); CLASS_ATTR_SYM(c, "user", 0, t_mqtt, a_user); CLASS_ATTR_ACCESSORS(c, "user", NULL, mqtt_set_user); CLASS_ATTR_SYM(c, "password", 0, t_mqtt, a_password); CLASS_ATTR_ACCESSORS(c, "password", NULL, mqtt_set_password); CLASS_ATTR_SYM(c, "host", 0, t_mqtt, a_host); CLASS_ATTR_ACCESSORS(c, "host", NULL, mqtt_set_host); CLASS_ATTR_LONG(c, "port", 0, t_mqtt, a_port); CLASS_ATTR_ACCESSORS(c, "port", NULL, mqtt_set_port); class_register(CLASS_BOX, c); mqtt_class = c; int res = mosquitto_lib_init(); post("MQTT Client for Max ©2021 https://gitlab.artificiel.org/max/mqtt",0); int x,y,z; mosquitto_lib_version(&x,&y,&z); if (res==MOSQ_ERR_SUCCESS) { post("Mosquitto MQTT Library %i.%i.%i initialized.",x ,y ,z); } else { error("Mosquitto MQTT Library %i.%i.%i did not correctly initialize!",x ,y ,z); } } void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getsym(argv)->s_name != x->a_id->s_name) { x->a_id = atom_getsym(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with id %s", x->a_id->s_name); } } } void mqtt_set_user(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getsym(argv)->s_name != x->a_user->s_name) { x->a_user = atom_getsym(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with user %s", x->a_user->s_name); } } } void mqtt_set_port(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getlong(argv) != x->a_port) { x->a_port = atom_getlong(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with port %i", x->a_port); } } } void mqtt_set_password(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getsym(argv)->s_name != x->a_password->s_name) { x->a_password = atom_getsym(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with passwoord %s", x->a_password->s_name); } } } void mqtt_set_host(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getsym(argv)->s_name != x->a_host->s_name) { x->a_host = atom_getsym(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with host %s", x->a_host->s_name); } } } void mqtt_set_autoconnect(t_mqtt *x, void *attr, long argc, t_atom *argv) { x->a_autoconnect = atom_getlong(argv); if (x->a_state == 0 && x->a_autoconnect == 1) { post("TO IMPLEMENT: should connect when autoconnect is turned on",0); } } void mqtt_set_clean(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getlong(argv) != x->a_clean) { x->a_clean = atom_getlong(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with cleaniness %i", x->a_clean); } } } void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) { if ((argc < 0 && argc > 4) || argc ==3) { error("[mqtt] connect requires 0, 1, 2 or 4 arguments: (host (port (username password)))"); } else { if (argv>0) { if (argv[0].a_type==A_SYM) { //mosquitto_reinitialise(x->mosq, x->clientid, false, x); // char host[100]; // int port = MQTT_DEFAULT_PORT; // sprintf(host, "%s", argv[0].a_w.w_sym->s_name); // x->a_host = argv[0].a_w.w_sym; if (argc > 1) { if (argv[1].a_type==A_LONG) { x->a_port = argv[1].a_w.w_long; } else { error("[mqtt] connect port (arg#2) must be int -- using %i", x->a_port); } if (argc ==4) { if (argv[2].a_type==A_SYM && argv[3].a_type==A_SYM) { x->a_password = gensym(argv[3].a_w.w_sym->s_name); x->a_user = argv[2].a_w.w_sym; } else { error("[mqtt] connect username & password (arg#2-3) must be symbols"); } } } } } if (strlen(x->a_host->s_name)) { if (x->mosq != NULL) { mosquitto_free_client(x); } x->mosq = mosquitto_new(x->a_id->s_name, false, x); // pass this object into mosquitto for callbacks if (x->a_verbose) post("[mqtt] instance for cliend id <%s> created",x->a_id->s_name); mosquitto_disconnect_callback_set(x->mosq, disconnect_callback); mosquitto_connect_callback_set(x->mosq, connect_callback); mosquitto_message_callback_set(x->mosq, message_callback); mosquitto_publish_callback_set(x->mosq, publish_callback); mosquitto_user_data_set(x->mosq, x); if (strlen(x->a_user->s_name) && strlen(x->a_password->s_name)) { mosquitto_username_pw_set(x->mosq, x->a_user->s_name, x->a_password->s_name); if (x->a_verbose) post("[mqtt] <%s> attempting connection to %s:%s@%s:%i", x->a_id->s_name, x->a_user->s_name, x->a_password->s_name, x->a_host->s_name, x->a_port); } else { if (x->a_verbose) post("[mqtt] <%s> attempting connection to %s:%i", x->a_id->s_name, x->a_host->s_name, x->a_port); } int res= mosquitto_connect(x->mosq, x->a_host->s_name, x->a_port, 60); int rloop; switch (res) { case MOSQ_ERR_SUCCESS: rloop = mosquitto_loop_start(x->mosq); if (rloop==MOSQ_ERR_INVAL) { error("[mqtt] <%s> could not start thread [%02i]:%s", x->a_id->s_name, rloop, mosquitto_reason_string(rloop)); } else if (rloop==MOSQ_ERR_NOT_SUPPORTED) { error("[mqtt] <%s> could not start thread [MOSQ_ERR_NOT_SUPPORTED]", x->a_id->s_name); } else { if (x->a_verbose) post("[mqtt] <%s> started event thread",x->a_id->s_name); } // } break; case MOSQ_ERR_INVAL: error("[mqtt] connect: <%s> invalid parameters [MOSQ_ERR_INVAL]", x->a_id->s_name); break; case MOSQ_ERR_ERRNO: error("[mqtt] connect: <%s> out of memory [MOSQ_ERR_NOMEM]", x->a_id->s_name); break; } if (x->a_verbose) post("[mqtt] <%s> connection proocess finished.",x->a_id->s_name); } else { error("[mqtt] host not set"); } } } void mqtt_loop_method(t_mqtt * x) { post("loop..."); int res = mosquitto_loop(x->mosq, -1, 1); post("loop %i", res); } void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) { t_symbol topic; long qos = 0; bool retain = false; if (argc < 2 || argc > 4) { error("[mqtt] publish requires 2, 3 or 4 arguments: topic payload (qos (retain))"); } else { if (argv[0].a_type==A_SYM) { topic = *argv[0].a_w.w_sym; if (argv[1].a_type==A_LONG) { sprintf(x->payload, "%lli", argv[1].a_w.w_long); } else if (argv[1].a_type==A_FLOAT) { sprintf(x->payload, "%f", argv[1].a_w.w_float); } else if (argv[1].a_type==A_SYM) { sprintf(x->payload, "%s", argv[1].a_w.w_sym->s_name); } if (argc>2) { if (argv[2].a_type==A_LONG) { long q = argv[2].a_w.w_long; if (q <0) q = 0; if (q > 2) q = 2; qos = q; } if (argc==4) { if (argv[4].a_type==A_LONG) { long q = argv[2].a_w.w_long; if (q <0) q = 0; if (q > 1) q = 1; retain = q; } } } int res = mosquitto_publish(x->mosq, NULL, topic.s_name, strlen(x->payload), x->payload, qos, retain); switch (res) { case MOSQ_ERR_SUCCESS: /* silent success */ if (x->a_verbose) post("[mqtt] <%s> published %s with %s",x->a_id->s_name, topic.s_name, x->payload); break; default: error("[mqtt] <%s> can't publish %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res)); } } else { error("[mqtt] publish topic (arg#1) must be string"); } } } void mqtt_subscribe_method(t_mqtt *x, t_symbol * topic) { int sres = mosquitto_subscribe(x->mosq, NULL, topic->s_name, 0); switch (sres) { case MOSQ_ERR_SUCCESS: if (x->a_verbose) post("[mqtt] <%s> susbscribed %s", x->a_id->s_name, topic->s_name); break; default: error("[mqtt] subscribe <%s> failed [%02i: %s]", x->a_id->s_name, sres, mosquitto_reason_string(sres)); } } void mqtt_unsubscribe_method(t_mqtt *x, t_symbol * topic) { int sres = mosquitto_unsubscribe(x->mosq, NULL, topic->s_name); switch (sres) { case MOSQ_ERR_SUCCESS: if (x->a_verbose) post("[mqtt] <%s> unsusbscribed %s", x->a_id->s_name, topic->s_name); break; default: error("[mqtt] unsubscribe <%s> failed [%s]]", x->a_id->s_name, mosquitto_reason_string(sres)); } } void *mqtt_new(t_symbol *s, long argc, t_atom *argv) { t_mqtt *x; x = (t_mqtt *)object_alloc(mqtt_class); char hostname[100]; char random_id[100]; hostname[99] = '\0'; gethostname(hostname, 1023); snprintf(random_id, 99, "%s-%03ld", hostname, MOSQUITTO_CLIENT_ID++); x->a_id = gensym(random_id); x->a_verbose = 0; x->a_clean = 0; x->a_autoconnect = 0; x->a_port = 1883; x->a_host = gensym(""); x->a_user = gensym(""); x->a_password = gensym(""); attr_args_process(x, argc, argv); x->p_status_outlet = outlet_new(x, NULL); x->p_main_outlet = outlet_new(x, NULL); return(x); } void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s) // 4 final arguments are always the same for the assistance method { if (m == ASSIST_OUTLET) switch (a) { case 0: sprintf(s,"topic and payload (2 symbols list)"); break; case 1: sprintf(s,"connection status (0 or 1)"); break; } else { switch (a) { case 0: sprintf(s,"all messages go here"); break; case 1: sprintf(s,"Inlet %ld: Right Operand (Added to Left)", a); break; } } }