Savarese Software Research Corporation
ContinuationCaller.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2008 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00023 #ifndef __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
00024 #define __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
00025 
#include <ssrc/wisp/protocol/Caller.h>

#include <boost/function.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>

#include <type_traits>
#include <utility>
00031 
00032 __BEGIN_NS_SSRC_WISP_PROTOCOL
00033 
00034 struct Continuation {
00035   // We save token as an optimization to avoid invoking a virtual function
00036   // in the hash index.
00037   wisp_call_token token;
00038 
00039 protected:
00040   explicit Continuation(wisp_call_token token) : token(token) { }
00041 
00042 public:
00043   virtual ~Continuation() { }
00044 
00045   virtual void resume(MessageInfo & msginfo) = 0;
00046 };
00047 
00048 
00049 typedef boost::shared_ptr<Continuation> continuation_ptr;
00050 const continuation_ptr NullContinuation = continuation_ptr();
00051 
00052 template<typename CallTraits_, typename Closure>
00053 class FutureContinuation : public Continuation {
00054 public:
00055   typedef CallTraits_ call_traits;
00056   typedef typename call_traits::future_ptr future_ptr;
00057   typedef typename call_traits::return_type return_type;
00058   // Closure should be void (const return_type &)
00059   typedef Closure continuation_function;
00060 
00061 private:
00062   future_ptr _future;
00063   return_type _result;
00064   continuation_function _continue;
00065 
00066 public:
00067 
00068   FutureContinuation(const future_ptr & future,
00069                      continuation_function && continue_) :
00070     Continuation(future->token()),
00071     _future(future), _result(),
00072     _continue(std::forward<continuation_function>(continue_))
00073   { }
00074 
00075   virtual ~FutureContinuation() { }
00076 
00077   virtual void resume(MessageInfo & msginfo) {
00078     _future->unpack(_result, msginfo);
00079     _continue(_result);
00080   }
00081 };
00082 
00083 // Specialization for use with boost::bind
00084 template<typename CallTraits_>
00085 class FutureContinuation<CallTraits_,
00086                          boost::function<void (const typename CallTraits_::return_type &)> >:
00087   public Continuation
00088 {
00089 public:
00090   typedef CallTraits_ call_traits;
00091   typedef typename call_traits::future_ptr future_ptr;
00092   typedef typename call_traits::return_type return_type;
00093   typedef boost::function<void (const return_type &)> continuation_function;
00094 
00095 private:
00096   future_ptr _future;
00097   return_type _result;
00098   continuation_function _continue;
00099 
00100 public:
00101 
00102   FutureContinuation(const future_ptr & future,
00103                      continuation_function && continue_) :
00104     Continuation(future->token()),
00105     _future(future), _result(),
00106     _continue(std::forward<continuation_function>(continue_))
00107   { }
00108 
00109   virtual ~FutureContinuation() { }
00110 
00111   virtual void resume(MessageInfo & msginfo) {
00112     _future->unpack(_result, msginfo);
00113     _continue(_result);
00114   }
00115 };
00116 
00117 template<typename PT = BinaryPackingTraits>
00118 class ContinuationCaller : public protocol::Caller<PT> {
00119   typedef protocol::Caller<PT> super;
00120 
00121   enum { ByHash, ByOrder };
00122 
00123   typedef boost::multi_index_container<
00124     continuation_ptr,
00125     boost::multi_index::indexed_by<
00126       boost::multi_index::hashed_unique<
00127         boost::multi_index::member<Continuation, wisp_call_token,
00128                                    &Continuation::token> >,
00129       boost::multi_index::ordered_non_unique<
00130         boost::multi_index::member<Continuation, wisp_call_token,
00131                                    &Continuation::token> >
00132       > > continuation_map;
00133 
00134   typedef
00135   typename continuation_map::template nth_index<ByHash>::type index_by_hash;
00136   typedef
00137   typename continuation_map::template nth_index<ByOrder>::type index_by_order;
00138 
00139   continuation_map _continuations;
00140 
00141 public:
00142 
00143   explicit
00144   ContinuationCaller(const string & connection = "",
00145                      const string & name = "", 
00146                      const unsigned int message_capacity = Message::DefaultCapacity,
00147                      const bool group_membership = GroupMembershipDisable) :
00148     super(connection, name, message_capacity, group_membership),
00149     _continuations()
00150   { }
00151 
00152   unsigned int continuations_map_size() const {
00153     return _continuations.size();
00154   }
00155 
00156   template<typename CallTraits_, typename Closure, typename DestinationType>
00157   continuation_ptr
00158   split_call(Closure && continue_,
00159              const DestinationType & dest,
00160              const typename CallTraits_::parameter_type & param,
00161              const Message::Service service = DefaultMessageServiceType)
00162     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00163   {
00164     return continuation_ptr(new FutureContinuation<CallTraits_, Closure>(super::template call<CallTraits_>(dest, param, service), std::forward<Closure>(continue_)));
00165   }
00166 
00167   void schedule(const continuation_ptr & continuation) {
00168     _continuations.insert(continuation);
00169   }
00170 
00171   template<typename CallTraits_, typename Closure, typename DestinationType>
00172   void future_call(Closure && continue_,
00173                    const DestinationType & dest,
00174                    const typename CallTraits_::parameter_type & param,
00175                    const Message::Service service = DefaultMessageServiceType)
00176     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00177   {
00178     schedule(split_call<CallTraits_>(std::forward<Closure>(continue_),
00179                                      dest, param, service));
00180   }
00181 
00182   // Cancels all pending requests with min_token <= token < max_token
00183   void cancel_range(wisp_call_token min_token, wisp_call_token max_token) {
00184     index_by_order & index = _continuations.template get<ByOrder>();
00185 
00186   start:
00187     if(min_token < max_token)
00188       index.erase(index.lower_bound(min_token), index.upper_bound(max_token));
00189     else if(min_token > max_token) {
00190       index.erase(index.lower_bound(min_token), index.end());
00191       min_token = 0;
00192       goto start;
00193     }
00194   }
00195 
00196   void cancel_all() {
00197     _continuations.clear();
00198   }
00199 
00200   continuation_ptr cancel(wisp_call_token token) {
00201     continuation_map::iterator it = _continuations.find(token);
00202 
00203     if(it != _continuations.end()) {
00204       continuation_ptr result = *it;
00205       _continuations.erase(it);
00206       return result;
00207     }
00208 
00209     return NullContinuation;
00210   }
00211 
00212   bool resume(MessageInfo & msginfo) {
00213     continuation_ptr result = cancel(msginfo.token());
00214 
00215     if(result != NullContinuation) {
00216       result->resume(msginfo);
00217       return true;
00218     }
00219 
00220     return false;
00221   }
00222 
00223   template<typename CallTraits_, typename Closure, typename DestinationType>
00224   void future_call(Closure && continue_,
00225                    const DestinationType & dest,
00226                    const Message::Service service = DefaultMessageServiceType)
00227     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00228   {
00229     future_call<CallTraits_>(std::forward<Closure>(continue_),
00230                              dest, typename CallTraits_::parameter_type(),
00231                              service);
00232   }
00233 
00234   template<typename CallTraits_, typename Closure, typename DestinationType,
00235            typename... P>
00236   void future_call(const Message::Service service,
00237                    Closure && continue_,
00238                    const DestinationType & dest, P && ...p)
00239     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)    
00240   {
00241     future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
00242                              typename CallTraits_::parameter_type(std::forward<P>(p)...),
00243                              service);
00244   }
00245 
00246   template<typename CallTraits_, typename Closure, typename DestinationType,
00247            typename... P>
00248   void future_callp(Closure && continue_,
00249                     const DestinationType & dest, P && ...p)
00250     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)    
00251   {
00252     future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
00253                              typename CallTraits_::parameter_type(std::forward<P>(p)...));
00254   }
00255 
00256 };
00257 
00258 __END_NS_SSRC_WISP_PROTOCOL
00259 
00260 #endif

Savarese Software Research Corporation
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.