/* Copyright (c) 1996, Ruslan R. Laishev (@RRL) */ #include "nntp.h" WorkerContext *WctxPtr [ NNTPMAXWORKER ]; pthread_mutex_t Wctxm; int pthread_sleep (int); /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerClient (WorkerContext *Wctxp) { long status; int (*nntp_fun) (WorkerContext *); int sz; NNTP_LOGT(Wctxp,LOGD,"Started."); sz = sprintf(Wctxp->bufp,"200 %s is ready (Posting Ok)\r\n",ID$IDver); if ( 0 > send(Wctxp->chan,Wctxp->bufp,sz, 0) ) { NNTP_WorkerKill (Wctxp); pthread_exit (NULL); } while (1) { if ( 0 > nntp_cmd_get(Wctxp->chan,Wctxp->bufp ,BUFPSZ) ) { NNTP_LOGT(Wctxp,LOGE,"'nntp_cmd_get':%s.",strerror(errno)); NNTP_WorkerKill (Wctxp); pthread_exit (NULL); } if ( nntp_fun = nntp_cmd_parse (Wctxp) ) { if ( 0 > nntp_fun (Wctxp) ) { NNTP_WorkerKill (Wctxp); pthread_exit (NULL); } } else { sz = sprintf(Wctxp->bufp,"500 UnRecognized command\r\n"); send(Wctxp->chan,Wctxp->bufp,sz,0); } } } /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerKill ( WorkerContext *Wctxp ) { pthread_mutex_lock(&Wctxm); NNTP_LOGT(Wctxp,LOGD,"Kill worker."); DBclose_stream (&Wctxp->Msgrab); DBclose_stream (&Wctxp->Grprab); if ( Wctxp->type == T_Suck ) DBclose_stream (&Wctxp->Suckrab); if ( Wctxp->type == T_Feed ) DBclose_stream (&Wctxp->Feedrab); nntp_sockshut(Wctxp->chan); NNTP_LOGT(Wctxp,LOGW,"End."); WctxPtr [ Wctxp->indx ] = NULL; free(Wctxp); pthread_mutex_unlock(&Wctxm); } /* *-------------------------------------------------------------------------------- */ void *NNTP_WorkerInit( int WorkerType, int chan ) { long status; WorkerContext *Wctxp; int indx; pthread_mutex_lock(&Wctxm); while(1) { Wctxp = NULL; for (indx = 0;indx < NNTPMAXWORKER;indx++) if ( NULL == WctxPtr [indx] ) break; if ( indx >= NNTPMAXWORKER ) break; if ( NULL == (Wctxp = calloc(1,sizeof (WorkerContext))) ) { NNTP_LOG(LOGE,"[WorkerInit]'calloc'."); break; } NNTP_LOG(LOGD,"[WorkerInit]'calloc' memory (%d b)-Ok.", sizeof(WorkerContext)); if (-1 == status) { NNTP_LOG(LOGE,"[WorkerInit]Init mutex."); free (Wctxp); Wctxp = NULL; break; } if ( MsgDBopen_stream (&Wctxp->Msgrab) ) { NNTP_LOG(LOGE,"[WorkerInit]open MsgStream."); free (Wctxp); Wctxp = NULL; break; } if ( GrpDBopen_stream (&Wctxp->Grprab) ) { NNTP_LOG(LOGE,"[WorkerInit]GrpDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); free (Wctxp); Wctxp = NULL; break; } if ( WorkerType == T_Suck ) { if ( SuckDBopen_stream (&Wctxp->Suckrab) ) { NNTP_LOG(LOGE,"[WorkerInit]SuckDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); DBclose_stream(&Wctxp->Grprab); free (Wctxp); Wctxp = NULL; break; } } if ( WorkerType == T_Feed ) { if ( FeedDBopen_stream (&Wctxp->Feedrab) ) { NNTP_LOG(LOGE,"[WorkerInit]FeedDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); DBclose_stream(&Wctxp->Grprab); free (Wctxp); Wctxp = NULL; break; } } Wctxp->indx = indx; Wctxp->chan = chan; Wctxp->type = WorkerType; WctxPtr[indx] = Wctxp; break; } pthread_mutex_unlock(&Wctxm); NNTP_LOG(LOGD,"[WorkerInit][Th:%d,Ch:%d]-Ok.",indx,chan); return (void *) Wctxp; } /* *-------------------------------------------------------------------------------- */ int NNTP_InitBosses (void) { memset (WctxPtr,0,sizeof(WctxPtr)); return (pthread_mutex_init (&Wctxm,pthread_mutexattr_default)); } /* *-------------------------------------------------------------------------------- */ void *NNTP_ClientBoss (void) { long status; int chan; WorkerContext *Wctxp; int MainSocketHnd, SocketHnd; struct sockaddr_in MainSocket, SockAddr; struct hostent MainHostEnt, HostEnt, *pHostEnt; int len; char SRVBUSY[] = "\r\n500 Sorry, NNTP Server is busy, try later.\r\n"; struct timespec interval; if ( -1 == (MainSocketHnd = socket (AF_INET, SOCK_STREAM, 0)) ) { NNTP_LOG(LOGF,"[Client]'socket' %s.",strerror(errno)); pthread_exit (NULL); } status = 1; if ( -1 == setsockopt(MainSocketHnd,SOL_SOCKET,SO_REUSEADDR,(char *)&status, sizeof (status)) ) { NNTP_LOG(LOGF,"[Client]'setsockopt' %s.",strerror(errno)); pthread_exit (NULL); } pthread_lock_global_np (); if ( NULL == (pHostEnt = gethostbyname (nntp_conf.LocalHost)) ) { NNTP_LOG(LOGF,"[Client]'gethostbyname' %s.",strerror(errno)); pthread_unlock_global_np (); pthread_exit (NULL); } else NNTP_LOG(LOGI,"[Client]Local Host=%s",pHostEnt->h_name); MainHostEnt = *pHostEnt; pthread_unlock_global_np (); MainSocket.sin_family = AF_INET; MainSocket.sin_port = htons(nntp_conf.LocalPort); MainSocket.sin_addr.s_addr = INADDR_ANY; /* * Bind MainSocket to MainSocketHnd */ if ( -1 == bind (MainSocketHnd,(struct sockaddr *) &MainSocket, sizeof (MainSocket)) ) { NNTP_LOG(LOGF,"[Client]'bind' %s.",strerror(errno)); pthread_exit (NULL); } if ( -1 == listen (MainSocketHnd, 5) ) { NNTP_LOG(LOGF,"[Client]'listen' %s.",strerror(errno)); pthread_exit (NULL); } while (1) { if ( 0 > (chan = accept(MainSocketHnd,(struct sockaddr *)&MainSocket,&len)) ) { NNTP_LOG(LOGF,"[Client]'accept' %s.",strerror(errno)); continue; } NNTP_LOG(LOGD,"[Client]Incoming connection request,chan=%d",chan); if ( !(Wctxp = NNTP_WorkerInit(T_Clnt,chan)) ) { send(chan,SRVBUSY,sizeof(SRVBUSY) - 1,0); nntp_sockshut(chan); NNTP_LOG(LOGE,"[Client]No free Wctxp."); continue; } NNTP_LOG(LOGW,"[Client]Start Worker [Th:%d,Ch:%d].", Wctxp->indx,Wctxp->chan); status = pthread_create(&Wctxp->tid,pthread_attr_default, NNTP_WorkerClient,(pthread_addr_t) Wctxp); if (status == -1) { NNTP_LOG(LOGF,"[Client]pthread_create-%s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOG(LOGD,"[Client]Start Worker [Th:%d,Ch:%d]-Ok.", Wctxp->indx,Wctxp->chan); } } /* *-------------------------------------------------------------------------------- */ void *NNTP_SuckBoss (void) { WorkerContext *Wctxp; char *cp0,*cp1; char *SuckR = nntp_conf.Suck; long IPn; char IPs [32]; if ( !(*nntp_conf.Suck) ) { NNTP_LOG(LOGI,"[Suck]No Suck defined."); pthread_exit (NULL); } while (1) { if ( !(*SuckR) ) { NNTP_LOG(LOGW,"[Suck]All hosts is polled."); SuckR = nntp_conf.Suck; pthread_sleep (nntp_conf.SuckInterval); continue; } if ( !(Wctxp = NNTP_WorkerInit(T_Suck,0)) ) { NNTP_LOG(LOGF,"[Suck]No free Wctx."); pthread_exit (NULL); } if ( NULL != (cp0 = strchr (SuckR,FIELDSEP)) ) { strncpy (Wctxp->bufp,SuckR,cp0-SuckR); Wctxp->bufp [cp0-SuckR ] = 0; SuckR = cp0+1; } else { strcpy (Wctxp->bufp,SuckR); SuckR += strlen(SuckR); } cp0 = strchr(Wctxp->bufp,':'); *(cp0) = 0; cp0++; cp1 = strchr(cp0,':'); *(cp1) = 0; cp1++; NNTP_LOGT(Wctxp,LOGD,"Try connect to <%s:%s>.",Wctxp->bufp,cp1); pthread_lock_global_np (); Wctxp->chan = net_connect (Wctxp,Wctxp->bufp,atoi(cp1),&IPn); pthread_unlock_global_np (); if ( 0 > Wctxp->chan ) { NNTP_LOGT(Wctxp,LOGE,"'net_connect' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGI,"Suck from.....<%s(%s):%s>.",Wctxp->bufp, ip_to_a(IPn,IPs),cp1); NNTP_LOGT(Wctxp,LOGI,"Group list....<%s>.",cp0); nntp_suck (Wctxp,cp0,IPs); NNTP_WorkerKill (Wctxp); } /* end while (1) */ } /* *-------------------------------------------------------------------------------- */ void *NNTP_FeedBoss (void) { WorkerContext *Wctxp; char *cp0,*cp1,*cp2,*cp3; char *FeedR = nntp_conf.Feed; long IPn; char IPs [32]; if ( !(*nntp_conf.Feed) ) { NNTP_LOG(LOGI,"[Feed]No Feed defined."); pthread_exit (NULL); } while (1) { if ( !(*FeedR) ) { NNTP_LOG(LOGW,"[Feed]All hosts is polled."); FeedR = nntp_conf.Feed; pthread_sleep (nntp_conf.FeedInterval); continue; } if ( !(Wctxp = NNTP_WorkerInit(T_Feed,0)) ) { NNTP_LOG(LOGF,"[Feed]No free Wctx."); pthread_exit (NULL); } if ( NULL != (cp0 = strchr (FeedR,FIELDSEP)) ) { strncpy (Wctxp->bufp,FeedR,cp0-FeedR); Wctxp->bufp [cp0-FeedR ] = 0; FeedR = cp0+1; } else { strcpy (Wctxp->bufp,FeedR); FeedR += strlen(FeedR); } cp0 = strchr(Wctxp->bufp,':'); *(cp0) = 0; cp0++; cp1 = strchr(cp0,':'); *(cp1) = 0; cp1++; cp2 = strchr(cp1,':'); *(cp2) = 0; cp2++; cp3 = strchr(cp2,':'); *(cp3) = 0; cp3++; NNTP_LOGT(Wctxp,LOGD,"Try connect to '%s':%s.",Wctxp->bufp,cp2); pthread_lock_global_np (); Wctxp->chan = net_connect (Wctxp,Wctxp->bufp,atoi(cp2),&IPn); pthread_unlock_global_np (); if ( 0 > Wctxp->chan ) { NNTP_LOGT(Wctxp,LOGE,"'net_connect' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGI,"Feed to.......<%s(%s):%s>.",Wctxp->bufp, ip_to_a (IPn,IPs),cp2); NNTP_LOGT(Wctxp,LOGI,"Group list....<%s>.",cp0); NNTP_LOGT(Wctxp,LOGI,"Posting Type..<%s>.",cp1); NNTP_LOGT(Wctxp,LOGI,"Exclude Paths.<%s>.",cp3); nntp_feed (Wctxp,cp0,IPs,(*cp1=='p'?1:0),cp3); NNTP_WorkerKill (Wctxp); } /* end while (1) */ } /* *-------------------------------------------------------------------------------- */ void *NNTP_ExpireBoss (void) { WorkerContext *Wctxp; if ( !nntp_conf.ExpireInterval ) { NNTP_LOG(LOGI,"[Expr]No ExpireInterval defined."); pthread_exit (NULL); } while (1) { pthread_sleep (nntp_conf.ExpireInterval); if ( !(Wctxp = NNTP_WorkerInit(T_Expr,0)) ) { NNTP_LOG(LOGF,"[Expr]No free Wctx."); continue; } if ( nntp_conf.ExpireInterval ) { nntp_expire (Wctxp); } NNTP_WorkerKill (Wctxp); } } /* *-------------------------------------------------------------------------------- */ int pthread_sleep (int sec) { long status; pthread_mutex_t mtx; pthread_cond_t cnd; struct timespec delta,abs; delta.tv_sec = sec; delta.tv_nsec= 0; if ( status = pthread_mutex_init (&mtx,pthread_mutexattr_default) ) return status; if ( status = pthread_cond_init (&cnd,pthread_condattr_default) ) return status; if ( status = pthread_get_expiration_np ( &delta , &abs ) ) return status; return ( pthread_cond_timedwait( &cnd , &mtx , &abs ) ); }