[LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing

Richard Palethorpe rpalethorpe@suse.com
Tue Sep 27 18:14:06 CEST 2022


Adds an API for the channel pattern and an implementation based on
pipes.

A channel supports synchronous and asynchronous modes. When a message
is sent synchronously it's guaranteed to have been sent and ack'ed by
the time send returns. Similar for receiving.

In asynchronous mode, send returns immediately and a callback must be
registered to handle the response. This requires that the channel is
registered with an event loop. Again it's similar for receiving.

Messages are binary safe strings with a specified size.

The use of pipes instead of shared memory is motivated by the idea
that the same interface can be used for remote or isolated
processes.

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 include/tst_channel.h |  97 ++++++++++
 lib/tst_channel.c     | 410 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 507 insertions(+)
 create mode 100644 include/tst_channel.h
 create mode 100644 lib/tst_channel.c

diff --git a/include/tst_channel.h b/include/tst_channel.h
new file mode 100644
index 000000000..c572d213b
--- /dev/null
+++ b/include/tst_channel.h
@@ -0,0 +1,97 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include "tst_evloop.h"
+#include "tst_state_machine.h"
+
+#ifndef TST_CHANNEL_H
+#define TST_CHANNEL_H
+
+enum tst_chan_mode {
+	CHM_SYNC,
+	CHM_ASYNC,
+};
+
+enum tst_chan_state {
+	CHS_CLOSED,
+	CHS_READY,
+	CHS_RECV,
+	CHS_SEND
+};
+
+struct tst_chan_buf {
+	char *ptr;
+	size_t len;
+	size_t off;
+};
+
+struct tst_chan;
+struct tst_chan_ops {
+	void (*const close)(struct tst_chan *self);
+
+	void (*const send)(struct tst_chan *self);
+	void (*const recv)(struct tst_chan *self);
+
+	int (*const on_epoll)(struct tst_chan *self, uint32_t events);
+};
+
+struct tst_chan {
+	const struct tst_chan_ops *ops;
+	void *priv;
+
+	enum tst_chan_mode mode;
+	struct tst_state_mach mach;
+	long long last_seen;
+
+	struct tst_epoll_event_data evdata;
+
+	struct tst_chan_buf in;
+	struct tst_chan_buf out;
+
+	void *user_priv;
+	void (*on_send)(struct tst_chan *self, char *sent, size_t len);
+	void (*on_recv)(struct tst_chan *self, char *recv, size_t len);
+};
+
+enum tst_pchan_msg_kind {
+	PCMK_ACK = 1,
+	PCMK_DATA
+};
+
+struct tst_pchan_envelope {
+	unsigned int kind;
+	unsigned int len;
+} __attribute__((packed));
+
+enum tst_pchan_state {
+	PCS_IDLE,
+	PCS_RECV_DATA,
+	PCS_SEND_ACK,
+	PCS_SEND_DATA,
+	PCS_RECV_ACK,
+};
+
+struct tst_pchan {
+	int infd;
+	int outfd;
+
+	struct tst_pchan_envelope envelope;
+	struct tst_chan_buf envelope_buf;
+
+	unsigned int out_full:1;
+
+	struct tst_state_mach mach;
+};
+
+void tst_chan_send(struct tst_chan *self, char *msg, size_t len);
+void tst_chan_recv(struct tst_chan *self, char *msg, size_t len);
+void tst_chan_seen(struct tst_chan *self);
+long long tst_chan_elapsed(struct tst_chan *self);
+
+void tst_pchan_open(struct tst_chan *self, int infd, int outfd,
+		    struct tst_evloop *const evloop);
+void tst_pchan_close(struct tst_chan *self);
+
+#endif
diff --git a/lib/tst_channel.c b/lib/tst_channel.c
new file mode 100644
index 000000000..6d96cccab
--- /dev/null
+++ b/lib/tst_channel.c
@@ -0,0 +1,410 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include "tst_timer.h"
+#include "tst_safe_clocks.h"
+#include "tst_channel.h"
+
+static struct tst_state_matrix chan_state_mat = {
+	.names = {
+		[CHS_CLOSED] = "closed",
+		[CHS_READY] = "ready",
+		[CHS_RECV] = "receiving",
+		[CHS_SEND] = "sending"
+	},
+	.states = {
+		[CHS_CLOSED] = 1 << CHS_READY,
+		[CHS_READY] = 1 << CHS_CLOSED | 1 << CHS_RECV | 1 << CHS_SEND,
+		[CHS_RECV] = 1 << CHS_CLOSED | 1 << CHS_READY,
+		[CHS_SEND] = 1 << CHS_CLOSED | 1 << CHS_READY,
+	}
+};
+
+static struct tst_state_matrix pchan_state_mat = {
+	.names = {
+		[PCS_IDLE] = "idle",
+		[PCS_RECV_DATA] = "receiving data",
+		[PCS_SEND_ACK] = "sending ack",
+		[PCS_SEND_DATA]= "sending data",
+		[PCS_RECV_ACK] = "receving ack",
+	},
+	.states = {
+		[PCS_IDLE] = 1 << PCS_IDLE | 1 << PCS_RECV_DATA | 1 << PCS_SEND_DATA,
+		[PCS_RECV_DATA] = 1 << PCS_SEND_ACK | 1 << PCS_IDLE,
+		[PCS_SEND_ACK] = 1 << PCS_IDLE,
+		[PCS_SEND_DATA]= 1 << PCS_RECV_ACK | 1 << PCS_IDLE,
+		[PCS_RECV_ACK] = 1 << PCS_IDLE,
+	},
+};
+
+static size_t chan_buf_more(struct tst_chan_buf *self)
+{
+	return self->off < self->len;
+}
+
+void tst_chan_send(struct tst_chan *self, char *msg, size_t len)
+{
+	if (self->mode == CHM_SYNC)
+		goto send;
+
+	if (!self->on_send) {
+		tst_brk(TBROK, "In async mode, but on_sent cb not set");
+		return;
+	}
+
+	if (!self->evdata.on_epoll) {
+		tst_brk(TBROK, "In async mode, but not added to epoll");
+		return;
+	}
+
+send:
+	self->out.ptr = msg;
+	self->out.len = len;
+	self->out.off = 0;
+
+	TST_STATE_SET(&self->mach, CHS_SEND);
+	self->ops->send(self);
+}
+
+void tst_chan_recv(struct tst_chan *self, char *msg, size_t len)
+{
+	if (self->mode == CHM_SYNC)
+		goto recv;
+
+	if (!self->on_recv) {
+		tst_brk(TBROK, "In async mode, but on_recv cb not set");
+		return;
+	}
+
+	if (!self->evdata.on_epoll) {
+		tst_brk(TBROK, "In async mode, but not added to epoll");
+		return;
+	}
+
+recv:
+	self->in.ptr = msg;
+	self->in.len = len;
+	self->in.off = 0;
+
+	TST_STATE_SET(&self->mach, CHS_RECV);
+	self->ops->recv(self);
+}
+
+void tst_chan_seen(struct tst_chan *self)
+{
+	struct timespec now;
+
+	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
+	self->last_seen = tst_timespec_to_us(now);
+}
+
+long long tst_chan_elapsed(struct tst_chan *self)
+{
+	struct timespec now;
+
+	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
+
+	return tst_timespec_to_us(now) - self->last_seen;
+}
+
+static void pipe_chan_write(struct tst_chan *self, struct tst_chan_buf *buf)
+{
+	ssize_t ret;
+	size_t written = buf->off;
+	struct tst_pchan *priv = self->priv;
+
+	while (written < buf->len) {
+		ret = write(priv->outfd,
+			    buf->ptr + written, buf->len - written);
+
+		if (ret == -1) {
+			if (self->mode == CHM_ASYNC && errno == EAGAIN) {
+				priv->out_full = 1;
+				break;
+			}
+
+			if (errno == EINTR)
+				continue;
+
+			tst_brk(TBROK | TERRNO, "write");
+		}
+
+		written += ret;
+	}
+
+	buf->off = written;
+}
+
+static void pipe_chan_read(struct tst_chan *self, struct tst_chan_buf *buf)
+{
+	ssize_t ret;
+	size_t recved = buf->off;
+	struct tst_pchan *priv = self->priv;
+
+	while (recved < buf->len) {
+		ret = read(priv->infd,
+			   buf->ptr + recved, buf->len - recved);
+
+		if (!ret)
+			tst_brk(TBROK, "PID %d: read(%d) = EOF", getpid(), priv->infd);
+
+		if (ret == -1) {
+			if (self->mode == CHM_ASYNC && errno == EAGAIN)
+				break;
+
+			if (errno == EINTR)
+				continue;
+
+			tst_brk(TBROK | TERRNO, "read");
+		}
+
+		recved += ret;
+	}
+
+	buf->off = recved;
+}
+
+static void pipe_chan_send(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+	const enum tst_pchan_state state =
+		TST_STATE_GET(&priv->mach, 1 << PCS_IDLE | 1 << PCS_SEND_DATA | 1 << PCS_RECV_ACK);
+
+ 	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_SEND);
+
+	switch (state) {
+	case PCS_IDLE:
+		priv->envelope.kind = PCMK_DATA;
+		priv->envelope.len = self->out.len;
+		priv->envelope_buf.ptr = (char *)&priv->envelope;
+		priv->envelope_buf.len = sizeof(priv->envelope);
+		priv->envelope_buf.off = 0;
+
+		TST_STATE_SET(&priv->mach, PCS_SEND_DATA);
+		break;
+	case PCS_SEND_DATA:
+		break;
+	case PCS_RECV_ACK:
+		goto ack;
+	default:
+		break;
+	}
+
+	if (priv->out_full)
+		return;
+
+	pipe_chan_write(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	pipe_chan_write(self, &self->out);
+	if (chan_buf_more(&self->out))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_RECV_ACK);
+	priv->envelope_buf.off = 0;
+
+	if (self->mode == CHM_ASYNC)
+		return;
+ack:
+	pipe_chan_read(self, &priv->envelope_buf);
+
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	if (priv->envelope.kind != PCMK_ACK || priv->envelope.len) {
+		tst_brk(TBROK, "Malformed ack");
+		return;
+	}
+
+	tst_chan_seen(self);
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_READY);
+
+	if (self->on_send)
+		self->on_send(self, self->out.ptr, self->out.len);
+}
+
+static void pipe_chan_recv(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+	const enum tst_pchan_state state =
+		TST_STATE_GET(&priv->mach, 1 << PCS_IDLE | 1 << PCS_RECV_DATA | 1 << PCS_SEND_ACK);
+
+	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_RECV);
+
+	switch (state) {
+	case PCS_IDLE:
+		priv->envelope_buf.off = 0;
+
+		TST_STATE_SET(&priv->mach, PCS_RECV_DATA);
+		break;
+	case PCS_RECV_DATA:
+		break;
+	case PCS_SEND_ACK:
+		goto ack;
+	default:
+		break;
+	}
+
+	pipe_chan_read(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	if (priv->envelope.kind != PCMK_DATA) {
+		tst_brk(TBROK, "Expected data message, but got: %d", priv->envelope.kind);
+		return;
+	}
+
+	if (priv->envelope.len > self->in.len) {
+		tst_brk(TBROK, "Incoming message too large: %ul", priv->envelope.len);
+		return;
+	}
+
+	self->in.len = priv->envelope.len;
+	pipe_chan_read(self, &self->in);
+	if (chan_buf_more(&self->in))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_SEND_ACK);
+	priv->envelope.kind = PCMK_ACK;
+	priv->envelope.len = 0;
+	priv->envelope_buf.off = 0;
+
+ack:
+	pipe_chan_write(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_READY);
+
+	if (self->on_recv)
+		self->on_recv(self, self->in.ptr, self->in.len);
+}
+
+static int pipe_chan_on_epoll(struct tst_chan *self, uint32_t events)
+{
+	struct tst_pchan *priv = self->priv;
+	enum tst_chan_state chs = TST_STATE_GET(&self->mach, TST_STATE_ANY);
+	enum tst_pchan_state phs = TST_STATE_GET(&priv->mach, TST_STATE_ANY);
+
+	if (events | EPOLLOUT) {
+		priv->out_full = 0;
+
+		switch (chs) {
+		case CHS_RECV:
+			if (phs == PCS_RECV_DATA)
+				break;
+
+			self->ops->recv(self);
+			break;
+		case CHS_SEND:
+			if (phs == PCS_RECV_ACK)
+				break;
+
+			self->ops->send(self);
+			break;
+		case CHS_READY:
+		case CHS_CLOSED:
+			TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+			break;
+		}
+	}
+
+	if (events | EPOLLIN) {
+		switch (chs) {
+		case CHS_RECV:
+			if (phs == PCS_SEND_ACK)
+				break;
+
+			self->ops->recv(self);
+			break;
+		case CHS_SEND:
+			if (phs == PCS_SEND_DATA)
+				break;
+
+			self->ops->send(self);
+			break;
+		case CHS_READY:
+		case CHS_CLOSED:
+			TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+			break;
+		}
+	}
+
+	if (events & EPOLLERR) {
+		switch (phs) {
+		case PCS_RECV_DATA:
+		case PCS_SEND_ACK:
+		case PCS_SEND_DATA:
+			tst_brk(TBROK, "Channel closed during operation");
+		default:
+			break;
+		}
+
+		if (chs != CHS_CLOSED)
+			self->ops->close(self);
+	}
+
+	return 1;
+}
+
+static struct tst_chan_ops pipe_chan_ops = {
+	.close = tst_pchan_close,
+	.send = pipe_chan_send,
+	.recv = pipe_chan_recv,
+	.on_epoll = pipe_chan_on_epoll,
+};
+
+void tst_pchan_open(struct tst_chan *self,
+		    int infd, int outfd,
+		    struct tst_evloop *const evloop)
+{
+	struct tst_pchan *priv = self->priv;
+
+	self->mach.mat = &chan_state_mat;
+	TST_STATE_EXP(&self->mach, 1 << CHS_CLOSED);
+
+	self->ops = &pipe_chan_ops;
+
+	if (!priv)
+		tst_brk(TBROK, "Channel should have pipe_chan priv preallocated");
+
+	priv->mach.mat = &pchan_state_mat;
+	TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+
+	priv->out_full = 0;
+	priv->infd = infd;
+	priv->outfd = outfd;
+	priv->envelope_buf.ptr = (char *)&priv->envelope;
+	priv->envelope_buf.len = sizeof(priv->envelope);
+
+	if (!evloop)
+		goto out;
+
+	self->mode = CHM_ASYNC;
+	self->evdata.on_epoll = (tst_on_epoll_fn)self->ops->on_epoll;
+	self->evdata.self = self;
+	tst_evloop_add(evloop, &self->evdata, infd, EPOLLIN);
+	tst_evloop_add(evloop, &self->evdata, outfd, EPOLLOUT | EPOLLET);
+
+out:
+	TST_STATE_SET(&self->mach, CHS_READY);
+	tst_chan_seen(self);
+}
+
+void tst_pchan_close(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+
+	close(priv->infd);
+	close(priv->outfd);
+
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_CLOSED);
+}
-- 
2.36.1



More information about the ltp mailing list