ContinuationCaller.h
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2008 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00023 #ifndef __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H 00024 #define __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H 00025 00026 #include <ssrc/wisp/protocol/Caller.h> 00027 00028 #include <boost/function.hpp> 00029 #include <boost/multi_index/member.hpp> 00030 #include <boost/multi_index/ordered_index.hpp> 00031 00032 __BEGIN_NS_SSRC_WISP_PROTOCOL 00033 00034 struct Continuation { 00035 // We save token as an optimization to avoid invoking a virtual function 00036 // in the hash index. 00037 wisp_call_token token; 00038 00039 protected: 00040 explicit Continuation(wisp_call_token token) : token(token) { } 00041 00042 public: 00043 virtual ~Continuation() { } 00044 00045 virtual void resume(MessageInfo & msginfo) = 0; 00046 }; 00047 00048 00049 typedef boost::shared_ptr<Continuation> continuation_ptr; 00050 const continuation_ptr NullContinuation = continuation_ptr(); 00051 00052 template<typename CallTraits_, typename Closure> 00053 class FutureContinuation : public Continuation { 00054 public: 00055 typedef CallTraits_ call_traits; 00056 typedef typename call_traits::future_ptr future_ptr; 00057 typedef typename call_traits::return_type return_type; 00058 // Closure should be void (const return_type &) 00059 typedef Closure continuation_function; 00060 00061 private: 00062 future_ptr _future; 00063 return_type _result; 00064 continuation_function _continue; 00065 00066 public: 00067 00068 FutureContinuation(const future_ptr & future, 00069 continuation_function && continue_) : 00070 Continuation(future->token()), 00071 _future(future), _result(), 00072 _continue(std::forward<continuation_function>(continue_)) 00073 { } 00074 00075 virtual ~FutureContinuation() { } 00076 00077 virtual void resume(MessageInfo & msginfo) { 00078 _future->unpack(_result, msginfo); 00079 _continue(_result); 00080 } 00081 }; 00082 00083 // Specialization for use with boost::bind 00084 template<typename CallTraits_> 00085 class FutureContinuation<CallTraits_, 00086 boost::function<void (const typename CallTraits_::return_type &)> >: 00087 public Continuation 00088 { 00089 public: 00090 typedef CallTraits_ call_traits; 00091 typedef typename call_traits::future_ptr future_ptr; 00092 typedef typename call_traits::return_type return_type; 00093 typedef boost::function<void (const return_type &)> continuation_function; 00094 00095 private: 00096 future_ptr _future; 00097 return_type _result; 00098 continuation_function _continue; 00099 00100 public: 00101 00102 FutureContinuation(const future_ptr & future, 00103 continuation_function && continue_) : 00104 Continuation(future->token()), 00105 _future(future), _result(), 00106 _continue(std::forward<continuation_function>(continue_)) 00107 { } 00108 00109 virtual ~FutureContinuation() { } 00110 00111 virtual void resume(MessageInfo & msginfo) { 00112 _future->unpack(_result, msginfo); 00113 _continue(_result); 00114 } 00115 }; 00116 00117 template<typename PT = BinaryPackingTraits> 00118 class ContinuationCaller : public protocol::Caller<PT> { 00119 typedef protocol::Caller<PT> super; 00120 00121 enum { ByHash, ByOrder }; 00122 00123 typedef boost::multi_index_container< 00124 continuation_ptr, 00125 boost::multi_index::indexed_by< 00126 boost::multi_index::hashed_unique< 00127 boost::multi_index::member<Continuation, wisp_call_token, 00128 &Continuation::token> >, 00129 boost::multi_index::ordered_non_unique< 00130 boost::multi_index::member<Continuation, wisp_call_token, 00131 &Continuation::token> > 00132 > > continuation_map; 00133 00134 typedef 00135 typename continuation_map::template nth_index<ByHash>::type index_by_hash; 00136 typedef 00137 typename continuation_map::template nth_index<ByOrder>::type index_by_order; 00138 00139 continuation_map _continuations; 00140 00141 public: 00142 00143 explicit 00144 ContinuationCaller(const string & connection = "", 00145 const string & name = "", 00146 const unsigned int message_capacity = Message::DefaultCapacity, 00147 const bool group_membership = GroupMembershipDisable) : 00148 super(connection, name, message_capacity, group_membership), 00149 _continuations() 00150 { } 00151 00152 unsigned int continuations_map_size() const { 00153 return _continuations.size(); 00154 } 00155 00156 template<typename CallTraits_, typename Closure, typename DestinationType> 00157 continuation_ptr 00158 split_call(Closure && continue_, 00159 const DestinationType & dest, 00160 const typename CallTraits_::parameter_type & param, 00161 const Message::Service service = DefaultMessageServiceType) 00162 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00163 { 00164 return continuation_ptr(new FutureContinuation<CallTraits_, Closure>(super::template call<CallTraits_>(dest, param, service), std::forward<Closure>(continue_))); 00165 } 00166 00167 void schedule(const continuation_ptr & continuation) { 00168 _continuations.insert(continuation); 00169 } 00170 00171 template<typename CallTraits_, typename Closure, typename DestinationType> 00172 void future_call(Closure && continue_, 00173 const DestinationType & dest, 00174 const typename CallTraits_::parameter_type & param, 00175 const Message::Service service = DefaultMessageServiceType) 00176 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00177 { 00178 schedule(split_call<CallTraits_>(std::forward<Closure>(continue_), 00179 dest, param, service)); 00180 } 00181 00182 // Cancels all pending requests with min_token <= token < max_token 00183 void cancel_range(wisp_call_token min_token, wisp_call_token max_token) { 00184 index_by_order & index = _continuations.template get<ByOrder>(); 00185 00186 start: 00187 if(min_token < max_token) 00188 index.erase(index.lower_bound(min_token), index.upper_bound(max_token)); 00189 else if(min_token > max_token) { 00190 index.erase(index.lower_bound(min_token), index.end()); 00191 min_token = 0; 00192 goto start; 00193 } 00194 } 00195 00196 void cancel_all() { 00197 _continuations.clear(); 00198 } 00199 00200 continuation_ptr cancel(wisp_call_token token) { 00201 continuation_map::iterator it = _continuations.find(token); 00202 00203 if(it != _continuations.end()) { 00204 continuation_ptr result = *it; 00205 _continuations.erase(it); 00206 return result; 00207 } 00208 00209 return NullContinuation; 00210 } 00211 00212 bool resume(MessageInfo & msginfo) { 00213 continuation_ptr result = cancel(msginfo.token()); 00214 00215 if(result != NullContinuation) { 00216 result->resume(msginfo); 00217 return true; 00218 } 00219 00220 return false; 00221 } 00222 00223 template<typename CallTraits_, typename Closure, typename DestinationType> 00224 void future_call(Closure && continue_, 00225 const DestinationType & dest, 00226 const Message::Service service = DefaultMessageServiceType) 00227 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00228 { 00229 future_call<CallTraits_>(std::forward<Closure>(continue_), 00230 dest, typename CallTraits_::parameter_type(), 00231 service); 00232 } 00233 00234 template<typename CallTraits_, typename Closure, typename DestinationType, 00235 typename... P> 00236 void future_call(const Message::Service service, 00237 Closure && continue_, 00238 const DestinationType & dest, P && ...p) 00239 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00240 { 00241 future_call<CallTraits_>(std::forward<Closure>(continue_), dest, 00242 typename CallTraits_::parameter_type(std::forward<P>(p)...), 00243 service); 00244 } 00245 00246 template<typename CallTraits_, typename Closure, typename DestinationType, 00247 typename... P> 00248 void future_callp(Closure && continue_, 00249 const DestinationType & dest, P && ...p) 00250 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00251 { 00252 future_call<CallTraits_>(std::forward<Closure>(continue_), dest, 00253 typename CallTraits_::parameter_type(std::forward<P>(p)...)); 00254 } 00255 00256 }; 00257 00258 __END_NS_SSRC_WISP_PROTOCOL 00259 00260 #endif
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.
Copyright © 2011 Savarese Software Research Corporation. All rights reserved