Savarese Software Research Corporation
Caller.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2008 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00023 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
00024 #define __SSRC_WISP_PROTOCOL_CALLER_H
00025 
00026 #include <ssrc/spread.h>
00027 #include <ssrc/wisp/protocol/ServiceProtocol.h>
00028 
00029 #include <queue>
00030 #include <sstream>
00031 #include <stdexcept>
00032 
00033 #include <boost/shared_ptr.hpp>
00034 #include <boost/multi_index_container.hpp>
00035 #include <boost/multi_index/mem_fun.hpp>
00036 #include <boost/multi_index/hashed_index.hpp>
00037 #include <boost/multi_index/composite_key.hpp>
00038 
00039 __BEGIN_NS_SSRC_WISP_PROTOCOL
00040 
00041 using std::string;
00042 using NS_SSRC_SPREAD::GroupList;
00043 using NS_SSRC_SPREAD::Message;
00044 using NS_SSRC_SPREAD::Mailbox;
00045 
00050 const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard;
00051 
00060 enum CallType {
00062   OneWay,
00063 
00065   TwoWay
00066 
00067   /*, MultiWay */
00068 };
00069 
00074 namespace detail {
00076   struct VoidReturnType { };
00077 }
00078 
00087 template<typename CallerType, typename ParameterType,
00088          typename ReturnType = detail::VoidReturnType>
00089 struct CallTraits { 
00091   static const CallType call_type = TwoWay;
00092 
00094   typedef ParameterType parameter_type;
00095 
00097   typedef ReturnType return_type;
00098 
00100   typedef CallerType caller_type;
00101 
00103   typedef typename caller_type::template Future<return_type> future_type;
00104 
00106   typedef typename future_type::shared_ptr future_ptr;
00107 };
00108 
00116 template<typename CallerType, typename ParameterType>
00117 struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> { 
00119   static const CallType call_type = OneWay;
00120 
00122   typedef ParameterType parameter_type;
00123 
00125   typedef detail::VoidReturnType return_type;
00126 
00128   typedef CallerType caller_type;
00129 };
00130 
00137 #define WISP_ONE_WAY_CALL(caller, method) \
00138   typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
00139   Call ## method
00140 
00148 #define WISP_TWO_WAY_CALL(caller, method, result) \
00149   typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
00150   Call ## method
00151 
00157 class CallException : public std::runtime_error {
00158 public:
00159   explicit CallException(const string & message) :
00160     std::runtime_error(message)
00161   { }
00162 };
00163 
00168 typedef std::uint32_t wisp_call_token;
00169 
00174 typedef std::uint8_t wisp_call_role;
00175 
00176 
00182 enum CallRole {
00184   OneWayRequest,
00185   
00187   TwoWayRequest,
00188 
00190   TwoWayResponse
00191 
00192   /*, MultiWayResponse */
00193 };
00194 
00195 
00201 struct CallHeader {
00203   wisp_message_id id;
00205   wisp_call_token token;
00207   wisp_call_role role;
00211   unsigned int message_size;
00212 
00216   CallHeader() : id(0), token(0), role(0), message_size(0) { }
00217 
00221   CallHeader(const wisp_message_id id,
00222              const wisp_call_token token,
00223              const CallRole role,
00224              const unsigned int size = 0) :
00225     id(id), token(token), role(role), message_size(size) { }
00226 
00230   void clear() {
00231     id = 0, token = 0, role = 0, message_size = 0;
00232   }
00233 
00237   void init(const wisp_message_id _id,
00238             const wisp_call_token _token,
00239             const CallRole _role,
00240             const unsigned int _size = 0)
00241   {
00242     id = _id, token = _token, role = _role, message_size = _size;
00243   }
00244 
00249   bool is_jumbo_fragment() const {
00250     return (message_size > 0);
00251   }
00252 
00253   template<class Archive>
00254   void serialize(Archive & ar, const unsigned int) {
00255     ar & id & token & role & message_size;
00256   }
00257 };
00258 
00263 struct MessageInfo {
00265   CallHeader header;
00267   Message message;
00269   GroupList groups;
00270 
00271   explicit
00272   MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) :
00273     header(), message(message_capacity), groups()
00274   { }
00275 
00276   wisp_message_protocol protocol() const {
00277     return message.type();
00278   }
00279 
00280   wisp_message_id id() const {
00281     return header.id;
00282   }
00283 
00284   wisp_call_token token() const {
00285     return header.token;
00286   }
00287 
00288   wisp_call_role role() const {
00289     return header.role;
00290   }
00291 
00292   const string & sender() const {
00293     return message.sender();
00294   }
00295 
00296   void init_header(const wisp_message_id id,
00297                    const wisp_call_token token,
00298                    const CallRole role)
00299   {
00300     header.init(id, token, role);
00301   }
00302 
00303   void clear_header() {
00304     header.clear();
00305   }
00306 };
00307 
00308 typedef boost::shared_ptr<MessageInfo> message_info_ptr;
00309 
00310 const bool GroupMembershipEnable = true;
00311 const bool GroupMembershipDisable = false;
00312 
00316 template<typename PT = BinaryPackingTraits> 
00317 class Caller {
00318   template<typename ReturnType> friend class Future;
00319 
00320 public:
00321   typedef PT packing_traits;
00322   typedef typename packing_traits::packer_type packer_type;
00323   typedef typename packing_traits::unpacker_type unpacker_type;
00324 
00325   // Don't remove this definition.  It is necessary to prevent an
00326   // unresolved MaxUnfragmentedMessageSize symbol in debug builds.
00327 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
00328   static const unsigned int MaxUnfragmentedMessageSize =
00329     __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE;
00330 
00331 private:
00332   typedef std::deque<message_info_ptr> request_queue;
00333 
00334   // For now, use hashed_unique, but for multiple responses will need
00335   // hashed_non_unique.
00336   typedef boost::multi_index_container<
00337     message_info_ptr,
00338     boost::multi_index::indexed_by<
00339       boost::multi_index::hashed_unique<
00340         boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
00341                                           &MessageInfo::token> >
00342       > > response_map;
00343 
00344   typedef boost::multi_index_container<
00345     message_info_ptr,
00346     boost::multi_index::indexed_by<
00347       boost::multi_index::hashed_unique<
00348         boost::multi_index::composite_key<
00349           MessageInfo,
00350           boost::multi_index::const_mem_fun<MessageInfo, const string &,
00351                                             &MessageInfo::sender>,
00352           boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role,
00353                                             &MessageInfo::role>,
00354           boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
00355                                             &MessageInfo::token> > >
00356           > > jumbo_message_map;
00357 
00358   Mailbox _mbox;
00359   wisp_call_token _call_token;
00360   packer_type _packer;
00361   unpacker_type _unpacker;
00362 
00364   request_queue _requests;
00365   response_map _responses;
00366   jumbo_message_map _jumbo_messages;
00367   jumbo_message_map::key_from_value _jumbo_key_from_value;
00368   
00369   const unsigned int _initial_message_capacity;
00370   // scratch variable
00371   message_info_ptr _info;
00372   MessageInfo _send_info;
00373 
00374   static void throwCallException(const wisp_message_protocol expected_proto,
00375                                  const wisp_message_protocol actual_proto,
00376                                  const wisp_message_id expected_id,
00377                                  const wisp_message_id actual_id)
00378     SSRC_DECL_THROW(CallException)
00379   {
00380     std::ostringstream message;
00381     message << "Mismatched message id or protocol.\nExpected protocol: "
00382             << expected_proto << " Actual protocol: " << actual_proto
00383             << "\nExpected id: " << expected_id
00384             << " Actual id: " << actual_id << std::endl;
00385     throw CallException(message.str());
00386   }
00387 
00388   static void throwCallException(const wisp_call_token expected_token,
00389                                  const wisp_call_token actual_token)
00390     SSRC_DECL_THROW(CallException)
00391   {
00392     std::ostringstream message;
00393     message << "Mismatched call token.\nExpected token: "
00394             << expected_token << " Actual token: " << actual_token
00395             << std::endl;
00396     throw CallException(message.str());
00397   }
00398 
00399   void reset_receive_info() {
00400     _info.reset(new MessageInfo(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE)));
00401   }
00402 
00403   void push_request() {
00404     _requests.push_back(_info);
00405     reset_receive_info();
00406   }
00407 
00408   void insert_response() {
00409     _responses.insert(_info);
00410     reset_receive_info();
00411   }
00412 
00413   enum JumboFragmentResult {
00414     JumboFragmentFirstFragment,
00415     JumboFragmentNextFragment,
00416     JumboFragmentCompleteRequest,
00417     JumboFragmentCompleteResponse
00418   };
00419 
00420   JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo,
00421                                             const unsigned int header_size)
00422   {
00423     JumboFragmentResult result = JumboFragmentNextFragment;
00424     Message & message = msginfo->message;
00425     jumbo_message_map::iterator it =
00426       _jumbo_messages.find(_jumbo_key_from_value(msginfo));
00427 
00428     if(it == _jumbo_messages.end()) {
00429       // Pre-allocate memory for remaining fragments.
00430       message_info_ptr info(new MessageInfo(msginfo->header.message_size));
00431       message.seek(message.size());
00432       *info = *msginfo;
00433       _jumbo_messages.insert(info);
00434       result = JumboFragmentFirstFragment;
00435     } else {
00436       const message_info_ptr & info = *it;
00437 
00438       info->message.write(&message[header_size], message.size()-header_size);
00439 
00440       if(info->message.offset() >= info->header.message_size) {
00441         info->message.seek(header_size);
00442         if(info->role() == TwoWayResponse) {
00443           _responses.insert(info);
00444           result = JumboFragmentCompleteResponse;
00445         } else {
00446           _requests.push_back(info);
00447           result = JumboFragmentCompleteRequest;
00448         }
00449         _jumbo_messages.erase(it);
00450       }
00451     }
00452     return result;
00453   }
00454 
00455   // Receives call responses, but not requests and membership messages
00456   template<typename MessageType>
00457   void receive_response(MessageType & msg, const wisp_call_token token)
00458     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00459                     CallException)
00460   {
00461   begin_receive_response:
00462     typename response_map::iterator it(_responses.find(token));
00463 
00464     if(it == _responses.end()) {
00465       if(!_info.unique()) {
00466         reset_receive_info();
00467       }
00468       do {
00469         Message & message = _info->message;
00470 
00471         _mbox.receive(message);
00472         _mbox.copy_groups(_info->groups);
00473 
00474         if(message.is_regular()) {
00475           CallHeader & header = _info->header;
00476 
00477           message.rewind();
00478 
00479           const unsigned int header_size = _unpacker.unpack(header, message);
00480 
00481           if(header.is_jumbo_fragment()) {
00482             const JumboFragmentResult jumbo_result =
00483               insert_jumbo_fragment(_info, header_size);
00484             if(jumbo_result == JumboFragmentCompleteResponse) {
00485               goto begin_receive_response;
00486             }
00487           } else {
00488             if(header.role == TwoWayResponse) {
00489               if(header.token == token)
00490                 break;
00491               else
00492                 insert_response();
00493             } else
00494               push_request();
00495           }
00496         } else {
00497           _info->clear_header();
00498           push_request();
00499         }
00500       } while(true);
00501       unpack(msg, *_info);
00502     } else {
00503       unpack(msg, *(*it));
00504       _responses.erase(it);
00505     }
00506   }
00507 
00508   // We assume integers wrap around to min at max.
00509   wisp_call_token next_token() {
00510     return _call_token++;
00511   }
00512 
00513   template<typename DestinationType, typename MessageType>
00514   void send(const DestinationType & dest,
00515             const MessageType & msg,
00516             const wisp_call_token token,
00517             const CallRole role,
00518             const Message::Service service)
00519     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00520   {
00521     unsigned int message_size;
00522 
00523     _send_info.init_header(msg.id, token, role);
00524     _send_info.message.rewind();
00525     _packer.pack(_send_info.header, _send_info.message);
00526     _packer.pack(msg, _send_info.message);
00527     _send_info.message.set_type(msg.protocol);
00528     _send_info.message.set_service(service);
00529 
00530     if((message_size = _send_info.message.size())
00531        <= MaxUnfragmentedMessageSize)
00532       _mbox.send(_send_info.message, dest);
00533     else {
00534       _send_info.header.message_size = message_size;
00535       _send_info.message.rewind();
00536 
00537       const unsigned int header_size =
00538         _packer.pack(_send_info.header, _send_info.message);
00539       const unsigned int max_fragment_size =
00540         MaxUnfragmentedMessageSize - header_size;
00541 
00542       // We never fragment a header, so there's no risk of
00543       // header_size equaling message_size..
00544       for(unsigned int offset = header_size, size = 0;
00545           offset < message_size; offset+=size)
00546       {
00547         size = std::min(max_fragment_size, message_size - offset);
00548         _mbox.clear_message_parts();
00549         _mbox.add_message_part(&_send_info.message[0], header_size);
00550         _mbox.add_message_part(&_send_info.message[offset], size);
00551         _mbox.send(dest, msg.protocol, service);
00552       }
00553       _mbox.clear_message_parts();
00554     }
00555   }
00556 
00557 public:
00558   // Using jumbo_message_map::key_type doesn't work because it's a composite
00559   // key that stores a reference, so we have to use a compatible key.
00560   typedef
00561   boost::tuple<string, wisp_call_role, wisp_call_token> jumbo_message_key_type;
00562 
00563   template<typename ReturnType>
00564   class Future {
00565     friend class Caller;
00566 
00567     bool _valid;
00568     wisp_call_token _token;
00569     Caller & _caller;
00570 
00571     Future(Caller & caller, const wisp_call_token token) :
00572       _valid(true), _token(token), _caller(caller)
00573     { }
00574 
00575   public:
00576     typedef ReturnType return_type;
00577     typedef boost::shared_ptr<Future> shared_ptr;
00578 
00579     // Note, you can't sit in a while(!future.ready()) loop because
00580     // there's no receiver thread.
00581     bool ready() const {
00582       return _caller.returned(_token);
00583     }
00584 
00585     bool valid() const {
00586       return _valid;
00587     }
00588 
00589     wisp_call_token token() const {
00590       return _token;
00591     }
00592 
00593     void receive(return_type & result)
00594       SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00595                       CallException)
00596     {
00597       if(!valid())
00598         throw CallException("Invalid future.");
00599 
00600       _caller.receive_response(result, _token);
00601 
00602       _valid = false;
00603     }
00604 
00605     // Exclusively for supporting continuations in Service
00606     void unpack(return_type & result, MessageInfo & info)
00607       SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00608                       CallException)
00609     {
00610       if(!valid())
00611         throw CallException("Invalid future.");
00612 
00613       if(info.token() == _token)
00614         _caller.unpack(result, info);
00615       else
00616         throwCallException(_token, info.token());
00617 
00618       _valid = false;
00619     }
00620   };
00621 
00622   explicit
00623   Caller(const string & connection = "",
00624          const string & name = "", 
00625          const unsigned int message_capacity = Message::DefaultCapacity,
00626          const bool group_membership = GroupMembershipDisable) :
00627     _mbox(connection, name, group_membership), _call_token(0),
00628     _packer(), _unpacker(), _requests(), _responses(),
00629     _jumbo_messages(),
00630     _jumbo_key_from_value(_jumbo_messages.key_extractor()),
00631     _initial_message_capacity(message_capacity),
00632     _info(new MessageInfo(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))),
00633     _send_info(message_capacity)
00634   { }
00635 
00636 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
00637 
00638   // Not intended to be subclassed outside of Wisp.
00639   //virtual ~Caller() { }
00640 
00641   // We expose this for allowing retrieval of group information.
00642   const Mailbox & mbox() {
00643     return _mbox;
00644   }
00645 
00646   packer_type & packer() {
00647     return _packer;
00648   }
00649 
00650   unpacker_type & unpacker() {
00651     return _unpacker;
00652   }
00653 
00654   wisp_call_token call_token() {
00655     return _call_token;
00656   }
00657 
00658   const string & name() const {
00659     return _mbox.private_group();
00660   }
00661 
00662   bool group_membership() const {
00663     return _mbox.group_membership();
00664   }
00665 
00666   void join(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
00667     _mbox.join(group);
00668   }
00669 
00670   void leave(const string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
00671     _mbox.leave(group);
00672   }
00673 
00674   unsigned int initial_message_capacity() const {
00675     return _initial_message_capacity;
00676   }
00677 
00678   unsigned int message_capacity() const {
00679     return _info->message.capacity();
00680   }
00681 
00682   unsigned int request_queue_size() const {
00683     return _requests.size();
00684   }
00685 
00686   unsigned int response_map_size() const {
00687     return _responses.size();
00688   }
00689 
00690   bool returned(const wisp_call_token token) const {
00691     return (_responses.find(token) != _responses.end());
00692   }
00693 
00694   unsigned int count_jumbo_messages() const {
00695     return _jumbo_messages.size();
00696   }
00697 
00698   template<typename key_container>
00699   void collect_jumbo_message_keys(key_container & container) {
00700     for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
00701           end = _jumbo_messages.end(); it != end; ++it)
00702     {
00703       const MessageInfo *info = it->get();
00704       container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token()));
00705     }
00706   }
00707 
00708   template<typename iterator_type>
00709   void erase_jumbo_messages(const iterator_type & begin,
00710                             const iterator_type & end)
00711   {
00712     // You can't erase a compatible key, so we have to perform a lookup first
00713     for(iterator_type it = begin; it != end; ++it) {
00714       jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
00715       if(msg != _jumbo_messages.end())
00716         _jumbo_messages.erase(msg);
00717     }
00718   }
00719 
00720   // Call only after unpacking CallHeader so message offset is correct.
00721   template<typename MessageType>
00722   void unpack(MessageType & msg, MessageInfo & info)
00723     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00724                     CallException)
00725   {
00726     if(info.id() == MessageType::id &&
00727        info.protocol() == MessageType::protocol)
00728       _unpacker.unpack(msg, info.message);
00729     else
00730       throwCallException(MessageType::protocol, info.protocol(),
00731                          MessageType::id, info.id());
00732   }
00733 
00740   void receive(message_info_ptr & info)
00741     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00742   {
00743     // Drain responses first, then requests.
00744     if(!_responses.empty()) {
00745       response_map::iterator it = _responses.begin();
00746       info = *it;
00747       _responses.erase(it);
00748     } else if(!_requests.empty()) {
00749       info = _requests.front();
00750       _requests.pop_front();
00751     } else {
00752       do {
00753         Message & message = _info->message;
00754 
00755         _mbox.receive(message);
00756         _mbox.copy_groups(_info->groups);
00757 
00758         if(message.is_regular()) {
00759           CallHeader & header = _info->header;
00760 
00761           message.rewind();
00762 
00763           const unsigned int header_size = _unpacker.unpack(header, message);
00764 
00765           if(header.is_jumbo_fragment()) {
00766             const JumboFragmentResult jumbo_result =
00767               insert_jumbo_fragment(_info, header_size);
00768             if(jumbo_result == JumboFragmentCompleteResponse) {
00769               response_map::iterator it = _responses.begin();
00770               info = *it;
00771               _responses.erase(it);
00772               return;
00773             } else if(jumbo_result == JumboFragmentCompleteRequest) {
00774               info = _requests.front();
00775               _requests.pop_front();
00776               return;
00777             }
00778           } else
00779             break;
00780         } else {
00781           _info->clear_header();
00782           break;
00783         }
00784       } while(true);
00785       info = _info;
00786     }
00787   }
00788 
00789   // Also implement timeout.
00790   // Receives requests and membership messages, but not call responses
00791   void receive_request(message_info_ptr & info)
00792     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00793   {
00794     if(!_requests.empty()) {
00795       info = _requests.front();
00796       _requests.pop_front();
00797     } else {
00798       if(!_info.unique()) {
00799         reset_receive_info();
00800       }
00801       do {
00802         Message & message = _info->message;
00803 
00804         _mbox.receive(message);
00805         _mbox.copy_groups(_info->groups);
00806 
00807         if(message.is_regular()) {
00808           CallHeader & header = _info->header;
00809 
00810           message.rewind();
00811 
00812           const unsigned int header_size = _unpacker.unpack(header, message);
00813 
00814           if(header.is_jumbo_fragment()) {
00815             const JumboFragmentResult jumbo_result =
00816               insert_jumbo_fragment(_info, header_size);
00817             if(jumbo_result == JumboFragmentCompleteRequest) {
00818               info = _requests.front();
00819               _requests.pop_front();
00820               return;
00821             }
00822           } else {
00823             if(header.role == TwoWayResponse)
00824               insert_response();
00825             else
00826               break;
00827           }
00828         } else {
00829           _info->clear_header();
00830           break;
00831         }
00832       } while(true);
00833       info = _info;
00834     }
00835   }
00836 
00837   template<typename Traits, typename DestinationType>
00838   void send(const DestinationType & dest,
00839             const typename Traits::parameter_type & param,
00840             const Message::Service service = DefaultMessageServiceType)
00841     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00842   {
00843     static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
00844     send(dest, param, next_token(), OneWayRequest, service);
00845   }
00846 
00847   template<typename Traits, typename DestinationType>
00848   void reply(const DestinationType & dest,
00849              const wisp_call_token token,
00850              const typename Traits::parameter_type & param,
00851              const Message::Service service = DefaultMessageServiceType)
00852     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00853   {
00854     static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
00855     send(dest, param, token, TwoWayResponse, service);
00856   }
00857 
00858   // Synchronous call.
00859   template<typename Traits, typename DestinationType>
00860   void call(const DestinationType & dest,
00861             typename Traits::return_type *ret,
00862             const typename Traits::parameter_type & param,
00863             const Message::Service service = DefaultMessageServiceType)
00864     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00865                     CallException)
00866   {
00867     static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
00868     wisp_call_token token = next_token();
00869     send(dest, param, token, TwoWayRequest, service);
00870     receive_response(*ret, token);
00871   }
00872 
00873   // Asynchronous call.
00874   template<typename Traits, typename DestinationType>
00875   typename Future<typename Traits::return_type>::shared_ptr
00876   call(const DestinationType & dest,
00877        const typename Traits::parameter_type & param,
00878        const Message::Service service = DefaultMessageServiceType)
00879     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00880   {
00881     static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
00882     typedef Future<typename Traits::return_type> future_type;
00883     wisp_call_token token = next_token();
00884 
00885     send(dest, param, token, TwoWayRequest, service);
00886 
00887     return typename future_type::shared_ptr(new future_type(*this, token));
00888   }
00889 
00890 private:
00891   template<CallType call_type> struct CallTag { };
00892 
00893   template<typename Traits, typename DestinationType>
00894   typename Future<typename Traits::return_type>::shared_ptr
00895   _call(const CallTag<OneWay> &,
00896         const DestinationType & dest,
00897         const typename Traits::parameter_type & param,
00898         const Message::Service service = DefaultMessageServiceType)
00899     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00900   {
00901     typedef Future<typename Traits::return_type> future_type;
00902     send<Traits>(dest, param, service);
00903     return typename future_type::shared_ptr();
00904   }
00905 
00906   template<typename Traits, typename DestinationType>
00907   typename Future<typename Traits::return_type>::shared_ptr
00908   _call(const CallTag<TwoWay> &,
00909         const DestinationType & dest,
00910         const typename Traits::parameter_type & param,
00911         const Message::Service service = DefaultMessageServiceType)
00912     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00913   {
00914     return call<Traits>(dest, param, service);
00915   }
00916 
00917 public:
00918 
00919   template<typename Traits, typename DestinationType>
00920   void operator()(const Traits &,
00921                   const DestinationType & dest,
00922                   const wisp_call_token token,
00923                   const typename Traits::parameter_type & param,
00924                   const Message::Service service = 
00925                   DefaultMessageServiceType)
00926     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00927   {
00928     reply<Traits>(dest, token, param, service);
00929   }
00930 
00931   template<typename Traits, typename DestinationType>
00932   void operator()(const Traits &,
00933                   const DestinationType & dest,
00934                   typename Traits::return_type *ret,
00935                   const typename Traits::parameter_type & param,
00936                   const Message::Service service =
00937                   DefaultMessageServiceType)
00938     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
00939                     CallException)
00940   {
00941     call<Traits>(dest, ret, param, service);
00942   }
00943 
00944   template<typename Traits, typename DestinationType>
00945   typename Future<typename Traits::return_type>::shared_ptr
00946   operator()(const Traits &,
00947              const DestinationType & dest,
00948              const typename Traits::parameter_type & param,
00949              const Message::Service service = DefaultMessageServiceType)
00950     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00951   {
00952     return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
00953   }
00954 
00955 
00956   // Convenience functions that invoke parameter_type constructor for
00957   // you.  We permit up to WISP_CALLER_MAX_PARAMETERS - 1
00958   // parameter_type arguments.  Any more, and you should probably use
00959   // a different approach.
00960 
00961   // Send.
00962 
00963   template<typename Traits, typename DestinationType>
00964   void send(const DestinationType & dest,
00965             const Message::Service service = DefaultMessageServiceType)
00966     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00967   {
00968     send<Traits>(dest, typename Traits::parameter_type(), service);
00969   }
00970 
00971   template<typename Traits, typename DestinationType, typename... P>
00972   void send(const Message::Service service,
00973             const DestinationType & dest,
00974             P && ...p)
00975     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00976   {
00977     send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...),
00978                  service);
00979   }
00980 
00981   template<typename Traits, typename DestinationType, typename... P>
00982   void sendp(const DestinationType & dest, P && ...p)
00983 
00984     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00985   {
00986     send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...));
00987   }
00988 
00989   // Reply.
00990 
00991   template<typename Traits, typename DestinationType>
00992   void reply(const DestinationType & dest,
00993              const wisp_call_token token,
00994              const Message::Service service = DefaultMessageServiceType)
00995     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
00996   {
00997     reply<Traits>(dest, token, typename Traits::parameter_type(), service);
00998   }
00999 
01000   template<typename Traits, typename DestinationType>
01001   void operator()(const Traits &,
01002                   const DestinationType & dest,
01003                   const wisp_call_token token,
01004                   const Message::Service service = 
01005                   DefaultMessageServiceType)
01006     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01007   {
01008     reply<Traits>(dest, token, typename Traits::parameter_type(), service);
01009   }
01010 
01011   template<typename Traits, typename DestinationType, typename... P>
01012   void reply(const Message::Service service,
01013              const DestinationType & dest,
01014              const wisp_call_token token,
01015              P && ...p)
01016     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01017   {
01018     reply<Traits>(dest, token,
01019                   typename Traits::parameter_type(std::forward<P>(p)...),
01020                   service);
01021   }
01022 
01023   template<typename Traits, typename DestinationType, typename... P>
01024   void replyp(const DestinationType & dest,
01025               const wisp_call_token token,
01026               P && ...p)
01027     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01028   {
01029     reply<Traits>(dest, token,
01030                   typename Traits::parameter_type(std::forward<P>(p)...));
01031   }
01032 
01033   // Synchronous call.
01034   template<typename Traits, typename DestinationType>
01035   void call(const DestinationType & dest,
01036             typename Traits::return_type *ret,
01037             const Message::Service service = DefaultMessageServiceType)
01038     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
01039                     CallException)
01040   {
01041     call<Traits>(dest, ret, typename Traits::parameter_type(), service);
01042   }
01043 
01044   template<typename Traits, typename DestinationType>
01045   void operator()(const Traits &,
01046                   const DestinationType & dest,
01047                   typename Traits::return_type *ret,
01048                   const Message::Service service = DefaultMessageServiceType)
01049     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
01050                     CallException)
01051   {
01052     call<Traits>(dest, ret, typename Traits::parameter_type(), service);
01053   }
01054 
01055   template<typename Traits, typename DestinationType, typename... P>
01056   void call(const Message::Service service,
01057             const DestinationType & dest,
01058             typename Traits::return_type *ret,
01059             P && ...p)
01060     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01061   {
01062     call<Traits>(dest, ret,
01063                  typename Traits::parameter_type(std::forward<P>(p)...),
01064                  service);
01065   }
01066 
01067   template<typename Traits, typename DestinationType, typename... P>
01068   void callp(const DestinationType & dest,
01069              typename Traits::return_type *ret,
01070              P && ...p)
01071     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01072   {
01073     call<Traits>(dest, ret,
01074                  typename Traits::parameter_type(std::forward<P>(p)...));
01075   }
01076 
01077   // Asynchronous call.
01078   template<typename Traits, typename DestinationType>
01079   typename Future<typename Traits::return_type>::shared_ptr
01080   call(const DestinationType & dest,
01081        const Message::Service service = DefaultMessageServiceType)
01082     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01083   {
01084     return call<Traits>(dest, typename Traits::parameter_type(), service);
01085   }
01086 
01087   template<typename Traits, typename DestinationType, typename... P>
01088   typename Future<typename Traits::return_type>::shared_ptr
01089   call(const Message::Service service, const DestinationType & dest,
01090        P && ...p)
01091     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01092   {
01093     return call<Traits>(dest,
01094                         typename Traits::parameter_type(std::forward<P>(p)...),
01095                         service);
01096   }
01097 
01098   template<typename Traits, typename DestinationType, typename... P>
01099   typename Future<typename Traits::return_type>::shared_ptr
01100   callp(const DestinationType & dest, P && ...p)
01101     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01102   {
01103     return call<Traits>(dest,
01104                         typename Traits::parameter_type(std::forward<P>(p)...));
01105   }
01106 
01107   // Dual-purpose operator
01108 
01109   template<typename Traits, typename DestinationType>
01110   typename Future<typename Traits::return_type>::shared_ptr
01111   operator()(const Traits &,
01112              const DestinationType & dest,
01113              const Message::Service service = DefaultMessageServiceType)
01114     SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
01115   {
01116     return
01117       _call<Traits>(CallTag<Traits::call_type>(),
01118                     dest, typename Traits::parameter_type(), service);
01119   }
01120 };
01121 
01122 __END_NS_SSRC_WISP_PROTOCOL
01123 
01124 #endif

Savarese Software Research Corporation
Copyright © 2006-2010 Savarese Software Research Corporation. All rights reserved.