--- a/libpurple/queuedoutputstream.c Sat Jun 09 23:54:46 2018 +0000
+++ b/libpurple/queuedoutputstream.c Mon May 28 13:38:27 2018 -0500
@@ -27,6 +27,7 @@
struct _PurpleQueuedOutputStreamPrivate {
+ gboolean pending_queued; static GObjectClass *parent_class = NULL;
@@ -50,6 +51,7 @@
GAsyncResult *result, GError **error);
static void purple_queued_output_stream_start_flush_async(GTask *task);
+static void purple_queued_output_stream_start_push_bytes_async(GTask *task); purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
@@ -98,6 +100,7 @@
stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
+ stream->priv->pending_queued = FALSE; @@ -248,3 +251,141 @@
return g_task_propagate_boolean(G_TASK(result), error);
+purple_queued_output_stream_push_bytes_async_cb(GObject *source, + GAsyncResult *res, gpointer user_data) + GTask *task = G_TASK(user_data); + PurpleQueuedOutputStream *stream = g_task_get_source_object(task); + written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source), + bytes = g_task_get_task_data(task); + size = g_bytes_get_size(bytes); + /* Error occurred, return error */ + g_task_return_error(task, error); + } else if (size > written) { + /* Partial write, prepare to send remaining data */ + bytes = g_bytes_new_from_bytes(bytes, written, size - written); + g_task_set_task_data(task, bytes, + (GDestroyNotify)g_bytes_unref); + /* Full write, this task is finished */ + g_task_return_boolean(task, TRUE); + /* If g_task_return_* was called in this function, the callback + * may have cleared the stream. If so, there will be no remaining + * tasks to process here. + /* Any queued data left? */ + task = g_async_queue_try_pop(stream->priv->queue); + purple_queued_output_stream_start_push_bytes_async(task); + stream->priv->pending_queued = FALSE; + g_output_stream_clear_pending(G_OUTPUT_STREAM(stream)); +purple_queued_output_stream_start_push_bytes_async(GTask *task) + PurpleQueuedOutputStream *stream = g_task_get_source_object(task); + GOutputStream *base_stream; + base_stream = g_filter_output_stream_get_base_stream( + G_FILTER_OUTPUT_STREAM(stream)); + g_output_stream_write_bytes_async(base_stream, + g_task_get_task_data(task), + g_task_get_priority(task), + g_task_get_cancellable(task), + purple_queued_output_stream_push_bytes_async_cb, +purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream, + GBytes *bytes, int io_priority, GCancellable *cancellable, + GAsyncReadyCallback callback, gpointer user_data) + g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream)); + g_return_if_fail(bytes != NULL); + task = g_task_new(stream, cancellable, callback, user_data); + g_task_set_task_data(task, g_bytes_ref(bytes), + (GDestroyNotify)g_bytes_unref); + g_task_set_source_tag(task, + purple_queued_output_stream_push_bytes_async); + g_task_set_priority(task, io_priority); + set_pending = g_output_stream_set_pending( + G_OUTPUT_STREAM(stream), &error); + /* Since we're allowing queuing requests without blocking, + * it's not an error to be pending while processing queued operations. + if (!set_pending && (!g_error_matches(error, + G_IO_ERROR, G_IO_ERROR_PENDING) || + !stream->priv->pending_queued)) { + g_task_return_error(task, error); + stream->priv->pending_queued = TRUE; + /* Start processing if there were no pending operations */ + purple_queued_output_stream_start_push_bytes_async(task); + /* Otherwise queue the data */ + g_async_queue_push(stream->priv->queue, task); +purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream, + GAsyncResult *result, GError **error) + g_return_val_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream), FALSE); + g_return_val_if_fail(g_task_is_valid(result, stream), FALSE); + g_return_val_if_fail(g_async_result_is_tagged(result, + purple_queued_output_stream_push_bytes_async), FALSE); + return g_task_propagate_boolean(G_TASK(result), error); +purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream) + g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream)); + while ((task = g_async_queue_try_pop(stream->priv->queue)) != NULL) { + g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED, + "PurpleQueuedOutputStream queue cleared"); --- a/libpurple/queuedoutputstream.h Sat Jun 09 23:54:46 2018 +0000
+++ b/libpurple/queuedoutputstream.h Mon May 28 13:38:27 2018 -0500
@@ -55,10 +55,12 @@
* To create a queued output stream, use #purple_queued_output_stream_new().
- * To queue data, use #purple_queued_output_stream_push_bytes().
+ * To queue data, use #purple_queued_output_stream_push_bytes_async(). - * Once data has been queued, flush the stream with #g_output_stream_flush()
- * or #g_output_stream_flush_async().
+ * If there's a fatal stream error, it's suggested to clear the remaining + * bytes queued with #purple_queued_output_stream_clear_queue() to avoid + * excessive errors returned in + * #purple_queued_output_stream_push_bytes_async()'s async callback. typedef struct _PurpleQueuedOutputStream PurpleQueuedOutputStream;
typedef struct _PurpleQueuedOutputStreamClass PurpleQueuedOutputStreamClass;
@@ -104,6 +106,57 @@
void purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
+ * purple_queued_output_stream_push_bytes_async + * @stream: #PurpleQueuedOutputStream to push bytes to + * @bytes: Bytes to queue + * @priority: IO priority of the request + * @cancellable: (allow-none): Optional #GCancellable object, NULL to ignore + * @callback: (scope async): Callback to call when the request is finished + * @user_data: (closure): Data to pass to the callback function + * Asynchronously queues and then writes data to the output stream. + * Once the data has been written, or an error occurs, the callback + * Be careful such that if there's a fatal stream error, all remaining queued + * operations will likely return this error. Use + * #purple_queued_output_stream_clear_queue() to clear the queue on such + * an error to only report it a single time. +void purple_queued_output_stream_push_bytes_async( + PurpleQueuedOutputStream *stream, GBytes *bytes, + int io_priority, GCancellable *cancellable, + GAsyncReadyCallback callback, gpointer user_data); + * purple_queued_output_stream_push_bytes_finish + * @stream: #PurpleQueuedOutputStream bytes were pushed to + * @result: The #GAsyncResult of this operation + * @error: A GError location to store the error, or NULL to ignore + * Finishes pushing bytes asynchronously. + * Returns: %TRUE on success, %FALSE if there was an error +gboolean purple_queued_output_stream_push_bytes_finish( + PurpleQueuedOutputStream *stream, + GAsyncResult *result, GError **error); + * purple_queued_output_stream_clear_queue + * @stream: #PurpleQueuedOutputStream to clear + * Clears the queue of any pending bytes. However, any bytes that are + * in the process of being sent will finish their operation. + * This function is useful for clearing the queue in case of an IO error. + * Call this in the async callback in order to clear the queue and avoid + * having all #purple_queue_output_stream_push_bytes_async() calls on + * this queue return errors if there's a fatal stream error. +void purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream); #endif /* _PURPLE_QUEUED_OUTPUT_STREAM_H */