/sbase

Automatically exported from code.google.com/p/sbase

Primary LanguageC

libsbase是一个对服务, 线程, 连接等与业务逻辑无关的封装, 可以用于服务端和客户端的网络应用开发. sbase包括服务管理(service), 线程管理(procthread), 连接管理(connection), 采用libevbase(类似libevent)的网络IO读写通知模式, 对业务逻辑数据的处理采用消息队列(message_queue)的方式运行, 每个连接(connection)可以定义独立的会话特性(SESSION), 附着与连接上的业务逻辑采用回调的方式执行, 具体的回调函数封装于session内. typedef struct _SESSION { /* packet */ int timeout; //超时设置 int childid;//代理子连接ID void *child;//代理子连接指针 int parentid;//代理连接父连接ID void *parent;//代理连接父连接指针 int packet_type; int packet_length; char *packet_delimiter; int packet_delimiter_length; int buffer_size;

/* methods */
/* 当连接发生读写错误或者对端断开的时候 用于错误处理 */
int (*error_handler)(struct _CONN *, CB_DATA *packet, CB_DATA *cache, CB_DATA *chunk);
/* 从连接的数据buffer区内读取数据头 */
int (*packet_reader)(struct _CONN *, CB_DATA *buffer);
/* 数据头被读取后的处理 */
int (*packet_handler)(struct _CONN *, CB_DATA *packet);
/* 数据头之外的数据处理 */
int (*data_handler)(struct _CONN *, CB_DATA *packet, CB_DATA *cache, CB_DATA *chunk);
/* 接受完的文件处理 */
int (*file_handler)(struct _CONN *, CB_DATA *packet, CB_DATA *cache);
/* 带外数据处理 */
int (*oob_handler)(struct _CONN *, CB_DATA *oob);
/* 超时处理(需要通过conn->set_timeout()) */
int (*timeout_handler)(struct _CONN *, CB_DATA *packet, CB_DATA *cache, CB_DATA *chunk);
/* 事务处理(需要通过service->new_transaction()注册事务 )*/
int (*transaction_handler)(struct _CONN *, int tid);

}SESSION; sbase总体结构 sbase->services service->procthreads procthread->connections. sbase 可以同时运行多个service (sbase->add_service()). sbase主线程负责管理所有service下的监听端口的文件描数字, 如果sbase通过libevbase发现有fd可读会通知service下的成员负责accept操作和添加新连接操作. typedef struct _SBASE { /* base option / / 多进程模式下进程数 / int nchilds; / 连接数限制 / int connections_limit; / while 循环usleep的微妙数 / int usec_sleep; / 运行状态 1 为运行 0 为停止 / int running_status; / libevbse */ EVBASE evbase; / services[] 列表 */ struct _SERVICE *services; / 当前运行service个数 / int running_services; / 心跳计数 */ long long nheartbeat;

/* timer && logger */
void *logger;
void *timer;

/* evtimer 超时检查器 */
void *evtimer;

/* message queue for proc mode 多进程模式下的消息队列 */
void *message_queue;

int  (*set_log)(struct _SBASE *, char *);
int  (*set_evlog)(struct _SBASE *, char *);

/* 添加服务 service需要通过service_init() 初始化以后 */
int  (*add_service)(struct _SBASE *, struct _SERVICE *);    
/* 运行sbase下的所有服务 */
int  (*running)(struct _SBASE *, int time_usec);
/* 停止sbase下的所有服务 */
void (*stop)(struct _SBASE *);
void (*clean)(struct _SBASE **);

}SBASE; service 负责管理服务下的所有线程以及连接, service本身不参与主循环的工作, service只是接受sbase的调用以及连接(connection)的调用, 与service相关的业务逻辑都是运行在service->procthreads线程之上, 另外任务(task)运行在service->daemons线程之上. service之上的所有连接根据文件描数字(fd)散列到procthreads之上. typedef struct _SERVICE { /* global / int lock; / service下所有线程运行usleep 微妙数*/ int usec_sleep; /* sbase指针 */ SBASE sbase; / service下所有线程访问service公共资源的锁 */ void *mutex;

/* heartbeat */
/* running heartbeat_handler when looped hearbeat_interval times 心跳间隔 */
int heartbeat_interval;
void *heartbeat_arg;
/* 心跳回调 */
CALLBACK *heartbeat_handler;
/* 设置服务心跳心跳控制由sbase->evtimer完成 */
void (*set_heartbeat)(struct _SERVICE *, int interval, CALLBACK *handler, void *arg);
/* 心跳操作激活 会调用service->hearbeat_handler()  */
void (*active_heartbeat)(struct _SERVICE *);

/* working mode 运行模式*/
int working_mode;
/* 多进程模式下使用 */
struct _PROCTHREAD *daemon;
/* 线程数 */
int nprocthreads;
/* 连接所在线程池 */
struct _PROCTHREAD **procthreads;
/* 后台线程个数 */
int ndaemons;
/* 用于task的后台线程池, 可选择是否用, 如果ndaemons为0表示不用 */
struct _PROCTHREAD **daemons;


/* socket and inet addr option  网络连接或本地监听相关的参数 */
int family;
int sock_type;
struct  sockaddr_in sa;
char *ip;
int port;
int fd;
int backlog;

/* service option 服务运行相关 */
int service_type;
char *service_name;
/* 设置服务包括本地监听设置, 初始化远程连接参数 */
int  (*set)(struct _SERVICE *service);
/* 运行服务, 初始化线程 */
int  (*run)(struct _SERVICE *service);
/* 停止服务 */
void (*stop)(struct _SERVICE *service);

/* event option libevbase 相关 */
EVBASE *evbase;
EVENT *event;

/* message queue for proc mode 多进程模式下的消息队列, 等同sbase->message_queue */
void *message_queue;

/* chunks queue */
/* chunks queue 用于回收存放使用过的chunk片段循环使用, 减少内存分配,提高总体性能。*/
void *chunks_queue;
struct _CHUNK *(*popchunk)(struct _SERVICE *service);
int (*pushchunk)(struct _SERVICE *service, struct _CHUNK *cp);

/* connections option 连接相关参数 */
/* 连接数限制 */
int connections_limit;
/* 当前connecions位置最大值 */
int index_max;
/* 当前运行连接总数 */
int running_connections;
/* service下的所有连接列表 用于统一管理连接 */
struct _CONN **connections;
/* C_SERVICE ONLY 客户端服务操作方法 */
/* 客户端连接数 */
int client_connections_limit;
/* 发起新连接 */
struct _CONN *(*newconn)(struct _SERVICE *service, int inet_family, int sock_type,
        char *ip, int port, SESSION *session);
/* 添加新连接 */
struct _CONN *(*addconn)(struct _SERVICE *service, int sock_type, int fd,
        char *remote_ip, int remote_port, char *local_ip, int local_port, SESSION *);
/* 获取空闲连接 */
struct _CONN *(*getconn)(struct _SERVICE *service);
/* 添加正常正确的连接到service->connecions */
int     (*pushconn)(struct _SERVICE *service, struct _CONN *conn);
/* 从service->connections 删除指定连接 */
int     (*popconn)(struct _SERVICE *service, struct _CONN *conn);
/* 代理 */
struct _CONN *(*newproxy)(struct _SERVICE *service, struct _CONN * parent, int inet_family,
        int sock_type, char *ip, int port, SESSION *session);
   struct _CONN *(*findconn)(struct _SERVICE *service, int index);
/* evtimer 超时检查器 */
void *evtimer;
int evid;

/* timer and logger */
void *timer;
void *logger;
int  is_inside_logger;
int (*set_log)(struct _SERVICE *service, char *logfile);

/* transaction and task */
int ntask;
/* 运行新任务 任务运行散列通过ntask散列到service->daemons[]之上 */
int (*newtask)(struct _SERVICE *, CALLBACK *, void *arg);
/* 在指定连接上执行事务 通过conn->index定位service->connections[] 上 */
int (*newtransaction)(struct _SERVICE *, struct _CONN *, int tid);

/* service default session option 设置服务默认业务逻辑的会话参数 */
SESSION session;
int (*set_session)(struct _SERVICE *, SESSION *);

/* clean */
void (*clean)(struct _SERVICE **pservice);

}SERVICE; procthread负责管理线程之上的所有连接, 包括通过libevbase管理连接的读写通知, 管理新连接添加, 数据头读, 数据头处理, 数据处理, 事务处理, 错误处理, 超时处理等消息. 新连接操作流程:

  1. sbase->evbase->loop() 发现某个连接;

  2. service->event_handler() 完成accept, conn_init() 初始化连接, 然后添加NEW_CONNECTION消息到i = fd%service->nprocthreads, service->procthreadsi->message_queue 对应的消息队列;

  3. procthread 通过消息队列循环获得NEW_CONNECTION消息,然后procthread->add_connection()加入新连接, 添加连接fd的读写事件到procthread->evbase, 同时service->pushconn() 注册连接到service->connections;

  4. procthread->evbase->loop() 检查连接状态, 同时通过conn->event_handler激活读写;

  5. conn->read_handler()读取数据到conn->buffer, 同时调用conn->packet_reader()读取数据头;

  6. 数据头读取后添加MESSAGE_PACKET_HANDLE消息, procthread通过消息队列循环调用conn->packet_handler()处理数据头;

  7. conn->recv_chunk()或者conn->recv_file()可以接收指定长度的数据块或者文件;

  8. 数据接收完成以后添加新的数据处理消息MESSAGE_DATA_HANDLE, procthread通过消息队列循环调用conn->data_handler();

  9. 数据处理完毕, 如果需要发送数据的话可以通过conn->push_chunk()发送数据块或者通过conn->push_file()发送文件, 同时添加fd写事件到procthread->evbase, 添加数据参数到conn->send_queue;

  10. procthread->evbase->loop()通知连接写数据, 每次根据连接buffer最大写, 然后等待下一次loop()直到完成数据发送, 连接的初始化和一个会话到此完成. 客户端连接建立:

  11. 调用service->newconn(), 此处的采用非阻塞连接, 连接并非马上建立, 标记conn->status = CONN_STATUS_READY, 然后进入连接添加流程;

  12. 进入之前服务连接流程 4 的时候, 如果碰到conn->status == CONN_STATUS_READY, 检查socket是否有错, 如果有错当做错误处理, 关闭连接.

  13. 其他处理类似. /* procthread / typedef struct _PROCTHREAD { / global */ int lock; SERVICE *service; int running_status; int usec_sleep; int index; long threadid;

    /* message queue */ void *message_queue;

    /* evbase */ EVBASE *evbase;

    /* connection */ struct _CONN *connections; / 添加新连接消息 */ int (*addconn)(struct _PROCTHREAD *procthread, struct _CONN conn); / 新连接添加到procthread */ int (*add_connection)(struct _PROCTHREAD *procthread, struct _CONN conn); / 结束连接 */ int (*terminate_connection)(struct _PROCTHREAD *procthread, struct _CONN *conn);

    /* logger */ void *logger;

    /* task and transaction 处理新的事务 和 任务 */ int (*newtask)(struct _PROCTHREAD *, CALLBACK *, void *arg); int (*newtransaction)(struct _PROCTHREAD *, struct _CONN *, int tid);

    /* heartbeat */ void (*active_heartbeat)(struct _PROCTHREAD *, CALLBACK *handler, void *arg); void (*state)(struct _PROCTHREAD *, CALLBACK *handler, void *arg);

    /* normal */ void (*run)(void *arg); void (*stop)(struct _PROCTHREAD *procthread); void (*terminate)(struct _PROCTHREAD *procthread); void (*clean)(struct _PROCTHREAD **procthread); }PROCTHREAD;

typedef struct _CONN { /* global */ int index; void *parent;

/* conenction */
int sock_type;
int fd;
char remote_ip[SB_IP_MAX];
int remote_port;
char local_ip[SB_IP_MAX];
int  local_port;
/* 初始化设置 */
int (*set)(struct _CONN *);
int (*close)(struct _CONN *);
int (*over)(struct _CONN *);
int (*terminate)(struct _CONN *);

/* connection bytes stats */
long long   recv_oob_total;
long long   sent_oob_total;
long long   recv_data_total;
long long   sent_data_total;

/* evbase */
EVBASE *evbase;
EVENT *event;

/* evtimer */
void *evtimer;
int evid;

/* buffer */
void *buffer;
void *packet;
void *cache;
void *chunk;
void *oob;

/* logger and timer */
void *logger;
void *timer;

/* queue */
void *send_queue;



/* message queue */
void *message_queue;
/* client transaction state */
int status;
int istate;
int c_state;
int c_id;
int (*start_cstate)(struct _CONN *);
int (*over_cstate)(struct _CONN *);

/* transaction */
int s_id;
int s_state;

/* event state */
int evstate;

#define EVSTATE_INIT 0 #define EVSTATE_WAIT 1 int (*wait_evstate)(struct _CONN *); int (*over_evstate)(struct _CONN *);

/* timeout */
int timeout;
/*  设置读写超时时间 */
int (*set_timeout)(struct _CONN *, int timeout_usec);
/*  超时处理 */
int (*timeout_handler)(struct _CONN *);
/* message */
int (*push_message)(struct _CONN *, int message_id);

/* session */
/* 从网络IO读数据 */
int (*read_handler)(struct _CONN *);
/* 写数据到网络IO */
int (*write_handler)(struct _CONN *);
/* 读数据头 */
int (*packet_reader)(struct _CONN *);
/* 数据头处理 */
int (*packet_handler)(struct _CONN *);
/* 带外数据处理 */
int (*oob_handler)(struct _CONN *);
/*  数据处理 */
int (*data_handler)(struct _CONN *);
/* 事务处理 */
int (*transaction_handler)(struct _CONN *, int );
/* 缓存少量数据不适合存放大量数据 */
int (*save_cache)(struct _CONN *, void *data, int size);
int (*bind_proxy)(struct _CONN *, struct _CONN *);
int (*proxy_handler)(struct _CONN *);
int (*close_proxy)(struct _CONN *);
int (*push_exchange)(struct _CONN *, void *data, int size);


/* chunk */
/* 用于从buffer或者网络读取数据到chunk或者文件 */
int (*chunk_reader)(struct _CONN *);
/* 准备接收数据块 */
int (*recv_chunk)(struct _CONN *, int size);
/* 准备接收文件 */
int (*recv_file)(struct _CONN *, char *file, long long offset, long long size);
/* 添加数据块到发送队列 */
int (*push_chunk)(struct _CONN *, void *data, int size);
/* 添加文件到发送队列 */
int (*push_file)(struct _CONN *, char *file, long long offset, long long size);

/* session option and callback  设置会话参数 */
SESSION session;
int (*set_session)(struct _CONN *, SESSION *session);

/* normal */
void (*clean)(struct _CONN **pconn);

}CONN; 编译安装: homepage : http://sbase.googlecode.com install : ./configure --enable-debug make

test & demo : ./src/lechod -c doc/rc.lechod.ini

required libevbase 0.0.14 from http://sbase.googlecode.com/files/libevbase-0.0.15.tar.gz

NOTE: if you want to build rpm , look at doc/libsbase.spec