Branch data Line data Source code
1 : : /*
2 : : * Copyright 2006-2008 Savarese Software Research Corporation
3 : : *
4 : : * Licensed under the Apache License, Version 2.0 (the "License");
5 : : * you may not use this file except in compliance with the License.
6 : : * You may obtain a copy of the License at
7 : : *
8 : : * http://www.savarese.com/software/ApacheLicense-2.0
9 : : *
10 : : * Unless required by applicable law or agreed to in writing, software
11 : : * distributed under the License is distributed on an "AS IS" BASIS,
12 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 : : * See the License for the specific language governing permissions and
14 : : * limitations under the License.
15 : : */
16 : :
17 : : /**
18 : : * @file
19 : : * This header defines the Caller class and Wisp communication
20 : : * protocol support classes.
21 : : */
22 : :
23 : : #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
24 : : #define __SSRC_WISP_PROTOCOL_CALLER_H
25 : :
26 : : #include <ssrc/spread.h>
27 : : #include <ssrc/wisp/protocol/ServiceProtocol.h>
28 : :
29 : : #include <queue>
30 : : #include <sstream>
31 : : #include <stdexcept>
32 : :
33 : : #include <boost/shared_ptr.hpp>
34 : : #include <boost/multi_index_container.hpp>
35 : : #include <boost/multi_index/mem_fun.hpp>
36 : : #include <boost/multi_index/hashed_index.hpp>
37 : : #include <boost/multi_index/composite_key.hpp>
38 : :
39 : : __BEGIN_NS_SSRC_WISP_PROTOCOL
40 : :
41 : : using std::string;
42 : : using NS_SSRC_SPREAD::GroupList;
43 : : using NS_SSRC_SPREAD::Message;
44 : : using NS_SSRC_SPREAD::Mailbox;
45 : :
46 : : /**
47 : : * Wisp calls can request a Spread message service type if desired. If
48 : : * not specified, DefaultMessageServiceType is used.
49 : : */
50 : : const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard;
51 : :
52 : : /**
53 : : * CallType defines allowable messaging call types. Currently only
54 : : * one-way and two-way calls are supported. In the future, a
55 : : * multi-way call type may be added, involving a single request and
56 : : * multiple responses (from different responder). A streaming call
57 : : * may also be added, where a single request results in a stream of
58 : : * responses from a single responder.
59 : : */
60 : : enum CallType {
61 : : /** A fire and forget invocation. No return value is expected. */
62 : : OneWay,
63 : :
64 : : /** Requires a single request and a single response. */
65 : : TwoWay
66 : :
67 : : /*, MultiWay */
68 : : };
69 : :
70 : : /**
71 : : * The protocol::detail namespace defines implementation-specific
72 : : * structures that are not intended for use by library client code.
73 : : */
74 : : namespace detail {
75 : : /** An empty structure used to represent a void return type for Wisp calls. */
76 : : struct VoidReturnType { };
77 : : }
78 : :
79 : : /**
80 : : * The three-parameter CallTraits template defines all the type
81 : : * information characterizing a TwoWay call.
82 : : *
83 : : * @param CallerType The type of Caller through which the call must be made.
84 : : * @param ParameterType The parameter type required by the call.
85 : : * @param ReturnType The type returned by the call.
86 : : */
87 : : template<typename CallerType, typename ParameterType,
88 : : typename ReturnType = detail::VoidReturnType>
89 : : struct CallTraits {
90 : : /** The type of call (TwoWay). */
91 : : static const CallType call_type = TwoWay;
92 : :
93 : : /** The parameter type required by the call. */
94 : : typedef ParameterType parameter_type;
95 : :
96 : : /** The type returned by the call. */
97 : : typedef ReturnType return_type;
98 : :
99 : : /** The type of Caller through which the call must be made. */
100 : : typedef CallerType caller_type;
101 : :
102 : : /** The type of future associated with an asynchronous invocation. */
103 : : typedef typename caller_type::template Future<return_type> future_type;
104 : :
105 : : /** The type of future pointer returned by an asynchronous invocation. */
106 : : typedef typename future_type::shared_ptr future_ptr;
107 : : };
108 : :
109 : : /**
110 : : * The two-parameter CallTraits template defines all the type
111 : : * information characterizing a OneWay call.
112 : : *
113 : : * @param CallerType The type of Caller through which the call must be made.
114 : : * @param ParameterType The parameter type of the call.
115 : : */
116 : : template<typename CallerType, typename ParameterType>
117 : : struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> {
118 : : /** The type of call (OneWay). */
119 : : static const CallType call_type = OneWay;
120 : :
121 : : /** The parameter type required by the call. */
122 : : typedef ParameterType parameter_type;
123 : :
124 : : /** The type returned by the call (void). */
125 : : typedef detail::VoidReturnType return_type;
126 : :
127 : : /** The type of Caller through which the call must be made. */
128 : : typedef CallerType caller_type;
129 : : };
130 : :
131 : : /**
132 : : * Defines the CallTraits for a one-way call.
133 : : *
134 : : * @param caller The type of Caller through which the call must be made.
135 : : * @param method The name of the call.
136 : : */
137 : : #define WISP_ONE_WAY_CALL(caller, method) \
138 : : typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
139 : : Call ## method
140 : :
141 : : /**
142 : : * Defines the CallTraits for a two-way call.
143 : : *
144 : : * @param caller The type of Caller through which the call must be made.
145 : : * @param method The name of the call.
146 : : * @param result The type returned by the call.
147 : : */
148 : : #define WISP_TWO_WAY_CALL(caller, method, result) \
149 : : typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
150 : : Call ## method
151 : :
152 : : /**
153 : : * A CallException is thrown when a call cannot complete because of an
154 : : * unexpected condition. This may occur either during a synchronous
155 : : * call or the completion of an asynchronous call via a Future.
156 : : */
157 [ # # ]: 0 : class CallException : public std::runtime_error {
158 : : public:
159 : 0 : explicit CallException(const string & message) :
160 : 0 : std::runtime_error(message)
161 : 0 : { }
162 : : };
163 : :
164 : : /**
165 : : * The type of a call token. Every call made by a Caller instance is
166 : : * identified by a unique token.
167 : : */
168 : : typedef std::uint32_t wisp_call_token;
169 : :
170 : : /**
171 : : * The type used by the CallHeader to store the CallRole of the call
172 : : * being made.
173 : : */
174 : : typedef std::uint8_t wisp_call_role;
175 : :
176 : :
177 : : /**
178 : : * The role of a call being made. This is similar to CallType, but is
179 : : * used in the CallHeader to identify the role of a message
180 : : * participating in a call.
181 : : */
182 : : enum CallRole {
183 : : /** The message is part of a OneWay call. */
184 : : OneWayRequest,
185 : :
186 : : /** The message is initiating a TwoWay call. */
187 : : TwoWayRequest,
188 : :
189 : : /** The message is completing a TwoWay call. */
190 : : TwoWayResponse
191 : :
192 : : /*, MultiWayResponse */
193 : : };
194 : :
195 : :
196 : : /**
197 : : * Every call is described by a CallHeader followed by a message. The
198 : : * CallHeader describes the message type via an id, the call it is
199 : : * participating in via a token, and the role of the message in the call.
200 : : */
201 : : struct CallHeader {
202 : : /** An identifier denoting the message type. */
203 : : wisp_message_id id;
204 : : /** A token denoting the call responsible for generating the message. */
205 : : wisp_call_token token;
206 : : /** A value indicating the role of the call (e.g., OneWayRequest). */
207 : : wisp_call_role role;
208 : : /**
209 : : * The size of the message in bytes. It is non-zero only for jumbo messages.
210 : : */
211 : : unsigned int message_size;
212 : :
213 : : /**
214 : : * Constructs a CallHeader with all members initialized to zero.
215 : : */
216 : 44 : CallHeader() : id(0), token(0), role(0), message_size(0) { }
217 : :
218 : : /**
219 : : * Constructs a CallHeader.
220 : : */
221 : 1 : CallHeader(const wisp_message_id id,
222 : : const wisp_call_token token,
223 : : const CallRole role,
224 : : const unsigned int size = 0) :
225 : 1 : id(id), token(token), role(role), message_size(size) { }
226 : :
227 : : /**
228 : : * Sets all members to 0.
229 : : */
230 : 2 : void clear() {
231 : 2 : id = 0, token = 0, role = 0, message_size = 0;
232 : 2 : }
233 : :
234 : : /**
235 : : * Initializes members to specified values.
236 : : */
237 : 36 : void init(const wisp_message_id _id,
238 : : const wisp_call_token _token,
239 : : const CallRole _role,
240 : : const unsigned int _size = 0)
241 : : {
242 : 36 : id = _id, token = _token, role = _role, message_size = _size;
243 : 36 : }
244 : :
245 : : /**
246 : : * Returns true if the following message is a jumbo message fragment.
247 : : * @return true if the following message is a jumbo message fragment.
248 : : */
249 : 61 : bool is_jumbo_fragment() const {
250 : 61 : return (message_size > 0);
251 : : }
252 : :
253 : : template<class Archive>
254 : 102 : void serialize(Archive & ar, const unsigned int) {
255 : 102 : ar & id & token & role & message_size;
256 : 102 : }
257 : : };
258 : :
259 : : /**
260 : : * Stores all information associated with a received call, including the
261 : : * raw message data.
262 : : */
263 [ + - ]: 48 : struct MessageInfo {
264 : : /** The call header. */
265 : : CallHeader header;
266 : : /** The message that was received. */
267 : : Message message;
268 : : /** The list of groups to which the message was sent. */
269 : : GroupList groups;
270 : :
271 : : explicit
272 : 44 : MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) :
273 [ + - ]: 44 : header(), message(message_capacity), groups()
274 : 44 : { }
275 : :
276 : 26 : wisp_message_protocol protocol() const {
277 : 26 : return message.type();
278 : : }
279 : :
280 : 43 : wisp_message_id id() const {
281 : 43 : return header.id;
282 : : }
283 : :
284 : 112 : wisp_call_token token() const {
285 : 112 : return header.token;
286 : : }
287 : :
288 : 102 : wisp_call_role role() const {
289 : 102 : return header.role;
290 : : }
291 : :
292 : 86 : const string & sender() const {
293 : 86 : return message.sender();
294 : : }
295 : :
296 : 36 : void init_header(const wisp_message_id id,
297 : : const wisp_call_token token,
298 : : const CallRole role)
299 : : {
300 : 36 : header.init(id, token, role);
301 : 36 : }
302 : :
303 : 2 : void clear_header() {
304 : 2 : header.clear();
305 : 2 : }
306 : : };
307 : :
308 : : typedef boost::shared_ptr<MessageInfo> message_info_ptr;
309 : :
310 : : const bool GroupMembershipEnable = true;
311 : : const bool GroupMembershipDisable = false;
312 : :
313 : : /**
314 : : *
315 : : */
316 : : template<typename PT = BinaryPackingTraits>
317 [ + - ][ + - ]: 12 : class Caller {
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
318 : : template<typename ReturnType> friend class Future;
319 : :
320 : : public:
321 : : typedef PT packing_traits;
322 : : typedef typename packing_traits::packer_type packer_type;
323 : : typedef typename packing_traits::unpacker_type unpacker_type;
324 : :
325 : : // Don't remove this definition. It is necessary to prevent an
326 : : // unresolved MaxUnfragmentedMessageSize symbol in debug builds.
327 : : #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
328 : : static const unsigned int MaxUnfragmentedMessageSize =
329 : : __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE;
330 : :
331 : : private:
332 : : typedef std::deque<message_info_ptr> request_queue;
333 : :
334 : : // For now, use hashed_unique, but for multiple responses will need
335 : : // hashed_non_unique.
336 : : typedef boost::multi_index_container<
337 : : message_info_ptr,
338 : : boost::multi_index::indexed_by<
339 : : boost::multi_index::hashed_unique<
340 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
341 : : &MessageInfo::token> >
342 : : > > response_map;
343 : :
344 : : typedef boost::multi_index_container<
345 : : message_info_ptr,
346 : : boost::multi_index::indexed_by<
347 : : boost::multi_index::hashed_unique<
348 : : boost::multi_index::composite_key<
349 : : MessageInfo,
350 : : boost::multi_index::const_mem_fun<MessageInfo, const string &,
351 : : &MessageInfo::sender>,
352 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role,
353 : : &MessageInfo::role>,
354 : : boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
355 : : &MessageInfo::token> > >
356 : : > > jumbo_message_map;
357 : :
358 : : Mailbox _mbox;
359 : : wisp_call_token _call_token;
360 : : packer_type _packer;
361 : : unpacker_type _unpacker;
362 : :
363 : : /** @todo Should queue size be bounded? */
364 : : request_queue _requests;
365 : : response_map _responses;
366 : : jumbo_message_map _jumbo_messages;
367 : : jumbo_message_map::key_from_value _jumbo_key_from_value;
368 : :
369 : : const unsigned int _initial_message_capacity;
370 : : // scratch variable
371 : : message_info_ptr _info;
372 : : MessageInfo _send_info;
373 : :
374 : 0 : static void throwCallException(const wisp_message_protocol expected_proto,
375 : : const wisp_message_protocol actual_proto,
376 : : const wisp_message_id expected_id,
377 : : const wisp_message_id actual_id)
378 : : SSRC_DECL_THROW(CallException)
379 : : {
380 : 0 : std::ostringstream message;
381 [ # # ][ # # ]: 0 : message << "Mismatched message id or protocol.\nExpected protocol: "
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
382 : : << expected_proto << " Actual protocol: " << actual_proto
383 : : << "\nExpected id: " << expected_id
384 : : << " Actual id: " << actual_id << std::endl;
385 [ # # ][ # # ]: 0 : throw CallException(message.str());
386 : : }
387 : :
388 : 0 : static void throwCallException(const wisp_call_token expected_token,
389 : : const wisp_call_token actual_token)
390 : : SSRC_DECL_THROW(CallException)
391 : : {
392 : 0 : std::ostringstream message;
393 [ # # ][ # # ]: 0 : message << "Mismatched call token.\nExpected token: "
[ # # ][ # # ]
[ # # ]
394 : : << expected_token << " Actual token: " << actual_token
395 : : << std::endl;
396 [ # # ][ # # ]: 0 : throw CallException(message.str());
397 : : }
398 : :
399 : 13 : void reset_receive_info() {
400 [ + - ]: 13 : _info.reset(new MessageInfo(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE)));
401 : 13 : }
402 : :
403 : 8 : void push_request() {
404 : 8 : _requests.push_back(_info);
405 : 8 : reset_receive_info();
406 : 8 : }
407 : :
408 : 2 : void insert_response() {
409 : 2 : _responses.insert(_info);
410 : 2 : reset_receive_info();
411 : 2 : }
412 : :
413 : : enum JumboFragmentResult {
414 : : JumboFragmentFirstFragment,
415 : : JumboFragmentNextFragment,
416 : : JumboFragmentCompleteRequest,
417 : : JumboFragmentCompleteResponse
418 : : };
419 : :
420 : 28 : JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo,
421 : : const unsigned int header_size)
422 : : {
423 : 28 : JumboFragmentResult result = JumboFragmentNextFragment;
424 : 28 : Message & message = msginfo->message;
425 : : jumbo_message_map::iterator it =
426 : 28 : _jumbo_messages.find(_jumbo_key_from_value(msginfo));
427 : :
428 [ + + ]: 28 : if(it == _jumbo_messages.end()) {
429 : : // Pre-allocate memory for remaining fragments.
430 [ + - ][ + - ]: 8 : message_info_ptr info(new MessageInfo(msginfo->header.message_size));
431 [ + - ][ + - ]: 4 : message.seek(message.size());
432 [ + - ]: 4 : *info = *msginfo;
433 [ + - ]: 4 : _jumbo_messages.insert(info);
434 : 4 : result = JumboFragmentFirstFragment;
435 : : } else {
436 : 24 : const message_info_ptr & info = *it;
437 : :
438 : 24 : info->message.write(&message[header_size], message.size()-header_size);
439 : :
440 [ + + ]: 24 : if(info->message.offset() >= info->header.message_size) {
441 : 3 : info->message.seek(header_size);
442 [ + + ]: 3 : if(info->role() == TwoWayResponse) {
443 : 1 : _responses.insert(info);
444 : 1 : result = JumboFragmentCompleteResponse;
445 : : } else {
446 : 2 : _requests.push_back(info);
447 : 2 : result = JumboFragmentCompleteRequest;
448 : : }
449 : 3 : _jumbo_messages.erase(it);
450 : : }
451 : : }
452 : 28 : return result;
453 : : }
454 : :
455 : : // Receives call responses, but not requests and membership messages
456 : : template<typename MessageType>
457 : 10 : void receive_response(MessageType & msg, const wisp_call_token token)
458 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
459 : : CallException)
460 : : {
461 : : begin_receive_response:
462 : 10 : typename response_map::iterator it(_responses.find(token));
463 : :
464 [ + + + + ]: 10 : if(it == _responses.end()) {
465 [ - + ][ + + ]: 7 : if(!_info.unique()) {
466 : 2 : reset_receive_info();
467 : : }
468 : 36 : do {
469 : 43 : Message & message = _info->message;
470 : :
471 : 43 : _mbox.receive(message);
472 : 43 : _mbox.copy_groups(_info->groups);
473 : :
474 [ + - + + ]: 43 : if(message.is_regular()) {
475 : 41 : CallHeader & header = _info->header;
476 : :
477 : 41 : message.rewind();
478 : :
479 : 41 : const unsigned int header_size = _unpacker.unpack(header, message);
480 : :
481 [ - + + + ]: 41 : if(header.is_jumbo_fragment()) {
482 : : const JumboFragmentResult jumbo_result =
483 : 28 : insert_jumbo_fragment(_info, header_size);
484 [ # # + + ]: 28 : if(jumbo_result == JumboFragmentCompleteResponse) {
485 : 1 : goto begin_receive_response;
486 : : }
487 : : } else {
488 [ + - ][ + + ]: 13 : if(header.role == TwoWayResponse) {
489 [ + - ][ + + ]: 7 : if(header.token == token)
490 : 6 : break;
491 : : else
492 : 1 : insert_response();
493 : : } else
494 : 6 : push_request();
495 : : }
496 : : } else {
497 : 2 : _info->clear_header();
498 : 2 : push_request();
499 : : }
500 : : } while(true);
501 : 6 : unpack(msg, *_info);
502 : : } else {
503 : 3 : unpack(msg, *(*it));
504 : 3 : _responses.erase(it);
505 : : }
506 : 9 : }
507 : :
508 : : // We assume integers wrap around to min at max.
509 : 24 : wisp_call_token next_token() {
510 : 24 : return _call_token++;
511 : : }
512 : :
513 : : template<typename DestinationType, typename MessageType>
514 : 36 : void send(const DestinationType & dest,
515 : : const MessageType & msg,
516 : : const wisp_call_token token,
517 : : const CallRole role,
518 : : const Message::Service service)
519 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
520 : : {
521 : : unsigned int message_size;
522 : :
523 : 36 : _send_info.init_header(msg.id, token, role);
524 : 36 : _send_info.message.rewind();
525 : 36 : _packer.pack(_send_info.header, _send_info.message);
526 : 36 : _packer.pack(msg, _send_info.message);
527 : 36 : _send_info.message.set_type(msg.protocol);
528 : 36 : _send_info.message.set_service(service);
529 : :
530 [ + - + - : 36 : if((message_size = _send_info.message.size())
+ - + - +
- - + ]
531 : : <= MaxUnfragmentedMessageSize)
532 : 33 : _mbox.send(_send_info.message, dest);
533 : : else {
534 : 3 : _send_info.header.message_size = message_size;
535 : 3 : _send_info.message.rewind();
536 : :
537 : : const unsigned int header_size =
538 : 3 : _packer.pack(_send_info.header, _send_info.message);
539 : : const unsigned int max_fragment_size =
540 : 3 : MaxUnfragmentedMessageSize - header_size;
541 : :
542 : : // We never fragment a header, so there's no risk of
543 : : // header_size equaling message_size..
544 [ # # ][ # # ]: 30 : for(unsigned int offset = header_size, size = 0;
[ # # ][ # # ]
[ # # ][ + + ]
545 : : offset < message_size; offset+=size)
546 : : {
547 : 27 : size = std::min(max_fragment_size, message_size - offset);
548 : 27 : _mbox.clear_message_parts();
549 : 27 : _mbox.add_message_part(&_send_info.message[0], header_size);
550 : 27 : _mbox.add_message_part(&_send_info.message[offset], size);
551 : 27 : _mbox.send(dest, msg.protocol, service);
552 : : }
553 : 3 : _mbox.clear_message_parts();
554 : : }
555 : 36 : }
556 : :
557 : : public:
558 : : // Using jumbo_message_map::key_type doesn't work because it's a composite
559 : : // key that stores a reference, so we have to use a compatible key.
560 : : typedef
561 : : boost::tuple<string, wisp_call_role, wisp_call_token> jumbo_message_key_type;
562 : :
563 : : template<typename ReturnType>
564 : : class Future {
565 : : friend class Caller;
566 : :
567 : : bool _valid;
568 : : wisp_call_token _token;
569 : : Caller & _caller;
570 : :
571 : 13 : Future(Caller & caller, const wisp_call_token token) :
572 : 13 : _valid(true), _token(token), _caller(caller)
573 : 13 : { }
574 : :
575 : : public:
576 : : typedef ReturnType return_type;
577 : : typedef boost::shared_ptr<Future> shared_ptr;
578 : :
579 : : // Note, you can't sit in a while(!future.ready()) loop because
580 : : // there's no receiver thread.
581 : 2 : bool ready() const {
582 : 2 : return _caller.returned(_token);
583 : : }
584 : :
585 : 22 : bool valid() const {
586 : 22 : return _valid;
587 : : }
588 : :
589 : 15 : wisp_call_token token() const {
590 : 15 : return _token;
591 : : }
592 : :
593 : 8 : void receive(return_type & result)
594 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
595 : : CallException)
596 : : {
597 [ - + ][ - + ]: 8 : if(!valid())
598 [ # # ][ # # ]: 0 : throw CallException("Invalid future.");
[ # # ][ # # ]
599 : :
600 : 8 : _caller.receive_response(result, _token);
601 : :
602 : 8 : _valid = false;
603 : 8 : }
604 : :
605 : : // Exclusively for supporting continuations in Service
606 : 3 : void unpack(return_type & result, MessageInfo & info)
607 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
608 : : CallException)
609 : : {
610 [ - + ]: 3 : if(!valid())
611 [ # # ][ # # ]: 0 : throw CallException("Invalid future.");
612 : :
613 [ + - ]: 3 : if(info.token() == _token)
614 : 3 : _caller.unpack(result, info);
615 : : else
616 : 0 : throwCallException(_token, info.token());
617 : :
618 : 3 : _valid = false;
619 : 3 : }
620 : : };
621 : :
622 : : explicit
623 : 12 : Caller(const string & connection = "",
624 : : const string & name = "",
625 : : const unsigned int message_capacity = Message::DefaultCapacity,
626 : : const bool group_membership = GroupMembershipDisable) :
627 : : _mbox(connection, name, group_membership), _call_token(0),
628 : : _packer(), _unpacker(), _requests(), _responses(),
629 : : _jumbo_messages(),
630 : : _jumbo_key_from_value(_jumbo_messages.key_extractor()),
631 : : _initial_message_capacity(message_capacity),
632 : : _info(new MessageInfo(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))),
633 [ + - ][ + - ]: 12 : _send_info(message_capacity)
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
634 : 12 : { }
635 : :
636 : : #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
637 : :
638 : : // Not intended to be subclassed outside of Wisp.
639 : : //virtual ~Caller() { }
640 : :
641 : : // We expose this for allowing retrieval of group information.
642 : 21 : const Mailbox & mbox() {
643 : 21 : return _mbox;
644 : : }
645 : :
646 : 2 : packer_type & packer() {
647 : 2 : return _packer;
648 : : }
649 : :
650 : 1 : unpacker_type & unpacker() {
651 : 1 : return _unpacker;
652 : : }
653 : :
654 : 3 : wisp_call_token call_token() {
655 : 3 : return _call_token;
656 : : }
657 : :
658 : 34 : const string & name() const {
659 : 34 : return _mbox.private_group();
660 : : }
661 : :
662 : : bool group_membership() const {
663 : : return _mbox.group_membership();
664 : : }
665 : :
666 : 1 : void join(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
667 : 1 : _mbox.join(group);
668 : 1 : }
669 : :
670 : 1 : void leave(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
671 : 1 : _mbox.leave(group);
672 : 1 : }
673 : :
674 : : unsigned int initial_message_capacity() const {
675 : : return _initial_message_capacity;
676 : : }
677 : :
678 : 3 : unsigned int message_capacity() const {
679 : 3 : return _info->message.capacity();
680 : : }
681 : :
682 : : unsigned int request_queue_size() const {
683 : : return _requests.size();
684 : : }
685 : :
686 : : unsigned int response_map_size() const {
687 : : return _responses.size();
688 : : }
689 : :
690 : 2 : bool returned(const wisp_call_token token) const {
691 : 2 : return (_responses.find(token) != _responses.end());
692 : : }
693 : :
694 : 3 : unsigned int count_jumbo_messages() const {
695 : 3 : return _jumbo_messages.size();
696 : : }
697 : :
698 : : template<typename key_container>
699 : 1 : void collect_jumbo_message_keys(key_container & container) {
700 [ + + ]: 3 : for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
701 : 1 : end = _jumbo_messages.end(); it != end; ++it)
702 : : {
703 : 1 : const MessageInfo *info = it->get();
704 [ + - ][ + - ]: 1 : container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token()));
705 : : }
706 : 1 : }
707 : :
708 : : template<typename iterator_type>
709 : 2 : void erase_jumbo_messages(const iterator_type & begin,
710 : : const iterator_type & end)
711 : : {
712 : : // You can't erase a compatible key, so we have to perform a lookup first
713 [ + + ]: 4 : for(iterator_type it = begin; it != end; ++it) {
714 : 2 : jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
715 [ + + ]: 2 : if(msg != _jumbo_messages.end())
716 : 1 : _jumbo_messages.erase(msg);
717 : : }
718 : 2 : }
719 : :
720 : : // Call only after unpacking CallHeader so message offset is correct.
721 : : template<typename MessageType>
722 : 20 : void unpack(MessageType & msg, MessageInfo & info)
723 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
724 : : CallException)
725 : : {
726 [ + - ][ + - ]: 20 : if(info.id() == MessageType::id &&
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
727 : : info.protocol() == MessageType::protocol)
728 : 20 : _unpacker.unpack(msg, info.message);
729 : : else
730 : 0 : throwCallException(MessageType::protocol, info.protocol(),
731 : : MessageType::id, info.id());
732 : 20 : }
733 : :
734 : : /**
735 : : * Assumes _info.unique() == true. We do not check _info.unique()
736 : : * for performance reasons (unique() may acquire thread mutexes).
737 : : * This function is intended to be called only by an event
738 : : * dispatcher inside an event loop in a single thread.
739 : : */
740 : 17 : void receive(message_info_ptr & info)
741 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
742 : : {
743 : : // Drain responses first, then requests.
744 [ - + ]: 17 : if(!_responses.empty()) {
745 : 0 : response_map::iterator it = _responses.begin();
746 : 0 : info = *it;
747 : 0 : _responses.erase(it);
748 [ + + ]: 17 : } else if(!_requests.empty()) {
749 : 1 : info = _requests.front();
750 : 1 : _requests.pop_front();
751 : : } else {
752 : 0 : do {
753 : 16 : Message & message = _info->message;
754 : :
755 : 16 : _mbox.receive(message);
756 : 16 : _mbox.copy_groups(_info->groups);
757 : :
758 [ + - ]: 16 : if(message.is_regular()) {
759 : 16 : CallHeader & header = _info->header;
760 : :
761 : 16 : message.rewind();
762 : :
763 : 16 : const unsigned int header_size = _unpacker.unpack(header, message);
764 : :
765 [ - + ]: 16 : if(header.is_jumbo_fragment()) {
766 : : const JumboFragmentResult jumbo_result =
767 : 0 : insert_jumbo_fragment(_info, header_size);
768 [ # # ]: 0 : if(jumbo_result == JumboFragmentCompleteResponse) {
769 : 0 : response_map::iterator it = _responses.begin();
770 : 0 : info = *it;
771 : 0 : _responses.erase(it);
772 : 0 : return;
773 [ # # ]: 0 : } else if(jumbo_result == JumboFragmentCompleteRequest) {
774 : 0 : info = _requests.front();
775 : 0 : _requests.pop_front();
776 : 0 : return;
777 : : }
778 : : } else
779 : 16 : break;
780 : : } else {
781 : 0 : _info->clear_header();
782 : 0 : break;
783 : : }
784 : : } while(true);
785 : 17 : info = _info;
786 : : }
787 : : }
788 : :
789 : : // Also implement timeout.
790 : : // Receives requests and membership messages, but not call responses
791 : 12 : void receive_request(message_info_ptr & info)
792 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
793 : : {
794 [ + + ]: 12 : if(!_requests.empty()) {
795 : 9 : info = _requests.front();
796 : 9 : _requests.pop_front();
797 : : } else {
798 [ + + ]: 3 : if(!_info.unique()) {
799 : 1 : reset_receive_info();
800 : : }
801 : 1 : do {
802 : 4 : Message & message = _info->message;
803 : :
804 : 4 : _mbox.receive(message);
805 : 4 : _mbox.copy_groups(_info->groups);
806 : :
807 [ + - ]: 4 : if(message.is_regular()) {
808 : 4 : CallHeader & header = _info->header;
809 : :
810 : 4 : message.rewind();
811 : :
812 : 4 : const unsigned int header_size = _unpacker.unpack(header, message);
813 : :
814 [ - + ]: 4 : if(header.is_jumbo_fragment()) {
815 : : const JumboFragmentResult jumbo_result =
816 : 0 : insert_jumbo_fragment(_info, header_size);
817 [ # # ]: 0 : if(jumbo_result == JumboFragmentCompleteRequest) {
818 : 0 : info = _requests.front();
819 : 0 : _requests.pop_front();
820 : 12 : return;
821 : : }
822 : : } else {
823 [ + + ]: 4 : if(header.role == TwoWayResponse)
824 : 1 : insert_response();
825 : : else
826 : 3 : break;
827 : : }
828 : : } else {
829 : 0 : _info->clear_header();
830 : 0 : break;
831 : : }
832 : : } while(true);
833 : 3 : info = _info;
834 : : }
835 : : }
836 : :
837 : : template<typename Traits, typename DestinationType>
838 : 10 : void send(const DestinationType & dest,
839 : : const typename Traits::parameter_type & param,
840 : : const Message::Service service = DefaultMessageServiceType)
841 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
842 : : {
843 : : static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
844 : 10 : send(dest, param, next_token(), OneWayRequest, service);
845 : 10 : }
846 : :
847 : : template<typename Traits, typename DestinationType>
848 : 12 : void reply(const DestinationType & dest,
849 : : const wisp_call_token token,
850 : : const typename Traits::parameter_type & param,
851 : : const Message::Service service = DefaultMessageServiceType)
852 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
853 : : {
854 : : static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
855 : 12 : send(dest, param, token, TwoWayResponse, service);
856 : 12 : }
857 : :
858 : : // Synchronous call.
859 : : template<typename Traits, typename DestinationType>
860 : 1 : void call(const DestinationType & dest,
861 : : typename Traits::return_type *ret,
862 : : const typename Traits::parameter_type & param,
863 : : const Message::Service service = DefaultMessageServiceType)
864 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
865 : : CallException)
866 : : {
867 : : static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
868 : 1 : wisp_call_token token = next_token();
869 : 1 : send(dest, param, token, TwoWayRequest, service);
870 : 1 : receive_response(*ret, token);
871 : 1 : }
872 : :
873 : : // Asynchronous call.
874 : : template<typename Traits, typename DestinationType>
875 : : typename Future<typename Traits::return_type>::shared_ptr
876 : 13 : call(const DestinationType & dest,
877 : : const typename Traits::parameter_type & param,
878 : : const Message::Service service = DefaultMessageServiceType)
879 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
880 : : {
881 : : static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
882 : : typedef Future<typename Traits::return_type> future_type;
883 : 13 : wisp_call_token token = next_token();
884 : :
885 : 13 : send(dest, param, token, TwoWayRequest, service);
886 : :
887 : 13 : return typename future_type::shared_ptr(new future_type(*this, token));
888 : : }
889 : :
890 : : private:
891 : : template<CallType call_type> struct CallTag { };
892 : :
893 : : template<typename Traits, typename DestinationType>
894 : : typename Future<typename Traits::return_type>::shared_ptr
895 : 3 : _call(const CallTag<OneWay> &,
896 : : const DestinationType & dest,
897 : : const typename Traits::parameter_type & param,
898 : : const Message::Service service = DefaultMessageServiceType)
899 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
900 : : {
901 : : typedef Future<typename Traits::return_type> future_type;
902 : 3 : send<Traits>(dest, param, service);
903 : 3 : return typename future_type::shared_ptr();
904 : : }
905 : :
906 : : template<typename Traits, typename DestinationType>
907 : : typename Future<typename Traits::return_type>::shared_ptr
908 : 11 : _call(const CallTag<TwoWay> &,
909 : : const DestinationType & dest,
910 : : const typename Traits::parameter_type & param,
911 : : const Message::Service service = DefaultMessageServiceType)
912 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
913 : : {
914 : 11 : return call<Traits>(dest, param, service);
915 : : }
916 : :
917 : : public:
918 : :
919 : : template<typename Traits, typename DestinationType>
920 : : void operator()(const Traits &,
921 : : const DestinationType & dest,
922 : : const wisp_call_token token,
923 : : const typename Traits::parameter_type & param,
924 : : const Message::Service service =
925 : : DefaultMessageServiceType)
926 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
927 : : {
928 : : reply<Traits>(dest, token, param, service);
929 : : }
930 : :
931 : : template<typename Traits, typename DestinationType>
932 : : void operator()(const Traits &,
933 : : const DestinationType & dest,
934 : : typename Traits::return_type *ret,
935 : : const typename Traits::parameter_type & param,
936 : : const Message::Service service =
937 : : DefaultMessageServiceType)
938 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
939 : : CallException)
940 : : {
941 : : call<Traits>(dest, ret, param, service);
942 : : }
943 : :
944 : : template<typename Traits, typename DestinationType>
945 : : typename Future<typename Traits::return_type>::shared_ptr
946 : : operator()(const Traits &,
947 : : const DestinationType & dest,
948 : : const typename Traits::parameter_type & param,
949 : : const Message::Service service = DefaultMessageServiceType)
950 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
951 : : {
952 : : return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
953 : : }
954 : :
955 : :
956 : : // Convenience functions that invoke parameter_type constructor for
957 : : // you. We permit up to WISP_CALLER_MAX_PARAMETERS - 1
958 : : // parameter_type arguments. Any more, and you should probably use
959 : : // a different approach.
960 : :
961 : : // Send.
962 : :
963 : : template<typename Traits, typename DestinationType>
964 : 3 : void send(const DestinationType & dest,
965 : : const Message::Service service = DefaultMessageServiceType)
966 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
967 : : {
968 : 3 : send<Traits>(dest, typename Traits::parameter_type(), service);
969 : 3 : }
970 : :
971 : : template<typename Traits, typename DestinationType, typename... P>
972 : 2 : void send(const Message::Service service,
973 : : const DestinationType & dest,
974 : : P && ...p)
975 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
976 : : {
977 [ + - ][ + - ]: 2 : send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...),
[ + - ][ + - ]
[ + - ][ + - ]
978 : : service);
979 : 2 : }
980 : :
981 : : template<typename Traits, typename DestinationType, typename... P>
982 : : void sendp(const DestinationType & dest, P && ...p)
983 : :
984 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
985 : : {
986 : : send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...));
987 : : }
988 : :
989 : : // Reply.
990 : :
991 : : template<typename Traits, typename DestinationType>
992 : : void reply(const DestinationType & dest,
993 : : const wisp_call_token token,
994 : : const Message::Service service = DefaultMessageServiceType)
995 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
996 : : {
997 : : reply<Traits>(dest, token, typename Traits::parameter_type(), service);
998 : : }
999 : :
1000 : : template<typename Traits, typename DestinationType>
1001 : 8 : void operator()(const Traits &,
1002 : : const DestinationType & dest,
1003 : : const wisp_call_token token,
1004 : : const Message::Service service =
1005 : : DefaultMessageServiceType)
1006 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1007 : : {
1008 : 8 : reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1009 : 8 : }
1010 : :
1011 : : template<typename Traits, typename DestinationType, typename... P>
1012 : 3 : void reply(const Message::Service service,
1013 : : const DestinationType & dest,
1014 : : const wisp_call_token token,
1015 : : P && ...p)
1016 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1017 : : {
1018 [ + - ]: 3 : reply<Traits>(dest, token,
1019 : : typename Traits::parameter_type(std::forward<P>(p)...),
1020 : : service);
1021 : 3 : }
1022 : :
1023 : : template<typename Traits, typename DestinationType, typename... P>
1024 : : void replyp(const DestinationType & dest,
1025 : : const wisp_call_token token,
1026 : : P && ...p)
1027 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1028 : : {
1029 : : reply<Traits>(dest, token,
1030 : : typename Traits::parameter_type(std::forward<P>(p)...));
1031 : : }
1032 : :
1033 : : // Synchronous call.
1034 : : template<typename Traits, typename DestinationType>
1035 : : void call(const DestinationType & dest,
1036 : : typename Traits::return_type *ret,
1037 : : const Message::Service service = DefaultMessageServiceType)
1038 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1039 : : CallException)
1040 : : {
1041 : : call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1042 : : }
1043 : :
1044 : : template<typename Traits, typename DestinationType>
1045 : : void operator()(const Traits &,
1046 : : const DestinationType & dest,
1047 : : typename Traits::return_type *ret,
1048 : : const Message::Service service = DefaultMessageServiceType)
1049 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1050 : : CallException)
1051 : : {
1052 : : call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1053 : : }
1054 : :
1055 : : template<typename Traits, typename DestinationType, typename... P>
1056 : : void call(const Message::Service service,
1057 : : const DestinationType & dest,
1058 : : typename Traits::return_type *ret,
1059 : : P && ...p)
1060 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1061 : : {
1062 : : call<Traits>(dest, ret,
1063 : : typename Traits::parameter_type(std::forward<P>(p)...),
1064 : : service);
1065 : : }
1066 : :
1067 : : template<typename Traits, typename DestinationType, typename... P>
1068 : 1 : void callp(const DestinationType & dest,
1069 : : typename Traits::return_type *ret,
1070 : : P && ...p)
1071 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1072 : : {
1073 [ + - ]: 1 : call<Traits>(dest, ret,
1074 : : typename Traits::parameter_type(std::forward<P>(p)...));
1075 : 1 : }
1076 : :
1077 : : // Asynchronous call.
1078 : : template<typename Traits, typename DestinationType>
1079 : : typename Future<typename Traits::return_type>::shared_ptr
1080 : : call(const DestinationType & dest,
1081 : : const Message::Service service = DefaultMessageServiceType)
1082 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1083 : : {
1084 : : return call<Traits>(dest, typename Traits::parameter_type(), service);
1085 : : }
1086 : :
1087 : : template<typename Traits, typename DestinationType, typename... P>
1088 : : typename Future<typename Traits::return_type>::shared_ptr
1089 : : call(const Message::Service service, const DestinationType & dest,
1090 : : P && ...p)
1091 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1092 : : {
1093 : : return call<Traits>(dest,
1094 : : typename Traits::parameter_type(std::forward<P>(p)...),
1095 : : service);
1096 : : }
1097 : :
1098 : : template<typename Traits, typename DestinationType, typename... P>
1099 : : typename Future<typename Traits::return_type>::shared_ptr
1100 : : callp(const DestinationType & dest, P && ...p)
1101 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1102 : : {
1103 : : return call<Traits>(dest,
1104 : : typename Traits::parameter_type(std::forward<P>(p)...));
1105 : : }
1106 : :
1107 : : // Dual-purpose operator
1108 : :
1109 : : template<typename Traits, typename DestinationType>
1110 : : typename Future<typename Traits::return_type>::shared_ptr
1111 : 14 : operator()(const Traits &,
1112 : : const DestinationType & dest,
1113 : : const Message::Service service = DefaultMessageServiceType)
1114 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1115 : : {
1116 : : return
1117 : : _call<Traits>(CallTag<Traits::call_type>(),
1118 : 14 : dest, typename Traits::parameter_type(), service);
1119 : : }
1120 : : };
1121 : :
1122 : : __END_NS_SSRC_WISP_PROTOCOL
1123 : :
1124 : : #endif
|