--- a/libpurple/protocols/sametime/sametime.c Fri Nov 20 16:20:15 2020 -0600
+++ b/libpurple/protocols/sametime/sametime.c Fri Nov 20 16:43:26 2020 -0600
@@ -209,13 +209,12 @@
/** event id for the buddy list save callback */
+ GCancellable *cancellable; + PurpleQueuedOutputStream *output; guint inpa; /* input watcher */
- gint outpa; /* like inpa, but the other way */
- /** circular buffer for outgoing data */
- PurpleCircularBuffer *sock_buf;
@@ -303,7 +302,7 @@
/* connection functions */
-static void connect_cb(gpointer data, gint source, const gchar *error_message);
+static void connect_cb(GObject *source_object, GAsyncResult *result, gpointer data); /* ----- session ------ */
@@ -334,34 +333,24 @@
-static void write_cb(gpointer data, gint source, PurpleInputCondition cond) {
+write_cb(GObject *source_object, GAsyncResult *result, gpointer data) + PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source_object); struct mwPurpleProtocolData *pd = data;
- PurpleCircularBuffer *circ = pd->sock_buf;
DEBUG_INFO("write_cb\n");
- g_return_if_fail(circ != NULL);
- avail = purple_circular_buffer_get_max_read(circ);
- if(BUF_LONG < avail) avail = BUF_LONG;
- ret = write(pd->socket, purple_circular_buffer_get_output(circ), avail);
- purple_circular_buffer_mark_read(circ, ret);
- avail = purple_circular_buffer_get_max_read(circ);
- if(BUF_LONG < avail) avail = BUF_LONG;
- purple_input_remove(pd->outpa);
+ if (!purple_queued_output_stream_push_bytes_finish(stream, result, &error)) { + purple_queued_output_stream_clear_queue(stream); + if (error->code != G_IO_ERROR_CANCELLED) { + g_prefix_error(&error, "%s", _("Lost connection with server: ")); + purple_connection_take_error(pd->gc, error); @@ -369,52 +358,20 @@
static int mw_session_io_write(struct mwSession *session,
const guchar *buf, gsize len) {
struct mwPurpleProtocolData *pd;
pd = mwSession_getClientData(session);
- /* socket was already closed. */
+ /* the session was already closed. */ + if (pd->stream == NULL) {
- DEBUG_INFO("already pending INPUT_WRITE, buffering\n");
- purple_circular_buffer_append(pd->sock_buf, buf, len);
- ret = write(pd->socket, buf, (len > BUF_LEN)? BUF_LEN: len);
- /* append remainder to circular buffer */
- DEBUG_INFO("EAGAIN\n");
- purple_circular_buffer_append(pd->sock_buf, buf, len);
- pd->outpa = purple_input_add(pd->socket, PURPLE_INPUT_WRITE, write_cb, pd);
- gchar *tmp = g_strdup_printf(_("Lost connection with server: %s"),
- DEBUG_ERROR("write returned %" G_GSSIZE_FORMAT ", %" G_GSIZE_FORMAT
- " bytes left unwritten\n", ret, len);
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
+ output = g_bytes_new(buf, len); + purple_queued_output_stream_push_bytes_async( + pd->output, output, G_PRIORITY_DEFAULT, pd->cancellable, @@ -426,20 +383,20 @@
pd = mwSession_getClientData(session);
g_return_if_fail(pd != NULL);
- purple_input_remove(pd->outpa);
+ g_cancellable_cancel(pd->cancellable); + purple_input_remove(pd->inpa); + purple_gio_graceful_close(pd->stream, pd->input, G_OUTPUT_STREAM(pd->output));
- purple_input_remove(pd->inpa);
+ g_clear_object(&pd->output); + g_clear_object(&pd->stream); @@ -1443,15 +1400,27 @@
if(purple_account_get_bool(account, MW_KEY_FORCE, FALSE) ||
- !host || purple_strequal(current_host, host) ||
- (purple_proxy_connect(gc, account, host, port, connect_cb, pd) == NULL)) {
+ !host || purple_strequal(current_host, host)) { /* if we're configured to force logins, or if we're being
- redirected to the already configured host, or if we couldn't
- connect to the new host, we'll force the login instead */
+ redirected to the already configured host, + we'll force the login instead */ mwSession_forceLogin(session);
+ pd->client = purple_gio_socket_client_new(account, NULL); + if (pd->client == NULL) { + /* if we couldn't connect to the new host, we'll force the login instead */ + mwSession_forceLogin(session); + g_socket_client_connect_to_host_async( @@ -1696,106 +1665,98 @@
-/** called from read_cb, attempts to read available data from sock and
- pass it to the session, passing back the return code from the read
- call for handling in read_cb */
-static int read_recv(struct mwSession *session, int sock) {
+/** callback triggered from GPollableInputStream, watches the stream for + available data to be processed by the session */ +read_cb(GObject *stream, gpointer data) + struct mwPurpleProtocolData *pd = data;
- len = read(sock, buf, BUF_LEN);
- mwSession_recv(session, buf, len);
-/** callback triggered from purple_input_add, watches the socked for
- available data to be processed by the session */
-static void read_cb(gpointer data, gint source, PurpleInputCondition cond) {
- struct mwPurpleProtocolData *pd = data;
- g_return_if_fail(pd != NULL);
- ret = read_recv(pd->session, pd->socket);
+ g_return_val_if_fail(pd != NULL, G_SOURCE_REMOVE); + len = g_pollable_input_stream_read_nonblocking( + G_POLLABLE_INPUT_STREAM(stream), buf, sizeof(buf) - 1, + pd->cancellable, &error); + DEBUG_INFO("connection reset\n"); + purple_connection_error(pd->gc, + PURPLE_CONNECTION_ERROR_NETWORK_ERROR, + _("Server closed the connection")); + return G_SOURCE_REMOVE; + /* read problem occurred if we're here, so we'll need to take care of + it and clean up internal state */ + if (error->code == G_IO_ERROR_WOULD_BLOCK) { + return G_SOURCE_CONTINUE; + } else if (error->code == G_IO_ERROR_CANCELLED) { + g_prefix_error(&error, "%s", + _("Lost connection with server: ")); + purple_connection_take_error(pd->gc, error); + g_clear_object(&pd->output); + g_clear_object(&pd->stream); + return G_SOURCE_REMOVE; + mwSession_recv(pd->session, buf, len); /* normal operation ends here */
- /* fetch the global error value */
- /* read problem occurred if we're here, so we'll need to take care of
- it and clean up internal state */
- purple_input_remove(pd->inpa);
- DEBUG_INFO("connection reset\n");
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- _("Server closed the connection"));
- const gchar *err_str = g_strerror(err);
- DEBUG_INFO("error in read callback: %s\n", err_str);
- msg = g_strdup_printf(_("Lost connection with server: %s"), err_str);
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
-/** Callback passed to purple_proxy_connect when an account is logged
- in, and if the session logging in receives a redirect message */
-static void connect_cb(gpointer data, gint source, const gchar *error_message) {
+ return G_SOURCE_CONTINUE; +/* Callback passed to g_socket_client_connect_to_host_async when an account is + * logged in, and if the session logging in receives a redirect message */ +connect_cb(GObject *source_object, GAsyncResult *result, gpointer data) + GSocketClient *client = G_SOCKET_CLIENT(source_object); struct mwPurpleProtocolData *pd = data;
- /* connection failed */
+ GSocketConnection *sockconn = NULL; + GSource *source = NULL; + sockconn = g_socket_client_connect_to_host_finish(client, result, &error); + if (sockconn == NULL) { + if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + /* connection was cancelled */ + } else if (pd->stream) { /* this is a redirect connect, force login on existing socket */
mwSession_forceLogin(pd->session);
/* this is a regular connect, error out */
- gchar *tmp = g_strdup_printf(_("Unable to connect: %s"),
- purple_connection_error(pd->gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
+ purple_connection_take_error(pd->gc, error);
/* stop any existing login attempt */
mwSession_stop(pd->session, ERR_SUCCESS);
- pd->inpa = purple_input_add(source, PURPLE_INPUT_READ,
+ pd->stream = G_IO_STREAM(sockconn); + pd->input = g_io_stream_get_input_stream(pd->stream); + pd->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(pd->stream)); + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(pd->input), pd->cancellable); + g_source_set_callback(source, (GSourceFunc)read_cb, pd, NULL); + pd->inpa = g_source_attach(source, NULL); + g_source_unref(source); mwSession_start(pd->session);
@@ -3077,6 +3038,7 @@
pd = g_new0(struct mwPurpleProtocolData, 1);
+ pd->cancellable = g_cancellable_new(); pd->session = mwSession_new(&mw_session_handler);
pd->srvc_aware = mw_srvc_aware_new(pd->session);
pd->srvc_conf = mw_srvc_conf_new(pd->session);
@@ -3086,7 +3048,6 @@
pd->srvc_resolve = mw_srvc_resolve_new(pd->session);
pd->srvc_store = mw_srvc_store_new(pd->session);
pd->group_list_map = g_hash_table_new(g_direct_hash, g_direct_equal);
- pd->sock_buf = purple_circular_buffer_new(0);
mwSession_addService(pd->session, MW_SERVICE(pd->srvc_aware));
mwSession_addService(pd->session, MW_SERVICE(pd->srvc_conf));
@@ -3132,8 +3093,11 @@
mwSession_free(pd->session);
+ g_clear_object(&pd->cancellable); g_hash_table_destroy(pd->group_list_map);
- g_object_unref(G_OBJECT(pd->sock_buf));
+ g_clear_object(&pd->output); + g_clear_object(&pd->stream); @@ -3558,6 +3522,8 @@
char *user, *pass, *host;
gc = purple_account_get_connection(account);
pd = mwPurpleProtocolData_new(gc);
@@ -3626,10 +3592,16 @@
purple_connection_update_progress(gc, _("Connecting"), 1, MW_CONNECT_STEPS);
- if (purple_proxy_connect(gc, account, host, port, connect_cb, pd) == NULL) {
- purple_connection_error(gc, PURPLE_CONNECTION_ERROR_NETWORK_ERROR,
- _("Unable to connect"));
+ pd->client = purple_gio_socket_client_new(account, &error); + if (pd->client == NULL) { + purple_connection_take_error(gc, error); + g_socket_client_connect_to_host_async(