GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 553 720 76.8%
Branches: 319 717 44.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #include "cvmfs_config.h"
5 #include "cache_extern.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <stdint.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13
14 #include <algorithm>
15 #include <cassert>
16 #ifdef __APPLE__
17 #include <cstdlib>
18 #endif
19 #include <cstring>
20 #include <map>
21 #include <new>
22 #include <set>
23 #include <string>
24
25 #include "cache.pb.h"
26 #include "crypto/hash.h"
27 #include "util/atomic.h"
28 #include "util/concurrency.h"
29 #include "util/exception.h"
30 #include "util/logging.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #ifdef __APPLE__
34 #include "util/smalloc.h"
35 #endif
36 #include "util/string.h"
37
38 using namespace std; // NOLINT
39
40 namespace {
41
42 6125 int Ack2Errno(cvmfs::EnumStatus status_code) {
43
4/12
✓ Branch 0 taken 6109 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 5 times.
✗ Branch 11 not taken.
6125 switch (status_code) {
44 6109 case cvmfs::STATUS_OK:
45 6109 return 0;
46 case cvmfs::STATUS_NOSUPPORT:
47 return -EOPNOTSUPP;
48 case cvmfs::STATUS_FORBIDDEN:
49 return -EPERM;
50 case cvmfs::STATUS_NOSPACE:
51 return -ENOSPC;
52 9 case cvmfs::STATUS_NOENTRY:
53 9 return -ENOENT;
54 2 case cvmfs::STATUS_MALFORMED:
55 2 return -EINVAL;
56 case cvmfs::STATUS_IOERR:
57 return -EIO;
58 case cvmfs::STATUS_CORRUPTED:
59 return -EIO;
60 case cvmfs::STATUS_TIMEOUT:
61 return -EIO;
62 case cvmfs::STATUS_BADCOUNT:
63 return -EINVAL;
64 5 case cvmfs::STATUS_OUTOFBOUNDS:
65 5 return -EINVAL;
66 default:
67 return -EIO;
68 }
69 }
70
71 } // anonymous namespace
72
73 const shash::Any ExternalCacheManager::kInvalidHandle;
74
75
76 int ExternalCacheManager::AbortTxn(void *txn) {
77 int result = Reset(txn);
78 #ifdef __APPLE__
79 free(reinterpret_cast<Transaction *>(txn)->buffer);
80 #endif
81 return result;
82 }
83
84
85 42 bool ExternalCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) {
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 assert(quota_mgr != NULL);
87 42 quota_mgr_ = quota_mgr;
88 18 LogCvmfs(kLogCache, kLogDebug, "set quota manager");
89 42 return true;
90 }
91
92
93 9105 void ExternalCacheManager::CallRemotely(ExternalCacheManager::RpcJob *rpc_job) {
94
2/2
✓ Branch 0 taken 7077 times.
✓ Branch 1 taken 2028 times.
9105 if (!spawned_) {
95 7077 transport_.SendFrame(rpc_job->frame_send());
96 7077 uint32_t save_att_size = rpc_job->frame_recv()->att_size();
97 bool again;
98
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7077 times.
7078 do {
99 7078 again = false;
100 7078 bool retval = transport_.RecvFrame(rpc_job->frame_recv());
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7078 times.
7078 assert(retval);
102
2/2
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 7077 times.
7078 if (rpc_job->frame_recv()->IsMsgOutOfBand()) {
103 google::protobuf::MessageLite *msg_typed =
104 1 rpc_job->frame_recv()->GetMsgTyped();
105
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
106
2/4
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs
107 1 rpc_job->frame_recv()->Reset(save_att_size);
108 1 again = true;
109 }
110 } while (again);
111 } else {
112
1/2
✓ Branch 1 taken 2028 times.
✗ Branch 2 not taken.
2028 Signal signal;
113 {
114 2028 MutexLockGuard guard(lock_inflight_rpcs_);
115
1/2
✓ Branch 2 taken 2028 times.
✗ Branch 3 not taken.
2028 inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
116 2028 }
117 {
118 2028 MutexLockGuard guard(lock_send_fd_);
119
1/2
✓ Branch 2 taken 2028 times.
✗ Branch 3 not taken.
2028 transport_.SendFrame(rpc_job->frame_send());
120 2028 }
121
1/2
✓ Branch 1 taken 2028 times.
✗ Branch 2 not taken.
2028 signal.Wait();
122 2028 }
123 9105 }
124
125
126 3068 int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
127
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 cvmfs::MsgHash object_id;
128
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 transport_.FillMsgHash(id, &object_id);
129
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 cvmfs::MsgRefcountReq msg_refcount;
130 3068 msg_refcount.set_session_id(session_id_);
131 3068 msg_refcount.set_req_id(NextRequestId());
132 3068 msg_refcount.set_allocated_object_id(&object_id);
133 3068 msg_refcount.set_change_by(change_by);
134
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 RpcJob rpc_job(&msg_refcount);
135
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 CallRemotely(&rpc_job);
136 3068 msg_refcount.release_object_id();
137
138
1/2
✓ Branch 1 taken 3068 times.
✗ Branch 2 not taken.
3068 cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply();
139 6136 return Ack2Errno(msg_reply->status());
140 3068 }
141
142
143 513 int ExternalCacheManager::Close(int fd) {
144 513 ReadOnlyHandle handle;
145 {
146 513 WriteLockGuard guard(rwlock_fd_table_);
147
1/2
✓ Branch 1 taken 513 times.
✗ Branch 2 not taken.
513 handle = fd_table_.GetHandle(fd);
148
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 512 times.
513 if (handle.id == kInvalidHandle)
149 1 return -EBADF;
150
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 int retval = fd_table_.CloseFd(fd);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 512 times.
512 assert(retval == 0);
152
2/2
✓ Branch 1 taken 512 times.
✓ Branch 2 taken 1 times.
513 }
153
154
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 return ChangeRefcount(handle.id, -1);
155 }
156
157
158 2038 int ExternalCacheManager::CommitTxn(void *txn) {
159 2038 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
160
1/2
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 LogCvmfs(kLogCache, kLogDebug, "committing %s",
161 12 transaction->id.ToString().c_str());
162 2038 int retval = Flush(true, transaction);
163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2038 times.
2038 if (retval != 0)
164 return retval;
165
166 2038 int refcount = transaction->open_fds - 1;
167
2/2
✓ Branch 0 taken 2036 times.
✓ Branch 1 taken 2 times.
2038 if (refcount != 0)
168 2036 return ChangeRefcount(transaction->id, refcount);
169 #ifdef __APPLE__
170 free(transaction->buffer);
171 #endif
172 2 return 0;
173 }
174
175
176 int ExternalCacheManager::ConnectLocator(
177 const std::string &locator, bool print_error)
178 {
179 vector<string> tokens = SplitString(locator, '=');
180 int result = -1;
181 if (tokens[0] == "unix") {
182 result = ConnectSocket(tokens[1]);
183 } else if (tokens[0] == "tcp") {
184 vector<string> tcp_address = SplitString(tokens[1], ':');
185 if (tcp_address.size() != 2)
186 return -EINVAL;
187 result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
188 } else {
189 return -EINVAL;
190 }
191 if (result < 0) {
192 if (print_error) {
193 if (errno) {
194 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
195 "Failed to connect to socket: %s", strerror(errno));
196 } else {
197 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
198 "Failed to connect to socket (unknown error)");
199 }
200 }
201 return -EIO;
202 }
203 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
204 "connected to cache plugin at %s", locator.c_str());
205 return result;
206 }
207
208
209 44 ExternalCacheManager *ExternalCacheManager::Create(
210 int fd_connection,
211 unsigned max_open_fds,
212 const string &ident)
213 {
214 UniquePtr<ExternalCacheManager> cache_mgr(
215
3/6
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 44 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 44 times.
✗ Branch 8 not taken.
44 new ExternalCacheManager(fd_connection, max_open_fds));
216
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 assert(cache_mgr.IsValid());
217
218
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 cvmfs::MsgHandshake msg_handshake;
219 44 msg_handshake.set_protocol_version(kPbProtocolVersion);
220
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 msg_handshake.set_name(ident);
221
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 CacheTransport::Frame frame_send(&msg_handshake);
222
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 cache_mgr->transport_.SendFrame(&frame_send);
223
224
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 CacheTransport::Frame frame_recv;
225
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
226
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44 times.
44 if (!retval)
227 return NULL;
228
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
229
2/4
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 44 times.
44 if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
230 return NULL;
231 44 cvmfs::MsgHandshakeAck *msg_ack =
232 reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed);
233 44 cache_mgr->session_id_ = msg_ack->session_id();
234 44 cache_mgr->capabilities_ = msg_ack->capabilities();
235 44 cache_mgr->max_object_size_ = msg_ack->max_object_size();
236
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 assert(cache_mgr->max_object_size_ > 0);
237
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
238 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
239 "external cache manager object size too large (%u)",
240 cache_mgr->max_object_size_);
241 return NULL;
242 }
243
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
244 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
245 "external cache manager object size too small (%u)",
246 cache_mgr->max_object_size_);
247 return NULL;
248 }
249
2/2
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 26 times.
44 if (msg_ack->has_pid())
250 18 cache_mgr->pid_plugin_ = msg_ack->pid();
251 44 return cache_mgr.Release();
252 44 }
253
254
255 /**
256 * Tries to connect to the plugin at locator, or, if it doesn't exist, spawns
257 * a new plugin using cmdline. Two processes could try to spawn the plugin at
258 * the same time. In this case, the plugin should indicate to the client to
259 * retry connecting.
260 */
261 ExternalCacheManager::PluginHandle *ExternalCacheManager::CreatePlugin(
262 const std::string &locator,
263 const std::vector<std::string> &cmd_line)
264 {
265 UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
266 unsigned num_attempts = 0;
267 bool try_again = false;
268 do {
269 num_attempts++;
270 if (num_attempts > 2) {
271 // Prevent violate busy loops
272 SafeSleepMs(1000);
273 }
274 plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
275 if (plugin_handle->IsValid()) {
276 break;
277 } else if (plugin_handle->fd_connection_ == -EINVAL) {
278 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
279 "Invalid locator: %s", locator.c_str());
280 plugin_handle->error_msg_ = "Invalid locator: " + locator;
281 break;
282 } else {
283 if (num_attempts > 1) {
284 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
285 "Failed to connect to external cache manager: %d",
286 plugin_handle->fd_connection_);
287 }
288 plugin_handle->error_msg_ = "Failed to connect to external cache manager";
289 }
290
291 try_again = SpawnPlugin(cmd_line);
292 } while (try_again);
293
294 return plugin_handle.Release();
295 }
296
297
298 2027 void ExternalCacheManager::CtrlTxn(
299 const Label &label,
300 const int flags,
301 void *txn)
302 {
303 2027 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
304 2027 transaction->label = label;
305 2027 transaction->label_modified = true;
306 2027 }
307
308
309 string ExternalCacheManager::Describe() {
310 return "External cache manager\n";
311 }
312
313
314 2 bool ExternalCacheManager::DoFreeState(void *data) {
315 2 FdTable<ReadOnlyHandle> *fd_table =
316 reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
317
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 delete fd_table;
318 2 return true;
319 }
320
321
322 522 int ExternalCacheManager::DoOpen(const shash::Any &id) {
323 522 int fd = -1;
324 {
325 522 WriteLockGuard guard(rwlock_fd_table_);
326
1/2
✓ Branch 2 taken 522 times.
✗ Branch 3 not taken.
522 fd = fd_table_.OpenFd(ReadOnlyHandle(id));
327
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 520 times.
522 if (fd < 0) {
328
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
329 strerror(-fd));
330 2 return fd;
331 }
332
2/2
✓ Branch 1 taken 520 times.
✓ Branch 2 taken 2 times.
522 }
333
334
1/2
✓ Branch 1 taken 520 times.
✗ Branch 2 not taken.
520 int status_refcnt = ChangeRefcount(id, 1);
335
2/2
✓ Branch 0 taken 510 times.
✓ Branch 1 taken 10 times.
520 if (status_refcnt == 0)
336 510 return fd;
337
338 10 WriteLockGuard guard(rwlock_fd_table_);
339
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 int retval = fd_table_.CloseFd(fd);
340
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(retval == 0);
341 10 return status_refcnt;
342 10 }
343
344
345 2 int ExternalCacheManager::DoRestoreState(void *data) {
346 // When DoRestoreState is called, we have fd 0 assigned to the root file
347 // catalog unless this is a lower layer cache in a tiered setup
348
2/2
✓ Branch 1 taken 254 times.
✓ Branch 2 taken 2 times.
256 for (unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
349
3/6
✓ Branch 2 taken 254 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 254 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 254 times.
254 assert(fd_table_.GetHandle(i) == ReadOnlyHandle());
350 }
351
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ReadOnlyHandle handle_root = fd_table_.GetHandle(0);
352
353 2 FdTable<ReadOnlyHandle> *other =
354 reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
355
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 fd_table_.AssignFrom(*other);
356
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cvmfs::MsgIoctl msg_ioctl;
357 2 msg_ioctl.set_session_id(session_id_);
358 2 msg_ioctl.set_conncnt_change_by(-1);
359
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 CacheTransport::Frame frame(&msg_ioctl);
360
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 transport_.SendFrame(&frame);
361
362 2 int new_root_fd = -1;
363
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
2 if (handle_root != ReadOnlyHandle()) {
364 new_root_fd = fd_table_.OpenFd(handle_root);
365 // There must be a free file descriptor because the root file catalog gets
366 // closed before a reload
367 assert(new_root_fd >= 0);
368 }
369 2 return new_root_fd;
370 2 }
371
372
373 2 void *ExternalCacheManager::DoSaveState() {
374
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cvmfs::MsgIoctl msg_ioctl;
375 2 msg_ioctl.set_session_id(session_id_);
376 2 msg_ioctl.set_conncnt_change_by(1);
377
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 CacheTransport::Frame frame(&msg_ioctl);
378
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 transport_.SendFrame(&frame);
379
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
4 return fd_table_.Clone();
380 2 }
381
382
383 129 int ExternalCacheManager::Dup(int fd) {
384
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 shash::Any id = GetHandle(fd);
385
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 128 times.
129 if (id == kInvalidHandle)
386 1 return -EBADF;
387
1/2
✓ Branch 1 taken 128 times.
✗ Branch 2 not taken.
128 return DoOpen(id);
388 }
389
390
391 44 ExternalCacheManager::ExternalCacheManager(
392 int fd_connection,
393 44 unsigned max_open_fds)
394 44 : pid_plugin_(0)
395
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 , fd_table_(max_open_fds, ReadOnlyHandle())
396
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 , transport_(fd_connection)
397 44 , session_id_(-1)
398 44 , max_object_size_(0)
399 44 , spawned_(false)
400 44 , terminated_(false)
401 88 , capabilities_(cvmfs::CAP_NONE)
402 {
403 44 int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
404
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44 times.
44 assert(retval == 0);
405 44 retval = pthread_mutex_init(&lock_send_fd_, NULL);
406
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44 times.
44 assert(retval == 0);
407 44 retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
408
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44 times.
44 assert(retval == 0);
409 44 memset(&thread_read_, 0, sizeof(thread_read_));
410 44 atomic_init64(&next_request_id_);
411 44 }
412
413
414 168 ExternalCacheManager::~ExternalCacheManager() {
415 84 terminated_ = true;
416 84 MemoryFence();
417
1/2
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
84 if (session_id_ >= 0) {
418 84 cvmfs::MsgQuit msg_quit;
419 84 msg_quit.set_session_id(session_id_);
420 84 CacheTransport::Frame frame(&msg_quit);
421 84 transport_.SendFrame(&frame);
422 }
423 84 shutdown(transport_.fd_connection(), SHUT_RDWR);
424
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 41 times.
84 if (spawned_)
425 2 pthread_join(thread_read_, NULL);
426 84 close(transport_.fd_connection());
427 84 pthread_rwlock_destroy(&rwlock_fd_table_);
428 84 pthread_mutex_destroy(&lock_send_fd_);
429 84 pthread_mutex_destroy(&lock_inflight_rpcs_);
430 168 }
431
432
433 3024 int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
434
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3022 times.
3024 if (transaction->committed)
435 2 return 0;
436
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
437
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
1216 transaction->buf_pos, transaction->id.ToString().c_str());
438
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 cvmfs::MsgHash object_id;
439
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 transport_.FillMsgHash(transaction->id, &object_id);
440
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 cvmfs::MsgStoreReq msg_store;
441 3022 msg_store.set_session_id(session_id_);
442 3022 msg_store.set_req_id(transaction->transaction_id);
443 3022 msg_store.set_allocated_object_id(&object_id);
444 3022 msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
445 3022 msg_store.set_expected_size(transaction->expected_size);
446 3022 msg_store.set_last_part(do_commit);
447
448
2/2
✓ Branch 0 taken 3014 times.
✓ Branch 1 taken 8 times.
3022 if (transaction->label_modified) {
449 cvmfs::EnumObjectType object_type;
450
1/2
✓ Branch 1 taken 3014 times.
✗ Branch 2 not taken.
3014 transport_.FillObjectType(transaction->label.flags, &object_type);
451
1/2
✓ Branch 1 taken 3014 times.
✗ Branch 2 not taken.
3014 msg_store.set_object_type(object_type);
452
2/4
✓ Branch 1 taken 3014 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3014 times.
✗ Branch 5 not taken.
3014 msg_store.set_description(transaction->label.GetDescription());
453 }
454
455
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 RpcJob rpc_job(&msg_store);
456 3022 rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
457 // TODO(jblomer): allow for out of order chunk upload
458
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 CallRemotely(&rpc_job);
459 3022 msg_store.release_object_id();
460
461
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
462
1/2
✓ Branch 1 taken 3022 times.
✗ Branch 2 not taken.
3022 if (msg_reply->status() == cvmfs::STATUS_OK) {
463 3022 transaction->flushed = true;
464
2/2
✓ Branch 0 taken 2038 times.
✓ Branch 1 taken 984 times.
3022 if (do_commit)
465 2038 transaction->committed = true;
466 }
467 3022 return Ack2Errno(msg_reply->status());
468 3022 }
469
470
471 417 shash::Any ExternalCacheManager::GetHandle(int fd) {
472 417 ReadLockGuard guard(rwlock_fd_table_);
473
1/2
✓ Branch 1 taken 417 times.
✗ Branch 2 not taken.
417 ReadOnlyHandle handle = fd_table_.GetHandle(fd);
474 417 return handle.id;
475 417 }
476
477
478 41 int64_t ExternalCacheManager::GetSize(int fd) {
479
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 shash::Any id = GetHandle(fd);
480
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 40 times.
41 if (id == kInvalidHandle)
481 1 return -EBADF;
482
483
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 cvmfs::MsgHash object_id;
484
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 transport_.FillMsgHash(id, &object_id);
485
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 cvmfs::MsgObjectInfoReq msg_info;
486 40 msg_info.set_session_id(session_id_);
487 40 msg_info.set_req_id(NextRequestId());
488 40 msg_info.set_allocated_object_id(&object_id);
489
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 RpcJob rpc_job(&msg_info);
490
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 CallRemotely(&rpc_job);
491 40 msg_info.release_object_id();
492
493
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
494
2/2
✓ Branch 1 taken 39 times.
✓ Branch 2 taken 1 times.
40 if (msg_reply->status() == cvmfs::STATUS_OK) {
495
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 39 times.
39 assert(msg_reply->has_size());
496 39 return msg_reply->size();
497 }
498 1 return Ack2Errno(msg_reply->status());
499 40 }
500
501
502 1 void *ExternalCacheManager::MainRead(void *data) {
503 1 ExternalCacheManager *cache_mgr =
504 reinterpret_cast<ExternalCacheManager *>(data);
505 1 LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
506
507 1 unsigned char buffer[cache_mgr->max_object_size_];
508 while (true) {
509
1/2
✓ Branch 1 taken 3029 times.
✗ Branch 2 not taken.
3029 CacheTransport::Frame frame_recv;
510 3029 frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
511
1/2
✓ Branch 1 taken 3029 times.
✗ Branch 2 not taken.
3029 bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
512
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3028 times.
3029 if (!retval)
513 1 break;
514
515 uint64_t req_id;
516 3028 uint64_t part_nr = 0;
517
1/2
✓ Branch 1 taken 3028 times.
✗ Branch 2 not taken.
3028 google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
518
3/4
✓ Branch 1 taken 3028 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 19 times.
✓ Branch 6 taken 3009 times.
3028 if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
519 19 req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
520
3/4
✓ Branch 1 taken 3009 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 9 times.
✓ Branch 6 taken 3000 times.
3009 } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
521 9 req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
522
3/4
✓ Branch 1 taken 3000 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1800 times.
✓ Branch 6 taken 1200 times.
3000 } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
523 1800 req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
524
3/4
✓ Branch 1 taken 1200 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 200 times.
✓ Branch 6 taken 1000 times.
1200 } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
525 200 req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
526 200 part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
527
2/4
✓ Branch 1 taken 1000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1000 times.
1000 } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
528 req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
529
2/4
✓ Branch 1 taken 1000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1000 times.
1000 } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
530 req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
531
2/4
✓ Branch 1 taken 1000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1000 times.
1000 } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
532 req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
533
2/4
✓ Branch 1 taken 1000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1000 times.
1000 } else if (msg->GetTypeName() == "cvmfs.MsgBreadcrumbReply") {
534 req_id = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg)->req_id();
535
2/4
✓ Branch 1 taken 1000 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1000 times.
✗ Branch 6 not taken.
1000 } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
536 // Release pinned catalogs
537
2/4
✓ Branch 2 taken 1000 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1000 times.
✗ Branch 6 not taken.
1000 cache_mgr->quota_mgr_->BroadcastBackchannels("R");
538 1000 continue;
539 } else {
540 PANIC(kLogSyslogErr | kLogDebug, "unexpected message %s",
541 msg->GetTypeName().c_str());
542 }
543
544 2028 RpcInFlight rpc_inflight;
545 {
546 2028 MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
547
1/2
✓ Branch 1 taken 2042 times.
✗ Branch 2 not taken.
2042 for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
548 2042 RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
549
5/6
✓ Branch 1 taken 2028 times.
✓ Branch 2 taken 14 times.
✓ Branch 4 taken 2028 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2028 times.
✓ Branch 7 taken 14 times.
2042 if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
550 2028 rpc_inflight = cache_mgr->inflight_rpcs_[i];
551
1/2
✓ Branch 2 taken 2028 times.
✗ Branch 3 not taken.
2028 cache_mgr->inflight_rpcs_.erase(
552 2028 cache_mgr->inflight_rpcs_.begin() + i);
553 2028 break;
554 }
555 }
556 2028 }
557
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2028 times.
2028 if (rpc_inflight.rpc_job == NULL) {
558 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
559 "got unmatched rpc reply");
560 continue;
561 }
562
1/2
✓ Branch 2 taken 2028 times.
✗ Branch 3 not taken.
2028 rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
563
1/2
✓ Branch 1 taken 2028 times.
✗ Branch 2 not taken.
2028 rpc_inflight.signal->Wakeup();
564
3/3
✓ Branch 1 taken 2028 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 1000 times.
6057 }
565
566
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!cache_mgr->terminated_) {
567 PANIC(kLogSyslogErr | kLogDebug,
568 "connection to external cache manager broken (%d)", errno);
569 }
570
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
571 1 return NULL;
572
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 }
573
574
575 394 int ExternalCacheManager::Open(const LabeledObject &object) {
576 394 return DoOpen(object.id);
577 }
578
579
580 2 int ExternalCacheManager::OpenFromTxn(void *txn) {
581 2 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
582 LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
583 transaction->id.ToString().c_str());
584 2 int retval = Flush(true, transaction);
585
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (retval != 0)
586 return retval;
587
588 2 int fd = -1;
589 {
590 2 WriteLockGuard guard(rwlock_fd_table_);
591
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
592
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (fd < 0) {
593 LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
594 strerror(-fd));
595 return fd;
596 }
597
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 }
598 2 transaction->open_fds++;
599 2 return fd;
600 }
601
602
603 245 int64_t ExternalCacheManager::Pread(
604 int fd,
605 void *buf,
606 uint64_t size,
607 uint64_t offset)
608 {
609
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 shash::Any id = GetHandle(fd);
610
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 244 times.
245 if (id == kInvalidHandle)
611 1 return -EBADF;
612
613
1/2
✓ Branch 1 taken 244 times.
✗ Branch 2 not taken.
244 cvmfs::MsgHash object_id;
614
1/2
✓ Branch 1 taken 244 times.
✗ Branch 2 not taken.
244 transport_.FillMsgHash(id, &object_id);
615 244 uint64_t nbytes = 0;
616
2/2
✓ Branch 0 taken 2909 times.
✓ Branch 1 taken 230 times.
3139 while (nbytes < size) {
617 uint64_t batch_size =
618 2909 std::min(size - nbytes, static_cast<uint64_t>(max_object_size_));
619
1/2
✓ Branch 1 taken 2909 times.
✗ Branch 2 not taken.
2909 cvmfs::MsgReadReq msg_read;
620 2909 msg_read.set_session_id(session_id_);
621 2909 msg_read.set_req_id(NextRequestId());
622 2909 msg_read.set_allocated_object_id(&object_id);
623 2909 msg_read.set_offset(offset + nbytes);
624 2909 msg_read.set_size(batch_size);
625
1/2
✓ Branch 1 taken 2909 times.
✗ Branch 2 not taken.
2909 RpcJob rpc_job(&msg_read);
626 2909 rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
627 batch_size);
628
1/2
✓ Branch 1 taken 2909 times.
✗ Branch 2 not taken.
2909 CallRemotely(&rpc_job);
629 2909 msg_read.release_object_id();
630
631
1/2
✓ Branch 1 taken 2909 times.
✗ Branch 2 not taken.
2909 cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
632
2/2
✓ Branch 1 taken 2904 times.
✓ Branch 2 taken 5 times.
2909 if (msg_reply->status() == cvmfs::STATUS_OK) {
633 2904 nbytes += rpc_job.frame_recv()->att_size();
634 // Fuse sends in rounded up buffers, so short reads are expected
635
2/2
✓ Branch 2 taken 9 times.
✓ Branch 3 taken 2895 times.
2904 if (rpc_job.frame_recv()->att_size() < batch_size)
636 9 return nbytes;
637 } else {
638 5 return Ack2Errno(msg_reply->status());
639 }
640
4/4
✓ Branch 1 taken 2895 times.
✓ Branch 2 taken 14 times.
✓ Branch 4 taken 2895 times.
✓ Branch 5 taken 14 times.
2923 }
641 230 return size;
642 244 }
643
644
645 2 int ExternalCacheManager::Readahead(int fd) {
646
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 shash::Any id = GetHandle(fd);
647
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
2 if (id == kInvalidHandle)
648 1 return -EBADF;
649 // No-op
650 1 return 0;
651 }
652
653
654 7 int ExternalCacheManager::Reset(void *txn) {
655 7 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
656 7 transaction->buf_pos = 0;
657 7 transaction->size = 0;
658 7 transaction->open_fds = 0;
659 7 transaction->committed = false;
660 7 transaction->label_modified = true;
661
662
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 times.
7 if (!transaction->flushed)
663 6 return 0;
664
665
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::MsgHash object_id;
666
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 transport_.FillMsgHash(transaction->id, &object_id);
667
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::MsgStoreAbortReq msg_abort;
668 1 msg_abort.set_session_id(session_id_);
669 1 msg_abort.set_req_id(transaction->transaction_id);
670 1 msg_abort.set_allocated_object_id(&object_id);
671
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 RpcJob rpc_job(&msg_abort);
672
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 CallRemotely(&rpc_job);
673 1 msg_abort.release_object_id();
674
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
675 1 transaction->transaction_id = NextRequestId();
676 1 transaction->flushed = false;
677 1 return Ack2Errno(msg_reply->status());
678 1 }
679
680
681 6 manifest::Breadcrumb ExternalCacheManager::LoadBreadcrumb(
682 const std::string &fqrn)
683 {
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
685 return manifest::Breadcrumb();
686
687
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
688 6 msg_breadcrumb_load.set_session_id(session_id_);
689 6 msg_breadcrumb_load.set_req_id(NextRequestId());
690
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 msg_breadcrumb_load.set_fqrn(fqrn);
691
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 RpcJob rpc_job(&msg_breadcrumb_load);
692
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 CallRemotely(&rpc_job);
693
694
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 manifest::Breadcrumb breadcrumb;
695
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
696
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3 times.
6 if (msg_reply->status() == cvmfs::STATUS_OK) {
697
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 assert(msg_reply->has_breadcrumb());
698
1/2
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
3 assert(msg_reply->breadcrumb().fqrn() == fqrn);
699
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
3 bool rv = transport_.ParseMsgHash(msg_reply->breadcrumb().hash(),
700 &breadcrumb.catalog_hash);
701
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 assert(rv);
702 3 breadcrumb.catalog_hash.suffix = shash::kSuffixCatalog;
703 3 breadcrumb.timestamp = msg_reply->breadcrumb().timestamp();
704
1/2
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 if (msg_reply->breadcrumb().has_revision()) {
705 3 breadcrumb.revision = msg_reply->breadcrumb().revision();
706 } else {
707 breadcrumb.revision = 0;
708 }
709 }
710 6 return breadcrumb;
711 6 }
712
713
714 3 bool ExternalCacheManager::StoreBreadcrumb(const manifest::Manifest &manifest) {
715
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
716 return false;
717
718
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 cvmfs::MsgHash hash;
719
1/2
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 transport_.FillMsgHash(manifest.catalog_hash(), &hash);
720
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 cvmfs::MsgBreadcrumb breadcrumb;
721
2/4
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 breadcrumb.set_fqrn(manifest.repository_name());
722 3 breadcrumb.set_allocated_hash(&hash);
723 3 breadcrumb.set_timestamp(manifest.publish_timestamp());
724 3 breadcrumb.set_revision(manifest.revision());
725
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
726 3 msg_breadcrumb_store.set_session_id(session_id_);
727 3 msg_breadcrumb_store.set_req_id(NextRequestId());
728 3 msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
729
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 RpcJob rpc_job(&msg_breadcrumb_store);
730
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 CallRemotely(&rpc_job);
731 3 msg_breadcrumb_store.release_breadcrumb();
732 3 breadcrumb.release_hash();
733
734
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
735 3 return msg_reply->status() == cvmfs::STATUS_OK;
736 3 }
737
738
739 1 void ExternalCacheManager::Spawn() {
740 1 int retval = pthread_create(&thread_read_, NULL, MainRead, this);
741
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
742 1 spawned_ = true;
743 1 }
744
745
746 /**
747 * Returns true if the plugin could be spawned or was spawned by another
748 * process.
749 */
750 bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
751 if (cmd_line.empty())
752 return false;
753
754 int pipe_ready[2];
755 MakePipe(pipe_ready);
756 set<int> preserve_filedes;
757 preserve_filedes.insert(pipe_ready[1]);
758
759 int fd_null_read = open("/dev/null", O_RDONLY);
760 int fd_null_write = open("/dev/null", O_WRONLY);
761 assert((fd_null_read >= 0) && (fd_null_write >= 0));
762 map<int, int> map_fildes;
763 map_fildes[fd_null_read] = 0;
764 map_fildes[fd_null_write] = 1;
765 map_fildes[fd_null_write] = 2;
766
767 pid_t child_pid;
768 int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
769 StringifyInt(pipe_ready[1]).c_str(), 1);
770 assert(retval == 0);
771 retval = ManagedExec(cmd_line,
772 preserve_filedes,
773 map_fildes,
774 false, // drop_credentials
775 false, // clear_env
776 true, // double fork
777 &child_pid);
778 unsetenv(CacheTransport::kEnvReadyNotifyFd);
779 close(fd_null_read);
780 close(fd_null_write);
781 if (!retval) {
782 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
783 "failed to start cache plugin '%s'",
784 JoinStrings(cmd_line, " ").c_str());
785 ClosePipe(pipe_ready);
786 return false;
787 }
788
789 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
790 "started cache plugin '%s' (pid %d), waiting for it to become ready",
791 JoinStrings(cmd_line, " ").c_str(), child_pid);
792 close(pipe_ready[1]);
793 char buf;
794 if (read(pipe_ready[0], &buf, 1) != 1) {
795 close(pipe_ready[0]);
796 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
797 "cache plugin did not start properly");
798 return false;
799 }
800 close(pipe_ready[0]);
801
802 if (buf == CacheTransport::kReadyNotification)
803 return true;
804 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
805 "cache plugin failed to create an endpoint");
806 return false;
807 }
808
809
810 2040 int ExternalCacheManager::StartTxn(
811 const shash::Any &id,
812 uint64_t size,
813 void *txn)
814 {
815
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2038 times.
2040 if (!(capabilities_ & cvmfs::CAP_WRITE))
816 2 return -EROFS;
817
818 2038 Transaction *transaction = new (txn) Transaction(id);
819 2038 transaction->expected_size = size;
820 2038 transaction->transaction_id = NextRequestId();
821 #ifdef __APPLE__
822 transaction->buffer =
823 reinterpret_cast<unsigned char *>(smalloc(max_object_size_));
824 #endif
825 2038 return 0;
826 }
827
828
829 2042 int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
830 2042 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
831
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2042 times.
2042 assert(!transaction->committed);
832
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s",
833 16 size, transaction->id.ToString().c_str());
834
835
1/2
✓ Branch 0 taken 2042 times.
✗ Branch 1 not taken.
2042 if (transaction->expected_size != kSizeUnknown) {
836
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2042 times.
2042 if (transaction->size + size > transaction->expected_size) {
837 LogCvmfs(kLogCache, kLogDebug,
838 "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
839 transaction->size + size, transaction->expected_size);
840 return -EFBIG;
841 }
842 }
843
844 2042 uint64_t written = 0;
845 2042 const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
846
2/2
✓ Branch 0 taken 3023 times.
✓ Branch 1 taken 2042 times.
5065 while (written < size) {
847
2/2
✓ Branch 0 taken 984 times.
✓ Branch 1 taken 2039 times.
3023 if (transaction->buf_pos == max_object_size_) {
848 984 bool do_commit = false;
849
1/2
✓ Branch 0 taken 984 times.
✗ Branch 1 not taken.
984 if (transaction->expected_size != kSizeUnknown)
850 984 do_commit = (transaction->size + written) == transaction->expected_size;
851
1/2
✓ Branch 1 taken 984 times.
✗ Branch 2 not taken.
984 int retval = Flush(do_commit, transaction);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 984 times.
984 if (retval != 0) {
853 transaction->size += written;
854 return retval;
855 }
856 984 transaction->size += transaction->buf_pos;
857 984 transaction->buf_pos = 0;
858 }
859 3023 uint64_t remaining = size - written;
860 3023 uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
861 3023 uint64_t batch_size = std::min(remaining, space_in_buffer);
862 3023 memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
863 3023 transaction->buf_pos += batch_size;
864 3023 written += batch_size;
865 3023 read_pos += batch_size;
866 }
867 2042 return written;
868 }
869
870
871 //------------------------------------------------------------------------------
872
873
874 14 bool ExternalQuotaManager::DoListing(
875 cvmfs::EnumObjectType type,
876 vector<cvmfs::MsgListRecord> *result)
877 {
878
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
879 return false;
880
881 14 uint64_t listing_id = 0;
882 14 bool more_data = false;
883
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 14 times.
18 do {
884
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 cvmfs::MsgListReq msg_list;
885 18 msg_list.set_session_id(cache_mgr_->session_id_);
886 18 msg_list.set_req_id(cache_mgr_->NextRequestId());
887 18 msg_list.set_listing_id(listing_id);
888
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 msg_list.set_object_type(type);
889
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 ExternalCacheManager::RpcJob rpc_job(&msg_list);
890
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 cache_mgr_->CallRemotely(&rpc_job);
891
892
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
893
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
18 if (msg_reply->status() != cvmfs::STATUS_OK)
894 return false;
895 18 more_data = !msg_reply->is_last_part();
896 18 listing_id = msg_reply->listing_id();
897
3/4
✓ Branch 1 taken 204022 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 204004 times.
✓ Branch 4 taken 18 times.
204022 for (int i = 0; i < msg_reply->list_record_size(); ++i) {
898
2/4
✓ Branch 1 taken 204004 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 204004 times.
✗ Branch 5 not taken.
204004 result->push_back(msg_reply->list_record(i));
899 }
900
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 } while (more_data);
901
902 14 return true;
903 }
904
905
906 10 bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
907
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
908 return false;
909
910
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 cvmfs::MsgShrinkReq msg_shrink;
911 10 msg_shrink.set_session_id(cache_mgr_->session_id_);
912 10 msg_shrink.set_req_id(cache_mgr_->NextRequestId());
913 10 msg_shrink.set_shrink_to(leave_size);
914
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
915
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 cache_mgr_->CallRemotely(&rpc_job);
916
917
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
918 10 return msg_reply->status() == cvmfs::STATUS_OK;
919 10 }
920
921
922 42 ExternalQuotaManager *ExternalQuotaManager::Create(
923 ExternalCacheManager *cache_mgr)
924 {
925 UniquePtr<ExternalQuotaManager> quota_mgr(
926
3/6
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 42 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 42 times.
✗ Branch 8 not taken.
42 new ExternalQuotaManager(cache_mgr));
927
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 assert(quota_mgr.IsValid());
928
929 84 return quota_mgr.Release();
930 42 }
931
932
933 28 int ExternalQuotaManager::GetInfo(QuotaInfo *quota_info) {
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28 times.
28 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
935 return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
936
937
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 cvmfs::MsgInfoReq msg_info;
938 28 msg_info.set_session_id(cache_mgr_->session_id_);
939 28 msg_info.set_req_id(cache_mgr_->NextRequestId());
940
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 ExternalCacheManager::RpcJob rpc_job(&msg_info);
941
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 cache_mgr_->CallRemotely(&rpc_job);
942
943
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply();
944
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 if (msg_reply->status() == cvmfs::STATUS_OK) {
945 28 quota_info->size = msg_reply->size_bytes();
946 28 quota_info->used = msg_reply->used_bytes();
947 28 quota_info->pinned = msg_reply->pinned_bytes();
948
1/2
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
28 if (msg_reply->no_shrink() >= 0)
949 28 quota_info->no_shrink = msg_reply->no_shrink();
950 }
951 28 return Ack2Errno(msg_reply->status());
952 28 }
953
954
955 5 uint64_t ExternalQuotaManager::GetCapacity() {
956 5 QuotaInfo info;
957
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 int retval = GetInfo(&info);
958
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (retval != 0)
959 return uint64_t(-1);
960 5 return info.size;
961 }
962
963
964 uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
965 QuotaInfo info;
966 int retval = GetInfo(&info);
967 if (retval != 0)
968 return 0;
969 return info.no_shrink;
970 }
971
972
973 15 uint64_t ExternalQuotaManager::GetSize() {
974 15 QuotaInfo info;
975
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 int retval = GetInfo(&info);
976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (retval != 0)
977 return 0;
978 15 return info.used;
979 }
980
981
982 8 uint64_t ExternalQuotaManager::GetSizePinned() {
983 8 QuotaInfo info;
984
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 int retval = GetInfo(&info);
985
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (retval != 0)
986 return 0;
987 8 return info.pinned;
988 }
989
990
991 bool ExternalQuotaManager::HasCapability(Capabilities capability) {
992 switch (capability) {
993 case kCapIntrospectSize:
994 return cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
995 case kCapIntrospectCleanupRate:
996 return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
997 case kCapList:
998 return cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
999 case kCapShrink:
1000 return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
1001 case kCapListeners:
1002 return true;
1003 default:
1004 return false;
1005 }
1006 }
1007
1008
1009 3 vector<string> ExternalQuotaManager::List() {
1010 3 vector<string> result;
1011 3 vector<cvmfs::MsgListRecord> raw_list;
1012
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
1013
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!retval)
1014 return result;
1015
2/2
✓ Branch 1 taken 102002 times.
✓ Branch 2 taken 3 times.
102005 for (unsigned i = 0; i < raw_list.size(); ++i)
1016
1/2
✓ Branch 3 taken 102002 times.
✗ Branch 4 not taken.
102002 result.push_back(raw_list[i].description());
1017 3 return result;
1018 3 }
1019
1020
1021 1 vector<string> ExternalQuotaManager::ListCatalogs() {
1022 1 vector<string> result;
1023 1 vector<cvmfs::MsgListRecord> raw_list;
1024
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1025
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!retval)
1026 return result;
1027
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 for (unsigned i = 0; i < raw_list.size(); ++i)
1028 result.push_back(raw_list[i].description());
1029 1 return result;
1030 1 }
1031
1032
1033 3 vector<string> ExternalQuotaManager::ListPinned() {
1034 3 vector<string> result;
1035
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 3 times.
24 vector<cvmfs::MsgListRecord> raw_lists[3];
1036
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1037
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!retval)
1038 return result;
1039
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1040
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!retval)
1041 return result;
1042
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1043
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!retval)
1044 return result;
1045
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 3 times.
12 for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) {
1046
2/2
✓ Branch 1 taken 102002 times.
✓ Branch 2 taken 9 times.
102011 for (unsigned j = 0; j < raw_lists[i].size(); ++j) {
1047
2/2
✓ Branch 2 taken 200 times.
✓ Branch 3 taken 101802 times.
102002 if (raw_lists[i][j].pinned())
1048
1/2
✓ Branch 3 taken 200 times.
✗ Branch 4 not taken.
200 result.push_back(raw_lists[i][j].description());
1049 }
1050 }
1051 3 return result;
1052
2/4
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
12 }
1053
1054
1055 1 vector<string> ExternalQuotaManager::ListVolatile() {
1056 1 vector<string> result;
1057 1 vector<cvmfs::MsgListRecord> raw_list;
1058
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1059
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!retval)
1060 return result;
1061
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 for (unsigned i = 0; i < raw_list.size(); ++i)
1062 result.push_back(raw_list[i].description());
1063 1 return result;
1064 1 }
1065
1066
1067 1 void ExternalQuotaManager::RegisterBackChannel(
1068 int back_channel[2],
1069 const string &channel_id)
1070 {
1071
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1072
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 MakePipe(back_channel);
1073 1 LockBackChannels();
1074
2/4
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1 times.
1 assert(back_channels_.find(hash_id) == back_channels_.end());
1075
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 back_channels_[hash_id] = back_channel[1];
1076 1 UnlockBackChannels();
1077 1 }
1078
1079
1080 1 void ExternalQuotaManager::UnregisterBackChannel(
1081 int back_channel[2],
1082 const string &channel_id)
1083 {
1084
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1085 1 LockBackChannels();
1086
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 back_channels_.erase(hash_id);
1087 1 UnlockBackChannels();
1088
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 ClosePipe(back_channel);
1089 1 }
1090