GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/channel.cc
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 484 596 81.2%
Branches: 343 805 42.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "channel.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26
27 using namespace std; // NOLINT
28
29
30 SessionCtx *SessionCtx::instance_ = NULL;
31
32 void SessionCtx::CleanupInstance() {
33 delete instance_;
34 instance_ = NULL;
35 }
36
37
38 49 SessionCtx::SessionCtx() {
39 49 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40 49 smalloc(sizeof(pthread_mutex_t)));
41 49 const int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == 0);
43 49 }
44
45
46 SessionCtx::~SessionCtx() {
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
49
50 for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
52 }
53
54 const int retval = pthread_key_delete(thread_local_storage_);
55 assert(retval == 0);
56 }
57
58
59 471086 SessionCtx *SessionCtx::GetInstance() {
60
2/2
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 471037 times.
471086 if (instance_ == NULL) {
61 49 instance_ = new SessionCtx();
62 49 const int retval = pthread_key_create(&instance_->thread_local_storage_,
63 TlsDestructor);
64
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == 0);
65 }
66
67 471086 return instance_;
68 }
69
70
71 136024 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73 136024 pthread_getspecific(thread_local_storage_));
74
3/4
✓ Branch 0 taken 135975 times.
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 135975 times.
136024 if ((tls == NULL) || !tls->is_set) {
75 49 *id = 0;
76 49 *reponame = NULL;
77 49 *client_instance = NULL;
78 } else {
79 135975 *id = tls->id;
80 135975 *reponame = tls->reponame;
81 135975 *client_instance = tls->client_instance;
82 }
83 136024 }
84
85
86 bool SessionCtx::IsSet() {
87 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88 pthread_getspecific(thread_local_storage_));
89 if (tls == NULL)
90 return false;
91
92 return tls->is_set;
93 }
94
95
96 167531 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98 167531 pthread_getspecific(thread_local_storage_));
99
100
2/2
✓ Branch 0 taken 735 times.
✓ Branch 1 taken 166796 times.
167531 if (tls == NULL) {
101
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 tls = new ThreadLocalStorage(id, reponame, client_instance);
102 735 const int retval = pthread_setspecific(thread_local_storage_, tls);
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(retval == 0);
104 735 const MutexLockGuard lock_guard(lock_tls_blocks_);
105
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 tls_blocks_.push_back(tls);
106 735 } else {
107 166796 tls->id = id;
108 166796 tls->reponame = reponame;
109 166796 tls->client_instance = client_instance;
110 166796 tls->is_set = true;
111 }
112 167531 }
113
114
115 735 void SessionCtx::TlsDestructor(void *data) {
116 735 ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117
1/2
✓ Branch 0 taken 735 times.
✗ Branch 1 not taken.
735 delete tls;
118
119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(instance_);
120 735 const MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121 735 for (vector<ThreadLocalStorage *>::iterator
122 735 i = instance_->tls_blocks_.begin(),
123 735 iEnd = instance_->tls_blocks_.end();
124
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 i != iEnd;
125 ++i) {
126
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 if ((*i) == tls) {
127
1/2
✓ Branch 2 taken 735 times.
✗ Branch 3 not taken.
735 instance_->tls_blocks_.erase(i);
128 735 break;
129 }
130 }
131 735 }
132
133
134 167531 void SessionCtx::Unset() {
135 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
136 167531 pthread_getspecific(thread_local_storage_));
137
1/2
✓ Branch 0 taken 167531 times.
✗ Branch 1 not taken.
167531 if (tls != NULL) {
138 167531 tls->is_set = false;
139 167531 tls->id = 0;
140 167531 tls->reponame = NULL;
141 167531 tls->client_instance = NULL;
142 }
143 167531 }
144
145
146 //------------------------------------------------------------------------------
147
148
149 882 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
150 882 : id(id), name(name) {
151
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 vector<string> tokens = SplitString(name, ':');
152 882 reponame = strdup(tokens[0].c_str());
153
2/2
✓ Branch 1 taken 784 times.
✓ Branch 2 taken 98 times.
882 if (tokens.size() > 1)
154 784 client_instance = strdup(tokens[1].c_str());
155 else
156 98 client_instance = NULL;
157 882 }
158
159 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
160
161
162 49098 void CachePlugin::AskToDetach() {
163 49098 char detach = kSignalDetach;
164
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 WritePipe(pipe_ctrl_[1], &detach, 1);
165 49098 }
166
167
168 833 CachePlugin::CachePlugin(uint64_t capabilities)
169 833 : is_local_(false)
170 833 , capabilities_(capabilities)
171 833 , fd_socket_(-1)
172 833 , fd_socket_lock_(-1)
173 833 , running_(0)
174 833 , num_workers_(0)
175 833 , max_object_size_(kDefaultMaxObjectSize)
176
1/2
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
833 , num_inlimbo_clients_(0) {
177 833 atomic_init64(&next_session_id_);
178 833 atomic_init64(&next_txn_id_);
179 833 atomic_init64(&next_lst_id_);
180 // Don't use listing id zero
181 833 atomic_inc64(&next_lst_id_);
182
1/2
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
833 txn_ids_.Init(128, UniqueRequest(), HashUniqueRequest);
183 833 memset(&thread_io_, 0, sizeof(thread_io_));
184
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 MakePipe(pipe_ctrl_);
185 833 }
186
187
188 1666 CachePlugin::~CachePlugin() {
189 1666 Terminate();
190 1666 ClosePipe(pipe_ctrl_);
191
1/2
✓ Branch 0 taken 833 times.
✗ Branch 1 not taken.
1666 if (fd_socket_ >= 0)
192 1666 close(fd_socket_);
193
1/2
✓ Branch 0 taken 833 times.
✗ Branch 1 not taken.
1666 if (fd_socket_lock_ >= 0)
194 1666 UnlockFile(fd_socket_lock_);
195 }
196
197
198 49 void CachePlugin::HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
199 CacheTransport *transport) {
200
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 const SessionCtxGuard session_guard(msg_req->session_id(), this);
201
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 cvmfs::MsgBreadcrumbReply msg_reply;
202
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 CacheTransport::Frame frame_send(&msg_reply);
203
204 49 msg_reply.set_req_id(msg_req->req_id());
205
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 manifest::Breadcrumb breadcrumb;
206
1/2
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 const bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
207 &breadcrumb.catalog_hash);
208
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 if (!retval) {
209 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
210 "malformed hash received from client");
211 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
212 } else {
213 49 breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
214
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 if (msg_req->breadcrumb().has_revision()) {
215 49 breadcrumb.revision = msg_req->breadcrumb().revision();
216 } else {
217 breadcrumb.revision = 0;
218 }
219
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 const cvmfs::EnumStatus status = StoreBreadcrumb(
220 49 msg_req->breadcrumb().fqrn(), breadcrumb);
221
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 msg_reply.set_status(status);
222 }
223
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 transport->SendFrame(&frame_send);
224 49 }
225
226
227 98 void CachePlugin::HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
228 CacheTransport *transport) {
229
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 const SessionCtxGuard session_guard(msg_req->session_id(), this);
230
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 cvmfs::MsgBreadcrumbReply msg_reply;
231
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 CacheTransport::Frame frame_send(&msg_reply);
232
233 98 msg_reply.set_req_id(msg_req->req_id());
234
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 manifest::Breadcrumb breadcrumb;
235
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 const cvmfs::EnumStatus status = LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
236
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 msg_reply.set_status(status);
237
2/2
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 49 times.
98 if (status == cvmfs::STATUS_OK) {
238
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
49 assert(breadcrumb.IsValid());
239
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
49 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
240
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
241
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
49 cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
242
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 msg_breadcrumb->set_fqrn(msg_req->fqrn());
243 49 msg_breadcrumb->set_allocated_hash(msg_hash);
244 49 msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
245 49 msg_breadcrumb->set_revision(breadcrumb.revision);
246 49 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
247 }
248
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 transport->SendFrame(&frame_send);
249 98 }
250
251
252 882 void CachePlugin::HandleHandshake(cvmfs::MsgHandshake *msg_req,
253 CacheTransport *transport) {
254 882 const uint64_t session_id = NextSessionId();
255
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 if (msg_req->has_name()) {
256
2/4
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 882 times.
✗ Branch 6 not taken.
882 sessions_[session_id] = SessionInfo(session_id, msg_req->name());
257 } else {
258 sessions_[session_id] = SessionInfo(
259 session_id, "anonymous client (" + StringifyInt(session_id) + ")");
260 }
261
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 cvmfs::MsgHandshakeAck msg_ack;
262
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 CacheTransport::Frame frame_send(&msg_ack);
263
264
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 msg_ack.set_status(cvmfs::STATUS_OK);
265
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 msg_ack.set_name(name_);
266 882 msg_ack.set_protocol_version(kPbProtocolVersion);
267 882 msg_ack.set_max_object_size(max_object_size_);
268 882 msg_ack.set_session_id(session_id);
269 882 msg_ack.set_capabilities(capabilities_);
270
1/2
✓ Branch 0 taken 882 times.
✗ Branch 1 not taken.
882 if (is_local_)
271 882 msg_ack.set_pid(getpid());
272
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 transport->SendFrame(&frame_send);
273 882 }
274
275
276 196 void CachePlugin::HandleInfo(cvmfs::MsgInfoReq *msg_req,
277 CacheTransport *transport) {
278
1/2
✓ Branch 2 taken 196 times.
✗ Branch 3 not taken.
196 const SessionCtxGuard session_guard(msg_req->session_id(), this);
279
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 cvmfs::MsgInfoReply msg_reply;
280
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 CacheTransport::Frame frame_send(&msg_reply);
281
282 196 msg_reply.set_req_id(msg_req->req_id());
283 196 Info info;
284
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 const cvmfs::EnumStatus status = GetInfo(&info);
285
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 196 times.
196 if (status != cvmfs::STATUS_OK) {
286 LogSessionError(msg_req->session_id(), status,
287 "failed to query cache status");
288 }
289 196 msg_reply.set_size_bytes(info.size_bytes);
290 196 msg_reply.set_used_bytes(info.used_bytes);
291 196 msg_reply.set_pinned_bytes(info.pinned_bytes);
292 196 msg_reply.set_no_shrink(info.no_shrink);
293
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 msg_reply.set_status(status);
294
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 transport->SendFrame(&frame_send);
295 196 }
296
297
298 196 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
299
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 196 times.
196 if (!msg_req->has_conncnt_change_by())
300 return;
301 196 const int32_t conncnt_change_by = msg_req->conncnt_change_by();
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 196 times.
196 if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
303 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
304 "invalid request to drop connection counter below zero");
305 return;
306 }
307
2/2
✓ Branch 0 taken 98 times.
✓ Branch 1 taken 98 times.
196 if (conncnt_change_by > 0) {
308
2/4
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
98 LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
309 } else {
310
2/4
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
98 LogSessionInfo(msg_req->session_id(), "release session lock");
311 }
312 196 num_inlimbo_clients_ += conncnt_change_by;
313 }
314
315
316 490 void CachePlugin::HandleList(cvmfs::MsgListReq *msg_req,
317 CacheTransport *transport) {
318
1/2
✓ Branch 2 taken 490 times.
✗ Branch 3 not taken.
490 const SessionCtxGuard session_guard(msg_req->session_id(), this);
319
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 cvmfs::MsgListReply msg_reply;
320
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 CacheTransport::Frame frame_send(&msg_reply);
321
322 490 msg_reply.set_req_id(msg_req->req_id());
323 490 int64_t listing_id = msg_req->listing_id();
324 490 msg_reply.set_listing_id(listing_id);
325 490 msg_reply.set_is_last_part(true);
326
327 cvmfs::EnumStatus status;
328
2/2
✓ Branch 1 taken 294 times.
✓ Branch 2 taken 196 times.
490 if (msg_req->listing_id() == 0) {
329 294 listing_id = NextLstId();
330
1/2
✓ Branch 2 taken 294 times.
✗ Branch 3 not taken.
294 status = ListingBegin(listing_id, msg_req->object_type());
331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 294 times.
294 if (status != cvmfs::STATUS_OK) {
332 LogSessionError(msg_req->session_id(), status,
333 "failed to start enumeration of objects");
334 msg_reply.set_status(status);
335 transport->SendFrame(&frame_send);
336 return;
337 }
338 294 msg_reply.set_listing_id(listing_id);
339 }
340
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 490 times.
490 assert(listing_id != 0);
341
342
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 ObjectInfo item;
343 490 unsigned total_size = 0;
344
3/4
✓ Branch 1 taken 9800294 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9800000 times.
✓ Branch 4 taken 294 times.
9800294 while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
345
1/2
✓ Branch 1 taken 9800000 times.
✗ Branch 2 not taken.
9800000 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
346
2/4
✓ Branch 1 taken 9800000 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9800000 times.
✗ Branch 5 not taken.
9800000 cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
347
1/2
✓ Branch 1 taken 9800000 times.
✗ Branch 2 not taken.
9800000 transport->FillMsgHash(item.id, msg_hash);
348 9800000 msg_list_record->set_allocated_hash(msg_hash);
349 9800000 msg_list_record->set_pinned(item.pinned);
350
1/2
✓ Branch 1 taken 9800000 times.
✗ Branch 2 not taken.
9800000 msg_list_record->set_description(item.description);
351 // Approximation of the message size
352 9800000 total_size += sizeof(item) + item.description.length();
353
2/2
✓ Branch 0 taken 196 times.
✓ Branch 1 taken 9799804 times.
9800000 if (total_size > kListingSize)
354 196 break;
355 }
356
2/2
✓ Branch 0 taken 294 times.
✓ Branch 1 taken 196 times.
490 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
357
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 ListingEnd(listing_id);
358 294 status = cvmfs::STATUS_OK;
359 } else {
360 196 msg_reply.set_is_last_part(false);
361 }
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 490 times.
490 if (status != cvmfs::STATUS_OK) {
363 LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
364 }
365
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 msg_reply.set_status(status);
366
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 transport->SendFrame(&frame_send);
367
3/6
✓ Branch 2 taken 490 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 490 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 490 times.
✗ Branch 9 not taken.
490 }
368
369
370 784 void CachePlugin::HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
371 CacheTransport *transport) {
372
1/2
✓ Branch 2 taken 784 times.
✗ Branch 3 not taken.
784 const SessionCtxGuard session_guard(msg_req->session_id(), this);
373
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 cvmfs::MsgObjectInfoReply msg_reply;
374
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 CacheTransport::Frame frame_send(&msg_reply);
375
376 784 msg_reply.set_req_id(msg_req->req_id());
377
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 shash::Any object_id;
378
1/2
✓ Branch 2 taken 784 times.
✗ Branch 3 not taken.
784 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
379
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 784 times.
784 if (!retval) {
380 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
381 "malformed hash received from client");
382 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
383 } else {
384
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 ObjectInfo info;
385
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 const cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
386
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 msg_reply.set_status(status);
387
2/2
✓ Branch 0 taken 735 times.
✓ Branch 1 taken 49 times.
784 if (status == cvmfs::STATUS_OK) {
388
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 msg_reply.set_object_type(info.object_type);
389 735 msg_reply.set_size(info.size);
390
1/2
✓ Branch 0 taken 49 times.
✗ Branch 1 not taken.
49 } else if (status != cvmfs::STATUS_NOENTRY) {
391
2/4
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 49 times.
✗ Branch 7 not taken.
49 LogSessionError(msg_req->session_id(), status,
392 "failed retrieving object details");
393 }
394 784 }
395
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 transport->SendFrame(&frame_send);
396 784 }
397
398
399 108339 void CachePlugin::HandleRead(cvmfs::MsgReadReq *msg_req,
400 CacheTransport *transport) {
401
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 const SessionCtxGuard session_guard(msg_req->session_id(), this);
402
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 cvmfs::MsgReadReply msg_reply;
403
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 CacheTransport::Frame frame_send(&msg_reply);
404
405 108339 msg_reply.set_req_id(msg_req->req_id());
406
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 shash::Any object_id;
407
1/2
✓ Branch 2 taken 108339 times.
✗ Branch 3 not taken.
108339 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
408
3/6
✓ Branch 0 taken 108339 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 108339 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 108339 times.
108339 if (!retval || (msg_req->size() > max_object_size_)) {
409 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
410 "malformed hash received from client");
411 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
412 transport->SendFrame(&frame_send);
413 return;
414 }
415 108339 unsigned size = msg_req->size();
416 #ifdef __APPLE__
417 unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
418 #else
419 108339 unsigned char buffer[size];
420 #endif
421
1/2
✓ Branch 2 taken 108339 times.
✗ Branch 3 not taken.
108339 const cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size,
422 buffer);
423
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 msg_reply.set_status(status);
424
2/2
✓ Branch 0 taken 108290 times.
✓ Branch 1 taken 49 times.
108339 if (status == cvmfs::STATUS_OK) {
425 108290 frame_send.set_attachment(buffer, size);
426 } else {
427
2/4
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 49 times.
✗ Branch 7 not taken.
49 LogSessionError(msg_req->session_id(), status,
428 "failed to read from object");
429 }
430
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 transport->SendFrame(&frame_send);
431 #ifdef __APPLE__
432 free(buffer);
433 #endif
434
3/6
✓ Branch 2 taken 108339 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108339 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 108339 times.
✗ Branch 9 not taken.
216678 }
435
436
437 27636 void CachePlugin::HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
438 CacheTransport *transport) {
439
1/2
✓ Branch 2 taken 27636 times.
✗ Branch 3 not taken.
27636 const SessionCtxGuard session_guard(msg_req->session_id(), this);
440
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 cvmfs::MsgRefcountReply msg_reply;
441
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 CacheTransport::Frame frame_send(&msg_reply);
442
443 27636 msg_reply.set_req_id(msg_req->req_id());
444
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 shash::Any object_id;
445
1/2
✓ Branch 2 taken 27636 times.
✗ Branch 3 not taken.
27636 const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 27636 times.
27636 if (!retval) {
447 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
448 "malformed hash received from client");
449 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
450 } else {
451
1/2
✓ Branch 2 taken 27636 times.
✗ Branch 3 not taken.
27636 const cvmfs::EnumStatus status = ChangeRefcount(object_id,
452 msg_req->change_by());
453
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 msg_reply.set_status(status);
454
4/4
✓ Branch 0 taken 98 times.
✓ Branch 1 taken 27538 times.
✓ Branch 2 taken 49 times.
✓ Branch 3 taken 49 times.
27636 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
455
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 LogSessionError(msg_req->session_id(), status,
456
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
98 "failed to open/close object " + object_id.ToString());
457 }
458 }
459
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 transport->SendFrame(&frame_send);
460 27636 }
461
462
463 169491 bool CachePlugin::HandleRequest(int fd_con) {
464 169491 CacheTransport transport(fd_con, CacheTransport::kFlagSendIgnoreFailure);
465 169491 char buffer[max_object_size_];
466
1/2
✓ Branch 1 taken 169491 times.
✗ Branch 2 not taken.
169491 CacheTransport::Frame frame_recv;
467 169491 frame_recv.set_attachment(buffer, max_object_size_);
468
1/2
✓ Branch 1 taken 169491 times.
✗ Branch 2 not taken.
169491 const bool retval = transport.RecvFrame(&frame_recv);
469
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 169491 times.
169491 if (!retval) {
470 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
471 "failed to receive request from connection (%d)", errno);
472 return false;
473 }
474
475
1/2
✓ Branch 1 taken 169491 times.
✗ Branch 2 not taken.
169491 google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
476
477
3/4
✓ Branch 1 taken 169491 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 882 times.
✓ Branch 6 taken 168609 times.
169491 if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
478 882 cvmfs::MsgHandshake *msg_req = reinterpret_cast<cvmfs::MsgHandshake *>(
479 msg_typed);
480
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 HandleHandshake(msg_req, &transport);
481
3/4
✓ Branch 1 taken 168609 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 882 times.
✓ Branch 6 taken 167727 times.
168609 } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
482 882 cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
483 2646 const map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(
484
1/2
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
882 msg_req->session_id());
485
1/2
✓ Branch 3 taken 882 times.
✗ Branch 4 not taken.
882 if (iter != sessions_.end()) {
486 882 free(iter->second.reponame);
487 882 free(iter->second.client_instance);
488 }
489
1/2
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
882 sessions_.erase(msg_req->session_id());
490 882 return false;
491
3/4
✓ Branch 1 taken 167727 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 196 times.
✓ Branch 6 taken 167531 times.
167727 } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
492
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
493
3/4
✓ Branch 1 taken 167531 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 27636 times.
✓ Branch 6 taken 139895 times.
167531 } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
494 27636 cvmfs::MsgRefcountReq *msg_req = reinterpret_cast<cvmfs::MsgRefcountReq *>(
495 msg_typed);
496
1/2
✓ Branch 1 taken 27636 times.
✗ Branch 2 not taken.
27636 HandleRefcount(msg_req, &transport);
497
3/4
✓ Branch 1 taken 139895 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 784 times.
✓ Branch 6 taken 139111 times.
139895 } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
498 cvmfs::MsgObjectInfoReq
499 784 *msg_req = reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
500
1/2
✓ Branch 1 taken 784 times.
✗ Branch 2 not taken.
784 HandleObjectInfo(msg_req, &transport);
501
3/4
✓ Branch 1 taken 139111 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 108339 times.
✓ Branch 6 taken 30772 times.
139111 } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
502 108339 cvmfs::MsgReadReq *msg_req = reinterpret_cast<cvmfs::MsgReadReq *>(
503 msg_typed);
504
1/2
✓ Branch 1 taken 108339 times.
✗ Branch 2 not taken.
108339 HandleRead(msg_req, &transport);
505
3/4
✓ Branch 1 taken 30772 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 29792 times.
✓ Branch 6 taken 980 times.
30772 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
506 29792 cvmfs::MsgStoreReq *msg_req = reinterpret_cast<cvmfs::MsgStoreReq *>(
507 msg_typed);
508
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 HandleStore(msg_req, &frame_recv, &transport);
509
3/4
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 931 times.
980 } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
510 cvmfs::MsgStoreAbortReq
511 49 *msg_req = reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
512
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 HandleStoreAbort(msg_req, &transport);
513
3/4
✓ Branch 1 taken 931 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 196 times.
✓ Branch 6 taken 735 times.
931 } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
514 196 cvmfs::MsgInfoReq *msg_req = reinterpret_cast<cvmfs::MsgInfoReq *>(
515 msg_typed);
516
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 HandleInfo(msg_req, &transport);
517
3/4
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 98 times.
✓ Branch 6 taken 637 times.
735 } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
518 98 cvmfs::MsgShrinkReq *msg_req = reinterpret_cast<cvmfs::MsgShrinkReq *>(
519 msg_typed);
520
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 HandleShrink(msg_req, &transport);
521
3/4
✓ Branch 1 taken 637 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 490 times.
✓ Branch 6 taken 147 times.
637 } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
522 490 cvmfs::MsgListReq *msg_req = reinterpret_cast<cvmfs::MsgListReq *>(
523 msg_typed);
524
1/2
✓ Branch 1 taken 490 times.
✗ Branch 2 not taken.
490 HandleList(msg_req, &transport);
525
3/4
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 98 times.
147 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
526 cvmfs::MsgBreadcrumbStoreReq
527 49 *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
528
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 HandleBreadcrumbStore(msg_req, &transport);
529
2/4
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
98 } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
530 cvmfs::MsgBreadcrumbLoadReq
531 98 *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
532
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 HandleBreadcrumbLoad(msg_req, &transport);
533 } else {
534 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
535 "unexpected message from client: %s",
536 std::string(msg_typed->GetTypeName()).c_str());
537 return false;
538 }
539
540 168609 return true;
541
1/2
✓ Branch 1 taken 169491 times.
✗ Branch 2 not taken.
338982 }
542
543
544 98 void CachePlugin::HandleShrink(cvmfs::MsgShrinkReq *msg_req,
545 CacheTransport *transport) {
546
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 const SessionCtxGuard session_guard(msg_req->session_id(), this);
547
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 cvmfs::MsgShrinkReply msg_reply;
548
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 CacheTransport::Frame frame_send(&msg_reply);
549
550 98 msg_reply.set_req_id(msg_req->req_id());
551 98 uint64_t used_bytes = 0;
552
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 const cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
553 98 msg_reply.set_used_bytes(used_bytes);
554
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 msg_reply.set_status(status);
555
3/4
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
98 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
556 LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
557 }
558
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 transport->SendFrame(&frame_send);
559 98 }
560
561
562 49 void CachePlugin::HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
563 CacheTransport *transport) {
564
1/2
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
49 const SessionCtxGuard session_guard(msg_req->session_id(), this);
565
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 cvmfs::MsgStoreReply msg_reply;
566
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 CacheTransport::Frame frame_send(&msg_reply);
567 49 msg_reply.set_req_id(msg_req->req_id());
568 49 msg_reply.set_part_nr(0);
569 uint64_t txn_id;
570 49 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
571
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 if (!retval) {
573 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
574 "malformed transaction id received from client");
575 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
576 } else {
577
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const cvmfs::EnumStatus status = AbortTxn(txn_id);
578
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 msg_reply.set_status(status);
579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 if (status != cvmfs::STATUS_OK) {
580 LogSessionError(msg_req->session_id(), status,
581 "failed to abort transaction");
582 }
583
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 txn_ids_.Erase(uniq_req);
584 }
585
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 transport->SendFrame(&frame_send);
586 49 }
587
588
589 29792 void CachePlugin::HandleStore(cvmfs::MsgStoreReq *msg_req,
590 CacheTransport::Frame *frame,
591 CacheTransport *transport) {
592
1/2
✓ Branch 2 taken 29792 times.
✗ Branch 3 not taken.
29792 const SessionCtxGuard session_guard(msg_req->session_id(), this);
593
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 cvmfs::MsgStoreReply msg_reply;
594
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 CacheTransport::Frame frame_send(&msg_reply);
595 29792 msg_reply.set_req_id(msg_req->req_id());
596 29792 msg_reply.set_part_nr(msg_req->part_nr());
597
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 shash::Any object_id;
598
1/2
✓ Branch 2 taken 29792 times.
✗ Branch 3 not taken.
29792 bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
599
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 if (!retval || (frame->att_size() > max_object_size_)
600
5/8
✓ Branch 0 taken 29792 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 147 times.
✓ Branch 4 taken 29645 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 147 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 29792 times.
59584 || ((frame->att_size() < max_object_size_) && !msg_req->last_part())) {
601 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
602 "malformed hash or bad object size received from client");
603 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
604 transport->SendFrame(&frame_send);
605 return;
606 }
607
608 29792 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
609 uint64_t txn_id;
610 29792 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
611
2/2
✓ Branch 1 taken 343 times.
✓ Branch 2 taken 29449 times.
29792 if (msg_req->part_nr() == 1) {
612
2/4
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 343 times.
343 if (txn_ids_.Contains(uniq_req)) {
613 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
614 "invalid attempt to restart running transaction");
615 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
616 transport->SendFrame(&frame_send);
617 return;
618 }
619 343 txn_id = NextTxnId();
620
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 ObjectInfo info;
621 343 info.id = object_id;
622
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 if (msg_req->has_expected_size()) {
623 343 info.size = msg_req->expected_size();
624 }
625
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 if (msg_req->has_object_type()) {
626 343 info.object_type = msg_req->object_type();
627 }
628
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 if (msg_req->has_description()) {
629
1/2
✓ Branch 2 taken 343 times.
✗ Branch 3 not taken.
343 info.description = msg_req->description();
630 }
631
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 status = StartTxn(object_id, txn_id, info);
632
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 343 times.
343 if (status != cvmfs::STATUS_OK) {
633 LogSessionError(msg_req->session_id(), status,
634 "failed to start transaction");
635 msg_reply.set_status(status);
636 transport->SendFrame(&frame_send);
637 return;
638 }
639
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 txn_ids_.Insert(uniq_req, txn_id);
640
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 } else {
641
1/2
✓ Branch 1 taken 29449 times.
✗ Branch 2 not taken.
29449 retval = txn_ids_.Lookup(uniq_req, &txn_id);
642
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29449 times.
29449 if (!retval) {
643 LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
644 "invalid transaction received from client");
645 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
646 transport->SendFrame(&frame_send);
647 return;
648 }
649 }
650
651 // TODO(jblomer): check part number and send objects up in order
652
2/2
✓ Branch 1 taken 29743 times.
✓ Branch 2 taken 49 times.
29792 if (frame->att_size() > 0) {
653
1/2
✓ Branch 2 taken 29743 times.
✗ Branch 3 not taken.
59486 status = WriteTxn(txn_id,
654 29743 reinterpret_cast<unsigned char *>(frame->attachment()),
655 frame->att_size());
656
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29743 times.
29743 if (status != cvmfs::STATUS_OK) {
657 LogSessionError(msg_req->session_id(), status, "failure writing object");
658 msg_reply.set_status(status);
659 transport->SendFrame(&frame_send);
660 return;
661 }
662 }
663
664
2/2
✓ Branch 1 taken 294 times.
✓ Branch 2 taken 29498 times.
29792 if (msg_req->last_part()) {
665
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 status = CommitTxn(txn_id);
666
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 294 times.
294 if (status != cvmfs::STATUS_OK) {
667 LogSessionError(msg_req->session_id(), status,
668 "failure committing object");
669 }
670
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 txn_ids_.Erase(uniq_req);
671 }
672
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 msg_reply.set_status(status);
673
1/2
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
29792 transport->SendFrame(&frame_send);
674
3/6
✓ Branch 1 taken 29792 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 29792 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 29792 times.
✗ Branch 8 not taken.
29792 }
675
676
677 833 bool CachePlugin::IsRunning() { return atomic_read32(&running_) != 0; }
678
679
680 833 bool CachePlugin::Listen(const string &locator) {
681
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 vector<string> tokens = SplitString(locator, '=');
682
1/2
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
833 if (tokens[0] == "unix") {
683
1/2
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
833 const string lock_path = tokens[1] + ".lock";
684
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 fd_socket_lock_ = TryLockFile(lock_path);
685
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 if (fd_socket_lock_ == -1) {
686 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
687 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
688 NotifySupervisor(CacheTransport::kFailureNotification);
689 return false;
690
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 } else if (fd_socket_lock_ == -2) {
691 // Another plugin process probably started in the meantime
692 NotifySupervisor(CacheTransport::kReadyNotification);
693 if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
694 LogCvmfs(kLogCache, kLogSyslogErr | kLogStderr,
695 "failed to lock on %s, file is busy", lock_path.c_str());
696 }
697 return false;
698 }
699
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 assert(fd_socket_lock_ >= 0);
700
1/2
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
833 fd_socket_ = MakeSocket(tokens[1], 0600);
701 833 is_local_ = true;
702
1/4
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
833 } else if (tokens[0] == "tcp") {
703 vector<string> tcp_address = SplitString(tokens[1], ':');
704 if (tcp_address.size() != 2) {
705 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "invalid locator: %s",
706 locator.c_str());
707 NotifySupervisor(CacheTransport::kFailureNotification);
708 return false;
709 }
710 fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
711 } else {
712 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
713 "unknown endpoint in locator: %s", locator.c_str());
714 NotifySupervisor(CacheTransport::kFailureNotification);
715 return false;
716 }
717
718
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 if (fd_socket_ < 0) {
719 if (errno == EADDRINUSE) {
720 // Another plugin process probably started in the meantime
721 NotifySupervisor(CacheTransport::kReadyNotification);
722 } else {
723 LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
724 "failed to create endpoint %s (%d)", locator.c_str(), errno);
725 NotifySupervisor(CacheTransport::kFailureNotification);
726 }
727 is_local_ = false;
728 return false;
729 }
730 833 const int retval = listen(fd_socket_, 32);
731
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 assert(retval == 0);
732
733 833 return true;
734 833 }
735
736
737 196 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
738
3/6
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 196 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 196 times.
✗ Branch 8 not taken.
392 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
739
1/2
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
196 const map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(
740 196 session_id);
741
1/2
✓ Branch 3 taken 196 times.
✗ Branch 4 not taken.
196 if (iter != sessions_.end()) {
742
1/2
✓ Branch 2 taken 196 times.
✗ Branch 3 not taken.
196 session_str = iter->second.name;
743 }
744
1/2
✓ Branch 3 taken 196 times.
✗ Branch 4 not taken.
196 LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "session '%s': %s",
745 session_str.c_str(), msg.c_str());
746 196 }
747
748
749 147 void CachePlugin::LogSessionError(uint64_t session_id,
750 cvmfs::EnumStatus status,
751 const std::string &msg) {
752
3/6
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 147 times.
✗ Branch 8 not taken.
294 string session_str("unidentified client (" + StringifyInt(session_id) + ")");
753
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 const map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(
754 147 session_id);
755
1/2
✓ Branch 3 taken 147 times.
✗ Branch 4 not taken.
147 if (iter != sessions_.end()) {
756
1/2
✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
147 session_str = iter->second.name;
757 }
758
1/2
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
147 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, "session '%s': %s (%d - %s)",
759 session_str.c_str(), msg.c_str(), status,
760 CacheTransportCode2Ascii(status));
761 147 }
762
763
764 833 void *CachePlugin::MainProcessRequests(void *data) {
765 833 CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
766
767 833 platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
768
769 833 vector<struct pollfd> watch_fds;
770 // Elements 0, 1: control pipe, socket fd
771 struct pollfd watch_ctrl;
772 833 watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
773 833 watch_ctrl.events = POLLIN | POLLPRI;
774
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 watch_fds.push_back(watch_ctrl);
775 struct pollfd watch_socket;
776 833 watch_socket.fd = cache_plugin->fd_socket_;
777 833 watch_socket.events = POLLIN | POLLPRI;
778
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 watch_fds.push_back(watch_socket);
779
780 833 bool terminated = false;
781
1/2
✓ Branch 0 taken 220304 times.
✗ Branch 1 not taken.
220304 while (!terminated) {
782
2/2
✓ Branch 1 taken 659197 times.
✓ Branch 2 taken 220304 times.
879501 for (unsigned i = 0; i < watch_fds.size(); ++i)
783 659197 watch_fds[i].revents = 0;
784
1/2
✓ Branch 3 taken 220304 times.
✗ Branch 4 not taken.
220304 const int retval = poll(&watch_fds[0], watch_fds.size(), -1);
785
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220304 times.
220304 if (retval < 0) {
786 if (errno == EINTR)
787 continue;
788 PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
789 errno);
790 }
791
792 // Termination or detach
793
2/2
✓ Branch 1 taken 49931 times.
✓ Branch 2 taken 170373 times.
220304 if (watch_fds[0].revents) {
794 char signal;
795
1/2
✓ Branch 2 taken 49931 times.
✗ Branch 3 not taken.
49931 ReadPipe(watch_fds[0].fd, &signal, 1);
796
2/2
✓ Branch 0 taken 49098 times.
✓ Branch 1 taken 833 times.
49931 if (signal == kSignalDetach) {
797
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 cache_plugin->SendDetachRequests();
798 49098 continue;
799 }
800
801 // termination
802
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 833 times.
833 if (watch_fds.size() > 2) {
803 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
804 "terminating external cache manager with pending connections");
805 }
806 833 break;
807 }
808
809 // New connection
810
2/2
✓ Branch 1 taken 882 times.
✓ Branch 2 taken 169491 times.
170373 if (watch_fds[1].revents) {
811 struct sockaddr_un remote;
812 882 socklen_t socket_size = sizeof(remote);
813
1/2
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
882 const int fd_con = accept(watch_fds[1].fd, (struct sockaddr *)&remote,
814 882 &socket_size);
815
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 882 times.
882 if (fd_con < 0) {
816 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
817 "failed to establish connection (%d)", errno);
818 continue;
819 }
820 struct pollfd watch_con;
821 882 watch_con.fd = fd_con;
822 882 watch_con.events = POLLIN | POLLPRI;
823
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 watch_fds.push_back(watch_con);
824
1/2
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
882 cache_plugin->connections_.insert(fd_con);
825 }
826
827 // New request
828
2/2
✓ Branch 1 taken 170373 times.
✓ Branch 2 taken 170373 times.
340746 for (unsigned i = 2; i < watch_fds.size();) {
829
2/2
✓ Branch 1 taken 169491 times.
✓ Branch 2 taken 882 times.
170373 if (watch_fds[i].revents) {
830
1/2
✓ Branch 2 taken 169491 times.
✗ Branch 3 not taken.
169491 const bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
831
2/2
✓ Branch 0 taken 882 times.
✓ Branch 1 taken 168609 times.
169491 if (!proceed) {
832
1/2
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
882 close(watch_fds[i].fd);
833
1/2
✓ Branch 2 taken 882 times.
✗ Branch 3 not taken.
882 cache_plugin->connections_.erase(watch_fds[i].fd);
834
1/2
✓ Branch 4 taken 882 times.
✗ Branch 5 not taken.
882 watch_fds.erase(watch_fds.begin() + i);
835 882 if ((getenv(CacheTransport::kEnvReadyNotifyFd) != NULL)
836 && (cache_plugin->connections_.empty())
837
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 882 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 882 times.
882 && (cache_plugin->num_inlimbo_clients_ == 0)) {
838 LogCvmfs(kLogCache, kLogSyslog,
839 "stopping cache plugin, no more active clients");
840 terminated = true;
841 break;
842 }
843 } else {
844 168609 i++;
845 }
846 } else {
847 882 i++;
848 }
849 }
850 }
851
852 // 0, 1 being closed by destructor
853
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 833 times.
833 for (unsigned i = 2; i < watch_fds.size(); ++i)
854 close(watch_fds[i].fd);
855
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 cache_plugin->txn_ids_.Clear();
856
857 833 signal(SIGPIPE, save_sigpipe);
858 833 return NULL;
859 833 }
860
861
862 /**
863 * Used during startup to synchronize with the cvmfs client.
864 */
865 833 void CachePlugin::NotifySupervisor(char signal) {
866 833 char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
867
1/2
✓ Branch 0 taken 833 times.
✗ Branch 1 not taken.
833 if (pipe_ready == NULL)
868 833 return;
869 const int fd_pipe_ready = String2Int64(pipe_ready);
870 WritePipe(fd_pipe_ready, &signal, 1);
871 }
872
873
874 833 void CachePlugin::ProcessRequests(unsigned num_workers) {
875 833 num_workers_ = num_workers;
876 833 const int retval = pthread_create(&thread_io_, NULL, MainProcessRequests,
877 this);
878
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833 times.
833 assert(retval == 0);
879 833 NotifySupervisor(CacheTransport::kReadyNotification);
880 833 atomic_cas32(&running_, 0, 1);
881 833 }
882
883
884 49098 void CachePlugin::SendDetachRequests() {
885 49098 set<int>::const_iterator iter = connections_.begin();
886 49098 const set<int>::const_iterator iter_end = connections_.end();
887
2/2
✓ Branch 2 taken 49098 times.
✓ Branch 3 taken 49098 times.
98196 for (; iter != iter_end; ++iter) {
888 49098 CacheTransport transport(*iter,
889 CacheTransport::kFlagSendIgnoreFailure
890
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 | CacheTransport::kFlagSendNonBlocking);
891
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 cvmfs::MsgDetach msg_detach;
892
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 CacheTransport::Frame frame_send(&msg_detach);
893
1/2
✓ Branch 1 taken 49098 times.
✗ Branch 2 not taken.
49098 transport.SendFrame(&frame_send);
894 49098 }
895 49098 }
896
897
898 833 void CachePlugin::Terminate() {
899
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 if (IsRunning()) {
900 833 char terminate = kSignalTerminate;
901
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 WritePipe(pipe_ctrl_[1], &terminate, 1);
902
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 pthread_join(thread_io_, NULL);
903 833 atomic_cas32(&running_, 1, 0);
904 }
905 833 }
906
907
908 void CachePlugin::WaitFor() {
909 if (!IsRunning())
910 return;
911 pthread_join(thread_io_, NULL);
912 }
913