Real time log based statistics

Some time ago I needed to generate real-time statistics of how many emails our Exim installations send per second. I didn’t want to reconfigure Exim itself, so I wrote a simple C program that follows a syslog-generated file, does a little parsing of the timestamp, and writes stats to disk every couple of seconds. It:

  • works with only one timestamp format (%Y-%m-%d %H:%M:%S)
  • accepts no parameters apart from the output file
  • reads from stdin in the main loop and has another thread that writes the stats to disk
  • could be written better, faster, cleaner, using less memory, having less IO impact
  • generally works… it has been running for months without any problems (it will reallocate more memory if needed and clean up after itself after a while)
  • can work on any log, as long as the timestamp is in the right format (unless you make it accept others ;) ) and you can grep for what you want to count
  • is released under WTFPL

When it comes to Exim, it would probably be more efficient (less IO overhead) to add a transport filter and count there, but then you’ll have all the issues with locking (Exim sends emails with multiple queue runners), parallelism, etc. Plus, if your filter fails, you lose your MTA…

Once compiled, simply tail -f any log file and pipe it through the meter. Every second (HITTIME in the source below) it rewrites the output file you specified as a parameter (which can later be fetched via HTTP) with three numbers:

  1. the average number of events per second in the piped-in input over the last 10 seconds
  2. 10 — the length of the averaging window in seconds (COLLECTSTATS in the source)
  3. timestamp of the last update
/**
 * Copyright (C) 2009 Maciej Wiercinski http://blog.wiercinski.net/
 *
 * This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details.
 */

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Size in bytes of the buffer main() passes to fgets() for each input line. */
#define   READBUFSIZE 512
/* Initial capacity of the timestamp queue, and the step (in ints) by which
 * main() grows it. */
#define   QUEUE_BLOCK 128
/* Seconds the writer thread sleeps between two updates of the output file. */
#define   HITTIME 1
/* Length in seconds of the averaging window: older timestamps are dropped and
 * the remaining count is divided by this value. */
#define   COLLECTSTATS 10 

/**
Compilation:
    gcc -pthread monitor.c -o monitor

To test:
    while [ 1 ]; do sleepenh 0.`echo "$RANDOM % 10000" | bc |  awk '{ printf "%05d" , $1; }'` > /dev/null; date +'%Y-%m-%d %H:%M:%S'; done | ./monitor output

Usage :
tail --follow=name /var/log/exim4/mainlog | grep Completed --line-buffered   | ./monitor stats.txt
*/

/* Locked by both main() (appending timestamps) and writer() (sorting and
 * pruning them) around every access to the shared timestamp queue. */
pthread_mutex_t mx_queue = PTHREAD_MUTEX_INITIALIZER;


/**
 * Arguments handed from main() to the writer thread.
 *
 * Every member is a pointer to state owned by main(); main() and writer()
 * lock mx_queue around their accesses to queue and elems.
 */
struct rw_parms {
    /* Address of the pointer to the timestamp array; the extra indirection
     * lets writer() see the new array after main() realloc()s it. */
    int ** queue;
    /* Current capacity of the array, counted in ints. */
    int * queuesize;
    /* Number of timestamps currently stored in the array. */
    int * elems;
    /* Path of the output file that writer() rewrites on every pass. */
    char * filepath;
};


/**
 * qsort() comparator that orders ints from largest to smallest.
 *
 * @param a  pointer to the first int
 * @param b  pointer to the second int
 * @return   a negative value when *a sorts before *b (that is, *a > *b),
 *           zero when both are equal, a positive value otherwise
 *
 * The result is built from two comparisons rather than a subtraction, so it
 * cannot overflow when the operands have opposite signs.
 */
int comparator (const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;

    return (rhs > lhs) - (rhs < lhs);
}

/**
 * Thread entry point: periodically publishes the event rate to a file.
 *
 * Every HITTIME seconds the shared queue is sorted newest-first, every
 * timestamp older than COLLECTSTATS seconds is dropped from its tail, and the
 * output file is rewritten with "<events per second> <COLLECTSTATS> <now>".
 *
 * @param ptr  pointer to a struct rw_parms describing the shared queue and
 *             the output file path
 * @return     NULL, and only when the output file cannot be opened; otherwise
 *             the function loops forever
 */
void * writer(void *ptr) {
    struct rw_parms * rp = (struct rw_parms *) ptr;

    while(1) {
        FILE * f;
        double result;
        time_t now;
        int live;

        sleep(HITTIME);
        now = time(NULL);

        pthread_mutex_lock(&mx_queue);
        qsort(*(rp->queue), *(rp->elems), sizeof(int), comparator);
        /* The array is sorted in descending order, so the expired entries are
         * all at the tail: trim from the end until a fresh one is found.
         * Scanning forward while shrinking the count would stop halfway and
         * leave about half of the expired entries in place. */
        live = *(rp->elems);
        while(live > 0 && now - (*(rp->queue))[live - 1] > COLLECTSTATS) {
            live--;
        }
        *(rp->elems) = live;
        /* Release the lock before touching the file so the reader is never
         * blocked on disk I/O, and so no exit path can leave it locked. */
        pthread_mutex_unlock(&mx_queue);

        result = (double) live / (double) COLLECTSTATS;
        f = fopen(rp->filepath,"w");
        if(!f) {
            fprintf(stderr, "\n Failed to open a file");
            return NULL;
        }
        /* time_t has no portable printf conversion; print it as a long. */
        fprintf(f,"%.1lf %d %ld", result, COLLECTSTATS, (long) now);
        fclose(f);
    }
}


/**
 * Parse the "%Y-%m-%d %H:%M:%S" timestamp at the start of a line.
 *
 * The six fields must consist of exactly 4, 2, 2, 2, 2 and 2 decimal digits.
 * Any single non-NUL character is accepted as the separator between two
 * fields, and whatever follows the seconds is ignored.  Field values are not
 * range-checked here; mktime() normalises out-of-range values.
 *
 * @param buf  NUL-terminated line to parse; it is never read past its
 *             terminator
 * @return     the timestamp converted by mktime() (local time, DST
 *             auto-detected) and narrowed to int, or -1 when the line does
 *             not start with a well-formed timestamp
 */
int get_time_stamp(char * buf) {
    /* Digits per field, in order: year, month, day, hour, minute, second. */
    static const int widths[6] = { 4, 2, 2, 2, 2, 2 };
    int fields[6];
    struct tm timeinfo = {0};
    int i, j;

    for(i = 0; i < 6; i++) {
        int value = 0;

        for(j = 0; j < widths[i]; j++) {
            /* A NUL byte is not a digit, so a short line fails here before
             * anything beyond its terminator is read. */
            if(*buf < '0' || *buf > '9') {
                return -1;
            }
            value = value * 10 + (*buf - '0');
            buf++;
        }
        fields[i] = value;

        /* Skip the separator, unless the string ends where one is expected. */
        if(i < 5) {
            if(*buf == '\0') {
                return -1;
            }
            buf++;
        }
    }

    timeinfo.tm_year = fields[0] - 1900;
    timeinfo.tm_mon = fields[1] - 1;
    timeinfo.tm_mday = fields[2];
    timeinfo.tm_hour = fields[3];
    timeinfo.tm_min = fields[4];
    timeinfo.tm_sec = fields[5];
    /* Negative tm_isdst asks mktime() to work out whether DST is in effect. */
    timeinfo.tm_isdst = -1;

    return (int) mktime(&timeinfo);
}

/**
 * Reader: consumes log lines from stdin and queues their timestamps.
 *
 * Starts the writer thread, then for every input line that begins with a
 * parsable timestamp appends that timestamp to the shared queue, growing the
 * queue by QUEUE_BLOCK entries when it is full and periodically shrinking it
 * again once the writer has pruned it.
 *
 * @param argc  argument count; at least 2 is required
 * @param argv  argv[1] is the path of the statistics output file
 * @return      1 on a usage error, allocation failure or thread start
 *              failure; 0 once input has ended and the writer has terminated
 */
int main(int argc, char ** argv) {
    if(argc < 2) {
        fprintf(stdout,"\tUsage:\t%s <outputfilename>\n", *argv);
        return 1;
    }
    pthread_t th_writer;
    struct rw_parms rp;
    char readbuf[READBUFSIZE] = {0};
    int * queue = NULL;
    int elems = 0;
    int queuesize = QUEUE_BLOCK;
    /* Unsigned so that the counter wraps instead of overflowing on
     * long-running input. */
    unsigned iteration = 0;
    int ret_writer;

    queue = malloc(sizeof *queue * (size_t) queuesize);
    if(!queue) {
        fprintf(stderr, "\n Failed to allocate the queue");
        return 1;
    }

    /* The writer reaches the queue state through these pointers; the state
     * itself lives in this frame, which outlives the writer thread. */
    rp.queue = &queue;
    rp.queuesize = &queuesize;
    rp.elems = &elems;
    rp.filepath = argv[1];
    ret_writer = pthread_create(&th_writer, NULL, writer, (void*) &rp);
    if(ret_writer != 0) {
        fprintf(stderr, "\n Failed to start the writer thread: %s", strerror(ret_writer));
        free(queue);
        return 1;
    }

    /* Loop on fgets() itself: testing feof() beforehand would process the
     * stale buffer one extra time when input ends. */
    while(fgets(readbuf, READBUFSIZE, stdin) != NULL) {
        size_t len = strlen(readbuf);
        int lastts;

        /* fgets() stores at most READBUFSIZE-1 characters; a full buffer
         * without a newline means the line was truncated, so discard its
         * remainder rather than parsing it as a new line. */
        if(len == READBUFSIZE - 1 && readbuf[len - 1] != '\n') {
            int ch;
            while((ch = fgetc(stdin)) != EOF && ch != '\n') {
                /* flush long lines */
            }
        }

        lastts = get_time_stamp(readbuf);
        if(lastts < 0) {
            continue;
        }
        iteration++;

        pthread_mutex_lock(&mx_queue);
        if(queuesize <= elems + 1) {
            int * grown = realloc(queue, sizeof *queue * (size_t) (queuesize + QUEUE_BLOCK));
            if(!grown) {
                pthread_mutex_unlock(&mx_queue);
                fprintf(stderr, "\n Failed to grow the queue");
                return 1;
            }
            queue = grown;
            queuesize += QUEUE_BLOCK;
        }
        /* Every QUEUE_BLOCK*4 accepted lines, give memory back if more than
         * one spare block has accumulated. */
        if(iteration % (QUEUE_BLOCK * 4) == (QUEUE_BLOCK * 4 - 1)) {
            if(queuesize - QUEUE_BLOCK > elems + 1) {
                int target = elems + QUEUE_BLOCK - (elems % QUEUE_BLOCK);
                int * shrunk = realloc(queue, sizeof *queue * (size_t) target);
                /* A failed shrink is harmless: the old buffer stays valid. */
                if(shrunk) {
                    queue = shrunk;
                    queuesize = target;
                }
            }
        }
        queue[elems++] = lastts;
        pthread_mutex_unlock(&mx_queue);
    }

    fprintf(stderr, "\n Reader finished... ");
    /* The writer only returns if it fails to open the output file, so after
     * end of input this normally keeps the process alive and publishing. */
    pthread_join(th_writer, NULL);
    return 0;
}

Share
  1. No comments yet.

  1. No trackbacks yet.