GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/channel.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 477 589 81.0%
Branches: 342 805 42.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #include "cvmfs_config.h"
5 #include "channel.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26
27 using namespace std; // NOLINT
28
29
30 SessionCtx *SessionCtx::instance_ = NULL;
31
32 void SessionCtx::CleanupInstance() {
33 delete instance_;
34 instance_ = NULL;
35 }
36
37
38 1 SessionCtx::SessionCtx() {
39 1 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40 1 smalloc(sizeof(pthread_mutex_t)));
41 1 int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
43 1 }
44
45
46 SessionCtx::~SessionCtx() {
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
49
50 for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
52 }
53
54 int retval = pthread_key_delete(thread_local_storage_);
55 assert(retval == 0);
56 }
57
58
59 9614 SessionCtx *SessionCtx::GetInstance() {
60
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 9613 times.
9614 if (instance_ == NULL) {
61 1 instance_ = new SessionCtx();
62 int retval =
63 1 pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
64
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
65 }
66
67 9614 return instance_;
68 }
69
70
71 2776 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73 2776 pthread_getspecific(thread_local_storage_));
74
3/4
✓ Branch 0 taken 2775 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2775 times.
2776 if ((tls == NULL) || !tls->is_set) {
75 1 *id = 0;
76 1 *reponame = NULL;
77 1 *client_instance = NULL;
78 } else {
79 2775 *id = tls->id;
80 2775 *reponame = tls->reponame;
81 2775 *client_instance = tls->client_instance;
82 }
83 2776 }
84
85
86 bool SessionCtx::IsSet() {
87 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88 pthread_getspecific(thread_local_storage_));
89 if (tls == NULL)
90 return false;
91
92 return tls->is_set;
93 }
94
95
96 3419 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98 3419 pthread_getspecific(thread_local_storage_));
99
100
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 3404 times.
3419 if (tls == NULL) {
101
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 tls = new ThreadLocalStorage(id, reponame, client_instance);
102 15 int retval = pthread_setspecific(thread_local_storage_, tls);
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 assert(retval == 0);
104 15 MutexLockGuard lock_guard(lock_tls_blocks_);
105
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 tls_blocks_.push_back(tls);
106 15 } else {
107 3404 tls->id = id;
108 3404 tls->reponame = reponame;
109 3404 tls->client_instance = client_instance;
110 3404 tls->is_set = true;
111 }
112 3419 }
113
114
115 15 void SessionCtx::TlsDestructor(void *data) {
116 15 ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117
1/2
✓ Branch 0 taken 15 times.
✗ Branch 1 not taken.
15 delete tls;
118
119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 assert(instance_);
120 15 MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121 15 for (vector<ThreadLocalStorage *>::iterator i =
122 15 instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
123
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 i != iEnd; ++i)
124 {
125
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 if ((*i) == tls) {
126
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 instance_->tls_blocks_.erase(i);
127 15 break;
128 }
129 }
130 15 }
131
132
133 3419 void SessionCtx::Unset() {
134 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135 3419 pthread_getspecific(thread_local_storage_));
136
1/2
✓ Branch 0 taken 3419 times.
✗ Branch 1 not taken.
3419 if (tls != NULL) {
137 3419 tls->is_set = false;
138 3419 tls->id = 0;
139 3419 tls->reponame = NULL;
140 3419 tls->client_instance = NULL;
141 }
142 3419 }
143
144
145 //------------------------------------------------------------------------------
146
147
148 18 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149 18 : id(id)
150 18 , name(name)
151 {
152
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 vector<string> tokens = SplitString(name, ':');
153 18 reponame = strdup(tokens[0].c_str());
154
2/2
✓ Branch 1 taken 16 times.
✓ Branch 2 taken 2 times.
18 if (tokens.size() > 1)
155 16 client_instance = strdup(tokens[1].c_str());
156 else
157 2 client_instance = NULL;
158 18 }
159
160 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
161
162
163 1002 void CachePlugin::AskToDetach() {
164 1002 char detach = kSignalDetach;
165
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 WritePipe(pipe_ctrl_[1], &detach, 1);
166 1002 }
167
168
169 17 CachePlugin::CachePlugin(uint64_t capabilities)
170 17 : is_local_(false)
171 17 , capabilities_(capabilities)
172 17 , fd_socket_(-1)
173 17 , fd_socket_lock_(-1)
174 17 , running_(0)
175 17 , num_workers_(0)
176 17 , max_object_size_(kDefaultMaxObjectSize)
177
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 , num_inlimbo_clients_(0)
178 {
179 17 atomic_init64(&next_session_id_);
180 17 atomic_init64(&next_txn_id_);
181 17 atomic_init64(&next_lst_id_);
182 // Don't use listing id zero
183 17 atomic_inc64(&next_lst_id_);
184
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 txn_ids_.Init(128, UniqueRequest(), HashUniqueRequest);
185 17 memset(&thread_io_, 0, sizeof(thread_io_));
186
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 MakePipe(pipe_ctrl_);
187 17 }
188
189
190 34 CachePlugin::~CachePlugin() {
191 34 Terminate();
192 34 ClosePipe(pipe_ctrl_);
193
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
34 if (fd_socket_ >= 0)
194 34 close(fd_socket_);
195
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
34 if (fd_socket_lock_ >= 0)
196 34 UnlockFile(fd_socket_lock_);
197 }
198
199
200 1 void CachePlugin::HandleBreadcrumbStore(
201 cvmfs::MsgBreadcrumbStoreReq *msg_req,
202 CacheTransport *transport)
203 {
204
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 SessionCtxGuard session_guard(msg_req->session_id(), this);
205
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::MsgBreadcrumbReply msg_reply;
206
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 CacheTransport::Frame frame_send(&msg_reply);
207
208 1 msg_reply.set_req_id(msg_req->req_id());
209
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 manifest::Breadcrumb breadcrumb;
210
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
211 &breadcrumb.catalog_hash);
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!retval) {
213 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
214 "malformed hash received from client");
215 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
216 } else {
217 1 breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
218
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (msg_req->breadcrumb().has_revision()) {
219 1 breadcrumb.revision = msg_req->breadcrumb().revision();
220 } else {
221 breadcrumb.revision = 0;
222 }
223 cvmfs::EnumStatus status =
224
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 StoreBreadcrumb(msg_req->breadcrumb().fqrn(), breadcrumb);
225
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 msg_reply.set_status(status);
226 }
227
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 transport->SendFrame(&frame_send);
228 1 }
229
230
231 2 void CachePlugin::HandleBreadcrumbLoad(
232 cvmfs::MsgBreadcrumbLoadReq *msg_req,
233 CacheTransport *transport)
234 {
235
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 SessionCtxGuard session_guard(msg_req->session_id(), this);
236
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cvmfs::MsgBreadcrumbReply msg_reply;
237
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 CacheTransport::Frame frame_send(&msg_reply);
238
239 2 msg_reply.set_req_id(msg_req->req_id());
240
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 manifest::Breadcrumb breadcrumb;
241 cvmfs::EnumStatus status =
242
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
243
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 msg_reply.set_status(status);
244
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (status == cvmfs::STATUS_OK) {
245
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 assert(breadcrumb.IsValid());
246
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
247
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
248
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
249
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 msg_breadcrumb->set_fqrn(msg_req->fqrn());
250 1 msg_breadcrumb->set_allocated_hash(msg_hash);
251 1 msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
252 1 msg_breadcrumb->set_revision(breadcrumb.revision);
253 1 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
254 }
255
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 transport->SendFrame(&frame_send);
256 2 }
257
258
259 18 void CachePlugin::HandleHandshake(
260 cvmfs::MsgHandshake *msg_req,
261 CacheTransport *transport)
262 {
263 18 uint64_t session_id = NextSessionId();
264
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 if (msg_req->has_name()) {
265
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
18 sessions_[session_id] = SessionInfo(session_id, msg_req->name());
266 } else {
267 sessions_[session_id] = SessionInfo(session_id,
268 "anonymous client (" + StringifyInt(session_id) + ")");
269 }
270
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 cvmfs::MsgHandshakeAck msg_ack;
271
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 CacheTransport::Frame frame_send(&msg_ack);
272
273
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 msg_ack.set_status(cvmfs::STATUS_OK);
274
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 msg_ack.set_name(name_);
275 18 msg_ack.set_protocol_version(kPbProtocolVersion);
276 18 msg_ack.set_max_object_size(max_object_size_);
277 18 msg_ack.set_session_id(session_id);
278 18 msg_ack.set_capabilities(capabilities_);
279
1/2
✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
18 if (is_local_)
280 18 msg_ack.set_pid(getpid());
281
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 transport->SendFrame(&frame_send);
282 18 }
283
284
285 4 void CachePlugin::HandleInfo(
286 cvmfs::MsgInfoReq *msg_req,
287 CacheTransport *transport)
288 {
289
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 SessionCtxGuard session_guard(msg_req->session_id(), this);
290
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 cvmfs::MsgInfoReply msg_reply;
291
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 CacheTransport::Frame frame_send(&msg_reply);
292
293 4 msg_reply.set_req_id(msg_req->req_id());
294 4 Info info;
295
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 cvmfs::EnumStatus status = GetInfo(&info);
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (status != cvmfs::STATUS_OK) {
297 LogSessionError(msg_req->session_id(), status,
298 "failed to query cache status");
299 }
300 4 msg_reply.set_size_bytes(info.size_bytes);
301 4 msg_reply.set_used_bytes(info.used_bytes);
302 4 msg_reply.set_pinned_bytes(info.pinned_bytes);
303 4 msg_reply.set_no_shrink(info.no_shrink);
304
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 msg_reply.set_status(status);
305
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 transport->SendFrame(&frame_send);
306 4 }
307
308
309 4 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
310
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (!msg_req->has_conncnt_change_by())
311 return;
312 4 int32_t conncnt_change_by = msg_req->conncnt_change_by();
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
314 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
315 "invalid request to drop connection counter below zero");
316 return;
317 }
318
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (conncnt_change_by > 0) {
319
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
2 LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
320 } else {
321
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
2 LogSessionInfo(msg_req->session_id(), "release session lock");
322 }
323 4 num_inlimbo_clients_ += conncnt_change_by;
324 }
325
326
327 10 void CachePlugin::HandleList(
328 cvmfs::MsgListReq *msg_req,
329 CacheTransport *transport)
330 {
331
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 SessionCtxGuard session_guard(msg_req->session_id(), this);
332
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 cvmfs::MsgListReply msg_reply;
333
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 CacheTransport::Frame frame_send(&msg_reply);
334
335 10 msg_reply.set_req_id(msg_req->req_id());
336 10 int64_t listing_id = msg_req->listing_id();
337 10 msg_reply.set_listing_id(listing_id);
338 10 msg_reply.set_is_last_part(true);
339
340 cvmfs::EnumStatus status;
341
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 4 times.
10 if (msg_req->listing_id() == 0) {
342 6 listing_id = NextLstId();
343
1/2
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 status = ListingBegin(listing_id, msg_req->object_type());
344
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (status != cvmfs::STATUS_OK) {
345 LogSessionError(msg_req->session_id(), status,
346 "failed to start enumeration of objects");
347 msg_reply.set_status(status);
348 transport->SendFrame(&frame_send);
349 return;
350 }
351 6 msg_reply.set_listing_id(listing_id);
352 }
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(listing_id != 0);
354
355
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 ObjectInfo item;
356 10 unsigned total_size = 0;
357
3/4
✓ Branch 1 taken 200006 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 200000 times.
✓ Branch 4 taken 6 times.
200006 while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
358
1/2
✓ Branch 1 taken 200000 times.
✗ Branch 2 not taken.
200000 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
359
2/4
✓ Branch 1 taken 200000 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 200000 times.
✗ Branch 5 not taken.
200000 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
360
1/2
✓ Branch 1 taken 200000 times.
✗ Branch 2 not taken.
200000 transport->FillMsgHash(item.id, msg_hash);
361 200000 msg_list_record->set_allocated_hash(msg_hash);
362 200000 msg_list_record->set_pinned(item.pinned);
363
1/2
✓ Branch 1 taken 200000 times.
✗ Branch 2 not taken.
200000 msg_list_record->set_description(item.description);
364 // Approximation of the message size
365 200000 total_size += sizeof(item) + item.description.length();
366
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 199996 times.
200000 if (total_size > kListingSize)
367 4 break;
368 }
369
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 4 times.
10 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
370
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 ListingEnd(listing_id);
371 6 status = cvmfs::STATUS_OK;
372 } else {
373 4 msg_reply.set_is_last_part(false);
374 }
375
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 if (status != cvmfs::STATUS_OK) {
376 LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
377 }
378
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 msg_reply.set_status(status);
379
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 transport->SendFrame(&frame_send);
380
3/6
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
10 }
381
382
383 16 void CachePlugin::HandleObjectInfo(
384 cvmfs::MsgObjectInfoReq *msg_req,
385 CacheTransport *transport)
386 {
387
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 SessionCtxGuard session_guard(msg_req->session_id(), this);
388
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 cvmfs::MsgObjectInfoReply msg_reply;
389
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 CacheTransport::Frame frame_send(&msg_reply);
390
391 16 msg_reply.set_req_id(msg_req->req_id());
392
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 shash::Any object_id;
393
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 if (!retval) {
395 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
396 "malformed hash received from client");
397 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
398 } else {
399
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 ObjectInfo info;
400
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
401
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 msg_reply.set_status(status);
402
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 1 times.
16 if (status == cvmfs::STATUS_OK) {
403
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 msg_reply.set_object_type(info.object_type);
404 15 msg_reply.set_size(info.size);
405
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 } else if (status != cvmfs::STATUS_NOENTRY) {
406
2/4
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
1 LogSessionError(msg_req->session_id(), status,
407 "failed retrieving object details");
408 }
409 16 }
410
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 transport->SendFrame(&frame_send);
411 16 }
412
413
414 2211 void CachePlugin::HandleRead(
415 cvmfs::MsgReadReq *msg_req,
416 CacheTransport *transport)
417 {
418
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 SessionCtxGuard session_guard(msg_req->session_id(), this);
419
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 cvmfs::MsgReadReply msg_reply;
420
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 CacheTransport::Frame frame_send(&msg_reply);
421
422 2211 msg_reply.set_req_id(msg_req->req_id());
423
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 shash::Any object_id;
424
1/2
✓ Branch 2 taken 2211 times.
✗ Branch 3 not taken.
2211 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
425
3/6
✓ Branch 0 taken 2211 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2211 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2211 times.
2211 if (!retval || (msg_req->size() > max_object_size_)) {
426 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
427 "malformed hash received from client");
428 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
429 transport->SendFrame(&frame_send);
430 return;
431 }
432 2211 unsigned size = msg_req->size();
433 #ifdef __APPLE__
434 unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
435 #else
436 2211 unsigned char buffer[size];
437 #endif
438
1/2
✓ Branch 2 taken 2211 times.
✗ Branch 3 not taken.
2211 cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
439
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 msg_reply.set_status(status);
440
2/2
✓ Branch 0 taken 2210 times.
✓ Branch 1 taken 1 times.
2211 if (status == cvmfs::STATUS_OK) {
441 2210 frame_send.set_attachment(buffer, size);
442 } else {
443
2/4
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
1 LogSessionError(msg_req->session_id(), status,
444 "failed to read from object");
445 }
446
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 transport->SendFrame(&frame_send);
447 #ifdef __APPLE__
448 free(buffer);
449 #endif
450
3/6
✓ Branch 2 taken 2211 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2211 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2211 times.
✗ Branch 9 not taken.
4422 }
451
452
453 564 void CachePlugin::HandleRefcount(
454 cvmfs::MsgRefcountReq *msg_req,
455 CacheTransport *transport)
456 {
457
1/2
✓ Branch 2 taken 564 times.
✗ Branch 3 not taken.
564 SessionCtxGuard session_guard(msg_req->session_id(), this);
458
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 cvmfs::MsgRefcountReply msg_reply;
459
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 CacheTransport::Frame frame_send(&msg_reply);
460
461 564 msg_reply.set_req_id(msg_req->req_id());
462
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 shash::Any object_id;
463
1/2
✓ Branch 2 taken 564 times.
✗ Branch 3 not taken.
564 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 564 times.
564 if (!retval) {
465 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
466 "malformed hash received from client");
467 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
468 } else {
469
1/2
✓ Branch 2 taken 564 times.
✗ Branch 3 not taken.
564 cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
470
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 msg_reply.set_status(status);
471
4/4
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 562 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 1 times.
564 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
472
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogSessionError(msg_req->session_id(), status,
473
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
2 "failed to open/close object " + object_id.ToString());
474 }
475 }
476
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 transport->SendFrame(&frame_send);
477 564 }
478
479
480 3459 bool CachePlugin::HandleRequest(int fd_con) {
481 3459 CacheTransport transport(fd_con, CacheTransport::kFlagSendIgnoreFailure);
482 3459 char buffer[max_object_size_];
483
1/2
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
3459 CacheTransport::Frame frame_recv;
484 3459 frame_recv.set_attachment(buffer, max_object_size_);
485
1/2
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
3459 bool retval = transport.RecvFrame(&frame_recv);
486
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3459 times.
3459 if (!retval) {
487 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
488 "failed to receive request from connection (%d)", errno);
489 return false;
490 }
491
492
1/2
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
3459 google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
493
494
3/4
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 18 times.
✓ Branch 6 taken 3441 times.
3459 if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
495 18 cvmfs::MsgHandshake *msg_req =
496 reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed);
497
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 HandleHandshake(msg_req, &transport);
498
3/4
✓ Branch 1 taken 3441 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 18 times.
✓ Branch 6 taken 3423 times.
3441 } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
499 18 cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
500 map<uint64_t, SessionInfo>::const_iterator iter =
501
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 sessions_.find(msg_req->session_id());
502
1/2
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
18 if (iter != sessions_.end()) {
503 18 free(iter->second.reponame);
504 18 free(iter->second.client_instance);
505 }
506
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 sessions_.erase(msg_req->session_id());
507 18 return false;
508
3/4
✓ Branch 1 taken 3423 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4 times.
✓ Branch 6 taken 3419 times.
3423 } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
509
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
510
3/4
✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 564 times.
✓ Branch 6 taken 2855 times.
3419 } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
511 564 cvmfs::MsgRefcountReq *msg_req =
512 reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed);
513
1/2
✓ Branch 1 taken 564 times.
✗ Branch 2 not taken.
564 HandleRefcount(msg_req, &transport);
514
3/4
✓ Branch 1 taken 2855 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 16 times.
✓ Branch 6 taken 2839 times.
2855 } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
515 16 cvmfs::MsgObjectInfoReq *msg_req =
516 reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
517
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 HandleObjectInfo(msg_req, &transport);
518
3/4
✓ Branch 1 taken 2839 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2211 times.
✓ Branch 6 taken 628 times.
2839 } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
519 2211 cvmfs::MsgReadReq *msg_req =
520 reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed);
521
1/2
✓ Branch 1 taken 2211 times.
✗ Branch 2 not taken.
2211 HandleRead(msg_req, &transport);
522
3/4
✓ Branch 1 taken 628 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 608 times.
✓ Branch 6 taken 20 times.
628 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
523 608 cvmfs::MsgStoreReq *msg_req =
524 reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed);
525
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 HandleStore(msg_req, &frame_recv, &transport);
526
3/4
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 19 times.
20 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
527 1 cvmfs::MsgStoreAbortReq *msg_req =
528 reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
529
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 HandleStoreAbort(msg_req, &transport);
530
3/4
✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4 times.
✓ Branch 6 taken 15 times.
19 } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
531 4 cvmfs::MsgInfoReq *msg_req =
532 reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed);
533
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 HandleInfo(msg_req, &transport);
534
3/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 13 times.
15 } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
535 2 cvmfs::MsgShrinkReq *msg_req =
536 reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed);
537
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 HandleShrink(msg_req, &transport);
538
3/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 10 times.
✓ Branch 6 taken 3 times.
13 } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
539 10 cvmfs::MsgListReq *msg_req =
540 reinterpret_cast<cvmfs::MsgListReq *>(msg_typed);
541
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 HandleList(msg_req, &transport);
542
3/4
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 2 times.
3 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
543 1 cvmfs::MsgBreadcrumbStoreReq *msg_req =
544 reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
545
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 HandleBreadcrumbStore(msg_req, &transport);
546
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
2 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
547 2 cvmfs::MsgBreadcrumbLoadReq *msg_req =
548 reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
549
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 HandleBreadcrumbLoad(msg_req, &transport);
550 } else {
551 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
552 "unexpected message from client: %s",
553 msg_typed->GetTypeName().c_str());
554 return false;
555 }
556
557 3441 return true;
558
1/2
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
6918 }
559
560
561 2 void CachePlugin::HandleShrink(
562 cvmfs::MsgShrinkReq *msg_req,
563 CacheTransport *transport)
564 {
565
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 SessionCtxGuard session_guard(msg_req->session_id(), this);
566
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cvmfs::MsgShrinkReply msg_reply;
567
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 CacheTransport::Frame frame_send(&msg_reply);
568
569 2 msg_reply.set_req_id(msg_req->req_id());
570 2 uint64_t used_bytes = 0;
571
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
572 2 msg_reply.set_used_bytes(used_bytes);
573
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 msg_reply.set_status(status);
574
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
2 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
575 LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
576 }
577
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 transport->SendFrame(&frame_send);
578 2 }
579
580
581 1 void CachePlugin::HandleStoreAbort(
582 cvmfs::MsgStoreAbortReq *msg_req,
583 CacheTransport *transport)
584 {
585
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 SessionCtxGuard session_guard(msg_req->session_id(), this);
586
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::MsgStoreReply msg_reply;
587
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 CacheTransport::Frame frame_send(&msg_reply);
588 1 msg_reply.set_req_id(msg_req->req_id());
589 1 msg_reply.set_part_nr(0);
590 uint64_t txn_id;
591 1 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
592
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
593
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!retval) {
594 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
595 "malformed transaction id received from client");
596 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
597 } else {
598
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::EnumStatus status = AbortTxn(txn_id);
599
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 msg_reply.set_status(status);
600
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (status != cvmfs::STATUS_OK) {
601 LogSessionError(msg_req->session_id(), status,
602 "failed to abort transaction");
603 }
604
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 txn_ids_.Erase(uniq_req);
605 }
606
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 transport->SendFrame(&frame_send);
607 1 }
608
609
610 608 void CachePlugin::HandleStore(
611 cvmfs::MsgStoreReq *msg_req,
612 CacheTransport::Frame *frame,
613 CacheTransport *transport)
614 {
615
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 SessionCtxGuard session_guard(msg_req->session_id(), this);
616
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 cvmfs::MsgStoreReply msg_reply;
617
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 CacheTransport::Frame frame_send(&msg_reply);
618 608 msg_reply.set_req_id(msg_req->req_id());
619 608 msg_reply.set_part_nr(msg_req->part_nr());
620
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::Any object_id;
621
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
622 1824 if ( !retval ||
623
3/6
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 608 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 608 times.
1216 (frame->att_size() > max_object_size_) ||
624
3/4
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 605 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 3 times.
608 ((frame->att_size() < max_object_size_) && !msg_req->last_part()) )
625 {
626 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
627 "malformed hash or bad object size received from client");
628 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
629 transport->SendFrame(&frame_send);
630 return;
631 }
632
633 608 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
634 uint64_t txn_id;
635 608 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
636
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 601 times.
608 if (msg_req->part_nr() == 1) {
637
2/4
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 7 times.
7 if (txn_ids_.Contains(uniq_req)) {
638 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
639 "invalid attempt to restart running transaction");
640 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
641 transport->SendFrame(&frame_send);
642 return;
643 }
644 7 txn_id = NextTxnId();
645
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 ObjectInfo info;
646 7 info.id = object_id;
647
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 if (msg_req->has_expected_size()) {info.size = msg_req->expected_size();}
648
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 if (msg_req->has_object_type()) {info.object_type = msg_req->object_type();}
649
2/4
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 7 times.
✗ Branch 6 not taken.
7 if (msg_req->has_description()) {info.description = msg_req->description();}
650
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 status = StartTxn(object_id, txn_id, info);
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (status != cvmfs::STATUS_OK) {
652 LogSessionError(msg_req->session_id(), status,
653 "failed to start transaction");
654 msg_reply.set_status(status);
655 transport->SendFrame(&frame_send);
656 return;
657 }
658
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 txn_ids_.Insert(uniq_req, txn_id);
659
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 } else {
660
1/2
✓ Branch 1 taken 601 times.
✗ Branch 2 not taken.
601 retval = txn_ids_.Lookup(uniq_req, &txn_id);
661
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 601 times.
601 if (!retval) {
662 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
663 "invalid transaction received from client");
664 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
665 transport->SendFrame(&frame_send);
666 return;
667 }
668 }
669
670 // TODO(jblomer): check part number and send objects up in order
671
2/2
✓ Branch 1 taken 607 times.
✓ Branch 2 taken 1 times.
608 if (frame->att_size() > 0) {
672
1/2
✓ Branch 2 taken 607 times.
✗ Branch 3 not taken.
1214 status = WriteTxn(txn_id,
673 607 reinterpret_cast<unsigned char *>(frame->attachment()),
674 frame->att_size());
675
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 607 times.
607 if (status != cvmfs::STATUS_OK) {
676 LogSessionError(msg_req->session_id(), status, "failure writing object");
677 msg_reply.set_status(status);
678 transport->SendFrame(&frame_send);
679 return;
680 }
681 }
682
683
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 602 times.
608 if (msg_req->last_part()) {
684
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 status = CommitTxn(txn_id);
685
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (status != cvmfs::STATUS_OK) {
686 LogSessionError(msg_req->session_id(), status,
687 "failure committing object");
688 }
689
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 txn_ids_.Erase(uniq_req);
690 }
691
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 msg_reply.set_status(status);
692
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 transport->SendFrame(&frame_send);
693
3/6
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 608 times.
✗ Branch 8 not taken.
608 }
694
695
696 17 bool CachePlugin::IsRunning() {
697 17 return atomic_read32(&running_) != 0;
698 }
699
700
701 17 bool CachePlugin::Listen(const string &locator) {
702
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 vector<string> tokens = SplitString(locator, '=');
703
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 if (tokens[0] == "unix") {
704
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 string lock_path = tokens[1] + ".lock";
705
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 fd_socket_lock_ = TryLockFile(lock_path);
706
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 if (fd_socket_lock_ == -1) {
707 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
708 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
709 NotifySupervisor(CacheTransport::kFailureNotification);
710 return false;
711
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 } else if (fd_socket_lock_ == -2) {
712 // Another plugin process probably started in the meantime
713 NotifySupervisor(CacheTransport::kReadyNotification);
714 if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
715 LogCvmfs(kLogCache, kLogSyslogErr | kLogStderr,
716 "failed to lock on %s, file is busy", lock_path.c_str());
717 }
718 return false;
719 }
720
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 assert(fd_socket_lock_ >= 0);
721
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 fd_socket_ = MakeSocket(tokens[1], 0600);
722 17 is_local_ = true;
723
1/4
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
17 } else if (tokens[0] == "tcp") {
724 vector<string> tcp_address = SplitString(tokens[1], ':');
725 if (tcp_address.size() != 2) {
726 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
727 "invalid locator: %s", locator.c_str());
728 NotifySupervisor(CacheTransport::kFailureNotification);
729 return false;
730 }
731 fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
732 } else {
733 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
734 "unknown endpoint in locator: %s", locator.c_str());
735 NotifySupervisor(CacheTransport::kFailureNotification);
736 return false;
737 }
738
739
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 if (fd_socket_ < 0) {
740 if (errno == EADDRINUSE) {
741 // Another plugin process probably started in the meantime
742 NotifySupervisor(CacheTransport::kReadyNotification);
743 } else {
744 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
745 "failed to create endpoint %s (%d)", locator.c_str(), errno);
746 NotifySupervisor(CacheTransport::kFailureNotification);
747 }
748 is_local_ = false;
749 return false;
750 }
751 17 int retval = listen(fd_socket_, 32);
752
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 assert(retval == 0);
753
754 17 return true;
755 17 }
756
757
758 4 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
759
3/6
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
8 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
760
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
761
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 if (iter != sessions_.end()) {
762
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 session_str = iter->second.name;
763 }
764
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
765 "session '%s': %s", session_str.c_str(), msg.c_str());
766 4 }
767
768
769 3 void CachePlugin::LogSessionError(
770 uint64_t session_id,
771 cvmfs::EnumStatus status,
772 const std::string &msg)
773 {
774
3/6
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 3 times.
✗ Branch 8 not taken.
6 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
775
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
776
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
3 if (iter != sessions_.end()) {
777
1/2
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 session_str = iter->second.name;
778 }
779
1/2
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
780 "session '%s': %s (%d - %s)",
781 session_str.c_str(), msg.c_str(), status,
782 CacheTransportCode2Ascii(status));
783 3 }
784
785
786 17 void *CachePlugin::MainProcessRequests(void *data) {
787 17 CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
788
789 17 platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
790
791 17 vector<struct pollfd> watch_fds;
792 // Elements 0, 1: control pipe, socket fd
793 struct pollfd watch_ctrl;
794 17 watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
795 17 watch_ctrl.events = POLLIN | POLLPRI;
796
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 watch_fds.push_back(watch_ctrl);
797 struct pollfd watch_socket;
798 17 watch_socket.fd = cache_plugin->fd_socket_;
799 17 watch_socket.events = POLLIN | POLLPRI;
800
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 watch_fds.push_back(watch_socket);
801
802 17 bool terminated = false;
803
1/2
✓ Branch 0 taken 4478 times.
✗ Branch 1 not taken.
4478 while (!terminated) {
804
2/2
✓ Branch 1 taken 13399 times.
✓ Branch 2 taken 4478 times.
17877 for (unsigned i = 0; i < watch_fds.size(); ++i)
805 13399 watch_fds[i].revents = 0;
806
1/2
✓ Branch 3 taken 4478 times.
✗ Branch 4 not taken.
4478 int retval = poll(&watch_fds[0], watch_fds.size(), -1);
807
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4478 times.
4478 if (retval < 0) {
808 if (errno == EINTR)
809 continue;
810 PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
811 errno);
812 }
813
814 // Termination or detach
815
2/2
✓ Branch 1 taken 1019 times.
✓ Branch 2 taken 3459 times.
4478 if (watch_fds[0].revents) {
816 char signal;
817
1/2
✓ Branch 2 taken 1019 times.
✗ Branch 3 not taken.
1019 ReadPipe(watch_fds[0].fd, &signal, 1);
818
2/2
✓ Branch 0 taken 1002 times.
✓ Branch 1 taken 17 times.
1019 if (signal == kSignalDetach) {
819
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 cache_plugin->SendDetachRequests();
820 1002 continue;
821 }
822
823 // termination
824
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 if (watch_fds.size() > 2) {
825 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
826 "terminating external cache manager with pending connections");
827 }
828 17 break;
829 }
830
831 // New connection
832
2/2
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 3441 times.
3459 if (watch_fds[1].revents) {
833 struct sockaddr_un remote;
834 18 socklen_t socket_size = sizeof(remote);
835 int fd_con =
836
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
837
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
18 if (fd_con < 0) {
838 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
839 "failed to establish connection (%d)", errno);
840 continue;
841 }
842 struct pollfd watch_con;
843 18 watch_con.fd = fd_con;
844 18 watch_con.events = POLLIN | POLLPRI;
845
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 watch_fds.push_back(watch_con);
846
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 cache_plugin->connections_.insert(fd_con);
847 }
848
849 // New request
850
2/2
✓ Branch 1 taken 3459 times.
✓ Branch 2 taken 3459 times.
6918 for (unsigned i = 2; i < watch_fds.size(); ) {
851
1/2
✓ Branch 1 taken 3459 times.
✗ Branch 2 not taken.
3459 if (watch_fds[i].revents) {
852
1/2
✓ Branch 2 taken 3459 times.
✗ Branch 3 not taken.
3459 bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
853
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 3441 times.
3459 if (!proceed) {
854
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 close(watch_fds[i].fd);
855
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 cache_plugin->connections_.erase(watch_fds[i].fd);
856
1/2
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 watch_fds.erase(watch_fds.begin() + i);
857
0/2
✗ Branch 1 not taken.
✗ Branch 2 not taken.
18 if ( (getenv(CacheTransport::kEnvReadyNotifyFd) != NULL) &&
858
2/4
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 18 times.
18 (cache_plugin->connections_.empty()) &&
859 (cache_plugin->num_inlimbo_clients_ == 0) )
860 {
861 LogCvmfs(kLogCache, kLogSyslog,
862 "stopping cache plugin, no more active clients");
863 terminated = true;
864 break;
865 }
866 } else {
867 3441 i++;
868 }
869 } else {
870 i++;
871 }
872 }
873 }
874
875 // 0, 1 being closed by destructor
876
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 for (unsigned i = 2; i < watch_fds.size(); ++i)
877 close(watch_fds[i].fd);
878
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 cache_plugin->txn_ids_.Clear();
879
880 17 signal(SIGPIPE, save_sigpipe);
881 17 return NULL;
882 17 }
883
884
885 /**
886 * Used during startup to synchronize with the cvmfs client.
887 */
888 17 void CachePlugin::NotifySupervisor(char signal) {
889 17 char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
890
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
17 if (pipe_ready == NULL)
891 17 return;
892 int fd_pipe_ready = String2Int64(pipe_ready);
893 WritePipe(fd_pipe_ready, &signal, 1);
894 }
895
896
897 17 void CachePlugin::ProcessRequests(unsigned num_workers) {
898 17 num_workers_ = num_workers;
899 17 int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 assert(retval == 0);
901 17 NotifySupervisor(CacheTransport::kReadyNotification);
902 17 atomic_cas32(&running_, 0, 1);
903 17 }
904
905
906 1002 void CachePlugin::SendDetachRequests() {
907 1002 set<int>::const_iterator iter = connections_.begin();
908 1002 set<int>::const_iterator iter_end = connections_.end();
909
2/2
✓ Branch 2 taken 1002 times.
✓ Branch 3 taken 1002 times.
2004 for (; iter != iter_end; ++iter) {
910 1002 CacheTransport transport(*iter,
911 CacheTransport::kFlagSendIgnoreFailure |
912
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 CacheTransport::kFlagSendNonBlocking);
913
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 cvmfs::MsgDetach msg_detach;
914
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 CacheTransport::Frame frame_send(&msg_detach);
915
1/2
✓ Branch 1 taken 1002 times.
✗ Branch 2 not taken.
1002 transport.SendFrame(&frame_send);
916 1002 }
917 1002 }
918
919
920 17 void CachePlugin::Terminate() {
921
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 if (IsRunning()) {
922 17 char terminate = kSignalTerminate;
923
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 WritePipe(pipe_ctrl_[1], &terminate, 1);
924
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 pthread_join(thread_io_, NULL);
925 17 atomic_cas32(&running_, 1, 0);
926 }
927 17 }
928
929
930 void CachePlugin::WaitFor() {
931 if (!IsRunning())
932 return;
933 pthread_join(thread_io_, NULL);
934 }
935