Using fork and pipe functions in C to count lines, characters, and words from file with multiple processes

1.6k Views Asked by At

I am trying to use multiple processes with the fork and pipe functions to modify a program that reads through a file and return the total number of lines, words, and characters. The program compiles and runs fine, but currently the output is only correct when the user inputs 1 for the number of child processes. For every other positive integer the program returns the correct value multiplied by the number of processes. (ex: if number of lines in file is 4 and user inputs 2 for number of processes, the returned value is 8) Does anyone know how I can fix this so that each process divides the work of reading through the file instead of each reading through the entire thing? The user enters the file name and the number of child processes they want when they run the program. I do not care about efficiency at the moment, only that the output is correct. Here is my code:

//wc.h
#ifndef WC_H
#define WC_H

#include <stdio.h>

typedef struct count_t {
        int linecount;
        int wordcount;
        int charcount;
} count_t;

count_t word_count(FILE* fp, long offset, long size);
extern int crashRate;

#endif



//wc_mul.c
#include "wc.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

#define MAX_PROC 100
#define MAX_FORK 100

int crashRate = 0;

count_t word_count(FILE* fp, long offset, long size)
{
        char ch;
        long rbytes = 0;

        count_t count;
        // Initialize counter variables
        count.linecount = 0;
        count.wordcount = 0;
        count.charcount = 0;
        
        printf("[pid %d] reading %ld bytes from offset %ld\n", getpid(), size, offset);

        if(fseek(fp, offset, SEEK_SET) < 0) {
                printf("[pid %d] fseek error!\n", getpid());
        }

        while ((ch=getc(fp)) != EOF && rbytes < size) {
                // Increment character count if NOT new line or space
                if (ch != ' ' && ch != '\n') { ++count.charcount; }

                // Increment word count if new line or space character
                if (ch == ' ' || ch == '\n') { ++count.wordcount; }

                // Increment line count if new line character
                if (ch == '\n') { ++count.linecount; }
                rbytes++;
        }

        srand(getpid());
        if(crashRate > 0 && (rand()%100 < crashRate)) 
        {
                printf("[pid %d] crashed.\n", getpid());
                abort();
        }

        return count;
}


int main(int argc, char **argv)
{
                long fsize;
                FILE *fp;
                int numJobs;
        //plist_t plist[MAX_PROC];
                count_t total, count, buf;
                int i, j, pid, status, p[2];
                int nFork = 0;

        if(argc < 3) {
                printf("usage: wc <# of processes> <filname>\n");
                return 0;
        }
        
        if(argc > 3) {
                crashRate = atoi(argv[3]);
                if(crashRate < 0) crashRate = 0;
                if(crashRate > 50) crashRate = 50;
        }
        printf("crashRate RATE: %d\n", crashRate);


        numJobs = atoi(argv[1]);
        if(numJobs > MAX_PROC) numJobs = MAX_PROC;

        total.linecount = 0;
        total.wordcount = 0;
        total.charcount = 0;

        // Open file in read-only mode
        fp = fopen(argv[2], "r");

        if(fp == NULL) {
                printf("File open error: %s\n", argv[2]);
                printf("usage: wc <# of processes> <filname>\n");
                return 0;
        }

        fseek(fp, 0L, SEEK_END);
        fsize = ftell(fp);
        
        fclose(fp);
        // calculate file offset and size to read for each child
        
        for(i = 0; i < numJobs; i++) {
                if(nFork++ > MAX_FORK) return 0;
                if (pipe(p) != 0)
                  exit(1);
                pid = fork();
                if(pid < 0) {
                        printf("Fork failed.\n");
                } else if(pid == 0) {
                        // Child
                        fp = fopen(argv[2], "r");           
                        count = word_count(fp, 0, fsize);
                        write(p[1], &count, sizeof(count));
                        close(p[0]);
                        close(p[1]);
                        // send the result to the parent through the message queue
                        fclose(fp);
                        return 0;
                }
        }
        waitpid(pid, &status, 0);
        close(p[1]);
        for (j=0; j < numJobs; j++) {
          read(p[0], &buf, sizeof(count));
          total.linecount += buf.linecount;
          total.wordcount += buf.wordcount;
          total.charcount += buf.charcount;
        }
        // Parent
        // wait for all children
        // check their exit status
        // read the result from normalliy terminated child
        // re-crete new child if there is one or more failed child
        
        printf("\n========== Final Results ================\n");
        printf("Total Lines : %d \n", total.linecount);
        printf("Total Words : %d \n", total.wordcount);
        printf("Total Characters : %d \n", total.charcount);
        printf("=========================================\n");
        return(0);
}
1

There are 1 best solutions below

4
On
  1. The pipe must be opened before you fork. Otherwise it's not available in the parent.

  2. Pass a pointer to count, not count itself, to read and write:

write(p[1], &count, sizeof(count)); .. read(p[0], &buf, sizeof(count));

  1. Each child needs to close the write-end of the pipe after the write via close(p[1]). Now, you're only closing the read-end.

PS: And adding the usual result checking for read and write would be advisable too.


If you use a single pipe for multiple processes, once the first child closes the pipe, it's closed and can't be read anymore.

You need an array of pipes, one for each process:

int p[numJobs][2];

Below code worked for me. I added some printf for better understanding.

int main(int argc, char **argv)
{
    long fsize;
    FILE *fp;
    int numJobs;

    count_t total, count, buf;
    int i, j, pid, status;

    if(argc < 3) {
            printf("usage: wc <# of processes> <filname>\n");
            return 0;
    }
        
    if(argc > 3) {
            crashRate = atoi(argv[3]);
            if(crashRate < 0) crashRate = 0;
            if(crashRate > 50) crashRate = 50;
    }
    printf("crashRate RATE: %d\n", crashRate);


    numJobs = atoi(argv[1]);
    if(numJobs > MAX_PROC) numJobs = MAX_PROC;

    int p[numJobs][2];
    
    total.linecount = 0;
    total.wordcount = 0;
    total.charcount = 0;

    // Open file in read-only mode
    fp = fopen(argv[2], "r");

    if(fp == NULL) {
            printf("File open error: %s\n", argv[2]);
            printf("usage: wc <# of processes> <filname>\n");
            return 0;
    }

    fseek(fp, 0L, SEEK_END);
    fsize = ftell(fp);
    
    fclose(fp);
        // calculate file offset and size to read for each child
        
    for(i = 0; i < numJobs; i++) {
        
        if (pipe(p[i]) != 0) exit(1);

        pid = fork();
        
        if(pid < 0) {
            printf("Fork failed.\n");
            exit(1);
        } else if(pid == 0) {
            
            // Child
            fp = fopen(argv[2], "r");
            count = word_count(fp, 0, fsize);
            fclose(fp);

            close(p[i][0]);

            // send the result to the parent through the message queue
            long bytes_sent;
            
            if ( (bytes_sent = write(p[i][1], &count, sizeof(count)) ) ==-1) {
                printf("Writing into pipe failed.\n");
                exit(1);
            };
            printf("Child  process %d sent %ld bytes (%'d lines, %'d words, %'d chars) \n",getpid(), bytes_sent, count.linecount, count.wordcount, count.charcount);
            
            close(p[i][1]);
            _exit(0);
            
        }
    }
    
    
    // wait for all child processes to close
    while(wait(NULL) != -1){};

    long bytes_read;
    
    for (j=0; j < numJobs; j++) {
                
        if ((bytes_read = read(p[j][0], &buf, sizeof(buf)) ) ==-1) {
            printf("Reading from pipe failed.\n");
            exit(1);
        };

        if (bytes_read){
            
            printf("Parent process %d read %ld bytes (%'d lines, %'d words, %'d chars) \n",getpid(), bytes_read, buf.linecount, buf.wordcount, buf.charcount);

            total.linecount += buf.linecount;
            total.wordcount += buf.wordcount;
            total.charcount += buf.charcount;
        }
                    
        close(p[j][0]);
        close(p[j][1]);
    }


    // Parent
        // wait for all children
        // check their exit status
        // read the result from normalliy terminated child
        // re-create new child if there is one or more failed child
        
        printf("\n========== Final Results ================\n");
        printf("Total Lines : %d \n", total.linecount);
        printf("Total Words : %d \n", total.wordcount);
        printf("Total Characters : %d \n", total.charcount);
        printf("=========================================\n");
        return(0);
}