See More

/* This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . * */ #include #include #include #include #include #include #include #include using namespace std; using namespace CP; using namespace cppsp; using namespace RGC; #define rmb() /**/ #define wmb() /**/ #define CPPSP_SENDFILE_MIN_SIZE (1024*1024) #define CPPSP_SENDFILE_BUFSIZE (1024*16) namespace cppspServer { #define CACHELINE_SIZE 64 //currently not used template class RingBuffer { public: union { struct { T* items; int length; }; char padding1[CACHELINE_SIZE]; }; union { int rpos; char padding2[CACHELINE_SIZE]; }; union { int wpos; char padding3[CACHELINE_SIZE]; }; RingBuffer(int length): length(length),rpos(0), wpos(0) { items=new T[length]; } inline int __getlength(int i1, int i2, int wrap) { return (i2 < i1 ? i2 + wrap : i2) - i1; } inline bool canEnqueue() { return __getlength(rpos, wpos, (length*2)) < length; } inline bool canDequeue() { return __getlength(rpos, wpos, (length*2)) > 0; } T* beginEnqueue() { if(canEnqueue()) return items+(wpos%length); else return NULL; } void endEnqueue() { wmb(); wpos=(wpos+1)%(length*2); } T* beginDequeue() { if(!canDequeue()) return NULL; rmb(); return items+(rpos%length); } void endDequeue() { rpos=(rpos+1)%(length*2); } }; template class ObjectPool: public RGC::Object { public: T** items; int size; int length; ObjectPool(int size):size(size),length(0) { items=new T*[size]; } ~ObjectPool() { for(int i=0;i=size) delete obj; else { items[length++]=obj; } } }; class Host: public cppsp::DefaultHost { public: cppsp::Server server; Timer t; ObjectPool _responsePool; int _lastRequests=0; int timerState=0; void timerCB(int i) { if(!updateTime() && timerState==1) { disableTimer(); return; } if(timerState==2 && performanceCounters.totalRequestsReceived<=_lastRequests) { slowTimer(); } _lastRequests=performanceCounters.totalRequestsReceived; } Host(Poll* p, string root): _responsePool(128) { this->poll=p; defaultServer=&server; addServer(&server); server.root=root; updateTime(); t.setCallback({&Host::timerCB,this}); p->add(t); } void loadDefaultMimeDB() { File f("/usr/share/mime/globs",O_RDONLY); f.setBlocking(true); StreamReader sr(f); mgr->loadMimeDB(sr); } void enableTimer() { if(timerState==0) printf("enabling timer\n"); t.setInterval(timerShortInterval*1000); timerState=2; updateTime(true); //true indicates to inhibit cache cleaning } void slowTimer() { t.setInterval(timerLongInterval*1000); timerState=1; } void disableTimer() { printf("disabling timer\n"); t.setInterval(0); timerState=0; } inline void _requestReceived() { if(unlikely(timerState<2)) enableTimer(); performanceCounters.totalRequestsReceived++; } AsyncValue routeStaticRequest(String path) override; AsyncValue routeDynamicRequest(String path) override; }; typedef Host Server; class Request:public cppsp::CPollRequest { public: Request(CP::Socket& s, CP::StringPool* sp) : CPollRequest(s, sp) { } void* _handler; }; //handles a single connection //just instantiate-and-forget; it will self-destruct when connection is closed struct handler:public RGC::Object { Host& thr; CP::Poll& p; Socket& s; StringPool sp; Request req; cppsp::Server* server; cppsp::Response* resp; //Page* p; //MemoryStream ms; uint8_t* buf; union { iovec iov[2]; int64_t _sendFileOffset; }; staticPage* _staticPage; bool readLoopRunning; bool shouldContinueReading; bool keepAlive; handler(Host& thr,CP::Poll& poll,Socket& s):thr(thr), p(poll),s(s),sp(2048),req(this->s,&sp) { //printf("handler()\n"); req._handler=this; poll.add(this->s); s.retain(); readLoop(); } void readLoop() { readLoopRunning=true; shouldContinueReading=true; while(shouldContinueReading && req.readRequest({&handler::readCB, this})) readCB(true); readLoopRunning=false; } void readCB(bool success) { shouldContinueReading=false; if(unlikely(!success)) { destruct(); return; } //if((sp=thr._stringPoolPool.tryGet())==nullptr) sp=new StringPool(); if((resp=thr._responsePool.tryGet())) resp->init(this->s,&sp); else resp=new Response(this->s,&sp); thr._requestReceived(); auto it=req.headers.find("connection"); if(it!=req.headers.end() && (*it).value=="close")keepAlive=false; else keepAlive=true; resp->keepAlive=keepAlive; //perform vhost routing server=thr.preRouteRequest(req); server->performanceCounters.totalRequestsReceived++; req.server=server; try { server->handleRequest(req,*resp,{&handler::finalize,this}); } catch(exception& ex) { server->handleError(req,*resp,ex,{&handler::finalize,this}); } } static inline int itoa64(int64_t i, char* b) { static char const digit[] = "0123456789"; char* p = b; int l; p += (l=((i==0?0:int(log10(i))) + 1)); *p = '\0'; do { //Move back, inserting digits as u go *--p = digit[i % 10]; i = i / 10; } while (i); return l; } void handleStatic(staticPage* Sp) { Response& resp(*this->resp); (_staticPage=Sp)->retain(); try { int bufferL = resp.buffer.length(); String mime; if(Sp->mime.length()>0)mime=Sp->mime; else mime=server->defaultMime; resp.addDefaultHeaders(thr.curRFCTime,mime); { const char* tmph = "Content-Length: "; int tmphL = strlen(tmph); memcpy(resp.headers.d + resp.headers.len, tmph, tmphL); resp.headers.len += tmphL; resp.headers.len += itoa64(Sp->fileLen, resp.headers.d + resp.headers.len); (resp.headers.d + resp.headers.len)[0] = '\r'; (resp.headers.d + resp.headers.len)[1] = '\n'; resp.headers.len += 2; StreamWriter sw(resp.buffer); resp.serializeHeaders(sw); } if(Sp->fileLen>=CPPSP_SENDFILE_MIN_SIZE) { _sendFileOffset=0; s.sendAll(resp.buffer.data()+bufferL,resp.buffer.length()-bufferL, MSG_MORE, { &handler::sendHeadersCB, this }); } else { String data=Sp->data; iov[0]= {resp.buffer.data()+bufferL, (size_t)(resp.buffer.length()-bufferL)}; iov[1]= {data.data(), (size_t)data.length()}; resp.outputStream->writevAll(iov, data.length()<=0?1:2, { &handler::writevCB, this }); } } catch(exception& ex) { Sp->release(); server->handleError(req,resp,ex,{&handler::finalize,this}); } } void sendHeadersCB(int r) { if(r<0) { _staticPage->release(); end(); return; } _beginSendFile(); } void _beginSendFile() { s.sendFileFrom(_staticPage->fd,_sendFileOffset,CPPSP_SENDFILE_BUFSIZE,{&handler::sendFileCB,this}); } void sendFileCB(int r) { if(r<0) { _staticPage->release(); end(); } else if(r==0) { _staticPage->release(); finalize(); } else { _sendFileOffset+=(int64_t)r; _beginSendFile(); } } void handleDynamic(loadedPage* lp) { Response& resp(*this->resp); Page* p=lp->doCreate(&sp); //hold a strong reference to lp so that if cleanCache() etc is called by application code, //the application does not unload itself, causing a segfault p->lp=lp; p->sp=&sp; p->request=&req; p->response=&resp; p->poll=&this->p; p->server=server; p->handleRequest({&handler::handleRequestCB,this}); } void sockReadCB(int r) { if(r<=0) { free(buf); destruct(); } } void flushCB(Response& resp) { //s->shutdown(SHUT_WR); //release(); finalize(); } void writevCB(int i) { _staticPage->release(); if(likely(i>=0)) finalize(); else end(); } void handleRequestCB() { //s->shutdown(SHUT_WR); //release(); //s->repeatRead(buf,sizeof(buf),{&handler::sockReadCB,this}); finalize(); } void finalize() { if(resp->closed) { end(); return; } cleanup(); if(keepAlive) { req.init(s,&sp); if(readLoopRunning) shouldContinueReading=true; else readLoop(); } else { s.shutdown(SHUT_WR); buf=(uint8_t*)malloc(4096); s.repeatRead(buf,4096,{&handler::sockReadCB,this}); } } //cleanup and terminate the connection void end() { cleanup(); destruct(); } //deallocate resources after a request has been completed void cleanup() { server->performanceCounters.totalRequestsFinished++; thr.performanceCounters.totalRequestsFinished++; req.reset(); resp->reset(); thr._responsePool.put(resp); resp=nullptr; sp.clear(); } ~handler() { //printf("~handler()\n"); s.release(); } }; void staticHandler(staticPage* v,cppsp::Request& req, Response& resp, Delegate cb) { cppspServer::Request& r=static_cast<:request>(req); (*(handler*)r._handler).handleStatic(v); } void dynamicHandler(loadedPage* v,cppsp::Request& req, Response& resp, Delegate cb) { cppspServer::Request& r=static_cast<:request>(req); (*(handler*)r._handler).handleDynamic(v); } AsyncValue Host::routeStaticRequest(String path) { struct stat st; string tmps=path.toSTDString(); if(::stat(tmps.c_str(), &st)!=0) throwUNIXException(tmps); staticPage* sp; if(st.st_size>=CPPSP_SENDFILE_MIN_SIZE) sp=loadStaticPage(path,true,false); else sp=loadStaticPage(path); return Handler(&staticHandler,sp); } struct requestRouterState { Delegate cb; void operator()(loadedPage* lp, exception* ex) { if(lp==NULL)cb(nullptr,ex); else cb(Handler(&dynamicHandler,lp),nullptr); delete this; } }; AsyncValue Host::routeDynamicRequest(String path) { auto lp=loadPage(path); if(lp) { return Handler(&dynamicHandler,lp()); } requestRouterState* st=new requestRouterState(); lp.wait(st); return Future(&st->cb); } } namespace cppsp { struct HandlerBase { Request* request; Response* response; Delegate cb; }; template Handler makeHandler() { struct H: public RGC::Object { T* construct(RGC::Allocator& alloc) { return alloc.New(); } void operator()(Request& req, Response& resp, Delegate cb) { T* tmp=construct(*req.sp); tmp->request=&req; tmp->response=&resp; tmp->cb=cb; tmp->process(); } }; return newObj(); } }; namespace cppspEmbedded { class Server { public: cppspServer::Host host; Server(Poll* p, string root):host(p,root) { } void listen(Socket& s) { struct CB: public RGC::Object { Server* s; CB(Server* s):s(s){} void operator()(Socket* clientSock) { new cppspServer::handler(s->host,*s->host.poll,*clientSock); clientSock->release(); } }; s.repeatAccept(newObj(this)); } HandleRequestChain::item* attachHandler(const Handler& h) { return host.defaultServer->handleRequest.attach(h); } void detachHandler(HandleRequestChain::item* it) { return host.defaultServer->handleRequest.detach(it); } }; };