pidgin/pidgin

Convert sametime to Gio.

2020-11-20, Elliott Sales de Andrade
6507c3c9940b
Parents e139fc54c76c
Children 406ff566ad57
Convert sametime to Gio.

Add a cancellable to sametime data.
Convert sametime to Gio.

Testing Done:
Compile only.

Reviewed at https://reviews.imfreedom.org/r/243/
--- a/libpurple/protocols/sametime/sametime.c Fri Nov 20 16:20:15 2020 -0600
+++ b/libpurple/protocols/sametime/sametime.c Fri Nov 20 16:43:26 2020 -0600
@@ -209,13 +209,12 @@
/** event id for the buddy list save callback */
guint save_event;
- /** socket fd */
- int socket;
+ GCancellable *cancellable;
+ GSocketClient *client;
+ GIOStream *stream;
+ GInputStream *input;
+ PurpleQueuedOutputStream *output;
guint inpa; /* input watcher */
- gint outpa; /* like inpa, but the other way */
-
- /** circular buffer for outgoing data */
- PurpleCircularBuffer *sock_buf;
PurpleConnection *gc;
};
@@ -303,7 +302,7 @@
/* connection functions */
-static void connect_cb(gpointer data, gint source, const gchar *error_message);
+static void connect_cb(GObject *source_object, GAsyncResult *result, gpointer data);
/* ----- session ------ */
@@ -334,34 +333,24 @@
return pd->gc;
}
-
-static void write_cb(gpointer data, gint source, PurpleInputCondition cond) {
+static void
+write_cb(GObject *source_object, GAsyncResult *result, gpointer data)
+{
+ PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source_object);
struct mwPurpleProtocolData *pd = data;
- PurpleCircularBuffer *circ = pd->sock_buf;
- gsize avail;
- int ret;
+ GError *error = NULL;
DEBUG_INFO("write_cb\n");
- g_return_if_fail(circ != NULL);
-
- avail = purple_circular_buffer_get_max_read(circ);
- if(BUF_LONG < avail) avail = BUF_LONG;
-
- while(avail) {
- ret = write(pd->socket, purple_circular_buffer_get_output(circ), avail);
-
- if(ret <= 0)
- break;
-
- purple_circular_buffer_mark_read(circ, ret);
- avail = purple_circular_buffer_get_max_read(circ);
- if(BUF_LONG < avail) avail = BUF_LONG;
- }
-
- if(! avail) {
- purple_input_remove(pd->outpa);
- pd->outpa = 0;
+ if (!purple_queued_output_stream_push_bytes_finish(stream, result, &error)) {
+ purple_queued_output_stream_clear_queue(stream);
+
+ if (error->code != G_IO_ERROR_CANCELLED) {
+ g_prefix_error(&error, "%s", _("Lost connection with server: "));
+ purple_connection_take_error(pd->gc, error);
+ } else {
+ g_error_free(error);
+ }
}
}
@@ -369,52 +358,20 @@
static int mw_session_io_write(struct mwSession *session,
const guchar *buf, gsize len) {
struct mwPurpleProtocolData *pd;
- gssize ret = 0;
- int err = 0;
+ GBytes *output;
pd = mwSession_getClientData(session);
- /* socket was already closed. */
- if(pd->socket == 0)
+ /* the session was already closed. */
+ if (pd->stream == NULL) {
return 1;
-
- if(pd->outpa) {
- DEBUG_INFO("already pending INPUT_WRITE, buffering\n");
- purple_circular_buffer_append(pd->sock_buf, buf, len);
- return 0;
- }
-
- while(len) {
- ret = write(pd->socket, buf, (len > BUF_LEN)? BUF_LEN: len);
-
- if(ret <= 0)
- break;
-
- len -= ret;
- buf += ret;
}
- if(ret <= 0)
- err = errno;
-
- if(err == EAGAIN) {
- /* append remainder to circular buffer */
- DEBUG_INFO("EAGAIN\n");
- purple_circular_buffer_append(pd->sock_buf, buf, len);
- pd->outpa = purple_input_add(pd->socket, PURPLE_INPUT_WRITE, write_cb, pd);
-
- } else if(len > 0) {
- gchar *tmp = g_strdup_printf(_("Lost connection with server: %s"),
- g_strerror(errno));
- DEBUG_ERROR("write returned %" G_GSSIZE_FORMAT ", %" G_GSIZE_FORMAT
- " bytes left unwritten\n", ret, len);
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- tmp);
- g_free(tmp);
-
- return -1;
- }
+ output = g_bytes_new(buf, len);
+ purple_queued_output_stream_push_bytes_async(
+ pd->output, output, G_PRIORITY_DEFAULT, pd->cancellable,
+ write_cb, pd);
+ g_bytes_unref(output);
return 0;
}
@@ -426,20 +383,20 @@
pd = mwSession_getClientData(session);
g_return_if_fail(pd != NULL);
- if(pd->outpa) {
- purple_input_remove(pd->outpa);
- pd->outpa = 0;
+ g_cancellable_cancel(pd->cancellable);
+
+ if (pd->stream) {
+ if (pd->inpa) {
+ purple_input_remove(pd->inpa);
+ pd->inpa = 0;
+ }
+
+ purple_gio_graceful_close(pd->stream, pd->input, G_OUTPUT_STREAM(pd->output));
}
- if(pd->socket) {
- close(pd->socket);
- pd->socket = 0;
- }
-
- if(pd->inpa) {
- purple_input_remove(pd->inpa);
- pd->inpa = 0;
- }
+ pd->input = NULL;
+ g_clear_object(&pd->output);
+ g_clear_object(&pd->stream);
}
@@ -1443,15 +1400,27 @@
MW_PLUGIN_DEFAULT_HOST);
if(purple_account_get_bool(account, MW_KEY_FORCE, FALSE) ||
- !host || purple_strequal(current_host, host) ||
- (purple_proxy_connect(gc, account, host, port, connect_cb, pd) == NULL)) {
-
+ !host || purple_strequal(current_host, host)) {
/* if we're configured to force logins, or if we're being
- redirected to the already configured host, or if we couldn't
- connect to the new host, we'll force the login instead */
+ redirected to the already configured host,
+ we'll force the login instead */
mwSession_forceLogin(session);
+ return;
}
+
+ pd->client = purple_gio_socket_client_new(account, NULL);
+ if (pd->client == NULL) {
+ /* if we couldn't connect to the new host, we'll force the login instead */
+
+ mwSession_forceLogin(session);
+ return;
+ }
+
+ g_socket_client_connect_to_host_async(
+ pd->client, host,
+ port, pd->cancellable,
+ connect_cb, pd);
}
@@ -1696,106 +1665,98 @@
}
-/** called from read_cb, attempts to read available data from sock and
- pass it to the session, passing back the return code from the read
- call for handling in read_cb */
-static int read_recv(struct mwSession *session, int sock) {
+/** callback triggered from GPollableInputStream, watches the stream for
+ available data to be processed by the session */
+static gboolean
+read_cb(GObject *stream, gpointer data)
+{
+ struct mwPurpleProtocolData *pd = data;
guchar buf[BUF_LEN];
- int len;
-
- len = read(sock, buf, BUF_LEN);
- if(len > 0) {
- mwSession_recv(session, buf, len);
- }
-
- return len;
-}
-
-
-/** callback triggered from purple_input_add, watches the socked for
- available data to be processed by the session */
-static void read_cb(gpointer data, gint source, PurpleInputCondition cond) {
- struct mwPurpleProtocolData *pd = data;
- int ret = 0, err = 0;
-
- g_return_if_fail(pd != NULL);
-
- ret = read_recv(pd->session, pd->socket);
+ gssize len;
+ GError *error = NULL;
+
+ g_return_val_if_fail(pd != NULL, G_SOURCE_REMOVE);
+
+ do {
+ len = g_pollable_input_stream_read_nonblocking(
+ G_POLLABLE_INPUT_STREAM(stream), buf, sizeof(buf) - 1,
+ pd->cancellable, &error);
+ if (len == 0) {
+ DEBUG_INFO("connection reset\n");
+ purple_connection_error(pd->gc,
+ PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
+ _("Server closed the connection"));
+ return G_SOURCE_REMOVE;
+ } else if (len < 0) {
+ /* read problem occurred if we're here, so we'll need to take care of
+ it and clean up internal state */
+ if (error->code == G_IO_ERROR_WOULD_BLOCK) {
+ g_error_free(error);
+ return G_SOURCE_CONTINUE;
+ } else if (error->code == G_IO_ERROR_CANCELLED) {
+ g_error_free(error);
+ } else {
+ g_prefix_error(&error, "%s",
+ _("Lost connection with server: "));
+ purple_connection_take_error(pd->gc, error);
+ }
+ pd->inpa = 0;
+ pd->input = NULL;
+ g_clear_object(&pd->output);
+ g_clear_object(&pd->stream);
+ return G_SOURCE_REMOVE;
+ }
+
+ mwSession_recv(pd->session, buf, len);
+ } while (len > 0);
/* normal operation ends here */
- if(ret > 0) return;
-
- /* fetch the global error value */
- err = errno;
-
- /* read problem occurred if we're here, so we'll need to take care of
- it and clean up internal state */
-
- if(pd->socket) {
- close(pd->socket);
- pd->socket = 0;
- }
-
- if(pd->inpa) {
- purple_input_remove(pd->inpa);
- pd->inpa = 0;
- }
-
- if(! ret) {
- DEBUG_INFO("connection reset\n");
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- _("Server closed the connection"));
-
- } else if(ret < 0) {
- const gchar *err_str = g_strerror(err);
- char *msg = NULL;
-
- DEBUG_INFO("error in read callback: %s\n", err_str);
-
- msg = g_strdup_printf(_("Lost connection with server: %s"), err_str);
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- msg);
- g_free(msg);
- }
-}
-
-
-/** Callback passed to purple_proxy_connect when an account is logged
- in, and if the session logging in receives a redirect message */
-static void connect_cb(gpointer data, gint source, const gchar *error_message) {
-
+ return G_SOURCE_CONTINUE;
+}
+
+
+/* Callback passed to g_socket_client_connect_to_host_async when an account is
+ * logged in, and if the session logging in receives a redirect message */
+static void
+connect_cb(GObject *source_object, GAsyncResult *result, gpointer data)
+{
+ GSocketClient *client = G_SOCKET_CLIENT(source_object);
struct mwPurpleProtocolData *pd = data;
-
- if(source < 0) {
- /* connection failed */
-
- if(pd->socket) {
+ GSocketConnection *sockconn = NULL;
+ GSource *source = NULL;
+ GError *error = NULL;
+
+ sockconn = g_socket_client_connect_to_host_finish(client, result, &error);
+ if (sockconn == NULL) {
+ if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ /* connection was cancelled */
+ g_error_free(error);
+ } else if (pd->stream) {
/* this is a redirect connect, force login on existing socket */
mwSession_forceLogin(pd->session);
} else {
/* this is a regular connect, error out */
- gchar *tmp = g_strdup_printf(_("Unable to connect: %s"),
- error_message);
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- tmp);
- g_free(tmp);
+ purple_connection_take_error(pd->gc, error);
}
return;
}
- if(pd->socket) {
+ if (pd->stream) {
/* stop any existing login attempt */
mwSession_stop(pd->session, ERR_SUCCESS);
}
- pd->socket = source;
- pd->inpa = purple_input_add(source, PURPLE_INPUT_READ,
- read_cb, pd);
+ pd->stream = G_IO_STREAM(sockconn);
+ pd->input = g_io_stream_get_input_stream(pd->stream);
+ pd->output = purple_queued_output_stream_new(
+ g_io_stream_get_output_stream(pd->stream));
+ source = g_pollable_input_stream_create_source(
+ G_POLLABLE_INPUT_STREAM(pd->input), pd->cancellable);
+ g_source_set_callback(source, (GSourceFunc)read_cb, pd, NULL);
+ pd->inpa = g_source_attach(source, NULL);
+ g_source_unref(source);
mwSession_start(pd->session);
}
@@ -3077,6 +3038,7 @@
pd = g_new0(struct mwPurpleProtocolData, 1);
pd->gc = gc;
+ pd->cancellable = g_cancellable_new();
pd->session = mwSession_new(&mw_session_handler);
pd->srvc_aware = mw_srvc_aware_new(pd->session);
pd->srvc_conf = mw_srvc_conf_new(pd->session);
@@ -3086,7 +3048,6 @@
pd->srvc_resolve = mw_srvc_resolve_new(pd->session);
pd->srvc_store = mw_srvc_store_new(pd->session);
pd->group_list_map = g_hash_table_new(g_direct_hash, g_direct_equal);
- pd->sock_buf = purple_circular_buffer_new(0);
mwSession_addService(pd->session, MW_SERVICE(pd->srvc_aware));
mwSession_addService(pd->session, MW_SERVICE(pd->srvc_conf));
@@ -3132,8 +3093,11 @@
mwSession_free(pd->session);
+ g_clear_object(&pd->cancellable);
g_hash_table_destroy(pd->group_list_map);
- g_object_unref(G_OBJECT(pd->sock_buf));
+ pd->input = NULL;
+ g_clear_object(&pd->output);
+ g_clear_object(&pd->stream);
g_free(pd);
}
@@ -3558,6 +3522,8 @@
char *user, *pass, *host;
guint port;
+ GError *error = NULL;
+
gc = purple_account_get_connection(account);
pd = mwPurpleProtocolData_new(gc);
@@ -3626,10 +3592,16 @@
purple_connection_update_progress(gc, _("Connecting"), 1, MW_CONNECT_STEPS);
- if (purple_proxy_connect(gc, account, host, port, connect_cb, pd) == NULL) {
- purple_connection_error(gc, PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- _("Unable to connect"));
+ pd->client = purple_gio_socket_client_new(account, &error);
+ if (pd->client == NULL) {
+ purple_connection_take_error(gc, error);
+ return;
}
+
+ g_socket_client_connect_to_host_async(
+ pd->client, host,
+ port, pd->cancellable,
+ connect_cb, pd);
}