mqtt.c 13.3 KB
Newer Older
alexandre burton's avatar
alexandre burton committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include "/usr/local/include/mosquitto.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "ext.h"
#include "ext_obex.h"

/**
 * Per-instance state of the [mqtt] Max object.
 *
 * One libmosquitto client handle is owned by each instance; it is created by
 * the "connect" message and released by mosquitto_free_client().
 */
typedef struct _mqtt {
    t_object p_ob;              // Max object header
    void *p_status_outlet;      // outlet that receives a_state (see mqtt_change_state)
    void *p_main_outlet;        // outlet that receives incoming messages (see message_callback)

    struct mosquitto *mosq;     // instance of mosquitto client (NULL until "connect")
    int a_state;                // MQTT connection state: 1 = up, 0 = down
    long a_port;                // MQTT Broker port ("port" attribute)

    // Registered with CLASS_ATTR_CHAR, which reads/writes a single char at this
    // offset, so the member must be a char (not a pointer).
    char a_verbose;             // verbose chatter in Max window (0 = off)
    t_symbol *a_autoconnect;    // auto-connect on object instantiation (attribute currently not registered)
    t_symbol *a_user;           // MQTT username ("user" attribute)
    t_symbol *a_pw;             // MQTT password ("password" attribute)
    t_symbol *a_id;             // Client ID ("id" attribute)
} t_mqtt;

// Broker port used by the "connect" message when no port argument is given.
#define MQTT_DEFAULT_PORT 1883
// Counter incremented for every object created; mqtt_new() appends it to the
// host name to build a default client id.
static long MOSQUITTO_CLIENT_ID = 0;

// Build notes:
//   1. Download and compile libmosquitto as a static library (TLS/HTTPS was not fought with; it is disabled in CMake).
//   2. Code-sign the static library:
//      /usr/bin/codesign --force --sign 0EFF62FA1517300CC579B619A634293E5A5E7193 --timestamp=none /Users/brtn/Downloads/max-sdk-8.0.3/externals/mqtt.mxo/Contents/Frameworks/libmosquitto_static.a
//   3. Add the library to the Max project.

/**
 * Record the connection state and report it on the status outlet.
 *
 * @param x      object instance
 * @param state  1 means "connection up"; any other value is treated as "down"
 *
 * NOTE(review): this is called from the libmosquitto callbacks; if those run
 * on the thread started by mosquitto_loop_start(), the outlet call happens
 * outside Max's own threads -- confirm whether it needs to be deferred.
 */
void mqtt_change_state(t_mqtt *x, int state) {
    const int is_up = (state == 1);

    if (is_up) {
        if (x->a_verbose) post("Mosquitto <%s> connection up",x->a_id->s_name);
    } else {
        if (x->a_verbose) post("Mosquitto <%s> connecction down",x->a_id->s_name);
    }

    x->a_state = is_up ? 1 : 0;
    outlet_int(x->p_status_outlet, x->a_state);
}

void connect_callback(struct mosquitto *mosq, void *obj, int result)
{
    t_mqtt * x = obj;
    switch (result) {
        case 0:
            mqtt_change_state(x, 1);
            break;
        default:
            error("Mosquitto <%s> Connection Refused, [%s]", x->a_id->s_name, mosquitto_reason_string(result));
            mqtt_change_state(x, 0);
            mosquitto_disconnect(mosq);
    }
}

/**
 * libmosquitto disconnect callback.
 *
 * @param mosq    client that was disconnected (unused)
 * @param obj     user data given to mosquitto_new(), i.e. the t_mqtt instance
 * @param result  0 when the disconnect was requested by this client,
 *                non-zero when it was unexpected
 *
 * An unexpected disconnect is always reported; a requested one only in
 * verbose mode. The state outlet is set to 0 in both cases.
 */
void disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
    t_mqtt *client = obj;
    const char *id = client->a_id->s_name;

    if (result != 0) {
        post("Mosquitto <%s> unexpectedly disconnected from broker", id);
    } else if (client->a_verbose) {
        post("Mosquitto <%s> disconnected as requested", id);
    }

    mqtt_change_state(client, 0);
}

void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    t_mqtt * x = obj;
    t_atom a[1];
76
77
    
    // asssumes something that fits in a max symbol -- should check for cues or magic numbers and generates json dicts, jitter matrices, or vectors of audio
alexandre burton's avatar
alexandre burton committed
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
    atom_setsym(a+0, gensym((char*) message->payload));
    outlet_anything(x->p_main_outlet, gensym(message->topic), 1,a);
}

// Forward declarations of the methods registered in ext_main().
// Message handlers ("assist", "subscribe", "unsubscribe", "publish", "connect", "bang"):
void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s);
void mqtt_subscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_unsubscribe_method(t_mqtt *x, t_symbol *topic);
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv);
void mqtt_loop_method(t_mqtt *x);

// Setter installed for the "id" attribute.
void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv);

// Object constructor passed to class_new().
void *mqtt_new(t_symbol *s, long argc, t_atom *argv);

/**
 * Disconnect, stop the network loop thread and destroy the libmosquitto
 * client owned by this object.
 *
 * Safe to call when no client exists (e.g. the object is freed before any
 * "connect"); in that case nothing is done. On return x->mosq is NULL so the
 * destroyed handle can never be reused.
 *
 * @param x  object instance
 */
void mosquitto_free_client(t_mqtt *x) {
    if (x->mosq == NULL) {
        return;
    }

    if (x->a_verbose) post("Mosquitto: freeing underlying client <%s>", x->a_id->s_name);

    mosquitto_disconnect(x->mosq);
    mosquitto_loop_stop(x->mosq, true);
    mosquitto_destroy(x->mosq);
    x->mosq = NULL;
}

/**
 * Destructor registered with class_new(): releases the libmosquitto client
 * held by the instance.
 *
 * @param x  object instance being freed
 */
void mqtt_free(t_mqtt *x) {
    mosquitto_free_client(x);
}

// Class pointer for [mqtt]; assigned in ext_main() and used by mqtt_new() to allocate instances.
t_class *mqtt_class;

void ext_main(void *r)
{
    t_class *c;
    
    c = class_new("mqtt", (method)mqtt_new, (method)mqtt_free, sizeof(t_mqtt), 0L, A_GIMME, 0);
    
    class_addmethod(c, (method)mqtt_loop_method,        "bang",       0);
    class_addmethod(c, (method)mqtt_assist,             "assist",     A_CANT, 0);
    class_addmethod(c, (method)mqtt_subscribe_method,   "subscribe",  A_DEFSYM, 0);
    class_addmethod(c, (method)mqtt_unsubscribe_method, "unsubscribe",A_DEFSYM, 0);
    class_addmethod(c, (method)mqtt_publish_method,     "publish",    A_GIMME, 0);
    class_addmethod(c, (method)mqtt_connect_method,     "connect",    A_GIMME, 0);
    
    CLASS_ATTR_SYM(c,              "id",  0, t_mqtt, a_id);
    CLASS_ATTR_ACCESSORS(c,        "id",  NULL, mqtt_set_id);
123
    
alexandre burton's avatar
alexandre burton committed
124
125
    CLASS_ATTR_CHAR(c,              "verbose",  0, t_mqtt, a_verbose);
    CLASS_ATTR_STYLE_LABEL(c,       "verbose",0,"onoff","Verbose");
126
127
    
    //    CLASS_ATTR_SYM(c, "autoconnect",    0, t_mqtt, a_autoconnect);
alexandre burton's avatar
alexandre burton committed
128
129
130
131
    CLASS_ATTR_SYM(c, "user",           0, t_mqtt, a_user);
    CLASS_ATTR_SYM(c, "password",       0, t_mqtt, a_pw);
    CLASS_ATTR_LONG(c, "port",          0, t_mqtt, a_port);
    
132
    
alexandre burton's avatar
alexandre burton committed
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
    class_register(CLASS_BOX, c);
    mqtt_class = c;
    
    int res = mosquitto_lib_init();
    post("MQTT Client for Max ©2021 https://gitlab.artificiel.org/max/mqtt",0);
    if (res==MOSQ_ERR_SUCCESS) {
        int a,b,c;
        mosquitto_lib_version(&a,&b,&c);
        post("Mosquitto MQTT Library %i.%i.%i initialized.",a,b,c);
    } else {
        error("Mosquitto MQTT did not correctly initialize!",0);
    }
}

/**
 * Setter for the "id" attribute: stores the new client id.
 *
 * @param x     object instance
 * @param attr  attribute being set (unused)
 * @param argc  number of atoms in argv; the call is ignored when it is 0
 * @param argv  atoms; the first one is read as the new id symbol
 *
 * Changing the id of a connected client does not reconnect yet; a reminder is
 * posted instead.
 */
void mqtt_set_id(t_mqtt *x, void *attr, long argc, t_atom *argv)
{
    if (argc < 1 || argv == NULL) {
        return;
    }

    t_symbol *requested = atom_getsym(argv);
    if (requested == NULL) {
        return;
    }

    // Same name as the current id: nothing to do.
    if (x->a_id != NULL && requested->s_name == x->a_id->s_name) {
        return;
    }

    x->a_id = requested;
    if (x->a_state == 1) {
        post("TO IMPLEMENT: should reconnect with id %s", x->a_id->s_name);
    }
}

/**
 * "connect" message: connect host [port [username password]].
 *
 * Destroys any existing client, creates a new one with the current client id,
 * installs the callbacks, connects to the broker and starts libmosquitto's
 * network thread.
 *
 * @param x     object instance
 * @param s     message selector (unused)
 * @param argc  number of arguments; must be 1, 2 or 4
 * @param argv  host (symbol), port (int), username and password (symbols)
 *
 * Errors are reported in the Max window. The connection state itself is
 * reported later through connect_callback / disconnect_callback.
 */
void mqtt_connect_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
    enum { KEEPALIVE_SECONDS = 60, MAX_PORT = 65535 };

    if (argc < 1 || argc > 4 || argc == 3 || argv == NULL) {
        error("mqtt connect requires 1, 2 or 4 arguments: host (port (username password))");
        return;
    }
    if (argv[0].a_type != A_SYM) {
        error("mqtt connect host (arg#1) must be string");
        return;
    }

    // Start from a fresh client so a reconnect never reuses old session state.
    if (x->mosq != NULL) {
        mosquitto_free_client(x);
        x->mosq = NULL;
    }
    x->mosq = mosquitto_new(x->a_id->s_name, false, x);
    if (x->mosq == NULL) {
        error("Mosquitto <%s> could not create client instance", x->a_id->s_name);
        return;
    }
    if (x->a_verbose) post("Mosquitto Instance id <%s> created",x->a_id->s_name);

    // The symbol name is used directly: no copy into a fixed-size buffer.
    const char *host = argv[0].a_w.w_sym->s_name;
    int port = MQTT_DEFAULT_PORT;

    if (argc > 1) {
        if (argv[1].a_type == A_LONG) {
            long long requested = (long long)argv[1].a_w.w_long;
            if (requested >= 0 && requested <= MAX_PORT) {
                port = (int)requested;
            } else {
                error("mqtt connect port (arg#2) must be in 0..65535 -- using default 1883");
            }
        } else {
            error("mqtt connect port (arg#2) must be int -- using default 1883");
        }

        if (argc == 4) {
            if (argv[2].a_type == A_SYM && argv[3].a_type == A_SYM) {
                mosquitto_username_pw_set(x->mosq, argv[2].a_w.w_sym->s_name, argv[3].a_w.w_sym->s_name);
            } else {
                error("mqtt connect username & password (arg#3-4) must be symbols -- skipping auth");
            }
        }
    }

    mosquitto_disconnect_callback_set(x->mosq, disconnect_callback);
    mosquitto_connect_callback_set(x->mosq, connect_callback);
    mosquitto_message_callback_set(x->mosq, message_callback);
    mosquitto_user_data_set(x->mosq, x);

    int res = mosquitto_connect(x->mosq, host, port, KEEPALIVE_SECONDS);
    if (res != MOSQ_ERR_SUCCESS) {
        if (res == MOSQ_ERR_INVAL) {
            error("Mosquitto::connect <%s> invalid parameters [MOSQ_ERR_INVAL]", x->a_id->s_name);
        } else {
            // Covers MOSQ_ERR_ERRNO (system error) and any other failure code.
            error("Mosquitto::connect <%s> failed [%s]", x->a_id->s_name, mosquitto_strerror(res));
        }
        return;
    }

    int rloop = mosquitto_loop_start(x->mosq);
    switch (rloop) {
        case MOSQ_ERR_SUCCESS:
            if (x->a_verbose) post("Mosquitto <%s> started event thread",x->a_id->s_name);
            break;
        case MOSQ_ERR_INVAL:
            error("Mosquitto <%s> could not start thread [MOSQ_ERR_INVAL]", x->a_id->s_name);
            break;
        case MOSQ_ERR_NOT_SUPPORTED:
            error("Mosquitto <%s> could not start thread [MOSQ_ERR_NOT_SUPPORTED]", x->a_id->s_name);
            break;
        default:
            error("Mosquitto <%s> could not start thread [%s]", x->a_id->s_name, mosquitto_strerror(rloop));
            break;
    }
}

/**
 * "bang" message: run a single iteration of the libmosquitto network loop and
 * post its return code to the Max window.
 *
 * @param x  object instance
 *
 * NOTE(review): "connect" also starts the threaded loop with
 * mosquitto_loop_start(); confirm that a manual mosquitto_loop() is still
 * wanted alongside it.
 */
void mqtt_loop_method(t_mqtt * x) {
    int status;

    post("loop...");
    status = mosquitto_loop(x->mosq, -1, 1);
    post("loop %i", status);
}

/**
 * "publish" message: publish topic value [qos [retain]].
 *
 * @param x     object instance
 * @param s     message selector (unused)
 * @param argc  number of arguments; must be 2, 3 or 4
 * @param argv  topic (symbol), value (int, float or symbol),
 *              qos (int, clamped to 0..2), retain (int, > 0 means retain)
 *
 * The value is sent as text. qos and retain arguments of the wrong type are
 * ignored and the defaults (0, false) are kept.
 */
void mqtt_publish_method(t_mqtt *x, t_symbol *s, long argc, t_atom *argv) {
    char number[384];           // large enough for any "%f"-formatted double
    const char *topic;
    const char *payload;
    int qos = 0;
    bool retain = false;

    if (argc < 2 || argc > 4 || argv == NULL) {
        error("mqtt publish requires 2, 3 or 4 arguments: topic value (qos (retain))");
        return;
    }
    if (argv[0].a_type != A_SYM) {
        error("mqtt publish topic (arg#1) must be string");
        return;
    }
    topic = argv[0].a_w.w_sym->s_name;

    switch (argv[1].a_type) {
        case A_LONG:
            snprintf(number, sizeof number, "%lld", (long long)argv[1].a_w.w_long);
            payload = number;
            break;
        case A_FLOAT:
            snprintf(number, sizeof number, "%f", (double)argv[1].a_w.w_float);
            payload = number;
            break;
        case A_SYM:
            // Symbols are sent as-is; no copy into a fixed-size buffer.
            payload = argv[1].a_w.w_sym->s_name;
            break;
        default:
            error("mqtt publish value (arg#2) must be int, float or symbol");
            return;
    }

    if (argc > 2 && argv[2].a_type == A_LONG) {
        long long q = (long long)argv[2].a_w.w_long;
        if (q < 0) q = 0;
        if (q > 2) q = 2;
        qos = (int)q;
    }
    // The retain flag is the 4th argument, i.e. argv[3].
    if (argc == 4 && argv[3].a_type == A_LONG) {
        retain = (argv[3].a_w.w_long > 0);
    }

    int res = mosquitto_publish(x->mosq, NULL, topic, (int)strlen(payload), payload, qos, retain);
    if (res != MOSQ_ERR_SUCCESS) {
        // res is a MOSQ_ERR_* library code, so it is decoded with mosquitto_strerror().
        error("Mosquitto::publish <%s> %s [%s]", x->a_id->s_name, topic, mosquitto_strerror(res));
    }
}

/**
 * "subscribe" message: subscribe to a topic with QoS 0.
 *
 * @param x      object instance
 * @param topic  topic (or topic pattern) to subscribe to
 *
 * Failures are reported in the Max window; success only in verbose mode.
 */
void mqtt_subscribe_method(t_mqtt *x, t_symbol * topic) {
    int sres = mosquitto_subscribe(x->mosq, NULL, topic->s_name, 0);

    if (sres != MOSQ_ERR_SUCCESS) {
        // sres is a MOSQ_ERR_* library code, so it is decoded with mosquitto_strerror().
        error("Mosquitto::subscribe <%s> [%s]", x->a_id->s_name, mosquitto_strerror(sres));
        return;
    }
    if (x->a_verbose) post("Mosquitto <%s> subscribed %s", x->a_id->s_name, topic->s_name);
}

/**
 * "unsubscribe" message: cancel a subscription.
 *
 * @param x      object instance
 * @param topic  topic (or topic pattern) to unsubscribe from
 *
 * Failures are reported in the Max window; success only in verbose mode.
 */
void mqtt_unsubscribe_method(t_mqtt *x, t_symbol * topic) {
    int sres = mosquitto_unsubscribe(x->mosq, NULL, topic->s_name);

    if (sres != MOSQ_ERR_SUCCESS) {
        // sres is a MOSQ_ERR_* library code, so it is decoded with mosquitto_strerror().
        error("Mosquitto::unsubscribe <%s> [%s]", x->a_id->s_name, mosquitto_strerror(sres));
        return;
    }
    if (x->a_verbose) post("Mosquitto <%s> unsubscribed %s", x->a_id->s_name, topic->s_name);
}

/**
 * Object constructor (A_GIMME): allocates the instance, gives it a default
 * client id of the form "<hostname>-maxmsp<counter>", processes attribute
 * arguments typed in the object box and creates the two outlets.
 *
 * @param s     class name symbol (unused)
 * @param argc  number of atoms typed into the object box
 * @param argv  atoms typed into the object box (attribute arguments)
 * @return      the new instance, or NULL if allocation failed
 */
void *mqtt_new(t_symbol *s, long argc, t_atom *argv)
{
    t_mqtt *x = (t_mqtt *)object_alloc(mqtt_class);
    if (x == NULL) {
        return NULL;
    }

    // No client exists until "connect"; mqtt_connect_method and the free path
    // rely on this being NULL.
    x->mosq = NULL;
    x->a_state = 0;
    x->a_verbose = 0;

    char hostname[100];
    char random_id[100];
    // Never let gethostname() write past the buffer, and terminate explicitly
    // in case the name was truncated.
    if (gethostname(hostname, sizeof hostname) != 0) {
        snprintf(hostname, sizeof hostname, "%s", "localhost");
    }
    hostname[sizeof hostname - 1] = '\0';

    snprintf(random_id, sizeof random_id, "%s-maxmsp%05ld", hostname, MOSQUITTO_CLIENT_ID++);
    x->a_id = gensym(random_id);

    // Attribute arguments (e.g. @id, @verbose) override the defaults set above.
    attr_args_process(x, argc, argv);

    x->p_status_outlet  = outlet_new(x, NULL);
    x->p_main_outlet = outlet_new(x, NULL);

    return(x);
}

/**
 * "assist" method: fills in the hint shown when hovering an inlet or outlet.
 *
 * @param x  object instance (unused)
 * @param b  unused
 * @param m  ASSIST_OUTLET for an outlet, anything else for an inlet
 * @param a  zero-based index of the inlet/outlet
 * @param s  destination buffer for the hint text
 *
 * NOTE(review): outlet 0 is assumed to be p_main_outlet (created last in
 * mqtt_new) and outlet 1 p_status_outlet -- confirm against Max's outlet
 * ordering.
 */
void mqtt_assist(t_mqtt *x, void *b, long m, long a, char *s) // 4 final arguments are always the same for the assistance method
{
    if (m == ASSIST_OUTLET) {
        switch (a) {
            case 0:
                sprintf(s, "Received messages: topic followed by payload");
                break;
            case 1:
                sprintf(s, "Connection state (1 = connected, 0 = disconnected)");
                break;
            default:
                break;
        }
    } else {
        switch (a) {
            case 0:
                sprintf(s, "Inlet %ld: connect, subscribe, unsubscribe, publish, bang", a);
                break;
            default:
                break;
        }
    }
}