Mailbox.cc
Go to the documentation of this file.
00001 /* 00002 * 00003 * Copyright 2006,2007 Savarese Software Research Corporation 00004 * 00005 * Licensed under the Apache License, Version 2.0 (the "License"); 00006 * you may not use this file except in compliance with the License. 00007 * You may obtain a copy of the License at 00008 * 00009 * http://www.savarese.com/software/ApacheLicense-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #include <ssrc/spread/Mailbox.h> 00019 #include <fcntl.h> 00020 00021 __BEGIN_NS_SSRC_SPREAD 00022 00027 const Timeout Mailbox::ZeroTimeout(0, 0); 00028 00029 00057 Mailbox::Mailbox(const string & connection, const string & name, 00058 const bool group_membership, const Timeout & timeout, 00059 const Priority priority) 00060 SSRC_DECL_THROW(Error) 00061 { 00062 int result; 00063 Spread::group_type private_group; 00064 00065 result = 00066 Spread::SP_connect_timeout(connection.c_str(), 00067 (name.size() == 0 ? 
0 : name.c_str()), priority, 00068 group_membership, &_mbox, private_group, 00069 timeout); 00070 00071 if(result != Error::AcceptSession) 00072 throw Error(result); 00073 00074 ::fcntl(_mbox, F_SETFD, (::fcntl(_mbox, F_GETFD) | FD_CLOEXEC)); 00075 00076 _connection = connection; 00077 _group_membership = group_membership; 00078 _private_group = private_group; 00079 _name = split_private_group(_private_group).first; 00080 00081 _drop_receive = false; 00082 _killed = false; 00083 } 00084 00094 int Mailbox::send(const ScatterMessage & message, const GroupList & groups) 00095 SSRC_DECL_THROW(Error) 00096 { 00097 int result; 00098 00099 result = 00100 Spread::SP_multigroup_scat_multicast(_mbox, message.service(), 00101 groups.size(), groups.groups(), 00102 message.type(), message.scatter()); 00103 if(result < 0) 00104 throw Error(result); 00105 00106 return result; 00107 } 00108 00112 int Mailbox::send(const Message & message, const GroupList & groups) 00113 SSRC_DECL_THROW(Error) 00114 { 00115 clear_message_parts(); 00116 add_message_part(message); 00117 _scatter.set_type(message.type()); 00118 _scatter.set_service(message.service()); 00119 return send(_scatter, groups); 00120 } 00121 00126 int Mailbox::send(const Message & message, const string & group) 00127 SSRC_DECL_THROW(Error) 00128 { 00129 clear_groups(); 00130 add_group(group); 00131 clear_message_parts(); 00132 add_message_part(message); 00133 _scatter.set_type(message.type()); 00134 _scatter.set_service(message.service()); 00135 return send(_scatter, _groups); 00136 } 00137 00169 int Mailbox::receive(ScatterMessage & message, GroupList & groups) 00170 SSRC_DECL_THROW(BufferSizeError, Error) 00171 { 00172 int result, num_groups, endian_mismatch; 00173 BaseMessage::message_type type; 00174 BaseMessage::service_type stype; 00175 Spread::group_type sender; 00176 00177 groups.resize(groups.capacity()); 00178 message.init_pre_receive(); 00179 00180 try_again: 00181 type = 0, num_groups = 0, endian_mismatch = 0; 
00182 stype = (_drop_receive ? BaseMessage::DropReceive : 0); 00183 00184 result = 00185 Spread::SP_scat_receive(_mbox, &stype, sender, groups.size(), &num_groups, 00186 groups.groups(), &type, &endian_mismatch, 00187 message.scatter()); 00188 if(!_drop_receive) { 00189 if(result == Error::GroupsTooShort) { 00190 if(num_groups < 0) { 00191 groups.resize(-num_groups); 00192 if(endian_mismatch == 0) 00193 goto try_again; 00194 else 00195 result = Error::BufferTooShort; 00196 } else 00197 throw BufferSizeError(Error::GroupsTooShort, -num_groups); 00198 } 00199 00200 if(result == Error::BufferTooShort) { 00201 if(num_groups < 0) 00202 groups.resize(-num_groups); 00203 00204 if(message.count_message_objects() > 0 && endian_mismatch < 0) { 00205 const unsigned int last_message = message.count_message_objects() - 1; 00206 Message *m = message.message(last_message); 00207 // endian_mismatch stores negative of incoming message size 00208 message.resize_message(last_message, 00209 - endian_mismatch - message.size() + m->size()); 00210 goto try_again; 00211 } else 00212 throw BufferSizeError(Error::BufferTooShort, -endian_mismatch); 00213 } else if(result < 0) 00214 throw Error(result); 00215 } else if(result < 0 && result != Error::GroupsTooShort && 00216 result != Error::BufferTooShort) 00217 throw Error(result); 00218 00219 message.set_type(type); 00220 message.set_service(stype); 00221 message.set_sender(sender); 00222 message.set_endian_mismatch(endian_mismatch != 0); 00223 00224 groups.resize(num_groups); 00225 message.init_post_receive(result); 00226 00227 return result; 00228 } 00229 00230 __END_NS_SSRC_SPREAD
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.
Copyright © 2011 Savarese Software Research Corporation. All rights reserved.