/* Copyright (c) 1994-7 Ruslan R. Laishev. All Right reserved !!! */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include int pthread_sleep (int); #define TIMEOUT 600 /* 10 minute's */ char imbx_lnm [ ] = "MLA1$MFSERV"; char ombx_lnm [ ] = "MFSERV$MLA1"; $DESCRIPTOR(imbx_dsc,imbx_lnm); $DESCRIPTOR(ombx_dsc,ombx_lnm); typedef struct { int net_chan; /* network socket number*/ int mbx_chan; /* mailbox chanel number*/ char *bf; /* pointer to IO buffer */ int sz; /* size of IO buffer */ int ListenSock; /* value of listen sock */ int ListenPort; /* value of TCP port */ char *mbx_dsc; struct sockaddr_in s_a_in; time_t expire; int indx; /* 0 NET->MBX->MBX-> or 1 MBX->NET->NET->MBX */ pthread_mutex_t mutex; } WorkerContext; static WorkerContext Wctx [2 ] = { {0,0,NULL,0,0,1075,&imbx_dsc}, {0,0,NULL,0,0,1076,&ombx_dsc} }; typedef struct { short int cnd, cnt; int dev; } io_status_block; /*-----------------------------------------------------------*/ void NET_chan_shut (int sock) { shutdown (socket,2); close (socket); } /*-----------------------------------------------------------*/ int NET_init (int Port,struct sockaddr_in *s_a_in) { char Host [ 256 ]; struct hostent hostents; struct hostent *hostentp; int Sock; if ( 0 > gethostname (Host,sizeof(Host)) ) return -1; if ( 0 > ( Sock = socket (AF_INET, SOCK_STREAM, 0)) ) return -1; if ( NULL == (hostentp = gethostbyname (Host)) ) { NET_chan_shut(Sock); return -1; } hostents = *hostentp; s_a_in->sin_family = hostents.h_addrtype; s_a_in->sin_port = htons(Port); s_a_in->sin_addr = * ((struct in_addr *) hostents.h_addr); if ( 0 > bind (Sock,(struct sockaddr *) s_a_in, sizeof (struct sockaddr_in)) ) { NET_chan_shut(Sock); return -1; } return Sock; } /*-----------------------------------------------------------*/ int NET_chan_open (int SockHnd, struct sockaddr_in *s_a_in) { int namelength; int sh; if ( listen (SockHnd, 1) ) { NET_chan_shut(SockHnd); return -1; } namelength = sizeof (struct sockaddr_in); if ( 0 > (sh = accept (SockHnd, (struct sockaddr *) s_a_in, &namelength)) ) { NET_chan_shut(sh); return -1; } return sh; } /*-----------------------------------------------------------*/ int NET_chan_recv (int chan, char *bf, int sz) { int i; for (i = 0; i < sz;i++) { if ( 0 > recv(chan,bf+i,1, 0) ) break; if ( '\t' == (*(bf+i)) ) break; } return i; } /*-----------------------------------------------------------*/ int NET_chan_send (int chan, char *bf, int sz) { if ( 0 > send(chan,bf,sz, 0) ) return -1; return sz; } /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ int MBX_chan_open (char *dsc) { int chan; if (SS$_NORMAL != sys$crembx(1,&chan,0,0,0,0,dsc,0) ) { perror("sys$crembx"); return -1; } sys$delmbx(chan); return chan; } /*-----------------------------------------------------------*/ int MBX_chan_send (int chan, char *bf, int sz) { io_status_block iosb; long status; status = sys$qio (0,chan, IO$_WRITEVBLK|IO$M_READERCHECK|IO$M_NORSWAIT, &iosb, 0,0, bf,sz, 0,0,0,0); if ( iosb.cnd =! SS$_NORMAL ) { perror("sys$qiow-mbx-send"); return -1; } return iosb.cnt; } /*-----------------------------------------------------------*/ int MBX_chan_recv (int chan, char *bf, int sz) { io_status_block iosb; long status; status = sys$qiow (0,chan, IO$_READVBLK|IO$M_WRITERCHECK, &iosb, 0,0, bf,sz, 0,0,0,0); if ( iosb.cnd != SS$_NORMAL ) { perror("sys$qiow-mbx-recv"); return -1; } return iosb.cnt; } /*-----------------------------------------------------------*/ int MBX_init (char *log_name) { union buff { char trnl_name [64]; int info; } buff; int status; short ret_l; struct item_list { unsigned short buff_l, item_c; char *buff_p; short *buff_l_p; unsigned term; } item_list = { sizeof(buff.trnl_name), LNM$_STRING, buff.trnl_name,&ret_l,0}; char s0 [ ] = "LNM$SYSTEM_TABLE"; char s2 [ ] = "MBAXXXXXXX:"; $DESCRIPTOR(tbl_name,s0); $DESCRIPTOR(mbx_name,s2); if (SS$_NORMAL != sys$trnlnm (0,&tbl_name,log_name,0,&item_list) ) return -1; /* Get mailbox_device default_buffer_size */ mbx_name.dsc$w_length = ret_l; memcpy(mbx_name.dsc$a_pointer,buff.trnl_name,ret_l); item_list.item_c = DVI$_DEVBUFSIZ; status = sys$getdvi (0,0,&mbx_name,&item_list,0,0,0,0); if (status == SS$_NORMAL) return buff.info; return -1; } /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ void WorkerStop (WorkerContext *Wctxp) { pthread_mutex_lock(&Wctxp->mutex); if ( Wctxp->bf) free ( Wctxp->bf ); Wctxp->bf = NULL; if ( Wctxp->net_chan ) { NET_chan_shut (Wctxp->net_chan); Wctxp->net_chan = 0; } if ( Wctxp->ListenSock ) { NET_chan_shut (Wctxp->ListenSock); Wctxp->ListenSock = 0; } pthread_mutex_unlock(&Wctxp->mutex); } /*-----------------------------------------------------------*/ void WorkerStart (WorkerContext *Wctxp) { int sz; printf("\n[Tid:%02.2d]Started.",Wctxp->indx); while (1) { pthread_sleep (TIMEOUT/300); if ( 0 > (Wctxp->sz = MBX_init (Wctxp->mbx_dsc)) ) { perror("MBX_init"); continue; } if ( 0 > (Wctxp->mbx_chan = MBX_chan_open (Wctxp->mbx_dsc)) ) { perror("MBX_open"); continue; } if ( 0 < (Wctxp->ListenSock = NET_init (Wctxp->ListenPort, &Wctxp->s_a_in)) ) break; perror("NET_init"); } printf("\n[Tid:%02.2d]Wait incomming connection request.",Wctxp->indx); if (0 > (Wctxp->net_chan = NET_chan_open (Wctxp->ListenSock, &Wctxp->s_a_in)) ) { perror("NET_open"); WorkerStop (Wctxp); pthread_exit ((pthread_addr_t) 0); } printf("\n[Tid:%02.2d]Incomming connection (net_chan=%d,mbx_chan=%d).", Wctxp->indx,Wctxp->net_chan,Wctxp->mbx_chan); if ( NULL == (Wctxp->bf = malloc (Wctxp->sz)) ) { perror("malloc"); WorkerStop (Wctxp); pthread_exit ((pthread_addr_t) 0); } /* 0 NET->MBX->MBX-> or 1 MBX->NET->NET->MBX */ while (1) { if ( !Wctxp->indx ) { printf("\n[Tid:%02.2d]Begin - net->mbx->mbx->net.", Wctxp->indx); pthread_mutex_lock(&Wctxp->mutex); time(&Wctxp->expire); Wctxp->expire += 10*60; pthread_mutex_unlock(&Wctxp->mutex); if (0 > (sz = NET_chan_recv (Wctxp->net_chan, Wctxp->bf, Wctxp->sz)) ) break; printf("\n[Tid:%02.2d]net_recv %d byte", Wctxp->indx,sz); if (0 > MBX_chan_send (Wctxp->mbx_chan, Wctxp->bf,sz) ) break; printf("\n[Tid:%02.2d]mbx_send %d byte", Wctxp->indx,sz); if (0 > (sz = MBX_chan_recv (Wctxp->mbx_chan, Wctxp->bf, Wctxp->sz)) ) break; printf("\n[Tid:%02.2d]mbx_recv %d byte", Wctxp->indx,sz); if (0 > NET_chan_send (Wctxp->net_chan, Wctxp->bf,sz) ) break; printf("\n[Tid:%02.2d]net_send %d byte", Wctxp->indx,sz); } else { printf("\n[Tid:%02.2d]Begin - mbx->net->net->mbx.", Wctxp->indx); if (0 > (sz = MBX_chan_recv (Wctxp->mbx_chan, Wctxp->bf, Wctxp->sz)) ) break; printf("\n[Tid:%02.2d]mbx_recv %d byte", Wctxp->indx,sz); if (0 > NET_chan_send (Wctxp->net_chan, Wctxp->bf,sz) ) break; printf("\n[Tid:%02.2d]net_send %d byte", Wctxp->indx,sz); if (0 > (sz = NET_chan_recv (Wctxp->net_chan, Wctxp->bf, Wctxp->sz)) ) break; printf("\n[Tid:%02.2d]net_recv %d byte", Wctxp->indx,sz); if (0 > MBX_chan_send (Wctxp->mbx_chan, Wctxp->bf,sz) ) break; printf("\n[Tid:%02.2d]mbx_send %d byte", Wctxp->indx,sz); } } printf("\n[Tid:%02.2d]End.",Wctxp->indx); WorkerStop (Wctxp); pthread_exit ((pthread_addr_t) 0); } /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ void main (void) { pthread_t t_id [2]; int t_ind; time_t t0,t_time; printf("DECThread's ST400/PAI Message Exchange Server v1.0"); printf("Copyright (c) 1994-7 Ruslan R. Laishev. All Right reserved !!!"); while (1) { for (t_ind = 0; t_ind < 2;t_ind++) { puts("Start Thread"); if ( -1 == pthread_mutex_init (&Wctx[t_ind].mutex, pthread_mutexattr_default) ) { perror("pthread_mutex_init"); sys$exit(errno); } Wctx[t_ind].indx = t_ind; if ( -1 == pthread_create(&t_id[t_ind],pthread_attr_default, WorkerStart, (pthread_addr_t) &Wctx[t_ind]) ) { perror("pthread_create"); sys$exit(errno); } } for (t_ind = 0; ;t_ind++,t_ind ^= 1) { pthread_mutex_lock(&Wctx[t_ind].mutex); time(&t0); t_time = Wctx[t_ind].expire; if ( (t0 > t_time) && t_time ) { puts("Restart Thread"); pthread_mutex_lock(&Wctx[(t_ind+1) ^ 1].mutex); WorkerStop (&Wctx[0]); WorkerStop (&Wctx[1]); pthread_cancel (t_id[0]); pthread_cancel (t_id[1]); pthread_mutex_unlock(&Wctx[(t_ind+1) ^ 1].mutex); break; } pthread_mutex_unlock(&Wctx[t_ind].mutex); pthread_sleep (TIMEOUT/300); } } } /*-----------------------------------------------------------*/ int pthread_sleep (int sec) { long status; pthread_mutex_t mtx; pthread_cond_t cnd; struct timespec delta,abs; delta.tv_sec = sec; delta.tv_nsec= 0; if ( status = pthread_mutex_init (&mtx,pthread_mutexattr_default) ) return status; if ( status = pthread_cond_init (&cnd ,pthread_condattr_default) ) return status; if ( status = pthread_get_expiration_np ( &delta , &abs ) ) return status; return ( pthread_cond_timedwait( &cnd , &mtx , &abs ) ); }