22 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
23 #define __SSRC_WISP_PROTOCOL_CALLER_H
25 #include <ssrc/spread.h>
32 #include <boost/shared_ptr.hpp>
33 #include <boost/multi_index_container.hpp>
34 #include <boost/multi_index/mem_fun.hpp>
35 #include <boost/multi_index/hashed_index.hpp>
36 #include <boost/multi_index/composite_key.hpp>
41 using NS_SSRC_SPREAD::GroupList;
42 using NS_SSRC_SPREAD::Message;
43 using NS_SSRC_SPREAD::Mailbox;
86 template<
typename CallerType,
typename ParameterType,
102 typedef typename caller_type::template Future<return_type>
future_type;
115 template<
typename CallerType,
typename ParameterType>
136 #define WISP_ONE_WAY_CALL(caller, method) \
137 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
147 #define WISP_TWO_WAY_CALL(caller, method, result) \
148 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
159 std::runtime_error(message)
223 const unsigned int size = 0) :
224 id(id), token(token), role(role), message_size(size) { }
230 id = 0, token = 0, role = 0, message_size = 0;
239 const unsigned int _size = 0)
241 id = _id, token = _token, role = _role, message_size = _size;
249 return (message_size > 0);
252 template<
class Archive>
254 ar &
id & token & role & message_size;
271 MessageInfo(
const unsigned int message_capacity = Message::DefaultCapacity) :
272 header(), message(message_capacity), groups()
276 return message.type();
292 return message.sender();
299 header.
init(
id, token, role);
315 template<
typename PT = BinaryPackingTraits>
317 template<
typename ReturnType>
friend class Future;
326 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
327 static const unsigned int MaxUnfragmentedMessageSize =
331 typedef std::deque<message_info_ptr> request_queue;
335 typedef boost::multi_index_container<
337 boost::multi_index::indexed_by<
338 boost::multi_index::hashed_unique<
340 &MessageInfo::token> >
343 typedef boost::multi_index_container<
345 boost::multi_index::indexed_by<
346 boost::multi_index::hashed_unique<
347 boost::multi_index::composite_key<
349 boost::multi_index::const_mem_fun<MessageInfo,
const string &,
350 &MessageInfo::sender>,
354 &MessageInfo::token> > >
355 > > jumbo_message_map;
358 wisp_call_token _call_token;
360 unpacker_type _unpacker;
363 request_queue _requests;
364 response_map _responses;
365 jumbo_message_map _jumbo_messages;
366 jumbo_message_map::key_from_value _jumbo_key_from_value;
368 const unsigned int _initial_message_capacity;
370 message_info_ptr _info;
371 MessageInfo _send_info;
379 std::ostringstream message;
380 message <<
"Mismatched message id or protocol.\nExpected protocol: "
381 << expected_proto <<
" Actual protocol: " << actual_proto
382 <<
"\nExpected id: " << expected_id
383 <<
" Actual id: " << actual_id << std::endl;
387 static void throwCallException(
const wisp_call_token expected_token,
388 const wisp_call_token actual_token)
391 std::ostringstream message;
392 message <<
"Mismatched call token.\nExpected token: "
393 << expected_token <<
" Actual token: " << actual_token
398 void reset_receive_info() {
402 void push_request() {
403 _requests.push_back(_info);
404 reset_receive_info();
407 void insert_response() {
408 _responses.insert(_info);
409 reset_receive_info();
412 enum JumboFragmentResult {
413 JumboFragmentFirstFragment,
414 JumboFragmentNextFragment,
415 JumboFragmentCompleteRequest,
416 JumboFragmentCompleteResponse
419 JumboFragmentResult insert_jumbo_fragment(
const message_info_ptr & msginfo,
420 const unsigned int header_size)
422 JumboFragmentResult result = JumboFragmentNextFragment;
423 Message & message = msginfo->message;
424 jumbo_message_map::iterator it =
425 _jumbo_messages.find(_jumbo_key_from_value(msginfo));
427 if(it == _jumbo_messages.end()) {
429 message_info_ptr info(
new MessageInfo(msginfo->header.message_size));
430 message.seek(message.size());
432 _jumbo_messages.insert(info);
433 result = JumboFragmentFirstFragment;
435 const message_info_ptr & info = *it;
437 info->message.write(&message[header_size], message.size()-header_size);
439 if(info->message.offset() >= info->header.message_size) {
440 info->message.seek(header_size);
442 _responses.insert(info);
443 result = JumboFragmentCompleteResponse;
445 _requests.push_back(info);
446 result = JumboFragmentCompleteRequest;
448 _jumbo_messages.erase(it);
455 template<
typename MessageType>
456 void receive_response(MessageType & msg,
const wisp_call_token token)
457 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
460 begin_receive_response:
461 typename response_map::iterator it(_responses.find(token));
463 if(it == _responses.end()) {
464 if(!_info.unique()) {
465 reset_receive_info();
468 Message & message = _info->message;
470 _mbox.receive(message);
471 _mbox.copy_groups(_info->groups);
473 if(message.is_regular()) {
474 CallHeader & header = _info->header;
478 const unsigned int header_size = _unpacker.unpack(header, message);
480 if(header.is_jumbo_fragment()) {
481 const JumboFragmentResult jumbo_result =
482 insert_jumbo_fragment(_info, header_size);
483 if(jumbo_result == JumboFragmentCompleteResponse) {
484 goto begin_receive_response;
488 if(header.token == token)
496 _info->clear_header();
503 _responses.erase(it);
508 wisp_call_token next_token() {
509 return _call_token++;
512 template<
typename DestinationType,
typename MessageType>
513 void send(
const DestinationType & dest,
514 const MessageType & msg,
515 const wisp_call_token token,
517 const Message::Service service)
518 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
520 bool done_packing(
false);
530 #define __WISP_WRITE_AREA_EXHAUSTED "write area exhausted"
536 _packer.pack(msg, _send_info.
message);
538 }
catch(
const std::ios_base::failure & e) {
546 }
while(!done_packing);
548 #undef __WISP_WRITE_AREA_EXHAUSTED
550 _send_info.
message.set_type(msg.protocol);
551 _send_info.
message.set_service(service);
553 if(_send_info.
message.size() <= MaxUnfragmentedMessageSize) {
554 _mbox.send(_send_info.
message, dest);
556 const unsigned int message_size =
560 const unsigned int header_size =
562 const unsigned int max_fragment_size =
563 MaxUnfragmentedMessageSize - header_size;
567 for(
unsigned int offset = header_size, size = 0;
568 offset < message_size; offset+=size)
570 size = std::min(max_fragment_size, message_size - offset);
571 _mbox.clear_message_parts();
572 _mbox.add_message_part(&_send_info.
message[0], header_size);
573 _mbox.add_message_part(&_send_info.
message[offset], size);
574 _mbox.send(dest, msg.protocol, service);
576 _mbox.clear_message_parts();
586 template<
typename ReturnType>
591 wisp_call_token _token;
595 _valid(true), _token(token), _caller(caller)
617 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
623 _caller.receive_response(result, _token);
629 void unpack(return_type & result, MessageInfo & info)
630 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
636 if(info.token() == _token)
637 _caller.
unpack(result, info);
639 throwCallException(_token, info.token());
647 const string & name =
"",
648 const unsigned int message_capacity = Message::DefaultCapacity,
650 _mbox(connection, name, group_membership), _call_token(0),
651 _packer(), _unpacker(), _requests(), _responses(),
653 _jumbo_key_from_value(_jumbo_messages.key_extractor()),
654 _initial_message_capacity(message_capacity),
656 _send_info(message_capacity)
659 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
682 return _mbox.private_group();
686 return _mbox.group_membership();
689 void join(
const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
693 void leave(
const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
698 return _initial_message_capacity;
702 return _info->message.capacity();
706 return _send_info.
message.capacity();
710 return _requests.size();
714 return _responses.size();
718 return (_responses.find(token) != _responses.end());
722 return _jumbo_messages.size();
725 template<
typename key_container>
727 for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
728 end = _jumbo_messages.end(); it != end; ++it)
730 const MessageInfo *info = it->get();
731 container.insert(container.end(), jumbo_message_key_type(info->
sender(), info->
role(), info->
token()));
735 template<
typename iterator_type>
737 const iterator_type & end)
740 for(iterator_type it = begin; it != end; ++it) {
741 jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
742 if(msg != _jumbo_messages.end())
743 _jumbo_messages.erase(msg);
748 template<
typename MessageType>
749 void unpack(MessageType & msg, MessageInfo & info)
750 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
753 if(info.id() == MessageType::id &&
754 info.protocol() == MessageType::protocol)
755 _unpacker.unpack(msg, info.message);
757 throwCallException(MessageType::protocol, info.protocol(),
758 MessageType::id, info.id());
768 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
771 if(!_responses.empty()) {
772 response_map::iterator it = _responses.begin();
774 _responses.erase(it);
775 }
else if(!_requests.empty()) {
776 info = _requests.front();
777 _requests.pop_front();
780 Message & message = _info->message;
782 _mbox.receive(message);
783 _mbox.copy_groups(_info->groups);
785 if(message.is_regular()) {
790 const unsigned int header_size = _unpacker.unpack(header, message);
793 const JumboFragmentResult jumbo_result =
794 insert_jumbo_fragment(_info, header_size);
795 if(jumbo_result == JumboFragmentCompleteResponse) {
796 response_map::iterator it = _responses.begin();
798 _responses.erase(it);
800 }
else if(jumbo_result == JumboFragmentCompleteRequest) {
801 info = _requests.front();
802 _requests.pop_front();
808 _info->clear_header();
819 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
821 if(!_requests.empty()) {
822 info = _requests.front();
823 _requests.pop_front();
825 if(!_info.unique()) {
826 reset_receive_info();
829 Message & message = _info->message;
831 _mbox.receive(message);
832 _mbox.copy_groups(_info->groups);
834 if(message.is_regular()) {
839 const unsigned int header_size = _unpacker.unpack(header, message);
842 const JumboFragmentResult jumbo_result =
843 insert_jumbo_fragment(_info, header_size);
844 if(jumbo_result == JumboFragmentCompleteRequest) {
845 info = _requests.front();
846 _requests.pop_front();
856 _info->clear_header();
864 template<
typename Traits,
typename DestinationType>
865 void send(
const DestinationType & dest,
866 const typename Traits::parameter_type & param,
868 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
870 static_assert(Traits::call_type ==
OneWay,
"expected call_type OneWay");
874 template<
typename Traits,
typename DestinationType>
875 void reply(
const DestinationType & dest,
876 const wisp_call_token token,
877 const typename Traits::parameter_type & param,
879 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
881 static_assert(Traits::call_type ==
OneWay,
"expected call_type OneWay");
886 template<
typename Traits,
typename DestinationType>
887 void call(
const DestinationType & dest,
888 typename Traits::return_type *ret,
889 const typename Traits::parameter_type & param,
891 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
894 static_assert(Traits::call_type ==
TwoWay,
"expected call_type TwoWay");
895 wisp_call_token token = next_token();
897 receive_response(*ret, token);
901 template<
typename Traits,
typename DestinationType>
902 typename Future<typename Traits::return_type>::shared_ptr
903 call(
const DestinationType & dest,
904 const typename Traits::parameter_type & param,
906 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
908 static_assert(Traits::call_type ==
TwoWay,
"expected call_type TwoWay");
910 wisp_call_token token = next_token();
914 return typename future_type::shared_ptr(
new future_type(*
this, token));
918 template<CallType call_type>
struct CallTag { };
920 template<
typename Traits,
typename DestinationType>
921 typename Future<typename Traits::return_type>::shared_ptr
922 _call(
const CallTag<OneWay> &,
923 const DestinationType & dest,
924 const typename Traits::parameter_type & param,
926 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
928 typedef Future<typename Traits::return_type> future_type;
929 send<Traits>(dest, param, service);
930 return typename future_type::shared_ptr();
933 template<
typename Traits,
typename DestinationType>
934 typename Future<typename Traits::return_type>::shared_ptr
935 _call(
const CallTag<TwoWay> &,
936 const DestinationType & dest,
937 const typename Traits::parameter_type & param,
939 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
941 return call<Traits>(dest, param, service);
946 template<
typename Traits,
typename DestinationType>
948 const DestinationType & dest,
949 const wisp_call_token token,
950 const typename Traits::parameter_type & param,
951 const Message::Service service =
953 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
955 reply<Traits>(dest, token, param, service);
958 template<
typename Traits,
typename DestinationType>
960 const DestinationType & dest,
961 typename Traits::return_type *ret,
962 const typename Traits::parameter_type & param,
963 const Message::Service service =
965 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
968 call<Traits>(dest, ret, param, service);
971 template<
typename Traits,
typename DestinationType>
972 typename Future<typename Traits::return_type>::shared_ptr
974 const DestinationType & dest,
975 const typename Traits::parameter_type & param,
977 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
979 return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
990 template<
typename Traits,
typename DestinationType>
991 void send(
const DestinationType & dest,
993 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
995 send<Traits>(dest,
typename Traits::parameter_type(), service);
998 template<
typename Traits,
typename DestinationType,
typename... P>
999 void send(
const Message::Service service,
1000 const DestinationType & dest,
1002 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1004 send<Traits>(dest,
typename Traits::parameter_type(std::forward<P>(p)...),
1008 template<
typename Traits,
typename DestinationType,
typename... P>
1009 void sendp(
const DestinationType & dest, P && ...p)
1011 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1013 send<Traits>(dest,
typename Traits::parameter_type(std::forward<P>(p)...));
1018 template<
typename Traits,
typename DestinationType>
1019 void reply(
const DestinationType & dest,
1020 const wisp_call_token token,
1022 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1024 reply<Traits>(dest, token,
typename Traits::parameter_type(), service);
1027 template<
typename Traits,
typename DestinationType>
1029 const DestinationType & dest,
1030 const wisp_call_token token,
1031 const Message::Service service =
1033 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1035 reply<Traits>(dest, token,
typename Traits::parameter_type(), service);
1038 template<
typename Traits,
typename DestinationType,
typename... P>
1039 void reply(
const Message::Service service,
1040 const DestinationType & dest,
1041 const wisp_call_token token,
1043 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1045 reply<Traits>(dest, token,
1046 typename Traits::parameter_type(std::forward<P>(p)...),
1050 template<
typename Traits,
typename DestinationType,
typename... P>
1052 const wisp_call_token token,
1054 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1056 reply<Traits>(dest, token,
1057 typename Traits::parameter_type(std::forward<P>(p)...));
1061 template<
typename Traits,
typename DestinationType>
1062 void call(
const DestinationType & dest,
1063 typename Traits::return_type *ret,
1065 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
1068 call<Traits>(dest, ret,
typename Traits::parameter_type(), service);
1071 template<
typename Traits,
typename DestinationType>
1073 const DestinationType & dest,
1074 typename Traits::return_type *ret,
1076 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
1079 call<Traits>(dest, ret,
typename Traits::parameter_type(), service);
1082 template<
typename Traits,
typename DestinationType,
typename... P>
1083 void call(
const Message::Service service,
1084 const DestinationType & dest,
1085 typename Traits::return_type *ret,
1087 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1089 call<Traits>(dest, ret,
1090 typename Traits::parameter_type(std::forward<P>(p)...),
1094 template<
typename Traits,
typename DestinationType,
typename... P>
1095 void callp(
const DestinationType & dest,
1096 typename Traits::return_type *ret,
1098 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1100 call<Traits>(dest, ret,
1101 typename Traits::parameter_type(std::forward<P>(p)...));
1105 template<
typename Traits,
typename DestinationType>
1106 typename Future<typename Traits::return_type>::shared_ptr
1109 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1111 return call<Traits>(dest,
typename Traits::parameter_type(), service);
1114 template<
typename Traits,
typename DestinationType,
typename... P>
1115 typename Future<typename Traits::return_type>::shared_ptr
1116 call(
const Message::Service service,
const DestinationType & dest,
1118 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1120 return call<Traits>(dest,
1121 typename Traits::parameter_type(std::forward<P>(p)...),
1125 template<
typename Traits,
typename DestinationType,
typename... P>
1126 typename Future<typename Traits::return_type>::shared_ptr
1127 callp(
const DestinationType & dest, P && ...p)
1128 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1130 return call<Traits>(dest,
1131 typename Traits::parameter_type(std::forward<P>(p)...));
1136 template<
typename Traits,
typename DestinationType>
1137 typename Future<typename Traits::return_type>::shared_ptr
1139 const DestinationType & dest,
1141 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1144 _call<Traits>(CallTag<Traits::call_type>(),
1145 dest,
typename Traits::parameter_type(), service);