Savarese Software Research Corporation
service/service.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2008 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00022 #ifndef __SSRC_WISP_SERVICE_SERVICE_H
00023 #define __SSRC_WISP_SERVICE_SERVICE_H
00024 
00025 #include <ssrc/wisp/protocol/ContinuationCaller.h>
00026 #include <ssrc/wisp/service/EventLoop.h>
00027 
00028 #include <boost/bind.hpp>
00029 
00030 #ifdef WISP_DEBUG
00031 #include <iostream>
00032 #endif
00033 
00034 __BEGIN_NS_SSRC_WISP_SERVICE
00035 
00036 using std::string;
00037 using NS_SSRC_SPREAD::MembershipInfo;
00038 using NS_SSRC_SPREAD::Message;
00039 using NS_SSRC_SPREAD::GroupList;
00040 using NS_SSRC_WISP_PROTOCOL::MessageInfo;
00041 using NS_SSRC_WISP_PROTOCOL::message_info_ptr;
00042 using NS_SSRC_WISP_PROTOCOL::CallException;
00043 using NS_SSRC_WISP_PROTOCOL::GroupMembershipDisable;
00044 using NS_SSRC_WISP_PROTOCOL::wisp_message_protocol;
00045 using NS_SSRC_WISP_PROTOCOL::wisp_message_id;
00046 
00047 #define WISP_SERVICE_REQUEST(MessageType) \
00048   set_request_handler<MessageType>(*this)
00049 #define WISP_SERVICE_RESPONSE(MessageType) \
00050   set_response_handler<MessageType>(*this)
00051 #define WISP_SERVICE_REQUEST_T(MessageType) \
00052   this->template set_request_handler<MessageType>(*this)
00053 #define WISP_SERVICE_RESPONSE_T(MessageType) \
00054   this->template set_response_handler<MessageType>(*this)
00055 
00056 #define WISP_SERVICE_REQUEST_BUFFERED(MessageType, msg) \
00057   set_request_handler<MessageType>(*this, msg)
00058 #define WISP_SERVICE_RESPONSE_BUFFERED(MessageType, msg) \
00059   set_response_handler<MessageType>(*this, msg)
00060 #define WISP_SERVICE_REQUEST_BUFFERED_T(MessageType, msg) \
00061   this->template set_request_handler<MessageType>(*this, msg)
00062 #define WISP_SERVICE_RESPONSE_BUFFERED_T(MessageType, msg) \
00063   this->template set_response_handler<MessageType>(*this, msg)
00064 
00065 
00066 typedef boost::function<void (const EventInfo &)> timeout_handler;
00067 // We expose message_handler_type, message_handler_entry, and
00068 // message_handler_map specifically to support dynamically loaded handlers.
00069 typedef boost::function<void (MessageInfo &)> message_handler_type;
00070 
00071 struct message_handler_entry {
00072   wisp_message_protocol protocol;
00073   wisp_message_id id;
00074   message_handler_type handle_message;
00075 
00076   message_handler_entry(const wisp_message_protocol protocol,
00077                         const wisp_message_id id,
00078                         const message_handler_type & message_handler) :
00079     protocol(protocol), id(id), handle_message(message_handler)
00080   { }
00081 };
00082 
00083 typedef boost::multi_index_container<
00084   message_handler_entry,
00085   boost::multi_index::indexed_by<
00086     boost::multi_index::hashed_unique<
00087       boost::multi_index::composite_key<
00088         message_handler_entry,
00089         boost::multi_index::member<message_handler_entry,
00090                                    wisp_message_protocol,
00091                                    &message_handler_entry::protocol>,
00092         boost::multi_index::member<message_handler_entry, wisp_message_id,
00093                                    &message_handler_entry::id> >
00094       >
00095     > > message_handler_map;
00096 
00097 /* BEGIN EXPERIMENTAL CODE */
00098 // TODO: This is all one big kluge and should be re-implemented, but it works..
00099 class ServiceContext {
00100 public:
00101   class TimeoutHandler : public EventHandler {
00102     ServiceContext & _context;
00103     bool _once;
00104     timeout_handler _handler;
00105 
00106   public:
00107 
00108     TimeoutHandler(ServiceContext & context,
00109                    const timeout_handler & handler, bool once) :
00110       _context(context), _once(once), _handler(handler)
00111     { }
00112 
00113     virtual ~TimeoutHandler() { }
00114 
00115     virtual void handle_timeout(const EventInfo & info) {
00116       _handler(info);
00117       // TODO: WARNING: This could blow up because the current object
00118       // may be deleted on the call to remove_timeout.  This works only
00119       // because the removal happens at the end of the function and we
00120       // don't access 'this' again.  Seek an alternative solution.
00121       if(_once)
00122         _context.remove_timeout(this);
00123     }
00124 
00125     void execute() {
00126       handle_timeout(EventInfo(_context.event_loop(), EventLoop::None, false));
00127     }
00128 
00129     bool once() {
00130       return _once;
00131     }
00132 
00133     const TimeoutHandler *address() const {
00134       return this;
00135     }
00136   };
00137 
00138   typedef boost::shared_ptr<TimeoutHandler> timeout_handler_ptr;
00139 
00140   typedef boost::multi_index_container<
00141     timeout_handler_ptr,
00142     boost::multi_index::indexed_by<
00143       boost::multi_index::hashed_unique<
00144         boost::multi_index::const_mem_fun<TimeoutHandler,
00145                                           const TimeoutHandler *,
00146                                           &TimeoutHandler::address> >
00147   > > timeout_map;
00148 
00149 private:
00150   EventLoop *loop;
00151   timeout_map _timeouts;
00152 
00153 public:
00154 
00155   ServiceContext(EventLoop * loop = 0) : loop(loop) { }
00156 
00157   timeout_map::size_type count_timeouts() const {
00158     return _timeouts.size();
00159   }
00160 
00161   void clear_timeouts() {
00162     for(timeout_map::iterator it = _timeouts.begin(), end = _timeouts.end();
00163         it != end; ++it)
00164       remove_timeout(it->get());
00165   }
00166 
00167   timeout_handler_ptr add_timeout(const timeout_handler & handler_fun,
00168                                   const TimeValue & timeout,
00169                                   const bool once)
00170   {
00171     timeout_handler_ptr handler(new TimeoutHandler(*this, handler_fun, once));
00172     _timeouts.insert(handler);
00173     loop->add_handler(*handler, EventLoop::None, timeout);
00174     return handler;
00175   }
00176 
00177   void remove_timeout(TimeoutHandler *timeout) {
00178     // must remove from loop first or we lose pointer!!!
00179     loop->remove_handler(*timeout);
00180     _timeouts.erase(timeout);
00181   }
00182 
00183   EventLoop & event_loop() { return *loop; }
00184 };
00185 
00186 
00187 typedef ServiceContext::timeout_handler_ptr timeout_ptr; 
00188 
00189 /* END EXPERIMENTAL CODE */
00190 
00191 template<typename PackingTraits = BinaryPackingTraits>
00192 class ServiceProtocolProcessor {
00193 public:
00194   typedef PackingTraits packing_traits;
00195   typedef typename protocol::ContinuationCaller<packing_traits> caller_type;
00196 
00197   enum State { Starting, Started, Stopping, Stopped };
00198 
00207   static const bool GroupMembership = GroupMembershipDisable;
00208 
00209 private:
00210   State _state;
00211 
00212 private:
00213 
00214   message_handler_map _request_handlers;
00215   message_handler_map _response_handlers;
00216 
00217 protected:
00218   caller_type & _caller;
00219 
00220   // WARNING!! This is highly experimental and only made available so
00221   // a subclass may add handlers to the event loop for I/O on other
00222   // descriptors.  The context is only valid in the Starting and
00223   // Started states.
00224   ServiceContext & context() { return _context; };
00225 
00226   virtual void process_membership_message(const MessageInfo & msginfo,
00227                                           const MembershipInfo & meminfo)
00228   { }
00229 
00230   // So you can reuse message types that are expensive to create on each call.
00231   template<typename MessageType, typename Impl>
00232   void request(Impl & impl, MessageType & msg, MessageInfo & msginfo)
00233     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00234                     CallException)
00235   {
00236     _caller.unpack(msg, msginfo);
00237     // Cast forces msginfo to be const arg in process_request
00238     impl.process_request(msg, static_cast<const MessageInfo &>(msginfo));
00239   }
00240 
00241   template<typename MessageType, typename Impl>
00242   void request(Impl & impl, MessageInfo & msginfo)
00243     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00244   {
00245     MessageType msg;
00246     request(impl, msg, msginfo);
00247   }
00248 
00249   template<typename MessageType, typename Impl>
00250   void respond(Impl & impl, MessageType & msg, MessageInfo & msginfo)
00251     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00252                     CallException)
00253   {
00254     _caller.unpack(msg, msginfo);
00255     // Cast forces msginfo to be const arg in process_response
00256     impl.process_response(msg, static_cast<const MessageInfo &>(msginfo));
00257   }
00258 
00259   template<typename MessageType, typename Impl>
00260   void respond(Impl & impl, MessageInfo & msginfo)
00261     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00262   {
00263     MessageType msg;
00264     respond(impl, msg, msginfo);
00265   }
00266 
00267   bool set_request_handler(const message_handler_entry & handler) {
00268     return _request_handlers.insert(handler).second;
00269   }
00270 
00271   message_handler_map::size_type
00272   remove_request_handler(const message_handler_entry & handler) {
00273     return
00274       _request_handlers.erase(_request_handlers.key_extractor()((handler)));
00275   }
00276 
00277   void clear_request_handlers() {
00278     _request_handlers.clear();
00279   }
00280 
00281   template<typename MessageType, typename Impl>
00282   bool set_request_handler(Impl & impl) {
00283     return set_request_handler(message_handler_entry(MessageType::protocol,
00284                                                      MessageType::id,
00285                                                      boost::bind(&ServiceProtocolProcessor::template request<MessageType, Impl>, this, std::ref(impl), _1)));
00286   }
00287 
00288   template<typename MessageType, typename Impl>
00289   bool set_request_handler(Impl & impl, MessageType & buffer)  {
00290     return set_request_handler(message_handler_entry(MessageType::protocol,
00291                                                      MessageType::id,
00292                                                      boost::bind(&ServiceProtocolProcessor::template request<MessageType, Impl>, this, std::ref(impl), std::ref(buffer), _1)));
00293   }
00294 
00295   bool set_response_handler(const message_handler_entry & handler) {
00296     return _response_handlers.insert(handler).second;
00297   }
00298 
00299   message_handler_map::size_type
00300   remove_response_handler(const message_handler_entry & handler) {
00301     return
00302       _response_handlers.erase(_response_handlers.key_extractor()((handler)));
00303   }
00304 
00305   void clear_response_handlers() {
00306     _response_handlers.clear();
00307   }
00308 
00309   template<typename MessageType, typename Impl>
00310   bool set_response_handler(Impl & impl) {
00311     return set_response_handler(message_handler_entry(MessageType::protocol,
00312                                                       MessageType::id,
00313                                                       boost::bind(&ServiceProtocolProcessor::template respond<MessageType, Impl>, this, std::ref(impl), _1)));  }
00314 
00315   template<typename MessageType, typename Impl>
00316   bool set_response_handler(Impl & impl, MessageType & buffer) {
00317     return set_response_handler(message_handler_entry(MessageType::protocol,
00318                                                       MessageType::id,
00319                                                       boost::bind(&ServiceProtocolProcessor::template respond<MessageType, Impl>, this, std::ref(impl), std::ref(buffer), _1)));
00320   }
00321 
00322   virtual void transition(State state) {
00323     _state = state;
00324   }
00325 
00326 public:
00327 
00328   explicit ServiceProtocolProcessor(caller_type & caller) :
00329     _state(Stopped),
00330     _request_handlers(), _response_handlers(), _caller(caller)
00331   { }
00332 
00333   virtual ~ServiceProtocolProcessor() { }
00334 
00335   string name() const {
00336     return _caller.name();
00337   }
00338 
00339   State state() const {
00340     return _state;
00341   }
00342 
00343   void start() {
00344     transition(Starting);
00345   }
00346 
00347   // Services listening for membership messages should wait for self-leave
00348   // before transitioning to Stopped.
00349   void stop() {
00350     transition(Stopping);
00351   }
00352 
00353   void membership(const MessageInfo & msginfo,
00354                   const MembershipInfo & meminfo)
00355   {
00356     process_membership_message(msginfo, meminfo);
00357   }
00358 
00359   void request(MessageInfo & msginfo) {
00360     message_handler_map::iterator it = 
00361       _request_handlers.find(boost::make_tuple(msginfo.protocol(),
00362                                                msginfo.id()));
00363     if(it != _request_handlers.end())
00364       it->handle_message(msginfo);
00365     // TODO: add an else log unhandled message
00366   }
00367 
00371   void response(MessageInfo & msginfo) {
00372     if(!_caller.resume(msginfo)) {
00373       message_handler_map::iterator it = 
00374         _response_handlers.find(boost::make_tuple(msginfo.protocol(),
00375                                                   msginfo.id()));
00376       if(it != _response_handlers.end())
00377         it->handle_message(msginfo);
00378       // TODO: add an else log unhandled message
00379     }
00380   }
00381 
00382   /* BEGIN EXPERIMENTAL CODE */
00383   // TODO: This is all one big kluge and should be re-implemented (which
00384   // is why this is stuffed at the end of the class), but it works.
00385 
00386   timeout_ptr schedule_timeout(const timeout_handler & handler,
00387                                const TimeValue & timeout,
00388                                const bool once = EventLoop::Persist)
00389   {
00390     return _context.add_timeout(handler, timeout, once);
00391   }
00392 
00393   void cancel_timeout(const timeout_ptr & timeout) {
00394     _context.remove_timeout(timeout.get());
00395   }
00396 
00397   void clear_timeouts() {
00398     _context.clear_timeouts();
00399   }
00400 
00401   ServiceContext::timeout_map::size_type count_timeouts() const {
00402     return _context.count_timeouts();
00403   }
00404 
00405 private:
00406   template<typename PP> friend class ServiceEventHandler;
00407 
00408   ServiceContext _context;
00409 
00410   void set_service_context(const ServiceContext & context) {
00411     _context = context;
00412   }
00413 
00414   /* END EXPERIMENTAL CODE */
00415 };
00416 
00417 template<typename PP>
00418 class ServiceEventHandler : public EventHandler {
00419 public:
00420   typedef PP protocol_processor;
00421   WISP_IMPORT_T(protocol_processor, caller_type);
00422 
00423 private:
00424   caller_type & _caller;
00425   protocol_processor _protocol;
00426   // Scratch variables for handle_event().
00427   message_info_ptr _message_info;
00428   MembershipInfo _membership_info;
00429   protocol::wisp_call_token _min_token, _max_token;
00430   std::vector<typename caller_type::jumbo_message_key_type> _jumbo_message_keys;
00431 
00432 protected:
00433 
00434   virtual void remove_handler(EventLoop & loop) {
00435     /* BEGIN EXPERIMENTAL CODE */
00436     // TODO: This is a kluge and should be re-implemented, but it works.
00437     _protocol.clear_timeouts();
00438     /* END EXPERIMENTAL CODE */
00439     loop.remove_handler(*this);
00440     _caller.cancel_all();
00441   }
00442 
00443 public:
00444 
00445   explicit ServiceEventHandler(caller_type & caller) :
00446     _caller(caller), _protocol(caller),
00447     _message_info(new MessageInfo(caller.message_capacity())),
00448     _membership_info(),
00449     _min_token(0), _max_token(0),
00450     _jumbo_message_keys(4)
00451   { }
00452 
00453   template<typename Initializer>
00454   explicit ServiceEventHandler(caller_type & caller,
00455                                const Initializer & initializer) :
00456     _caller(caller), _protocol(caller, initializer),
00457     _message_info(),
00458     _membership_info(),
00459     _min_token(0), _max_token(0),
00460     _jumbo_message_keys(4)
00461   { }
00462 
00463   virtual ~ServiceEventHandler() { }
00464 
00465   typename protocol_processor::State state() const {
00466     return _protocol.state();
00467   }
00468 
00469   virtual int event_descriptor() const {
00470     return _caller.mbox().descriptor();
00471   }
00472 
00473   virtual void handle_read(const EventInfo & info) {
00474     try {
00475       if(state() != protocol_processor::Stopped) {
00476         _caller.receive(_message_info);
00477 
00478         if(_message_info->message.is_regular() &&
00479            _protocol.state() < protocol_processor::Stopping)
00480         {
00481           if(_message_info->role() == protocol::TwoWayResponse)
00482             _protocol.response(*_message_info);
00483           else
00484             _protocol.request(*_message_info);
00485         } else if(_message_info->message.is_membership()) {
00486           _message_info->message.get_membership_info(_membership_info);
00487           _protocol.membership(*_message_info, _membership_info);
00488         }
00489 
00490         if(state() == protocol_processor::Stopped)
00491           remove_handler(info.event_loop());
00492       }
00493     } catch(const boost::archive::archive_exception & ae) {
00494       // TODO: log
00495 #ifdef WISP_DEBUG
00496       std::cerr << _protocol.name()
00497                 << ": Caught boost::archive::archive_exception: " 
00498                 << ae.what() << "\nContinuing.";
00499 #endif
00500     } catch(const std::ios_base::failure & iof) {
00501       // TODO: log. probably don't want to try again
00502 #ifdef WISP_DEBUG
00503       std::cerr << _protocol.name() << ": Caught std::ios_base::failure: "
00504                 << iof.what() << "\nContinuing.";
00505 #endif
00506     }
00507   }
00508 
00509   virtual void handle_timeout(const EventInfo & info) {
00510     // Cancel continuations
00511     _caller.cancel_range(_min_token, _max_token);
00512     _min_token = _max_token;
00513     _max_token = _caller.call_token();
00514 
00515     // Clear out incomplete jumbo messages
00516     if(!_jumbo_message_keys.empty()) {
00517       if(_caller.count_jumbo_messages() > 0) {
00518         _caller.erase_jumbo_messages(_jumbo_message_keys.begin(),
00519                                      _jumbo_message_keys.end());
00520       }
00521       _jumbo_message_keys.clear();
00522     }
00523 
00524     if(_caller.count_jumbo_messages() > 0) {
00525       _caller.collect_jumbo_message_keys(_jumbo_message_keys);
00526     }
00527   }
00528 
00529   protocol_processor & protocol() {
00530     return _protocol;
00531   }
00532 
00533   void start(EventLoop & loop, const unsigned int call_timeout) {
00534     _min_token = _max_token = _caller.call_token();
00535     loop.add_handler(*this, EventLoop::Read, TimeValue(call_timeout, 0));
00536     /* BEGIN EXPERIMENTAL CODE */
00537     // TODO: This is a kluge and should be re-implemented, but it works.
00538     _protocol.set_service_context(ServiceContext(&loop));
00539     /* END EXPERIMENTAL CODE */
00540     _protocol.start();
00541   }
00542 
00543   void stop() {
00544     _protocol.stop();
00545   }
00546 };
00547 
00548 
00552 template<typename EH>
00553 class Service {
00554 public:
00555   typedef EH event_handler;
00556   WISP_IMPORT_T(event_handler, caller_type);
00557   WISP_IMPORT_T(event_handler, protocol_processor);
00558 
00559 private:
00560   caller_type _caller;
00561   event_handler _handler;
00562 
00563 protected:
00564 
00565   protocol_processor & protocol() {
00566     return _handler.protocol();
00567   }
00568 
00569 public:
00570 
00571   explicit Service(const string & connection = "",
00572                    const string & name = "", 
00573                    const unsigned int message_capacity =
00574                    Message::DefaultCapacity) :
00575     _caller(connection, name, message_capacity,
00576             protocol_processor::GroupMembership),
00577     _handler(_caller)
00578   { }
00579 
00580   template<typename Initializer>
00581   explicit Service(const Initializer & initializer,
00582                    const string & connection = "",
00583                    const string & name = "", 
00584                    const unsigned int message_capacity =
00585                    Message::DefaultCapacity) :
00586     _caller(connection, name, message_capacity,
00587             protocol_processor::GroupMembership),
00588     _handler(_caller, initializer)
00589   { }
00590 
00591   const string & name() const {
00592     return _caller.name();
00593   }
00594 
00595   typename protocol_processor::State state() {
00596     return protocol().state();
00597   }
00598 
00599   void start(EventLoop & loop, unsigned int call_timeout) {
00600     _handler.start(loop, call_timeout);
00601   }
00602 
00603   void stop() {
00604     _handler.stop();
00605   }
00606 };
00607 
00608 __END_NS_SSRC_WISP_SERVICE
00609 
00610 #endif

Savarese Software Research Corporation
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.