xeme/xemeinputstream.c

Thu, 13 Mar 2025 16:04:57 -0500

author
Gary Kramlich <grim@reaperworld.com>
date
Thu, 13 Mar 2025 16:04:57 -0500
changeset 63
fe3187889638
parent 57
c9dd63f9603b
permissions
-rw-r--r--

Update gi-docgen to 2025.3

Testing Done:
Called in the turtles.

Reviewed at https://reviews.imfreedom.org/r/3886/

/*
 * Copyright (C) 2023-2025 Xeme Developers
 *
 * Xeme is the legal property of its developers, whose names are too
 * numerous to list here. Please refer to the AUTHORS file distributed
 * with this source distribution
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, see
 * <https://www.gnu.org/licenses/>.
 */

#include "xemeinputstream.h"

#include "xemecore.h"
#include "xemeextensionmap.h"
#include "xemestring.h"

/* Number of bytes requested from the underlying GInputStream per
 * asynchronous read. */
const gsize XEME_INPUT_STREAM_BUFFER_LEN = 4096;

/* Property identifiers. PROP_0 is a placeholder because GObject property
 * ids start at 1; N_PROPERTIES sizes the pspec table below. */
enum {
	PROP_0,
	PROP_OUTPUT_STREAM,
	N_PROPERTIES,
};
/* Param specs, filled in and installed by xeme_input_stream_class_init(). */
static GParamSpec *properties[N_PROPERTIES] = {NULL, };

/* Indexes into the signal id table below. */
enum {
	SIG_CLOSED,
	SIG_RESTART_REQUESTED,
	N_SIGNALS,
};
/* Signal ids, registered by xeme_input_stream_class_init(). */
static guint signals[N_SIGNALS] = {0, };

/* Per-instance private state of a XemeInputStream. */
typedef struct {
	/* TRUE between a successful xeme_input_stream_start() and the next
	 * xeme_input_stream_stop(). */
	gboolean running;

	/* The stream being read from; a reference is taken in
	 * xeme_input_stream_start(). */
	GInputStream *input;
	/* The output stream handed to xeme_input_stream_start(). */
	XemeOutputStream *output;
	/* Markup parser fed with every chunk that is read; created in
	 * xeme_input_stream_init(). */
	GMarkupParseContext *context;

	/* Extension map that takes over parsing of <stream:features/>. */
	XemeExtensionMap *features;
} XemeInputStreamPrivate;

/* Registers XemeInputStream as a subclass of XemeStream and generates
 * xeme_input_stream_get_instance_private() and
 * xeme_input_stream_parent_class, both used below. */
G_DEFINE_TYPE_WITH_PRIVATE(XemeInputStream, xeme_input_stream,
                           XEME_TYPE_STREAM)

/******************************************************************************
 * Helpers
 *****************************************************************************/
static void
xeme_input_stream_read_bytes_async_cb(G_GNUC_UNUSED GObject *source,
                                      GAsyncResult *result, gpointer data)
{
	XemeInputStream *stream = data;
	XemeInputStreamPrivate *priv = NULL;
	GBytes *bytes = NULL;
	GError *error = NULL;
	const char *raw = NULL;
	gsize length = 0;

	priv = xeme_input_stream_get_instance_private(stream);

	bytes = g_input_stream_read_bytes_finish(priv->input, result, &error);
	if(bytes == NULL) {
		g_message("failed to read: %s",
		          error != NULL ? error->message : "unknown error");
		g_clear_error(&error);

		xeme_input_stream_stop(stream, NULL);

		return;
	}

	raw = g_bytes_get_data(bytes, &length);
	if(!g_markup_parse_context_parse(priv->context, raw, length, &error)) {
		g_warning("we hit an error: %s", error->message);
		length = 0;

		xeme_input_stream_stop(stream, NULL);

		return;
	}

	/* Unref the data we read. */
	g_bytes_unref(bytes);

	/* Make another asynchronous read call. */
	if(length > 0 && priv->running == TRUE) {
		GCancellable *cancellable = NULL;

		cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));

		g_input_stream_read_bytes_async(priv->input,
		                                XEME_INPUT_STREAM_BUFFER_LEN,
		                                G_PRIORITY_DEFAULT, cancellable,
		                                xeme_input_stream_read_bytes_async_cb,
		                                stream);
	}
}

/******************************************************************************
 * Parser
 *****************************************************************************/
/*
 * Handles the opening <stream:stream> element.
 *
 * input_stream:     the stream whose header is being parsed.
 * element_name:     the element name, used for attribute collection errors.
 * attribute_names:  NULL-terminated attribute names from the parser.
 * attribute_values: NULL-terminated attribute values from the parser.
 * error:            return location for an error.
 *
 * "to", "from" and "version" are required; "id", "xml:lang", "xmlns" and
 * "xmlns:stream" are collected as optional. The id (when non-empty), to and
 * from are stored on the stream before the version is validated, and the
 * language is stored before the namespaces are validated. The first failing
 * check sets error and returns.
 */
static void
xeme_input_stream_parse_stream(XemeInputStream *input_stream,
                               const char *element_name,
                               const char **attribute_names,
                               const char **attribute_values,
                               GError **error)
{
	XemeStream *stream = XEME_STREAM(input_stream);
	gboolean ret = FALSE;
	const char *id = NULL;
	const char *to = NULL;
	const char *from = NULL;
	const char *version = NULL;
	const char *xml_lang = NULL;
	const char *xmlns = NULL;
	const char *xmlns_stream = NULL;

	ret = g_markup_collect_attributes(
		element_name, attribute_names, attribute_values, error,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "id", &id,
		G_MARKUP_COLLECT_STRING, "to", &to,
		G_MARKUP_COLLECT_STRING, "from", &from,
		G_MARKUP_COLLECT_STRING, "version", &version,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xml:lang", &xml_lang,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xmlns", &xmlns,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xmlns:stream", &xmlns_stream,
		G_MARKUP_COLLECT_INVALID);

	if(!ret) {
		/* error has been set by g_markup_collect_attributes(). */
		return;
	}

	if(!xeme_str_is_empty(id)) {
		xeme_stream_set_id(stream, id);
	}

	xeme_stream_set_to(stream, to);
	xeme_stream_set_from(stream, from);

	if(!xeme_str_equal(version, XEME_STREAM_VERSION)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got version='%s' but we were expecting '%s'",
		            version, XEME_STREAM_VERSION);
		return;
	}

	if(!xeme_str_is_empty(xml_lang)) {
		xeme_stream_set_language(stream, xml_lang);
	}

	/* xmlns and xmlns:stream are optional attributes, so they are NULL when
	 * the peer left them out. Passing NULL to %s is undefined behavior, so
	 * substitute a printable placeholder in the error messages.
	 */
	if(!xeme_str_equal(xmlns, XEME_STREAM_XMLNS)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got xmlns='%s' but was expecting '%s'",
		            xmlns != NULL ? xmlns : "(null)", XEME_STREAM_XMLNS);
		return;
	}

	if(!xeme_str_equal(xmlns_stream, XEME_STREAM_XMLNS_STREAM)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got xmlns:stream='%s' but was expecting '%s'",
		            xmlns_stream != NULL ? xmlns_stream : "(null)",
		            XEME_STREAM_XMLNS_STREAM);
		return;
	}
}

/*
 * GMarkupParser start_element handler for the top level of the stream.
 *
 * <stream:stream> is validated by xeme_input_stream_parse_stream() and
 * <stream:features> is handed over to the extension map. Any other element
 * is only logged.
 */
static void
xeme_connection_parser_element_start(G_GNUC_UNUSED GMarkupParseContext *context,
                                     const char *name,
                                     const char **attribute_names,
                                     const char **attribute_values,
                                     gpointer data,
                                     GError **error)
{
	XemeInputStream *input_stream = data;

	if(xeme_str_equal(name, "stream:stream")) {
		xeme_input_stream_parse_stream(input_stream, name, attribute_names,
		                               attribute_values, error);

		return;
	}

	if(xeme_str_equal(name, "stream:features")) {
		XemeInputStreamPrivate *priv = NULL;

		priv = xeme_input_stream_get_instance_private(input_stream);
		xeme_extension_map_parse_start(priv->features, input_stream, context);

		return;
	}

	g_warning("unexpected element start '%s'", name);
}

/*
 * GMarkupParser end_element handler for the top level of the stream.
 *
 * A closing </stream:stream> stops the input stream; any other closing
 * element is only logged.
 */
static void
xeme_connection_parser_element_end(G_GNUC_UNUSED GMarkupParseContext *context,
                                   const char *name,
                                   gpointer data,
                                   GError **error)
{
	XemeInputStream *input_stream = data;

	if(!xeme_str_equal(name, "stream:stream")) {
		g_warning("unexpected element end '%s'", name);

		return;
	}

	/* The remote side closed the stream, so the connection is being
	 * terminated.
	 */
	xeme_input_stream_stop(input_stream, error);
}

static GMarkupParser parser = {
	.start_element = xeme_connection_parser_element_start,
	.end_element = xeme_connection_parser_element_end,
};

/******************************************************************************
 * GObject Implementation
 *****************************************************************************/
/* Releases everything the instance owns and chains up to the parent class. */
static void
xeme_input_stream_finalize(GObject *obj) {
	XemeInputStreamPrivate *priv =
		xeme_input_stream_get_instance_private(XEME_INPUT_STREAM(obj));

	/* Free the parse context, then drop the object references. */
	g_clear_pointer(&priv->context, g_markup_parse_context_free);

	g_clear_object(&priv->features);
	g_clear_object(&priv->input);
	g_clear_object(&priv->output);

	G_OBJECT_CLASS(xeme_input_stream_parent_class)->finalize(obj);
}

/* GObject get_property implementation; output-stream is the only property. */
static void
xeme_input_stream_get_property(GObject *obj, guint param_id, GValue *value,
                               GParamSpec *pspec)
{
	XemeInputStream *self = XEME_INPUT_STREAM(obj);

	if(param_id == PROP_OUTPUT_STREAM) {
		XemeOutputStream *output = xeme_input_stream_get_output_stream(self);

		g_value_set_object(value, output);
	} else {
		G_OBJECT_WARN_INVALID_PROPERTY_ID(obj, param_id, pspec);
	}
}

/* Instance initializer: the stream starts out stopped, with a parse context
 * bound to this instance and a new extension map. */
static void
xeme_input_stream_init(XemeInputStream *stream) {
	XemeInputStreamPrivate *priv =
		xeme_input_stream_get_instance_private(stream);

	priv->running = FALSE;

	/* The instance is the user data handed to the parser callbacks. */
	priv->context = g_markup_parse_context_new(&parser, 0, stream, NULL);
	priv->features = xeme_extension_map_new();
}

/* Class initializer: hooks up the GObject vfuncs and registers the
 * output-stream property and the closed and restart-requested signals. */
static void
xeme_input_stream_class_init(XemeInputStreamClass *klass) {
	GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
	GType type = G_TYPE_FROM_CLASS(klass);

	gobject_class->get_property = xeme_input_stream_get_property;
	gobject_class->finalize = xeme_input_stream_finalize;

	/**
	 * XemeInputStream:output-stream:
	 *
	 * The output stream that should be used for requests and responses.
	 *
	 * This is only set after [method@InputStream.start] is called.
	 *
	 * Since: 0.1
	 */
	properties[PROP_OUTPUT_STREAM] =
		g_param_spec_object("output-stream", NULL, NULL,
		                    XEME_TYPE_OUTPUT_STREAM,
		                    G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

	g_object_class_install_properties(gobject_class, N_PROPERTIES,
	                                  properties);

	/**
	 * XemeInputStream::closed:
	 *
	 * Emitted when the input stream is closed.
	 *
	 * Since: 0.1
	 */
	signals[SIG_CLOSED] =
		g_signal_new_class_handler("closed", type, G_SIGNAL_RUN_LAST,
		                           NULL, /* class handler */
		                           NULL, /* accumulator */
		                           NULL, /* accumulator data */
		                           NULL, /* marshaller */
		                           G_TYPE_NONE, 0);

	/**
	 * XemeInputStream::restart-requested:
	 *
	 * Emitted when the remote side has requested that the output stream be
	 * restarted.
	 *
	 * This is typically requested during feature negotiation which happens
	 * during the initial connection.
	 *
	 * Since: 0.1
	 */
	signals[SIG_RESTART_REQUESTED] =
		g_signal_new_class_handler("restart-requested", type,
		                           G_SIGNAL_RUN_LAST,
		                           NULL, /* class handler */
		                           NULL, /* accumulator */
		                           NULL, /* accumulator data */
		                           NULL, /* marshaller */
		                           G_TYPE_NONE, 0);
}

/******************************************************************************
 * Public API
 *****************************************************************************/
/* Creates a new input stream; nothing is read until
 * xeme_input_stream_start() is called. */
XemeInputStream *
xeme_input_stream_new(void) {
	XemeInputStream *stream = NULL;

	stream = g_object_new(XEME_TYPE_INPUT_STREAM, NULL);

	return stream;
}

/*
 * Starts reading and parsing from input.
 *
 * stream: the input stream instance.
 * input:  the GInputStream to read from; a reference is taken.
 * output: the output stream to expose via the output-stream property.
 * error:  return location for an error.
 *
 * Returns TRUE if the first asynchronous read was queued, or FALSE with
 * error set if the stream is already running.
 *
 * NOTE(review): the stream instance is passed to the read callback without
 * taking a reference, so the caller presumably has to keep it alive while a
 * read is pending — confirm.
 */
gboolean
xeme_input_stream_start(XemeInputStream *stream, GInputStream *input,
                        XemeOutputStream *output, GError **error)
{
	XemeInputStreamPrivate *priv = NULL;
	GCancellable *cancellable = NULL;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), FALSE);
	g_return_val_if_fail(G_IS_INPUT_STREAM(input), FALSE);
	g_return_val_if_fail(XEME_IS_OUTPUT_STREAM(output), FALSE);
	g_return_val_if_fail(error == NULL || *error == NULL, FALSE);

	priv = xeme_input_stream_get_instance_private(stream);

	if(priv->running) {
		g_set_error_literal(error, XEME_DOMAIN, 0,
		                    "input stream has already been started");

		return FALSE;
	}

	/* A previous stop cancelled the cancellable, so make it usable again. */
	cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));
	if(G_IS_CANCELLABLE(cancellable)) {
		g_cancellable_reset(cancellable);
	}

	/* Store the output stream. */
	if(g_set_object(&priv->output, output)) {
		g_object_notify_by_pspec(G_OBJECT(stream),
		                         properties[PROP_OUTPUT_STREAM]);
	}

	/* Store the input stream. xeme_input_stream_stop() keeps the previous
	 * input around when closing it failed, so use g_set_object() to release
	 * that reference instead of overwriting and leaking it.
	 */
	g_set_object(&priv->input, input);

	/* Setup the read handler. */
	g_input_stream_read_bytes_async(priv->input,
	                                XEME_INPUT_STREAM_BUFFER_LEN,
	                                G_PRIORITY_DEFAULT, cancellable,
	                                xeme_input_stream_read_bytes_async_cb,
	                                stream);

	priv->running = TRUE;

	return TRUE;
}

/*
 * Stops the input stream.
 *
 * stream: the input stream instance.
 * error:  return location for an error from closing the underlying stream.
 *
 * Cancels the stream's cancellable, marks the stream as not running and
 * closes the underlying GInputStream. When closing succeeds the reference to
 * the input is dropped and the closed signal is emitted. When it fails the
 * input is kept and the signal is not emitted, but the stream is still marked
 * as not running.
 *
 * Returns TRUE if the stream was not running or was closed successfully,
 * FALSE with error set otherwise.
 */
gboolean
xeme_input_stream_stop(XemeInputStream *stream, GError **error) {
	XemeInputStreamPrivate *priv = NULL;
	GCancellable *cancellable = NULL;
	gboolean ret = FALSE;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), FALSE);

	priv = xeme_input_stream_get_instance_private(stream);
	if(priv->running == FALSE) {
		return TRUE;
	}

	cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));
	if(G_IS_CANCELLABLE(cancellable) && !g_cancellable_is_cancelled(cancellable)) {
		g_cancellable_cancel(cancellable);
	}

	priv->running = FALSE;

	ret = g_input_stream_close(priv->input, NULL, error);
	if(ret) {
		/* Drop the input before emitting the signal. A handler may call
		 * xeme_input_stream_start() again, or release the last reference to
		 * the stream, so no private state may be touched after the emission.
		 */
		g_clear_object(&priv->input);

		g_signal_emit(stream, signals[SIG_CLOSED], 0);
	}

	return ret;
}

/* Emits the restart-requested signal on stream. */
void
xeme_input_stream_restart_requested(XemeInputStream *stream) {
	guint signal_id = 0;

	g_return_if_fail(XEME_IS_INPUT_STREAM(stream));

	signal_id = signals[SIG_RESTART_REQUESTED];
	g_signal_emit(stream, signal_id, 0);
}

/* Returns the output stream stored by xeme_input_stream_start(), or NULL if
 * none has been stored yet. No reference is added for the caller. */
XemeOutputStream *
xeme_input_stream_get_output_stream(XemeInputStream *stream) {
	XemeInputStreamPrivate *priv = NULL;
	XemeOutputStream *output = NULL;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), NULL);

	priv = xeme_input_stream_get_instance_private(stream);
	output = priv->output;

	return output;
}

mercurial