Savarese Software Research Corporation
EventLoop.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2009 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include <vector>
00018 
00019 #include <boost/multi_index_container.hpp>
00020 #include <boost/multi_index/mem_fun.hpp>
00021 #include <boost/multi_index/ordered_index.hpp>
00022 #include <boost/multi_index/hashed_index.hpp>
00023 
00024 #include <ssrc/wisp/service/EventLoop.h>
00025 #include <ssrc/wisp/service/EventPort.h>
00026 
00027 __BEGIN_NS_SSRC_WISP_SERVICE
00028 
00029 using namespace std::rel_ops;
00030 
00031 enum { ByExpiration, ByIdentity };
00032 
00033 typedef EventPort::event_type event_type;
00034 typedef EventPort::events_type events_type;
00035 typedef std::vector<event_type> io_event_container;
00036 
00037 struct EventLoopState {
00038   typedef boost::multi_index_container<
00039     EventHandler*,
00040     boost::multi_index::indexed_by<
00041       boost::multi_index::ordered_non_unique<
00042         boost::multi_index::const_mem_fun<EventHandler, const TimeValue &,
00043                                           &EventHandler::expiration> >,
00044       boost::multi_index::hashed_unique<boost::multi_index::identity<EventHandler*> >
00045       > > event_handler_container;
00046 
00047   bool running;
00048   EventPort port;
00049   unsigned int io_handler_count;
00050   io_event_container io_events;
00051   event_handler_container event_handlers;
00052 
00053   EventLoopState() :
00054     running(false), port(),
00055     io_handler_count(0), io_events(), event_handlers()
00056   { }
00057 };
00058 
00059 typedef EventLoopState::event_handler_container event_handler_container;
00060 
00061 const EventLoop::EventIO EventLoop::None   = EventPort::None;
00062 const EventLoop::EventIO EventLoop::Read   = EventPort::Read;
00063 const EventLoop::EventIO EventLoop::Write  = EventPort::Write;
00064 const EventLoop::EventIO EventLoop::Error  = EventPort::Error;
00065 const EventLoop::EventIO EventLoop::Hangup = EventPort::Hangup;
00066 /*
00067 const EventLoop::EventIO EventLoop::Signal  = EventPort::Signal;
00068 const EventLoop::EventIO EventLoop::Timeout = EventPort::Timeout;
00069 */
00070 
00071 EventLoop::EventLoop() SSRC_DECL_THROW(std::runtime_error) :
00072   _state(new EventLoopState())
00073 { }
00074 
00075 EventLoop::~EventLoop() { delete _state; }
00076 
00077 unsigned int EventLoop::count_handlers() {
00078   return _state->event_handlers.size();
00079 }
00080 
00081 unsigned int EventLoop::count_io_handlers() {
00082   return _state->io_handler_count;
00083 }
00084 
00085 bool EventLoop::running() {
00086   return _state->running;
00087 }
00088 
00089 void EventLoop::add_handler(EventHandler & handler,
00090                             const int events,
00091                             const TimeValue & timeout,
00092                             const bool once)
00093   SSRC_DECL_THROW(std::runtime_error)
00094 {
00095   if(handler.event_descriptor() != EventHandler::NoDescriptor && events != None) {
00096     if(_state->port.add(handler.event_descriptor(), events, &handler, once)) {
00097       if(_state->io_events.size() == _state->io_handler_count)
00098         _state->io_events.push_back(event_type());
00099       ++_state->io_handler_count;
00100     }
00101   }
00102 
00103   handler._handled_timeout = false;
00104   handler._once    = once;
00105   handler._timeout = timeout;
00106 
00107 #if defined(WISP_HAVE_NONPERSISTENT_EVENTS) || defined(WISP_HAVE_KQUEUE)
00108   handler._events = events;
00109 #endif
00110 
00111   if(timeout == InfiniteTimeValue)
00112     handler._expiration = InfiniteTimeValue;
00113   else {
00114     TimeValue expiration;
00115     expiration.now_mono()+=timeout;
00116     handler._expiration = expiration;
00117   }
00118 
00119   _state->event_handlers.insert(&handler);
00120 
00121 }
00122 
00123 void EventLoop::remove_handler(EventHandler & handler) {
00124   const bool is_io_event =
00125     (handler.event_descriptor() != EventHandler::NoDescriptor);
00126 
00127   if(is_io_event) {
00128     // We do not check return value of remove because it returns false
00129     // only if the descriptor is not associated with the port.  When
00130     // that happens (should happen only on HAVE_NONPERSISTENT_EVENTS
00131     // platforms such as Solaris), we want to remove the handler anyway.
00132     // Other errors throw an exception.
00133 #if defined(WISP_HAVE_KQUEUE)
00134     _state->port.remove(handler.event_descriptor(), handler.events());
00135 #else
00136     _state->port.remove(handler.event_descriptor());
00137 #endif
00138   }
00139 
00140   // CAREFUL!  The order of the tests matters as we erase as a side-effect
00141   // and don't want to decrement io_handler_count for timeout handlers.
00142   if(_state->event_handlers.get<ByIdentity>().erase(&handler) > 0 &&
00143      is_io_event)
00144   {
00145     --_state->io_handler_count;
00146     // We can't shrink the vector because it could mess up start loop.
00147     //_state->io_events.pop_back();
00148   }
00149 }
00150 
00151 void EventLoop::start() {
00152   EventInfo info(*this);
00153   TimeValue now;
00154   io_event_container & io_events = _state->io_events;
00155   event_handler_container & event_handlers = _state->event_handlers;
00156   EventPort & port = _state->port;
00157 
00158   _state->running = true;
00159   while(count_io_handlers() > 0 && _state->running) {
00160     int fd_count, timeout = -1;
00161     event_handler_container::iterator it = event_handlers.begin();
00162     EventHandler *handler = *it;
00163 
00164     if(io_events.size() != count_io_handlers())
00165       io_events.resize(count_io_handlers());
00166 
00167     if(handler->has_timeout()) {
00168       now.now_mono();
00169 
00170       if(now >= handler->expiration())
00171         timeout = 0;
00172       else
00173         timeout = (handler->expiration() - now).to_milliseconds();
00174     }
00175 
00176     fd_count = port.wait(&io_events[0], count_io_handlers(), timeout);
00177 
00178     if(fd_count < 0) {
00179       // TODO: log exit condition, including errno and strerror(errno).
00180       stop();
00181       break;
00182     }
00183 
00184     info._now = now.now_mono();
00185 
00186     for(int i = 0; i < fd_count; ++i) {
00187       handler = EventPort::get_handler(io_events[i]);
00188       info._io_events = EventPort::get_events(io_events[i]);
00189       info._timeout = (handler->expiration() <= now);
00190 
00191       /*
00192         if(info.signal_event())
00193         handler->handle_signal(info);
00194       */
00195 
00196       if(info.error_event())
00197         handler->handle_error(info);
00198 
00199       if(info.hangup_event())
00200         handler->handle_hangup(info);
00201 
00202       if(info.read_event())
00203         handler->handle_read(info);
00204 
00205       if(info.write_event())
00206         handler->handle_write(info);
00207 
00208       if(info.timeout_event()) {
00209         handler->handle_timeout(info);
00210         handler->_handled_timeout = true;
00211       } else if(handler->_once) {
00212         remove_handler(*handler);
00213       }
00214 
00215 #if defined(WISP_HAVE_NONPERSISTENT_EVENTS)
00216       // Solaris ports require reassociation after every event.
00217       // TODO: log error and decrement io_handler_count if this fails?
00218       if(!handler->_once) {
00219         port.add(handler->event_descriptor(), handler->_events, handler,
00220                  handler->_once);
00221       }
00222 #endif
00223     }
00224 
00225     info._timeout = true;
00226     info._io_events = None;
00227 
00228     it = event_handlers.begin();
00229     std::iterator_traits<event_handler_container::iterator>::difference_type
00230       count = std::distance(it, event_handlers.upper_bound(now));
00231 
00232     while(count-- > 0) {
00233       handler = *it;
00234       it = event_handlers.erase(it);
00235 
00236       // Insert beforehand to allow handler the option to remove itself.
00237       if(!handler->_once) {
00238         handler->_expiration = now + handler->timeout();
00239         event_handlers.insert(handler);
00240       } else
00241         remove_handler(*handler);
00242 
00243       if(!handler->_handled_timeout)
00244         handler->handle_timeout(info);
00245       else
00246         handler->_handled_timeout = false;
00247     }
00248   }
00249 }
00250 
00251 void EventLoop::stop() {
00252   // TODO: do something to wake up the poll.
00253   _state->running = false;
00254 }
00255 
00256 
00257 __END_NS_SSRC_WISP_SERVICE

Savarese Software Research Corporation
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.