Savarese Software Research Corporation
ContinuationCaller.h
Go to the documentation of this file.
1 /* Copyright 2006-2013 Savarese Software Research Corporation.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.savarese.com/software/ApacheLicense-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
22 #ifndef __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
23 #define __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
24 
26 
27 #include <functional>
28 
29 #include <boost/multi_index/member.hpp>
30 #include <boost/multi_index/ordered_index.hpp>
31 
33 
34 struct Continuation {
35  // We save token as an optimization to avoid invoking a virtual function
36  // in the hash index.
38 
39 protected:
40  explicit Continuation(wisp_call_token token) : token(token) { }
41 
42 public:
43  virtual ~Continuation() = default;
44 
45  virtual void resume(MessageInfo & msginfo) = 0;
46 };
47 
48 
49 typedef std::shared_ptr<Continuation> continuation_ptr;
51 
52 template<typename CallTraits_, typename Closure>
54 public:
55  typedef CallTraits_ call_traits;
56  typedef typename call_traits::future_ptr future_ptr;
57  typedef typename call_traits::return_type return_type;
58  // Closure should be void (const return_type &)
59  typedef Closure continuation_function;
60 
61 private:
62  future_ptr _future;
63  return_type _result;
64  continuation_function _continue;
65 
66 public:
67 
68  FutureContinuation(const future_ptr & future,
69  continuation_function && continue_) :
70  Continuation(future->token()),
71  _future(future), _result(),
72  _continue(std::forward<continuation_function>(continue_))
73  { }
74 
75  virtual ~FutureContinuation() = default;
76 
77  virtual void resume(MessageInfo & msginfo) {
78  _future->unpack(_result, msginfo);
79  _continue(_result);
80  }
81 };
82 
83 // Specialization for use with std::bind
84 template<typename CallTraits_>
85 class FutureContinuation<CallTraits_,
86  std::function<void (const typename CallTraits_::return_type &)> >:
87  public Continuation
88 {
89 public:
90  typedef CallTraits_ call_traits;
91  typedef typename call_traits::future_ptr future_ptr;
92  typedef typename call_traits::return_type return_type;
93  typedef std::function<void (const return_type &)> continuation_function;
94 
95 private:
96  future_ptr _future;
97  return_type _result;
98  continuation_function _continue;
99 
100 public:
101 
102  FutureContinuation(const future_ptr & future,
103  continuation_function && continue_) :
104  Continuation(future->token()),
105  _future(future), _result(),
106  _continue(std::forward<continuation_function>(continue_))
107  { }
108 
109  virtual ~FutureContinuation() = default;
110 
111  virtual void resume(MessageInfo & msginfo) {
112  _future->unpack(_result, msginfo);
113  _continue(_result);
114  }
115 };
116 
117 template<typename PT = BinaryPackingTraits>
118 class ContinuationCaller : public protocol::Caller<PT> {
119  typedef protocol::Caller<PT> super;
120 
121  enum { ByHash, ByOrder };
122 
123  typedef boost::multi_index_container<
125  boost::multi_index::indexed_by<
126  boost::multi_index::hashed_unique<
127  boost::multi_index::member<Continuation, wisp_call_token,
129  boost::multi_index::ordered_non_unique<
130  boost::multi_index::member<Continuation, wisp_call_token,
131  &Continuation::token> >
132  > > continuation_map;
133 
134  typedef
135  typename continuation_map::template nth_index<ByHash>::type index_by_hash;
136  typedef
137  typename continuation_map::template nth_index<ByOrder>::type index_by_order;
138 
139  continuation_map _continuations;
140 
141 public:
142 
143  explicit
144  ContinuationCaller(const std::string & connection = "",
145  const std::string & name = "",
146  const unsigned int message_capacity = Message::DefaultCapacity,
147  const bool group_membership = GroupMembershipDisable) :
148  super(connection, name, message_capacity, group_membership),
149  _continuations()
150  { }
151 
152  unsigned int continuations_map_size() const {
153  return _continuations.size();
154  }
155 
156  template<typename CallTraits_, typename Closure, typename DestinationType>
157  continuation_ptr
158  split_call(Closure && continue_,
159  const DestinationType & dest,
160  const typename CallTraits_::parameter_type & param,
161  const Message::Service service = DefaultMessageServiceType)
162  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
163  {
164  return std::make_shared<FutureContinuation<CallTraits_, Closure>>(super::template call<CallTraits_>(dest, param, service), std::forward<Closure>(continue_));
165  }
166 
167  void schedule(const continuation_ptr & continuation) {
168  _continuations.insert(continuation);
169  }
170 
171  template<typename CallTraits_, typename Closure, typename DestinationType>
172  void future_call(Closure && continue_,
173  const DestinationType & dest,
174  const typename CallTraits_::parameter_type & param,
175  const Message::Service service = DefaultMessageServiceType)
176  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
177  {
178  schedule(split_call<CallTraits_>(std::forward<Closure>(continue_),
179  dest, param, service));
180  }
181 
182  // Cancels all pending requests with min_token <= token < max_token
183  void cancel_range(wisp_call_token min_token, wisp_call_token max_token) {
184  index_by_order & index = _continuations.template get<ByOrder>();
185 
186  start:
187  if(min_token < max_token)
188  index.erase(index.lower_bound(min_token), index.upper_bound(max_token));
189  else if(min_token > max_token) {
190  index.erase(index.lower_bound(min_token), index.end());
191  min_token = 0;
192  goto start;
193  }
194  }
195 
196  void cancel_all() {
197  _continuations.clear();
198  }
199 
200  continuation_ptr cancel(wisp_call_token token) {
201  continuation_map::iterator it = _continuations.find(token);
202 
203  if(it != _continuations.end()) {
204  continuation_ptr result = *it;
205  _continuations.erase(it);
206  return result;
207  }
208 
209  return NullContinuation;
210  }
211 
212  bool resume(MessageInfo & msginfo) {
213  continuation_ptr result = cancel(msginfo.token());
214 
215  if(result != NullContinuation) {
216  result->resume(msginfo);
217  return true;
218  }
219 
220  return false;
221  }
222 
223  template<typename CallTraits_, typename Closure, typename DestinationType>
224  void future_call(Closure && continue_,
225  const DestinationType & dest,
226  const Message::Service service = DefaultMessageServiceType)
227  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
228  {
229  future_call<CallTraits_>(std::forward<Closure>(continue_),
230  dest, typename CallTraits_::parameter_type(),
231  service);
232  }
233 
234  template<typename CallTraits_, typename Closure, typename DestinationType,
235  typename... P>
236  void future_call(const Message::Service service,
237  Closure && continue_,
238  const DestinationType & dest, P && ...p)
239  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
240  {
241  future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
242  typename CallTraits_::parameter_type(std::forward<P>(p)...),
243  service);
244  }
245 
246  template<typename CallTraits_, typename Closure, typename DestinationType,
247  typename... P>
248  void future_callp(Closure && continue_,
249  const DestinationType & dest, P && ...p)
250  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
251  {
252  future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
253  typename CallTraits_::parameter_type(std::forward<P>(p)...));
254  }
255 
256 };
257 
259 
260 #endif

Savarese Software Research Corporation
Copyright © 2006-2012 Savarese Software Research Corporation. All rights reserved.