--- a/libpurple/protocols/simple/simple.c Thu May 21 04:45:24 2020 +0000
+++ b/libpurple/protocols/simple/simple.c Fri May 22 01:13:41 2020 +0000
@@ -164,17 +164,6 @@
-static struct sip_connection *connection_find(struct simple_account_data *sip, int fd) {
- struct sip_connection *ret = NULL;
- GSList *entry = sip->openconns;
- if(ret->fd == fd) return ret;
static struct simple_watcher *watcher_find(struct simple_account_data *sip,
struct simple_watcher *watcher;
@@ -211,12 +200,10 @@
static struct sip_connection *
-connection_create(struct simple_account_data *sip, GSocketConnection *sockconn,
+connection_create(struct simple_account_data *sip, GSocketConnection *sockconn) struct sip_connection *ret = g_new0(struct sip_connection, 1);
ret->sockconn = sockconn;
sip->openconns = g_slist_append(sip->openconns, ret);
@@ -225,17 +212,32 @@
connection_destroy(struct sip_connection *conn)
if (conn->inputhandler) {
- purple_input_remove(conn->inputhandler);
+ g_source_remove(conn->inputhandler); g_clear_pointer(&conn->inbuf, g_free);
g_clear_object(&conn->sockconn);
+static struct sip_connection * +connection_find(struct simple_account_data *sip, GInputStream *input) + struct sip_connection *ret = NULL; + GSList *entry = sip->openconns; + if (g_io_stream_get_input_stream(G_IO_STREAM(ret->sockconn)) == input) { -connection_remove(struct simple_account_data *sip, int fd)
+connection_remove(struct simple_account_data *sip, GInputStream *input) - struct sip_connection *conn = connection_find(sip, fd);
+ struct sip_connection *conn = connection_find(sip, input); sip->openconns = g_slist_remove(sip->openconns, conn);
connection_destroy(conn);
@@ -474,41 +476,29 @@
-static void simple_canwrite_cb(gpointer data, gint source, PurpleInputCondition cond) {
- PurpleConnection *gc = data;
- struct simple_account_data *sip = purple_connection_get_protocol_data(gc);
- const gchar *output = NULL;
+simple_push_bytes_cb(GObject *sender, GAsyncResult *res, gpointer data) + PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(sender); + struct simple_account_data *sip = data; - max_write = purple_circular_buffer_get_max_read(sip->txbuf);
- purple_input_remove(sip->tx_handler);
+ result = purple_queued_output_stream_push_bytes_finish(stream, res, &error); - output = purple_circular_buffer_get_output(sip->txbuf);
- written = write(sip->fd, output, max_write);
+ purple_queued_output_stream_clear_queue(stream); - if(written < 0 && errno == EAGAIN)
- else if (written <= 0) {
- /*TODO: do we really want to disconnect on a failure to write?*/
- gchar *tmp = g_strdup_printf(_("Lost connection with server: %s"),
- purple_connection_error(gc,
- PURPLE_CONNECTION_ERROR_NETWORK_ERROR, tmp);
+ if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_prefix_error(&error, "%s", _("Lost connection with server: ")); + purple_connection_take_error(sip->gc, error);
- purple_circular_buffer_mark_read(sip->txbuf, written);
-static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond);
+static gboolean simple_input_cb(GObject *stream, gpointer data); send_later_cb(GObject *sender, GAsyncResult *res, gpointer data)
@@ -517,8 +507,8 @@
struct simple_account_data *sip;
struct sip_connection *conn;
GSocketConnection *sockconn;
sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender),
@@ -532,23 +522,35 @@
- socket = g_socket_connection_get_socket(sockconn);
- g_assert(socket != NULL);
- fd = g_socket_get_fd(socket);
sip = purple_connection_get_protocol_data(gc);
+ sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn)); + sip->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); - simple_canwrite_cb(gc, sip->fd, PURPLE_INPUT_WRITE);
+ writelen = purple_circular_buffer_get_max_read(sip->txbuf); + buf = purple_circular_buffer_get_output(sip->txbuf); - /* If there is more to write now, we need to register a handler */
- if(purple_circular_buffer_get_used(sip->txbuf) > 0)
- sip->tx_handler = purple_input_add(sip->fd, PURPLE_INPUT_WRITE,
- simple_canwrite_cb, gc);
+ output = g_bytes_new(buf, writelen); + purple_queued_output_stream_push_bytes_async( + sip->output, output, G_PRIORITY_DEFAULT, sip->cancellable, + simple_push_bytes_cb, sip); - conn = connection_create(sip, sockconn, fd);
- conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+ purple_circular_buffer_mark_read(sip->txbuf, writelen); + conn = connection_create(sip, sockconn); + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); static void sendlater(PurpleConnection *gc, const char *buf) {
@@ -591,39 +593,18 @@
purple_debug_info("simple", "could not send packet\n");
+ if (sip->output == NULL) {
- ret = write(sip->fd, buf, writelen);
- if (ret < 0 && errno == EAGAIN)
- else if(ret <= 0) { /* XXX: When does this happen legitimately? */
- sip->tx_handler = purple_input_add(sip->fd,
- PURPLE_INPUT_WRITE, simple_canwrite_cb,
- /* XXX: is it OK to do this? You might get part of a request sent
- with part of another. */
- if(purple_circular_buffer_get_used(sip->txbuf) > 0)
- purple_circular_buffer_append(sip->txbuf, "\r\n", 2);
- purple_circular_buffer_append(sip->txbuf, buf + ret,
+ output = g_bytes_new(buf, writelen); + purple_queued_output_stream_push_bytes_async( + sip->output, output, G_PRIORITY_DEFAULT, sip->cancellable, + simple_push_bytes_cb, sip); @@ -1754,15 +1735,19 @@
-static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond)
+simple_input_cb(GObject *stream, gpointer data) + GInputStream *input = G_INPUT_STREAM(stream); PurpleConnection *gc = data;
struct simple_account_data *sip = purple_connection_get_protocol_data(gc);
- struct sip_connection *conn = connection_find(sip, source);
+ struct sip_connection *conn = connection_find(sip, input); purple_debug_error("simple", "Connection not found!\n");
+ return G_SOURCE_REMOVE; if(conn->inbuflen < conn->inbufused + SIMPLE_BUF_INC) {
@@ -1770,21 +1755,41 @@
conn->inbuf = g_realloc(conn->inbuf, conn->inbuflen);
- len = read(source, conn->inbuf + conn->inbufused, SIMPLE_BUF_INC - 1);
+ len = g_pollable_input_stream_read_nonblocking( + G_POLLABLE_INPUT_STREAM(stream), conn->inbuf + conn->inbufused, + SIMPLE_BUF_INC - 1, sip->cancellable, &error); + if (error->code == G_IO_ERROR_WOULD_BLOCK) { + return G_SOURCE_CONTINUE; + } else if (error->code != G_IO_ERROR_CANCELLED) { + /* There has been an error reading from the socket */ + purple_debug_info("simple", "simple_input_cb: read error"); + if (sip->input == input) { + g_clear_object(&sip->input); + g_clear_object(&sip->output); + connection_remove(sip, input); + return G_SOURCE_REMOVE; + } else if (len == 0) { /* The other end has closed the socket */ + purple_debug_warning("simple", "simple_input_cb: connection closed"); + if (sip->input == input) { + g_clear_object(&sip->input); + g_clear_object(&sip->output); + connection_remove(sip, input); + return G_SOURCE_REMOVE; - if(len < 0 && errno == EAGAIN)
- purple_debug_info("simple", "simple_input_cb: read error\n");
- connection_remove(sip, source);
- if(sip->fd == source) sip->fd = -1;
purple_connection_update_last_received(gc);
conn->inbuf[conn->inbufused] = '\0';
process_input(sip, conn);
+ return G_SOURCE_CONTINUE; /* Callback for new connections on incoming TCP port */
@@ -1796,19 +1801,15 @@
PurpleConnection *gc = PURPLE_CONNECTION(source_object);
struct simple_account_data *sip = purple_connection_get_protocol_data(gc);
struct sip_connection *conn;
- socket = g_socket_connection_get_socket(connection);
- g_assert(socket != NULL);
- fd = g_socket_get_fd(socket);
+ conn = connection_create(sip, g_object_ref(connection)); - _purple_network_set_common_socket_flags(fd);
- conn = connection_create(sip, g_object_ref(connection), fd);
- purple_input_add(fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+ source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); @@ -1818,8 +1819,7 @@
struct simple_account_data *sip;
struct sip_connection *conn;
GSocketConnection *sockconn;
sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender),
@@ -1833,21 +1833,23 @@
- socket = g_socket_connection_get_socket(sockconn);
- g_assert(socket != NULL);
- fd = g_socket_get_fd(socket);
+ sip = purple_connection_get_protocol_data(gc); + sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn)); + sip->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); - sip = purple_connection_get_protocol_data(gc);
- conn = connection_create(sip, sockconn, fd);
+ conn = connection_create(sip, sockconn); sip->registertimeout = g_timeout_add_seconds(
g_random_int_range(10, 100), (GSourceFunc)subscribe_timeout, sip);
- conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+ source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); static guint simple_ht_hash_nick(const char *nick) {
@@ -2114,8 +2116,6 @@
purple_input_remove(sip->listenpa);
- purple_input_remove(sip->tx_handler);
g_source_remove(sip->resendtimeout);
if (sip->registertimeout)
@@ -2132,6 +2132,8 @@
if (sip->listen_data != NULL)
purple_network_listen_cancel(sip->listen_data);
+ g_clear_object(&sip->input); + g_clear_object(&sip->output);