* Purple is the legal property of its developers, whose names are too numerous * to list here. Please refer to the COPYRIGHT file distributed with this * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA #include <glib/gprintf.h> #include "queuedoutputstream.h" GBufferedInputStream *input; PurpleQueuedOutputStream *output; GCancellable *cancellable; * Represents an MQTT connection. G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt, fb_mqtt, G_TYPE_OBJECT); FbMqttMessageFlags flags; * Represents a reader/writer for an MQTT message. FbMqttMessagePrivate *priv; G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT); static void fb_mqtt_read_packet(FbMqtt *mqtt); fb_mqtt_dispose(GObject *obj) FbMqtt *mqtt = FB_MQTT(obj); FbMqttPrivate *priv = mqtt->priv; g_byte_array_free(priv->rbuf, TRUE); fb_mqtt_class_init(FbMqttClass *klass) GObjectClass *gklass = G_OBJECT_CLASS(klass); gklass->dispose = fb_mqtt_dispose; * Emitted upon the successful completion of the connection * process. This is emitted as a result of #fb_mqtt_connect(). G_TYPE_FROM_CLASS(klass), * Emitted whenever an error is hit within the #FbMqtt. This * should close the #FbMqtt with #fb_mqtt_close(). G_TYPE_FROM_CLASS(klass), * Emitted upon the successful opening of the remote socket. * This is emitted as a result of #fb_mqtt_open(). This should * call #fb_mqtt_connect(). G_TYPE_FROM_CLASS(klass), * Emitted upon an incoming message from the steam. G_TYPE_FROM_CLASS(klass), 2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY); fb_mqtt_init(FbMqtt *mqtt) FbMqttPrivate *priv = fb_mqtt_get_instance_private(mqtt); priv->rbuf = g_byte_array_new(); fb_mqtt_message_dispose(GObject *obj) FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv; if ((priv->bytes != NULL) && priv->local) { g_byte_array_free(priv->bytes, TRUE); fb_mqtt_message_class_init(FbMqttMessageClass *klass) GObjectClass *gklass = G_OBJECT_CLASS(klass); gklass->dispose = fb_mqtt_message_dispose; fb_mqtt_message_init(FbMqttMessage *msg) FbMqttMessagePrivate *priv = fb_mqtt_message_get_instance_private(msg); fb_mqtt_error_quark(void) if (G_UNLIKELY(q == 0)) { q = g_quark_from_static_string("fb-mqtt-error-quark"); fb_mqtt_new(PurpleConnection *gc) g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL); mqtt = g_object_new(FB_TYPE_MQTT, NULL); fb_mqtt_close(FbMqtt *mqtt) g_return_if_fail(FB_IS_MQTT(mqtt)); g_source_remove(priv->tev); if (priv->cancellable != NULL) { g_cancellable_cancel(priv->cancellable); g_clear_object(&priv->cancellable); if (priv->conn != NULL) { purple_gio_graceful_close(priv->conn, G_INPUT_STREAM(priv->input), G_OUTPUT_STREAM(priv->output)); g_clear_object(&priv->input); g_clear_object(&priv->output); g_clear_object(&priv->conn); g_byte_array_set_size(priv->rbuf, 0); fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix) if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { /* Return as cancelled means the connection is closing */ /* Now we can check for programming errors */ g_return_if_fail(FB_IS_MQTT(mqtt)); g_prefix_error(&err, "%s: ", prefix); g_signal_emit_by_name(mqtt, "error", err); fb_mqtt_error_literal(FbMqtt *mqtt, FbMqttError error, const gchar *msg) g_return_if_fail(FB_IS_MQTT(mqtt)); err = g_error_new_literal(FB_MQTT_ERROR, error, msg); g_signal_emit_by_name(mqtt, "error", err); fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...) g_return_if_fail(FB_IS_MQTT(mqtt)); err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap); g_signal_emit_by_name(mqtt, "error", err); fb_mqtt_cb_timeout(gpointer data) FbMqttPrivate *priv = mqtt->priv; fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Connection timed out")); fb_mqtt_timeout_clear(FbMqtt *mqtt) FbMqttPrivate *priv = mqtt->priv; g_source_remove(priv->tev); fb_mqtt_timeout(FbMqtt *mqtt) FbMqttPrivate *priv = mqtt->priv; fb_mqtt_timeout_clear(mqtt); priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout, mqtt); fb_mqtt_cb_ping(gpointer data) FbMqttPrivate *priv = mqtt->priv; msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0); fb_mqtt_write(mqtt, msg); fb_mqtt_ping(FbMqtt *mqtt) FbMqttPrivate *priv = mqtt->priv; fb_mqtt_timeout_clear(mqtt); priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data) GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source); ret = g_buffered_input_stream_fill_finish(input, res, &err); err = g_error_new_literal(G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED, fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header")); fb_mqtt_read_packet(mqtt); fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data) ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err); err = g_error_new_literal(G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED, fb_mqtt_take_error(mqtt, err, _("Failed to read packet data")); g_input_stream_read_async(G_INPUT_STREAM(source), priv->rbuf->len - priv->remz, priv->remz, G_PRIORITY_DEFAULT, priv->cancellable, fb_mqtt_cb_read_packet, mqtt); msg = fb_mqtt_message_new_bytes(priv->rbuf); if (G_UNLIKELY(msg == NULL)) { fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Failed to parse message")); /* Read another packet if connection wasn't reset in fb_mqtt_read() */ if (fb_mqtt_connected(mqtt, FALSE)) { fb_mqtt_read_packet(mqtt); fb_mqtt_read_packet(FbMqtt *mqtt) FbMqttPrivate *priv = mqtt->priv; buf = g_buffered_input_stream_peek_buffer(priv->input, &count); /* Start at 1 to skip the first byte */ /* Not enough data yet, try again later */ g_buffered_input_stream_fill_async(priv->input, -1, G_PRIORITY_DEFAULT, priv->cancellable, size += (byte & 127) * mult; } while ((byte & 128) != 0); g_byte_array_set_size(priv->rbuf, size); /* TODO: Use g_input_stream_read_all_async() when available. */ /* TODO: Alternately, it would be nice to let the * FbMqttMessage directly use the GBufferedInputStream * buffer instead of copying it, provided it's consumed g_input_stream_read_async(G_INPUT_STREAM(priv->input), priv->rbuf->data, priv->rbuf->len, G_PRIORITY_DEFAULT, priv->cancellable, fb_mqtt_cb_read_packet, mqtt); fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg) FbMqttMessagePrivate *mriv; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes, "Reading %d (flags: 0x%0X)", mriv->type, mriv->flags); case FB_MQTT_MESSAGE_TYPE_CONNACK: if (!fb_mqtt_message_read_byte(msg, NULL) || !fb_mqtt_message_read_byte(msg, &chr)) if (chr != FB_MQTT_ERROR_SUCCESS) { fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"), g_signal_emit_by_name(mqtt, "connect"); case FB_MQTT_MESSAGE_TYPE_PUBLISH: if (!fb_mqtt_message_read_str(msg, &str)) { if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) || (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2)) if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) { chr = FB_MQTT_MESSAGE_TYPE_PUBACK; chr = FB_MQTT_MESSAGE_TYPE_PUBREC; if (!fb_mqtt_message_read_mid(msg, &mid)) { nsg = fb_mqtt_message_new(chr, 0); fb_mqtt_message_write_u16(nsg, mid); fb_mqtt_write(mqtt, nsg); wytes = g_byte_array_new(); fb_mqtt_message_read_r(msg, wytes); g_signal_emit_by_name(mqtt, "publish", str, wytes); g_byte_array_free(wytes, TRUE); case FB_MQTT_MESSAGE_TYPE_PUBREL: if (!fb_mqtt_message_read_mid(msg, &mid)) { nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0); fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */ fb_mqtt_write(mqtt, nsg); case FB_MQTT_MESSAGE_TYPE_PINGRESP: case FB_MQTT_MESSAGE_TYPE_PUBACK: case FB_MQTT_MESSAGE_TYPE_PUBCOMP: case FB_MQTT_MESSAGE_TYPE_SUBACK: case FB_MQTT_MESSAGE_TYPE_UNSUBACK: fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, _("Unknown packet (%u)"), mriv->type); /* Since no case returned, there was a parse error. */ fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Failed to parse message")); fb_mqtt_cb_push_bytes(GObject *source, GAsyncResult *res, gpointer data) PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source); if (!purple_queued_output_stream_push_bytes_finish(stream, purple_queued_output_stream_clear_queue(stream); fb_mqtt_take_error(mqtt, err, _("Failed to write data")); fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg) FbMqttMessagePrivate *mriv; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); bytes = fb_mqtt_message_bytes(msg); if (G_UNLIKELY(bytes == NULL)) { fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Failed to format data")); fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes, "Writing %d (flags: 0x%0X)", mriv->type, mriv->flags); /* TODO: Would be nice to refactor this to not require copying bytes */ gbytes = g_bytes_new(bytes->data, bytes->len); purple_queued_output_stream_push_bytes_async(priv->output, gbytes, G_PRIORITY_DEFAULT, priv->cancellable, fb_mqtt_cb_push_bytes, mqtt); fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data) conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source), fb_mqtt_take_error(mqtt, err, NULL); fb_mqtt_timeout_clear(mqtt); priv->conn = G_IO_STREAM(conn); priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new( g_io_stream_get_input_stream(priv->conn))); priv->output = purple_queued_output_stream_new( g_io_stream_get_output_stream(priv->conn)); fb_mqtt_read_packet(mqtt); g_signal_emit_by_name(mqtt, "open"); fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port) g_return_if_fail(FB_IS_MQTT(mqtt)); acc = purple_connection_get_account(priv->gc); client = purple_gio_socket_client_new(acc, &err); fb_mqtt_take_error(mqtt, err, NULL); priv->cancellable = g_cancellable_new(); g_socket_client_set_tls(client, TRUE); g_socket_client_connect_to_host_async(client, host, port, priv->cancellable, fb_mqtt_cb_open, mqtt); fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload) g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE)); g_return_if_fail(pload != NULL); /* Facebook always sends a CONNACK, use QoS1 */ flags |= FB_MQTT_CONNECT_FLAG_QOS1; msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0); fb_mqtt_message_write_str(msg, FB_MQTT_NAME); /* Protocol name */ fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */ fb_mqtt_message_write_byte(msg, flags); /* Flags */ fb_mqtt_message_write_u16(msg, FB_MQTT_KA); /* Keep alive */ fb_mqtt_message_write(msg, pload->data, pload->len); fb_mqtt_write(mqtt, msg); fb_mqtt_connected(FbMqtt *mqtt, gboolean error) g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE); connected = (priv->conn != NULL) && priv->connected; if (!connected && error) { fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected")); fb_mqtt_disconnect(FbMqtt *mqtt) if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) { msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0); fb_mqtt_write(mqtt, msg); fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload) g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); /* Message identifier not required, but for consistency use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH, FB_MQTT_MESSAGE_FLAG_QOS1); fb_mqtt_message_write_str(msg, topic); /* Message topic */ fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ fb_mqtt_message_write(msg, pload->data, pload->len); fb_mqtt_write(mqtt, msg); fb_mqtt_subscribe(FbMqtt *mqtt, const gchar *topic1, guint16 qos1, ...) g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); /* Facebook requires a message identifier, use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE, FB_MQTT_MESSAGE_FLAG_QOS1); fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ fb_mqtt_message_write_str(msg, topic1); /* First topics */ fb_mqtt_message_write_byte(msg, qos1); /* First QoS value */ while ((topic = va_arg(ap, const gchar*)) != NULL) { fb_mqtt_message_write_str(msg, topic); /* Remaining topics */ fb_mqtt_message_write_byte(msg, qos); /* Remaining QoS values */ fb_mqtt_write(mqtt, msg); fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...) g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); /* Facebook requires a message identifier, use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE, FB_MQTT_MESSAGE_FLAG_QOS1); fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ fb_mqtt_message_write_str(msg, topic1); /* First topic */ while ((topic = va_arg(ap, const gchar*)) != NULL) { fb_mqtt_message_write_str(msg, topic); /* Remaining topics */ fb_mqtt_write(mqtt, msg); fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags) FbMqttMessagePrivate *priv; msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL); priv->bytes = g_byte_array_new(); fb_mqtt_message_new_bytes(GByteArray *bytes) FbMqttMessagePrivate *priv; g_return_val_if_fail(bytes != NULL, NULL); g_return_val_if_fail(bytes->len >= 2, NULL); msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL); priv->type = (*bytes->data & 0xF0) >> 4; priv->flags = *bytes->data & 0x0F; /* Skip the fixed header */ for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; ); priv->offset = byte - bytes->data; priv->pos = priv->offset; fb_mqtt_message_reset(FbMqttMessage *msg) FbMqttMessagePrivate *priv; g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); g_byte_array_remove_range(priv->bytes, 0, priv->offset); fb_mqtt_message_bytes(FbMqttMessage *msg) FbMqttMessagePrivate *priv; g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL); size = priv->bytes->len - priv->offset; if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) { fb_mqtt_message_reset(msg); g_byte_array_prepend(priv->bytes, sbuf, i); byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F); g_byte_array_prepend(priv->bytes, &byte, sizeof byte); priv->pos = (i + 1) * (sizeof byte); fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size) FbMqttMessagePrivate *priv; g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE); if ((priv->pos + size) > priv->bytes->len) { if ((data != NULL) && (size > 0)) { memcpy(data, priv->bytes->data + priv->pos, size); fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes) FbMqttMessagePrivate *priv; g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE); size = priv->bytes->len - priv->pos; if (G_LIKELY(size > 0)) { g_byte_array_append(bytes, priv->bytes->data + priv->pos, fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value) return fb_mqtt_message_read(msg, value, sizeof *value); fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value) return fb_mqtt_message_read_u16(msg, value); fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value) if (!fb_mqtt_message_read(msg, value, sizeof *value)) { *value = g_ntohs(*value); fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value) if (!fb_mqtt_message_read_u16(msg, &size)) { data = g_new(guint8, size + 1); if (!fb_mqtt_message_read(msg, data, size)) { fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size) FbMqttMessagePrivate *priv; g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); g_byte_array_append(priv->bytes, data, size); fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value) fb_mqtt_message_write(msg, &value, sizeof value); fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value) g_return_if_fail(value != NULL); fb_mqtt_message_write_u16(msg, ++(*value)); fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value) fb_mqtt_message_write(msg, &value, sizeof value); fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value) g_return_if_fail(value != NULL); fb_mqtt_message_write_u16(msg, size); fb_mqtt_message_write(msg, value, size);