Savarese Software Research Corporation
Caller.h
Go to the documentation of this file.
1 /* Copyright 2006-2013 Savarese Software Research Corporation.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.savarese.com/software/ApacheLicense-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
22 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
23 #define __SSRC_WISP_PROTOCOL_CALLER_H
24 
25 #include <ssrc/spread.h>
27 
28 #include <queue>
29 #include <sstream>
30 #include <stdexcept>
31 
32 #include <boost/shared_ptr.hpp>
33 #include <boost/multi_index_container.hpp>
34 #include <boost/multi_index/mem_fun.hpp>
35 #include <boost/multi_index/hashed_index.hpp>
36 #include <boost/multi_index/composite_key.hpp>
37 
39 
40 using std::string;
41 using NS_SSRC_SPREAD::GroupList;
42 using NS_SSRC_SPREAD::Message;
43 using NS_SSRC_SPREAD::Mailbox;
44 
49 const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard;
50 
// Distinguishes the kinds of call a CallTraits instantiation can describe.
// NOTE(review): the enumerator lines are absent from this listing. The rest of
// the file refers to OneWay and TwoWay (see CallTraits::call_type), so those
// two are presumably declared here -- confirm against the original header.
59 enum CallType {
62 
65 
66  /*, MultiWay */
67 };
68 
// Implementation helpers that are not meant to be named by client code.
73 namespace detail {
// Empty tag type. It is the default ReturnType of CallTraits and is the
// argument that selects the CallTraits partial specialization.
75  struct VoidReturnType { };
76 }
77 
86 template<typename CallerType, typename ParameterType,
87  typename ReturnType = detail::VoidReturnType>
88 struct CallTraits {
90  static const CallType call_type = TwoWay;
91 
93  typedef ParameterType parameter_type;
94 
96  typedef ReturnType return_type;
97 
99  typedef CallerType caller_type;
100 
102  typedef typename caller_type::template Future<return_type> future_type;
103 
105  typedef typename future_type::shared_ptr future_ptr;
106 };
107 
// Partial specialization chosen when ReturnType is detail::VoidReturnType.
// It describes a one-way call.
115 template<typename CallerType, typename ParameterType>
116 struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> {
// Tagged OneWay; Caller checks this tag with static_assert and uses it to
// pick the matching _call overload.
118  static const CallType call_type = OneWay;
119 
// Type of the message sent as the call's argument.
121  typedef ParameterType parameter_type;
122 
// NOTE(review): at least one line is absent from the listing at this point.
// Caller's one-way _call overload refers to Traits::return_type, so a
// return_type typedef is presumably declared here -- confirm.
125 
// Caller type the call is bound to.
127  typedef CallerType caller_type;
128 };
129 
// Declares the typedef Call<method> as the CallTraits for `caller` whose
// parameter type is Message<method>. No return type is given, so the one-way
// CallTraits specialization is selected.
136 #define WISP_ONE_WAY_CALL(caller, method) \
137  typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
138  Call ## method
139 
// Declares the typedef Call<method> as the CallTraits for `caller` whose
// parameter type is Message<method> and whose return type is Message<result>.
147 #define WISP_TWO_WAY_CALL(caller, method, result) \
148  typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
149  Call ## method
150 
/**
 * Exception raised by Caller and Caller::Future when a call cannot be
 * completed: a received message does not match the expected id, protocol or
 * call token, or a future is used after it has been invalidated.
 */
class CallException : public std::runtime_error {
public:
  /**
   * @param message Description of the failure; returned by what().
   */
  explicit CallException(const string & message)
    : std::runtime_error(message)
  { }
};
162 
167 typedef std::uint32_t wisp_call_token;
168 
173 typedef std::uint8_t wisp_call_role;
174 
175 
// Role a message plays within a call; stored in CallHeader::role.
// NOTE(review): the enumerator lines are absent from this listing. Caller
// refers to OneWayRequest, TwoWayRequest and TwoWayResponse, so those are
// presumably declared here -- confirm names and order against the original.
181 enum CallRole {
184 
187 
190 
191  /*, MultiWayResponse */
192 };
193 
194 
// Header packed in front of every call message. It carries the message id,
// the call token, the role of the message and, for fragmented messages, the
// total message size.
// NOTE(review): the declarations of the id, token and role members and the
// first line of the field-wise constructor are absent from this listing; the
// code below refers to them -- confirm against the original header.
200 struct CallHeader {
// Zero for an ordinary message. Caller::send stores the total packed size
// here before splitting an oversized message, which is what marks a message
// as a jumbo fragment (see is_jumbo_fragment).
210  unsigned int message_size;
211 
// Creates a header with every field set to zero.
215  CallHeader() : id(0), token(0), role(0), message_size(0) { }
216 
// Remaining parameters of the field-wise constructor. size defaults to 0,
// i.e. a message that is not a jumbo fragment.
221  const wisp_call_token token,
222  const CallRole role,
223  const unsigned int size = 0) :
224  id(id), token(token), role(role), message_size(size) { }
225 
// Resets every field to zero.
229  void clear() {
230  id = 0, token = 0, role = 0, message_size = 0;
231  }
232 
// Assigns all fields at once; _size defaults to 0 (not a jumbo fragment).
236  void init(const wisp_message_id _id,
237  const wisp_call_token _token,
238  const CallRole _role,
239  const unsigned int _size = 0)
240  {
241  id = _id, token = _token, role = _role, message_size = _size;
242  }
243 
// Returns true when message_size is non-zero.
248  bool is_jumbo_fragment() const {
249  return (message_size > 0);
250  }
251 
// Serialization hook: archives id, token, role and message_size, in that
// order. The version argument is unused.
252  template<class Archive>
253  void serialize(Archive & ar, const unsigned int) {
254  ar & id & token & role & message_size;
255  }
256 };
257 
// Bundles a message with its unpacked CallHeader and the group list copied
// from the mailbox when the message was received.
// NOTE(review): the declaration of the header member and the signature lines
// of the protocol() and token() accessors are absent from this listing; the
// surviving bodies below belong to them -- confirm against the original.
262 struct MessageInfo {
// Message buffer holding the packed header followed by the packed payload.
266  Message message;
// Groups copied from the mailbox by Caller after each receive.
268  GroupList groups;
269 
// Creates an empty entry whose message buffer has the given capacity.
270  explicit
271  MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) :
272  header(), message(message_capacity), groups()
273  { }
274 
// Body of the protocol accessor: the protocol is the message's type.
276  return message.type();
277  }
278 
// Message id taken from the header.
279  wisp_message_id id() const {
280  return header.id;
281  }
282 
// Body of the token accessor: the call token taken from the header.
284  return header.token;
285  }
286 
// Role taken from the header.
287  wisp_call_role role() const {
288  return header.role;
289  }
290 
// Sender as reported by the underlying message.
291  const string & sender() const {
292  return message.sender();
293  }
294 
// Re-initializes the header; the header's message_size is reset to its
// default of 0.
295  void init_header(const wisp_message_id id,
296  const wisp_call_token token,
297  const CallRole role)
298  {
299  header.init(id, token, role);
300  }
301 
// Zeroes every header field.
302  void clear_header() {
303  header.clear();
304  }
305 };
306 
307 typedef boost::shared_ptr<MessageInfo> message_info_ptr;
308 
309 const bool GroupMembershipEnable = true;
310 const bool GroupMembershipDisable = false;
311 
315 template<typename PT = BinaryPackingTraits>
316 class Caller {
317  template<typename ReturnType> friend class Future;
318 
319 public:
320  typedef PT packing_traits;
321  typedef typename packing_traits::packer_type packer_type;
322  typedef typename packing_traits::unpacker_type unpacker_type;
323 
324  // Don't remove this definition. It is necessary to prevent an
325  // unresolved MaxUnfragmentedMessageSize symbol in debug builds.
326 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
327  static const unsigned int MaxUnfragmentedMessageSize =
329 
330 private:
331  typedef std::deque<message_info_ptr> request_queue;
332 
333  // For now, use hashed_unique, but for multiple responses will need
334  // hashed_non_unique.
335  typedef boost::multi_index_container<
337  boost::multi_index::indexed_by<
338  boost::multi_index::hashed_unique<
339  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
340  &MessageInfo::token> >
341  > > response_map;
342 
343  typedef boost::multi_index_container<
345  boost::multi_index::indexed_by<
346  boost::multi_index::hashed_unique<
347  boost::multi_index::composite_key<
348  MessageInfo,
349  boost::multi_index::const_mem_fun<MessageInfo, const string &,
350  &MessageInfo::sender>,
351  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role,
352  &MessageInfo::role>,
353  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
354  &MessageInfo::token> > >
355  > > jumbo_message_map;
356 
357  Mailbox _mbox;
358  wisp_call_token _call_token;
359  packer_type _packer;
360  unpacker_type _unpacker;
361 
363  request_queue _requests;
364  response_map _responses;
365  jumbo_message_map _jumbo_messages;
366  jumbo_message_map::key_from_value _jumbo_key_from_value;
367 
368  const unsigned int _initial_message_capacity;
369  // scratch variable
370  message_info_ptr _info;
371  MessageInfo _send_info;
372 
373  static void throwCallException(const wisp_message_protocol expected_proto,
374  const wisp_message_protocol actual_proto,
375  const wisp_message_id expected_id,
376  const wisp_message_id actual_id)
377  SSRC_DECL_THROW(CallException)
378  {
379  std::ostringstream message;
380  message << "Mismatched message id or protocol.\nExpected protocol: "
381  << expected_proto << " Actual protocol: " << actual_proto
382  << "\nExpected id: " << expected_id
383  << " Actual id: " << actual_id << std::endl;
384  throw CallException(message.str());
385  }
386 
387  static void throwCallException(const wisp_call_token expected_token,
388  const wisp_call_token actual_token)
389  SSRC_DECL_THROW(CallException)
390  {
391  std::ostringstream message;
392  message << "Mismatched call token.\nExpected token: "
393  << expected_token << " Actual token: " << actual_token
394  << std::endl;
395  throw CallException(message.str());
396  }
397 
398  void reset_receive_info() {
399  _info.reset(new MessageInfo(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE)));
400  }
401 
// Appends the message currently held in the receive scratch entry to the
// request queue, then allocates a new scratch entry so the queued one is not
// overwritten by the next receive.
402  void push_request() {
403  _requests.push_back(_info);
404  reset_receive_info();
405  }
406 
// Stores the message currently held in the receive scratch entry in the
// response map (keyed by call token), then allocates a new scratch entry so
// the stored one is not overwritten by the next receive.
407  void insert_response() {
408  _responses.insert(_info);
409  reset_receive_info();
410  }
411 
// Outcome of handing one received fragment to insert_jumbo_fragment.
412  enum JumboFragmentResult {
// No partial message existed for the fragment's key; one was created.
413  JumboFragmentFirstFragment,
// The fragment was appended to a partial message that is still incomplete.
414  JumboFragmentNextFragment,
// The fragment completed a message that was moved to the request queue.
415  JumboFragmentCompleteRequest,
// The fragment completed a message that was moved to the response map.
416  JumboFragmentCompleteResponse
417  };
418 
// Adds one received fragment of an oversized ("jumbo") message to the
// partial message it belongs to.
//
// msginfo     - the fragment just received, with its header already unpacked.
// header_size - number of bytes the packed header occupies at the start of
//               the fragment.
//
// Returns which of the JumboFragmentResult outcomes applied. When a message
// is completed it has already been moved into _responses or _requests by the
// time this function returns.
419  JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo,
420  const unsigned int header_size)
421  {
422  JumboFragmentResult result = JumboFragmentNextFragment;
423  Message & message = msginfo->message;
// Partial messages are keyed by (sender, role, token); the key is extracted
// from the fragment itself.
424  jumbo_message_map::iterator it =
425  _jumbo_messages.find(_jumbo_key_from_value(msginfo));
426 
427  if(it == _jumbo_messages.end()) {
428  // Pre-allocate memory for remaining fragments.
429  message_info_ptr info(new MessageInfo(msginfo->header.message_size));
// Move the fragment's offset to its end before copying it.
// NOTE(review): presumably so that the copy's write position sits after the
// first fragment and later fragments are appended -- confirm Message copy
// semantics.
430  message.seek(message.size());
431  *info = *msginfo;
432  _jumbo_messages.insert(info);
433  result = JumboFragmentFirstFragment;
434  } else {
435  const message_info_ptr & info = *it;
436 
// Append only the payload; every fragment repeats the header at its start.
437  info->message.write(&message[header_size], message.size()-header_size);
438 
// The message is complete once as many bytes as the header announced have
// been written.
439  if(info->message.offset() >= info->header.message_size) {
// Position the completed message just past its header, where unpack()
// expects the offset to be.
440  info->message.seek(header_size);
441  if(info->role() == TwoWayResponse) {
442  _responses.insert(info);
443  result = JumboFragmentCompleteResponse;
444  } else {
445  _requests.push_back(info);
446  result = JumboFragmentCompleteRequest;
447  }
// Erase last: `info` is a reference into the container element.
448  _jumbo_messages.erase(it);
449  }
450  }
451  return result;
452  }
453 
454  // Receives call responses, but not requests and membership messages
// Unpacks the response carrying `token` into msg. If that response is already
// stored in the response map it is taken from there; otherwise messages are
// read from the mailbox until it arrives. Everything else read along the way
// is kept for later: other responses go to the response map, requests and
// non-regular messages go to the request queue.
// Declared to throw CallException (via unpack) when the response's id or
// protocol does not match MessageType.
455  template<typename MessageType>
456  void receive_response(MessageType & msg, const wisp_call_token token)
457  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
458  CallException)
459  {
// Re-entered after a jumbo response completes, because the completed message
// is placed in the response map rather than in _info.
460  begin_receive_response:
461  typename response_map::iterator it(_responses.find(token));
462 
463  if(it == _responses.end()) {
// If someone else still holds the scratch entry, receive into a fresh one so
// their message is not overwritten.
464  if(!_info.unique()) {
465  reset_receive_info();
466  }
467  do {
// Re-bound on every iteration: push_request/insert_response replace _info.
468  Message & message = _info->message;
469 
470  _mbox.receive(message);
471  _mbox.copy_groups(_info->groups);
472 
473  if(message.is_regular()) {
474  CallHeader & header = _info->header;
475 
476  message.rewind();
477 
478  const unsigned int header_size = _unpacker.unpack(header, message);
479 
480  if(header.is_jumbo_fragment()) {
481  const JumboFragmentResult jumbo_result =
482  insert_jumbo_fragment(_info, header_size);
// A completed jumbo response may or may not be the one awaited; restart so
// the lookup by token decides.
483  if(jumbo_result == JumboFragmentCompleteResponse) {
484  goto begin_receive_response;
485  }
486  } else {
487  if(header.role == TwoWayResponse) {
// The awaited response: leave it in _info and unpack it after the loop.
488  if(header.token == token)
489  break;
490  else
491  insert_response();
492  } else
493  push_request();
494  }
495  } else {
// Non-regular messages carry no call header; queue them with a zeroed one.
496  _info->clear_header();
497  push_request();
498  }
499  } while(true);
500  unpack(msg, *_info);
501  } else {
502  unpack(msg, *(*it));
503  _responses.erase(it);
504  }
505  }
506 
507  // We assume integers wrap around to min at max.
// Returns the current call token and advances the counter (post-increment),
// so the first token handed out is the counter's initial value.
508  wisp_call_token next_token() {
509  return _call_token++;
510  }
511 
512  template<typename DestinationType, typename MessageType>
513  void send(const DestinationType & dest,
514  const MessageType & msg,
515  const wisp_call_token token,
516  const CallRole role,
517  const Message::Service service)
518  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
519  {
520  bool done_packing(false);
521 
522  _send_info.init_header(msg.id, token, role);
523 
524  // We increase the message buffer size by a factor of two and try
525  // again when the write area is exhausted. We don't try to
526  // prevent an infinite loop because the message we are writing is
527  // finite in size; therefore the loop must terminate. See WISP-11
528  // for more information.
529 
530 #define __WISP_WRITE_AREA_EXHAUSTED "write area exhausted"
531 
532  do {
533  try {
534  _send_info.message.rewind();
535  _packer.pack(_send_info.header, _send_info.message);
536  _packer.pack(msg, _send_info.message);
537  done_packing = true;
538  } catch(const std::ios_base::failure & e) {
539  constexpr std::size_t len = sizeof(__WISP_WRITE_AREA_EXHAUSTED) - 1;
540  if(std::strncmp(e.what(), __WISP_WRITE_AREA_EXHAUSTED, len) == 0) {
541  _send_info.message.resize(2*_send_info.message.capacity());
542  } else {
543  throw;
544  }
545  }
546  } while(!done_packing);
547 
548 #undef __WISP_WRITE_AREA_EXHAUSTED
549 
550  _send_info.message.set_type(msg.protocol);
551  _send_info.message.set_service(service);
552 
553  if(_send_info.message.size() <= MaxUnfragmentedMessageSize) {
554  _mbox.send(_send_info.message, dest);
555  } else {
556  const unsigned int message_size =
557  _send_info.header.message_size = _send_info.message.size();
558  _send_info.message.rewind();
559 
560  const unsigned int header_size =
561  _packer.pack(_send_info.header, _send_info.message);
562  const unsigned int max_fragment_size =
563  MaxUnfragmentedMessageSize - header_size;
564 
565  // We never fragment a header, so there's no risk of
566  // header_size equaling message_size..
567  for(unsigned int offset = header_size, size = 0;
568  offset < message_size; offset+=size)
569  {
570  size = std::min(max_fragment_size, message_size - offset);
571  _mbox.clear_message_parts();
572  _mbox.add_message_part(&_send_info.message[0], header_size);
573  _mbox.add_message_part(&_send_info.message[offset], size);
574  _mbox.send(dest, msg.protocol, service);
575  }
576  _mbox.clear_message_parts();
577  }
578  }
579 
580 public:
581  // Using jumbo_message_map::key_type doesn't work because it's a composite
582  // key that stores a reference, so we have to use a compatible key.
583  typedef
584  boost::tuple<string, wisp_call_role, wisp_call_token> jumbo_message_key_type;
585 
586  template<typename ReturnType>
587  class Future {
588  friend class Caller;
589 
590  bool _valid;
591  wisp_call_token _token;
592  Caller & _caller;
593 
594  Future(Caller & caller, const wisp_call_token token) :
595  _valid(true), _token(token), _caller(caller)
596  { }
597 
598  public:
599  typedef ReturnType return_type;
600  typedef boost::shared_ptr<Future> shared_ptr;
601 
602  // Note, you can't sit in a while(!future.ready()) loop because
603  // there's no receiver thread.
604  bool ready() const {
605  return _caller.returned(_token);
606  }
607 
608  bool valid() const {
609  return _valid;
610  }
611 
612  wisp_call_token token() const {
613  return _token;
614  }
615 
616  void receive(return_type & result)
617  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
619  {
620  if(!valid())
621  throw CallException("Invalid future.");
622 
623  _caller.receive_response(result, _token);
624 
625  _valid = false;
626  }
627 
628  // Exclusively for supporting continuations in Service
629  void unpack(return_type & result, MessageInfo & info)
630  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
632  {
633  if(!valid())
634  throw CallException("Invalid future.");
635 
636  if(info.token() == _token)
637  _caller.unpack(result, info);
638  else
639  throwCallException(_token, info.token());
640 
641  _valid = false;
642  }
643  };
644 
// Creates the mailbox and the send and receive buffers.
// connection, name and group_membership are passed unchanged to the Mailbox
// constructor. message_capacity sizes the send buffer and is remembered as the
// initial capacity; the receive buffer gets the same capacity, capped at the
// unfragmented-message limit. The call token counter starts at 0.
645  explicit
646  Caller(const string & connection = "",
647  const string & name = "",
648  const unsigned int message_capacity = Message::DefaultCapacity,
649  const bool group_membership = GroupMembershipDisable) :
650  _mbox(connection, name, group_membership), _call_token(0),
651  _packer(), _unpacker(), _requests(), _responses(),
652  _jumbo_messages(),
// Keep a copy of the container's key extractor for building lookup keys from
// a received fragment.
653  _jumbo_key_from_value(_jumbo_messages.key_extractor()),
654  _initial_message_capacity(message_capacity),
655  _info(new MessageInfo(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))),
656  _send_info(message_capacity)
657  { }
658 
659 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
660 
661  // Not intended to be subclassed outside of Wisp.
662  //virtual ~Caller() = default;
663 
664  // We expose this for allowing retrieval of group information.
// Read-only access to the underlying mailbox.
665  const Mailbox & mbox() {
666  return _mbox;
667  }
668 
// Mutable access to the packer used for outgoing headers and messages.
669  packer_type & packer() {
670  return _packer;
671  }
672 
// Mutable access to the unpacker used for incoming headers and messages.
673  unpacker_type & unpacker() {
674  return _unpacker;
675  }
676 
// Current value of the call token counter, i.e. the token the next call
// will use.
677  wisp_call_token call_token() {
678  return _call_token;
679  }
680 
// The caller's name is the mailbox's private group.
681  const string & name() const {
682  return _mbox.private_group();
683  }
684 
// Reports the mailbox's group membership setting.
685  bool group_membership() const {
686  return _mbox.group_membership();
687  }
688 
// Joins the named group through the mailbox. Declared to throw
// NS_SSRC_SPREAD::Error.
689  void join(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
690  _mbox.join(group);
691  }
692 
// Leaves the named group through the mailbox. Declared to throw
// NS_SSRC_SPREAD::Error.
693  void leave(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
694  _mbox.leave(group);
695  }
696 
// Message capacity that was passed to the constructor.
697  unsigned int message_capacity_initial() const {
698  return _initial_message_capacity;
699  }
700 
// Capacity of the current receive scratch buffer.
701  unsigned int message_capacity_receive() const {
702  return _info->message.capacity();
703  }
704 
// Capacity of the send buffer; send() enlarges it when a message does not fit.
705  unsigned int message_capacity_send() const {
706  return _send_info.message.capacity();
707  }
708 
// Number of received requests waiting in the request queue.
709  unsigned int request_queue_size() const {
710  return _requests.size();
711  }
712 
// Number of received responses waiting in the response map.
713  unsigned int response_map_size() const {
714  return _responses.size();
715  }
716 
// Returns true when a response carrying `token` is already stored. This only
// inspects stored responses; it does not read from the mailbox.
717  bool returned(const wisp_call_token token) const {
718  return (_responses.find(token) != _responses.end());
719  }
720 
// Number of jumbo messages that are still missing fragments.
721  unsigned int count_jumbo_messages() const {
722  return _jumbo_messages.size();
723  }
724 
725  template<typename key_container>
726  void collect_jumbo_message_keys(key_container & container) {
727  for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
728  end = _jumbo_messages.end(); it != end; ++it)
729  {
730  const MessageInfo *info = it->get();
731  container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token()));
732  }
733  }
734 
735  template<typename iterator_type>
736  void erase_jumbo_messages(const iterator_type & begin,
737  const iterator_type & end)
738  {
739  // You can't erase a compatible key, so we have to perform a lookup first
740  for(iterator_type it = begin; it != end; ++it) {
741  jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
742  if(msg != _jumbo_messages.end())
743  _jumbo_messages.erase(msg);
744  }
745  }
746 
747  // Call only after unpacking CallHeader so message offset is correct.
748  template<typename MessageType>
749  void unpack(MessageType & msg, MessageInfo & info)
750  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
752  {
753  if(info.id() == MessageType::id &&
754  info.protocol() == MessageType::protocol)
755  _unpacker.unpack(msg, info.message);
756  else
757  throwCallException(MessageType::protocol, info.protocol(),
758  MessageType::id, info.id());
759  }
760 
767  void receive(message_info_ptr & info)
768  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
769  {
770  // Drain responses first, then requests.
771  if(!_responses.empty()) {
772  response_map::iterator it = _responses.begin();
773  info = *it;
774  _responses.erase(it);
775  } else if(!_requests.empty()) {
776  info = _requests.front();
777  _requests.pop_front();
778  } else {
779  do {
780  Message & message = _info->message;
781 
782  _mbox.receive(message);
783  _mbox.copy_groups(_info->groups);
784 
785  if(message.is_regular()) {
786  CallHeader & header = _info->header;
787 
788  message.rewind();
789 
790  const unsigned int header_size = _unpacker.unpack(header, message);
791 
792  if(header.is_jumbo_fragment()) {
793  const JumboFragmentResult jumbo_result =
794  insert_jumbo_fragment(_info, header_size);
795  if(jumbo_result == JumboFragmentCompleteResponse) {
796  response_map::iterator it = _responses.begin();
797  info = *it;
798  _responses.erase(it);
799  return;
800  } else if(jumbo_result == JumboFragmentCompleteRequest) {
801  info = _requests.front();
802  _requests.pop_front();
803  return;
804  }
805  } else
806  break;
807  } else {
808  _info->clear_header();
809  break;
810  }
811  } while(true);
812  info = _info;
813  }
814  }
815 
816  // Also implement timeout.
817  // Receives requests and membership messages, but not call responses
// Delivers the oldest queued request if there is one; otherwise reads from the
// mailbox until a request or a non-regular message arrives. Responses read
// along the way are stored in the response map for later retrieval.
// info is set to the delivered message.
818  void receive_request(message_info_ptr & info)
819  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
820  {
821  if(!_requests.empty()) {
822  info = _requests.front();
823  _requests.pop_front();
824  } else {
// If a previously delivered message still shares the scratch entry, receive
// into a fresh one so it is not overwritten.
825  if(!_info.unique()) {
826  reset_receive_info();
827  }
828  do {
// Re-bound on every iteration: insert_response replaces _info.
829  Message & message = _info->message;
830 
831  _mbox.receive(message);
832  _mbox.copy_groups(_info->groups);
833 
834  if(message.is_regular()) {
835  CallHeader & header = _info->header;
836 
837  message.rewind();
838 
839  const unsigned int header_size = _unpacker.unpack(header, message);
840 
841  if(header.is_jumbo_fragment()) {
842  const JumboFragmentResult jumbo_result =
843  insert_jumbo_fragment(_info, header_size);
// A completed jumbo request was appended to the (previously empty) queue.
// Completed jumbo responses stay in the response map.
844  if(jumbo_result == JumboFragmentCompleteRequest) {
845  info = _requests.front();
846  _requests.pop_front();
847  return;
848  }
849  } else {
850  if(header.role == TwoWayResponse)
851  insert_response();
852  else
853  break;
854  }
855  } else {
// Non-regular messages carry no call header; deliver them with a zeroed one.
856  _info->clear_header();
857  break;
858  }
859  } while(true);
860  info = _info;
861  }
862  }
863 
864  template<typename Traits, typename DestinationType>
865  void send(const DestinationType & dest,
866  const typename Traits::parameter_type & param,
867  const Message::Service service = DefaultMessageServiceType)
868  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
869  {
870  static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
871  send(dest, param, next_token(), OneWayRequest, service);
872  }
873 
// Sends param as the response to the call identified by token. The message
// is sent with role TwoWayResponse and the given token. Traits must
// describe a one-way call; this is checked at compile time.
874  template<typename Traits, typename DestinationType>
875  void reply(const DestinationType & dest,
876  const wisp_call_token token,
877  const typename Traits::parameter_type & param,
878  const Message::Service service = DefaultMessageServiceType)
879  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
880  {
881  static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
882  send(dest, param, token, TwoWayResponse, service);
883  }
884 
885  // Synchronous call.
886  template<typename Traits, typename DestinationType>
887  void call(const DestinationType & dest,
888  typename Traits::return_type *ret,
889  const typename Traits::parameter_type & param,
890  const Message::Service service = DefaultMessageServiceType)
891  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
893  {
894  static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
895  wisp_call_token token = next_token();
896  send(dest, param, token, TwoWayRequest, service);
897  receive_response(*ret, token);
898  }
899 
900  // Asynchronous call.
901  template<typename Traits, typename DestinationType>
902  typename Future<typename Traits::return_type>::shared_ptr
903  call(const DestinationType & dest,
904  const typename Traits::parameter_type & param,
905  const Message::Service service = DefaultMessageServiceType)
906  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
907  {
908  static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
909  typedef Future<typename Traits::return_type> future_type;
910  wisp_call_token token = next_token();
911 
912  send(dest, param, token, TwoWayRequest, service);
913 
914  return typename future_type::shared_ptr(new future_type(*this, token));
915  }
916 
917 private:
// Empty tag type used to select a _call overload from Traits::call_type.
918  template<CallType call_type> struct CallTag { };
919 
// One-way overload: sends the request and returns an empty (null) future
// pointer, since there is no result to wait for.
920  template<typename Traits, typename DestinationType>
921  typename Future<typename Traits::return_type>::shared_ptr
922  _call(const CallTag<OneWay> &,
923  const DestinationType & dest,
924  const typename Traits::parameter_type & param,
925  const Message::Service service = DefaultMessageServiceType)
926  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
927  {
928  typedef Future<typename Traits::return_type> future_type;
929  send<Traits>(dest, param, service);
930  return typename future_type::shared_ptr();
931  }
932 
// Two-way overload: forwards to the asynchronous call and returns its future.
933  template<typename Traits, typename DestinationType>
934  typename Future<typename Traits::return_type>::shared_ptr
935  _call(const CallTag<TwoWay> &,
936  const DestinationType & dest,
937  const typename Traits::parameter_type & param,
938  const Message::Service service = DefaultMessageServiceType)
939  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
940  {
941  return call<Traits>(dest, param, service);
942  }
943 
944 public:
945 
946  template<typename Traits, typename DestinationType>
947  void operator()(const Traits &,
948  const DestinationType & dest,
949  const wisp_call_token token,
950  const typename Traits::parameter_type & param,
951  const Message::Service service =
953  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
954  {
955  reply<Traits>(dest, token, param, service);
956  }
957 
958  template<typename Traits, typename DestinationType>
959  void operator()(const Traits &,
960  const DestinationType & dest,
961  typename Traits::return_type *ret,
962  const typename Traits::parameter_type & param,
963  const Message::Service service =
965  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
967  {
968  call<Traits>(dest, ret, param, service);
969  }
970 
// Function-call form that works for both kinds of call: it dispatches on
// Traits::call_type to the matching _call overload. For a one-way call the
// returned future pointer is empty; for a two-way call it refers to the
// future of the asynchronous call.
971  template<typename Traits, typename DestinationType>
972  typename Future<typename Traits::return_type>::shared_ptr
973  operator()(const Traits &,
974  const DestinationType & dest,
975  const typename Traits::parameter_type & param,
976  const Message::Service service = DefaultMessageServiceType)
977  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
978  {
979  return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
980  }
981 
982 
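983  // Convenience functions that invoke the parameter_type constructor for
984  // you. The original design permitted up to WISP_CALLER_MAX_PARAMETERS - 1
985  // parameter_type arguments; the overloads below are variadic templates that forward any number of constructor arguments. If you need many, you should probably use
986  // a different approach.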
987 
988  // Send.
989 
// Sends a one-way request whose parameter is a value-initialized
// Traits::parameter_type.
990  template<typename Traits, typename DestinationType>
991  void send(const DestinationType & dest,
992  const Message::Service service = DefaultMessageServiceType)
993  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
994  {
995  send<Traits>(dest, typename Traits::parameter_type(), service);
996  }
997 
// Sends a one-way request whose parameter is constructed from p... Note that
// the service comes first in this overload.
998  template<typename Traits, typename DestinationType, typename... P>
999  void send(const Message::Service service,
1000  const DestinationType & dest,
1001  P && ...p)
1002  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1003  {
1004  send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...),
1005  service);
1006  }
1007 
// Same as above, but uses the default service type.
1008  template<typename Traits, typename DestinationType, typename... P>
1009  void sendp(const DestinationType & dest, P && ...p)
1010 
1011  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1012  {
1013  send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...));
1014  }
1015 
1016  // Reply.
1017 
// Replies to the call identified by token with a value-initialized
// Traits::parameter_type.
1018  template<typename Traits, typename DestinationType>
1019  void reply(const DestinationType & dest,
1020  const wisp_call_token token,
1021  const Message::Service service = DefaultMessageServiceType)
1022  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1023  {
1024  reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1025  }
1026 
1027  template<typename Traits, typename DestinationType>
1028  void operator()(const Traits &,
1029  const DestinationType & dest,
1030  const wisp_call_token token,
1031  const Message::Service service =
1033  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1034  {
1035  reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1036  }
1037 
// Replies to the call identified by token with a parameter constructed from
// p... Note that the service comes first in this overload.
1038  template<typename Traits, typename DestinationType, typename... P>
1039  void reply(const Message::Service service,
1040  const DestinationType & dest,
1041  const wisp_call_token token,
1042  P && ...p)
1043  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1044  {
1045  reply<Traits>(dest, token,
1046  typename Traits::parameter_type(std::forward<P>(p)...),
1047  service);
1048  }
1049 
// Same as above, but uses the default service type.
1050  template<typename Traits, typename DestinationType, typename... P>
1051  void replyp(const DestinationType & dest,
1052  const wisp_call_token token,
1053  P && ...p)
1054  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1055  {
1056  reply<Traits>(dest, token,
1057  typename Traits::parameter_type(std::forward<P>(p)...));
1058  }
1059 
1060  // Synchronous call.
// Synchronous call with a value-initialized Traits::parameter_type; the
// response is unpacked into *ret.
1061  template<typename Traits, typename DestinationType>
1062  void call(const DestinationType & dest,
1063  typename Traits::return_type *ret,
1064  const Message::Service service = DefaultMessageServiceType)
1065  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1066  CallException)
1067  {
1068  call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1069  }
1070 
// Function-call form of the overload above; the Traits argument is used only
// to deduce the call's traits type.
1071  template<typename Traits, typename DestinationType>
1072  void operator()(const Traits &,
1073  const DestinationType & dest,
1074  typename Traits::return_type *ret,
1075  const Message::Service service = DefaultMessageServiceType)
1076  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1077  CallException)
1078  {
1079  call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1080  }
1081 
1082  template<typename Traits, typename DestinationType, typename... P>
1083  void call(const Message::Service service,
1084  const DestinationType & dest,
1085  typename Traits::return_type *ret,
1086  P && ...p)
1087  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1088  {
1089  call<Traits>(dest, ret,
1090  typename Traits::parameter_type(std::forward<P>(p)...),
1091  service);
1092  }
1093 
1094  template<typename Traits, typename DestinationType, typename... P>
1095  void callp(const DestinationType & dest,
1096  typename Traits::return_type *ret,
1097  P && ...p)
1098  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1099  {
1100  call<Traits>(dest, ret,
1101  typename Traits::parameter_type(std::forward<P>(p)...));
1102  }
1103 
1104  // Asynchronous call.
// Asynchronous call with a value-initialized Traits::parameter_type; returns
// the future that delivers the result.
1105  template<typename Traits, typename DestinationType>
1106  typename Future<typename Traits::return_type>::shared_ptr
1107  call(const DestinationType & dest,
1108  const Message::Service service = DefaultMessageServiceType)
1109  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1110  {
1111  return call<Traits>(dest, typename Traits::parameter_type(), service);
1112  }
1113 
// Asynchronous call whose parameter is constructed from p... Note that the
// service comes first in this overload.
1114  template<typename Traits, typename DestinationType, typename... P>
1115  typename Future<typename Traits::return_type>::shared_ptr
1116  call(const Message::Service service, const DestinationType & dest,
1117  P && ...p)
1118  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1119  {
1120  return call<Traits>(dest,
1121  typename Traits::parameter_type(std::forward<P>(p)...),
1122  service);
1123  }
1124 
// Same as above, but uses the default service type.
1125  template<typename Traits, typename DestinationType, typename... P>
1126  typename Future<typename Traits::return_type>::shared_ptr
1127  callp(const DestinationType & dest, P && ...p)
1128  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1129  {
1130  return call<Traits>(dest,
1131  typename Traits::parameter_type(std::forward<P>(p)...));
1132  }
1133 
1134  // Dual-purpose operator
1135 
// Function-call form for calls without arguments. It dispatches on
// Traits::call_type to the matching _call overload, passing a
// value-initialized Traits::parameter_type. The returned future pointer is
// empty for a one-way call.
1136  template<typename Traits, typename DestinationType>
1137  typename Future<typename Traits::return_type>::shared_ptr
1138  operator()(const Traits &,
1139  const DestinationType & dest,
1140  const Message::Service service = DefaultMessageServiceType)
1141  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1142  {
1143  return
1144  _call<Traits>(CallTag<Traits::call_type>(),
1145  dest, typename Traits::parameter_type(), service);
1146  }
1147 };
1148 
1150 
1151 #endif

Savarese Software Research Corporation
Copyright © 2006-2012 Savarese Software Research Corporation. All rights reserved.