Branch data Line data Source code
1 : : /* Copyright 2006-2009, 2016 Savarese Software Research Corporation
2 : : *
3 : : * Licensed under the Apache License, Version 2.0 (the "License");
4 : : * you may not use this file except in compliance with the License.
5 : : * You may obtain a copy of the License at
6 : : *
7 : : * http://www.savarese.com/software/ApacheLicense-2.0
8 : : *
9 : : * Unless required by applicable law or agreed to in writing, software
10 : : * distributed under the License is distributed on an "AS IS" BASIS,
11 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : * See the License for the specific language governing permissions and
13 : : * limitations under the License.
14 : : */
15 : :
16 : : #include <vector>
17 : :
18 : : #include <boost/multi_index_container.hpp>
19 : : #include <boost/multi_index/mem_fun.hpp>
20 : : #include <boost/multi_index/ordered_index.hpp>
21 : : #include <boost/multi_index/hashed_index.hpp>
22 : :
23 : : #include <ssrc/wisp/service/EventLoop.h>
24 : : #include <ssrc/wisp/service/EventPort.h>
25 : :
26 : : __BEGIN_NS_SSRC_WISP_SERVICE
27 : :
28 : : using namespace std::rel_ops;
29 : :
30 : : enum { ByExpiration, ByIdentity };
31 : :
32 : : typedef EventPort::event_type event_type;
33 : : typedef EventPort::events_type events_type;
34 : : typedef std::vector<event_type> io_event_container;
35 : :
36 : 3 : struct EventLoopState {
37 : : typedef boost::multi_index_container<
38 : : EventHandler*,
39 : : boost::multi_index::indexed_by<
40 : : boost::multi_index::ordered_non_unique<
41 : : boost::multi_index::const_mem_fun<EventHandler, const TimeValue &,
42 : : &EventHandler::expiration> >,
43 : : boost::multi_index::hashed_unique<boost::multi_index::identity<EventHandler*> >
44 : : > > event_handler_container;
45 : :
46 : : bool running;
47 : : EventPort port;
48 : : unsigned int io_handler_count;
49 : : io_event_container io_events;
50 : : event_handler_container event_handlers;
51 : :
52 : 3 : EventLoopState() :
53 : : running(false), port(),
54 [ + - + - ]: 3 : io_handler_count(0), io_events(), event_handlers()
55 : 3 : { }
56 : : };
57 : :
58 : : typedef EventLoopState::event_handler_container event_handler_container;
59 : :
60 : : const EventLoop::EventIO EventLoop::None = EventPort::None;
61 : : const EventLoop::EventIO EventLoop::Read = EventPort::Read;
62 : : const EventLoop::EventIO EventLoop::Write = EventPort::Write;
63 : : const EventLoop::EventIO EventLoop::Error = EventPort::Error;
64 : : const EventLoop::EventIO EventLoop::Hangup = EventPort::Hangup;
65 : : /*
66 : : const EventLoop::EventIO EventLoop::Signal = EventPort::Signal;
67 : : const EventLoop::EventIO EventLoop::Timeout = EventPort::Timeout;
68 : : */
69 : :
70 : 3 : EventLoop::EventLoop() SSRC_DECL_THROW(std::runtime_error) :
71 : 3 : _state(std::make_unique<EventLoopState>())
72 : 3 : { }
73 : :
74 : : EventLoop::~EventLoop() = default;
75 : :
76 : 0 : unsigned int EventLoop::count_handlers() {
77 : 0 : return _state->event_handlers.size();
78 : : }
79 : :
80 : 21 : unsigned int EventLoop::count_io_handlers() {
81 : 21 : return _state->io_handler_count;
82 : : }
83 : :
84 : 0 : bool EventLoop::running() {
85 : 0 : return _state->running;
86 : : }
87 : :
88 : 3 : void EventLoop::add_handler(EventHandler & handler,
89 : : const int events,
90 : : const TimeValue & timeout,
91 : : const bool once)
92 : : SSRC_DECL_THROW(std::runtime_error)
93 : : {
94 [ + - + - : 3 : if(handler.event_descriptor() != EventHandler::NoDescriptor && events != None) {
+ - ]
95 [ + - ]: 3 : if(_state->port.add(handler.event_descriptor(), events, &handler, once)) {
96 [ + - ]: 3 : if(_state->io_events.size() == _state->io_handler_count)
97 [ + - ]: 3 : _state->io_events.push_back(event_type());
98 : 3 : ++_state->io_handler_count;
99 : : }
100 : : }
101 : :
102 : 3 : handler._handled_timeout = false;
103 : 3 : handler._once = once;
104 : 3 : handler._timeout = timeout;
105 : :
106 : : #if defined(WISP_HAVE_NONPERSISTENT_EVENTS) || defined(WISP_HAVE_KQUEUE)
107 : 3 : handler._events = events;
108 : : #endif
109 : :
110 [ - + ]: 3 : if(timeout == InfiniteTimeValue)
111 : 0 : handler._expiration = InfiniteTimeValue;
112 : : else {
113 : 3 : TimeValue expiration;
114 [ + - ]: 3 : expiration.now_mono()+=timeout;
115 : 3 : handler._expiration = expiration;
116 : : }
117 : :
118 [ + - ]: 3 : _state->event_handlers.insert(&handler);
119 : :
120 : 3 : }
121 : :
122 : 3 : void EventLoop::remove_handler(EventHandler & handler) {
123 : : const bool is_io_event =
124 : 3 : (handler.event_descriptor() != EventHandler::NoDescriptor);
125 : :
126 [ + - ]: 3 : if(is_io_event) {
127 : : // We do not check return value of remove because it returns false
128 : : // only if the descriptor is not associated with the port. When
129 : : // that happens (should happen only on HAVE_NONPERSISTENT_EVENTS
130 : : // platforms such as Solaris and with automatically removed events
131 : : // such as EV_ONESHOT on FreeBSD), we want to remove the handler anyway.
132 : : // Other errors throw an exception.
133 : : #if defined(WISP_HAVE_KQUEUE)
134 [ + - ]: 3 : if(!handler._once) {
135 : 3 : _state->port.remove(handler.event_descriptor(), handler.events());
136 : : }
137 : : #elif defined(WISP_HAVE_NONPERSISTENT_EVENTS)
138 : : if(!handler._once) {
139 : : _state->port.remove(handler.event_descriptor());
140 : : }
141 : : #else
142 : : _state->port.remove(handler.event_descriptor());
143 : : #endif
144 : : }
145 : :
146 : : // CAREFUL! The order of the tests matters as we erase as a side-effect
147 : : // and don't want to decrement io_handler_count for timeout handlers.
148 [ + - + - : 3 : if(_state->event_handlers.get<ByIdentity>().erase(&handler) > 0 &&
+ - ]
149 : : is_io_event)
150 : : {
151 : 3 : --_state->io_handler_count;
152 : : // We can't shrink the vector because it could mess up start loop.
153 : : //_state->io_events.pop_back();
154 : : }
155 : 3 : }
156 : :
157 : 3 : void EventLoop::start() {
158 : 3 : EventInfo info(*this);
159 : 3 : TimeValue now;
160 : 3 : io_event_container & io_events = _state->io_events;
161 : 3 : event_handler_container & event_handlers = _state->event_handlers;
162 : 3 : EventPort & port = _state->port;
163 : :
164 : 3 : _state->running = true;
165 [ + - + + : 15 : while(count_io_handlers() > 0 && _state->running) {
+ - + + ]
166 : 6 : int fd_count, timeout = -1;
167 : 6 : event_handler_container::iterator it = event_handlers.begin();
168 [ + - ]: 6 : EventHandler *handler = *it;
169 : :
170 [ + - - + ]: 6 : if(io_events.size() != count_io_handlers())
171 [ # # # # ]: 0 : io_events.resize(count_io_handlers());
172 : :
173 [ + - ]: 6 : if(handler->has_timeout()) {
174 [ + - ]: 6 : now.now_mono();
175 : :
176 [ - + ]: 6 : if(now >= handler->expiration())
177 : 0 : timeout = 0;
178 : : else
179 : 6 : timeout = (handler->expiration() - now).to_milliseconds();
180 : : }
181 : :
182 [ + - + - ]: 6 : fd_count = port.wait(&io_events[0], count_io_handlers(), timeout);
183 : :
184 [ - + ]: 6 : if(fd_count < 0) {
185 : : // TODO: log exit condition, including errno and strerror(errno).
186 [ # # ]: 0 : stop();
187 : 0 : break;
188 : : }
189 : :
190 [ + - ]: 6 : info._now = now.now_mono();
191 : :
192 [ + + ]: 12 : for(int i = 0; i < fd_count; ++i) {
193 : 6 : handler = EventPort::get_handler(io_events[i]);
194 : 6 : info._io_events = EventPort::get_events(io_events[i]);
195 : 6 : info._timeout = (handler->expiration() <= now);
196 : :
197 : : /*
198 : : if(info.signal_event())
199 : : handler->handle_signal(info);
200 : : */
201 : :
202 [ - + ]: 6 : if(info.error_event())
203 [ # # ]: 0 : handler->handle_error(info);
204 : :
205 [ - + ]: 6 : if(info.hangup_event())
206 [ # # ]: 0 : handler->handle_hangup(info);
207 : :
208 [ + - ]: 6 : if(info.read_event())
209 [ + - ]: 6 : handler->handle_read(info);
210 : :
211 [ - + ]: 6 : if(info.write_event())
212 [ # # ]: 0 : handler->handle_write(info);
213 : :
214 [ - + ]: 6 : if(info.timeout_event()) {
215 [ # # ]: 0 : handler->handle_timeout(info);
216 : 0 : handler->_handled_timeout = true;
217 [ - + ]: 6 : } else if(handler->_once) {
218 [ # # ]: 0 : remove_handler(*handler);
219 : : }
220 : :
221 : : #if defined(WISP_HAVE_NONPERSISTENT_EVENTS)
222 : : // Solaris ports require reassociation after every event.
223 : : // TODO: log error and decrement io_handler_count if this fails?
224 : : if(!handler->_once) {
225 : : port.add(handler->event_descriptor(), handler->_events, handler,
226 : : handler->_once);
227 : : }
228 : : #endif
229 : : }
230 : :
231 : 6 : info._timeout = true;
232 : 6 : info._io_events = None;
233 : :
234 : 6 : it = event_handlers.begin();
235 : : std::iterator_traits<event_handler_container::iterator>::difference_type
236 [ + - + - ]: 6 : count = std::distance(it, event_handlers.upper_bound(now));
237 : :
238 [ - + ]: 6 : while(count-- > 0) {
239 [ # # ]: 0 : handler = *it;
240 [ # # ]: 0 : it = event_handlers.erase(it);
241 : :
242 : : // Insert beforehand to allow handler the option to remove itself.
243 [ # # ]: 0 : if(!handler->_once) {
244 : 0 : handler->_expiration = now + handler->timeout();
245 [ # # ]: 0 : event_handlers.insert(handler);
246 : : } else
247 [ # # ]: 0 : remove_handler(*handler);
248 : :
249 [ # # ]: 0 : if(!handler->_handled_timeout)
250 [ # # ]: 0 : handler->handle_timeout(info);
251 : : else
252 : 0 : handler->_handled_timeout = false;
253 : : }
254 : : }
255 : 3 : }
256 : :
257 : 0 : void EventLoop::stop() {
258 : : // TODO: do something to wake up the poll.
259 : 0 : _state->running = false;
260 : 0 : }
261 : :
262 : :
263 [ + - + - ]: 6 : __END_NS_SSRC_WISP_SERVICE
|