--- a/libpurple/protocols/facebook/facebook.c Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/facebook.c Fri Sep 30 09:50:21 2016 -0500
@@ -332,8 +332,8 @@
gc = fb_data_get_connection(fata);
- if (error->domain == FB_MQTT_SSL_ERROR) {
- purple_connection_ssl_error(gc, error->code);
+ if (error->domain == G_IO_ERROR) { + purple_connection_g_error(gc, error); --- a/libpurple/protocols/facebook/mqtt.c Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.c Fri Sep 30 09:50:21 2016 -0500
@@ -28,7 +28,8 @@
+#include "queuedoutputstream.h" @@ -37,17 +38,17 @@
- PurpleSslConnection *gsc;
+ GBufferedInputStream *input; + PurpleQueuedOutputStream *output; + GCancellable *cancellable;
struct _FbMqttMessagePrivate
@@ -65,6 +66,8 @@
G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
+static void fb_mqtt_read_packet(FbMqtt *mqtt); fb_mqtt_dispose(GObject *obj)
@@ -73,7 +76,6 @@
g_byte_array_free(priv->rbuf, TRUE);
- g_byte_array_free(priv->wbuf, TRUE);
@@ -161,7 +163,6 @@
priv->rbuf = g_byte_array_new();
- priv->wbuf = g_byte_array_new();
@@ -205,18 +206,6 @@
-fb_mqtt_ssl_error_quark(void)
- if (G_UNLIKELY(q == 0)) {
- q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
fb_mqtt_new(PurpleConnection *gc)
@@ -240,33 +229,47 @@
g_return_if_fail(FB_IS_MQTT(mqtt));
- purple_input_remove(priv->wev);
- purple_input_remove(priv->rev);
purple_timeout_remove(priv->tev);
- if (priv->gsc != NULL) {
- purple_ssl_close(priv->gsc);
+ if (priv->cancellable != NULL) { + g_cancellable_cancel(priv->cancellable); + g_clear_object(&priv->cancellable); - if (priv->wbuf->len > 0) {
- fb_util_debug_warning("Closing with unwritten data");
+ if (priv->conn != NULL) { + purple_gio_graceful_close(priv->conn, + G_INPUT_STREAM(priv->input), + G_OUTPUT_STREAM(priv->output)); + g_clear_object(&priv->input); + g_clear_object(&priv->output); + g_clear_object(&priv->conn); g_byte_array_set_size(priv->rbuf, 0);
- g_byte_array_set_size(priv->wbuf, 0);
+fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix) + if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + /* Return as cancelled means the connection is closing */ + /* Now we can check for programming errors */ + g_return_if_fail(FB_IS_MQTT(mqtt)); + g_prefix_error(&err, "%s: ", prefix); + g_signal_emit_by_name(mqtt, "error", err); @@ -344,74 +347,130 @@
-fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data) + GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source); + ret = g_buffered_input_stream_fill_finish(input, res, &err); + err = g_error_new_literal(G_IO_ERROR, + G_IO_ERROR_CONNECTION_CLOSED, + _("Connection closed")); + fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header")); + fb_mqtt_read_packet(mqtt); +fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data) - FbMqttPrivate *priv = mqtt->priv;
- /* Reset the read buffer */
- g_byte_array_set_size(priv->rbuf, 0);
+ ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err); - res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
- if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read fixed header"));
+ err = g_error_new_literal(G_IO_ERROR, + G_IO_ERROR_CONNECTION_CLOSED, + _("Connection closed"));
- res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
+ fb_mqtt_take_error(mqtt, err, _("Failed to read packet data")); - if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet size"));
- priv->remz += (byte & 127) * mult;
- } while ((byte & 128) != 0);
- size = MIN(priv->remz, sizeof buf);
- rize = purple_ssl_read(priv->gsc, buf, size);
+ g_input_stream_read_async(G_INPUT_STREAM(source), + priv->rbuf->len - priv->remz, priv->remz, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_read_packet, mqtt);
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet data"));
+ msg = fb_mqtt_message_new_bytes(priv->rbuf); - g_byte_array_append(priv->rbuf, buf, rize);
+ if (G_UNLIKELY(msg == NULL)) { + fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, + _("Failed to parse message"));
- msg = fb_mqtt_message_new_bytes(priv->rbuf);
+ fb_mqtt_read(mqtt, msg); + /* Read another packet if connection wasn't reset in fb_mqtt_read() */ + if (fb_mqtt_connected(mqtt, FALSE)) { + fb_mqtt_read_packet(mqtt); - if (G_UNLIKELY(msg == NULL)) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to parse message"));
+fb_mqtt_read_packet(FbMqtt *mqtt) + FbMqttPrivate *priv = mqtt->priv; + const guint8 const *buf; + buf = g_buffered_input_stream_peek_buffer(priv->input, &count); + /* Start at 1 to skip the first byte */ + /* Not enough data yet, try again later */ - fb_mqtt_read(mqtt, msg);
+ size += (byte & 127) * mult; + } while ((byte & 128) != 0); + /* Add header to size */ + g_byte_array_set_size(priv->rbuf, size); + /* TODO: Use g_input_stream_read_all_async() when available. */ + /* TODO: Alternately, it would be nice to let the + * FbMqttMessage directly use the GBufferedInputStream + * buffer instead of copying it, provided it's consumed + * before the next read. + g_input_stream_read_async(G_INPUT_STREAM(priv->input), + priv->rbuf->data, priv->rbuf->len, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_read_packet, mqtt); + g_buffered_input_stream_fill_async(priv->input, -1, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_fill, mqtt); @@ -519,27 +578,16 @@
-fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_flush(GObject *source, GAsyncResult *res, gpointer data) - FbMqttPrivate *priv = mqtt->priv;
- wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len);
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to write data"));
+ if (!g_output_stream_flush_finish(G_OUTPUT_STREAM(source), + fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
- g_byte_array_remove_range(priv->wbuf, 0, wize);
- if (priv->wbuf->len < 1) {
@@ -548,6 +596,7 @@
FbMqttMessagePrivate *mriv;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
@@ -566,46 +615,46 @@
"Writing %d (flags: 0x%0X)",
mriv->type, mriv->flags);
- g_byte_array_append(priv->wbuf, bytes->data, bytes->len);
- fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE);
+ /* TODO: Would be nice to refactor this to not require copying bytes */ + gbytes = g_bytes_new(bytes->data, bytes->len); + purple_queued_output_stream_push_bytes(priv->output, gbytes);
- priv->wev = purple_input_add(priv->gsc->fd,
- fb_mqtt_cb_write, mqtt);
+ if (!g_output_stream_has_pending(G_OUTPUT_STREAM(priv->output))) { + g_output_stream_flush_async(G_OUTPUT_STREAM(priv->output), + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_flush, mqtt); -fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl,
- PurpleInputCondition cond)
+fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data) - FbMqttPrivate *priv = mqtt->priv;
+ GSocketConnection *conn; + conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source), + fb_mqtt_take_error(mqtt, err, NULL); fb_mqtt_timeout_clear(mqtt);
- priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ,
- fb_mqtt_cb_read, mqtt);
- g_signal_emit_by_name(mqtt, "open");
-fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error,
- FbMqttPrivate *priv = mqtt->priv;
+ priv->conn = G_IO_STREAM(conn); + priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new( + g_io_stream_get_input_stream(priv->conn))); + priv->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(priv->conn)); - str = purple_ssl_strerror(error);
- err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str);
+ fb_mqtt_read_packet(mqtt); - /* Do not call purple_ssl_close() from the error_func */
- g_signal_emit_by_name(mqtt, "error", err);
+ g_signal_emit_by_name(mqtt, "open"); @@ -613,20 +662,29 @@
g_return_if_fail(FB_IS_MQTT(mqtt));
acc = purple_connection_get_account(priv->gc);
- priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open,
- fb_mqtt_cb_open_error, mqtt);
+ client = purple_gio_socket_client_new(acc, &err); - if (priv->gsc == NULL) {
- fb_mqtt_cb_open_error(NULL, 0, mqtt);
+ fb_mqtt_take_error(mqtt, err, NULL); + priv->cancellable = g_cancellable_new(); + g_socket_client_set_tls(client, TRUE); + g_socket_client_connect_to_host_async(client, host, port, + priv->cancellable, fb_mqtt_cb_open, mqtt); + g_object_unref(client); @@ -662,7 +720,7 @@
g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
- connected = (priv->gsc != NULL) && priv->connected;
+ connected = (priv->conn != NULL) && priv->connected; if (!connected && error) {
fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
--- a/libpurple/protocols/facebook/mqtt.h Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.h Fri Sep 30 09:50:21 2016 -0500
@@ -107,13 +107,6 @@
#define FB_MQTT_ERROR fb_mqtt_error_quark()
- * The #GQuark of the domain of MQTT SSL errors.
-#define FB_MQTT_SSL_ERROR fb_mqtt_ssl_error_quark()
typedef struct _FbMqtt FbMqtt;
typedef struct _FbMqttClass FbMqttClass;
typedef struct _FbMqttPrivate FbMqttPrivate;
@@ -298,16 +291,6 @@
fb_mqtt_error_quark(void);
- * fb_mqtt_ssl_error_quark:
- * Gets the #GQuark of the domain of MQTT SSL errors.
- * Returns: The #GQuark of the domain.
-fb_mqtt_ssl_error_quark(void);
* @gc: The #PurpleConnection.