qulogic/pidgin

Parents b8c2eef48c60
Children 7119cb1afdb8
queuedoutputstream: Refactor to behave better with the rest of Gio

PurpleQueuedOutputStream uses purple_queued_output_stream_push_bytes()
in order to queue bytes to be flushed. This requires consuming code to
call flush when it's not already flushing, and doesn't allow other
GOutputStream operations to behave well in combination with the
queued operations.

This patch refactors PurpleQueuedOutputStream to behave more like other
GOutputStream operations. It adds
purple_queued_output_stream_push_bytes_async(), which has the same
signature as the write_async functions. This way, if the there are
already queued bytes, it will send the new bytes when their turn
has come, and if not, it will start sending the new bytes immediately.
No need to flush. If a non-queued operation is attempted, it will
behave the same as if any other operation is pending. If a non-queued
operation is pending and a queued operation is attempted, it will
behave the same as a non-queued operation.

Now that the callback is called per async operation, if there's a
fatal error, all remaining queued operations will likely return
this same error. purple_queued_output_stream_clear_queue() was added
to handle this issue. If a fatal error occurs, call
purple_queued_output_stream_clear_queue() in
purple_queued_output_stream_push_bytes_async()'s callback in order
to clear the queue and avoid excessive errors returning.
--- a/libpurple/queuedoutputstream.c Sat Jun 09 23:54:46 2018 +0000
+++ b/libpurple/queuedoutputstream.c Mon May 28 13:38:27 2018 -0500
@@ -27,6 +27,7 @@
struct _PurpleQueuedOutputStreamPrivate {
GAsyncQueue *queue;
GBytes *next;
+ gboolean pending_queued;
};
static GObjectClass *parent_class = NULL;
@@ -50,6 +51,7 @@
GAsyncResult *result, GError **error);
static void purple_queued_output_stream_start_flush_async(GTask *task);
+static void purple_queued_output_stream_start_push_bytes_async(GTask *task);
static void
purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
@@ -98,6 +100,7 @@
stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
stream->priv->queue =
g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
+ stream->priv->pending_queued = FALSE;
}
static void
@@ -248,3 +251,141 @@
return g_task_propagate_boolean(G_TASK(result), error);
}
+static void
+purple_queued_output_stream_push_bytes_async_cb(GObject *source,
+ GAsyncResult *res, gpointer user_data)
+{
+ GTask *task = G_TASK(user_data);
+ PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
+ gssize written;
+ GBytes *bytes;
+ gsize size;
+ GError *error = NULL;
+
+ written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source),
+ res, &error);
+
+ bytes = g_task_get_task_data(task);
+ size = g_bytes_get_size(bytes);
+
+ if (written < 0) {
+ /* Error occurred, return error */
+ g_task_return_error(task, error);
+ g_clear_object(&task);
+ } else if (size > written) {
+ /* Partial write, prepare to send remaining data */
+ bytes = g_bytes_new_from_bytes(bytes, written, size - written);
+ g_task_set_task_data(task, bytes,
+ (GDestroyNotify)g_bytes_unref);
+ } else {
+ /* Full write, this task is finished */
+ g_task_return_boolean(task, TRUE);
+ g_clear_object(&task);
+ }
+
+ /* If g_task_return_* was called in this function, the callback
+ * may have cleared the stream. If so, there will be no remaining
+ * tasks to process here.
+ */
+
+ if (task == NULL) {
+ /* Any queued data left? */
+ task = g_async_queue_try_pop(stream->priv->queue);
+ }
+
+ if (task != NULL) {
+ /* More to process */
+ purple_queued_output_stream_start_push_bytes_async(task);
+ } else {
+ /* All done */
+ stream->priv->pending_queued = FALSE;
+ g_output_stream_clear_pending(G_OUTPUT_STREAM(stream));
+ }
+}
+
+static void
+purple_queued_output_stream_start_push_bytes_async(GTask *task)
+{
+ PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
+ GOutputStream *base_stream;
+
+ base_stream = g_filter_output_stream_get_base_stream(
+ G_FILTER_OUTPUT_STREAM(stream));
+
+ g_output_stream_write_bytes_async(base_stream,
+ g_task_get_task_data(task),
+ g_task_get_priority(task),
+ g_task_get_cancellable(task),
+ purple_queued_output_stream_push_bytes_async_cb,
+ task);
+}
+
+void
+purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
+ GBytes *bytes, int io_priority, GCancellable *cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ GTask *task;
+ gboolean set_pending;
+ GError *error = NULL;
+
+ g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
+ g_return_if_fail(bytes != NULL);
+
+ task = g_task_new(stream, cancellable, callback, user_data);
+ g_task_set_task_data(task, g_bytes_ref(bytes),
+ (GDestroyNotify)g_bytes_unref);
+ g_task_set_source_tag(task,
+ purple_queued_output_stream_push_bytes_async);
+ g_task_set_priority(task, io_priority);
+
+ set_pending = g_output_stream_set_pending(
+ G_OUTPUT_STREAM(stream), &error);
+
+ /* Since we're allowing queuing requests without blocking,
+ * it's not an error to be pending while processing queued operations.
+ */
+ if (!set_pending && (!g_error_matches(error,
+ G_IO_ERROR, G_IO_ERROR_PENDING) ||
+ !stream->priv->pending_queued)) {
+ g_task_return_error(task, error);
+ g_object_unref(task);
+ return;
+ }
+
+ stream->priv->pending_queued = TRUE;
+
+ if (set_pending) {
+ /* Start processing if there were no pending operations */
+ purple_queued_output_stream_start_push_bytes_async(task);
+ } else {
+ /* Otherwise queue the data */
+ g_async_queue_push(stream->priv->queue, task);
+ }
+}
+
+gboolean
+purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream,
+ GAsyncResult *result, GError **error)
+{
+ g_return_val_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream), FALSE);
+ g_return_val_if_fail(g_task_is_valid(result, stream), FALSE);
+ g_return_val_if_fail(g_async_result_is_tagged(result,
+ purple_queued_output_stream_push_bytes_async), FALSE);
+
+ return g_task_propagate_boolean(G_TASK(result), error);
+}
+
+void
+purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream)
+{
+ GTask *task;
+
+ g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
+
+ while ((task = g_async_queue_try_pop(stream->priv->queue)) != NULL) {
+ g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED,
+ "PurpleQueuedOutputStream queue cleared");
+ g_object_unref(task);
+ }
+}
--- a/libpurple/queuedoutputstream.h Sat Jun 09 23:54:46 2018 +0000
+++ b/libpurple/queuedoutputstream.h Mon May 28 13:38:27 2018 -0500
@@ -55,10 +55,12 @@
*
* To create a queued output stream, use #purple_queued_output_stream_new().
*
- * To queue data, use #purple_queued_output_stream_push_bytes().
+ * To queue data, use #purple_queued_output_stream_push_bytes_async().
*
- * Once data has been queued, flush the stream with #g_output_stream_flush()
- * or #g_output_stream_flush_async().
+ * If there's a fatal stream error, it's suggested to clear the remaining
+ * bytes queued with #purple_queued_output_stream_clear_queue() to avoid
+ * excessive errors returned in
+ * #purple_queued_output_stream_push_bytes_async()'s async callback.
*/
typedef struct _PurpleQueuedOutputStream PurpleQueuedOutputStream;
typedef struct _PurpleQueuedOutputStreamClass PurpleQueuedOutputStreamClass;
@@ -104,6 +106,57 @@
void purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
GBytes *bytes);
+/*
+ * purple_queued_output_stream_push_bytes_async
+ * @stream: #PurpleQueuedOutputStream to push bytes to
+ * @bytes: Bytes to queue
+ * @priority: IO priority of the request
+ * @cancellable: (allow-none): Optional #GCancellable object, NULL to ignore
+ * @callback: (scope async): Callback to call when the request is finished
+ * @user_data: (closure): Data to pass to the callback function
+ *
+ * Asynchronously queues and then writes data to the output stream.
+ * Once the data has been written, or an error occurs, the callback
+ * will be called.
+ *
+ * Be careful such that if there's a fatal stream error, all remaining queued
+ * operations will likely return this error. Use
+ * #purple_queued_output_stream_clear_queue() to clear the queue on such
+ * an error to only report it a single time.
+ */
+void purple_queued_output_stream_push_bytes_async(
+ PurpleQueuedOutputStream *stream, GBytes *bytes,
+ int io_priority, GCancellable *cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+
+/*
+ * purple_queued_output_stream_push_bytes_finish
+ * @stream: #PurpleQueuedOutputStream bytes were pushed to
+ * @result: The #GAsyncResult of this operation
+ * @error: A GError location to store the error, or NULL to ignore
+ *
+ * Finishes pushing bytes asynchronously.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ */
+gboolean purple_queued_output_stream_push_bytes_finish(
+ PurpleQueuedOutputStream *stream,
+ GAsyncResult *result, GError **error);
+
+/*
+ * purple_queued_output_stream_clear_queue
+ * @stream: #PurpleQueuedOutputStream to clear
+ *
+ * Clears the queue of any pending bytes. However, any bytes that are
+ * in the process of being sent will finish their operation.
+ *
+ * This function is useful for clearing the queue in case of an IO error.
+ * Call this in the async callback in order to clear the queue and avoid
+ * having all #purple_queue_output_stream_push_bytes_async() calls on
+ * this queue return errors if there's a fatal stream error.
+ */
+void purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream);
+
G_END_DECLS
#endif /* _PURPLE_QUEUED_OUTPUT_STREAM_H */