Savarese Software Research Corporation
Mailbox.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2006,2007 Savarese Software Research Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.savarese.com/software/ApacheLicense-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include <ssrc/spread/Mailbox.h>
19 #include <fcntl.h>
20 
22 
27 const Timeout Mailbox::ZeroTimeout(0, 0);
28 
29 
57 Mailbox::Mailbox(const string & connection, const string & name,
58  const bool group_membership, const Timeout & timeout,
59  const Priority priority)
60  SSRC_DECL_THROW(Error)
61 {
62  int result;
63  Spread::group_type private_group;
64 
65  result =
66  Spread::SP_connect_timeout(connection.c_str(),
67  (name.size() == 0 ? 0 : name.c_str()), priority,
68  group_membership, &_mbox, private_group,
69  timeout);
70 
71  if(result != Error::AcceptSession)
72  throw Error(result);
73 
74  ::fcntl(_mbox, F_SETFD, (::fcntl(_mbox, F_GETFD) | FD_CLOEXEC));
75 
76  _connection = connection;
77  _group_membership = group_membership;
78  _private_group = private_group;
79  _name = split_private_group(_private_group).first;
80 
81  _drop_receive = false;
82  _killed = false;
83 }
84 
94 int Mailbox::send(const ScatterMessage & message, const GroupList & groups)
95  SSRC_DECL_THROW(Error)
96 {
97  int result;
98 
99  result =
100  Spread::SP_multigroup_scat_multicast(_mbox, message.service(),
101  groups.size(), groups.groups(),
102  message.type(), message.scatter());
103  if(result < 0)
104  throw Error(result);
105 
106  return result;
107 }
108 
112 int Mailbox::send(const Message & message, const GroupList & groups)
113  SSRC_DECL_THROW(Error)
114 {
115  clear_message_parts();
116  add_message_part(message);
117  _scatter.set_type(message.type());
118  _scatter.set_service(message.service());
119  return send(_scatter, groups);
120 }
121 
126 int Mailbox::send(const Message & message, const string & group)
127  SSRC_DECL_THROW(Error)
128 {
129  clear_groups();
130  add_group(group);
131  clear_message_parts();
132  add_message_part(message);
133  _scatter.set_type(message.type());
134  _scatter.set_service(message.service());
135  return send(_scatter, _groups);
136 }
137 
169 int Mailbox::receive(ScatterMessage & message, GroupList & groups)
170  SSRC_DECL_THROW(BufferSizeError, Error)
171 {
176 
177  groups.resize(groups.capacity());
178  message.init_pre_receive();
179 
180  try_again:
181  type = 0, num_groups = 0, endian_mismatch = 0;
182  stype = (_drop_receive ? BaseMessage::DropReceive : 0);
183 
184  result =
185  Spread::SP_scat_receive(_mbox, &stype, sender, groups.size(), &num_groups,
186  groups.groups(), &type, &endian_mismatch,
187  message.scatter());
188  if(!_drop_receive) {
189  if(result == Error::GroupsTooShort) {
190  if(num_groups < 0) {
191  groups.resize(-num_groups);
192  if(endian_mismatch == 0)
193  goto try_again;
194  else
195  result = Error::BufferTooShort;
196  } else
197  throw BufferSizeError(Error::GroupsTooShort, -num_groups);
198  }
199 
200  if(result == Error::BufferTooShort) {
201  if(num_groups < 0)
202  groups.resize(-num_groups);
203 
204  if(message.count_message_objects() > 0 && endian_mismatch < 0) {
205  const unsigned int last_message = message.count_message_objects() - 1;
206  Message *m = message.message(last_message);
207  // endian_mismatch stores negative of incoming message size
208  message.resize_message(last_message,
209  - endian_mismatch - message.size() + m->size());
210  goto try_again;
211  } else
212  throw BufferSizeError(Error::BufferTooShort, -endian_mismatch);
213  } else if(result < 0)
214  throw Error(result);
215  } else if(result < 0 && result != Error::GroupsTooShort &&
216  result != Error::BufferTooShort)
217  throw Error(result);
218 
219  message.set_type(type);
220  message.set_service(stype);
221  message.set_sender(sender);
222  message.set_endian_mismatch(endian_mismatch != 0);
223 
224  groups.resize(num_groups);
225  message.init_post_receive(result);
226 
227  return result;
228 }
229 

Savarese Software Research Corporation
Copyright © 2006-2015 Savarese Software Research Corporation. All rights reserved.