Caller.h
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2008 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00023 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H 00024 #define __SSRC_WISP_PROTOCOL_CALLER_H 00025 00026 #include <ssrc/spread.h> 00027 #include <ssrc/wisp/protocol/ServiceProtocol.h> 00028 00029 #include <queue> 00030 #include <sstream> 00031 #include <stdexcept> 00032 00033 #include <boost/shared_ptr.hpp> 00034 #include <boost/multi_index_container.hpp> 00035 #include <boost/multi_index/mem_fun.hpp> 00036 #include <boost/multi_index/hashed_index.hpp> 00037 #include <boost/multi_index/composite_key.hpp> 00038 00039 __BEGIN_NS_SSRC_WISP_PROTOCOL 00040 00041 using std::string; 00042 using NS_SSRC_SPREAD::GroupList; 00043 using NS_SSRC_SPREAD::Message; 00044 using NS_SSRC_SPREAD::Mailbox; 00045 00050 const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard; 00051 00060 enum CallType { 00062 OneWay, 00063 00065 TwoWay 00066 00067 /*, MultiWay */ 00068 }; 00069 00074 namespace detail { 00076 struct VoidReturnType { }; 00077 } 00078 00087 template<typename CallerType, typename ParameterType, 00088 typename ReturnType = detail::VoidReturnType> 00089 struct CallTraits { 00091 static const CallType call_type = TwoWay; 00092 00094 typedef ParameterType parameter_type; 00095 00097 typedef ReturnType return_type; 00098 00100 typedef CallerType caller_type; 00101 
00103 typedef typename caller_type::template Future<return_type> future_type; 00104 00106 typedef typename future_type::shared_ptr future_ptr; 00107 }; 00108 00116 template<typename CallerType, typename ParameterType> 00117 struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> { 00119 static const CallType call_type = OneWay; 00120 00122 typedef ParameterType parameter_type; 00123 00125 typedef detail::VoidReturnType return_type; 00126 00128 typedef CallerType caller_type; 00129 }; 00130 00137 #define WISP_ONE_WAY_CALL(caller, method) \ 00138 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \ 00139 Call ## method 00140 00148 #define WISP_TWO_WAY_CALL(caller, method, result) \ 00149 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \ 00150 Call ## method 00151 00157 class CallException : public std::runtime_error { 00158 public: 00159 explicit CallException(const string & message) : 00160 std::runtime_error(message) 00161 { } 00162 }; 00163 00168 typedef std::uint32_t wisp_call_token; 00169 00174 typedef std::uint8_t wisp_call_role; 00175 00176 00182 enum CallRole { 00184 OneWayRequest, 00185 00187 TwoWayRequest, 00188 00190 TwoWayResponse 00191 00192 /*, MultiWayResponse */ 00193 }; 00194 00195 00201 struct CallHeader { 00203 wisp_message_id id; 00205 wisp_call_token token; 00207 wisp_call_role role; 00211 unsigned int message_size; 00212 00216 CallHeader() : id(0), token(0), role(0), message_size(0) { } 00217 00221 CallHeader(const wisp_message_id id, 00222 const wisp_call_token token, 00223 const CallRole role, 00224 const unsigned int size = 0) : 00225 id(id), token(token), role(role), message_size(size) { } 00226 00230 void clear() { 00231 id = 0, token = 0, role = 0, message_size = 0; 00232 } 00233 00237 void init(const wisp_message_id _id, 00238 const wisp_call_token _token, 00239 const CallRole _role, 00240 const unsigned int _size = 0) 00241 { 00242 id = _id, token = _token, role = 
_role, message_size = _size; 00243 } 00244 00249 bool is_jumbo_fragment() const { 00250 return (message_size > 0); 00251 } 00252 00253 template<class Archive> 00254 void serialize(Archive & ar, const unsigned int) { 00255 ar & id & token & role & message_size; 00256 } 00257 }; 00258 00263 struct MessageInfo { 00265 CallHeader header; 00267 Message message; 00269 GroupList groups; 00270 00271 explicit 00272 MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) : 00273 header(), message(message_capacity), groups() 00274 { } 00275 00276 wisp_message_protocol protocol() const { 00277 return message.type(); 00278 } 00279 00280 wisp_message_id id() const { 00281 return header.id; 00282 } 00283 00284 wisp_call_token token() const { 00285 return header.token; 00286 } 00287 00288 wisp_call_role role() const { 00289 return header.role; 00290 } 00291 00292 const string & sender() const { 00293 return message.sender(); 00294 } 00295 00296 void init_header(const wisp_message_id id, 00297 const wisp_call_token token, 00298 const CallRole role) 00299 { 00300 header.init(id, token, role); 00301 } 00302 00303 void clear_header() { 00304 header.clear(); 00305 } 00306 }; 00307 00308 typedef boost::shared_ptr<MessageInfo> message_info_ptr; 00309 00310 const bool GroupMembershipEnable = true; 00311 const bool GroupMembershipDisable = false; 00312 00316 template<typename PT = BinaryPackingTraits> 00317 class Caller { 00318 template<typename ReturnType> friend class Future; 00319 00320 public: 00321 typedef PT packing_traits; 00322 typedef typename packing_traits::packer_type packer_type; 00323 typedef typename packing_traits::unpacker_type unpacker_type; 00324 00325 // Don't remove this definition. It is necessary to prevent an 00326 // unresolved MaxUnfragmentedMessageSize symbol in debug builds. 
00327 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U 00328 static const unsigned int MaxUnfragmentedMessageSize = 00329 __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE; 00330 00331 private: 00332 typedef std::deque<message_info_ptr> request_queue; 00333 00334 // For now, use hashed_unique, but for multiple responses will need 00335 // hashed_non_unique. 00336 typedef boost::multi_index_container< 00337 message_info_ptr, 00338 boost::multi_index::indexed_by< 00339 boost::multi_index::hashed_unique< 00340 boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token, 00341 &MessageInfo::token> > 00342 > > response_map; 00343 00344 typedef boost::multi_index_container< 00345 message_info_ptr, 00346 boost::multi_index::indexed_by< 00347 boost::multi_index::hashed_unique< 00348 boost::multi_index::composite_key< 00349 MessageInfo, 00350 boost::multi_index::const_mem_fun<MessageInfo, const string &, 00351 &MessageInfo::sender>, 00352 boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role, 00353 &MessageInfo::role>, 00354 boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token, 00355 &MessageInfo::token> > > 00356 > > jumbo_message_map; 00357 00358 Mailbox _mbox; 00359 wisp_call_token _call_token; 00360 packer_type _packer; 00361 unpacker_type _unpacker; 00362 00364 request_queue _requests; 00365 response_map _responses; 00366 jumbo_message_map _jumbo_messages; 00367 jumbo_message_map::key_from_value _jumbo_key_from_value; 00368 00369 const unsigned int _initial_message_capacity; 00370 // scratch variable 00371 message_info_ptr _info; 00372 MessageInfo _send_info; 00373 00374 static void throwCallException(const wisp_message_protocol expected_proto, 00375 const wisp_message_protocol actual_proto, 00376 const wisp_message_id expected_id, 00377 const wisp_message_id actual_id) 00378 SSRC_DECL_THROW(CallException) 00379 { 00380 std::ostringstream message; 00381 message << "Mismatched message id or protocol.\nExpected protocol: " 00382 << 
expected_proto << " Actual protocol: " << actual_proto 00383 << "\nExpected id: " << expected_id 00384 << " Actual id: " << actual_id << std::endl; 00385 throw CallException(message.str()); 00386 } 00387 00388 static void throwCallException(const wisp_call_token expected_token, 00389 const wisp_call_token actual_token) 00390 SSRC_DECL_THROW(CallException) 00391 { 00392 std::ostringstream message; 00393 message << "Mismatched call token.\nExpected token: " 00394 << expected_token << " Actual token: " << actual_token 00395 << std::endl; 00396 throw CallException(message.str()); 00397 } 00398 00399 void reset_receive_info() { 00400 _info.reset(new MessageInfo(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))); 00401 } 00402 00403 void push_request() { 00404 _requests.push_back(_info); 00405 reset_receive_info(); 00406 } 00407 00408 void insert_response() { 00409 _responses.insert(_info); 00410 reset_receive_info(); 00411 } 00412 00413 enum JumboFragmentResult { 00414 JumboFragmentFirstFragment, 00415 JumboFragmentNextFragment, 00416 JumboFragmentCompleteRequest, 00417 JumboFragmentCompleteResponse 00418 }; 00419 00420 JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo, 00421 const unsigned int header_size) 00422 { 00423 JumboFragmentResult result = JumboFragmentNextFragment; 00424 Message & message = msginfo->message; 00425 jumbo_message_map::iterator it = 00426 _jumbo_messages.find(_jumbo_key_from_value(msginfo)); 00427 00428 if(it == _jumbo_messages.end()) { 00429 // Pre-allocate memory for remaining fragments. 
00430 message_info_ptr info(new MessageInfo(msginfo->header.message_size)); 00431 message.seek(message.size()); 00432 *info = *msginfo; 00433 _jumbo_messages.insert(info); 00434 result = JumboFragmentFirstFragment; 00435 } else { 00436 const message_info_ptr & info = *it; 00437 00438 info->message.write(&message[header_size], message.size()-header_size); 00439 00440 if(info->message.offset() >= info->header.message_size) { 00441 info->message.seek(header_size); 00442 if(info->role() == TwoWayResponse) { 00443 _responses.insert(info); 00444 result = JumboFragmentCompleteResponse; 00445 } else { 00446 _requests.push_back(info); 00447 result = JumboFragmentCompleteRequest; 00448 } 00449 _jumbo_messages.erase(it); 00450 } 00451 } 00452 return result; 00453 } 00454 00455 // Receives call responses, but not requests and membership messages 00456 template<typename MessageType> 00457 void receive_response(MessageType & msg, const wisp_call_token token) 00458 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00459 CallException) 00460 { 00461 begin_receive_response: 00462 typename response_map::iterator it(_responses.find(token)); 00463 00464 if(it == _responses.end()) { 00465 if(!_info.unique()) { 00466 reset_receive_info(); 00467 } 00468 do { 00469 Message & message = _info->message; 00470 00471 _mbox.receive(message); 00472 _mbox.copy_groups(_info->groups); 00473 00474 if(message.is_regular()) { 00475 CallHeader & header = _info->header; 00476 00477 message.rewind(); 00478 00479 const unsigned int header_size = _unpacker.unpack(header, message); 00480 00481 if(header.is_jumbo_fragment()) { 00482 const JumboFragmentResult jumbo_result = 00483 insert_jumbo_fragment(_info, header_size); 00484 if(jumbo_result == JumboFragmentCompleteResponse) { 00485 goto begin_receive_response; 00486 } 00487 } else { 00488 if(header.role == TwoWayResponse) { 00489 if(header.token == token) 00490 break; 00491 else 00492 insert_response(); 00493 } else 00494 
push_request(); 00495 } 00496 } else { 00497 _info->clear_header(); 00498 push_request(); 00499 } 00500 } while(true); 00501 unpack(msg, *_info); 00502 } else { 00503 unpack(msg, *(*it)); 00504 _responses.erase(it); 00505 } 00506 } 00507 00508 // We assume integers wrap around to min at max. 00509 wisp_call_token next_token() { 00510 return _call_token++; 00511 } 00512 00513 template<typename DestinationType, typename MessageType> 00514 void send(const DestinationType & dest, 00515 const MessageType & msg, 00516 const wisp_call_token token, 00517 const CallRole role, 00518 const Message::Service service) 00519 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00520 { 00521 unsigned int message_size; 00522 00523 _send_info.init_header(msg.id, token, role); 00524 _send_info.message.rewind(); 00525 _packer.pack(_send_info.header, _send_info.message); 00526 _packer.pack(msg, _send_info.message); 00527 _send_info.message.set_type(msg.protocol); 00528 _send_info.message.set_service(service); 00529 00530 if((message_size = _send_info.message.size()) 00531 <= MaxUnfragmentedMessageSize) 00532 _mbox.send(_send_info.message, dest); 00533 else { 00534 _send_info.header.message_size = message_size; 00535 _send_info.message.rewind(); 00536 00537 const unsigned int header_size = 00538 _packer.pack(_send_info.header, _send_info.message); 00539 const unsigned int max_fragment_size = 00540 MaxUnfragmentedMessageSize - header_size; 00541 00542 // We never fragment a header, so there's no risk of 00543 // header_size equaling message_size.. 
00544 for(unsigned int offset = header_size, size = 0; 00545 offset < message_size; offset+=size) 00546 { 00547 size = std::min(max_fragment_size, message_size - offset); 00548 _mbox.clear_message_parts(); 00549 _mbox.add_message_part(&_send_info.message[0], header_size); 00550 _mbox.add_message_part(&_send_info.message[offset], size); 00551 _mbox.send(dest, msg.protocol, service); 00552 } 00553 _mbox.clear_message_parts(); 00554 } 00555 } 00556 00557 public: 00558 // Using jumbo_message_map::key_type doesn't work because it's a composite 00559 // key that stores a reference, so we have to use a compatible key. 00560 typedef 00561 boost::tuple<string, wisp_call_role, wisp_call_token> jumbo_message_key_type; 00562 00563 template<typename ReturnType> 00564 class Future { 00565 friend class Caller; 00566 00567 bool _valid; 00568 wisp_call_token _token; 00569 Caller & _caller; 00570 00571 Future(Caller & caller, const wisp_call_token token) : 00572 _valid(true), _token(token), _caller(caller) 00573 { } 00574 00575 public: 00576 typedef ReturnType return_type; 00577 typedef boost::shared_ptr<Future> shared_ptr; 00578 00579 // Note, you can't sit in a while(!future.ready()) loop because 00580 // there's no receiver thread. 
00581 bool ready() const { 00582 return _caller.returned(_token); 00583 } 00584 00585 bool valid() const { 00586 return _valid; 00587 } 00588 00589 wisp_call_token token() const { 00590 return _token; 00591 } 00592 00593 void receive(return_type & result) 00594 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00595 CallException) 00596 { 00597 if(!valid()) 00598 throw CallException("Invalid future."); 00599 00600 _caller.receive_response(result, _token); 00601 00602 _valid = false; 00603 } 00604 00605 // Exclusively for supporting continuations in Service 00606 void unpack(return_type & result, MessageInfo & info) 00607 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00608 CallException) 00609 { 00610 if(!valid()) 00611 throw CallException("Invalid future."); 00612 00613 if(info.token() == _token) 00614 _caller.unpack(result, info); 00615 else 00616 throwCallException(_token, info.token()); 00617 00618 _valid = false; 00619 } 00620 }; 00621 00622 explicit 00623 Caller(const string & connection = "", 00624 const string & name = "", 00625 const unsigned int message_capacity = Message::DefaultCapacity, 00626 const bool group_membership = GroupMembershipDisable) : 00627 _mbox(connection, name, group_membership), _call_token(0), 00628 _packer(), _unpacker(), _requests(), _responses(), 00629 _jumbo_messages(), 00630 _jumbo_key_from_value(_jumbo_messages.key_extractor()), 00631 _initial_message_capacity(message_capacity), 00632 _info(new MessageInfo(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))), 00633 _send_info(message_capacity) 00634 { } 00635 00636 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 00637 00638 // Not intended to be subclassed outside of Wisp. 00639 //virtual ~Caller() { } 00640 00641 // We expose this for allowing retrieval of group information. 
00642 const Mailbox & mbox() { 00643 return _mbox; 00644 } 00645 00646 packer_type & packer() { 00647 return _packer; 00648 } 00649 00650 unpacker_type & unpacker() { 00651 return _unpacker; 00652 } 00653 00654 wisp_call_token call_token() { 00655 return _call_token; 00656 } 00657 00658 const string & name() const { 00659 return _mbox.private_group(); 00660 } 00661 00662 bool group_membership() const { 00663 return _mbox.group_membership(); 00664 } 00665 00666 void join(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) { 00667 _mbox.join(group); 00668 } 00669 00670 void leave(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) { 00671 _mbox.leave(group); 00672 } 00673 00674 unsigned int initial_message_capacity() const { 00675 return _initial_message_capacity; 00676 } 00677 00678 unsigned int message_capacity() const { 00679 return _info->message.capacity(); 00680 } 00681 00682 unsigned int request_queue_size() const { 00683 return _requests.size(); 00684 } 00685 00686 unsigned int response_map_size() const { 00687 return _responses.size(); 00688 } 00689 00690 bool returned(const wisp_call_token token) const { 00691 return (_responses.find(token) != _responses.end()); 00692 } 00693 00694 unsigned int count_jumbo_messages() const { 00695 return _jumbo_messages.size(); 00696 } 00697 00698 template<typename key_container> 00699 void collect_jumbo_message_keys(key_container & container) { 00700 for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(), 00701 end = _jumbo_messages.end(); it != end; ++it) 00702 { 00703 const MessageInfo *info = it->get(); 00704 container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token())); 00705 } 00706 } 00707 00708 template<typename iterator_type> 00709 void erase_jumbo_messages(const iterator_type & begin, 00710 const iterator_type & end) 00711 { 00712 // You can't erase a compatible key, so we have to perform a lookup first 00713 for(iterator_type it = begin; it != 
end; ++it) { 00714 jumbo_message_map::iterator msg = _jumbo_messages.find(*it); 00715 if(msg != _jumbo_messages.end()) 00716 _jumbo_messages.erase(msg); 00717 } 00718 } 00719 00720 // Call only after unpacking CallHeader so message offset is correct. 00721 template<typename MessageType> 00722 void unpack(MessageType & msg, MessageInfo & info) 00723 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00724 CallException) 00725 { 00726 if(info.id() == MessageType::id && 00727 info.protocol() == MessageType::protocol) 00728 _unpacker.unpack(msg, info.message); 00729 else 00730 throwCallException(MessageType::protocol, info.protocol(), 00731 MessageType::id, info.id()); 00732 } 00733 00740 void receive(message_info_ptr & info) 00741 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00742 { 00743 // Drain responses first, then requests. 00744 if(!_responses.empty()) { 00745 response_map::iterator it = _responses.begin(); 00746 info = *it; 00747 _responses.erase(it); 00748 } else if(!_requests.empty()) { 00749 info = _requests.front(); 00750 _requests.pop_front(); 00751 } else { 00752 do { 00753 Message & message = _info->message; 00754 00755 _mbox.receive(message); 00756 _mbox.copy_groups(_info->groups); 00757 00758 if(message.is_regular()) { 00759 CallHeader & header = _info->header; 00760 00761 message.rewind(); 00762 00763 const unsigned int header_size = _unpacker.unpack(header, message); 00764 00765 if(header.is_jumbo_fragment()) { 00766 const JumboFragmentResult jumbo_result = 00767 insert_jumbo_fragment(_info, header_size); 00768 if(jumbo_result == JumboFragmentCompleteResponse) { 00769 response_map::iterator it = _responses.begin(); 00770 info = *it; 00771 _responses.erase(it); 00772 return; 00773 } else if(jumbo_result == JumboFragmentCompleteRequest) { 00774 info = _requests.front(); 00775 _requests.pop_front(); 00776 return; 00777 } 00778 } else 00779 break; 00780 } else { 00781 _info->clear_header(); 00782 
break; 00783 } 00784 } while(true); 00785 info = _info; 00786 } 00787 } 00788 00789 // Also implement timeout. 00790 // Receives requests and membership messages, but not call responses 00791 void receive_request(message_info_ptr & info) 00792 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00793 { 00794 if(!_requests.empty()) { 00795 info = _requests.front(); 00796 _requests.pop_front(); 00797 } else { 00798 if(!_info.unique()) { 00799 reset_receive_info(); 00800 } 00801 do { 00802 Message & message = _info->message; 00803 00804 _mbox.receive(message); 00805 _mbox.copy_groups(_info->groups); 00806 00807 if(message.is_regular()) { 00808 CallHeader & header = _info->header; 00809 00810 message.rewind(); 00811 00812 const unsigned int header_size = _unpacker.unpack(header, message); 00813 00814 if(header.is_jumbo_fragment()) { 00815 const JumboFragmentResult jumbo_result = 00816 insert_jumbo_fragment(_info, header_size); 00817 if(jumbo_result == JumboFragmentCompleteRequest) { 00818 info = _requests.front(); 00819 _requests.pop_front(); 00820 return; 00821 } 00822 } else { 00823 if(header.role == TwoWayResponse) 00824 insert_response(); 00825 else 00826 break; 00827 } 00828 } else { 00829 _info->clear_header(); 00830 break; 00831 } 00832 } while(true); 00833 info = _info; 00834 } 00835 } 00836 00837 template<typename Traits, typename DestinationType> 00838 void send(const DestinationType & dest, 00839 const typename Traits::parameter_type & param, 00840 const Message::Service service = DefaultMessageServiceType) 00841 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00842 { 00843 static_assert(Traits::call_type == OneWay, "expected call_type OneWay"); 00844 send(dest, param, next_token(), OneWayRequest, service); 00845 } 00846 00847 template<typename Traits, typename DestinationType> 00848 void reply(const DestinationType & dest, 00849 const wisp_call_token token, 00850 const typename Traits::parameter_type & 
param, 00851 const Message::Service service = DefaultMessageServiceType) 00852 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00853 { 00854 static_assert(Traits::call_type == OneWay, "expected call_type OneWay"); 00855 send(dest, param, token, TwoWayResponse, service); 00856 } 00857 00858 // Synchronous call. 00859 template<typename Traits, typename DestinationType> 00860 void call(const DestinationType & dest, 00861 typename Traits::return_type *ret, 00862 const typename Traits::parameter_type & param, 00863 const Message::Service service = DefaultMessageServiceType) 00864 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00865 CallException) 00866 { 00867 static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay"); 00868 wisp_call_token token = next_token(); 00869 send(dest, param, token, TwoWayRequest, service); 00870 receive_response(*ret, token); 00871 } 00872 00873 // Asynchronous call. 00874 template<typename Traits, typename DestinationType> 00875 typename Future<typename Traits::return_type>::shared_ptr 00876 call(const DestinationType & dest, 00877 const typename Traits::parameter_type & param, 00878 const Message::Service service = DefaultMessageServiceType) 00879 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00880 { 00881 static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay"); 00882 typedef Future<typename Traits::return_type> future_type; 00883 wisp_call_token token = next_token(); 00884 00885 send(dest, param, token, TwoWayRequest, service); 00886 00887 return typename future_type::shared_ptr(new future_type(*this, token)); 00888 } 00889 00890 private: 00891 template<CallType call_type> struct CallTag { }; 00892 00893 template<typename Traits, typename DestinationType> 00894 typename Future<typename Traits::return_type>::shared_ptr 00895 _call(const CallTag<OneWay> &, 00896 const DestinationType & dest, 00897 const typename 
Traits::parameter_type & param, 00898 const Message::Service service = DefaultMessageServiceType) 00899 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00900 { 00901 typedef Future<typename Traits::return_type> future_type; 00902 send<Traits>(dest, param, service); 00903 return typename future_type::shared_ptr(); 00904 } 00905 00906 template<typename Traits, typename DestinationType> 00907 typename Future<typename Traits::return_type>::shared_ptr 00908 _call(const CallTag<TwoWay> &, 00909 const DestinationType & dest, 00910 const typename Traits::parameter_type & param, 00911 const Message::Service service = DefaultMessageServiceType) 00912 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00913 { 00914 return call<Traits>(dest, param, service); 00915 } 00916 00917 public: 00918 00919 template<typename Traits, typename DestinationType> 00920 void operator()(const Traits &, 00921 const DestinationType & dest, 00922 const wisp_call_token token, 00923 const typename Traits::parameter_type & param, 00924 const Message::Service service = 00925 DefaultMessageServiceType) 00926 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00927 { 00928 reply<Traits>(dest, token, param, service); 00929 } 00930 00931 template<typename Traits, typename DestinationType> 00932 void operator()(const Traits &, 00933 const DestinationType & dest, 00934 typename Traits::return_type *ret, 00935 const typename Traits::parameter_type & param, 00936 const Message::Service service = 00937 DefaultMessageServiceType) 00938 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 00939 CallException) 00940 { 00941 call<Traits>(dest, ret, param, service); 00942 } 00943 00944 template<typename Traits, typename DestinationType> 00945 typename Future<typename Traits::return_type>::shared_ptr 00946 operator()(const Traits &, 00947 const DestinationType & dest, 00948 const typename Traits::parameter_type & param, 
00949 const Message::Service service = DefaultMessageServiceType) 00950 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00951 { 00952 return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service); 00953 } 00954 00955 00956 // Convenience functions that invoke parameter_type constructor for 00957 // you. We permit up to WISP_CALLER_MAX_PARAMETERS - 1 00958 // parameter_type arguments. Any more, and you should probably use 00959 // a different approach. 00960 00961 // Send. 00962 00963 template<typename Traits, typename DestinationType> 00964 void send(const DestinationType & dest, 00965 const Message::Service service = DefaultMessageServiceType) 00966 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00967 { 00968 send<Traits>(dest, typename Traits::parameter_type(), service); 00969 } 00970 00971 template<typename Traits, typename DestinationType, typename... P> 00972 void send(const Message::Service service, 00973 const DestinationType & dest, 00974 P && ...p) 00975 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00976 { 00977 send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...), 00978 service); 00979 } 00980 00981 template<typename Traits, typename DestinationType, typename... P> 00982 void sendp(const DestinationType & dest, P && ...p) 00983 00984 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00985 { 00986 send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...)); 00987 } 00988 00989 // Reply. 
00990 00991 template<typename Traits, typename DestinationType> 00992 void reply(const DestinationType & dest, 00993 const wisp_call_token token, 00994 const Message::Service service = DefaultMessageServiceType) 00995 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 00996 { 00997 reply<Traits>(dest, token, typename Traits::parameter_type(), service); 00998 } 00999 01000 template<typename Traits, typename DestinationType> 01001 void operator()(const Traits &, 01002 const DestinationType & dest, 01003 const wisp_call_token token, 01004 const Message::Service service = 01005 DefaultMessageServiceType) 01006 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01007 { 01008 reply<Traits>(dest, token, typename Traits::parameter_type(), service); 01009 } 01010 01011 template<typename Traits, typename DestinationType, typename... P> 01012 void reply(const Message::Service service, 01013 const DestinationType & dest, 01014 const wisp_call_token token, 01015 P && ...p) 01016 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01017 { 01018 reply<Traits>(dest, token, 01019 typename Traits::parameter_type(std::forward<P>(p)...), 01020 service); 01021 } 01022 01023 template<typename Traits, typename DestinationType, typename... P> 01024 void replyp(const DestinationType & dest, 01025 const wisp_call_token token, 01026 P && ...p) 01027 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01028 { 01029 reply<Traits>(dest, token, 01030 typename Traits::parameter_type(std::forward<P>(p)...)); 01031 } 01032 01033 // Synchronous call. 
01034 template<typename Traits, typename DestinationType> 01035 void call(const DestinationType & dest, 01036 typename Traits::return_type *ret, 01037 const Message::Service service = DefaultMessageServiceType) 01038 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 01039 CallException) 01040 { 01041 call<Traits>(dest, ret, typename Traits::parameter_type(), service); 01042 } 01043 01044 template<typename Traits, typename DestinationType> 01045 void operator()(const Traits &, 01046 const DestinationType & dest, 01047 typename Traits::return_type *ret, 01048 const Message::Service service = DefaultMessageServiceType) 01049 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure, 01050 CallException) 01051 { 01052 call<Traits>(dest, ret, typename Traits::parameter_type(), service); 01053 } 01054 01055 template<typename Traits, typename DestinationType, typename... P> 01056 void call(const Message::Service service, 01057 const DestinationType & dest, 01058 typename Traits::return_type *ret, 01059 P && ...p) 01060 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01061 { 01062 call<Traits>(dest, ret, 01063 typename Traits::parameter_type(std::forward<P>(p)...), 01064 service); 01065 } 01066 01067 template<typename Traits, typename DestinationType, typename... P> 01068 void callp(const DestinationType & dest, 01069 typename Traits::return_type *ret, 01070 P && ...p) 01071 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01072 { 01073 call<Traits>(dest, ret, 01074 typename Traits::parameter_type(std::forward<P>(p)...)); 01075 } 01076 01077 // Asynchronous call. 
01078 template<typename Traits, typename DestinationType> 01079 typename Future<typename Traits::return_type>::shared_ptr 01080 call(const DestinationType & dest, 01081 const Message::Service service = DefaultMessageServiceType) 01082 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01083 { 01084 return call<Traits>(dest, typename Traits::parameter_type(), service); 01085 } 01086 01087 template<typename Traits, typename DestinationType, typename... P> 01088 typename Future<typename Traits::return_type>::shared_ptr 01089 call(const Message::Service service, const DestinationType & dest, 01090 P && ...p) 01091 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01092 { 01093 return call<Traits>(dest, 01094 typename Traits::parameter_type(std::forward<P>(p)...), 01095 service); 01096 } 01097 01098 template<typename Traits, typename DestinationType, typename... P> 01099 typename Future<typename Traits::return_type>::shared_ptr 01100 callp(const DestinationType & dest, P && ...p) 01101 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01102 { 01103 return call<Traits>(dest, 01104 typename Traits::parameter_type(std::forward<P>(p)...)); 01105 } 01106 01107 // Dual-purpose operator 01108 01109 template<typename Traits, typename DestinationType> 01110 typename Future<typename Traits::return_type>::shared_ptr 01111 operator()(const Traits &, 01112 const DestinationType & dest, 01113 const Message::Service service = DefaultMessageServiceType) 01114 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure) 01115 { 01116 return 01117 _call<Traits>(CallTag<Traits::call_type>(), 01118 dest, typename Traits::parameter_type(), service); 01119 } 01120 }; 01121 01122 __END_NS_SSRC_WISP_PROTOCOL 01123 01124 #endif
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.
Copyright © 2011 Savarese Software Research Corporation. All rights reserved.