#include #include "/usr/local/include/mosquitto.h" #include #include #include #include "ext.h" #include "ext_obex.h" typedef struct _mqtt { t_object p_ob; void *p_status_outlet; void *p_main_outlet; struct mosquitto *mosq; // instance of mosquitto client int a_state; // MQTT connection state long a_port; // MQTT Broker port char *a_verbose; // verbose chatter in Max windows t_symbol *a_autoconnect; // auto-connect on object instanciation t_symbol *a_user; // MQTT username t_symbol *a_pw; // MQTT password t_symbol *a_id; // Client ID } t_mqtt; #define MQTT_DEFAULT_PORT 1883 static long MOSQUITTO_CLIENT_ID = 0; // download & compile static mosquitto lib (did not fight TLS/https; disabled in cmake) // /usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none/usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none /Users/brtn/Downloads/max-sdk-8.0.3/externals/mqtt.mxo/Contents/Frameworks/libmosquitto_static.a // add to max project void mqtt_change_state(t_mqtt *x, int state) { switch (state) { case 1: if (x->a_verbose) post("Mosquitto <%s> connection up",x->a_id->s_name); x->a_state =1; break; default: if (x->a_verbose) post("Mosquitto <%s> connecction down",x->a_id->s_name); x->a_state =0; } outlet_int(x->p_status_outlet, x->a_state); } void connect_callback(struct mosquitto *mosq, void *obj, int result) { t_mqtt * x = obj; switch (result) { case 0: mqtt_change_state(x, 1); break; default: error("Mosquitto <%s> Connection Refused, [%s]", x->a_id->s_name, mosquitto_reason_string(result)); mqtt_change_state(x, 0); mosquitto_disconnect(mosq); } } void disconnect_callback(struct mosquitto *mosq, void *obj, int result) { t_mqtt * x = obj; if (result ==0) { if (x->a_verbose) post("Mosquitto <%s> disconnected as requested",x->a_id->s_name); } else { post("Mosquitto <%s> unexpectedly disconnected from broker",x->a_id->s_name); } mqtt_change_state(x, 0); } void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { t_mqtt * x = obj; t_atom a[1]; atom_setsym(a+0, gensym((char*) message->payload)); outlet_anything(x->p_main_outlet, gensym(message->topic), 1,a); } void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s); void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic); void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic); void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv); void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv); void mqtt_loop_method(t_mqtt *x); void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv); void *mqtt_new(t_symbol *s, long argc, t_atom *argv); void mosquitto_free_client(t_mqtt *x) { if (x->a_verbose) post("Mosquitto: freeing underlying client <%s>", x->a_id->s_name); mosquitto_disconnect(x->mosq); mosquitto_loop_stop(x->mosq, true); mosquitto_destroy(x->mosq); } void mqtt_free(t_mqtt *x) { mosquitto_free_client(x); } t_class *mqtt_class; void ext_main(void *r) { t_class *c; c = class_new("mqtt", (method)mqtt_new, (method)mqtt_free, sizeof(t_mqtt), 0L, A_GIMME, 0); class_addmethod(c, (method)mqtt_loop_method, "bang", 0); class_addmethod(c, (method)mqtt_assist, "assist", A_CANT, 0); class_addmethod(c, (method)mqtt_subscribe_method, "subscribe", A_DEFSYM, 0); class_addmethod(c, (method)mqtt_unsubscribe_method, "unsubscribe",A_DEFSYM, 0); class_addmethod(c, (method)mqtt_publish_method, "publish", A_GIMME, 0); class_addmethod(c, (method)mqtt_connect_method, "connect", A_GIMME, 0); CLASS_ATTR_SYM(c, "id", 0, t_mqtt, a_id); CLASS_ATTR_ACCESSORS(c, "id", NULL, mqtt_set_id); CLASS_ATTR_CHAR(c, "verbose", 0, t_mqtt, a_verbose); CLASS_ATTR_STYLE_LABEL(c, "verbose",0,"onoff","Verbose"); // CLASS_ATTR_SYM(c, "autoconnect", 0, t_mqtt, a_autoconnect); CLASS_ATTR_SYM(c, "user", 0, t_mqtt, a_user); CLASS_ATTR_SYM(c, "password", 0, t_mqtt, a_pw); CLASS_ATTR_LONG(c, "port", 0, t_mqtt, a_port); class_register(CLASS_BOX, c); mqtt_class = c; int res = mosquitto_lib_init(); post("MQTT Client for Max ©2021 https://gitlab.artificiel.org/max/mqtt",0); if (res==MOSQ_ERR_SUCCESS) { int a,b,c; mosquitto_lib_version(&a,&b,&c); post("Mosquitto MQTT Library %i.%i.%i initialized.",a,b,c); } else { error("Mosquitto MQTT did not correctly initialize!",0); } } void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv) { if (atom_getsym(argv)->s_name != x->a_id->s_name) { x->a_id = atom_getsym(argv); if (x->a_state == 1) { post("TO IMPLEMENT: should reconnect with id %s", x->a_id->s_name); } } } void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) { if ((argc < 1 && argc > 4) || argc ==3) { error("mqtt connect requires 1, 2 or 4 arguments: host (port (username password))"); } else { if (argv[0].a_type==A_SYM) { // mosquitto_reinitialise(x->mosq, x->clientid, false, x); if (x->mosq != NULL) { mosquitto_free_client(x); } x->mosq = mosquitto_new(x->a_id->s_name, false, x); if (x->a_verbose) post("Mosquitto Instance id <%s> created",x->a_id->s_name); char host[100]; int port = MQTT_DEFAULT_PORT; sprintf(host, "%s", argv[0].a_w.w_sym->s_name); if (argc > 1) { if (argv[1].a_type==A_LONG) { port = argv[1].a_w.w_long; } else { error("mqtt connect port (arg#2) must be int -- using default 1883"); } if (argc ==4) { if (argv[2].a_type==A_SYM && argv[3].a_type==A_SYM) { mosquitto_username_pw_set(x->mosq, argv[2].a_w.w_sym->s_name, argv[3].a_w.w_sym->s_name); } else { error("mqtt connect username & password (arg#2-3) must be symbols -- skipping auth"); } } } mosquitto_disconnect_callback_set(x->mosq, disconnect_callback); mosquitto_connect_callback_set(x->mosq, connect_callback); mosquitto_message_callback_set(x->mosq, message_callback); mosquitto_user_data_set(x->mosq, x); // rc = mosquitto_loop(x->mosq, -1, 1); // if(rc){ // error ("Mosquitto loop error!\n"); // } else { int res = mosquitto_connect(x->mosq, host, port, 60); // post("Mosquitto Instance id %s starting loop",x->a_id->s_name ); // int rloop = mosquitto_loop_start(x->mosq); // if (rloop==MOSQ_ERR_INVAL) { // error("Mosquitto <%s> could not start thread [MOSQ_ERR_INVAL]", x->clientid); // } else if (rloop==MOSQ_ERR_NOT_SUPPORTED) { // error("Mosquitto <%s> could not start thread [MOSQ_ERR_NOT_SUPPORTED]", x->clientid); // } else { // post("Mosquitto <%s> started event thread",x->clientid); // } // // } // // mosquitto_disconnect(x->mosq); // mosquitto_loop_stop(x->mosq, false); int rloop; switch (res) { case MOSQ_ERR_SUCCESS: // rc = mosquitto_loop(x->mosq, -1, 1); // if(rc){ // error ("Mosquitto connection error!\n"); // } else { rloop = mosquitto_loop_start(x->mosq); if (rloop==MOSQ_ERR_INVAL) { error("Mosquitto <%s> could not start thread [MOSQ_ERR_INVAL]", x->a_id->s_name); } else if (rloop==MOSQ_ERR_NOT_SUPPORTED) { error("Mosquitto <%s> could not start thread [MOSQ_ERR_NOT_SUPPORTED]", x->a_id->s_name); } else { if (x->a_verbose) post("Mosquitto <%s> started event thread",x->a_id->s_name); } // } break; case MOSQ_ERR_INVAL: error("Mosquitto::connect <%s> invalid parameters [MOSQ_ERR_INVAL]", x->a_id); break; case MOSQ_ERR_ERRNO: error("Mosquitto::connect <%s> out of memory [MOSQ_ERR_NOMEM]", x->a_id); break; } } else { error("mqtt connect host (arg#1) must be string"); } } } void mqtt_loop_method(t_mqtt * x) { post("loop..."); int res = mosquitto_loop(x->mosq, -1, 1); post("loop %i", res); } void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) { t_symbol topic; char value[100]; long qos = 0; bool retain = false; if (argc < 2 || argc > 4) { error("mqtt publish requires 2, 3 or 4 arguments: topic valuee (qos (retain))"); } else { if (argv[0].a_type==A_SYM) { topic = *argv[0].a_w.w_sym; if (argv[1].a_type==A_LONG) { sprintf(value, "%lli", argv[1].a_w.w_long); } else if (argv[1].a_type==A_FLOAT) { sprintf(value, "%f", argv[1].a_w.w_float); } else if (argv[1].a_type==A_SYM) { sprintf(value, "%s", argv[1].a_w.w_sym->s_name); } if (argc>2) { if (argv[2].a_type==A_LONG) { long q = argv[2].a_w.w_long; if (q <0) q = 0; if (q > 2) q = 2; qos = q; } if (argc==4) { if (argv[4].a_type==A_LONG) { long q = argv[2].a_w.w_long; if (q <0) q = 0; if (q > 1) q = 1; retain = q; } } } int res = mosquitto_publish(x->mosq, NULL, topic.s_name, strlen(value), value, qos, retain); switch (res) { case MOSQ_ERR_SUCCESS: /* silent success */ break; default: error("Mosquitto::publish <%s> %s [%s]", x->a_id->s_name, topic, mosquitto_reason_string(res)); } } else { error("mqtt publish topic (arg#1) must be string"); } } } void mqtt_subscribe_method(t_mqtt *x, t_symbol * topic) { int sres = mosquitto_subscribe(x->mosq, NULL, topic->s_name, 0); switch (sres) { case MOSQ_ERR_SUCCESS: if (x->a_verbose) post("Mosquitto <%s> susbscribed %s", x->a_id->s_name, topic->s_name); break; default: error("Mosquitto::subscribe <%s> [%s]", x->a_id->s_name, mosquitto_reason_string(sres)); } } void mqtt_unsubscribe_method(t_mqtt *x, t_symbol * topic) { int sres = mosquitto_unsubscribe(x->mosq, NULL, topic->s_name); switch (sres) { case MOSQ_ERR_SUCCESS: if (x->a_verbose) post("Mosquitto <%s> unsusbscribed %s", x->a_id->s_name, topic->s_name); break; default: error("Mosquitto::unsubscribe <%s> [%s]]", x->a_id->s_name, mosquitto_reason_string(sres)); } } void *mqtt_new(t_symbol *s, long argc, t_atom *argv) // n = int argument typed into object box (A_DEFLONG) -- defaults to 0 if no args are typed { t_mqtt *x; x = (t_mqtt *)object_alloc(mqtt_class); char hostname[100]; char random_id[100]; hostname[99] = '\0'; gethostname(hostname, 1023); snprintf(random_id, 99, "%s-maxmsp%05ld", hostname, MOSQUITTO_CLIENT_ID++); x->a_id = gensym(random_id); x->a_verbose = 0; attr_args_process(x, argc, argv); x->p_status_outlet = outlet_new(x, NULL); x->p_main_outlet = outlet_new(x, NULL); return(x); } void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s) // 4 final arguments are always the same for the assistance method { if (m == ASSIST_OUTLET) sprintf(s,"Sum of Left and Right Inlets"); else { switch (a) { case 0: sprintf(s,"Inlet %ld: Left Operand (Causes Output)", a); break; case 1: sprintf(s,"Inlet %ld: Right Operand (Added to Left)", a); break; } } }