service/service.h
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2008 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00022 #ifndef __SSRC_WISP_SERVICE_SERVICE_H 00023 #define __SSRC_WISP_SERVICE_SERVICE_H 00024 00025 #include <ssrc/wisp/protocol/ContinuationCaller.h> 00026 #include <ssrc/wisp/service/EventLoop.h> 00027 00028 #include <boost/bind.hpp> 00029 00030 #ifdef WISP_DEBUG 00031 #include <iostream> 00032 #endif 00033 00034 __BEGIN_NS_SSRC_WISP_SERVICE 00035 00036 using std::string; 00037 using NS_SSRC_SPREAD::MembershipInfo; 00038 using NS_SSRC_SPREAD::Message; 00039 using NS_SSRC_SPREAD::GroupList; 00040 using NS_SSRC_WISP_PROTOCOL::MessageInfo; 00041 using NS_SSRC_WISP_PROTOCOL::message_info_ptr; 00042 using NS_SSRC_WISP_PROTOCOL::CallException; 00043 using NS_SSRC_WISP_PROTOCOL::GroupMembershipDisable; 00044 using NS_SSRC_WISP_PROTOCOL::wisp_message_protocol; 00045 using NS_SSRC_WISP_PROTOCOL::wisp_message_id; 00046 00047 #define WISP_SERVICE_REQUEST(MessageType) \ 00048 set_request_handler<MessageType>(*this) 00049 #define WISP_SERVICE_RESPONSE(MessageType) \ 00050 set_response_handler<MessageType>(*this) 00051 #define WISP_SERVICE_REQUEST_T(MessageType) \ 00052 this->template set_request_handler<MessageType>(*this) 00053 #define WISP_SERVICE_RESPONSE_T(MessageType) \ 00054 this->template set_response_handler<MessageType>(*this) 00055 00056 #define WISP_SERVICE_REQUEST_BUFFERED(MessageType, msg) \ 00057 set_request_handler<MessageType>(*this, msg) 00058 #define WISP_SERVICE_RESPONSE_BUFFERED(MessageType, msg) \ 00059 set_response_handler<MessageType>(*this, msg) 00060 #define WISP_SERVICE_REQUEST_BUFFERED_T(MessageType, msg) \ 00061 this->template set_request_handler<MessageType>(*this, msg) 00062 #define WISP_SERVICE_RESPONSE_BUFFERED_T(MessageType, msg) \ 00063 this->template set_response_handler<MessageType>(*this, msg) 00064 00065 00066 typedef boost::function<void (const EventInfo &)> timeout_handler; 00067 // We expose message_handler_type, message_handler_entry, and 00068 // message_handler_map specifically to support dynamically loaded handlers. 00069 typedef boost::function<void (MessageInfo &)> message_handler_type; 00070 00071 struct message_handler_entry { 00072 wisp_message_protocol protocol; 00073 wisp_message_id id; 00074 message_handler_type handle_message; 00075 00076 message_handler_entry(const wisp_message_protocol protocol, 00077 const wisp_message_id id, 00078 const message_handler_type & message_handler) : 00079 protocol(protocol), id(id), handle_message(message_handler) 00080 { } 00081 }; 00082 00083 typedef boost::multi_index_container< 00084 message_handler_entry, 00085 boost::multi_index::indexed_by< 00086 boost::multi_index::hashed_unique< 00087 boost::multi_index::composite_key< 00088 message_handler_entry, 00089 boost::multi_index::member<message_handler_entry, 00090 wisp_message_protocol, 00091 &message_handler_entry::protocol>, 00092 boost::multi_index::member<message_handler_entry, wisp_message_id, 00093 &message_handler_entry::id> > 00094 > 00095 > > message_handler_map; 00096 00097 /* BEGIN EXPERIMENTAL CODE */ 00098 // TODO: This is all one big kluge and should be re-implemented, but it works.. 00099 class ServiceContext { 00100 public: 00101 class TimeoutHandler : public EventHandler { 00102 ServiceContext & _context; 00103 bool _once; 00104 timeout_handler _handler; 00105 00106 public: 00107 00108 TimeoutHandler(ServiceContext & context, 00109 const timeout_handler & handler, bool once) : 00110 _context(context), _once(once), _handler(handler) 00111 { } 00112 00113 virtual ~TimeoutHandler() { } 00114 00115 virtual void handle_timeout(const EventInfo & info) { 00116 _handler(info); 00117 // TODO: WARNING: This could blow up because the current object 00118 // may be deleted on the call to remove_timeout. This works only 00119 // because the removal happens at the end of the function and we 00120 // don't access 'this' again. Seek an alternative solution. 00121 if(_once) 00122 _context.remove_timeout(this); 00123 } 00124 00125 void execute() { 00126 handle_timeout(EventInfo(_context.event_loop(), EventLoop::None, false)); 00127 } 00128 00129 bool once() { 00130 return _once; 00131 } 00132 00133 const TimeoutHandler *address() const { 00134 return this; 00135 } 00136 }; 00137 00138 typedef boost::shared_ptr<TimeoutHandler> timeout_handler_ptr; 00139 00140 typedef boost::multi_index_container< 00141 timeout_handler_ptr, 00142 boost::multi_index::indexed_by< 00143 boost::multi_index::hashed_unique< 00144 boost::multi_index::const_mem_fun<TimeoutHandler, 00145 const TimeoutHandler *, 00146 &TimeoutHandler::address> > 00147 > > timeout_map; 00148 00149 private: 00150 EventLoop *loop; 00151 timeout_map _timeouts; 00152 00153 public: 00154 00155 ServiceContext(EventLoop * loop = 0) : loop(loop) { } 00156 00157 timeout_map::size_type count_timeouts() const { 00158 return _timeouts.size(); 00159 } 00160 00161 void clear_timeouts() { 00162 for(timeout_map::iterator it = _timeouts.begin(), end = _timeouts.end(); 00163 it != end; ++it) 00164 remove_timeout(it->get()); 00165 } 00166 00167 timeout_handler_ptr add_timeout(const timeout_handler & handler_fun, 00168 const TimeValue & timeout, 00169 const bool once) 00170 { 00171 timeout_handler_ptr handler(new TimeoutHandler(*this, handler_fun, once)); 00172 _timeouts.insert(handler); 00173 loop->add_handler(*handler, EventLoop::None, timeout); 00174 return handler; 00175 } 00176 00177 void remove_timeout(TimeoutHandler *timeout) { 00178 // must remove from loop first or we lose pointer!!! 00179 loop->remove_handler(*timeout); 00180 _timeouts.erase(timeout); 00181 } 00182 00183 EventLoop & event_loop() { return *loop; } 00184 }; 00185 00186 00187 typedef ServiceContext::timeout_handler_ptr timeout_ptr; 00188 00189 /* END EXPERIMENTAL CODE */ 00190 00191 template<typename PackingTraits = BinaryPackingTraits> 00192 class ServiceProtocolProcessor { 00193 public: 00194 typedef PackingTraits packing_traits; 00195 typedef typename protocol::ContinuationCaller<packing_traits> caller_type; 00196 00197 enum State { Starting, Started, Stopping, Stopped }; 00198 00207 static const bool GroupMembership = GroupMembershipDisable; 00208 00209 private: 00210 State _state; 00211 00212 private: 00213 00214 message_handler_map _request_handlers; 00215 message_handler_map _response_handlers; 00216 00217 protected: 00218 caller_type & _caller; 00219 00220 // WARNING!! This is highly experimental and only made available so 00221 // a subclass may add handlers to the event loop for I/O on other 00222 // descriptors. The context is only valid in the Starting and 00223 // Started states. 00224 ServiceContext & context() { return _context; }; 00225 00226 virtual void process_membership_message(const MessageInfo & msginfo, 00227 const MembershipInfo & meminfo) 00228 { } 00229 00230 // So you can reuse message types that are expensive to create on each call. 00231 template<typename MessageType, typename Impl> 00232 void request(Impl & impl, MessageType & msg, MessageInfo & msginfo) 00233 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00234 CallException) 00235 { 00236 _caller.unpack(msg, msginfo); 00237 // Cast forces msginfo to be const arg in process_request 00238 impl.process_request(msg, static_cast<const MessageInfo &>(msginfo)); 00239 } 00240 00241 template<typename MessageType, typename Impl> 00242 void request(Impl & impl, MessageInfo & msginfo) 00243 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00244 { 00245 MessageType msg; 00246 request(impl, msg, msginfo); 00247 } 00248 00249 template<typename MessageType, typename Impl> 00250 void respond(Impl & impl, MessageType & msg, MessageInfo & msginfo) 00251 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00252 CallException) 00253 { 00254 _caller.unpack(msg, msginfo); 00255 // Cast forces msginfo to be const arg in process_response 00256 impl.process_response(msg, static_cast<const MessageInfo &>(msginfo)); 00257 } 00258 00259 template<typename MessageType, typename Impl> 00260 void respond(Impl & impl, MessageInfo & msginfo) 00261 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00262 { 00263 MessageType msg; 00264 respond(impl, msg, msginfo); 00265 } 00266 00267 bool set_request_handler(const message_handler_entry & handler) { 00268 return _request_handlers.insert(handler).second; 00269 } 00270 00271 message_handler_map::size_type 00272 remove_request_handler(const message_handler_entry & handler) { 00273 return 00274 _request_handlers.erase(_request_handlers.key_extractor()((handler))); 00275 } 00276 00277 void clear_request_handlers() { 00278 _request_handlers.clear(); 00279 } 00280 00281 template<typename MessageType, typename Impl> 00282 bool set_request_handler(Impl & impl) { 00283 return set_request_handler(message_handler_entry(MessageType::protocol, 00284 MessageType::id, 00285 boost::bind(&ServiceProtocolProcessor::template request<MessageType, Impl>, this, std::ref(impl), _1))); 00286 } 00287 00288 template<typename MessageType, typename Impl> 00289 bool set_request_handler(Impl & impl, MessageType & buffer) { 00290 return set_request_handler(message_handler_entry(MessageType::protocol, 00291 MessageType::id, 00292 boost::bind(&ServiceProtocolProcessor::template request<MessageType, Impl>, this, std::ref(impl), std::ref(buffer), _1))); 00293 } 00294 00295 bool set_response_handler(const message_handler_entry & handler) { 00296 return _response_handlers.insert(handler).second; 00297 } 00298 00299 message_handler_map::size_type 00300 remove_response_handler(const message_handler_entry & handler) { 00301 return 00302 _response_handlers.erase(_response_handlers.key_extractor()((handler))); 00303 } 00304 00305 void clear_response_handlers() { 00306 _response_handlers.clear(); 00307 } 00308 00309 template<typename MessageType, typename Impl> 00310 bool set_response_handler(Impl & impl) { 00311 return set_response_handler(message_handler_entry(MessageType::protocol, 00312 MessageType::id, 00313 boost::bind(&ServiceProtocolProcessor::template respond<MessageType, Impl>, this, std::ref(impl), _1))); } 00314 00315 template<typename MessageType, typename Impl> 00316 bool set_response_handler(Impl & impl, MessageType & buffer) { 00317 return set_response_handler(message_handler_entry(MessageType::protocol, 00318 MessageType::id, 00319 boost::bind(&ServiceProtocolProcessor::template respond<MessageType, Impl>, this, std::ref(impl), std::ref(buffer), _1))); 00320 } 00321 00322 virtual void transition(State state) { 00323 _state = state; 00324 } 00325 00326 public: 00327 00328 explicit ServiceProtocolProcessor(caller_type & caller) : 00329 _state(Stopped), 00330 _request_handlers(), _response_handlers(), _caller(caller) 00331 { } 00332 00333 virtual ~ServiceProtocolProcessor() { } 00334 00335 string name() const { 00336 return _caller.name(); 00337 } 00338 00339 State state() const { 00340 return _state; 00341 } 00342 00343 void start() { 00344 transition(Starting); 00345 } 00346 00347 // Services listening for membership messages should wait for self-leave 00348 // before transitioning to Stopped. 00349 void stop() { 00350 transition(Stopping); 00351 } 00352 00353 void membership(const MessageInfo & msginfo, 00354 const MembershipInfo & meminfo) 00355 { 00356 process_membership_message(msginfo, meminfo); 00357 } 00358 00359 void request(MessageInfo & msginfo) { 00360 message_handler_map::iterator it = 00361 _request_handlers.find(boost::make_tuple(msginfo.protocol(), 00362 msginfo.id())); 00363 if(it != _request_handlers.end()) 00364 it->handle_message(msginfo); 00365 // TODO: add an else log unhandled message 00366 } 00367 00371 void response(MessageInfo & msginfo) { 00372 if(!_caller.resume(msginfo)) { 00373 message_handler_map::iterator it = 00374 _response_handlers.find(boost::make_tuple(msginfo.protocol(), 00375 msginfo.id())); 00376 if(it != _response_handlers.end()) 00377 it->handle_message(msginfo); 00378 // TODO: add an else log unhandled message 00379 } 00380 } 00381 00382 /* BEGIN EXPERIMENTAL CODE */ 00383 // TODO: This is all one big kluge and should be re-implemented (which 00384 // is why this is stuffed at the end of the class), but it works. 00385 00386 timeout_ptr schedule_timeout(const timeout_handler & handler, 00387 const TimeValue & timeout, 00388 const bool once = EventLoop::Persist) 00389 { 00390 return _context.add_timeout(handler, timeout, once); 00391 } 00392 00393 void cancel_timeout(const timeout_ptr & timeout) { 00394 _context.remove_timeout(timeout.get()); 00395 } 00396 00397 void clear_timeouts() { 00398 _context.clear_timeouts(); 00399 } 00400 00401 ServiceContext::timeout_map::size_type count_timeouts() const { 00402 return _context.count_timeouts(); 00403 } 00404 00405 private: 00406 template<typename PP> friend class ServiceEventHandler; 00407 00408 ServiceContext _context; 00409 00410 void set_service_context(const ServiceContext & context) { 00411 _context = context; 00412 } 00413 00414 /* END EXPERIMENTAL CODE */ 00415 }; 00416 00417 template<typename PP> 00418 class ServiceEventHandler : public EventHandler { 00419 public: 00420 typedef PP protocol_processor; 00421 WISP_IMPORT_T(protocol_processor, caller_type); 00422 00423 private: 00424 caller_type & _caller; 00425 protocol_processor _protocol; 00426 // Scratch variables for handle_event(). 00427 message_info_ptr _message_info; 00428 MembershipInfo _membership_info; 00429 protocol::wisp_call_token _min_token, _max_token; 00430 std::vector<typename caller_type::jumbo_message_key_type> _jumbo_message_keys; 00431 00432 protected: 00433 00434 virtual void remove_handler(EventLoop & loop) { 00435 /* BEGIN EXPERIMENTAL CODE */ 00436 // TODO: This is a kluge and should be re-implemented, but it works. 00437 _protocol.clear_timeouts(); 00438 /* END EXPERIMENTAL CODE */ 00439 loop.remove_handler(*this); 00440 _caller.cancel_all(); 00441 } 00442 00443 public: 00444 00445 explicit ServiceEventHandler(caller_type & caller) : 00446 _caller(caller), _protocol(caller), 00447 _message_info(new MessageInfo(caller.message_capacity())), 00448 _membership_info(), 00449 _min_token(0), _max_token(0), 00450 _jumbo_message_keys(4) 00451 { } 00452 00453 template<typename Initializer> 00454 explicit ServiceEventHandler(caller_type & caller, 00455 const Initializer & initializer) : 00456 _caller(caller), _protocol(caller, initializer), 00457 _message_info(), 00458 _membership_info(), 00459 _min_token(0), _max_token(0), 00460 _jumbo_message_keys(4) 00461 { } 00462 00463 virtual ~ServiceEventHandler() { } 00464 00465 typename protocol_processor::State state() const { 00466 return _protocol.state(); 00467 } 00468 00469 virtual int event_descriptor() const { 00470 return _caller.mbox().descriptor(); 00471 } 00472 00473 virtual void handle_read(const EventInfo & info) { 00474 try { 00475 if(state() != protocol_processor::Stopped) { 00476 _caller.receive(_message_info); 00477 00478 if(_message_info->message.is_regular() && 00479 _protocol.state() < protocol_processor::Stopping) 00480 { 00481 if(_message_info->role() == protocol::TwoWayResponse) 00482 _protocol.response(*_message_info); 00483 else 00484 _protocol.request(*_message_info); 00485 } else if(_message_info->message.is_membership()) { 00486 _message_info->message.get_membership_info(_membership_info); 00487 _protocol.membership(*_message_info, _membership_info); 00488 } 00489 00490 if(state() == protocol_processor::Stopped) 00491 remove_handler(info.event_loop()); 00492 } 00493 } catch(const boost::archive::archive_exception & ae) { 00494 // TODO: log 00495 #ifdef WISP_DEBUG 00496 std::cerr << _protocol.name() 00497 << ": Caught boost::archive::archive_exception: " 00498 << ae.what() << "\nContinuing."; 00499 #endif 00500 } catch(const std::ios_base::failure & iof) { 00501 // TODO: log. probably don't want to try again 00502 #ifdef WISP_DEBUG 00503 std::cerr << _protocol.name() << ": Caught std::ios_base::failure: " 00504 << iof.what() << "\nContinuing."; 00505 #endif 00506 } 00507 } 00508 00509 virtual void handle_timeout(const EventInfo & info) { 00510 // Cancel continuations 00511 _caller.cancel_range(_min_token, _max_token); 00512 _min_token = _max_token; 00513 _max_token = _caller.call_token(); 00514 00515 // Clear out incomplete jumbo messages 00516 if(!_jumbo_message_keys.empty()) { 00517 if(_caller.count_jumbo_messages() > 0) { 00518 _caller.erase_jumbo_messages(_jumbo_message_keys.begin(), 00519 _jumbo_message_keys.end()); 00520 } 00521 _jumbo_message_keys.clear(); 00522 } 00523 00524 if(_caller.count_jumbo_messages() > 0) { 00525 _caller.collect_jumbo_message_keys(_jumbo_message_keys); 00526 } 00527 } 00528 00529 protocol_processor & protocol() { 00530 return _protocol; 00531 } 00532 00533 void start(EventLoop & loop, const unsigned int call_timeout) { 00534 _min_token = _max_token = _caller.call_token(); 00535 loop.add_handler(*this, EventLoop::Read, TimeValue(call_timeout, 0)); 00536 /* BEGIN EXPERIMENTAL CODE */ 00537 // TODO: This is a kluge and should be re-implemented, but it works. 00538 _protocol.set_service_context(ServiceContext(&loop)); 00539 /* END EXPERIMENTAL CODE */ 00540 _protocol.start(); 00541 } 00542 00543 void stop() { 00544 _protocol.stop(); 00545 } 00546 }; 00547 00548 00552 template<typename EH> 00553 class Service { 00554 public: 00555 typedef EH event_handler; 00556 WISP_IMPORT_T(event_handler, caller_type); 00557 WISP_IMPORT_T(event_handler, protocol_processor); 00558 00559 private: 00560 caller_type _caller; 00561 event_handler _handler; 00562 00563 protected: 00564 00565 protocol_processor & protocol() { 00566 return _handler.protocol(); 00567 } 00568 00569 public: 00570 00571 explicit Service(const string & connection = "", 00572 const string & name = "", 00573 const unsigned int message_capacity = 00574 Message::DefaultCapacity) : 00575 _caller(connection, name, message_capacity, 00576 protocol_processor::GroupMembership), 00577 _handler(_caller) 00578 { } 00579 00580 template<typename Initializer> 00581 explicit Service(const Initializer & initializer, 00582 const string & connection = "", 00583 const string & name = "", 00584 const unsigned int message_capacity = 00585 Message::DefaultCapacity) : 00586 _caller(connection, name, message_capacity, 00587 protocol_processor::GroupMembership), 00588 _handler(_caller, initializer) 00589 { } 00590 00591 const string & name() const { 00592 return _caller.name(); 00593 } 00594 00595 typename protocol_processor::State state() { 00596 return protocol().state(); 00597 } 00598 00599 void start(EventLoop & loop, unsigned int call_timeout) { 00600 _handler.start(loop, call_timeout); 00601 } 00602 00603 void stop() { 00604 _handler.stop(); 00605 } 00606 }; 00607 00608 __END_NS_SSRC_WISP_SERVICE 00609 00610 #endif
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.
Copyright © 2011 Savarese Software Research Corporation. All rights reserved