システムプログラミング 演習8:データベースサーバ (2)

2009年11月12日

データベースサーバ (2)

本演習では,マルチスレッドの簡単なデータベースサーバの作成を行う.

マルチスレッドサーバ

まず,前回の演習における回答の一つを紹介する.
static char *program_name = "sp8-server";

static pthread_once_t create_thread_once = PTHREAD_ONCE_INIT;
static pthread_attr_t attr;

void
create_thread_init(void)
{
	int err;

	err = pthread_attr_init(&attr);
	if (err != 0)
		logutil_fatal("pthread_attr_init: %d", err);
	err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	if (err != 0)
		logutil_fatal("PTHREAD_CREATE_DETACHED: %d", err);
}

int
create_detached_thread(void *(*thread_main)(void *), void *arg)
{
	pthread_t thread_id;

	pthread_once(&create_thread_once, create_thread_init);
	return (pthread_create(&thread_id, &attr, thread_main, arg));
}

int
protocol_switch(struct peer *peer)
{
	char request;
	ssize_t sz;
	int err;

	sz = read(peer_fd(peer), &request, sizeof(request));
	if (sz == 0) {
		/* eof */
		logutil_info("disconnected");
		peer_protocol_error(peer);
		return (0);
	}
	else if (sz < 0) {
		err = errno;
		logutil_error("read error: %d", err);
		peer_protocol_error(peer);
		return (err); /* finish on error */
	}
	assert(sz == sizeof(request));

	switch (request) {
	case 'P':
		err = server_put(peer);
		break;
	case 'G':
		err = server_get(peer);
		break;
	default:
		logutil_warning("unknown request: %d", request);
		err = EPROTO;
	}
	return (err);
}

void *
protocol_main(void *arg)
{
	struct peer *peer = arg;
	int err;

	/* authorize peer */
	if ((err = peer_authorize(peer)) != 0) {
		logutil_error("peer_authorize: %d", err);
		goto peer_free;
	}

	while (!peer_is_protocol_error(peer)) {
		err = protocol_switch(peer);
		/* continue until protocol error happens */
	}

 peer_free:
	peer_free(peer);
	/* this return value won't be used, because this thread is detached */
	return (NULL);
}

void
main_loop(int accepting_socket)
{
	int err, client_socket;
	struct sockaddr_in client_addr;
	socklen_t client_addr_size;
	struct peer *peer;

	/* XXX FIXME too many threads.  Consider to use a thread pool. */
	for (;;) {
		client_addr_size = sizeof(client_addr);
		client_socket = accept(accepting_socket,
		   (struct sockaddr *)&client_addr, &client_addr_size);
		if (client_socket < 0) {
			if (errno != EINTR)
				logutil_error("accept error: %d", errno);
		} else if ((err = peer_alloc(client_socket, &peer)) != 0) {
			logutil_warning("peer_alloc: %d", err);
			close(client_socket);
		} else if ((err = create_detached_thread(protocol_main, peer))
		    != 0) {
			logutil_warning("create_detached_thread: %d", err);
			peer_free(peer);
			sleep(1);
		}
	}
}

int
open_accepting_socket(int port)
{
	struct sockaddr_in self_addr;
	socklen_t self_addr_size;
	int sock, sockopt;

	memset(&self_addr, 0, sizeof(self_addr));
	self_addr.sin_family = AF_INET;
	self_addr.sin_addr.s_addr = INADDR_ANY;
	self_addr.sin_port = htons(port);
	self_addr_size = sizeof(self_addr);
	sock = socket(PF_INET, SOCK_STREAM, 0);
	if (sock < 0)
		logutil_fatal("accepting socket: %d", errno);
	sockopt = 1;
	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
	    &sockopt, sizeof(sockopt)) == -1)
		logutil_warning("SO_REUSEADDR: %d", errno);
	if (bind(sock, (struct sockaddr *)&self_addr, self_addr_size) < 0)
		logutil_fatal("bind accepting socket: %d", errno);
	if (listen(sock, SOMAXCONN) < 0)
		logutil_fatal("listen: %d", errno);
	return (sock);
}

void *
termsigs_handler(void *p)
{
	sigset_t *termsigs = p;
	int sig;

	if (sigwait(termsigs, &sig) == -1)
		logutil_warning("termsigs_handler: %d", errno);

	logutil_info("signal %d received: terminating...", sig);
	logutil_info("bye");
	exit(0);
}

int
main(int argc, char **argv)
{
	char *port_number = NULL;
	int err, ch, sock, server_port = DEFAULT_SERVER_PORT;
	int debug_mode = 0;
	sigset_t termsigs;

	if (argc >= 1)
		program_name = basename(argv[0]);

	while ((ch = getopt(argc, argv, "dp:")) != -1) {
		switch (ch) {
		case 'd':
			debug_mode = 1;
			break;
		case 'p':
			port_number = optarg;
			break;
		case '?':
		default:
			usage();
		}
	}
	argc -= optind;
	argv += optind;

	if (port_number != NULL)
		server_port = strtol(port_number, NULL, 0);
	sock = open_accepting_socket(server_port);

	if (!debug_mode) {
		logutil_syslog_open(program_name, LOG_PID, LOG_LOCAL0);
		daemon(0, 0);
	}

	/*
	 * We don't want SIGPIPE, but want EPIPE on write(2)/close(2).
	 */
	signal(SIGPIPE, SIG_IGN);

	/*
	 * block SIGINT and SIGTERM, and create a "signal handler"
	 * thread, termsigs_handler, for them.
	 */
	sigemptyset(&termsigs);
	sigaddset(&termsigs, SIGINT);
	sigaddset(&termsigs, SIGTERM);
	pthread_sigmask(SIG_BLOCK, &termsigs, NULL);
	err = create_detached_thread(termsigs_handler, &termsigs);
	if (err != 0)
		logutil_fatal("create_thread(termsigs_handler): %d", err);

	main_loop(sock);

	/*NOTREACHED*/
	return (0); /* to shut up warning */
}

その他の参考プログラム

上記のファイルを持ってきて,
% autoreconf --install
を実行すると ./configure に必要なファイルが生成される.

% ./configure --prefix=<install_directory>
% make
% make install
--prefixで,インストールしたいディレクトリを指定する. --prefixを指定しないと /usr/local が利用される.

演習問題

[演習8-1]
上記のプログラムにおいて,string の key と value を登録する server_put() と key に対して登録されている value を返す server_get() を実装しなさい.登録にはハッシュ表を利用すること.
[演習8-2]
上記プログラムの動作を,複数の端末から telnet で同時にアクセスして, 動作を確認しなさい.
[演習8-3]
データベースサーバをボードコンピュータで動作させ,動作を確認しなさ い.

提出締切は 11/18(水) 20:00とする. 作成したプログラムだけでは無く,考察および実行結果も含めること. サブジェクトを SP8 学籍番号 とし,tatebe _at_ cs までメールすること. 本文には授業の感想を含めること.


Osamu Tatebe
Last modified: Thu Nov 5 15:44:53 JST 2009