Branch data Line data Source code
1 : : /* Copyright 2006-2013 Savarese Software Research Corporation.
2 : : *
3 : : * Licensed under the Apache License, Version 2.0 (the "License");
4 : : * you may not use this file except in compliance with the License.
5 : : * You may obtain a copy of the License at
6 : : *
7 : : * http://www.savarese.com/software/ApacheLicense-2.0
8 : : *
9 : : * Unless required by applicable law or agreed to in writing, software
10 : : * distributed under the License is distributed on an "AS IS" BASIS,
11 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : * See the License for the specific language governing permissions and
13 : : * limitations under the License.
14 : : */
15 : :
16 : : /**
17 : : * @file
18 : : * This header defines the Caller class and Wisp communication
19 : : * protocol support classes.
20 : : */
21 : :
22 : : #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
23 : : #define __SSRC_WISP_PROTOCOL_CALLER_H
24 : :
25 : : #include <ssrc/spread.h>
26 : : #include <ssrc/wisp/protocol/ServiceProtocol.h>
27 : : #include <ssrc/wisp/utility/make_smart_ptr.h>
28 : :
29 : : #include <queue>
30 : : #include <sstream>
31 : : #include <stdexcept>
32 : : #include <tuple>
33 : :
34 : : #include <boost/multi_index_container.hpp>
35 : : #include <boost/multi_index/mem_fun.hpp>
36 : : #include <boost/multi_index/hashed_index.hpp>
37 : : #include <boost/multi_index/composite_key.hpp>
38 : :
39 : : __BEGIN_NS_SSRC_WISP_PROTOCOL
40 : :
41 : : using NS_SSRC_SPREAD::GroupList;
42 : : using NS_SSRC_SPREAD::Message;
43 : : using NS_SSRC_SPREAD::Mailbox;
44 : : using NS_SSRC_WISP_UTILITY::make_smart_ptr;
45 : :
46 : : /**
47 : : * Wisp calls can request a Spread message service type if desired. If
48 : : * not specified, DefaultMessageServiceType is used.
49 : : */
50 : : const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard;
51 : :
52 : : /**
53 : : * CallType defines allowable messaging call types. Currently only
54 : : * one-way and two-way calls are supported. In the future, a
55 : : * multi-way call type may be added, involving a single request and
56 : : * multiple responses (from different responder). A streaming call
57 : : * may also be added, where a single request results in a stream of
58 : : * responses from a single responder.
59 : : */
60 : : enum CallType {
61 : : /** A fire and forget invocation. No return value is expected. */
62 : : OneWay,
63 : :
64 : : /** Requires a single request and a single response. */
65 : : TwoWay
66 : :
67 : : /*, MultiWay */
68 : : };
69 : :
70 : : /**
71 : : * The protocol::detail namespace defines implementation-specific
72 : : * structures that are not intended for use by library client code.
73 : : */
74 : : namespace detail {
75 : : /** An empty structure used to represent a void return type for Wisp calls. */
76 : : struct VoidReturnType { };
77 : : }
78 : :
79 : : /**
80 : : * The three-parameter CallTraits template defines all the type
81 : : * information characterizing a TwoWay call.
82 : : *
83 : : * @param CallerType The type of Caller through which the call must be made.
84 : : * @param ParameterType The parameter type required by the call.
85 : : * @param ReturnType The type returned by the call.
86 : : */
87 : : template<typename CallerType, typename ParameterType,
88 : : typename ReturnType = detail::VoidReturnType>
89 : : struct CallTraits {
90 : : /** The type of call (TwoWay). */
91 : : static const CallType call_type = TwoWay;
92 : :
93 : : /** The parameter type required by the call. */
94 : : typedef ParameterType parameter_type;
95 : :
96 : : /** The type returned by the call. */
97 : : typedef ReturnType return_type;
98 : :
99 : : /** The type of Caller through which the call must be made. */
100 : : typedef CallerType caller_type;
101 : :
102 : : /** The type of future associated with an asynchronous invocation. */
103 : : typedef typename caller_type::template Future<return_type> future_type;
104 : :
105 : : /** The type of future pointer returned by an asynchronous invocation. */
106 : : typedef typename future_type::shared_ptr future_ptr;
107 : : };
108 : :
109 : : /**
110 : : * The two-parameter CallTraits template defines all the type
111 : : * information characterizing a OneWay call.
112 : : *
113 : : * @param CallerType The type of Caller through which the call must be made.
114 : : * @param ParameterType The parameter type of the call.
115 : : */
116 : : template<typename CallerType, typename ParameterType>
117 : : struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> {
118 : : /** The type of call (OneWay). */
119 : : static const CallType call_type = OneWay;
120 : :
121 : : /** The parameter type required by the call. */
122 : : typedef ParameterType parameter_type;
123 : :
124 : : /** The type returned by the call (void). */
125 : : typedef detail::VoidReturnType return_type;
126 : :
127 : : /** The type of Caller through which the call must be made. */
128 : : typedef CallerType caller_type;
129 : : };
130 : :
131 : : /**
132 : : * Defines the CallTraits for a one-way call.
133 : : *
134 : : * @param caller The type of Caller through which the call must be made.
135 : : * @param method The name of the call.
136 : : */
137 : : #define WISP_ONE_WAY_CALL(caller, method) \
138 : : typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
139 : : Call ## method
140 : :
141 : : /**
142 : : * Defines the CallTraits for a two-way call.
143 : : *
144 : : * @param caller The type of Caller through which the call must be made.
145 : : * @param method The name of the call.
146 : : * @param result The type returned by the call.
147 : : */
148 : : #define WISP_TWO_WAY_CALL(caller, method, result) \
149 : : typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
150 : : Call ## method
151 : :
152 : : /**
153 : : * A CallException is thrown when a call cannot complete because of an
154 : : * unexpected condition. This may occur either during a synchronous
155 : : * call or the completion of an asynchronous call via a Future.
156 : : */
157 : 0 : class CallException : public std::runtime_error {
158 : : public:
159 : 0 : explicit CallException(const std::string & message) :
160 : 0 : std::runtime_error(message)
161 : 0 : { }
162 : : };
163 : :
164 : : /**
165 : : * The type of a call token. Every call made by a Caller instance is
166 : : * identified by a unique token.
167 : : */
168 : : typedef std::uint32_t wisp_call_token;
169 : :
170 : : /**
171 : : * The type used by the CallHeader to store the CallRole of the call
172 : : * being made.
173 : : */
174 : : typedef std::uint8_t wisp_call_role;
175 : :
176 : :
177 : : /**
178 : : * The role of a call being made. This is similar to CallType, but is
179 : : * used in the CallHeader to identify the role of a message
180 : : * participating in a call.
181 : : */
182 : : enum CallRole {
183 : : /** The message is part of a OneWay call. */
184 : : OneWayRequest,
185 : :
186 : : /** The message is initiating a TwoWay call. */
187 : : TwoWayRequest,
188 : :
189 : : /** The message is completing a TwoWay call. */
190 : : TwoWayResponse
191 : :
192 : : /*, MultiWayResponse */
193 : : };
194 : :
195 : :
196 : : /**
197 : : * Every call is described by a CallHeader followed by a message. The
198 : : * CallHeader describes the message type via an id, the call it is
199 : : * participating in via a token, and the role of the message in the call.
200 : : */
201 : : struct CallHeader {
202 : : /** An identifier denoting the message type. */
203 : : wisp_message_id id;
204 : : /** A token denoting the call responsible for generating the message. */
205 : : wisp_call_token token;
206 : : /** A value indicating the role of the call (e.g., OneWayRequest). */
207 : : wisp_call_role role;
208 : : /**
209 : : * The size of the message in bytes. It is non-zero only for jumbo messages.
210 : : */
211 : : unsigned int message_size;
212 : :
213 : : /**
214 : : * Constructs a CallHeader with all members initialized to zero.
215 : : */
216 : 44 : CallHeader() : id(0), token(0), role(0), message_size(0) { }
217 : :
218 : : /**
219 : : * Constructs a CallHeader.
220 : : */
221 : 1 : CallHeader(const wisp_message_id id,
222 : : const wisp_call_token token,
223 : : const CallRole role,
224 : 1 : const unsigned int size = 0) :
225 : 1 : id(id), token(token), role(role), message_size(size) { }
226 : :
227 : : /**
228 : : * Sets all members to 0.
229 : : */
230 : 2 : void clear() {
231 : 2 : id = 0, token = 0, role = 0, message_size = 0;
232 : 2 : }
233 : :
234 : : /**
235 : : * Initializes members to specified values.
236 : : */
237 : 36 : void init(const wisp_message_id _id,
238 : : const wisp_call_token _token,
239 : : const CallRole _role,
240 : : const unsigned int _size = 0)
241 : : {
242 : 36 : id = _id, token = _token, role = _role, message_size = _size;
243 : 36 : }
244 : :
245 : : /**
246 : : * Returns true if the following message is a jumbo message fragment.
247 : : * @return true if the following message is a jumbo message fragment.
248 : : */
249 : 61 : bool is_jumbo_fragment() const {
250 : 61 : return (message_size > 0);
251 : : }
252 : :
253 : : template<class Archive>
254 : 129 : void serialize(Archive & ar, const unsigned int) {
255 : 129 : ar & id & token & role & message_size;
256 : 129 : }
257 : : };
258 : :
259 : : /**
260 : : * Stores all information associated with a received call, including the
261 : : * raw message data.
262 : : */
263 : 48 : struct MessageInfo {
264 : : /** The call header. */
265 : : CallHeader header;
266 : : /** The message that was received. */
267 : : Message message;
268 : : /** The list of groups to which the message was sent. */
269 : : GroupList groups;
270 : :
271 : : explicit
272 : 44 : MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) :
273 [ + - ]: 44 : header(), message(message_capacity), groups()
274 : 44 : { }
275 : :
276 : 26 : wisp_message_protocol protocol() const {
277 : 26 : return message.type();
278 : : }
279 : :
280 : 43 : wisp_message_id id() const {
281 : 43 : return header.id;
282 : : }
283 : :
284 : 112 : wisp_call_token token() const {
285 : 112 : return header.token;
286 : : }
287 : :
288 : 100 : wisp_call_role role() const {
289 : 100 : return header.role;
290 : : }
291 : :
292 : 86 : const std::string & sender() const {
293 : 86 : return message.sender();
294 : : }
295 : :
296 : 36 : void init_header(const wisp_message_id id,
297 : : const wisp_call_token token,
298 : : const CallRole role)
299 : : {
300 : 36 : header.init(id, token, role);
301 : 36 : }
302 : :
303 : 2 : void clear_header() {
304 : 2 : header.clear();
305 : 2 : }
306 : : };
307 : :
308 : : typedef std::shared_ptr<MessageInfo> message_info_ptr;
309 : :
310 : : const bool GroupMembershipEnable = true;
311 : : const bool GroupMembershipDisable = false;
312 : :
313 : : /**
314 : : *
315 : : */
316 : : template<typename PT = BinaryPackingTraits>
317 : 12 : class Caller {
318 : : template<typename ReturnType> friend class Future;
319 : :
320 : : public:
321 : : typedef PT packing_traits;
322 : : typedef typename packing_traits::packer_type packer_type;
323 : : typedef typename packing_traits::unpacker_type unpacker_type;
324 : :
325 : : // Don't remove this definition. It is necessary to prevent an
326 : : // unresolved MaxUnfragmentedMessageSize symbol in debug builds.
327 : : #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
328 : : static const unsigned int MaxUnfragmentedMessageSize =
329 : : __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE;
330 : :
331 : : private:
332 : : typedef std::deque<message_info_ptr> request_queue;
333 : :
334 : : // For now, use hashed_unique, but for multiple responses will need
335 : : // hashed_non_unique.
336 : : typedef boost::multi_index_container<
337 : : message_info_ptr,
338 : : boost::multi_index::indexed_by<
339 : : boost::multi_index::hashed_unique<
340 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
341 : : &MessageInfo::token> >
342 : : > > response_map;
343 : :
344 : : typedef boost::multi_index_container<
345 : : message_info_ptr,
346 : : boost::multi_index::indexed_by<
347 : : boost::multi_index::hashed_unique<
348 : : boost::multi_index::composite_key<
349 : : MessageInfo,
350 : : boost::multi_index::const_mem_fun<MessageInfo, const std::string &,
351 : : &MessageInfo::sender>,
352 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role,
353 : : &MessageInfo::role>,
354 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
355 : : &MessageInfo::token> > >
356 : : > > jumbo_message_map;
357 : :
358 : : Mailbox _mbox;
359 : : wisp_call_token _call_token;
360 : : packer_type _packer;
361 : : unpacker_type _unpacker;
362 : :
363 : : /** @todo Should queue size be bounded? */
364 : : request_queue _requests;
365 : : response_map _responses;
366 : : jumbo_message_map _jumbo_messages;
367 : : jumbo_message_map::key_from_value _jumbo_key_from_value;
368 : :
369 : : const unsigned int _initial_message_capacity;
370 : : // scratch variable
371 : : message_info_ptr _info;
372 : : MessageInfo _send_info;
373 : :
374 : 0 : static void throwCallException(const wisp_message_protocol expected_proto,
375 : : const wisp_message_protocol actual_proto,
376 : : const wisp_message_id expected_id,
377 : : const wisp_message_id actual_id)
378 : : SSRC_DECL_THROW(CallException)
379 : : {
380 [ # # ]: 0 : std::ostringstream message;
381 [ # # # # ]: 0 : message << "Mismatched message id or protocol.\nExpected protocol: "
382 [ # # # # ]: 0 : << expected_proto << " Actual protocol: " << actual_proto
383 [ # # # # : 0 : << "\nExpected id: " << expected_id
# # ]
384 [ # # # # ]: 0 : << " Actual id: " << actual_id << std::endl;
385 [ # # # # ]: 0 : throw CallException(message.str());
386 : : }
387 : :
388 : 0 : static void throwCallException(const wisp_call_token expected_token,
389 : : const wisp_call_token actual_token)
390 : : SSRC_DECL_THROW(CallException)
391 : : {
392 [ # # ]: 0 : std::ostringstream message;
393 [ # # # # ]: 0 : message << "Mismatched call token.\nExpected token: "
394 [ # # # # : 0 : << expected_token << " Actual token: " << actual_token
# # ]
395 : : << std::endl;
396 [ # # # # ]: 0 : throw CallException(message.str());
397 : : }
398 : :
399 : 13 : void reset_receive_info() {
400 [ + - ]: 13 : _info = make_smart_ptr<message_info_ptr>(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE));
401 : 13 : }
402 : :
403 : 8 : void push_request() {
404 : 8 : _requests.push_back(_info);
405 : 8 : reset_receive_info();
406 : 8 : }
407 : :
408 : 2 : void insert_response() {
409 : 2 : _responses.insert(_info);
410 : 2 : reset_receive_info();
411 : 2 : }
412 : :
413 : : enum JumboFragmentResult {
414 : : JumboFragmentFirstFragment,
415 : : JumboFragmentNextFragment,
416 : : JumboFragmentCompleteRequest,
417 : : JumboFragmentCompleteResponse
418 : : };
419 : :
420 : 28 : JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo,
421 : : const unsigned int header_size)
422 : : {
423 : 28 : JumboFragmentResult result = JumboFragmentNextFragment;
424 : 28 : Message & message = msginfo->message;
425 : : jumbo_message_map::iterator it =
426 [ + - + - ]: 28 : _jumbo_messages.find(_jumbo_key_from_value(msginfo));
427 : :
428 [ + + # # ]: 28 : if(it == _jumbo_messages.end()) {
429 : : // Pre-allocate memory for remaining fragments.
430 [ + - ]: 8 : message_info_ptr info(make_smart_ptr<message_info_ptr>(msginfo->header.message_size));
431 [ + - + - ]: 4 : message.seek(message.size());
432 [ + - ]: 4 : *info = *msginfo;
433 [ + - ]: 4 : _jumbo_messages.insert(info);
434 : 4 : result = JumboFragmentFirstFragment;
435 : : } else {
436 [ + - ]: 24 : const message_info_ptr & info = *it;
437 : :
438 [ + - + - ]: 24 : info->message.write(&message[header_size], message.size()-header_size);
439 : :
440 [ + + ]: 24 : if(info->message.offset() >= info->header.message_size) {
441 [ + - ]: 3 : info->message.seek(header_size);
442 [ + + ]: 3 : if(info->role() == TwoWayResponse) {
443 [ + - ]: 1 : _responses.insert(info);
444 : 1 : result = JumboFragmentCompleteResponse;
445 : : } else {
446 [ + - ]: 2 : _requests.push_back(info);
447 : 2 : result = JumboFragmentCompleteRequest;
448 : : }
449 [ + - ]: 3 : _jumbo_messages.erase(it);
450 : : }
451 : : }
452 : 28 : return result;
453 : : }
454 : :
455 : : // Receives call responses, but not requests and membership messages
456 : : template<typename MessageType>
457 : 10 : void receive_response(MessageType & msg, const wisp_call_token token)
458 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
459 : : CallException)
460 : : {
461 : : begin_receive_response:
462 [ + - + - ]: 10 : typename response_map::iterator it(_responses.find(token));
463 : :
464 [ + + + + : 10 : if(it == _responses.end()) {
+ - ]
465 [ - + + + ]: 7 : if(!_info.unique()) {
466 [ # # + - ]: 2 : reset_receive_info();
467 : : }
468 : : do {
469 : 43 : Message & message = _info->message;
470 : :
471 [ + - + - ]: 43 : _mbox.receive(message);
472 [ + - + - ]: 43 : _mbox.copy_groups(_info->groups);
473 : :
474 [ + - + + ]: 43 : if(message.is_regular()) {
475 : 41 : CallHeader & header = _info->header;
476 : :
477 [ + - + - ]: 41 : message.rewind();
478 : :
479 [ + - + - ]: 41 : const unsigned int header_size = _unpacker.unpack(header, message);
480 : :
481 [ - + + + ]: 41 : if(header.is_jumbo_fragment()) {
482 : : const JumboFragmentResult jumbo_result =
483 [ # # + - ]: 28 : insert_jumbo_fragment(_info, header_size);
484 [ # # + + ]: 28 : if(jumbo_result == JumboFragmentCompleteResponse) {
485 : 1 : goto begin_receive_response;
486 : : }
487 : : } else {
488 [ + - + + ]: 13 : if(header.role == TwoWayResponse) {
489 [ + - + + ]: 7 : if(header.token == token)
490 : 6 : break;
491 : : else
492 [ # # + - ]: 1 : insert_response();
493 : : } else
494 [ # # + - ]: 6 : push_request();
495 : : }
496 : : } else {
497 : 2 : _info->clear_header();
498 [ # # + - ]: 2 : push_request();
499 : 36 : }
500 : : } while(true);
501 [ + - + - ]: 6 : unpack(msg, *_info);
502 : : } else {
503 [ + - + - : 3 : unpack(msg, *(*it));
+ - + - ]
504 [ + - + - ]: 3 : _responses.erase(it);
505 : : }
506 : 9 : }
507 : :
508 : : // We assume integers wrap around to min at max.
509 : 24 : wisp_call_token next_token() {
510 : 24 : return _call_token++;
511 : : }
512 : :
513 : : template<typename DestinationType, typename MessageType>
514 : 36 : void send(const DestinationType & dest,
515 : : const MessageType & msg,
516 : : const wisp_call_token token,
517 : : const CallRole role,
518 : : const Message::Service service)
519 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
520 : : {
521 : 36 : bool done_packing(false);
522 : :
523 : 36 : _send_info.init_header(msg.id, token, role);
524 : :
525 : : // We increase the message buffer size by a factor of two and try
526 : : // again when the write area is exhausted. We don't try to
527 : : // prevent an infinite loop because the message we are writing is
528 : : // finite in size; therefore the loop must terminate. See WISP-11
529 : : // for more information.
530 : :
531 : : #define __WISP_WRITE_AREA_EXHAUSTED "write area exhausted"
532 : :
533 : 27 : do {
534 : : try {
535 [ + - + - : 63 : _send_info.message.rewind();
+ - + - +
- + - ]
536 [ + - + - : 63 : _packer.pack(_send_info.header, _send_info.message);
+ - + - +
- + - ]
537 [ + - + - : 63 : _packer.pack(msg, _send_info.message);
+ - + - +
- + + ]
538 : 36 : done_packing = true;
539 : 54 : } catch(const std::ios_base::failure & e) {
540 : 27 : constexpr std::size_t len = sizeof(__WISP_WRITE_AREA_EXHAUSTED) - 1;
541 [ # # # # : 27 : if(std::strncmp(e.what(), __WISP_WRITE_AREA_EXHAUSTED, len) == 0) {
# # # # #
# + - ]
542 [ # # # # : 27 : _send_info.message.resize(2*_send_info.message.capacity());
# # # # #
# + - ]
543 : : } else {
544 : 0 : throw;
545 : : }
546 : : }
547 [ - + - + : 63 : } while(!done_packing);
- + - + -
+ + + ]
548 : :
549 : : #undef __WISP_WRITE_AREA_EXHAUSTED
550 : :
551 : 36 : _send_info.message.set_type(msg.protocol);
552 : 36 : _send_info.message.set_service(service);
553 : :
554 [ + - + - : 36 : if(_send_info.message.size() <= MaxUnfragmentedMessageSize) {
+ - + - +
- - + ]
555 : 33 : _mbox.send(_send_info.message, dest);
556 : : } else {
557 : : const unsigned int message_size =
558 [ # # # # : 3 : _send_info.header.message_size = _send_info.message.size();
# # # # #
# + - ]
559 [ # # # # : 3 : _send_info.message.rewind();
# # # # #
# + - ]
560 : :
561 : : const unsigned int header_size =
562 [ # # # # : 3 : _packer.pack(_send_info.header, _send_info.message);
# # # # #
# + - ]
563 : : const unsigned int max_fragment_size =
564 : 3 : MaxUnfragmentedMessageSize - header_size;
565 : :
566 : : // We never fragment a header, so there's no risk of
567 : : // header_size equaling message_size..
568 : 30 : for(unsigned int offset = header_size, size = 0;
569 [ # # # # : 30 : offset < message_size; offset+=size)
# # # # #
# + + ]
570 : : {
571 : 27 : size = std::min(max_fragment_size, message_size - offset);
572 : 27 : _mbox.clear_message_parts();
573 [ # # # # : 27 : _mbox.add_message_part(&_send_info.message[0], header_size);
# # # # #
# + - ]
574 [ # # # # : 27 : _mbox.add_message_part(&_send_info.message[offset], size);
# # # # #
# + - ]
575 [ # # # # : 27 : _mbox.send(dest, msg.protocol, service);
# # # # #
# + - ]
576 : : }
577 : 3 : _mbox.clear_message_parts();
578 : : }
579 [ # # # # : 36 : }
# # # # #
# - + ]
580 : :
581 : : public:
582 : : // Using jumbo_message_map::key_type doesn't work because it's a composite
583 : : // key that stores a reference, so we have to use a compatible key.
584 : : typedef
585 : : std::tuple<std::string, wisp_call_role, wisp_call_token>
586 : : jumbo_message_key_type;
587 : :
588 : : template<typename ReturnType>
589 : : class Future {
590 : : friend class Caller;
591 : :
592 : : bool _valid;
593 : : wisp_call_token _token;
594 : : Caller & _caller;
595 : :
596 : 13 : Future(Caller & caller, const wisp_call_token token) :
597 : 13 : _valid(true), _token(token), _caller(caller)
598 : 13 : { }
599 : :
600 : : public:
601 : : typedef ReturnType return_type;
602 : : typedef std::shared_ptr<Future> shared_ptr;
603 : :
604 : : // Note, you can't sit in a while(!future.ready()) loop because
605 : : // there's no receiver thread.
606 : 2 : bool ready() const {
607 : 2 : return _caller.returned(_token);
608 : : }
609 : :
610 : 22 : bool valid() const {
611 : 22 : return _valid;
612 : : }
613 : :
614 : 15 : wisp_call_token token() const {
615 : 15 : return _token;
616 : : }
617 : :
618 : 8 : void receive(return_type & result)
619 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
620 : : CallException)
621 : : {
622 [ - + - + ]: 8 : if(!valid())
623 [ # # # # : 0 : throw CallException("Invalid future.");
# # # # ]
624 : :
625 : 8 : _caller.receive_response(result, _token);
626 : :
627 : 8 : _valid = false;
628 : 8 : }
629 : :
630 : : // Exclusively for supporting continuations in Service
631 : 3 : void unpack(return_type & result, MessageInfo & info)
632 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
633 : : CallException)
634 : : {
635 [ - + ]: 3 : if(!valid())
636 [ # # # # ]: 0 : throw CallException("Invalid future.");
637 : :
638 [ + - ]: 3 : if(info.token() == _token)
639 : 3 : _caller.unpack(result, info);
640 : : else
641 : 0 : throwCallException(_token, info.token());
642 : :
643 : 3 : _valid = false;
644 : 3 : }
645 : : };
646 : :
647 : : explicit
648 : 12 : Caller(const std::string & connection = "",
649 : : const std::string & name = "",
650 : : const unsigned int message_capacity = Message::DefaultCapacity,
651 : : const bool group_membership = GroupMembershipDisable) :
652 : : _mbox(connection, name, group_membership), _call_token(0),
653 : : _packer(), _unpacker(), _requests(), _responses(),
654 : : _jumbo_messages(),
655 : 12 : _jumbo_key_from_value(_jumbo_messages.key_extractor()),
656 : : _initial_message_capacity(message_capacity),
657 : : _info(make_smart_ptr<message_info_ptr>(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))),
658 [ + - + - : 24 : _send_info(message_capacity)
+ - + - +
- + - + -
+ - + - ]
659 : 12 : { }
660 : :
661 : : #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
662 : :
663 : : // Not intended to be subclassed outside of Wisp.
664 : : //virtual ~Caller() = default;
665 : :
666 : : // We expose this for allowing retrieval of group information.
667 : 13 : const Mailbox & mbox() {
668 : 13 : return _mbox;
669 : : }
670 : :
671 : 2 : packer_type & packer() {
672 : 2 : return _packer;
673 : : }
674 : :
675 : 1 : unpacker_type & unpacker() {
676 : 1 : return _unpacker;
677 : : }
678 : :
679 : 3 : wisp_call_token call_token() {
680 : 3 : return _call_token;
681 : : }
682 : :
683 : 34 : const std::string & name() const {
684 : 34 : return _mbox.private_group();
685 : : }
686 : :
687 : : bool group_membership() const {
688 : : return _mbox.group_membership();
689 : : }
690 : :
691 : 1 : void join(const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
692 : 1 : _mbox.join(group);
693 : 1 : }
694 : :
695 : 1 : void leave(const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
696 : 1 : _mbox.leave(group);
697 : 1 : }
698 : :
699 : 1 : unsigned int message_capacity_initial() const {
700 : 1 : return _initial_message_capacity;
701 : : }
702 : :
703 : 3 : unsigned int message_capacity_receive() const {
704 : 3 : return _info->message.capacity();
705 : : }
706 : :
707 : 1 : unsigned int message_capacity_send() const {
708 : 1 : return _send_info.message.capacity();
709 : : }
710 : :
711 : : unsigned int request_queue_size() const {
712 : : return _requests.size();
713 : : }
714 : :
715 : : unsigned int response_map_size() const {
716 : : return _responses.size();
717 : : }
718 : :
719 : 2 : bool returned(const wisp_call_token token) const {
720 [ + - + - ]: 2 : return (_responses.find(token) != _responses.end());
721 : : }
722 : :
723 : 3 : unsigned int count_jumbo_messages() const {
724 : 3 : return _jumbo_messages.size();
725 : : }
726 : :
727 : : template<typename key_container>
728 : 1 : void collect_jumbo_message_keys(key_container & container) {
729 [ + - + + : 4 : for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
+ - # ]
730 : 1 : end = _jumbo_messages.end(); it != end; ++it)
731 : : {
732 [ + - ]: 1 : const MessageInfo *info = it->get();
733 [ + - + - ]: 1 : container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token()));
734 : : }
735 : 1 : }
736 : :
737 : : template<typename iterator_type>
738 : 2 : void erase_jumbo_messages(const iterator_type & begin,
739 : : const iterator_type & end)
740 : : {
741 : : // You can't erase a compatible key, so we have to perform a lookup first
742 [ + + ]: 4 : for(iterator_type it = begin; it != end; ++it) {
743 [ + - ]: 2 : jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
744 [ + - + + ]: 2 : if(msg != _jumbo_messages.end())
745 [ + - ]: 1 : _jumbo_messages.erase(msg);
746 : : }
747 : 2 : }
748 : :
749 : : // Call only after unpacking CallHeader so message offset is correct.
750 : : template<typename MessageType>
751 : 20 : void unpack(MessageType & msg, MessageInfo & info)
752 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
753 : : CallException)
754 : : {
755 [ + - + - : 40 : if(info.id() == MessageType::id &&
+ - + - +
- + - + -
+ - + - ]
756 : 20 : info.protocol() == MessageType::protocol)
757 : 20 : _unpacker.unpack(msg, info.message);
758 : : else
759 : 0 : throwCallException(MessageType::protocol, info.protocol(),
760 : 0 : MessageType::id, info.id());
761 : 20 : }
762 : :
763 : : /**
764 : : * Assumes _info.unique() == true. We do not check _info.unique()
765 : : * for performance reasons (unique() may acquire thread mutexes).
766 : : * This function is intended to be called only by an event
767 : : * dispatcher inside an event loop in a single thread.
768 : : */
769 : 17 : void receive(message_info_ptr & info)
770 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
771 : : {
772 : : // Drain responses first, then requests.
773 [ - + ]: 17 : if(!_responses.empty()) {
774 : 0 : response_map::iterator it = _responses.begin();
775 [ # # ]: 0 : info = *it;
776 [ # # ]: 0 : _responses.erase(it);
777 [ + + ]: 17 : } else if(!_requests.empty()) {
778 : 1 : info = _requests.front();
779 : 1 : _requests.pop_front();
780 : : } else {
781 : : do {
782 : 16 : Message & message = _info->message;
783 : :
784 : 16 : _mbox.receive(message);
785 : 16 : _mbox.copy_groups(_info->groups);
786 : :
787 [ + - ]: 16 : if(message.is_regular()) {
788 : 16 : CallHeader & header = _info->header;
789 : :
790 : 16 : message.rewind();
791 : :
792 : 16 : const unsigned int header_size = _unpacker.unpack(header, message);
793 : :
794 [ - + ]: 16 : if(header.is_jumbo_fragment()) {
795 : : const JumboFragmentResult jumbo_result =
796 : 0 : insert_jumbo_fragment(_info, header_size);
797 [ # # ]: 0 : if(jumbo_result == JumboFragmentCompleteResponse) {
798 : 0 : response_map::iterator it = _responses.begin();
799 [ # # ]: 0 : info = *it;
800 [ # # ]: 0 : _responses.erase(it);
801 : 0 : return;
802 [ # # ]: 0 : } else if(jumbo_result == JumboFragmentCompleteRequest) {
803 : 0 : info = _requests.front();
804 : 0 : _requests.pop_front();
805 : 0 : return;
806 : : }
807 : : } else
808 : 16 : break;
809 : : } else {
810 : 0 : _info->clear_header();
811 : 0 : break;
812 : 0 : }
813 : : } while(true);
814 : 16 : info = _info;
815 : : }
816 : : }
817 : :
818 : : // Also implement timeout.
819 : : // Receives requests and membership messages, but not call responses
820 : 12 : void receive_request(message_info_ptr & info)
821 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
822 : : {
823 [ + + ]: 12 : if(!_requests.empty()) {
824 : 9 : info = _requests.front();
825 : 9 : _requests.pop_front();
826 : : } else {
827 [ + + ]: 3 : if(!_info.unique()) {
828 : 1 : reset_receive_info();
829 : : }
830 : : do {
831 : 4 : Message & message = _info->message;
832 : :
833 : 4 : _mbox.receive(message);
834 : 4 : _mbox.copy_groups(_info->groups);
835 : :
836 [ + - ]: 4 : if(message.is_regular()) {
837 : 4 : CallHeader & header = _info->header;
838 : :
839 : 4 : message.rewind();
840 : :
841 : 4 : const unsigned int header_size = _unpacker.unpack(header, message);
842 : :
843 [ - + ]: 4 : if(header.is_jumbo_fragment()) {
844 : : const JumboFragmentResult jumbo_result =
845 : 0 : insert_jumbo_fragment(_info, header_size);
846 [ # # ]: 0 : if(jumbo_result == JumboFragmentCompleteRequest) {
847 : 0 : info = _requests.front();
848 : 0 : _requests.pop_front();
849 : 0 : return;
850 : : }
851 : : } else {
852 [ + + ]: 4 : if(header.role == TwoWayResponse)
853 : 1 : insert_response();
854 : : else
855 : 3 : break;
856 : : }
857 : : } else {
858 : 0 : _info->clear_header();
859 : 0 : break;
860 : 1 : }
861 : : } while(true);
862 : 3 : info = _info;
863 : : }
864 : : }
865 : :
866 : : template<typename Traits, typename DestinationType>
867 : 10 : void send(const DestinationType & dest,
868 : : const typename Traits::parameter_type & param,
869 : : const Message::Service service = DefaultMessageServiceType)
870 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
871 : : {
872 : : static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
873 : 10 : send(dest, param, next_token(), OneWayRequest, service);
874 : 10 : }
875 : :
876 : : template<typename Traits, typename DestinationType>
877 : 12 : void reply(const DestinationType & dest,
878 : : const wisp_call_token token,
879 : : const typename Traits::parameter_type & param,
880 : : const Message::Service service = DefaultMessageServiceType)
881 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
882 : : {
883 : : static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
884 : 12 : send(dest, param, token, TwoWayResponse, service);
885 : 12 : }
886 : :
887 : : // Synchronous call.
888 : : template<typename Traits, typename DestinationType>
889 : 1 : void call(const DestinationType & dest,
890 : : typename Traits::return_type *ret,
891 : : const typename Traits::parameter_type & param,
892 : : const Message::Service service = DefaultMessageServiceType)
893 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
894 : : CallException)
895 : : {
896 : : static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
897 : 1 : wisp_call_token token = next_token();
898 : 1 : send(dest, param, token, TwoWayRequest, service);
899 : 1 : receive_response(*ret, token);
900 : 1 : }
901 : :
902 : : // Asynchronous call.
903 : : template<typename Traits, typename DestinationType>
904 : : typename Future<typename Traits::return_type>::shared_ptr
905 : 13 : call(const DestinationType & dest,
906 : : const typename Traits::parameter_type & param,
907 : : const Message::Service service = DefaultMessageServiceType)
908 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
909 : : {
910 : : static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
911 : : typedef Future<typename Traits::return_type> future_type;
912 : 13 : wisp_call_token token = next_token();
913 : :
914 : 13 : send(dest, param, token, TwoWayRequest, service);
915 : :
916 : : // Can't use make_shared here because of private constructor.
917 : : //return std::make_shared<future_type>(*this, token);
918 : 13 : return typename future_type::shared_ptr(new future_type(*this, token));
919 : : }
920 : :
921 : : private:
922 : : template<CallType call_type> struct CallTag { };
923 : :
924 : : template<typename Traits, typename DestinationType>
925 : : typename Future<typename Traits::return_type>::shared_ptr
926 : 3 : _call(const CallTag<OneWay> &,
927 : : const DestinationType & dest,
928 : : const typename Traits::parameter_type & param,
929 : : const Message::Service service = DefaultMessageServiceType)
930 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
931 : : {
932 : : typedef Future<typename Traits::return_type> future_type;
933 : 3 : send<Traits>(dest, param, service);
934 : 3 : return typename future_type::shared_ptr();
935 : : }
936 : :
937 : : template<typename Traits, typename DestinationType>
938 : : typename Future<typename Traits::return_type>::shared_ptr
939 : 11 : _call(const CallTag<TwoWay> &,
940 : : const DestinationType & dest,
941 : : const typename Traits::parameter_type & param,
942 : : const Message::Service service = DefaultMessageServiceType)
943 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
944 : : {
945 : 11 : return call<Traits>(dest, param, service);
946 : : }
947 : :
948 : : public:
949 : :
950 : : template<typename Traits, typename DestinationType>
951 : : void operator()(const Traits &,
952 : : const DestinationType & dest,
953 : : const wisp_call_token token,
954 : : const typename Traits::parameter_type & param,
955 : : const Message::Service service =
956 : : DefaultMessageServiceType)
957 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
958 : : {
959 : : reply<Traits>(dest, token, param, service);
960 : : }
961 : :
962 : : template<typename Traits, typename DestinationType>
963 : : void operator()(const Traits &,
964 : : const DestinationType & dest,
965 : : typename Traits::return_type *ret,
966 : : const typename Traits::parameter_type & param,
967 : : const Message::Service service =
968 : : DefaultMessageServiceType)
969 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
970 : : CallException)
971 : : {
972 : : call<Traits>(dest, ret, param, service);
973 : : }
974 : :
975 : : template<typename Traits, typename DestinationType>
976 : : typename Future<typename Traits::return_type>::shared_ptr
977 : : operator()(const Traits &,
978 : : const DestinationType & dest,
979 : : const typename Traits::parameter_type & param,
980 : : const Message::Service service = DefaultMessageServiceType)
981 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
982 : : {
983 : : return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
984 : : }
985 : :
986 : :
987 : : // Convenience functions that invoke parameter_type constructor for
988 : : // you. We permit up to WISP_CALLER_MAX_PARAMETERS - 1
989 : : // parameter_type arguments. Any more, and you should probably use
990 : : // a different approach.
991 : :
992 : : // Send.
993 : :
994 : : template<typename Traits, typename DestinationType>
995 : 3 : void send(const DestinationType & dest,
996 : : const Message::Service service = DefaultMessageServiceType)
997 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
998 : : {
999 [ + - ]: 3 : send<Traits>(dest, typename Traits::parameter_type(), service);
1000 : 3 : }
1001 : :
1002 : : template<typename Traits, typename DestinationType, typename... P>
1003 : 2 : void send(const Message::Service service,
1004 : : const DestinationType & dest,
1005 : : P && ...p)
1006 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1007 : : {
1008 [ + - + - : 2 : send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...),
+ - ]
1009 : : service);
1010 : 2 : }
1011 : :
1012 : : template<typename Traits, typename DestinationType, typename... P>
1013 : : void sendp(const DestinationType & dest, P && ...p)
1014 : :
1015 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1016 : : {
1017 : : send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...));
1018 : : }
1019 : :
1020 : : // Reply.
1021 : :
1022 : : template<typename Traits, typename DestinationType>
1023 : : void reply(const DestinationType & dest,
1024 : : const wisp_call_token token,
1025 : : const Message::Service service = DefaultMessageServiceType)
1026 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1027 : : {
1028 : : reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1029 : : }
1030 : :
1031 : : template<typename Traits, typename DestinationType>
1032 : 8 : void operator()(const Traits &,
1033 : : const DestinationType & dest,
1034 : : const wisp_call_token token,
1035 : : const Message::Service service =
1036 : : DefaultMessageServiceType)
1037 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1038 : : {
1039 [ + - ]: 8 : reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1040 : 8 : }
1041 : :
1042 : : template<typename Traits, typename DestinationType, typename... P>
1043 : 3 : void reply(const Message::Service service,
1044 : : const DestinationType & dest,
1045 : : const wisp_call_token token,
1046 : : P && ...p)
1047 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1048 : : {
1049 [ + - ]: 3 : reply<Traits>(dest, token,
1050 : 3 : typename Traits::parameter_type(std::forward<P>(p)...),
1051 : : service);
1052 : 3 : }
1053 : :
1054 : : template<typename Traits, typename DestinationType, typename... P>
1055 : : void replyp(const DestinationType & dest,
1056 : : const wisp_call_token token,
1057 : : P && ...p)
1058 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1059 : : {
1060 : : reply<Traits>(dest, token,
1061 : : typename Traits::parameter_type(std::forward<P>(p)...));
1062 : : }
1063 : :
1064 : : // Synchronous call.
1065 : : template<typename Traits, typename DestinationType>
1066 : : void call(const DestinationType & dest,
1067 : : typename Traits::return_type *ret,
1068 : : const Message::Service service = DefaultMessageServiceType)
1069 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1070 : : CallException)
1071 : : {
1072 : : call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1073 : : }
1074 : :
1075 : : template<typename Traits, typename DestinationType>
1076 : : void operator()(const Traits &,
1077 : : const DestinationType & dest,
1078 : : typename Traits::return_type *ret,
1079 : : const Message::Service service = DefaultMessageServiceType)
1080 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1081 : : CallException)
1082 : : {
1083 : : call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1084 : : }
1085 : :
1086 : : template<typename Traits, typename DestinationType, typename... P>
1087 : : void call(const Message::Service service,
1088 : : const DestinationType & dest,
1089 : : typename Traits::return_type *ret,
1090 : : P && ...p)
1091 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1092 : : {
1093 : : call<Traits>(dest, ret,
1094 : : typename Traits::parameter_type(std::forward<P>(p)...),
1095 : : service);
1096 : : }
1097 : :
1098 : : template<typename Traits, typename DestinationType, typename... P>
1099 : 1 : void callp(const DestinationType & dest,
1100 : : typename Traits::return_type *ret,
1101 : : P && ...p)
1102 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1103 : : {
1104 [ + - ]: 1 : call<Traits>(dest, ret,
1105 : 1 : typename Traits::parameter_type(std::forward<P>(p)...));
1106 : 1 : }
1107 : :
1108 : : // Asynchronous call.
1109 : : template<typename Traits, typename DestinationType>
1110 : : typename Future<typename Traits::return_type>::shared_ptr
1111 : : call(const DestinationType & dest,
1112 : : const Message::Service service = DefaultMessageServiceType)
1113 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1114 : : {
1115 : : return call<Traits>(dest, typename Traits::parameter_type(), service);
1116 : : }
1117 : :
1118 : : template<typename Traits, typename DestinationType, typename... P>
1119 : : typename Future<typename Traits::return_type>::shared_ptr
1120 : : call(const Message::Service service, const DestinationType & dest,
1121 : : P && ...p)
1122 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1123 : : {
1124 : : return call<Traits>(dest,
1125 : : typename Traits::parameter_type(std::forward<P>(p)...),
1126 : : service);
1127 : : }
1128 : :
1129 : : template<typename Traits, typename DestinationType, typename... P>
1130 : : typename Future<typename Traits::return_type>::shared_ptr
1131 : : callp(const DestinationType & dest, P && ...p)
1132 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1133 : : {
1134 : : return call<Traits>(dest,
1135 : : typename Traits::parameter_type(std::forward<P>(p)...));
1136 : : }
1137 : :
1138 : : // Dual-purpose operator
1139 : :
1140 : : template<typename Traits, typename DestinationType>
1141 : : typename Future<typename Traits::return_type>::shared_ptr
1142 : 14 : operator()(const Traits &,
1143 : : const DestinationType & dest,
1144 : : const Message::Service service = DefaultMessageServiceType)
1145 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1146 : : {
1147 : : return
1148 : : _call<Traits>(CallTag<Traits::call_type>(),
1149 [ + - + - : 14 : dest, typename Traits::parameter_type(), service);
+ - ]
1150 : : }
1151 : : };
1152 : :
1153 : : __END_NS_SSRC_WISP_PROTOCOL
1154 : :
1155 : : #endif
|