Savarese Software Research Corporation
Mailbox.cc
Go to the documentation of this file.
00001 /*
00002  *
00003  * Copyright 2006,2007 Savarese Software Research Corporation
00004  *
00005  * Licensed under the Apache License, Version 2.0 (the "License");
00006  * you may not use this file except in compliance with the License.
00007  * You may obtain a copy of the License at
00008  *
00009  *     http://www.savarese.com/software/ApacheLicense-2.0
00010  *
00011  * Unless required by applicable law or agreed to in writing, software
00012  * distributed under the License is distributed on an "AS IS" BASIS,
00013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014  * See the License for the specific language governing permissions and
00015  * limitations under the License.
00016  */
00017 
00018 #include <ssrc/spread/Mailbox.h>
00019 #include <fcntl.h>
00020 
00021 __BEGIN_NS_SSRC_SPREAD
00022 
00027 const Timeout Mailbox::ZeroTimeout(0, 0);
00028 
00029 
00057 Mailbox::Mailbox(const string & connection, const string & name,
00058                  const bool group_membership, const Timeout & timeout,
00059                  const Priority priority)
00060   SSRC_DECL_THROW(Error)
00061 {
00062   int result;
00063   Spread::group_type private_group;
00064 
00065   result =
00066     Spread::SP_connect_timeout(connection.c_str(),
00067                                (name.size() == 0 ? 0 : name.c_str()), priority,
00068                                group_membership, &_mbox, private_group,
00069                                timeout);
00070 
00071   if(result != Error::AcceptSession)
00072     throw Error(result);
00073 
00074   ::fcntl(_mbox, F_SETFD, (::fcntl(_mbox, F_GETFD) | FD_CLOEXEC));
00075 
00076   _connection = connection;
00077   _group_membership = group_membership;
00078   _private_group = private_group;
00079   _name = split_private_group(_private_group).first;
00080 
00081   _drop_receive  = false;
00082   _killed        = false;
00083 }
00084 
00094 int Mailbox::send(const ScatterMessage & message, const GroupList & groups)
00095   SSRC_DECL_THROW(Error)
00096 {
00097   int result;
00098 
00099   result =
00100     Spread::SP_multigroup_scat_multicast(_mbox, message.service(),
00101                                          groups.size(), groups.groups(),
00102                                          message.type(), message.scatter());
00103   if(result < 0)
00104     throw Error(result);
00105 
00106   return result;
00107 }
00108 
00112 int Mailbox::send(const Message & message, const GroupList & groups)
00113   SSRC_DECL_THROW(Error)
00114 {
00115   clear_message_parts();
00116   add_message_part(message);
00117   _scatter.set_type(message.type());
00118   _scatter.set_service(message.service());
00119   return send(_scatter, groups);
00120 }
00121 
00126 int Mailbox::send(const Message & message, const string & group)
00127   SSRC_DECL_THROW(Error)
00128 {
00129   clear_groups();
00130   add_group(group);
00131   clear_message_parts();
00132   add_message_part(message);
00133   _scatter.set_type(message.type());
00134   _scatter.set_service(message.service());
00135   return send(_scatter, _groups);
00136 }
00137 
00169 int Mailbox::receive(ScatterMessage & message, GroupList & groups)
00170   SSRC_DECL_THROW(BufferSizeError, Error)
00171 {
00172   int result, num_groups, endian_mismatch;
00173   BaseMessage::message_type type;
00174   BaseMessage::service_type stype;
00175   Spread::group_type sender;
00176 
00177   groups.resize(groups.capacity());
00178   message.init_pre_receive();
00179 
00180  try_again:
00181   type = 0, num_groups = 0, endian_mismatch = 0;
00182   stype = (_drop_receive ? BaseMessage::DropReceive : 0);
00183 
00184   result =
00185     Spread::SP_scat_receive(_mbox, &stype, sender, groups.size(), &num_groups,
00186                             groups.groups(), &type, &endian_mismatch,
00187                             message.scatter());
00188   if(!_drop_receive) {
00189     if(result == Error::GroupsTooShort) {
00190       if(num_groups < 0) {
00191         groups.resize(-num_groups);
00192         if(endian_mismatch == 0)
00193           goto try_again;
00194         else
00195           result = Error::BufferTooShort;
00196       } else
00197         throw BufferSizeError(Error::GroupsTooShort, -num_groups);
00198     }
00199 
00200     if(result == Error::BufferTooShort) {
00201       if(num_groups < 0)
00202         groups.resize(-num_groups);
00203 
00204       if(message.count_message_objects() > 0 && endian_mismatch < 0) {
00205         const unsigned int last_message = message.count_message_objects() - 1;
00206         Message *m = message.message(last_message);
00207         // endian_mismatch stores negative of incoming message size
00208         message.resize_message(last_message,
00209                                - endian_mismatch - message.size() + m->size());
00210         goto try_again;
00211       } else
00212         throw BufferSizeError(Error::BufferTooShort, -endian_mismatch);
00213     } else if(result < 0)
00214       throw Error(result);
00215   } else if(result < 0 && result != Error::GroupsTooShort &&
00216             result != Error::BufferTooShort)
00217     throw Error(result);
00218 
00219   message.set_type(type);
00220   message.set_service(stype);
00221   message.set_sender(sender);
00222   message.set_endian_mismatch(endian_mismatch != 0);
00223 
00224   groups.resize(num_groups);
00225   message.init_post_receive(result);
00226 
00227   return result;
00228 }
00229 
00230 __END_NS_SSRC_SPREAD

Savarese Software Research Corporation
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.