pidgin/pidgin

4663f9da17aa
Merged in CMaiku/pidgin (pull request #129)

facebook: Port from sslconn to Gio
--- a/libpurple/protocols/facebook/facebook.c Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/facebook.c Fri Sep 30 09:50:21 2016 -0500
@@ -332,8 +332,8 @@
gc = fb_data_get_connection(fata);
- if (error->domain == FB_MQTT_SSL_ERROR) {
- purple_connection_ssl_error(gc, error->code);
+ if (error->domain == G_IO_ERROR) {
+ purple_connection_g_error(gc, error);
return;
}
--- a/libpurple/protocols/facebook/mqtt.c Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.c Fri Sep 30 09:50:21 2016 -0500
@@ -28,7 +28,8 @@
#include "account.h"
#include "eventloop.h"
#include "glibcompat.h"
-#include "sslconn.h"
+#include "purple-gio.h"
+#include "queuedoutputstream.h"
#include "marshal.h"
#include "mqtt.h"
@@ -37,17 +38,17 @@
struct _FbMqttPrivate
{
PurpleConnection *gc;
- PurpleSslConnection *gsc;
+ GIOStream *conn;
+ GBufferedInputStream *input;
+ PurpleQueuedOutputStream *output;
+ GCancellable *cancellable;
gboolean connected;
guint16 mid;
GByteArray *rbuf;
- GByteArray *wbuf;
gsize remz;
gint tev;
- gint rev;
- gint wev;
};
struct _FbMqttMessagePrivate
@@ -65,6 +66,8 @@
G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
+static void fb_mqtt_read_packet(FbMqtt *mqtt);
+
static void
fb_mqtt_dispose(GObject *obj)
{
@@ -73,7 +76,6 @@
fb_mqtt_close(mqtt);
g_byte_array_free(priv->rbuf, TRUE);
- g_byte_array_free(priv->wbuf, TRUE);
}
static void
@@ -161,7 +163,6 @@
mqtt->priv = priv;
priv->rbuf = g_byte_array_new();
- priv->wbuf = g_byte_array_new();
}
static void
@@ -205,18 +206,6 @@
return q;
}
-GQuark
-fb_mqtt_ssl_error_quark(void)
-{
- static GQuark q = 0;
-
- if (G_UNLIKELY(q == 0)) {
- q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
- }
-
- return q;
-}
-
FbMqtt *
fb_mqtt_new(PurpleConnection *gc)
{
@@ -240,33 +229,47 @@
g_return_if_fail(FB_IS_MQTT(mqtt));
priv = mqtt->priv;
- if (priv->wev > 0) {
- purple_input_remove(priv->wev);
- priv->wev = 0;
- }
-
- if (priv->rev > 0) {
- purple_input_remove(priv->rev);
- priv->rev = 0;
- }
-
if (priv->tev > 0) {
purple_timeout_remove(priv->tev);
priv->tev = 0;
}
- if (priv->gsc != NULL) {
- purple_ssl_close(priv->gsc);
- priv->gsc = NULL;
+ if (priv->cancellable != NULL) {
+ g_cancellable_cancel(priv->cancellable);
+ g_clear_object(&priv->cancellable);
}
- if (priv->wbuf->len > 0) {
- fb_util_debug_warning("Closing with unwritten data");
+ if (priv->conn != NULL) {
+ purple_gio_graceful_close(priv->conn,
+ G_INPUT_STREAM(priv->input),
+ G_OUTPUT_STREAM(priv->output));
+ g_clear_object(&priv->input);
+ g_clear_object(&priv->output);
+ g_clear_object(&priv->conn);
}
priv->connected = FALSE;
g_byte_array_set_size(priv->rbuf, 0);
- g_byte_array_set_size(priv->wbuf, 0);
+}
+
+static void
+fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
+{
+ if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ /* Return as cancelled means the connection is closing */
+ g_error_free(err);
+ return;
+ }
+
+ /* Now we can check for programming errors */
+ g_return_if_fail(FB_IS_MQTT(mqtt));
+
+ if (prefix != NULL) {
+ g_prefix_error(&err, "%s: ", prefix);
+ }
+
+ g_signal_emit_by_name(mqtt, "error", err);
+ g_error_free(err);
}
void
@@ -344,74 +347,130 @@
}
static void
-fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
+{
+ GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
+ FbMqtt *mqtt = data;
+ gssize ret;
+ GError *err = NULL;
+
+ ret = g_buffered_input_stream_fill_finish(input, res, &err);
+
+ if (ret < 1) {
+ if (ret == 0) {
+ err = g_error_new_literal(G_IO_ERROR,
+ G_IO_ERROR_CONNECTION_CLOSED,
+ _("Connection closed"));
+ }
+
+ fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
+ return;
+ }
+
+ fb_mqtt_read_packet(mqtt);
+}
+
+static void
+fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
+ FbMqttPrivate *priv;
+ gssize ret;
FbMqttMessage *msg;
- FbMqttPrivate *priv = mqtt->priv;
- gint res;
- guint mult;
- guint8 buf[1024];
- guint8 byte;
- gsize size;
- gssize rize;
+ GError *err = NULL;
- if (priv->remz < 1) {
- /* Reset the read buffer */
- g_byte_array_set_size(priv->rbuf, 0);
+ ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
- res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
- if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read fixed header"));
- return;
+ if (ret < 1) {
+ if (ret == 0) {
+ err = g_error_new_literal(G_IO_ERROR,
+ G_IO_ERROR_CONNECTION_CLOSED,
+ _("Connection closed"));
}
- mult = 1;
-
- do {
- res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
+ fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
+ return;
+ }
- if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet size"));
- return;
- }
-
- priv->remz += (byte & 127) * mult;
- mult *= 128;
- } while ((byte & 128) != 0);
- }
+ priv = mqtt->priv;
+ priv->remz -= ret;
if (priv->remz > 0) {
- size = MIN(priv->remz, sizeof buf);
- rize = purple_ssl_read(priv->gsc, buf, size);
+ g_input_stream_read_async(G_INPUT_STREAM(source),
+ priv->rbuf->data +
+ priv->rbuf->len - priv->remz, priv->remz,
+ G_PRIORITY_DEFAULT, priv->cancellable,
+ fb_mqtt_cb_read_packet, mqtt);
+ return;
+ }
- if (rize < 1) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet data"));
- return;
- }
+ msg = fb_mqtt_message_new_bytes(priv->rbuf);
- g_byte_array_append(priv->rbuf, buf, rize);
- priv->remz -= rize;
+ if (G_UNLIKELY(msg == NULL)) {
+ fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
+ _("Failed to parse message"));
+ return;
}
- if (priv->remz < 1) {
- msg = fb_mqtt_message_new_bytes(priv->rbuf);
- priv->remz = 0;
+ fb_mqtt_read(mqtt, msg);
+ g_object_unref(msg);
+
+ /* Read another packet if connection wasn't reset in fb_mqtt_read() */
+ if (fb_mqtt_connected(mqtt, FALSE)) {
+ fb_mqtt_read_packet(mqtt);
+ }
+}
- if (G_UNLIKELY(msg == NULL)) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to parse message"));
- return;
+static void
+fb_mqtt_read_packet(FbMqtt *mqtt)
+{
+ FbMqttPrivate *priv = mqtt->priv;
+ const guint8 const *buf;
+ gsize count = 0;
+ gsize pos;
+ guint mult = 1;
+ guint8 byte;
+ gsize size = 0;
+
+ buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
+
+ /* Start at 1 to skip the first byte */
+ pos = 1;
+
+ do {
+ if (pos >= count) {
+ /* Not enough data yet, try again later */
+ size = 0;
+ break;
}
- fb_mqtt_read(mqtt, msg);
- g_object_unref(msg);
+ byte = *(buf + pos++);
+
+ size += (byte & 127) * mult;
+ mult *= 128;
+ } while ((byte & 128) != 0);
+
+ if (size > 0) {
+ /* Add header to size */
+ size += pos;
+
+ g_byte_array_set_size(priv->rbuf, size);
+ priv->remz = size;
+
+ /* TODO: Use g_input_stream_read_all_async() when available. */
+ /* TODO: Alternately, it would be nice to let the
+ * FbMqttMessage directly use the GBufferedInputStream
+ * buffer instead of copying it, provided it's consumed
+ * before the next read.
+ */
+ g_input_stream_read_async(G_INPUT_STREAM(priv->input),
+ priv->rbuf->data, priv->rbuf->len,
+ G_PRIORITY_DEFAULT, priv->cancellable,
+ fb_mqtt_cb_read_packet, mqtt);
+ } else {
+ g_buffered_input_stream_fill_async(priv->input, -1,
+ G_PRIORITY_DEFAULT, priv->cancellable,
+ fb_mqtt_cb_fill, mqtt);
}
}
@@ -519,27 +578,16 @@
}
static void
-fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_flush(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
- FbMqttPrivate *priv = mqtt->priv;
- gssize wize;
+ GError *err = NULL;
- wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len);
-
- if (wize < 0) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to write data"));
+ if (!g_output_stream_flush_finish(G_OUTPUT_STREAM(source),
+ res, &err)) {
+ fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
return;
}
-
- if (wize > 0) {
- g_byte_array_remove_range(priv->wbuf, 0, wize);
- }
-
- if (priv->wbuf->len < 1) {
- priv->wev = 0;
- }
}
void
@@ -548,6 +596,7 @@
const GByteArray *bytes;
FbMqttMessagePrivate *mriv;
FbMqttPrivate *priv;
+ GBytes *gbytes;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
@@ -566,46 +615,46 @@
"Writing %d (flags: 0x%0X)",
mriv->type, mriv->flags);
- g_byte_array_append(priv->wbuf, bytes->data, bytes->len);
- fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE);
+ /* TODO: Would be nice to refactor this to not require copying bytes */
+ gbytes = g_bytes_new(bytes->data, bytes->len);
+ purple_queued_output_stream_push_bytes(priv->output, gbytes);
+ g_bytes_unref(gbytes);
- if (priv->wev > 0) {
- priv->wev = purple_input_add(priv->gsc->fd,
- PURPLE_INPUT_WRITE,
- fb_mqtt_cb_write, mqtt);
+ if (!g_output_stream_has_pending(G_OUTPUT_STREAM(priv->output))) {
+ g_output_stream_flush_async(G_OUTPUT_STREAM(priv->output),
+ G_PRIORITY_DEFAULT, priv->cancellable,
+ fb_mqtt_cb_flush, mqtt);
}
}
static void
-fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl,
- PurpleInputCondition cond)
+fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
- FbMqttPrivate *priv = mqtt->priv;
+ FbMqttPrivate *priv;
+ GSocketConnection *conn;
+ GError *err = NULL;
+
+ conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
+ res, &err);
+
+ if (conn == NULL) {
+ fb_mqtt_take_error(mqtt, err, NULL);
+ return;
+ }
fb_mqtt_timeout_clear(mqtt);
- priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ,
- fb_mqtt_cb_read, mqtt);
- g_signal_emit_by_name(mqtt, "open");
-}
-static void
-fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error,
- gpointer data)
-{
- const gchar *str;
- FbMqtt *mqtt = data;
- FbMqttPrivate *priv = mqtt->priv;
- GError *err;
+ priv = mqtt->priv;
+ priv->conn = G_IO_STREAM(conn);
+ priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
+ g_io_stream_get_input_stream(priv->conn)));
+ priv->output = purple_queued_output_stream_new(
+ g_io_stream_get_output_stream(priv->conn));
- str = purple_ssl_strerror(error);
- err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str);
+ fb_mqtt_read_packet(mqtt);
- /* Do not call purple_ssl_close() from the error_func */
- priv->gsc = NULL;
-
- g_signal_emit_by_name(mqtt, "error", err);
- g_error_free(err);
+ g_signal_emit_by_name(mqtt, "open");
}
void
@@ -613,20 +662,29 @@
{
FbMqttPrivate *priv;
PurpleAccount *acc;
+ GSocketClient *client;
+ GError *err = NULL;
g_return_if_fail(FB_IS_MQTT(mqtt));
priv = mqtt->priv;
acc = purple_connection_get_account(priv->gc);
fb_mqtt_close(mqtt);
- priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open,
- fb_mqtt_cb_open_error, mqtt);
+
+ client = purple_gio_socket_client_new(acc, &err);
- if (priv->gsc == NULL) {
- fb_mqtt_cb_open_error(NULL, 0, mqtt);
+ if (client == NULL) {
+ fb_mqtt_take_error(mqtt, err, NULL);
return;
}
+ priv->cancellable = g_cancellable_new();
+
+ g_socket_client_set_tls(client, TRUE);
+ g_socket_client_connect_to_host_async(client, host, port,
+ priv->cancellable, fb_mqtt_cb_open, mqtt);
+ g_object_unref(client);
+
fb_mqtt_timeout(mqtt);
}
@@ -662,7 +720,7 @@
g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
priv = mqtt->priv;
- connected = (priv->gsc != NULL) && priv->connected;
+ connected = (priv->conn != NULL) && priv->connected;
if (!connected && error) {
fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
--- a/libpurple/protocols/facebook/mqtt.h Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.h Fri Sep 30 09:50:21 2016 -0500
@@ -107,13 +107,6 @@
*/
#define FB_MQTT_ERROR fb_mqtt_error_quark()
-/**
- * FB_MQTT_SSL_ERROR:
- *
- * The #GQuark of the domain of MQTT SSL errors.
- */
-#define FB_MQTT_SSL_ERROR fb_mqtt_ssl_error_quark()
-
typedef struct _FbMqtt FbMqtt;
typedef struct _FbMqttClass FbMqttClass;
typedef struct _FbMqttPrivate FbMqttPrivate;
@@ -298,16 +291,6 @@
fb_mqtt_error_quark(void);
/**
- * fb_mqtt_ssl_error_quark:
- *
- * Gets the #GQuark of the domain of MQTT SSL errors.
- *
- * Returns: The #GQuark of the domain.
- */
-GQuark
-fb_mqtt_ssl_error_quark(void);
-
-/**
* fb_mqtt_new:
* @gc: The #PurpleConnection.
*