From f9611f635cb23a13708c1046b7a39d003bc9a7ad Mon Sep 17 00:00:00 2001 From: yoshinoriterazawa Date: Mon, 15 Jul 2024 22:41:34 +0900 Subject: [PATCH 1/3] =?UTF-8?q?RTI=E3=81=AE=E5=A4=89=E6=9B=B4=E3=81=A8?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E3=82=A2=E3=83=97=E3=83=AA=E3=81=AE=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QM_ver3.c | 381 ++++++++++++++++++++++++++++++++++++++ core/federated/RTI/main.c | 46 +++++ 2 files changed, 427 insertions(+) create mode 100644 QM_ver3.c diff --git a/QM_ver3.c b/QM_ver3.c new file mode 100644 index 000000000..e2c8962f1 --- /dev/null +++ b/QM_ver3.c @@ -0,0 +1,381 @@ +/* +ファイルの読み書きによるプロセス間通信を行う +inotify系の関数を用いて、書き込みを監視する。 +監視対象はファイルで、ファイルの配置場所は +あらかじめfile_path(inotify_fdにしている)として指定しておく。 + +コマンド、通信用ファイルのパスを先に格納 + +書き込み内容を読み取った後に内容を消去するように変更 +*/ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define RTI_federate_nodes 2 //RTIを含めたfederate数 +#define EVENT_BUF_LEN (1024 * (EVENT_SIZE + 16)) +#define EVENT_SIZE (sizeof(struct inotify_event)) + + +typedef struct process_info { + char p_name[256]; //プロセス名 + pid_t pid; //Pid + int p_state; //プロセスの状態 + FILE *fd; //Pid,プロセス名,CPが書かれるファイルを指すポインタ + char file_path[256]; //通信用ファイルの配置場所指定 + char command[256]; //コマンドを格納 + bool check_do; //監視実行中フラグ + char cp_num[256]; //受信したcheck pointの番号 + char end_cp_num[256]; //プロセスから送信される最後のcheck point番号 + int timer_count; //各CPのカウント値(deadline配列より抽出) + int deadline[10]; //デッドラインを格納する配列(各reaction単位) +} process_info; + + +/* +初期CP受信によりタイムカウントをスタートする関数 +*/ +void time_count_start(process_info *p_info) { + p_info->timer_count = p_info->deadline[0]; + p_info->check_do = true; +} + +/* +2回目以降のCP受信によりタイムカウントをリセットする関数 +*/ +void time_count_decrement(process_info *p_info) { + int cp_num_value = atoi(p_info -> cp_num); + + if(strcmp(p_info->end_cp_num, p_info->cp_num) == 0) { + p_info->check_do = false; + } else { + p_info->timer_count = p_info->deadline[cp_num_value]; + } +} + +/* +タイマーイベントによるカウントダウン +*/ +void count_down(process_info *p_info) { + //kill関数を使う方法 管理Appをsudoで実行している場合、子プロセス(federateなど)はsudoでしか停止できない? + /* + for(int i =0; imask & IN_MODIFY) { + // 変更されたファイルを探す + for(int i = 0; i< RTI_federate_nodes; i++) { + if(event->wd == wd[i]) { + p_info[i].fd = fopen(p_info[i].file_path, "r"); + if (!p_info[i].fd) { + perror("QM: Error opening file"); + continue; + } + + if (fgets(buffer, sizeof(buffer), p_info[i].fd) != NULL) { + // デバッグ表示 + printf("QM: Data read from file by QM: %s\n", buffer); + // Pid,プロセス名が記録されていない場合は,読み込んだPid,プロセス名を記録 + if(sscanf(buffer, "process_name:%[^,],pid:%d", p_info[i].p_name, &pid_as_int) == 2) { + p_info[i].pid = (pid_t)pid_as_int; + printf("QM: read PID %d\n", p_info[i].pid); + get_count++; + } else { + perror("QM: failed get pid"); + } + } else { + printf("QM: Error reading file %d\n", i); + } + fclose(p_info[i].fd); + printf("QM: プロセス情報受信数 %d\n", get_count); + } + } + event_point += EVENT_SIZE + event->len; + } + } + } + + //関数終了デバッグ + //printf("wait_pid_and_pname End\n"); +} + +/* +プログラムを実行する +*/ +void executeProgram(process_info *p_info, int wd[], int *inotify_fd) { + //監視のためにinotifyインスタンスを生成 + if((*inotify_fd = inotify_init()) == -1) { + perror("Error inotify_init"); + exit(EXIT_FAILURE); + } + + for(int i =0; i < RTI_federate_nodes; i++) { + sleep(1); + int result = system(p_info[i].command); + if (result == -1) { + //デバッグ + perror("Error executing program"); + } else { + //デバッグ + printf("QM: Command %s executed successfully\n", p_info[i].command); + + + //pidの取得 + //p_info[i].pid = system("$!"); + //デバッグ + //printf("QM: pid %d\n", p_info[i].pid); + + + //通信用のファイルを生成 + p_info[i].fd = fopen(p_info[i].file_path, "w"); + if(p_info[i].fd == NULL) { + perror("QM: Faild make file"); + } else { + printf("QM: Make file success\n"); + } + fclose(p_info[i].fd); + printf("QM: file closed\n"); + + + //通信用ファイルを監視対象に設定 + wd[i] = inotify_add_watch(*inotify_fd, p_info[i].file_path, IN_MODIFY); + if(wd[i] == -1) { + perror("QM: Error inotify_add_watch"); + exit(EXIT_FAILURE); + } else { + printf("QM: 監視対象設定完了\n"); + } + } + } + //すべてのRTI,federateのPid,プロセス名を取得 + wait_pid_and_pname(p_info, wd, *inotify_fd); +} + + +/* +CPの書き込みを待ち、読み込む関数 +*/ +void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int *inotify_fd, int *timer_fd, fd_set *rdfs, int max_fd) { + char buffer[EVENT_BUF_LEN]; //イベント格納バッファ + timerfd_settime(*timer_fd, 0, &timer, NULL); + int count = 0; + + while (1) { + // ファイル変更イベント,タイマーイベントを待つ + fd_set tmp_fds = *rdfs; + int ret = select(max_fd, &tmp_fds, NULL, NULL, NULL); + if (ret == -1) { + perror("select"); + break; + } + + if (FD_ISSET(*timer_fd, &tmp_fds)) { + // タイマーイベントの処理 + uint64_t expirations; + if (read(*timer_fd, &expirations, sizeof(expirations)) == -1) { + perror("read"); + } else { + count_down(p_info); + } + } + + if (FD_ISSET(*inotify_fd, &tmp_fds)) { + //変更イベントを読み取る。readを使うことで一度に読み取り + int length = read(*inotify_fd, buffer, EVENT_BUF_LEN); //lengthはバイト数が入る。readで一度に読み取り + if (length < 0) { + perror("QM: read event"); + exit(EXIT_FAILURE); + } + count++; + + //読み込んだ変更イベントを1つずつ処理する + // event_point : 変更内容を順に取得するために使用.event毎の先頭アドレスを指す + int event_point = 0; + while (event_point < length) { + struct inotify_event *event = (struct inotify_event *) &buffer[event_point]; //キャストすることで、それぞれのイベントの先頭アドレスを指定 + // 変更のみ対応 + if (event->mask & IN_MODIFY) { + // 変更されたファイルを探す + for(int i = 0; i< RTI_federate_nodes; i++) { + if(event->wd == wd[i]) { + p_info[i].fd = fopen(p_info[i].file_path, "r+"); + if (!p_info[i].fd) { + perror("QM: Error opening file"); + continue; + } + + if (fgets(buffer, sizeof(buffer), p_info[i].fd) != NULL) { + // デバッグ表示 + //printf("QM: Data read from file by QM: %s", buffer); + //printf("QM: デバッグカウント %d\n", count); + + //CP情報を取得する + sscanf(buffer, "cp: %s", p_info[i].cp_num); + printf("QM: CP_num %s\n", p_info[i].cp_num); + + //CP受信による実行時間監視の開始 + if(strcmp(p_info[i].cp_num, "0") == 0) { + time_count_start(&p_info[i]); + printf("QM: counter %d\n", p_info[i].timer_count); + } else { + printf("QM: counter %d\n", p_info[i].timer_count); + time_count_decrement(&p_info[i]); + } + + int fd =fd = fileno(p_info[i].fd); + if(ftruncate(fd, 0) != 0) { + perror("Failed to truncate file\n"); + close(fd); + } + + } else { + //printf("QM: Error reading file %d\n", i); + //printf("QM: デバッグカウント %d\n", count); + } + fclose(p_info[i].fd); + } + } + event_point += EVENT_SIZE + event->len; + } + } + } + } +} + +int main() { + process_info p_info[RTI_federate_nodes]; //RTI, federateの情報を格納する構造体配列の作成 + int wd[RTI_federate_nodes]; //RTI, federateを監視するためのウォッチディスクリプタ配列 + int inotify_fd; //inotifyインスタンスのファイルディスクリプタ + fd_set rdfs; // + + /*カウントタイマー作成*/ + int timer_fd; + struct itimerspec timer; + /*カウントタイマーの時間を設定.起動間隔1ms*/ + timer.it_value.tv_sec = 0; + timer.it_value.tv_nsec = 1000000; + timer.it_interval.tv_sec = 0; + timer.it_interval.tv_nsec = 1000000; + timer_fd = timerfd_create(CLOCK_REALTIME, 0); + if (timer_fd == -1) { + perror("timerfd_create"); + return EXIT_FAILURE; + } + + + //必要情報を先に記録する + p_info_write(p_info); + + //RTI, federateの初期起動,コマンドを基に実行 + executeProgram(p_info, wd, &inotify_fd); + + //最大のファイルディスクリプタを設定 + int max_fd = (inotify_fd > timer_fd ? inotify_fd : timer_fd) + 1; + FD_ZERO(&rdfs); + FD_SET(inotify_fd, &rdfs); + FD_SET(timer_fd, &rdfs); + + //それぞれのプロセスからの通信を待つ + watch_cp_write(p_info, wd, timer, &inotify_fd, &timer_fd, &rdfs, max_fd); + + return 0; +} \ No newline at end of file diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 17d73e93e..acb450629 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -51,6 +51,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "net_util.h" #include // To trap ctrl-c and invoke a clean stop to save the trace file, if needed. #include +#include +#include +#include /** * The tracing mechanism uses the number of workers variable `_lf_number_of_workers`. @@ -275,6 +278,49 @@ int process_args(int argc, const char* argv[]) { return 1; } int main(int argc, const char* argv[]) { + + FILE *file; + pid_t pid = 0; + + pid = getpid(); + printf("RTI: Process ID: %d\n", pid); + + file = fopen("/home/yoshinoriterazawa/LF/RTI.txt", "w"); + if(file == NULL) { + perror("RTI: failed opening file"); + return 1; + } else { + // ファイルロックを取得 + int fd = fileno(file); + if (flock(fd, LOCK_EX) != 0) { + perror("RTI: failed to lock file"); + fclose(file); + return 1; + } + + fprintf(file, "process_name:RTI,pid:%d", pid); + printf("RTI: write file\n"); + + // ファイルを閉じる前にロックを解除 + if (flock(fd, LOCK_UN) != 0) { + perror("RTI: failed to unlock file"); + } + + if (fclose(file) == 0) { + // fcloseが成功した場合 + // ファイルディスクリプタの有効性を確認 + if (fcntl(fd, F_GETFD) == -1 && errno == EBADF) { + // ファイルディスクリプタが無効の場合、ファイルは正しく閉じられている + printf("RTI: File closed successfully.\n"); + } else { + // ファイルディスクリプタが有効な場合、ファイルがまだ開いている + printf("RTI: File is still open.\n"); + } + } else { + perror("RTI: Error closing file"); + } + } + initialize_lf_thread_id(); initialize_RTI(&rti); From 51a2c1a4cecdbf7704000d876fb2456cd860ef68 Mon Sep 17 00:00:00 2001 From: yoshinoriterazawa Date: Tue, 23 Jul 2024 02:14:54 +0900 Subject: [PATCH 2/3] =?UTF-8?q?=E7=9B=A3=E8=A6=96=E3=82=A2=E3=83=97?= =?UTF-8?q?=E3=83=AA=E3=81=A8RTI=E3=81=AE=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QM_ver3.c => Monitoring_ver4.c | 269 ++++++++++++++------------------- core/federated/RTI/main.c | 3 + 2 files changed, 116 insertions(+), 156 deletions(-) rename QM_ver3.c => Monitoring_ver4.c (53%) diff --git a/QM_ver3.c b/Monitoring_ver4.c similarity index 53% rename from QM_ver3.c rename to Monitoring_ver4.c index e2c8962f1..76630ab82 100644 --- a/QM_ver3.c +++ b/Monitoring_ver4.c @@ -7,6 +7,10 @@ inotify系の関数を用いて、書き込みを監視する。 コマンド、通信用ファイルのパスを先に格納 書き込み内容を読み取った後に内容を消去するように変更 + +並列実行を考慮した監視を可能にする。以下,並列実行の要素 +1.afterを加えることによるreactionの同時実行(同じタグで複数reactionがトリガされる場合に生成) +2.thread生成による並列実行 */ #define _GNU_SOURCE @@ -15,6 +19,7 @@ inotify系の関数を用いて、書き込みを監視する。 #include #include #include +#include #include #include #include @@ -27,83 +32,69 @@ inotify系の関数を用いて、書き込みを監視する。 #include #include + #define RTI_federate_nodes 2 //RTIを含めたfederate数 #define EVENT_BUF_LEN (1024 * (EVENT_SIZE + 16)) #define EVENT_SIZE (sizeof(struct inotify_event)) +#define max_cp_num 10 +typedef struct check_point_info { + bool check_do; //監視実行中フラグ + bool start_cp; + bool end_cp; //プロセスから送信される最後のcheck point番号 + int timer_count; //各CPのカウント値(deadline配列より抽出) +} cp_info; + typedef struct process_info { - char p_name[256]; //プロセス名 pid_t pid; //Pid int p_state; //プロセスの状態 FILE *fd; //Pid,プロセス名,CPが書かれるファイルを指すポインタ char file_path[256]; //通信用ファイルの配置場所指定 char command[256]; //コマンドを格納 - bool check_do; //監視実行中フラグ char cp_num[256]; //受信したcheck pointの番号 - char end_cp_num[256]; //プロセスから送信される最後のcheck point番号 - int timer_count; //各CPのカウント値(deadline配列より抽出) - int deadline[10]; //デッドラインを格納する配列(各reaction単位) + int deadline[10]; //デッドラインを格納する配列(各reaction単位) + cp_info* cp_array[max_cp_num]; //各cpに関しての情報を格納 } process_info; - -/* -初期CP受信によりタイムカウントをスタートする関数 -*/ -void time_count_start(process_info *p_info) { - p_info->timer_count = p_info->deadline[0]; - p_info->check_do = true; -} - /* 2回目以降のCP受信によりタイムカウントをリセットする関数 */ -void time_count_decrement(process_info *p_info) { +void time_count_update(process_info *p_info) { int cp_num_value = atoi(p_info -> cp_num); - if(strcmp(p_info->end_cp_num, p_info->cp_num) == 0) { - p_info->check_do = false; + if(p_info->cp_array[cp_num_value]->end_cp == true) { + p_info->cp_array[cp_num_value - 1]->check_do = false; + } else if(p_info->cp_array[cp_num_value]->start_cp == true) { + p_info->cp_array[cp_num_value]->timer_count = p_info->deadline[cp_num_value]; + p_info->cp_array[cp_num_value]->check_do = true; } else { - p_info->timer_count = p_info->deadline[cp_num_value]; - } + p_info->cp_array[cp_num_value]->timer_count = p_info->deadline[cp_num_value]; + p_info->cp_array[cp_num_value]->check_do = true; + p_info->cp_array[cp_num_value - 1]->check_do = false; + } } /* タイマーイベントによるカウントダウン */ void count_down(process_info *p_info) { - //kill関数を使う方法 管理Appをsudoで実行している場合、子プロセス(federateなど)はsudoでしか停止できない? - /* - for(int i =0; icheck_do == true) { + p_info[i].cp_array[j]->timer_count--; + if(p_info[i].cp_array[j]->timer_count == 0) { + char cmd[50]; + sprintf(cmd, "sudo kill %d", p_info[i].pid); + printf("will kill PID: %d\n", p_info[i].pid); + + int result = system(cmd); + if(result == -1) { + perror("system kill"); + } else { + printf("QM: kill success\n"); + } } } } @@ -114,88 +105,59 @@ void count_down(process_info *p_info) { プロセスの情報を先に格納しておく(他に良い方法がありそう) */ void p_info_write(process_info *p_info) { + //p_infoのメンバー変数を初期化 + for (int i = 0; i < RTI_federate_nodes; ++i) { + // 各 process_info 構造体のメンバーを初期化 + p_info[i].pid = 0; + p_info[i].p_state = 0; + p_info[i].fd = NULL; + memset(p_info[i].file_path, 0, sizeof(p_info[i].file_path)); + memset(p_info[i].command, 0, sizeof(p_info[i].command)); + memset(p_info[i].cp_num, 0, sizeof(p_info[i].cp_num)); + memset(p_info[i].deadline, 0, sizeof(p_info[i].deadline)); + + // cp_array の各ポインタを NULL に初期化 + for (int j = 0; j < max_cp_num; ++j) { + p_info[i].cp_array[j] = NULL; + } + + // cp_info 構造体のインスタンスを動的に作成して初期化 + for (int j = 0; j < max_cp_num; ++j) { + p_info[i].cp_array[j] = (cp_info*)malloc(sizeof(cp_info)); + if (p_info[i].cp_array[j] == NULL) { + perror("Failed to allocate memory for cp_info"); + } + p_info[i].cp_array[j]->check_do = false; + p_info[i].cp_array[j]->start_cp = false; + p_info[i].cp_array[j]->end_cp = false; + p_info[i].cp_array[j]->timer_count = 0; + } + } + //コマンドを格納 - strcpy(p_info[0].command, "taskset -c 0 RTI -n 1 &"); - strcpy(p_info[1].command, "taskset -c 2 /home/yoshinoriterazawa/LF/fed-gen/filewrite/bin/federate__writer &"); + strcpy(p_info[0].command, "taskset -c 1 RTI -n 1 & echo $! > /home/yoshinoriterazawa/LF/RTI.txt"); + strcpy(p_info[1].command, "taskset -c 0,2 /home/yoshinoriterazawa/LF/fed-gen/filewrite/bin/federate__writer & echo $! > /home/yoshinoriterazawa/LF/federate_writer.txt"); //通信用ファイルのパスを格納 strcpy(p_info[0].file_path, "/home/yoshinoriterazawa/LF/RTI.txt"); strcpy(p_info[1].file_path, "/home/yoshinoriterazawa/LF/federate_writer.txt"); - //check_do(カウントダウン実行の有無)をfalseに設定 - p_info[0].check_do = false; - p_info[1].check_do = false; + //実行シーケンスの最初のCPを設定 + p_info[1].cp_array[0]->start_cp = true; + p_info[1].cp_array[4]->start_cp = true; + //end_cp_num(最後のCP番号)を格納(とりあえずfederateのみ) - strcpy(p_info[1].end_cp_num, "2"); + p_info[1].cp_array[3]->end_cp = true; + p_info[1].cp_array[7]->end_cp = true; //デッドラインを格納(とりあえずfederateのみ) p_info[1].deadline[0] = 1010; - p_info[1].deadline[1] = 1000; - //p_info[1].deadline[2] = 1010; -} - -/* -Pidとプロセス名を取得し、process_infoに格納する -*/ -void wait_pid_and_pname(process_info *p_info, int wd[], int inotify_fd) { - char buffer[EVENT_BUF_LEN]; //イベント格納バッファ - int pid_as_int; - int get_count = 0; // Pid,プロセス名の取得数を記録 - - printf("QM: wait_pid_and_pname start\n"); - fflush(stdout); - - // 全てのRTI,federateからPid,プロセス名を取得するまで繰り返す。 - while (get_count < RTI_federate_nodes) { - //変更イベントを読み取る。readを使うことで一度に読み取り - int length = read(inotify_fd, buffer, EVENT_BUF_LEN); //lengthはバイト数が入る。readで一度に読み取り - if (length < 0) { - perror("QM: read event"); - exit(EXIT_FAILURE); - } - - //読み込んだ変更イベントを1つずつ処理する - // event_point : 変更内容を順に取得するために使用.event毎の先頭アドレスを指す - int event_point = 0; - while (event_point < length) { - struct inotify_event *event = (struct inotify_event *) &buffer[event_point]; //キャストすることで、それぞれのイベントの先頭アドレスを指定 - // 変更のみ対応 - if (event->mask & IN_MODIFY) { - // 変更されたファイルを探す - for(int i = 0; i< RTI_federate_nodes; i++) { - if(event->wd == wd[i]) { - p_info[i].fd = fopen(p_info[i].file_path, "r"); - if (!p_info[i].fd) { - perror("QM: Error opening file"); - continue; - } - - if (fgets(buffer, sizeof(buffer), p_info[i].fd) != NULL) { - // デバッグ表示 - printf("QM: Data read from file by QM: %s\n", buffer); - // Pid,プロセス名が記録されていない場合は,読み込んだPid,プロセス名を記録 - if(sscanf(buffer, "process_name:%[^,],pid:%d", p_info[i].p_name, &pid_as_int) == 2) { - p_info[i].pid = (pid_t)pid_as_int; - printf("QM: read PID %d\n", p_info[i].pid); - get_count++; - } else { - perror("QM: failed get pid"); - } - } else { - printf("QM: Error reading file %d\n", i); - } - fclose(p_info[i].fd); - printf("QM: プロセス情報受信数 %d\n", get_count); - } - } - event_point += EVENT_SIZE + event->len; - } - } - } - - //関数終了デバッグ - //printf("wait_pid_and_pname End\n"); + p_info[1].deadline[1] = 100; + p_info[1].deadline[2] = 1010; + p_info[1].deadline[4] = 1010; + p_info[1].deadline[5] = 100; + p_info[1].deadline[6] = 1010; } /* @@ -217,20 +179,19 @@ void executeProgram(process_info *p_info, int wd[], int *inotify_fd) { } else { //デバッグ printf("QM: Command %s executed successfully\n", p_info[i].command); - - - //pidの取得 - //p_info[i].pid = system("$!"); - //デバッグ - //printf("QM: pid %d\n", p_info[i].pid); - - //通信用のファイルを生成 - p_info[i].fd = fopen(p_info[i].file_path, "w"); + //Pid取得 + p_info[i].fd = fopen(p_info[i].file_path, "r+"); if(p_info[i].fd == NULL) { perror("QM: Faild make file"); } else { - printf("QM: Make file success\n"); + fscanf(p_info[i].fd, "%d", &(p_info[i].pid)); + printf("QM: scanned pid %d\n", p_info[i].pid); + int fd = fileno(p_info[i].fd); + if(ftruncate(fd, 0) != 0) { + perror("Failed to truncate file\n"); + close(fd); + } } fclose(p_info[i].fd); printf("QM: file closed\n"); @@ -246,18 +207,16 @@ void executeProgram(process_info *p_info, int wd[], int *inotify_fd) { } } } - //すべてのRTI,federateのPid,プロセス名を取得 - wait_pid_and_pname(p_info, wd, *inotify_fd); } - /* CPの書き込みを待ち、読み込む関数 */ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int *inotify_fd, int *timer_fd, fd_set *rdfs, int max_fd) { char buffer[EVENT_BUF_LEN]; //イベント格納バッファ + char line[256]; timerfd_settime(*timer_fd, 0, &timer, NULL); - int count = 0; + //int count = 0; //デバッグ用 while (1) { // ファイル変更イベント,タイマーイベントを待つ @@ -285,7 +244,7 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int perror("QM: read event"); exit(EXIT_FAILURE); } - count++; + //count++; //読み込んだ変更イベントを1つずつ処理する // event_point : 変更内容を順に取得するために使用.event毎の先頭アドレスを指す @@ -303,34 +262,32 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int continue; } - if (fgets(buffer, sizeof(buffer), p_info[i].fd) != NULL) { - // デバッグ表示 - //printf("QM: Data read from file by QM: %s", buffer); - //printf("QM: デバッグカウント %d\n", count); - - //CP情報を取得する - sscanf(buffer, "cp: %s", p_info[i].cp_num); + while (flock(fileno(p_info[i].fd), LOCK_EX) == -1) { // 排他ロックを取得 + perror("QM: Failed to lock file"); + usleep(1000); // 待機してリトライ + } + + int last_line = 0; + while (fgets(line, sizeof(line), p_info[i].fd) != NULL) { + last_line = 1; + sscanf(line, "cp: %s", p_info[i].cp_num); printf("QM: CP_num %s\n", p_info[i].cp_num); //CP受信による実行時間監視の開始 - if(strcmp(p_info[i].cp_num, "0") == 0) { - time_count_start(&p_info[i]); - printf("QM: counter %d\n", p_info[i].timer_count); - } else { - printf("QM: counter %d\n", p_info[i].timer_count); - time_count_decrement(&p_info[i]); - } + time_count_update(&p_info[i]); + //実行デバック + int cp_num_value = atoi(p_info[i].cp_num); + printf("QM: updated count %d\n", p_info[i].cp_array[cp_num_value]->timer_count); + } - int fd =fd = fileno(p_info[i].fd); - if(ftruncate(fd, 0) != 0) { + if (last_line) { + int fd = fileno(p_info[i].fd); + if (ftruncate(fd, 0) != 0) { perror("Failed to truncate file\n"); - close(fd); } - - } else { - //printf("QM: Error reading file %d\n", i); - //printf("QM: デバッグカウント %d\n", count); + rewind(p_info[i].fd); // ファイルポインタを先頭に戻す } + fclose(p_info[i].fd); } } diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index acb450629..6487707ba 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -279,6 +279,8 @@ int process_args(int argc, const char* argv[]) { } int main(int argc, const char* argv[]) { + /*実行時にPIDを取得するので除外*/ + /* FILE *file; pid_t pid = 0; @@ -320,6 +322,7 @@ int main(int argc, const char* argv[]) { perror("RTI: Error closing file"); } } + */ initialize_lf_thread_id(); initialize_RTI(&rti); From d6eb876ad9dbc415f91caf121467ca58fba7c379 Mon Sep 17 00:00:00 2001 From: yoshinoriterazawa Date: Wed, 24 Jul 2024 14:10:00 +0900 Subject: [PATCH 3/3] =?UTF-8?q?=E7=9B=A3=E8=A6=96=E3=82=A2=E3=83=97?= =?UTF-8?q?=E3=83=AA=E3=81=AE=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Monitoring_ver4.c | 86 +++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/Monitoring_ver4.c b/Monitoring_ver4.c index 76630ab82..74223ac26 100644 --- a/Monitoring_ver4.c +++ b/Monitoring_ver4.c @@ -41,15 +41,15 @@ inotify系の関数を用いて、書き込みを監視する。 typedef struct check_point_info { bool check_do; //監視実行中フラグ - bool start_cp; - bool end_cp; //プロセスから送信される最後のcheck point番号 - int timer_count; //各CPのカウント値(deadline配列より抽出) + bool start_cp; //スレッドから送信される最初のcheck point番号 + bool end_cp; //スレッドから送信される最後のcheck point番号 + int timer_count; //各CPのカウント値(process_info.deadline配列より抽出) } cp_info; typedef struct process_info { pid_t pid; //Pid - int p_state; //プロセスの状態 - FILE *fd; //Pid,プロセス名,CPが書かれるファイルを指すポインタ + int p_state; //プロセスの状態 実行 = 1,停止 = 0 + FILE *fd; //Pid,CPが書かれるファイルを指すポインタ char file_path[256]; //通信用ファイルの配置場所指定 char command[256]; //コマンドを格納 char cp_num[256]; //受信したcheck pointの番号 @@ -58,7 +58,7 @@ typedef struct process_info { } process_info; /* -2回目以降のCP受信によりタイムカウントをリセットする関数 +受信したCPを基に,カウント値を更新する関数 */ void time_count_update(process_info *p_info) { int cp_num_value = atoi(p_info -> cp_num); @@ -72,28 +72,31 @@ void time_count_update(process_info *p_info) { p_info->cp_array[cp_num_value]->timer_count = p_info->deadline[cp_num_value]; p_info->cp_array[cp_num_value]->check_do = true; p_info->cp_array[cp_num_value - 1]->check_do = false; - } + } } /* -タイマーイベントによるカウントダウン +タイマーイベントによるカウントダウン.カウント値0で対応プロセスを強制停止 */ void count_down(process_info *p_info) { //sudoでkillする.system()を用いる for(int i =0; icheck_do == true) { + if(p_info[i].cp_array[j]->check_do == true && p_info[i].p_state == 1) { p_info[i].cp_array[j]->timer_count--; + //カウント値が0であれば停止 if(p_info[i].cp_array[j]->timer_count == 0) { char cmd[50]; sprintf(cmd, "sudo kill %d", p_info[i].pid); - printf("will kill PID: %d\n", p_info[i].pid); - + printf("Monitoring: will kill PID: %d\n", p_info[i].pid); + int result = system(cmd); if(result == -1) { - perror("system kill"); + perror("Monitoring: system kill"); } else { - printf("QM: kill success\n"); + printf("Monitoring: kill success\n"); + //プロセス状態を更新 + p_info[i].p_state = 0; } } } @@ -125,7 +128,7 @@ void p_info_write(process_info *p_info) { for (int j = 0; j < max_cp_num; ++j) { p_info[i].cp_array[j] = (cp_info*)malloc(sizeof(cp_info)); if (p_info[i].cp_array[j] == NULL) { - perror("Failed to allocate memory for cp_info"); + perror("Monitoring: Failed to allocate memory for cp_info"); } p_info[i].cp_array[j]->check_do = false; p_info[i].cp_array[j]->start_cp = false; @@ -154,19 +157,19 @@ void p_info_write(process_info *p_info) { //デッドラインを格納(とりあえずfederateのみ) p_info[1].deadline[0] = 1010; p_info[1].deadline[1] = 100; - p_info[1].deadline[2] = 1010; + p_info[1].deadline[2] = 100; p_info[1].deadline[4] = 1010; p_info[1].deadline[5] = 100; p_info[1].deadline[6] = 1010; } /* -プログラムを実行する +プログラムを実行する.その際,実行したプログラムのpidを取得 */ void executeProgram(process_info *p_info, int wd[], int *inotify_fd) { //監視のためにinotifyインスタンスを生成 if((*inotify_fd = inotify_init()) == -1) { - perror("Error inotify_init"); + perror("Monitoring: Error inotify_init"); exit(EXIT_FAILURE); } @@ -175,55 +178,57 @@ void executeProgram(process_info *p_info, int wd[], int *inotify_fd) { int result = system(p_info[i].command); if (result == -1) { //デバッグ - perror("Error executing program"); + perror("Monitoring: Error executing program"); } else { //デバッグ - printf("QM: Command %s executed successfully\n", p_info[i].command); + printf("Monitoring: Command %s executed successfully\n", p_info[i].command); + + //プロセスの状態を更新 + p_info[i].p_state = 1; //Pid取得 p_info[i].fd = fopen(p_info[i].file_path, "r+"); if(p_info[i].fd == NULL) { - perror("QM: Faild make file"); + perror("Monitoring: Faild make file"); } else { fscanf(p_info[i].fd, "%d", &(p_info[i].pid)); - printf("QM: scanned pid %d\n", p_info[i].pid); + printf("Monitoring: scanned pid %d\n", p_info[i].pid); int fd = fileno(p_info[i].fd); if(ftruncate(fd, 0) != 0) { - perror("Failed to truncate file\n"); + perror("Monitoring: Failed to truncate file\n"); close(fd); } } fclose(p_info[i].fd); - printf("QM: file closed\n"); + printf("Monitoring: file closed\n"); //通信用ファイルを監視対象に設定 wd[i] = inotify_add_watch(*inotify_fd, p_info[i].file_path, IN_MODIFY); if(wd[i] == -1) { - perror("QM: Error inotify_add_watch"); + perror("Monitoring: Error inotify_add_watch"); exit(EXIT_FAILURE); } else { - printf("QM: 監視対象設定完了\n"); + printf("Monitoring: 監視対象設定完了\n"); } } } } /* -CPの書き込みを待ち、読み込む関数 +CPの書き込みまたはタイマーイベントの起動を待ち、読み込む関数 */ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int *inotify_fd, int *timer_fd, fd_set *rdfs, int max_fd) { char buffer[EVENT_BUF_LEN]; //イベント格納バッファ - char line[256]; - timerfd_settime(*timer_fd, 0, &timer, NULL); - //int count = 0; //デバッグ用 + char line[256]; //通信用ファイルを1行ずつ読み込むための配列 + timerfd_settime(*timer_fd, 0, &timer, NULL); // タイマーイベントのセット while (1) { // ファイル変更イベント,タイマーイベントを待つ fd_set tmp_fds = *rdfs; int ret = select(max_fd, &tmp_fds, NULL, NULL, NULL); if (ret == -1) { - perror("select"); + perror("Monitoring: select function failed"); break; } @@ -231,7 +236,7 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int // タイマーイベントの処理 uint64_t expirations; if (read(*timer_fd, &expirations, sizeof(expirations)) == -1) { - perror("read"); + perror("Monitoring: read function failed"); } else { count_down(p_info); } @@ -241,10 +246,9 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int //変更イベントを読み取る。readを使うことで一度に読み取り int length = read(*inotify_fd, buffer, EVENT_BUF_LEN); //lengthはバイト数が入る。readで一度に読み取り if (length < 0) { - perror("QM: read event"); + perror("Monitoring: read event failed"); exit(EXIT_FAILURE); } - //count++; //読み込んだ変更イベントを1つずつ処理する // event_point : 変更内容を順に取得するために使用.event毎の先頭アドレスを指す @@ -258,12 +262,12 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int if(event->wd == wd[i]) { p_info[i].fd = fopen(p_info[i].file_path, "r+"); if (!p_info[i].fd) { - perror("QM: Error opening file"); + perror("Monitoring: Error opening file"); continue; } while (flock(fileno(p_info[i].fd), LOCK_EX) == -1) { // 排他ロックを取得 - perror("QM: Failed to lock file"); + perror("Monitoring: Failed to lock file"); usleep(1000); // 待機してリトライ } @@ -271,19 +275,19 @@ void watch_cp_write(process_info *p_info, int wd[], struct itimerspec timer, int while (fgets(line, sizeof(line), p_info[i].fd) != NULL) { last_line = 1; sscanf(line, "cp: %s", p_info[i].cp_num); - printf("QM: CP_num %s\n", p_info[i].cp_num); + printf("Monitoring: CP_num %s\n", p_info[i].cp_num); //CP受信による実行時間監視の開始 time_count_update(&p_info[i]); //実行デバック int cp_num_value = atoi(p_info[i].cp_num); - printf("QM: updated count %d\n", p_info[i].cp_array[cp_num_value]->timer_count); + printf("Monitoring: updated count %d\n", p_info[i].cp_array[cp_num_value]->timer_count); } if (last_line) { int fd = fileno(p_info[i].fd); if (ftruncate(fd, 0) != 0) { - perror("Failed to truncate file\n"); + perror("Monitoring: Failed to truncate file\n"); } rewind(p_info[i].fd); // ファイルポインタを先頭に戻す } @@ -314,7 +318,7 @@ int main() { timer.it_interval.tv_nsec = 1000000; timer_fd = timerfd_create(CLOCK_REALTIME, 0); if (timer_fd == -1) { - perror("timerfd_create"); + perror("Monitoring: timerfd_create"); return EXIT_FAILURE; } @@ -322,7 +326,7 @@ int main() { //必要情報を先に記録する p_info_write(p_info); - //RTI, federateの初期起動,コマンドを基に実行 + //プログラムを実行する.その際,実行したプログラムのpidを取得 executeProgram(p_info, wd, &inotify_fd); //最大のファイルディスクリプタを設定 @@ -331,7 +335,7 @@ int main() { FD_SET(inotify_fd, &rdfs); FD_SET(timer_fd, &rdfs); - //それぞれのプロセスからの通信を待つ + //CPの書き込みまたはタイマーイベントの起動を待ち、読み込む関数 watch_cp_write(p_info, wd, timer, &inotify_fd, &timer_fd, &rdfs, max_fd); return 0;