[LTP] [PATCH 6/6] read_all: Migrate to the worker lib

Richard Palethorpe rpalethorpe@suse.com
Tue Sep 27 18:14:08 CEST 2022


Use the worker API added in the previous commit.

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 testcases/kernel/fs/read_all/read_all.c | 588 +++++++-----------------
 1 file changed, 171 insertions(+), 417 deletions(-)

diff --git a/testcases/kernel/fs/read_all/read_all.c b/testcases/kernel/fs/read_all/read_all.c
index 266678ea7..6d3ff4594 100644
--- a/testcases/kernel/fs/read_all/read_all.c
+++ b/testcases/kernel/fs/read_all/read_all.c
@@ -27,9 +27,9 @@
  * overridden with the 'w' parameters.
  */
 #include <signal.h>
+#include <stdint.h>
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <sys/wait.h>
 #include <fnmatch.h>
 #include <lapi/fnmatch.h>
 #include <stdlib.h>
@@ -40,7 +40,6 @@
 #include <unistd.h>
 #include <string.h>
 #include <limits.h>
-#include <semaphore.h>
 #include <ctype.h>
 #include <pwd.h>
 #include <grp.h>
@@ -49,28 +48,12 @@
 #include "tst_safe_clocks.h"
 #include "tst_test.h"
 #include "tst_timer.h"
+#include "tst_worker.h"
 
-#define QUEUE_SIZE 16384
 #define BUFFER_SIZE 1024
 #define MAX_PATH 4096
 #define MAX_DISPLAY 40
 
-struct queue {
-	sem_t sem;
-	int front;
-	int back;
-	char data[QUEUE_SIZE];
-	char popped[BUFFER_SIZE];
-};
-
-struct worker {
-	int i;
-	pid_t pid;
-	struct queue *q;
-	int last_seen;
-	unsigned int kill_sent:1;
-};
-
 enum dent_action {
 	DA_UNKNOWN,
 	DA_IGNORE,
@@ -78,19 +61,21 @@ enum dent_action {
 	DA_VISIT,
 };
 
+struct path_worker {
+	char cur[MAX_PATH];
+	char next[MAX_PATH];
+};
+
 static char *verbose;
 static char *quiet;
 static char *root_dir;
 static char *str_reads;
 static int reads = 1;
 static char *str_worker_count;
-static long worker_count;
 static char *str_max_workers;
 static long max_workers = 15;
-static struct worker *workers;
 static char *drop_privs;
 static char *str_worker_timeout;
-static int worker_timeout;
 static int timeout_warnings_left = 15;
 
 static char *blacklist[] = {
@@ -101,83 +86,9 @@ static char *blacklist[] = {
 	"/sys/*/cpu??*(?)/*",	/* cpu* entries with 2 or more digits */
 };
 
-static long long epoch;
-
-static int atomic_timestamp(void)
-{
-	struct timespec now;
-
-	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
-
-	return tst_timespec_to_us(now) - epoch;
-}
-
-static int queue_pop(struct queue *q)
-{
-	int i = q->front, j = 0;
-
-	sem_wait(&q->sem);
-
-	if (!q->data[i])
-		return 0;
-
-	while (q->data[i]) {
-		q->popped[j] = q->data[i];
-
-		if (++j >= BUFFER_SIZE - 1)
-			tst_brk(TBROK, "Buffer is too small for path");
-
-		 i = (i + 1) % QUEUE_SIZE;
-	}
-
-	q->popped[j] = '\0';
-	tst_atomic_store((i + 1) % QUEUE_SIZE, &q->front);
-
-	return 1;
-}
-
-static int queue_push(struct queue *q, const char *buf)
-{
-	int i = q->back, j = 0;
-	int front = tst_atomic_load(&q->front);
-
-	do {
-		q->data[i] = buf[j];
-
-		i = (i + 1) % QUEUE_SIZE;
-
-		if (i == front)
-			return 0;
-
-	} while (buf[j++]);
-
-	q->back = i;
-	sem_post(&q->sem);
-
-	return 1;
-}
-
-static struct queue *queue_init(void)
-{
-	struct queue *q = SAFE_MMAP(NULL, sizeof(*q),
-				    PROT_READ | PROT_WRITE,
-				    MAP_SHARED | MAP_ANONYMOUS,
-				    0, 0);
-
-	sem_init(&q->sem, 1, 0);
-	q->front = 0;
-	q->back = 0;
-
-	return q;
-}
-
-static void queue_destroy(struct queue *q, int is_worker)
-{
-	if (is_worker)
-		sem_destroy(&q->sem);
-
-	SAFE_MUNMAP(q, sizeof(*q));
-}
+static struct tst_workers workers;
+static struct path_worker *worker_privs;
+struct tst_evloop evloop;
 
 static void sanitize_str(char *buf, ssize_t count)
 {
@@ -208,63 +119,45 @@ static int is_blacklisted(const char *path)
 	return 0;
 }
 
-static void worker_heartbeat(const int worker)
-{
-	tst_atomic_store(atomic_timestamp(), &workers[worker].last_seen);
-}
-
-static int worker_elapsed(const int worker)
-{
-	struct worker *const w = workers + worker;
-
-	return atomic_timestamp() - tst_atomic_load(&w->last_seen);
-}
-
-static int worker_ttl(const int worker)
-{
-	return MAX(0, worker_timeout - worker_elapsed(worker));
-}
-
-static void read_test(const int worker, const char *const path)
+static void read_test(struct tst_worker *self, char *path)
 {
 	char buf[BUFFER_SIZE];
 	int fd;
 	ssize_t count;
-	const pid_t pid = workers[worker].pid;
-	int elapsed;
+	long long elapsed;
 
 	if (is_blacklisted(path))
 		return;
 
 	if (verbose)
-		tst_res(TINFO, "Worker %d: %s(%s)", pid, __func__, path);
+		tst_res(TINFO, "%s: %s(%s)", tst_worker_idstr(self), __func__, path);
 
 	fd = open(path, O_RDONLY | O_NONBLOCK);
 	if (fd < 0) {
 		if (!quiet) {
-			tst_res(TINFO | TERRNO, "Worker %d (%d): open(%s)",
-				pid, worker, path);
+			tst_res(TINFO | TERRNO, "%s: open(%s)",
+			        tst_worker_idstr(self), path);
 		}
 		return;
 	}
 
-	worker_heartbeat(worker);
+	elapsed = tst_chan_elapsed(&self->chan);
 	count = read(fd, buf, sizeof(buf) - 1);
-	elapsed = worker_elapsed(worker);
+	elapsed = tst_chan_elapsed(&self->chan) - elapsed;
 
 	if (count > 0 && verbose) {
 		sanitize_str(buf, count);
 		tst_res(TINFO,
-			"Worker %d (%d): read(%s, buf) = %zi, buf = %s, elapsed = %dus",
-			pid, worker, path, count, buf, elapsed);
+			"%s: read(%s, buf) = %zi, buf = %s, elapsed = %llus",
+			tst_worker_idstr(self), path, count, buf, elapsed);
 	} else if (!count && verbose) {
 		tst_res(TINFO,
-			"Worker %d (%d): read(%s) = EOF, elapsed = %dus",
-			pid, worker, path, elapsed);
+			"%s: read(%s) = EOF, elapsed = %llus",
+			tst_worker_idstr(self), path, elapsed);
 	} else if (count < 0 && !quiet) {
 		tst_res(TINFO | TERRNO,
-			"Worker %d (%d): read(%s), elapsed = %dus",
-			pid, worker, path, elapsed);
+			"%s: read(%s), elapsed = %llus",
+			tst_worker_idstr(self), path, elapsed);
 	}
 
 	SAFE_CLOSE(fd);
@@ -294,254 +187,213 @@ static void maybe_drop_privs(void)
 		tst_brk(TBROK | TTERRNO, "Failed to use nobody uid");
 }
 
-static int worker_run(int worker)
+static void visit_dir(struct tst_worker *self, const char *path)
 {
+	DIR *dir;
+	struct dirent *dent;
+	struct stat dent_st;
+	char dent_path[MAX_PATH];
+	enum dent_action act;
+
+	dir = opendir(path);
+	if (!dir) {
+		tst_res(TINFO | TERRNO, "opendir(%s)", path);
+		return;
+	}
+
+	while (1) {
+		errno = 0;
+		dent = readdir(dir);
+		if (!dent && errno) {
+			tst_res(TINFO | TERRNO, "readdir(%s)", path);
+			break;
+		} else if (!dent) {
+			break;
+		}
+
+		if (!strcmp(dent->d_name, ".") ||
+		    !strcmp(dent->d_name, ".."))
+			continue;
+
+		if (dent->d_type == DT_DIR)
+			act = DA_VISIT;
+		else if (dent->d_type == DT_LNK)
+			act = DA_IGNORE;
+		else if (dent->d_type == DT_UNKNOWN)
+			act = DA_UNKNOWN;
+		else
+			act = DA_READ;
+
+		snprintf(dent_path, MAX_PATH,
+			 "%s/%s", path, dent->d_name);
+
+		if (act == DA_UNKNOWN) {
+			if (lstat(dent_path, &dent_st))
+				tst_res(TINFO | TERRNO, "lstat(%s)", path);
+			else if ((dent_st.st_mode & S_IFMT) == S_IFDIR)
+				act = DA_VISIT;
+			else if ((dent_st.st_mode & S_IFMT) == S_IFLNK)
+				act = DA_IGNORE;
+			else
+				act = DA_READ;
+		}
+
+		if (act == DA_VISIT)
+			visit_dir(self, dent_path);
+		else if (act == DA_READ)
+			tst_chan_send(&self->chan, dent_path, strlen(dent_path) + 1);
+	}
+
+	if (closedir(dir))
+		tst_res(TINFO | TERRNO, "closedir(%s)", path);
+}
+
+static int dir_worker_run(struct tst_worker *self)
+{
+	visit_dir(self, root_dir);
+
+	tst_res(TINFO, "Dir Worker %d (%d): fin.", self->pid, self->i);
+	tst_chan_send(&self->chan, "", 1);
+
+	return 0;
+}
+
+static int path_worker_run(struct tst_worker *self)
+{
+	char buf[BUFFER_SIZE];
 	struct sigaction term_sa = {
 		.sa_handler = SIG_IGN,
 		.sa_flags = 0,
 	};
-	struct worker *const self = workers + worker;
-	struct queue *q = self->q;
 
 	sigaction(SIGTTIN, &term_sa, NULL);
 	maybe_drop_privs();
-	self->pid = getpid();
-
-	if (!worker_ttl(self->i)) {
-		tst_brk(TBROK,
-			"Worker timeout is too short; restarts take >%dus",
-			worker_elapsed(self->i));
-	}
 
 	while (1) {
-		worker_heartbeat(worker);
+		tst_chan_recv(&self->chan, buf, PATH_MAX);
 
-		if (!queue_pop(q))
+		if (!buf[0])
 			break;
 
-		read_test(worker, q->popped);
+		read_test(self, buf);
 	}
 
-	queue_destroy(q, 1);
 	tst_flush();
 	return 0;
 }
 
-static void spawn_workers(void)
+static void path_worker_resend(struct tst_worker *const self)
 {
-	int i;
-	struct worker *wa = workers;
-
-	memset(workers, 0, worker_count * sizeof(*workers));
+	struct path_worker *pw = self->priv;
 
-	for (i = 0; i < worker_count; i++) {
-		wa[i].i = i;
-		wa[i].q = queue_init();
-		wa[i].last_seen = atomic_timestamp();
-		wa[i].pid = SAFE_FORK();
-		if (!wa[i].pid)
-			exit(worker_run(i));
-	}
+	tst_chan_send(&self->chan, pw->next, strlen(pw->next));
 }
 
-static void restart_worker(const int worker)
+static void do_next_path(struct tst_worker *path_worker)
 {
-	struct worker *const w = workers + worker;
-	int wstatus, ret, i, q_len;
+	size_t slen = 1;
+	struct tst_worker *const dir_worker = workers.vec;
+	struct path_worker *pw = path_worker->priv;
 
-	if (!w->kill_sent) {
-		SAFE_KILL(w->pid, SIGKILL);
-		w->kill_sent = 1;
-		worker_heartbeat(worker);
-	}
-
-	ret = waitpid(w->pid, &wstatus, WNOHANG);
+	pw->next[0] = '\0';
 
-	if (!ret) {
-		if (worker_ttl(worker) > 0)
-			return;
+	if (TST_STATE_GET(&dir_worker->mach, TST_STATE_ANY) != WS_RUNNING)
+		goto send;
 
-		if (!quiet || timeout_warnings_left) {
-			tst_res(TINFO,
-				"Worker %d (%d): Timeout waiting after kill",
-				w->pid, worker);
-		}
-	} else if (ret != w->pid) {
-		tst_brk(TBROK | TERRNO, "Worker %d (%d): waitpid = %d",
-			w->pid, worker, ret);
-	}
+	tst_chan_recv(&dir_worker->chan, pw->next, BUFFER_SIZE);
 
-	w->kill_sent = 0;
+	if (!pw->next[0])
+		TST_STATE_SET(&dir_worker->mach, WS_STOPPING);
 
-	if (!w->q->popped[0]) {
-		tst_brk(TBROK,
-			"Worker %d (%d): Timed out, but doesn't appear to be reading anything",
-			w->pid, worker);
-	}
+	slen = dir_worker->chan.in.len;
+send:
+	tst_chan_send(&path_worker->chan, pw->next, slen);
 
-	if (!quiet || timeout_warnings_left) {
-		tst_res(TINFO, "Worker %d (%d): Last popped '%s'",
-			w->pid, worker, w->q->popped);
-	}
+}
 
-	/* Make sure the queue length and semaphore match. Threre is a
-	 * race in qeue_pop where the semaphore can be decremented
-	 * then the worker killed before updating q->front
-	 */
-	q_len = 0;
-	i = w->q->front;
-	while (i != w->q->back) {
-		if (!w->q->data[i])
-			q_len++;
-
-		i = (i + 1) % QUEUE_SIZE;
-	}
+static void path_worker_sent(struct tst_worker *self, char *path, size_t len)
+{
+	struct path_worker *pw = self->priv;
 
-	ret = sem_destroy(&w->q->sem);
-	if (ret == -1)
-		tst_brk(TBROK | TERRNO, "sem_destroy");
-	ret = sem_init(&w->q->sem, 1, q_len);
-	if (ret == -1)
-		tst_brk(TBROK | TERRNO, "sem_init");
+	memcpy(pw->cur, path, len);
 
-	worker_heartbeat(worker);
-	w->pid = SAFE_FORK();
+	if (!path[0])
+		return;
 
-	if (!w->pid)
-		exit(worker_run(worker));
+	do_next_path(self);
 }
 
-static void check_timeout_warnings_limit(void)
+static int check_timeout_warnings_limit(void)
 {
 	if (!quiet)
-		return;
+		return 1;
 
 	timeout_warnings_left--;
 
 	if (timeout_warnings_left)
-		return;
+		return 1;
 
 	tst_res(TINFO,
 		"Silencing timeout warnings; consider increasing LTP_RUNTIME_MUL or removing -q");
-}
-
-static int try_push_work(const int worker, const char *buf)
-{
-	int ret = 0;
-	int elapsed;
-	struct worker *const w = workers + worker;
-
-	if (w->kill_sent) {
-		restart_worker(worker);
-		return 0;
-	}
-
-	ret = queue_push(w->q, buf);
-	if (ret)
-		return 1;
-
-	elapsed = worker_elapsed(worker);
-
-	if (elapsed > worker_timeout) {
-		if (!quiet || timeout_warnings_left) {
-			tst_res(TINFO,
-				"Worker %d (%d): Stuck for %dus, restarting it",
-				w->pid, worker, elapsed);
-			check_timeout_warnings_limit();
-		}
-		restart_worker(worker);
-	}
 
 	return 0;
 }
 
-static void push_work(const int worker, const char *buf)
+static void path_worker_died(struct tst_worker *self)
 {
-	int sleep_time = 1;
+	struct path_worker *pw = self->priv;
 
-	while (!try_push_work(worker, buf)) {
-		const int ttl = worker_ttl(worker);
-
-		sleep_time = MIN(2 * sleep_time, ttl);
-		usleep(sleep_time);
+	if (pw->cur[0] == '\0') {
+		tst_brk(TBROK,
+			"%s: Died, but doesn't appear to be reading anything",
+			tst_worker_idstr(self));
 	}
-}
-
-static void stop_workers(void)
-{
-	const char stop_code[1] = { '\0' };
-	int i;
 
-	if (!workers)
-		return;
+	if (check_timeout_warnings_limit())
+		tst_res(TINFO, "%s: Died; Last sent '%s'", tst_worker_idstr(self), pw->cur);
 
-	for (i = 0; i < worker_count; i++) {
-		if (workers[i].q)
-			push_work(i, stop_code);
-	}
+	tst_worker_start(self);
+	path_worker_resend(self);
 }
 
-static void destroy_workers(void)
+static void spawn_workers(void)
 {
 	int i;
+	long wcount = workers.count;
+	struct tst_worker *wa = workers.vec;
 
-	if (!workers)
-		return;
-
-	for (i = 0; i < worker_count; i++) {
-		if (workers[i].q) {
-			queue_destroy(workers[i].q, 0);
-			workers[i].q = 0;
-		}
-	}
-}
-
-static int sched_work(const int first_worker,
-		      const char *path, int repetitions)
-{
-	int i, j;
-	int min_ttl = worker_timeout, sleep_time = 1;
-	int pushed, workers_pushed = 0;
-
-	for (i = 0, j = first_worker; i < repetitions; j++) {
-		if (j >= worker_count)
-			j = 0;
-
-		if (j == first_worker && !workers_pushed) {
-			sleep_time = MIN(2 * sleep_time, min_ttl);
-			usleep(sleep_time);
-			min_ttl = worker_timeout;
-		}
+	wa[0].name = "Dir";
+	wa[0].run = dir_worker_run;
+	wa[0].mode = TST_WORKER_SYNC;
 
-		if (j == first_worker)
-			workers_pushed = 0;
+	tst_worker_start(wa);
 
-		pushed = try_push_work(j, path);
-		i += pushed;
-		workers_pushed += pushed;
+	for (i = 1; i < wcount; i++) {
+		wa[i].name = "Path";
+		wa[i].run = path_worker_run;
+		wa[i].on_sent = path_worker_sent;
+		wa[i].on_died = path_worker_died;
+		wa[i].mode = TST_WORKER_ASYNC;
+		wa[i].priv = worker_privs + i;
 
-		if (!pushed)
-			min_ttl = MIN(min_ttl, worker_ttl(j));
+		tst_worker_start(wa + i);
+		do_next_path(wa + i);
 	}
-
-	return j;
 }
 
 static void setup(void)
 {
-	struct timespec now;
-
 	if (tst_parse_int(str_reads, &reads, 1, INT_MAX))
 		tst_brk(TBROK,
 			"Invalid reads (-r) argument: '%s'", str_reads);
 
-	if (tst_parse_long(str_max_workers, &max_workers, 1, LONG_MAX)) {
+	if (tst_parse_long(str_max_workers, &max_workers, 2, LONG_MAX)) {
 		tst_brk(TBROK,
 			"Invalid max workers (-w) argument: '%s'",
 			str_max_workers);
 	}
 
-	if (tst_parse_long(str_worker_count, &worker_count, 1, LONG_MAX)) {
+	if (tst_parse_long(str_worker_count, &workers.count, 2, LONG_MAX)) {
 		tst_brk(TBROK,
 			"Invalid worker count (-W) argument: '%s'",
 			str_worker_count);
@@ -550,139 +402,41 @@ static void setup(void)
 	if (!root_dir)
 		tst_brk(TBROK, "The directory argument (-d) is required");
 
-	if (!worker_count)
-		worker_count = MIN(MAX(tst_ncpus() - 1, 1L), max_workers);
-	workers = SAFE_MALLOC(worker_count * sizeof(*workers));
+	if (!workers.count)
+		workers.count = MIN(MAX(tst_ncpus() - 1, 2L), max_workers);
 
-	if (tst_parse_int(str_worker_timeout, &worker_timeout, 1, INT_MAX)) {
+	workers.vec = SAFE_MALLOC(workers.count * sizeof(workers.vec[0]));
+	memset(workers.vec, 0, workers.count * sizeof(workers.vec[0]));
+	worker_privs = SAFE_MALLOC(workers.count * sizeof(struct path_worker));
+
+	if (tst_parse_int(str_worker_timeout, (int *)&workers.timeout, 1, INT_MAX)) {
 		tst_brk(TBROK,
 			"Invalid worker timeout (-t) argument: '%s'",
 			str_worker_count);
 	}
 
-	if (worker_timeout) {
-		tst_res(TINFO, "Worker timeout forcibly set to %dms",
-			worker_timeout);
+	if (workers.timeout) {
+		tst_res(TINFO, "Worker timeout forcibly set to %lldms",
+			workers.timeout);
 	} else {
-		worker_timeout = 10 * tst_remaining_runtime();
-		tst_res(TINFO, "Worker timeout set to 10%% of max_runtime: %dms",
-			worker_timeout);
+		workers.timeout = 10 * tst_remaining_runtime();
+		tst_res(TINFO, "Worker timeout set to 10%% of max_runtime: %lldms",
+			workers.timeout);
 	}
-	worker_timeout *= 1000;
+	workers.timeout *= 1000;
 
-	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
-	epoch = tst_timespec_to_us(now);
-}
-
-static void reap_children(void)
-{
-	int status, bad_exit = 0;
-	pid_t pid;
-
-	for (;;) {
-		pid = wait(&status);
-
-		if (pid > 0) {
-			if (!WIFEXITED(status))
-				bad_exit = 1;
-
-			continue;
-		}
-
-		if (errno == ECHILD)
-			break;
-
-		if (errno == EINTR)
-			continue;
-
-		tst_brk(TBROK | TERRNO, "wait() failed");
-	}
-
-	if (!bad_exit)
-		return;
-
-	tst_res(TINFO,
-		"Zombie workers detected; consider increasing LTP_RUNTIME_MUL");
+	tst_workers_setup(&workers);
 }
 
 static void cleanup(void)
 {
-	stop_workers();
-	reap_children();
-	destroy_workers();
-	free(workers);
-}
-
-static void visit_dir(const char *path)
-{
-	DIR *dir;
-	struct dirent *dent;
-	struct stat dent_st;
-	char dent_path[MAX_PATH];
-	enum dent_action act;
-	int last_sched = 0;
-
-	dir = opendir(path);
-	if (!dir) {
-		tst_res(TINFO | TERRNO, "opendir(%s)", path);
-		return;
-	}
-
-	while (1) {
-		errno = 0;
-		dent = readdir(dir);
-		if (!dent && errno) {
-			tst_res(TINFO | TERRNO, "readdir(%s)", path);
-			break;
-		} else if (!dent) {
-			break;
-		}
-
-		if (!strcmp(dent->d_name, ".") ||
-		    !strcmp(dent->d_name, ".."))
-			continue;
-
-		if (dent->d_type == DT_DIR)
-			act = DA_VISIT;
-		else if (dent->d_type == DT_LNK)
-			act = DA_IGNORE;
-		else if (dent->d_type == DT_UNKNOWN)
-			act = DA_UNKNOWN;
-		else
-			act = DA_READ;
-
-		snprintf(dent_path, MAX_PATH,
-			 "%s/%s", path, dent->d_name);
-
-		if (act == DA_UNKNOWN) {
-			if (lstat(dent_path, &dent_st))
-				tst_res(TINFO | TERRNO, "lstat(%s)", path);
-			else if ((dent_st.st_mode & S_IFMT) == S_IFDIR)
-				act = DA_VISIT;
-			else if ((dent_st.st_mode & S_IFMT) == S_IFLNK)
-				act = DA_IGNORE;
-			else
-				act = DA_READ;
-		}
-
-		if (act == DA_VISIT)
-			visit_dir(dent_path);
-		else if (act == DA_READ)
-			last_sched = sched_work(last_sched, dent_path, reads);
-	}
-
-	if (closedir(dir))
-		tst_res(TINFO | TERRNO, "closedir(%s)", path);
+	tst_workers_cleanup(&workers);
 }
 
 static void run(void)
 {
 	spawn_workers();
-	visit_dir(root_dir);
-
-	stop_workers();
-	reap_children();
-	destroy_workers();
+	tst_evloop_run(&workers.evloop);
 
 	tst_res(TPASS, "Finished reading files");
 }
-- 
2.36.1



More information about the ltp mailing list