1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
#include "cvmfs_config.h" |
5 |
|
|
#include "cache_transport.h" |
6 |
|
|
|
7 |
|
|
#include <alloca.h> |
8 |
|
|
#include <errno.h> |
9 |
|
|
#include <sys/socket.h> |
10 |
|
|
|
11 |
|
|
#include <cassert> |
12 |
|
|
#include <cstdlib> |
13 |
|
|
#include <cstring> |
14 |
|
|
|
15 |
|
|
#include "hash.h" |
16 |
|
|
#include "logging.h" |
17 |
|
|
#include "smalloc.h" |
18 |
|
|
#include "util/posix.h" |
19 |
|
|
|
20 |
|
|
// TODO(jblomer): Check for possible starvation of plugin by dying clients |
21 |
|
|
// (blocking read). Probably only relevant for TCP sockets. |
22 |
|
|
|
23 |
|
|
using namespace std; // NOLINT |
24 |
|
|
|
25 |
|
|
/**
 * Definition of the static string constant kEnvReadyNotifyFd.
 * NOTE(review): presumably the name of an environment variable used to pass
 * a "ready" notification pipe to an external cache plugin -- confirm against
 * the users of this constant.
 */
const char *CacheTransport::kEnvReadyNotifyFd
  = "__CVMFS_CACHE_EXTERNAL_PIPE_READY__";
27 |
|
|
|
28 |
|
|
/** |
29 |
|
|
* Called on the sender side to wrap a message into a MsgRpc message for wire |
30 |
|
|
* transfer. |
31 |
|
|
*/ |
32 |
|
13622 |
cvmfs::MsgRpc *CacheTransport::Frame::GetMsgRpc() { |
33 |
✗✓ |
13622 |
assert(msg_typed_ != NULL); |
34 |
✓✗ |
13622 |
if (!is_wrapped_) |
35 |
|
13622 |
WrapMsg(); |
36 |
|
13622 |
return &msg_rpc_; |
37 |
|
|
} |
38 |
|
|
|
39 |
|
|
|
40 |
|
|
/** |
41 |
|
|
* Called on the receiving end of an RPC to extract the actual message from the |
42 |
|
|
* MsgRpc. |
43 |
|
|
*/ |
44 |
|
15621 |
google::protobuf::MessageLite *CacheTransport::Frame::GetMsgTyped() { |
45 |
✗✓ |
15621 |
assert(msg_rpc_.IsInitialized()); |
46 |
✓✓ |
15619 |
if (msg_typed_ == NULL) |
47 |
|
8541 |
UnwrapMsg(); |
48 |
|
15619 |
return msg_typed_; |
49 |
|
|
} |
50 |
|
|
|
51 |
|
|
|
52 |
|
15621 |
/**
 * Creates an empty frame: no typed message, no attachment buffer, nothing
 * wrapped yet and no ownership of a typed message.
 */
CacheTransport::Frame::Frame()
  : owns_msg_typed_(false),
    msg_typed_(NULL),
    attachment_(NULL),
    att_size_(0),
    is_wrapped_(false),
    is_msg_out_of_band_(false)
{
}
60 |
|
|
|
61 |
|
|
|
62 |
|
13621 |
/**
 * Creates a frame around the typed message m.  The message is not yet wrapped
 * into the MsgRpc envelope and the frame is not marked as owner of m.  No
 * attachment is set.
 */
CacheTransport::Frame::Frame(google::protobuf::MessageLite *m)
  : owns_msg_typed_(false),
    msg_typed_(m),
    attachment_(NULL),
    att_size_(0),
    is_wrapped_(false),
    is_msg_out_of_band_(false)
{
}
70 |
|
|
|
71 |
|
|
|
72 |
|
29244 |
/**
 * Tears the frame down through Reset() with an attachment size of zero.
 */
CacheTransport::Frame::~Frame() {
  this->Reset(0);
}
75 |
|
|
|
76 |
|
|
|
77 |
|
7078 |
bool CacheTransport::Frame::IsMsgOutOfBand() { |
78 |
✗✓ |
7078 |
assert(msg_rpc_.IsInitialized()); |
79 |
✓✗ |
7078 |
if (msg_typed_ == NULL) |
80 |
|
7078 |
UnwrapMsg(); |
81 |
|
7078 |
return is_msg_out_of_band_; |
82 |
|
|
} |
83 |
|
|
|
84 |
|
|
|
85 |
|
2028 |
void CacheTransport::Frame::MergeFrom(const Frame &other) { |
86 |
|
2028 |
msg_rpc_.CheckTypeAndMergeFrom(other.msg_rpc_); |
87 |
|
2028 |
owns_msg_typed_ = true; |
88 |
✓✓ |
2028 |
if (other.att_size_ > 0) { |
89 |
✗✓ |
1800 |
assert(att_size_ >= other.att_size_); |
90 |
|
1800 |
memcpy(attachment_, other.attachment_, other.att_size_); |
91 |
|
1800 |
att_size_ = other.att_size_; |
92 |
|
|
} |
93 |
|
2028 |
} |
94 |
|
|
|
95 |
|
|
|
96 |
|
13591 |
bool CacheTransport::Frame::ParseMsgRpc(void *buffer, uint32_t size) { |
97 |
|
13591 |
bool retval = msg_rpc_.ParseFromArray(buffer, size); |
98 |
✗✓ |
13581 |
if (!retval) |
99 |
|
|
return false; |
100 |
|
|
|
101 |
|
|
// Cleanup typed message when Frame leaves scope |
102 |
|
13581 |
owns_msg_typed_ = true; |
103 |
|
13581 |
return true; |
104 |
|
|
} |
105 |
|
|
|
106 |
|
|
|
107 |
|
29245 |
void CacheTransport::Frame::Release() { |
108 |
✓✓ |
29245 |
if (owns_msg_typed_) |
109 |
|
15622 |
return; |
110 |
|
|
|
111 |
|
13623 |
msg_rpc_.release_msg_refcount_req(); |
112 |
|
13623 |
msg_rpc_.release_msg_refcount_reply(); |
113 |
|
13623 |
msg_rpc_.release_msg_read_req(); |
114 |
|
13623 |
msg_rpc_.release_msg_read_reply(); |
115 |
|
13623 |
msg_rpc_.release_msg_object_info_req(); |
116 |
|
13623 |
msg_rpc_.release_msg_object_info_reply(); |
117 |
|
13623 |
msg_rpc_.release_msg_store_req(); |
118 |
|
13623 |
msg_rpc_.release_msg_store_abort_req(); |
119 |
|
13623 |
msg_rpc_.release_msg_store_reply(); |
120 |
|
13623 |
msg_rpc_.release_msg_handshake(); |
121 |
|
13623 |
msg_rpc_.release_msg_handshake_ack(); |
122 |
|
13623 |
msg_rpc_.release_msg_quit(); |
123 |
|
13623 |
msg_rpc_.release_msg_ioctl(); |
124 |
|
13623 |
msg_rpc_.release_msg_info_req(); |
125 |
|
13623 |
msg_rpc_.release_msg_info_reply(); |
126 |
|
13623 |
msg_rpc_.release_msg_shrink_req(); |
127 |
|
13623 |
msg_rpc_.release_msg_shrink_reply(); |
128 |
|
13623 |
msg_rpc_.release_msg_list_req(); |
129 |
|
13623 |
msg_rpc_.release_msg_list_reply(); |
130 |
|
13623 |
msg_rpc_.release_msg_detach(); |
131 |
|
|
} |
132 |
|
|
|
133 |
|
|
|
134 |
|
29245 |
/**
 * Puts the frame back into its pristine state: the envelope is released (if
 * the typed message is not owned) and cleared, the typed message pointer and
 * all flags are reset.  The attachment size is set to original_att_size.
 */
void CacheTransport::Frame::Reset(uint32_t original_att_size) {
  // Release() looks at owns_msg_typed_, so it has to run before the flag is
  // cleared below
  Release();
  msg_rpc_.Clear();

  msg_typed_ = NULL;
  owns_msg_typed_ = false;
  is_wrapped_ = false;
  is_msg_out_of_band_ = false;
  att_size_ = original_att_size;
}
143 |
|
|
|
144 |
|
|
|
145 |
|
13622 |
void CacheTransport::Frame::WrapMsg() { |
146 |
✓✓ |
13622 |
if (msg_typed_->GetTypeName() == "cvmfs.MsgHandshake") { |
147 |
|
|
msg_rpc_.set_allocated_msg_handshake( |
148 |
|
41 |
reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed_)); |
149 |
✓✓ |
13581 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgHandshakeAck") { |
150 |
|
|
msg_rpc_.set_allocated_msg_handshake_ack( |
151 |
|
17 |
reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed_)); |
152 |
✓✓ |
13564 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgQuit") { |
153 |
|
|
msg_rpc_.set_allocated_msg_quit( |
154 |
|
39 |
reinterpret_cast<cvmfs::MsgQuit *>(msg_typed_)); |
155 |
✓✓ |
13525 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgIoctl") { |
156 |
|
|
msg_rpc_.set_allocated_msg_ioctl( |
157 |
|
4 |
reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed_)); |
158 |
✓✓ |
13521 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgRefcountReq") { |
159 |
|
|
msg_rpc_.set_allocated_msg_refcount_req( |
160 |
|
3068 |
reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed_)); |
161 |
✓✓ |
10453 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgRefcountReply") { |
162 |
|
|
msg_rpc_.set_allocated_msg_refcount_reply( |
163 |
|
564 |
reinterpret_cast<cvmfs::MsgRefcountReply *>(msg_typed_)); |
164 |
✓✓ |
9889 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgObjectInfoReq") { |
165 |
|
|
msg_rpc_.set_allocated_msg_object_info_req( |
166 |
|
40 |
reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed_)); |
167 |
✓✓ |
9849 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgObjectInfoReply") { |
168 |
|
|
msg_rpc_.set_allocated_msg_object_info_reply( |
169 |
|
16 |
reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg_typed_)); |
170 |
✓✓ |
9833 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgReadReq") { |
171 |
|
|
msg_rpc_.set_allocated_msg_read_req( |
172 |
|
2920 |
reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed_)); |
173 |
✓✓ |
6913 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgReadReply") { |
174 |
|
|
msg_rpc_.set_allocated_msg_read_reply( |
175 |
|
2211 |
reinterpret_cast<cvmfs::MsgReadReply *>(msg_typed_)); |
176 |
✓✓ |
4702 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgStoreReq") { |
177 |
|
|
msg_rpc_.set_allocated_msg_store_req( |
178 |
|
3022 |
reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed_)); |
179 |
✓✓ |
1680 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgStoreAbortReq") { |
180 |
|
|
msg_rpc_.set_allocated_msg_store_abort_req( |
181 |
|
1 |
reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed_)); |
182 |
✓✓ |
1679 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgStoreReply") { |
183 |
|
|
msg_rpc_.set_allocated_msg_store_reply( |
184 |
|
609 |
reinterpret_cast<cvmfs::MsgStoreReply *>(msg_typed_)); |
185 |
✓✓ |
1070 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgInfoReq") { |
186 |
|
|
msg_rpc_.set_allocated_msg_info_req( |
187 |
|
28 |
reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed_)); |
188 |
✓✓ |
1042 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgInfoReply") { |
189 |
|
|
msg_rpc_.set_allocated_msg_info_reply( |
190 |
|
4 |
reinterpret_cast<cvmfs::MsgInfoReply *>(msg_typed_)); |
191 |
✓✓ |
1038 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgShrinkReq") { |
192 |
|
|
msg_rpc_.set_allocated_msg_shrink_req( |
193 |
|
10 |
reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed_)); |
194 |
✓✓ |
1028 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgShrinkReply") { |
195 |
|
|
msg_rpc_.set_allocated_msg_shrink_reply( |
196 |
|
2 |
reinterpret_cast<cvmfs::MsgShrinkReply *>(msg_typed_)); |
197 |
✓✓ |
1026 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgListReq") { |
198 |
|
|
msg_rpc_.set_allocated_msg_list_req( |
199 |
|
16 |
reinterpret_cast<cvmfs::MsgListReq *>(msg_typed_)); |
200 |
✓✓ |
1010 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgListReply") { |
201 |
|
|
msg_rpc_.set_allocated_msg_list_reply( |
202 |
|
8 |
reinterpret_cast<cvmfs::MsgListReply *>(msg_typed_)); |
203 |
✓✗ |
1002 |
} else if (msg_typed_->GetTypeName() == "cvmfs.MsgDetach") { |
204 |
|
|
msg_rpc_.set_allocated_msg_detach( |
205 |
|
1002 |
reinterpret_cast<cvmfs::MsgDetach *>(msg_typed_)); |
206 |
|
1002 |
is_msg_out_of_band_ = true; |
207 |
|
|
} else { |
208 |
|
|
// Unexpected message type, should never happen |
209 |
|
|
abort(); |
210 |
|
|
} |
211 |
|
13622 |
is_wrapped_ = true; |
212 |
|
13622 |
} |
213 |
|
|
|
214 |
|
|
|
215 |
|
15619 |
void CacheTransport::Frame::UnwrapMsg() { |
216 |
✓✓ |
15619 |
if (msg_rpc_.has_msg_handshake()) { |
217 |
|
17 |
msg_typed_ = msg_rpc_.mutable_msg_handshake(); |
218 |
✓✓ |
15602 |
} else if (msg_rpc_.has_msg_handshake_ack()) { |
219 |
|
41 |
msg_typed_ = msg_rpc_.mutable_msg_handshake_ack(); |
220 |
✓✓ |
15561 |
} else if (msg_rpc_.has_msg_quit()) { |
221 |
|
12 |
msg_typed_ = msg_rpc_.mutable_msg_quit(); |
222 |
✓✓ |
15549 |
} else if (msg_rpc_.has_msg_ioctl()) { |
223 |
|
4 |
msg_typed_ = msg_rpc_.mutable_msg_ioctl(); |
224 |
✓✓ |
15545 |
} else if (msg_rpc_.has_msg_refcount_req()) { |
225 |
|
564 |
msg_typed_ = msg_rpc_.mutable_msg_refcount_req(); |
226 |
✓✓ |
14981 |
} else if (msg_rpc_.has_msg_refcount_reply()) { |
227 |
|
3087 |
msg_typed_ = msg_rpc_.mutable_msg_refcount_reply(); |
228 |
✓✓ |
11894 |
} else if (msg_rpc_.has_msg_object_info_req()) { |
229 |
|
16 |
msg_typed_ = msg_rpc_.mutable_msg_object_info_req(); |
230 |
✓✓ |
11878 |
} else if (msg_rpc_.has_msg_object_info_reply()) { |
231 |
|
49 |
msg_typed_ = msg_rpc_.mutable_msg_object_info_reply(); |
232 |
✓✓ |
11829 |
} else if (msg_rpc_.has_msg_read_req()) { |
233 |
|
2211 |
msg_typed_ = msg_rpc_.mutable_msg_read_req(); |
234 |
✓✓ |
9621 |
} else if (msg_rpc_.has_msg_read_reply()) { |
235 |
|
4720 |
msg_typed_ = msg_rpc_.mutable_msg_read_reply(); |
236 |
✓✓ |
4901 |
} else if (msg_rpc_.has_msg_store_req()) { |
237 |
|
608 |
msg_typed_ = msg_rpc_.mutable_msg_store_req(); |
238 |
✓✓ |
4293 |
} else if (msg_rpc_.has_msg_store_abort_req()) { |
239 |
|
1 |
msg_typed_ = msg_rpc_.mutable_msg_store_abort_req(); |
240 |
✓✓ |
4292 |
} else if (msg_rpc_.has_msg_store_reply()) { |
241 |
|
3223 |
msg_typed_ = msg_rpc_.mutable_msg_store_reply(); |
242 |
✓✓ |
1069 |
} else if (msg_rpc_.has_msg_info_req()) { |
243 |
|
4 |
msg_typed_ = msg_rpc_.mutable_msg_info_req(); |
244 |
✓✓ |
1065 |
} else if (msg_rpc_.has_msg_info_reply()) { |
245 |
|
28 |
msg_typed_ = msg_rpc_.mutable_msg_info_reply(); |
246 |
✓✓ |
1037 |
} else if (msg_rpc_.has_msg_shrink_req()) { |
247 |
|
2 |
msg_typed_ = msg_rpc_.mutable_msg_shrink_req(); |
248 |
✓✓ |
1035 |
} else if (msg_rpc_.has_msg_shrink_reply()) { |
249 |
|
10 |
msg_typed_ = msg_rpc_.mutable_msg_shrink_reply(); |
250 |
✓✓ |
1025 |
} else if (msg_rpc_.has_msg_list_req()) { |
251 |
|
8 |
msg_typed_ = msg_rpc_.mutable_msg_list_req(); |
252 |
✓✓ |
1017 |
} else if (msg_rpc_.has_msg_list_reply()) { |
253 |
|
16 |
msg_typed_ = msg_rpc_.mutable_msg_list_reply(); |
254 |
✓✗ |
1001 |
} else if (msg_rpc_.has_msg_detach()) { |
255 |
|
1001 |
msg_typed_ = msg_rpc_.mutable_msg_detach(); |
256 |
|
1001 |
is_msg_out_of_band_ = true; |
257 |
|
|
} else { |
258 |
|
|
// Unexpected message type, should never happen |
259 |
|
|
abort(); |
260 |
|
|
} |
261 |
|
15622 |
} |
262 |
|
|
|
263 |
|
|
|
264 |
|
|
//------------------------------------------------------------------------------ |
265 |
|
|
|
266 |
|
|
|
267 |
|
41 |
/**
 * Creates a transport on top of the given connection file descriptor with no
 * flags set.  The file descriptor must not be negative.
 */
CacheTransport::CacheTransport(int fd_connection)
  : fd_connection_(fd_connection),
    flags_(0)
{
  assert(fd_connection_ >= 0);
}
273 |
|
|
|
274 |
|
|
|
275 |
|
4449 |
/**
 * Creates a transport on top of the given connection file descriptor using
 * the given flags (tested against kFlagSendNonBlocking and
 * kFlagSendIgnoreFailure when sending).  The file descriptor must not be
 * negative.
 */
CacheTransport::CacheTransport(int fd_connection, uint32_t flags)
  : fd_connection_(fd_connection),
    flags_(flags)
{
  assert(fd_connection_ >= 0);
}
281 |
|
|
|
282 |
|
|
|
283 |
|
206388 |
void CacheTransport::FillMsgHash( |
284 |
|
|
const shash::Any &hash, |
285 |
|
|
cvmfs::MsgHash *msg_hash) |
286 |
|
|
{ |
287 |
✓✓✓✗
|
206388 |
switch (hash.algorithm) { |
288 |
|
|
case shash::kSha1: |
289 |
|
206364 |
msg_hash->set_algorithm(cvmfs::HASH_SHA1); |
290 |
|
206364 |
break; |
291 |
|
|
case shash::kRmd160: |
292 |
|
12 |
msg_hash->set_algorithm(cvmfs::HASH_RIPEMD160); |
293 |
|
12 |
break; |
294 |
|
|
case shash::kShake128: |
295 |
|
12 |
msg_hash->set_algorithm(cvmfs::HASH_SHAKE128); |
296 |
|
12 |
break; |
297 |
|
|
default: |
298 |
|
|
abort(); |
299 |
|
|
} |
300 |
|
206388 |
msg_hash->set_digest(hash.digest, shash::kDigestSizes[hash.algorithm]); |
301 |
|
206388 |
} |
302 |
|
|
|
303 |
|
|
|
304 |
|
3014 |
/**
 * Translates a cache manager object type into the wire enum.  Regular and
 * pinned objects both map to OBJECT_REGULAR.  Aborts on an unknown type.
 */
void CacheTransport::FillObjectType(
  CacheManager::ObjectType object_type,
  cvmfs::EnumObjectType *wire_type)
{
  if ((object_type == CacheManager::kTypeRegular) ||
      (object_type == CacheManager::kTypePinned))
  {
    // TODO(jblomer): "pinned" should mean a permanently open fd
    *wire_type = cvmfs::OBJECT_REGULAR;
  } else if (object_type == CacheManager::kTypeCatalog) {
    *wire_type = cvmfs::OBJECT_CATALOG;
  } else if (object_type == CacheManager::kTypeVolatile) {
    *wire_type = cvmfs::OBJECT_VOLATILE;
  } else {
    abort();
  }
}
324 |
|
|
|
325 |
|
|
|
326 |
|
3399 |
bool CacheTransport::ParseMsgHash( |
327 |
|
|
const cvmfs::MsgHash &msg_hash, |
328 |
|
|
shash::Any *hash) |
329 |
|
|
{ |
330 |
✓✗✗✗
|
3399 |
switch (msg_hash.algorithm()) { |
331 |
|
|
case cvmfs::HASH_SHA1: |
332 |
|
3399 |
hash->algorithm = shash::kSha1; |
333 |
|
3399 |
break; |
334 |
|
|
case cvmfs::HASH_RIPEMD160: |
335 |
|
|
hash->algorithm = shash::kRmd160; |
336 |
|
|
break; |
337 |
|
|
case cvmfs::HASH_SHAKE128: |
338 |
|
|
hash->algorithm = shash::kShake128; |
339 |
|
|
break; |
340 |
|
|
default: |
341 |
|
|
return false; |
342 |
|
|
} |
343 |
|
3399 |
const unsigned digest_size = shash::kDigestSizes[hash->algorithm]; |
344 |
✗✓ |
3399 |
if (msg_hash.digest().length() != digest_size) |
345 |
|
|
return false; |
346 |
|
3399 |
memcpy(hash->digest, msg_hash.digest().data(), digest_size); |
347 |
|
3399 |
return true; |
348 |
|
|
} |
349 |
|
|
|
350 |
|
|
|
351 |
|
|
/**
 * Translates a wire object type into the cache manager object type.
 *
 * @return true if wire_type is known; false otherwise, in which case
 *         object_type is left untouched.
 */
bool CacheTransport::ParseObjectType(
  cvmfs::EnumObjectType wire_type,
  CacheManager::ObjectType *object_type)
{
  bool is_known = true;
  switch (wire_type) {
    case cvmfs::OBJECT_REGULAR:
      *object_type = CacheManager::kTypeRegular;
      break;
    case cvmfs::OBJECT_CATALOG:
      *object_type = CacheManager::kTypeCatalog;
      break;
    case cvmfs::OBJECT_VOLATILE:
      *object_type = CacheManager::kTypeVolatile;
      break;
    default:
      is_known = false;
  }
  return is_known;
}
369 |
|
|
|
370 |
|
|
|
371 |
|
13595 |
/**
 * Reads one frame from the connection.  After the header (see RecvHeader) the
 * payload is read in one go; payloads up to kMaxStackAlloc bytes are buffered
 * on the stack, larger ones on the heap.  If the header announces an
 * attachment, the payload starts with a two byte little-endian inner header
 * giving the size of the protobuf message, followed by the message and then
 * the attachment.  The attachment is copied into the frame's attachment
 * buffer, which must be announced (att_size()) large enough to take it.
 *
 * @return false on a short read, a malformed header or payload, a protobuf
 *         parse failure or a too small attachment buffer; true otherwise
 */
bool CacheTransport::RecvFrame(CacheTransport::Frame *frame) {
  uint32_t size;
  bool has_attachment;
  if (!RecvHeader(&size, &has_attachment))
    return false;

  const bool on_heap = (size > kMaxStackAlloc);
  void *buffer;
  if (on_heap)
    buffer = smalloc(size);
  else
    buffer = alloca(size);

  // Single pass "loop": every failure breaks out to the common cleanup below
  bool success = false;
  do {
    const ssize_t nbytes = SafeRead(fd_connection_, buffer, size);
    if ((nbytes < 0) || (static_cast<uint32_t>(nbytes) != size))
      break;

    unsigned char *bytes = reinterpret_cast<unsigned char *>(buffer);
    uint32_t msg_size = size;
    void *ptr_msg = buffer;
    if (has_attachment) {
      // Need at least the two bytes of the inner header
      if (size < 2)
        break;
      msg_size = bytes[0] + (bytes[1] << 8);
      if ((msg_size + kInnerHeaderSize) > size)
        break;
      ptr_msg = bytes + kInnerHeaderSize;
    }

    if (!frame->ParseMsgRpc(ptr_msg, msg_size))
      break;

    if (!has_attachment) {
      frame->set_att_size(0);
      success = true;
      break;
    }

    // Whatever follows the protobuf message is attachment data
    const uint32_t attachment_size = size - (msg_size + kInnerHeaderSize);
    if (frame->att_size() < attachment_size)
      break;
    memcpy(frame->attachment(),
           bytes + kInnerHeaderSize + msg_size,
           attachment_size);
    frame->set_att_size(attachment_size);
    success = true;
  } while (false);

  if (on_heap)
    free(buffer);
  return success;
}
430 |
|
|
|
431 |
|
|
|
432 |
|
13595 |
bool CacheTransport::RecvHeader(uint32_t *size, bool *has_attachment) { |
433 |
|
|
unsigned char header[kHeaderSize]; |
434 |
|
13595 |
ssize_t nbytes = SafeRead(fd_connection_, header, kHeaderSize); |
435 |
✓✗✓✓
|
13595 |
if ((nbytes < 0) || (static_cast<unsigned>(nbytes) != kHeaderSize)) |
436 |
|
1 |
return false; |
437 |
✗✓ |
13594 |
if ((header[0] & (~kFlagHasAttachment)) != kWireProtocolVersion) |
438 |
|
|
return false; |
439 |
|
13594 |
*has_attachment = header[0] & kFlagHasAttachment; |
440 |
|
13594 |
*size = header[1] + (header[2] << 8) + (header[3] << 16); |
441 |
✓✗✓✗
|
13594 |
return (*size > 0) && (*size <= kMaxMsgSize); |
442 |
|
|
} |
443 |
|
|
|
444 |
|
|
|
445 |
|
13622 |
void CacheTransport::SendData( |
446 |
|
|
void *message, |
447 |
|
|
uint32_t msg_size, |
448 |
|
|
void *attachment, |
449 |
|
|
uint32_t att_size) |
450 |
|
|
{ |
451 |
|
|
uint32_t total_size = |
452 |
✓✓ |
13622 |
msg_size + att_size + ((att_size > 0) ? kInnerHeaderSize : 0); |
453 |
|
|
|
454 |
✗✓ |
13622 |
assert(total_size > 0); |
455 |
✗✓ |
13622 |
assert(total_size <= kMaxMsgSize); |
456 |
|
|
LogCvmfs(kLogCache, kLogDebug, |
457 |
|
7885 |
"sending message of size %u to cache transport", total_size); |
458 |
|
|
|
459 |
|
|
unsigned char header[kHeaderSize]; |
460 |
✓✓ |
13622 |
header[0] = kWireProtocolVersion | ((att_size == 0) ? 0 : kFlagHasAttachment); |
461 |
|
13622 |
header[1] = (total_size & 0x000000FF); |
462 |
|
13622 |
header[2] = (total_size & 0x0000FF00) >> 8; |
463 |
|
13622 |
header[3] = (total_size & 0x00FF0000) >> 16; |
464 |
|
|
// Only transferred if an attachment is present. Otherwise the overall size |
465 |
|
|
// is also the size of the protobuf message. |
466 |
|
|
unsigned char inner_header[kInnerHeaderSize]; |
467 |
|
|
|
468 |
|
|
struct iovec iov[4]; |
469 |
|
13622 |
iov[0].iov_base = header; |
470 |
|
13622 |
iov[0].iov_len = kHeaderSize; |
471 |
|
|
|
472 |
✓✓ |
13622 |
if (att_size > 0) { |
473 |
|
5229 |
inner_header[0] = (msg_size & 0x000000FF); |
474 |
|
5229 |
inner_header[1] = (msg_size & 0x0000FF00) >> 8; |
475 |
|
5229 |
iov[1].iov_base = inner_header; |
476 |
|
5229 |
iov[1].iov_len = kInnerHeaderSize; |
477 |
|
5229 |
iov[2].iov_base = message; |
478 |
|
5229 |
iov[2].iov_len = msg_size; |
479 |
|
5229 |
iov[3].iov_base = attachment; |
480 |
|
5229 |
iov[3].iov_len = att_size; |
481 |
|
|
} else { |
482 |
|
8393 |
iov[1].iov_base = message; |
483 |
|
8393 |
iov[1].iov_len = msg_size; |
484 |
|
|
} |
485 |
✓✓ |
13622 |
if (flags_ & kFlagSendNonBlocking) { |
486 |
✓✗ |
1002 |
SendNonBlocking(iov, (att_size == 0) ? 2 : 4); |
487 |
|
1002 |
return; |
488 |
|
|
} |
489 |
✓✓ |
12620 |
bool retval = SafeWriteV(fd_connection_, iov, (att_size == 0) ? 2 : 4); |
490 |
|
|
|
491 |
✗✓✗✗
|
12620 |
if (!retval && !(flags_ & kFlagSendIgnoreFailure)) { |
492 |
|
|
LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, |
493 |
|
|
"failed to write to external cache transport (%d), aborting", |
494 |
|
|
errno); |
495 |
|
|
abort(); |
496 |
|
|
} |
497 |
|
|
} |
498 |
|
|
|
499 |
|
1002 |
void CacheTransport::SendNonBlocking(struct iovec *iov, unsigned iovcnt) { |
500 |
✗✓ |
1002 |
assert(iovcnt > 0); |
501 |
|
1002 |
unsigned total_size = 0; |
502 |
✓✓ |
3006 |
for (unsigned i = 0; i < iovcnt; ++i) |
503 |
|
2004 |
total_size += iov[i].iov_len; |
504 |
|
1002 |
unsigned char *buffer = reinterpret_cast<unsigned char *>(alloca(total_size)); |
505 |
|
|
|
506 |
|
1002 |
unsigned pos = 0; |
507 |
✓✓ |
3006 |
for (unsigned i = 0; i < iovcnt; ++i) { |
508 |
|
2004 |
memcpy(buffer + pos, iov[i].iov_base, iov[i].iov_len); |
509 |
|
2004 |
pos += iov[i].iov_len; |
510 |
|
|
} |
511 |
|
|
|
512 |
|
1002 |
int retval = send(fd_connection_, buffer, total_size, MSG_DONTWAIT); |
513 |
✗✓ |
1002 |
if (retval < 0) { |
514 |
|
|
assert(errno != EMSGSIZE); |
515 |
|
|
if (!(flags_ & kFlagSendIgnoreFailure)) { |
516 |
|
|
LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, |
517 |
|
|
"failed to write to external cache transport (%d), aborting", |
518 |
|
|
errno); |
519 |
|
|
abort(); |
520 |
|
|
} |
521 |
|
|
} |
522 |
|
1002 |
} |
523 |
|
|
|
524 |
|
|
|
525 |
|
13622 |
/**
 * Serializes the frame's MsgRpc envelope and sends it together with the
 * frame's attachment through SendData().  The serialization buffer lives on
 * the heap on macOS and on the stack everywhere else.
 */
void CacheTransport::SendFrame(CacheTransport::Frame *frame) {
  cvmfs::MsgRpc *envelope = frame->GetMsgRpc();
  const int32_t nbytes = envelope->ByteSize();
  assert(nbytes > 0);

#ifdef __APPLE__
  void *wire_data = smalloc(nbytes);
#else
  void *wire_data = alloca(nbytes);
#endif

  const bool serialized = envelope->SerializeToArray(wire_data, nbytes);
  assert(serialized);
  SendData(wire_data, nbytes, frame->attachment(), frame->att_size());

#ifdef __APPLE__
  free(wire_data);
#endif
}