Branch data Line data Source code
1 : : /*
2 : : *
3 : : * Copyright 2006,2007 Savarese Software Research Corporation
4 : : *
5 : : * Licensed under the Apache License, Version 2.0 (the "License");
6 : : * you may not use this file except in compliance with the License.
7 : : * You may obtain a copy of the License at
8 : : *
9 : : * http://www.savarese.com/software/ApacheLicense-2.0
10 : : *
11 : : * Unless required by applicable law or agreed to in writing, software
12 : : * distributed under the License is distributed on an "AS IS" BASIS,
13 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : : * See the License for the specific language governing permissions and
15 : : * limitations under the License.
16 : : */
17 : :
18 : : #include <ssrc/spread/Mailbox.h>
19 : : #include <fcntl.h>
20 : :
21 : : __BEGIN_NS_SSRC_SPREAD
22 : :
23 : : /**
24 : : * A constant denoting a timeout of zero seconds and zero
25 : : * microseconds. This value is taken to mean block indefinitely.
26 : : */
27 : 1 : const Timeout Mailbox::ZeroTimeout(0, 0);
28 : :
29 : :
30 : : /**
31 : : * Creates a Mailbox configured with the specified parameters.
32 : : * Mailbox follows the "resource acquisition is initialization"
33 : : * model. When a Mailbox is created, it establishes a connection
34 : : * with the specified %Spread daemon. When it is destroyed, it
35 : : * disconnects. You cannot reuse a Mailbox to establish multiple
36 : : * connections in succession. A new Mailbox must be created for each
37 : : * daemon connection.
38 : : *
39 : : * Mailbox::Mailbox sets the FD_CLOEXEC flag on the connection's
40 : : * file descriptor.
41 : : *
42 : : * @param connection The name of the %Spread daemon to connect to. A
43 : : * zero-length string indicates that the default daemon should be
44 : : * connected to ("4803" or "4803@localhost"; this is an undocumented
45 : : * feature of SP_connect).
46 : : * @param name The name of the session (used to create the private
47 : : * group name). A zero-length string value indicates that the Spread
48 : : * daemon should assign the session a random unique name.
49 : : * @param group_membership true if you want to receive group
50 : : * membership messages, false if not. The default is true.
51 : : * @param timeout A timeout for connecting to the daemon. A value of
52 : : * zero designates no timeout (the default).
53 : : * @param priority The priority level for establishing the connection.
54 : : * @throw Error If the connection cannot be established because of a
55 : : * timeout or some other reason.
56 : : */
57 : 20 : Mailbox::Mailbox(const string & connection, const string & name,
58 : : const bool group_membership, const Timeout & timeout,
59 [ + - + - ]: 20 : const Priority priority)
60 : : SSRC_DECL_THROW(Error)
61 : : {
62 : : int result;
63 : : Spread::group_type private_group;
64 : :
65 : : result =
66 [ - + ]: 40 : Spread::SP_connect_timeout(connection.c_str(),
67 : 20 : (name.size() == 0 ? 0 : name.c_str()), priority,
68 : : group_membership, &_mbox, private_group,
69 [ + - ]: 20 : timeout);
70 : :
71 [ - + ]: 20 : if(result != Error::AcceptSession)
72 : 0 : throw Error(result);
73 : :
74 [ + - + - ]: 20 : ::fcntl(_mbox, F_SETFD, (::fcntl(_mbox, F_GETFD) | FD_CLOEXEC));
75 : :
76 [ + - ]: 20 : _connection = connection;
77 : 20 : _group_membership = group_membership;
78 [ + - ]: 20 : _private_group = private_group;
79 [ + - ]: 20 : _name = split_private_group(_private_group).first;
80 : :
81 : 20 : _drop_receive = false;
82 : 20 : _killed = false;
83 : 20 : }
84 : :
85 : : /**
86 : : * Multicasts a message to the specified groups using the service and
87 : : * message type of the message.
88 : : *
89 : : * @param message The message to send.
90 : : * @param groups The list of groups the message should be sent to.
91 : : * @return The number of bytes sent.
92 : : * @throw Error If the operation fails.
93 : : */
94 : 11 : int Mailbox::send(const ScatterMessage & message, const GroupList & groups)
95 : : SSRC_DECL_THROW(Error)
96 : : {
97 : : int result;
98 : :
99 : : result =
100 : 33 : Spread::SP_multigroup_scat_multicast(_mbox, message.service(),
101 : 11 : groups.size(), groups.groups(),
102 : 22 : message.type(), message.scatter());
103 [ - + ]: 11 : if(result < 0)
104 : 0 : throw Error(result);
105 : :
106 : 11 : return result;
107 : : }
108 : :
109 : : /**
110 : : * See send(const ScatterMessage &, const GroupList &).
111 : : */
112 : 2 : int Mailbox::send(const Message & message, const GroupList & groups)
113 : : SSRC_DECL_THROW(Error)
114 : : {
115 : 2 : clear_message_parts();
116 : 2 : add_message_part(message);
117 : 2 : _scatter.set_type(message.type());
118 : 2 : _scatter.set_service(message.service());
119 : 2 : return send(_scatter, groups);
120 : : }
121 : :
122 : : /**
123 : : * Same as send(message, _groups) where _groups is the internal
124 : : * GroupList containing only the supplied group parameter.
125 : : */
126 : 2 : int Mailbox::send(const Message & message, const string & group)
127 : : SSRC_DECL_THROW(Error)
128 : : {
129 : 2 : clear_groups();
130 : 2 : add_group(group);
131 : 2 : clear_message_parts();
132 : 2 : add_message_part(message);
133 : 2 : _scatter.set_type(message.type());
134 : 2 : _scatter.set_service(message.service());
135 : 2 : return send(_scatter, _groups);
136 : : }
137 : :
138 : : /**
139 : : * Receives a multicast group message and records the groups the
140 : : * message was sent to. If drop_receive() is false and the message
141 : : * buffers are too short, the last Message instance in the
142 : : * ScatterMessage is resized to hold any excess data (see
143 : : * ScatterMessage::add(const Message &) for additional discussion) and
144 : : * the receive attempt is retried. Also, if drop_receive() is false
145 : : * and the group buffer is too short, the GroupList is automatically resized
146 : : * and the receive attempt retried.
147 : : *
148 : : * @param message A reference to the message that will store the
149 : : * received data.
150 : : * @param groups A reference to the Grouplist that will
151 : : * store the groups the message was sent to.
152 : : *
153 : : * @return The total numbr of bytes received. If drop_receive() is
154 : : * true, either Error::BufferTooShort or Error::GroupsTooShort is
155 : : * returned when data is dropped.
156 : : *
157 : : * @throw BufferSizeError If the %Spread C API does not provide enough
158 : : * information to retry a receive (when drop_receive() is false).
159 : : * This should not happen unless there is a bug in the %Spread API.
160 : : * Also, a BufferSizeError is thrown when no Message instances are
161 : : * provided in the ScatterMessage and there is an Error::BufferTooShort
162 : : * error. In that case, it is impossible for receive to resize the
163 : : * message and retry. In such a case, you will have to manuallly
164 : : * resize your buffers and retry based on the information provided by
165 : : * the BufferSizeError.
166 : : *
167 : : * @throw Error If the operation fails.
168 : : */
169 : 10 : int Mailbox::receive(ScatterMessage & message, GroupList & groups)
170 : : SSRC_DECL_THROW(BufferSizeError, Error)
171 : : {
172 : : int result, num_groups, endian_mismatch;
173 : : BaseMessage::message_type type;
174 : : BaseMessage::service_type stype;
175 : : Spread::group_type sender;
176 : :
177 [ + - + - ]: 10 : groups.resize(groups.capacity());
178 [ + - ]: 10 : message.init_pre_receive();
179 : :
180 : : try_again:
181 : 11 : type = 0, num_groups = 0, endian_mismatch = 0;
182 [ - + ]: 11 : stype = (_drop_receive ? BaseMessage::DropReceive : 0);
183 : :
184 : : result =
185 [ + - + - ]: 11 : Spread::SP_scat_receive(_mbox, &stype, sender, groups.size(), &num_groups,
186 : : groups.groups(), &type, &endian_mismatch,
187 [ + - ]: 11 : message.scatter());
188 [ + - ]: 11 : if(!_drop_receive) {
189 [ - + ]: 11 : if(result == Error::GroupsTooShort) {
190 [ # # ]: 0 : if(num_groups < 0) {
191 [ # # ]: 0 : groups.resize(-num_groups);
192 [ # # ]: 0 : if(endian_mismatch == 0)
193 : 0 : goto try_again;
194 : : else
195 : 0 : result = Error::BufferTooShort;
196 : : } else
197 : 0 : throw BufferSizeError(Error::GroupsTooShort, -num_groups);
198 : : }
199 : :
200 [ + + ]: 11 : if(result == Error::BufferTooShort) {
201 [ - + ]: 1 : if(num_groups < 0)
202 [ # # ]: 0 : groups.resize(-num_groups);
203 : :
204 [ + - + - : 1 : if(message.count_message_objects() > 0 && endian_mismatch < 0) {
+ - ]
205 : 1 : const unsigned int last_message = message.count_message_objects() - 1;
206 : 1 : Message *m = message.message(last_message);
207 : : // endian_mismatch stores negative of incoming message size
208 : 1 : message.resize_message(last_message,
209 [ + - + - : 2 : - endian_mismatch - message.size() + m->size());
+ - ]
210 : 1 : goto try_again;
211 : : } else
212 : 0 : throw BufferSizeError(Error::BufferTooShort, -endian_mismatch);
213 [ - + ]: 10 : } else if(result < 0)
214 : 0 : throw Error(result);
215 [ # # # # : 0 : } else if(result < 0 && result != Error::GroupsTooShort &&
# # ]
216 : : result != Error::BufferTooShort)
217 : 0 : throw Error(result);
218 : :
219 : 10 : message.set_type(type);
220 : 10 : message.set_service(stype);
221 [ + - + - ]: 10 : message.set_sender(sender);
222 : 10 : message.set_endian_mismatch(endian_mismatch != 0);
223 : :
224 [ + - ]: 10 : groups.resize(num_groups);
225 [ + - ]: 10 : message.init_post_receive(result);
226 : :
227 : 10 : return result;
228 : : }
229 : :
230 [ + - + - ]: 3 : __END_NS_SSRC_SPREAD
|