GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/channel.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 478 590 81.0%
Branches: 342 805 42.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "channel.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26
27 using namespace std; // NOLINT
28
29
30 SessionCtx *SessionCtx::instance_ = NULL;
31
32 void SessionCtx::CleanupInstance() {
33 delete instance_;
34 instance_ = NULL;
35 }
36
37
38 15 SessionCtx::SessionCtx() {
39 15 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40 15 smalloc(sizeof(pthread_mutex_t)));
41 15 const int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 assert(retval == 0);
43 15 }
44
45
46 SessionCtx::~SessionCtx() {
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
49
50 for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
52 }
53
54 const int retval = pthread_key_delete(thread_local_storage_);
55 assert(retval == 0);
56 }
57
58
59 144210 SessionCtx *SessionCtx::GetInstance() {
60
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 144195 times.
144210 if (instance_ == NULL) {
61 15 instance_ = new SessionCtx();
62 const int retval =
63 15 pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
64
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 assert(retval == 0);
65 }
66
67 144210 return instance_;
68 }
69
70
71 41640 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73 41640 pthread_getspecific(thread_local_storage_));
74
3/4
✓ Branch 0 taken 41625 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41625 times.
41640 if ((tls == NULL) || !tls->is_set) {
75 15 *id = 0;
76 15 *reponame = NULL;
77 15 *client_instance = NULL;
78 } else {
79 41625 *id = tls->id;
80 41625 *reponame = tls->reponame;
81 41625 *client_instance = tls->client_instance;
82 }
83 41640 }
84
85
86 bool SessionCtx::IsSet() {
87 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88 pthread_getspecific(thread_local_storage_));
89 if (tls == NULL)
90 return false;
91
92 return tls->is_set;
93 }
94
95
96 51285 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98 51285 pthread_getspecific(thread_local_storage_));
99
100
2/2
✓ Branch 0 taken 225 times.
✓ Branch 1 taken 51060 times.
51285 if (tls == NULL) {
101
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 tls = new ThreadLocalStorage(id, reponame, client_instance);
102 225 const int retval = pthread_setspecific(thread_local_storage_, tls);
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 225 times.
225 assert(retval == 0);
104 225 const MutexLockGuard lock_guard(lock_tls_blocks_);
105
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 tls_blocks_.push_back(tls);
106 225 } else {
107 51060 tls->id = id;
108 51060 tls->reponame = reponame;
109 51060 tls->client_instance = client_instance;
110 51060 tls->is_set = true;
111 }
112 51285 }
113
114
115 225 void SessionCtx::TlsDestructor(void *data) {
116 225 ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117
1/2
✓ Branch 0 taken 225 times.
✗ Branch 1 not taken.
225 delete tls;
118
119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 225 times.
225 assert(instance_);
120 225 const MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121 225 for (vector<ThreadLocalStorage *>::iterator
122 225 i = instance_->tls_blocks_.begin(),
123 225 iEnd = instance_->tls_blocks_.end();
124
1/2
✓ Branch 2 taken 225 times.
✗ Branch 3 not taken.
225 i != iEnd; ++i) {
125
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 if ((*i) == tls) {
126
1/2
✓ Branch 2 taken 225 times.
✗ Branch 3 not taken.
225 instance_->tls_blocks_.erase(i);
127 225 break;
128 }
129 }
130 225 }
131
132
133 51285 void SessionCtx::Unset() {
134 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135 51285 pthread_getspecific(thread_local_storage_));
136
1/2
✓ Branch 0 taken 51285 times.
✗ Branch 1 not taken.
51285 if (tls != NULL) {
137 51285 tls->is_set = false;
138 51285 tls->id = 0;
139 51285 tls->reponame = NULL;
140 51285 tls->client_instance = NULL;
141 }
142 51285 }
143
144
145 //------------------------------------------------------------------------------
146
147
148 270 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149 270 : id(id), name(name) {
150
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 vector<string> tokens = SplitString(name, ':');
151 270 reponame = strdup(tokens[0].c_str());
152
2/2
✓ Branch 1 taken 240 times.
✓ Branch 2 taken 30 times.
270 if (tokens.size() > 1)
153 240 client_instance = strdup(tokens[1].c_str());
154 else
155 30 client_instance = NULL;
156 270 }
157
158 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
159
160
161 15030 void CachePlugin::AskToDetach() {
162 15030 char detach = kSignalDetach;
163
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 WritePipe(pipe_ctrl_[1], &detach, 1);
164 15030 }
165
166
167 255 CachePlugin::CachePlugin(uint64_t capabilities)
168 255 : is_local_(false)
169 255 , capabilities_(capabilities)
170 255 , fd_socket_(-1)
171 255 , fd_socket_lock_(-1)
172 255 , running_(0)
173 255 , num_workers_(0)
174 255 , max_object_size_(kDefaultMaxObjectSize)
175
1/2
✓ Branch 2 taken 255 times.
✗ Branch 3 not taken.
255 , num_inlimbo_clients_(0) {
176 255 atomic_init64(&next_session_id_);
177 255 atomic_init64(&next_txn_id_);
178 255 atomic_init64(&next_lst_id_);
179 // Don't use listing id zero
180 255 atomic_inc64(&next_lst_id_);
181
1/2
✓ Branch 2 taken 255 times.
✗ Branch 3 not taken.
255 txn_ids_.Init(128, UniqueRequest(), HashUniqueRequest);
182 255 memset(&thread_io_, 0, sizeof(thread_io_));
183
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 MakePipe(pipe_ctrl_);
184 255 }
185
186
187 510 CachePlugin::~CachePlugin() {
188 510 Terminate();
189 510 ClosePipe(pipe_ctrl_);
190
1/2
✓ Branch 0 taken 255 times.
✗ Branch 1 not taken.
510 if (fd_socket_ >= 0)
191 510 close(fd_socket_);
192
1/2
✓ Branch 0 taken 255 times.
✗ Branch 1 not taken.
510 if (fd_socket_lock_ >= 0)
193 510 UnlockFile(fd_socket_lock_);
194 }
195
196
197 15 void CachePlugin::HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
198 CacheTransport *transport) {
199
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 const SessionCtxGuard session_guard(msg_req->session_id(), this);
200
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 cvmfs::MsgBreadcrumbReply msg_reply;
201
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 CacheTransport::Frame frame_send(&msg_reply);
202
203 15 msg_reply.set_req_id(msg_req->req_id());
204
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 manifest::Breadcrumb breadcrumb;
205
1/2
✓ Branch 3 taken 15 times.
✗ Branch 4 not taken.
15 const bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
206 &breadcrumb.catalog_hash);
207
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (!retval) {
208 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
209 "malformed hash received from client");
210 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
211 } else {
212 15 breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
213
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 if (msg_req->breadcrumb().has_revision()) {
214 15 breadcrumb.revision = msg_req->breadcrumb().revision();
215 } else {
216 breadcrumb.revision = 0;
217 }
218 const cvmfs::EnumStatus status =
219
1/2
✓ Branch 3 taken 15 times.
✗ Branch 4 not taken.
15 StoreBreadcrumb(msg_req->breadcrumb().fqrn(), breadcrumb);
220
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 msg_reply.set_status(status);
221 }
222
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 transport->SendFrame(&frame_send);
223 15 }
224
225
226 30 void CachePlugin::HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
227 CacheTransport *transport) {
228
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 const SessionCtxGuard session_guard(msg_req->session_id(), this);
229
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 cvmfs::MsgBreadcrumbReply msg_reply;
230
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 CacheTransport::Frame frame_send(&msg_reply);
231
232 30 msg_reply.set_req_id(msg_req->req_id());
233
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 manifest::Breadcrumb breadcrumb;
234
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 const cvmfs::EnumStatus status = LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
235
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 msg_reply.set_status(status);
236
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 15 times.
30 if (status == cvmfs::STATUS_OK) {
237
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 15 times.
15 assert(breadcrumb.IsValid());
238
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
15 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
239
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
240
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
15 cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
241
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 msg_breadcrumb->set_fqrn(msg_req->fqrn());
242 15 msg_breadcrumb->set_allocated_hash(msg_hash);
243 15 msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
244 15 msg_breadcrumb->set_revision(breadcrumb.revision);
245 15 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
246 }
247
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 transport->SendFrame(&frame_send);
248 30 }
249
250
251 270 void CachePlugin::HandleHandshake(cvmfs::MsgHandshake *msg_req,
252 CacheTransport *transport) {
253 270 const uint64_t session_id = NextSessionId();
254
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 if (msg_req->has_name()) {
255
2/4
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 270 times.
✗ Branch 6 not taken.
270 sessions_[session_id] = SessionInfo(session_id, msg_req->name());
256 } else {
257 sessions_[session_id] = SessionInfo(
258 session_id, "anonymous client (" + StringifyInt(session_id) + ")");
259 }
260
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 cvmfs::MsgHandshakeAck msg_ack;
261
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 CacheTransport::Frame frame_send(&msg_ack);
262
263
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 msg_ack.set_status(cvmfs::STATUS_OK);
264
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 msg_ack.set_name(name_);
265 270 msg_ack.set_protocol_version(kPbProtocolVersion);
266 270 msg_ack.set_max_object_size(max_object_size_);
267 270 msg_ack.set_session_id(session_id);
268 270 msg_ack.set_capabilities(capabilities_);
269
1/2
✓ Branch 0 taken 270 times.
✗ Branch 1 not taken.
270 if (is_local_)
270 270 msg_ack.set_pid(getpid());
271
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 transport->SendFrame(&frame_send);
272 270 }
273
274
275 60 void CachePlugin::HandleInfo(cvmfs::MsgInfoReq *msg_req,
276 CacheTransport *transport) {
277
1/2
✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
60 const SessionCtxGuard session_guard(msg_req->session_id(), this);
278
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 cvmfs::MsgInfoReply msg_reply;
279
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 CacheTransport::Frame frame_send(&msg_reply);
280
281 60 msg_reply.set_req_id(msg_req->req_id());
282 60 Info info;
283
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 const cvmfs::EnumStatus status = GetInfo(&info);
284
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (status != cvmfs::STATUS_OK) {
285 LogSessionError(msg_req->session_id(), status,
286 "failed to query cache status");
287 }
288 60 msg_reply.set_size_bytes(info.size_bytes);
289 60 msg_reply.set_used_bytes(info.used_bytes);
290 60 msg_reply.set_pinned_bytes(info.pinned_bytes);
291 60 msg_reply.set_no_shrink(info.no_shrink);
292
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 msg_reply.set_status(status);
293
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 transport->SendFrame(&frame_send);
294 60 }
295
296
297 60 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
298
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 60 times.
60 if (!msg_req->has_conncnt_change_by())
299 return;
300 60 const int32_t conncnt_change_by = msg_req->conncnt_change_by();
301
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
302 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
303 "invalid request to drop connection counter below zero");
304 return;
305 }
306
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 30 times.
60 if (conncnt_change_by > 0) {
307
2/4
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 30 times.
✗ Branch 7 not taken.
30 LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
308 } else {
309
2/4
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 30 times.
✗ Branch 7 not taken.
30 LogSessionInfo(msg_req->session_id(), "release session lock");
310 }
311 60 num_inlimbo_clients_ += conncnt_change_by;
312 }
313
314
315 150 void CachePlugin::HandleList(cvmfs::MsgListReq *msg_req,
316 CacheTransport *transport) {
317
1/2
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
150 const SessionCtxGuard session_guard(msg_req->session_id(), this);
318
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 cvmfs::MsgListReply msg_reply;
319
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 CacheTransport::Frame frame_send(&msg_reply);
320
321 150 msg_reply.set_req_id(msg_req->req_id());
322 150 int64_t listing_id = msg_req->listing_id();
323 150 msg_reply.set_listing_id(listing_id);
324 150 msg_reply.set_is_last_part(true);
325
326 cvmfs::EnumStatus status;
327
2/2
✓ Branch 1 taken 90 times.
✓ Branch 2 taken 60 times.
150 if (msg_req->listing_id() == 0) {
328 90 listing_id = NextLstId();
329
1/2
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
90 status = ListingBegin(listing_id, msg_req->object_type());
330
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 if (status != cvmfs::STATUS_OK) {
331 LogSessionError(msg_req->session_id(), status,
332 "failed to start enumeration of objects");
333 msg_reply.set_status(status);
334 transport->SendFrame(&frame_send);
335 return;
336 }
337 90 msg_reply.set_listing_id(listing_id);
338 }
339
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 assert(listing_id != 0);
340
341
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 ObjectInfo item;
342 150 unsigned total_size = 0;
343
3/4
✓ Branch 1 taken 3000090 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3000000 times.
✓ Branch 4 taken 90 times.
3000090 while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
344
1/2
✓ Branch 1 taken 3000000 times.
✗ Branch 2 not taken.
3000000 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
345
2/4
✓ Branch 1 taken 3000000 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3000000 times.
✗ Branch 5 not taken.
3000000 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
346
1/2
✓ Branch 1 taken 3000000 times.
✗ Branch 2 not taken.
3000000 transport->FillMsgHash(item.id, msg_hash);
347 3000000 msg_list_record->set_allocated_hash(msg_hash);
348 3000000 msg_list_record->set_pinned(item.pinned);
349
1/2
✓ Branch 1 taken 3000000 times.
✗ Branch 2 not taken.
3000000 msg_list_record->set_description(item.description);
350 // Approximation of the message size
351 3000000 total_size += sizeof(item) + item.description.length();
352
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 2999940 times.
3000000 if (total_size > kListingSize)
353 60 break;
354 }
355
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 60 times.
150 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
356
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 ListingEnd(listing_id);
357 90 status = cvmfs::STATUS_OK;
358 } else {
359 60 msg_reply.set_is_last_part(false);
360 }
361
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 if (status != cvmfs::STATUS_OK) {
362 LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
363 }
364
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 msg_reply.set_status(status);
365
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 transport->SendFrame(&frame_send);
366
3/6
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 150 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 150 times.
✗ Branch 9 not taken.
150 }
367
368
369 240 void CachePlugin::HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
370 CacheTransport *transport) {
371
1/2
✓ Branch 2 taken 240 times.
✗ Branch 3 not taken.
240 const SessionCtxGuard session_guard(msg_req->session_id(), this);
372
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 cvmfs::MsgObjectInfoReply msg_reply;
373
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 CacheTransport::Frame frame_send(&msg_reply);
374
375 240 msg_reply.set_req_id(msg_req->req_id());
376
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 shash::Any object_id;
377
1/2
✓ Branch 2 taken 240 times.
✗ Branch 3 not taken.
240 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 if (!retval) {
379 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
380 "malformed hash received from client");
381 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
382 } else {
383
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 ObjectInfo info;
384
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 const cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
385
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 msg_reply.set_status(status);
386
2/2
✓ Branch 0 taken 225 times.
✓ Branch 1 taken 15 times.
240 if (status == cvmfs::STATUS_OK) {
387
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 msg_reply.set_object_type(info.object_type);
388 225 msg_reply.set_size(info.size);
389
1/2
✓ Branch 0 taken 15 times.
✗ Branch 1 not taken.
15 } else if (status != cvmfs::STATUS_NOENTRY) {
390
2/4
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 15 times.
✗ Branch 7 not taken.
15 LogSessionError(msg_req->session_id(), status,
391 "failed retrieving object details");
392 }
393 240 }
394
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 transport->SendFrame(&frame_send);
395 240 }
396
397
398 33165 void CachePlugin::HandleRead(cvmfs::MsgReadReq *msg_req,
399 CacheTransport *transport) {
400
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 const SessionCtxGuard session_guard(msg_req->session_id(), this);
401
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 cvmfs::MsgReadReply msg_reply;
402
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 CacheTransport::Frame frame_send(&msg_reply);
403
404 33165 msg_reply.set_req_id(msg_req->req_id());
405
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 shash::Any object_id;
406
1/2
✓ Branch 2 taken 33165 times.
✗ Branch 3 not taken.
33165 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
407
3/6
✓ Branch 0 taken 33165 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 33165 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 33165 times.
33165 if (!retval || (msg_req->size() > max_object_size_)) {
408 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
409 "malformed hash received from client");
410 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
411 transport->SendFrame(&frame_send);
412 return;
413 }
414 33165 unsigned size = msg_req->size();
415 #ifdef __APPLE__
416 unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
417 #else
418 33165 unsigned char buffer[size];
419 #endif
420 const cvmfs::EnumStatus status =
421
1/2
✓ Branch 2 taken 33165 times.
✗ Branch 3 not taken.
33165 Pread(object_id, msg_req->offset(), &size, buffer);
422
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 msg_reply.set_status(status);
423
2/2
✓ Branch 0 taken 33150 times.
✓ Branch 1 taken 15 times.
33165 if (status == cvmfs::STATUS_OK) {
424 33150 frame_send.set_attachment(buffer, size);
425 } else {
426
2/4
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 15 times.
✗ Branch 7 not taken.
15 LogSessionError(msg_req->session_id(), status,
427 "failed to read from object");
428 }
429
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 transport->SendFrame(&frame_send);
430 #ifdef __APPLE__
431 free(buffer);
432 #endif
433
3/6
✓ Branch 2 taken 33165 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 33165 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 33165 times.
✗ Branch 9 not taken.
66330 }
434
435
436 8460 void CachePlugin::HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
437 CacheTransport *transport) {
438
1/2
✓ Branch 2 taken 8460 times.
✗ Branch 3 not taken.
8460 const SessionCtxGuard session_guard(msg_req->session_id(), this);
439
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 cvmfs::MsgRefcountReply msg_reply;
440
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 CacheTransport::Frame frame_send(&msg_reply);
441
442 8460 msg_reply.set_req_id(msg_req->req_id());
443
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 shash::Any object_id;
444
1/2
✓ Branch 2 taken 8460 times.
✗ Branch 3 not taken.
8460 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
445
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8460 times.
8460 if (!retval) {
446 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
447 "malformed hash received from client");
448 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
449 } else {
450 const cvmfs::EnumStatus status =
451
1/2
✓ Branch 2 taken 8460 times.
✗ Branch 3 not taken.
8460 ChangeRefcount(object_id, msg_req->change_by());
452
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 msg_reply.set_status(status);
453
4/4
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 8430 times.
✓ Branch 2 taken 15 times.
✓ Branch 3 taken 15 times.
8460 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
454
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 LogSessionError(msg_req->session_id(), status,
455
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
30 "failed to open/close object " + object_id.ToString());
456 }
457 }
458
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 transport->SendFrame(&frame_send);
459 8460 }
460
461
462 51885 bool CachePlugin::HandleRequest(int fd_con) {
463 51885 CacheTransport transport(fd_con, CacheTransport::kFlagSendIgnoreFailure);
464 51885 char buffer[max_object_size_];
465
1/2
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
51885 CacheTransport::Frame frame_recv;
466 51885 frame_recv.set_attachment(buffer, max_object_size_);
467
1/2
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
51885 const bool retval = transport.RecvFrame(&frame_recv);
468
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51885 times.
51885 if (!retval) {
469 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
470 "failed to receive request from connection (%d)", errno);
471 return false;
472 }
473
474
1/2
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
51885 google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
475
476
3/4
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 270 times.
✓ Branch 6 taken 51615 times.
51885 if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
477 270 cvmfs::MsgHandshake *msg_req = reinterpret_cast<cvmfs::MsgHandshake *>(
478 msg_typed);
479
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 HandleHandshake(msg_req, &transport);
480
3/4
✓ Branch 1 taken 51615 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 270 times.
✓ Branch 6 taken 51345 times.
51615 } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
481 270 cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
482 const map<uint64_t, SessionInfo>::const_iterator iter =
483
1/2
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
270 sessions_.find(msg_req->session_id());
484
1/2
✓ Branch 3 taken 270 times.
✗ Branch 4 not taken.
270 if (iter != sessions_.end()) {
485 270 free(iter->second.reponame);
486 270 free(iter->second.client_instance);
487 }
488
1/2
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
270 sessions_.erase(msg_req->session_id());
489 270 return false;
490
3/4
✓ Branch 1 taken 51345 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 51285 times.
51345 } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
491
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
492
3/4
✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 8460 times.
✓ Branch 6 taken 42825 times.
51285 } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
493 8460 cvmfs::MsgRefcountReq *msg_req = reinterpret_cast<cvmfs::MsgRefcountReq *>(
494 msg_typed);
495
1/2
✓ Branch 1 taken 8460 times.
✗ Branch 2 not taken.
8460 HandleRefcount(msg_req, &transport);
496
3/4
✓ Branch 1 taken 42825 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 240 times.
✓ Branch 6 taken 42585 times.
42825 } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
497 cvmfs::MsgObjectInfoReq
498 240 *msg_req = reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
499
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 HandleObjectInfo(msg_req, &transport);
500
3/4
✓ Branch 1 taken 42585 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 33165 times.
✓ Branch 6 taken 9420 times.
42585 } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
501 33165 cvmfs::MsgReadReq *msg_req = reinterpret_cast<cvmfs::MsgReadReq *>(
502 msg_typed);
503
1/2
✓ Branch 1 taken 33165 times.
✗ Branch 2 not taken.
33165 HandleRead(msg_req, &transport);
504
3/4
✓ Branch 1 taken 9420 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 9120 times.
✓ Branch 6 taken 300 times.
9420 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
505 9120 cvmfs::MsgStoreReq *msg_req = reinterpret_cast<cvmfs::MsgStoreReq *>(
506 msg_typed);
507
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 HandleStore(msg_req, &frame_recv, &transport);
508
3/4
✓ Branch 1 taken 300 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 15 times.
✓ Branch 6 taken 285 times.
300 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
509 cvmfs::MsgStoreAbortReq
510 15 *msg_req = reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
511
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 HandleStoreAbort(msg_req, &transport);
512
3/4
✓ Branch 1 taken 285 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 225 times.
285 } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
513 60 cvmfs::MsgInfoReq *msg_req = reinterpret_cast<cvmfs::MsgInfoReq *>(
514 msg_typed);
515
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 HandleInfo(msg_req, &transport);
516
3/4
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 30 times.
✓ Branch 6 taken 195 times.
225 } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
517 30 cvmfs::MsgShrinkReq *msg_req = reinterpret_cast<cvmfs::MsgShrinkReq *>(
518 msg_typed);
519
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 HandleShrink(msg_req, &transport);
520
3/4
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 150 times.
✓ Branch 6 taken 45 times.
195 } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
521 150 cvmfs::MsgListReq *msg_req = reinterpret_cast<cvmfs::MsgListReq *>(
522 msg_typed);
523
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 HandleList(msg_req, &transport);
524
3/4
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 15 times.
✓ Branch 6 taken 30 times.
45 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
525 cvmfs::MsgBreadcrumbStoreReq
526 15 *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
527
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 HandleBreadcrumbStore(msg_req, &transport);
528
2/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 30 times.
✗ Branch 6 not taken.
30 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
529 cvmfs::MsgBreadcrumbLoadReq
530 30 *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
531
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 HandleBreadcrumbLoad(msg_req, &transport);
532 } else {
533 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
534 "unexpected message from client: %s",
535 std::string(msg_typed->GetTypeName()).c_str());
536 return false;
537 }
538
539 51615 return true;
540
1/2
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
103770 }
541
542
543 30 void CachePlugin::HandleShrink(cvmfs::MsgShrinkReq *msg_req,
544 CacheTransport *transport) {
545
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 const SessionCtxGuard session_guard(msg_req->session_id(), this);
546
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 cvmfs::MsgShrinkReply msg_reply;
547
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 CacheTransport::Frame frame_send(&msg_reply);
548
549 30 msg_reply.set_req_id(msg_req->req_id());
550 30 uint64_t used_bytes = 0;
551
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 const cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
552 30 msg_reply.set_used_bytes(used_bytes);
553
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 msg_reply.set_status(status);
554
3/4
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
30 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
555 LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
556 }
557
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 transport->SendFrame(&frame_send);
558 30 }
559
560
561 15 void CachePlugin::HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
562 CacheTransport *transport) {
563
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 const SessionCtxGuard session_guard(msg_req->session_id(), this);
564
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 cvmfs::MsgStoreReply msg_reply;
565
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 CacheTransport::Frame frame_send(&msg_reply);
566 15 msg_reply.set_req_id(msg_req->req_id());
567 15 msg_reply.set_part_nr(0);
568 uint64_t txn_id;
569 15 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
570
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 const bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (!retval) {
572 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
573 "malformed transaction id received from client");
574 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
575 } else {
576
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 const cvmfs::EnumStatus status = AbortTxn(txn_id);
577
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 msg_reply.set_status(status);
578
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (status != cvmfs::STATUS_OK) {
579 LogSessionError(msg_req->session_id(), status,
580 "failed to abort transaction");
581 }
582
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 txn_ids_.Erase(uniq_req);
583 }
584
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 transport->SendFrame(&frame_send);
585 15 }
586
587
588 9120 void CachePlugin::HandleStore(cvmfs::MsgStoreReq *msg_req,
589 CacheTransport::Frame *frame,
590 CacheTransport *transport) {
591
1/2
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
9120 const SessionCtxGuard session_guard(msg_req->session_id(), this);
592
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 cvmfs::MsgStoreReply msg_reply;
593
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 CacheTransport::Frame frame_send(&msg_reply);
594 9120 msg_reply.set_req_id(msg_req->req_id());
595 9120 msg_reply.set_part_nr(msg_req->part_nr());
596
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 shash::Any object_id;
597
1/2
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
9120 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
598
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 if (!retval || (frame->att_size() > max_object_size_)
599
5/8
✓ Branch 0 taken 9120 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 9075 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 45 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 9120 times.
18240 || ((frame->att_size() < max_object_size_) && !msg_req->last_part())) {
600 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
601 "malformed hash or bad object size received from client");
602 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
603 transport->SendFrame(&frame_send);
604 return;
605 }
606
607 9120 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
608 uint64_t txn_id;
609 9120 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
610
2/2
✓ Branch 1 taken 105 times.
✓ Branch 2 taken 9015 times.
9120 if (msg_req->part_nr() == 1) {
611
2/4
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 105 times.
105 if (txn_ids_.Contains(uniq_req)) {
612 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
613 "invalid attempt to restart running transaction");
614 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
615 transport->SendFrame(&frame_send);
616 return;
617 }
618 105 txn_id = NextTxnId();
619
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 ObjectInfo info;
620 105 info.id = object_id;
621
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 if (msg_req->has_expected_size()) {
622 105 info.size = msg_req->expected_size();
623 }
624
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 if (msg_req->has_object_type()) {
625 105 info.object_type = msg_req->object_type();
626 }
627
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 if (msg_req->has_description()) {
628
1/2
✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
105 info.description = msg_req->description();
629 }
630
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 status = StartTxn(object_id, txn_id, info);
631
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
105 if (status != cvmfs::STATUS_OK) {
632 LogSessionError(msg_req->session_id(), status,
633 "failed to start transaction");
634 msg_reply.set_status(status);
635 transport->SendFrame(&frame_send);
636 return;
637 }
638
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 txn_ids_.Insert(uniq_req, txn_id);
639
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 } else {
640
1/2
✓ Branch 1 taken 9015 times.
✗ Branch 2 not taken.
9015 retval = txn_ids_.Lookup(uniq_req, &txn_id);
641
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9015 times.
9015 if (!retval) {
642 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
643 "invalid transaction received from client");
644 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
645 transport->SendFrame(&frame_send);
646 return;
647 }
648 }
649
650 // TODO(jblomer): check part number and send objects up in order
651
2/2
✓ Branch 1 taken 9105 times.
✓ Branch 2 taken 15 times.
9120 if (frame->att_size() > 0) {
652
1/2
✓ Branch 2 taken 9105 times.
✗ Branch 3 not taken.
18210 status = WriteTxn(txn_id,
653 9105 reinterpret_cast<unsigned char *>(frame->attachment()),
654 frame->att_size());
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9105 times.
9105 if (status != cvmfs::STATUS_OK) {
656 LogSessionError(msg_req->session_id(), status, "failure writing object");
657 msg_reply.set_status(status);
658 transport->SendFrame(&frame_send);
659 return;
660 }
661 }
662
663
2/2
✓ Branch 1 taken 90 times.
✓ Branch 2 taken 9030 times.
9120 if (msg_req->last_part()) {
664
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 status = CommitTxn(txn_id);
665
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 if (status != cvmfs::STATUS_OK) {
666 LogSessionError(msg_req->session_id(), status,
667 "failure committing object");
668 }
669
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 txn_ids_.Erase(uniq_req);
670 }
671
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 msg_reply.set_status(status);
672
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 transport->SendFrame(&frame_send);
673
3/6
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9120 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 9120 times.
✗ Branch 8 not taken.
9120 }
674
675
676 255 bool CachePlugin::IsRunning() { return atomic_read32(&running_) != 0; }
677
678
679 255 bool CachePlugin::Listen(const string &locator) {
680
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 vector<string> tokens = SplitString(locator, '=');
681
1/2
✓ Branch 2 taken 255 times.
✗ Branch 3 not taken.
255 if (tokens[0] == "unix") {
682
1/2
✓ Branch 2 taken 255 times.
✗ Branch 3 not taken.
255 const string lock_path = tokens[1] + ".lock";
683
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 fd_socket_lock_ = TryLockFile(lock_path);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 if (fd_socket_lock_ == -1) {
685 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
686 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
687 NotifySupervisor(CacheTransport::kFailureNotification);
688 return false;
689
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 } else if (fd_socket_lock_ == -2) {
690 // Another plugin process probably started in the meantime
691 NotifySupervisor(CacheTransport::kReadyNotification);
692 if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
693 LogCvmfs(kLogCache, kLogSyslogErr | kLogStderr,
694 "failed to lock on %s, file is busy", lock_path.c_str());
695 }
696 return false;
697 }
698
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 assert(fd_socket_lock_ >= 0);
699
1/2
✓ Branch 2 taken 255 times.
✗ Branch 3 not taken.
255 fd_socket_ = MakeSocket(tokens[1], 0600);
700 255 is_local_ = true;
701
1/4
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
255 } else if (tokens[0] == "tcp") {
702 vector<string> tcp_address = SplitString(tokens[1], ':');
703 if (tcp_address.size() != 2) {
704 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "invalid locator: %s",
705 locator.c_str());
706 NotifySupervisor(CacheTransport::kFailureNotification);
707 return false;
708 }
709 fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
710 } else {
711 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
712 "unknown endpoint in locator: %s", locator.c_str());
713 NotifySupervisor(CacheTransport::kFailureNotification);
714 return false;
715 }
716
717
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 if (fd_socket_ < 0) {
718 if (errno == EADDRINUSE) {
719 // Another plugin process probably started in the meantime
720 NotifySupervisor(CacheTransport::kReadyNotification);
721 } else {
722 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
723 "failed to create endpoint %s (%d)", locator.c_str(), errno);
724 NotifySupervisor(CacheTransport::kFailureNotification);
725 }
726 is_local_ = false;
727 return false;
728 }
729 255 const int retval = listen(fd_socket_, 32);
730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 assert(retval == 0);
731
732 255 return true;
733 255 }
734
735
736 60 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
737
3/6
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 60 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 60 times.
✗ Branch 8 not taken.
120 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
738 const map<uint64_t, SessionInfo>::const_iterator iter =
739
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 sessions_.find(session_id);
740
1/2
✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
60 if (iter != sessions_.end()) {
741
1/2
✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
60 session_str = iter->second.name;
742 }
743
1/2
✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
60 LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "session '%s': %s",
744 session_str.c_str(), msg.c_str());
745 60 }
746
747
748 45 void CachePlugin::LogSessionError(uint64_t session_id,
749 cvmfs::EnumStatus status,
750 const std::string &msg) {
751
3/6
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 45 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 45 times.
✗ Branch 8 not taken.
90 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
752 const map<uint64_t, SessionInfo>::const_iterator iter =
753
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 sessions_.find(session_id);
754
1/2
✓ Branch 3 taken 45 times.
✗ Branch 4 not taken.
45 if (iter != sessions_.end()) {
755
1/2
✓ Branch 2 taken 45 times.
✗ Branch 3 not taken.
45 session_str = iter->second.name;
756 }
757
1/2
✓ Branch 4 taken 45 times.
✗ Branch 5 not taken.
45 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, "session '%s': %s (%d - %s)",
758 session_str.c_str(), msg.c_str(), status,
759 CacheTransportCode2Ascii(status));
760 45 }
761
762
763 255 void *CachePlugin::MainProcessRequests(void *data) {
764 255 CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
765
766 255 platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
767
768 255 vector<struct pollfd> watch_fds;
769 // Elements 0, 1: control pipe, socket fd
770 struct pollfd watch_ctrl;
771 255 watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
772 255 watch_ctrl.events = POLLIN | POLLPRI;
773
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 watch_fds.push_back(watch_ctrl);
774 struct pollfd watch_socket;
775 255 watch_socket.fd = cache_plugin->fd_socket_;
776 255 watch_socket.events = POLLIN | POLLPRI;
777
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 watch_fds.push_back(watch_socket);
778
779 255 bool terminated = false;
780
1/2
✓ Branch 0 taken 67170 times.
✗ Branch 1 not taken.
67170 while (!terminated) {
781
2/2
✓ Branch 1 taken 200985 times.
✓ Branch 2 taken 67170 times.
268155 for (unsigned i = 0; i < watch_fds.size(); ++i)
782 200985 watch_fds[i].revents = 0;
783
1/2
✓ Branch 3 taken 67170 times.
✗ Branch 4 not taken.
67170 const int retval = poll(&watch_fds[0], watch_fds.size(), -1);
784
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 67170 times.
67170 if (retval < 0) {
785 if (errno == EINTR)
786 continue;
787 PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
788 errno);
789 }
790
791 // Termination or detach
792
2/2
✓ Branch 1 taken 15285 times.
✓ Branch 2 taken 51885 times.
67170 if (watch_fds[0].revents) {
793 char signal;
794
1/2
✓ Branch 2 taken 15285 times.
✗ Branch 3 not taken.
15285 ReadPipe(watch_fds[0].fd, &signal, 1);
795
2/2
✓ Branch 0 taken 15030 times.
✓ Branch 1 taken 255 times.
15285 if (signal == kSignalDetach) {
796
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 cache_plugin->SendDetachRequests();
797 15030 continue;
798 }
799
800 // termination
801
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 255 times.
255 if (watch_fds.size() > 2) {
802 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
803 "terminating external cache manager with pending connections");
804 }
805 255 break;
806 }
807
808 // New connection
809
2/2
✓ Branch 1 taken 270 times.
✓ Branch 2 taken 51615 times.
51885 if (watch_fds[1].revents) {
810 struct sockaddr_un remote;
811 270 socklen_t socket_size = sizeof(remote);
812 const int fd_con =
813
1/2
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
270 accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
814
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 270 times.
270 if (fd_con < 0) {
815 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
816 "failed to establish connection (%d)", errno);
817 continue;
818 }
819 struct pollfd watch_con;
820 270 watch_con.fd = fd_con;
821 270 watch_con.events = POLLIN | POLLPRI;
822
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 watch_fds.push_back(watch_con);
823
1/2
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
270 cache_plugin->connections_.insert(fd_con);
824 }
825
826 // New request
827
2/2
✓ Branch 1 taken 51885 times.
✓ Branch 2 taken 51885 times.
103770 for (unsigned i = 2; i < watch_fds.size();) {
828
1/2
✓ Branch 1 taken 51885 times.
✗ Branch 2 not taken.
51885 if (watch_fds[i].revents) {
829
1/2
✓ Branch 2 taken 51885 times.
✗ Branch 3 not taken.
51885 const bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
830
2/2
✓ Branch 0 taken 270 times.
✓ Branch 1 taken 51615 times.
51885 if (!proceed) {
831
1/2
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
270 close(watch_fds[i].fd);
832
1/2
✓ Branch 2 taken 270 times.
✗ Branch 3 not taken.
270 cache_plugin->connections_.erase(watch_fds[i].fd);
833
1/2
✓ Branch 4 taken 270 times.
✗ Branch 5 not taken.
270 watch_fds.erase(watch_fds.begin() + i);
834 270 if ((getenv(CacheTransport::kEnvReadyNotifyFd) != NULL)
835 && (cache_plugin->connections_.empty())
836
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 270 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 270 times.
270 && (cache_plugin->num_inlimbo_clients_ == 0)) {
837 LogCvmfs(kLogCache, kLogSyslog,
838 "stopping cache plugin, no more active clients");
839 terminated = true;
840 break;
841 }
842 } else {
843 51615 i++;
844 }
845 } else {
846 i++;
847 }
848 }
849 }
850
851 // 0, 1 being closed by destructor
852
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 255 times.
255 for (unsigned i = 2; i < watch_fds.size(); ++i)
853 close(watch_fds[i].fd);
854
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 cache_plugin->txn_ids_.Clear();
855
856 255 signal(SIGPIPE, save_sigpipe);
857 255 return NULL;
858 255 }
859
860
861 /**
862 * Used during startup to synchronize with the cvmfs client.
863 */
864 255 void CachePlugin::NotifySupervisor(char signal) {
865 255 char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
866
1/2
✓ Branch 0 taken 255 times.
✗ Branch 1 not taken.
255 if (pipe_ready == NULL)
867 255 return;
868 const int fd_pipe_ready = String2Int64(pipe_ready);
869 WritePipe(fd_pipe_ready, &signal, 1);
870 }
871
872
873 255 void CachePlugin::ProcessRequests(unsigned num_workers) {
874 255 num_workers_ = num_workers;
875 const int retval =
876 255 pthread_create(&thread_io_, NULL, MainProcessRequests, this);
877
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255 times.
255 assert(retval == 0);
878 255 NotifySupervisor(CacheTransport::kReadyNotification);
879 255 atomic_cas32(&running_, 0, 1);
880 255 }
881
882
883 15030 void CachePlugin::SendDetachRequests() {
884 15030 set<int>::const_iterator iter = connections_.begin();
885 15030 const set<int>::const_iterator iter_end = connections_.end();
886
2/2
✓ Branch 2 taken 15030 times.
✓ Branch 3 taken 15030 times.
30060 for (; iter != iter_end; ++iter) {
887 15030 CacheTransport transport(*iter,
888 CacheTransport::kFlagSendIgnoreFailure
889
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 | CacheTransport::kFlagSendNonBlocking);
890
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 cvmfs::MsgDetach msg_detach;
891
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 CacheTransport::Frame frame_send(&msg_detach);
892
1/2
✓ Branch 1 taken 15030 times.
✗ Branch 2 not taken.
15030 transport.SendFrame(&frame_send);
893 15030 }
894 15030 }
895
896
897 255 void CachePlugin::Terminate() {
898
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 if (IsRunning()) {
899 255 char terminate = kSignalTerminate;
900
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 WritePipe(pipe_ctrl_[1], &terminate, 1);
901
1/2
✓ Branch 1 taken 255 times.
✗ Branch 2 not taken.
255 pthread_join(thread_io_, NULL);
902 255 atomic_cas32(&running_, 1, 0);
903 }
904 255 }
905
906
907 void CachePlugin::WaitFor() {
908 if (!IsRunning())
909 return;
910 pthread_join(thread_io_, NULL);
911 }
912