GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.cc
Date: 2025-11-30 02:35:17
Exec Total Coverage
Lines: 554 720 76.9%
Branches: 319 717 44.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "cache_extern.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <stdint.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13
14 #include <algorithm>
15 #include <cassert>
16 #ifdef __APPLE__
17 #include <cstdlib>
18 #endif
19 #include <cstring>
20 #include <map>
21 #include <new>
22 #include <set>
23 #include <string>
24
25 #include "cache.pb.h"
26 #include "crypto/hash.h"
27 #include "util/atomic.h"
28 #include "util/concurrency.h"
29 #include "util/exception.h"
30 #include "util/logging.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #ifdef __APPLE__
34 #include "util/smalloc.h"
35 #endif
36 #include "util/string.h"
37
38 using namespace std; // NOLINT
39
40 namespace {
41
42 59180 int Ack2Errno(cvmfs::EnumStatus status_code) {
43
4/12
✓ Branch 0 taken 58984 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 54 times.
✓ Branch 5 taken 92 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 50 times.
✗ Branch 11 not taken.
59180 switch (status_code) {
44 58984 case cvmfs::STATUS_OK:
45 58984 return 0;
46 case cvmfs::STATUS_NOSUPPORT:
47 return -EOPNOTSUPP;
48 case cvmfs::STATUS_FORBIDDEN:
49 return -EPERM;
50 case cvmfs::STATUS_NOSPACE:
51 return -ENOSPC;
52 54 case cvmfs::STATUS_NOENTRY:
53 54 return -ENOENT;
54 92 case cvmfs::STATUS_MALFORMED:
55 92 return -EINVAL;
56 case cvmfs::STATUS_IOERR:
57 return -EIO;
58 case cvmfs::STATUS_CORRUPTED:
59 return -EIO;
60 case cvmfs::STATUS_TIMEOUT:
61 return -EIO;
62 case cvmfs::STATUS_BADCOUNT:
63 return -EINVAL;
64 50 case cvmfs::STATUS_OUTOFBOUNDS:
65 50 return -EINVAL;
66 default:
67 return -EIO;
68 }
69 }
70
71 } // anonymous namespace
72
73 const shash::Any ExternalCacheManager::kInvalidHandle;
74
75
76 int ExternalCacheManager::AbortTxn(void *txn) {
77 const int result = Reset(txn);
78 #ifdef __APPLE__
79 free(reinterpret_cast<Transaction *>(txn)->buffer);
80 #endif
81 return result;
82 }
83
84
85 852 bool ExternalCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) {
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 852 times.
852 assert(quota_mgr != NULL);
87 852 quota_mgr_ = quota_mgr;
88 828 LogCvmfs(kLogCache, kLogDebug, "set quota manager");
89 852 return true;
90 }
91
92
93 162904 void ExternalCacheManager::CallRemotely(ExternalCacheManager::RpcJob *rpc_job) {
94
2/2
✓ Branch 0 taken 69662 times.
✓ Branch 1 taken 93242 times.
162904 if (!spawned_) {
95 69662 transport_.SendFrame(rpc_job->frame_send());
96 69662 const uint32_t save_att_size = rpc_job->frame_recv()->att_size();
97 bool again;
98
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 69662 times.
69708 do {
99 69708 again = false;
100 69708 const bool retval = transport_.RecvFrame(rpc_job->frame_recv());
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 69708 times.
69708 assert(retval);
102
2/2
✓ Branch 2 taken 46 times.
✓ Branch 3 taken 69662 times.
69708 if (rpc_job->frame_recv()->IsMsgOutOfBand()) {
103 google::protobuf::MessageLite *msg_typed = rpc_job->frame_recv()
104 46 ->GetMsgTyped();
105
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 46 times.
46 assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
106
2/4
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 46 times.
✗ Branch 6 not taken.
46 quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs
107 46 rpc_job->frame_recv()->Reset(save_att_size);
108 46 again = true;
109 }
110 } while (again);
111 } else {
112
1/2
✓ Branch 1 taken 93288 times.
✗ Branch 2 not taken.
93242 Signal signal;
113 {
114 93288 const MutexLockGuard guard(lock_inflight_rpcs_);
115
1/2
✓ Branch 2 taken 93288 times.
✗ Branch 3 not taken.
93288 inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
116 93288 }
117 {
118 93288 const MutexLockGuard guard(lock_send_fd_);
119
1/2
✓ Branch 2 taken 93288 times.
✗ Branch 3 not taken.
93288 transport_.SendFrame(rpc_job->frame_send());
120 93288 }
121
1/2
✓ Branch 1 taken 93288 times.
✗ Branch 2 not taken.
93288 signal.Wait();
122 93288 }
123 162950 }
124
125
126 28448 int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
127
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 cvmfs::MsgHash object_id;
128
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 transport_.FillMsgHash(id, &object_id);
129
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 cvmfs::MsgRefcountReq msg_refcount;
130 28448 msg_refcount.set_session_id(session_id_);
131 28448 msg_refcount.set_req_id(NextRequestId());
132 28448 msg_refcount.set_allocated_object_id(&object_id);
133 28448 msg_refcount.set_change_by(change_by);
134
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 RpcJob rpc_job(&msg_refcount);
135
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 CallRemotely(&rpc_job);
136 28448 msg_refcount.release_object_id();
137
138
1/2
✓ Branch 1 taken 28448 times.
✗ Branch 2 not taken.
28448 cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply();
139 56896 return Ack2Errno(msg_reply->status());
140 28448 }
141
142
143 13068 int ExternalCacheManager::Close(int fd) {
144 13068 ReadOnlyHandle handle;
145 {
146 13068 const WriteLockGuard guard(rwlock_fd_table_);
147
1/2
✓ Branch 1 taken 13068 times.
✗ Branch 2 not taken.
13068 handle = fd_table_.GetHandle(fd);
148
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 13022 times.
13068 if (handle.id == kInvalidHandle)
149 46 return -EBADF;
150
1/2
✓ Branch 1 taken 13022 times.
✗ Branch 2 not taken.
13022 const int retval = fd_table_.CloseFd(fd);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13022 times.
13022 assert(retval == 0);
152
2/2
✓ Branch 1 taken 13022 times.
✓ Branch 2 taken 46 times.
13068 }
153
154
1/2
✓ Branch 1 taken 13022 times.
✗ Branch 2 not taken.
13022 return ChangeRefcount(handle.id, -1);
155 }
156
157
158 2308 int ExternalCacheManager::CommitTxn(void *txn) {
159 2308 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
160
1/2
✓ Branch 2 taken 276 times.
✗ Branch 3 not taken.
276 LogCvmfs(kLogCache, kLogDebug, "committing %s",
161 552 std::string(transaction->id.ToString()).c_str());
162 2308 const int retval = Flush(true, transaction);
163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2308 times.
2308 if (retval != 0)
164 return retval;
165
166 2308 const int refcount = transaction->open_fds - 1;
167
2/2
✓ Branch 0 taken 2306 times.
✓ Branch 1 taken 2 times.
2308 if (refcount != 0)
168 2306 return ChangeRefcount(transaction->id, refcount);
169 #ifdef __APPLE__
170 free(transaction->buffer);
171 #endif
172 2 return 0;
173 }
174
175
176 int ExternalCacheManager::ConnectLocator(const std::string &locator,
177 bool print_error) {
178 vector<string> tokens = SplitString(locator, '=');
179 int result = -1;
180 if (tokens[0] == "unix") {
181 result = ConnectSocket(tokens[1]);
182 } else if (tokens[0] == "tcp") {
183 vector<string> tcp_address = SplitString(tokens[1], ':');
184 if (tcp_address.size() != 2)
185 return -EINVAL;
186 result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
187 } else {
188 return -EINVAL;
189 }
190 if (result < 0) {
191 if (print_error) {
192 if (errno) {
193 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
194 "Failed to connect to socket: %s", strerror(errno));
195 } else {
196 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
197 "Failed to connect to socket (unknown error)");
198 }
199 }
200 return -EIO;
201 }
202 LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "connected to cache plugin at %s",
203 locator.c_str());
204 return result;
205 }
206
207
208 854 ExternalCacheManager *ExternalCacheManager::Create(int fd_connection,
209 unsigned max_open_fds,
210 const string &ident) {
211 UniquePtr<ExternalCacheManager> cache_mgr(
212
3/6
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 854 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 854 times.
✗ Branch 8 not taken.
854 new ExternalCacheManager(fd_connection, max_open_fds));
213
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 854 times.
854 assert(cache_mgr.IsValid());
214
215
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 cvmfs::MsgHandshake msg_handshake;
216 854 msg_handshake.set_protocol_version(kPbProtocolVersion);
217
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 msg_handshake.set_name(ident);
218
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 CacheTransport::Frame frame_send(&msg_handshake);
219
1/2
✓ Branch 2 taken 854 times.
✗ Branch 3 not taken.
854 cache_mgr->transport_.SendFrame(&frame_send);
220
221
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 CacheTransport::Frame frame_recv;
222
1/2
✓ Branch 2 taken 854 times.
✗ Branch 3 not taken.
854 const bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
223
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 854 times.
854 if (!retval)
224 return NULL;
225
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
226
2/4
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 854 times.
854 if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
227 return NULL;
228 854 cvmfs::MsgHandshakeAck *msg_ack = reinterpret_cast<cvmfs::MsgHandshakeAck *>(
229 msg_typed);
230 854 cache_mgr->session_id_ = msg_ack->session_id();
231 854 cache_mgr->capabilities_ = msg_ack->capabilities();
232 854 cache_mgr->max_object_size_ = msg_ack->max_object_size();
233
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 854 times.
854 assert(cache_mgr->max_object_size_ > 0);
234
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 854 times.
854 if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
235 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
236 "external cache manager object size too large (%u)",
237 cache_mgr->max_object_size_);
238 return NULL;
239 }
240
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 854 times.
854 if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
241 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
242 "external cache manager object size too small (%u)",
243 cache_mgr->max_object_size_);
244 return NULL;
245 }
246
2/2
✓ Branch 1 taken 828 times.
✓ Branch 2 taken 26 times.
854 if (msg_ack->has_pid())
247 828 cache_mgr->pid_plugin_ = msg_ack->pid();
248 854 return cache_mgr.Release();
249 854 }
250
251
252 /**
253 * Tries to connect to the plugin at locator, or, if it doesn't exist, spawns
254 * a new plugin using cmdline. Two processes could try to spawn the plugin at
255 * the same time. In this case, the plugin should indicate to the client to
256 * retry connecting.
257 */
258 ExternalCacheManager::PluginHandle *ExternalCacheManager::CreatePlugin(
259 const std::string &locator, const std::vector<std::string> &cmd_line) {
260 UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
261 unsigned num_attempts = 0;
262 bool try_again = false;
263 do {
264 num_attempts++;
265 if (num_attempts > 2) {
266 // Prevent violate busy loops
267 SafeSleepMs(1000);
268 }
269 plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
270 if (plugin_handle->IsValid()) {
271 break;
272 } else if (plugin_handle->fd_connection_ == -EINVAL) {
273 LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "Invalid locator: %s",
274 locator.c_str());
275 plugin_handle->error_msg_ = "Invalid locator: " + locator;
276 break;
277 } else {
278 if (num_attempts > 1) {
279 LogCvmfs(kLogCache, kLogDebug | kLogStderr,
280 "Failed to connect to external cache manager: %d",
281 plugin_handle->fd_connection_);
282 }
283 plugin_handle->error_msg_ = "Failed to connect to external cache manager";
284 }
285
286 try_again = SpawnPlugin(cmd_line);
287 } while (try_again);
288
289 return plugin_handle.Release();
290 }
291
292
293 2252 void ExternalCacheManager::CtrlTxn(const Label &label,
294 const int flags,
295 void *txn) {
296 2252 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
297 2252 transaction->label = label;
298 2252 transaction->label_modified = true;
299 2252 }
300
301
302 string ExternalCacheManager::Describe() { return "External cache manager\n"; }
303
304
305 92 bool ExternalCacheManager::DoFreeState(void *data) {
306 FdTable<ReadOnlyHandle>
307 92 *fd_table = reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
308
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 delete fd_table;
309 92 return true;
310 }
311
312
313 13212 int ExternalCacheManager::DoOpen(const shash::Any &id) {
314 13212 int fd = -1;
315 {
316 13212 const WriteLockGuard guard(rwlock_fd_table_);
317
1/2
✓ Branch 2 taken 13212 times.
✗ Branch 3 not taken.
13212 fd = fd_table_.OpenFd(ReadOnlyHandle(id));
318
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 13120 times.
13212 if (fd < 0) {
319
1/2
✓ Branch 2 taken 92 times.
✗ Branch 3 not taken.
92 LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
320 strerror(-fd));
321 92 return fd;
322 }
323
2/2
✓ Branch 1 taken 13120 times.
✓ Branch 2 taken 92 times.
13212 }
324
325
1/2
✓ Branch 1 taken 13120 times.
✗ Branch 2 not taken.
13120 const int status_refcnt = ChangeRefcount(id, 1);
326
2/2
✓ Branch 0 taken 13020 times.
✓ Branch 1 taken 100 times.
13120 if (status_refcnt == 0)
327 13020 return fd;
328
329 100 const WriteLockGuard guard(rwlock_fd_table_);
330
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 const int retval = fd_table_.CloseFd(fd);
331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100 times.
100 assert(retval == 0);
332 100 return status_refcnt;
333 100 }
334
335
336 92 int ExternalCacheManager::DoRestoreState(void *data) {
337 // When DoRestoreState is called, we have fd 0 assigned to the root file
338 // catalog unless this is a lower layer cache in a tiered setup
339
2/2
✓ Branch 1 taken 11684 times.
✓ Branch 2 taken 92 times.
11776 for (unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
340
3/6
✓ Branch 2 taken 11684 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 11684 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 11684 times.
11684 assert(fd_table_.GetHandle(i) == ReadOnlyHandle());
341 }
342
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 const ReadOnlyHandle handle_root = fd_table_.GetHandle(0);
343
344 92 FdTable<ReadOnlyHandle> *other = reinterpret_cast<FdTable<ReadOnlyHandle> *>(
345 data);
346
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 fd_table_.AssignFrom(*other);
347
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 cvmfs::MsgIoctl msg_ioctl;
348 92 msg_ioctl.set_session_id(session_id_);
349 92 msg_ioctl.set_conncnt_change_by(-1);
350
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 CacheTransport::Frame frame(&msg_ioctl);
351
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 transport_.SendFrame(&frame);
352
353 92 int new_root_fd = -1;
354
2/4
✓ Branch 2 taken 92 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 92 times.
92 if (handle_root != ReadOnlyHandle()) {
355 new_root_fd = fd_table_.OpenFd(handle_root);
356 // There must be a free file descriptor because the root file catalog gets
357 // closed before a reload
358 assert(new_root_fd >= 0);
359 }
360 92 return new_root_fd;
361 92 }
362
363
364 92 void *ExternalCacheManager::DoSaveState() {
365
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 cvmfs::MsgIoctl msg_ioctl;
366 92 msg_ioctl.set_session_id(session_id_);
367 92 msg_ioctl.set_conncnt_change_by(1);
368
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 CacheTransport::Frame frame(&msg_ioctl);
369
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 transport_.SendFrame(&frame);
370
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
184 return fd_table_.Clone();
371 92 }
372
373
374 5934 int ExternalCacheManager::Dup(int fd) {
375
1/2
✓ Branch 1 taken 5934 times.
✗ Branch 2 not taken.
5934 const shash::Any id = GetHandle(fd);
376
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 5888 times.
5934 if (id == kInvalidHandle)
377 46 return -EBADF;
378
1/2
✓ Branch 1 taken 5888 times.
✗ Branch 2 not taken.
5888 return DoOpen(id);
379 }
380
381
382 854 ExternalCacheManager::ExternalCacheManager(int fd_connection,
383 854 unsigned max_open_fds)
384 854 : pid_plugin_(0)
385
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 , fd_table_(max_open_fds, ReadOnlyHandle())
386
1/2
✓ Branch 1 taken 854 times.
✗ Branch 2 not taken.
854 , transport_(fd_connection)
387 854 , session_id_(-1)
388 854 , max_object_size_(0)
389 854 , spawned_(false)
390 854 , terminated_(false)
391 1708 , capabilities_(cvmfs::CAP_NONE) {
392 854 int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 854 times.
854 assert(retval == 0);
394 854 retval = pthread_mutex_init(&lock_send_fd_, NULL);
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 854 times.
854 assert(retval == 0);
396 854 retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 854 times.
854 assert(retval == 0);
398 854 memset(&thread_read_, 0, sizeof(thread_read_));
399 854 atomic_init64(&next_request_id_);
400 854 }
401
402
403 3408 ExternalCacheManager::~ExternalCacheManager() {
404 1704 terminated_ = true;
405 1704 MemoryFence();
406
1/2
✓ Branch 0 taken 852 times.
✗ Branch 1 not taken.
1704 if (session_id_ >= 0) {
407 1704 cvmfs::MsgQuit msg_quit;
408 1704 msg_quit.set_session_id(session_id_);
409 1704 CacheTransport::Frame frame(&msg_quit);
410 1704 transport_.SendFrame(&frame);
411 }
412 1704 shutdown(transport_.fd_connection(), SHUT_RDWR);
413
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 806 times.
1704 if (spawned_)
414 92 pthread_join(thread_read_, NULL);
415 1704 close(transport_.fd_connection());
416 1704 pthread_rwlock_destroy(&rwlock_fd_table_);
417 1704 pthread_mutex_destroy(&lock_send_fd_);
418 1704 pthread_mutex_destroy(&lock_inflight_rpcs_);
419 3408 }
420
421
422 30384 int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
423
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 30382 times.
30384 if (transaction->committed)
424 2 return 0;
425
1/2
✓ Branch 2 taken 27968 times.
✗ Branch 3 not taken.
27968 LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
426 transaction->buf_pos,
427
1/2
✓ Branch 1 taken 27968 times.
✗ Branch 2 not taken.
55936 std::string(transaction->id.ToString()).c_str());
428
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 cvmfs::MsgHash object_id;
429
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 transport_.FillMsgHash(transaction->id, &object_id);
430
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 cvmfs::MsgStoreReq msg_store;
431 30382 msg_store.set_session_id(session_id_);
432 30382 msg_store.set_req_id(transaction->transaction_id);
433 30382 msg_store.set_allocated_object_id(&object_id);
434 30382 msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
435 30382 msg_store.set_expected_size(transaction->expected_size);
436 30382 msg_store.set_last_part(do_commit);
437
438
2/2
✓ Branch 0 taken 30374 times.
✓ Branch 1 taken 8 times.
30382 if (transaction->label_modified) {
439 cvmfs::EnumObjectType object_type;
440
1/2
✓ Branch 1 taken 30374 times.
✗ Branch 2 not taken.
30374 transport_.FillObjectType(transaction->label.flags, &object_type);
441
1/2
✓ Branch 1 taken 30374 times.
✗ Branch 2 not taken.
30374 msg_store.set_object_type(object_type);
442
2/4
✓ Branch 1 taken 30374 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 30374 times.
✗ Branch 5 not taken.
30374 msg_store.set_description(transaction->label.GetDescription());
443 }
444
445
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 RpcJob rpc_job(&msg_store);
446 30382 rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
447 // TODO(jblomer): allow for out of order chunk upload
448
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 CallRemotely(&rpc_job);
449 30382 msg_store.release_object_id();
450
451
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
452
1/2
✓ Branch 1 taken 30382 times.
✗ Branch 2 not taken.
30382 if (msg_reply->status() == cvmfs::STATUS_OK) {
453 30382 transaction->flushed = true;
454
2/2
✓ Branch 0 taken 2308 times.
✓ Branch 1 taken 28074 times.
30382 if (do_commit)
455 2308 transaction->committed = true;
456 }
457 30382 return Ack2Errno(msg_reply->status());
458 30382 }
459
460
461 7922 shash::Any ExternalCacheManager::GetHandle(int fd) {
462 7922 const ReadLockGuard guard(rwlock_fd_table_);
463
1/2
✓ Branch 1 taken 7922 times.
✗ Branch 2 not taken.
7922 const ReadOnlyHandle handle = fd_table_.GetHandle(fd);
464 7922 return handle.id;
465 7922 }
466
467
468 806 int64_t ExternalCacheManager::GetSize(int fd) {
469
1/2
✓ Branch 1 taken 806 times.
✗ Branch 2 not taken.
806 const shash::Any id = GetHandle(fd);
470
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 760 times.
806 if (id == kInvalidHandle)
471 46 return -EBADF;
472
473
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 cvmfs::MsgHash object_id;
474
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 transport_.FillMsgHash(id, &object_id);
475
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 cvmfs::MsgObjectInfoReq msg_info;
476 760 msg_info.set_session_id(session_id_);
477 760 msg_info.set_req_id(NextRequestId());
478 760 msg_info.set_allocated_object_id(&object_id);
479
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 RpcJob rpc_job(&msg_info);
480
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 CallRemotely(&rpc_job);
481 760 msg_info.release_object_id();
482
483
1/2
✓ Branch 1 taken 760 times.
✗ Branch 2 not taken.
760 cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
484
2/2
✓ Branch 1 taken 714 times.
✓ Branch 2 taken 46 times.
760 if (msg_reply->status() == cvmfs::STATUS_OK) {
485
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 714 times.
714 assert(msg_reply->has_size());
486 714 return msg_reply->size();
487 }
488 46 return Ack2Errno(msg_reply->status());
489 760 }
490
491
492 46 void *ExternalCacheManager::MainRead(void *data) {
493 46 ExternalCacheManager *cache_mgr = reinterpret_cast<ExternalCacheManager *>(
494 data);
495 46 LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
496
497 46 unsigned char buffer[cache_mgr->max_object_size_];
498 while (true) {
499
1/2
✓ Branch 1 taken 139334 times.
✗ Branch 2 not taken.
139334 CacheTransport::Frame frame_recv;
500 139334 frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
501
1/2
✓ Branch 1 taken 139334 times.
✗ Branch 2 not taken.
139334 const bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
502
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 139288 times.
139334 if (!retval)
503 46 break;
504
505 uint64_t req_id;
506 139288 uint64_t part_nr = 0;
507
1/2
✓ Branch 1 taken 139288 times.
✗ Branch 2 not taken.
139288 google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
508
3/4
✓ Branch 1 taken 139288 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 874 times.
✓ Branch 6 taken 138414 times.
139288 if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
509 874 req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
510
3/4
✓ Branch 1 taken 138414 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 414 times.
✓ Branch 6 taken 138000 times.
138414 } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
511 414 req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
512
3/4
✓ Branch 1 taken 138000 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 82800 times.
✓ Branch 6 taken 55200 times.
138000 } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
513 82800 req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
514
3/4
✓ Branch 1 taken 55200 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 9200 times.
✓ Branch 6 taken 46000 times.
55200 } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
515 9200 req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
516 9200 part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
517
2/4
✓ Branch 1 taken 46000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 46000 times.
46000 } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
518 req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
519
2/4
✓ Branch 1 taken 46000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 46000 times.
46000 } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
520 req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
521
2/4
✓ Branch 1 taken 46000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 46000 times.
46000 } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
522 req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
523
2/4
✓ Branch 1 taken 46000 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 46000 times.
46000 } else if (msg->GetTypeName() == "cvmfs.MsgBreadcrumbReply") {
524 req_id = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg)->req_id();
525
2/4
✓ Branch 1 taken 46000 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 46000 times.
✗ Branch 6 not taken.
46000 } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
526 // Release pinned catalogs
527
2/4
✓ Branch 2 taken 46000 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 46000 times.
✗ Branch 6 not taken.
46000 cache_mgr->quota_mgr_->BroadcastBackchannels("R");
528 46000 continue;
529 } else {
530 PANIC(kLogSyslogErr | kLogDebug, "unexpected message %s",
531 std::string(msg->GetTypeName()).c_str());
532 }
533
534 93288 RpcInFlight rpc_inflight;
535 {
536 93288 const MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
537
1/2
✓ Branch 1 taken 94300 times.
✗ Branch 2 not taken.
94300 for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
538 94300 RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
539
5/6
✓ Branch 1 taken 93288 times.
✓ Branch 2 taken 1012 times.
✓ Branch 4 taken 93288 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 93288 times.
✓ Branch 7 taken 1012 times.
94300 if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
540 93288 rpc_inflight = cache_mgr->inflight_rpcs_[i];
541
1/2
✓ Branch 2 taken 93288 times.
✗ Branch 3 not taken.
93288 cache_mgr->inflight_rpcs_.erase(cache_mgr->inflight_rpcs_.begin()
542 93288 + i);
543 93288 break;
544 }
545 }
546 93288 }
547
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 93288 times.
93288 if (rpc_inflight.rpc_job == NULL) {
548 LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
549 "got unmatched rpc reply");
550 continue;
551 }
552
1/2
✓ Branch 2 taken 93288 times.
✗ Branch 3 not taken.
93288 rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
553
1/2
✓ Branch 1 taken 93288 times.
✗ Branch 2 not taken.
93288 rpc_inflight.signal->Wakeup();
554
3/3
✓ Branch 1 taken 93288 times.
✓ Branch 2 taken 46 times.
✓ Branch 3 taken 46000 times.
278622 }
555
556
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 if (!cache_mgr->terminated_) {
557 PANIC(kLogSyslogErr | kLogDebug,
558 "connection to external cache manager broken (%d)", errno);
559 }
560
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
561 46 return NULL;
562
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 }
563
564
565 7324 int ExternalCacheManager::Open(const LabeledObject &object) {
566 7324 return DoOpen(object.id);
567 }
568
569
570 2 int ExternalCacheManager::OpenFromTxn(void *txn) {
571 2 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
572 LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
573 std::string(transaction->id.ToString()).c_str());
574 2 const int retval = Flush(true, transaction);
575
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (retval != 0)
576 return retval;
577
578 2 int fd = -1;
579 {
580 2 const WriteLockGuard guard(rwlock_fd_table_);
581
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (fd < 0) {
583 LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
584 strerror(-fd));
585 return fd;
586 }
587
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 }
588 2 transaction->open_fds++;
589 2 return fd;
590 }
591
592
593 1090 int64_t ExternalCacheManager::Pread(int fd,
594 void *buf,
595 uint64_t size,
596 uint64_t offset) {
597
1/2
✓ Branch 1 taken 1090 times.
✗ Branch 2 not taken.
1090 const shash::Any id = GetHandle(fd);
598
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 1044 times.
1090 if (id == kInvalidHandle)
599 46 return -EBADF;
600
601
1/2
✓ Branch 1 taken 1044 times.
✗ Branch 2 not taken.
1044 cvmfs::MsgHash object_id;
602
1/2
✓ Branch 1 taken 1044 times.
✗ Branch 2 not taken.
1044 transport_.FillMsgHash(id, &object_id);
603 1044 uint64_t nbytes = 0;
604
2/2
✓ Branch 0 taken 102394 times.
✓ Branch 1 taken 850 times.
103244 while (nbytes < size) {
605 102394 const uint64_t batch_size = std::min(
606 102394 size - nbytes, static_cast<uint64_t>(max_object_size_));
607
1/2
✓ Branch 1 taken 102394 times.
✗ Branch 2 not taken.
102394 cvmfs::MsgReadReq msg_read;
608 102394 msg_read.set_session_id(session_id_);
609 102394 msg_read.set_req_id(NextRequestId());
610 102394 msg_read.set_allocated_object_id(&object_id);
611 102394 msg_read.set_offset(offset + nbytes);
612 102394 msg_read.set_size(batch_size);
613
1/2
✓ Branch 1 taken 102394 times.
✗ Branch 2 not taken.
102394 RpcJob rpc_job(&msg_read);
614 102394 rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
615 batch_size);
616
1/2
✓ Branch 1 taken 102394 times.
✗ Branch 2 not taken.
102394 CallRemotely(&rpc_job);
617 102394 msg_read.release_object_id();
618
619
1/2
✓ Branch 1 taken 102394 times.
✗ Branch 2 not taken.
102394 cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
620
2/2
✓ Branch 1 taken 102344 times.
✓ Branch 2 taken 50 times.
102394 if (msg_reply->status() == cvmfs::STATUS_OK) {
621 102344 nbytes += rpc_job.frame_recv()->att_size();
622 // Fuse sends in rounded up buffers, so short reads are expected
623
2/2
✓ Branch 2 taken 144 times.
✓ Branch 3 taken 102200 times.
102344 if (rpc_job.frame_recv()->att_size() < batch_size)
624 144 return nbytes;
625 } else {
626 50 return Ack2Errno(msg_reply->status());
627 }
628
4/4
✓ Branch 1 taken 102200 times.
✓ Branch 2 taken 194 times.
✓ Branch 4 taken 102200 times.
✓ Branch 5 taken 194 times.
102588 }
629 850 return size;
630 1044 }
631
632
633 92 int ExternalCacheManager::Readahead(int fd) {
634
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 const shash::Any id = GetHandle(fd);
635
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 46 times.
92 if (id == kInvalidHandle)
636 46 return -EBADF;
637 // No-op
638 46 return 0;
639 }
640
641
642 142 int ExternalCacheManager::Reset(void *txn) {
643 142 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
644 142 transaction->buf_pos = 0;
645 142 transaction->size = 0;
646 142 transaction->open_fds = 0;
647 142 transaction->committed = false;
648 142 transaction->label_modified = true;
649
650
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 46 times.
142 if (!transaction->flushed)
651 96 return 0;
652
653
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 cvmfs::MsgHash object_id;
654
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 transport_.FillMsgHash(transaction->id, &object_id);
655
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 cvmfs::MsgStoreAbortReq msg_abort;
656 46 msg_abort.set_session_id(session_id_);
657 46 msg_abort.set_req_id(transaction->transaction_id);
658 46 msg_abort.set_allocated_object_id(&object_id);
659
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 RpcJob rpc_job(&msg_abort);
660
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 CallRemotely(&rpc_job);
661 46 msg_abort.release_object_id();
662
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
663 46 transaction->transaction_id = NextRequestId();
664 46 transaction->flushed = false;
665 46 return Ack2Errno(msg_reply->status());
666 46 }
667
668
669 96 manifest::Breadcrumb ExternalCacheManager::LoadBreadcrumb(
670 const std::string &fqrn) {
671
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 96 times.
96 if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
672 return manifest::Breadcrumb();
673
674
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
675 96 msg_breadcrumb_load.set_session_id(session_id_);
676 96 msg_breadcrumb_load.set_req_id(NextRequestId());
677
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 msg_breadcrumb_load.set_fqrn(fqrn);
678
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 RpcJob rpc_job(&msg_breadcrumb_load);
679
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 CallRemotely(&rpc_job);
680
681
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 manifest::Breadcrumb breadcrumb;
682
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
683
2/2
✓ Branch 1 taken 48 times.
✓ Branch 2 taken 48 times.
96 if (msg_reply->status() == cvmfs::STATUS_OK) {
684
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
48 assert(msg_reply->has_breadcrumb());
685
1/2
✗ Branch 3 not taken.
✓ Branch 4 taken 48 times.
48 assert(msg_reply->breadcrumb().fqrn() == fqrn);
686
1/2
✓ Branch 3 taken 48 times.
✗ Branch 4 not taken.
48 const bool rv = transport_.ParseMsgHash(msg_reply->breadcrumb().hash(),
687 &breadcrumb.catalog_hash);
688
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 assert(rv);
689 48 breadcrumb.catalog_hash.suffix = shash::kSuffixCatalog;
690 48 breadcrumb.timestamp = msg_reply->breadcrumb().timestamp();
691
1/2
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
48 if (msg_reply->breadcrumb().has_revision()) {
692 48 breadcrumb.revision = msg_reply->breadcrumb().revision();
693 } else {
694 breadcrumb.revision = 0;
695 }
696 }
697 96 return breadcrumb;
698 96 }
699
700
701 48 bool ExternalCacheManager::StoreBreadcrumb(const manifest::Manifest &manifest) {
702
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
703 return false;
704
705
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 cvmfs::MsgHash hash;
706
1/2
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
48 transport_.FillMsgHash(manifest.catalog_hash(), &hash);
707
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 cvmfs::MsgBreadcrumb breadcrumb;
708
2/4
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 48 times.
✗ Branch 5 not taken.
48 breadcrumb.set_fqrn(manifest.repository_name());
709 48 breadcrumb.set_allocated_hash(&hash);
710 48 breadcrumb.set_timestamp(manifest.publish_timestamp());
711 48 breadcrumb.set_revision(manifest.revision());
712
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
713 48 msg_breadcrumb_store.set_session_id(session_id_);
714 48 msg_breadcrumb_store.set_req_id(NextRequestId());
715 48 msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
716
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 RpcJob rpc_job(&msg_breadcrumb_store);
717
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 CallRemotely(&rpc_job);
718 48 msg_breadcrumb_store.release_breadcrumb();
719 48 breadcrumb.release_hash();
720
721
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
722 48 return msg_reply->status() == cvmfs::STATUS_OK;
723 48 }
724
725
726 46 void ExternalCacheManager::Spawn() {
727 46 const int retval = pthread_create(&thread_read_, NULL, MainRead, this);
728
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 assert(retval == 0);
729 46 spawned_ = true;
730 46 }
731
732
733 /**
734 * Returns true if the plugin could be spawned or was spawned by another
735 * process.
736 */
737 bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
738 if (cmd_line.empty())
739 return false;
740
741 int pipe_ready[2];
742 MakePipe(pipe_ready);
743 set<int> preserve_filedes;
744 preserve_filedes.insert(pipe_ready[1]);
745
746 const int fd_null_read = open("/dev/null", O_RDONLY);
747 const int fd_null_write = open("/dev/null", O_WRONLY);
748 assert((fd_null_read >= 0) && (fd_null_write >= 0));
749 map<int, int> map_fildes;
750 map_fildes[fd_null_read] = 0;
751 map_fildes[fd_null_write] = 1;
752 map_fildes[fd_null_write] = 2;
753
754 pid_t child_pid;
755 int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
756 StringifyInt(pipe_ready[1]).c_str(), 1);
757 assert(retval == 0);
758 retval = ManagedExec(cmd_line,
759 preserve_filedes,
760 map_fildes,
761 false, // drop_credentials
762 false, // clear_env
763 true, // double fork
764 &child_pid);
765 unsetenv(CacheTransport::kEnvReadyNotifyFd);
766 close(fd_null_read);
767 close(fd_null_write);
768 if (!retval) {
769 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
770 "failed to start cache plugin '%s'",
771 JoinStrings(cmd_line, " ").c_str());
772 ClosePipe(pipe_ready);
773 return false;
774 }
775
776 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
777 "started cache plugin '%s' (pid %d), waiting for it to become ready",
778 JoinStrings(cmd_line, " ").c_str(), child_pid);
779 close(pipe_ready[1]);
780 char buf;
781 if (read(pipe_ready[0], &buf, 1) != 1) {
782 close(pipe_ready[0]);
783 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
784 "cache plugin did not start properly");
785 return false;
786 }
787 close(pipe_ready[0]);
788
789 if (buf == CacheTransport::kReadyNotification)
790 return true;
791 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
792 "cache plugin failed to create an endpoint");
793 return false;
794 }
795
796
797 2400 int ExternalCacheManager::StartTxn(const shash::Any &id,
798 uint64_t size,
799 void *txn) {
800
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 2308 times.
2400 if (!(capabilities_ & cvmfs::CAP_WRITE))
801 92 return -EROFS;
802
803 2308 Transaction *transaction = new (txn) Transaction(id);
804 2308 transaction->expected_size = size;
805 2308 transaction->transaction_id = NextRequestId();
806 #ifdef __APPLE__
807 transaction->buffer = reinterpret_cast<unsigned char *>(
808 smalloc(max_object_size_));
809 #endif
810 2308 return 0;
811 }
812
813
814 2402 int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
815 2402 Transaction *transaction = reinterpret_cast<Transaction *>(txn);
816
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2402 times.
2402 assert(!transaction->committed);
817
1/2
✓ Branch 2 taken 368 times.
✗ Branch 3 not taken.
368 LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s", size,
818 736 transaction->id.ToString().c_str());
819
820
1/2
✓ Branch 0 taken 2402 times.
✗ Branch 1 not taken.
2402 if (transaction->expected_size != kSizeUnknown) {
821
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2402 times.
2402 if (transaction->size + size > transaction->expected_size) {
822 LogCvmfs(kLogCache, kLogDebug,
823 "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
824 transaction->size + size, transaction->expected_size);
825 return -EFBIG;
826 }
827 }
828
829 2402 uint64_t written = 0;
830 2402 const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
831
2/2
✓ Branch 0 taken 30428 times.
✓ Branch 1 taken 2402 times.
32830 while (written < size) {
832
2/2
✓ Branch 0 taken 28074 times.
✓ Branch 1 taken 2354 times.
30428 if (transaction->buf_pos == max_object_size_) {
833 28074 bool do_commit = false;
834
1/2
✓ Branch 0 taken 28074 times.
✗ Branch 1 not taken.
28074 if (transaction->expected_size != kSizeUnknown)
835 28074 do_commit = (transaction->size + written) == transaction->expected_size;
836
1/2
✓ Branch 1 taken 28074 times.
✗ Branch 2 not taken.
28074 const int retval = Flush(do_commit, transaction);
837
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28074 times.
28074 if (retval != 0) {
838 transaction->size += written;
839 return retval;
840 }
841 28074 transaction->size += transaction->buf_pos;
842 28074 transaction->buf_pos = 0;
843 }
844 30428 const uint64_t remaining = size - written;
845 30428 const uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
846 30428 const uint64_t batch_size = std::min(remaining, space_in_buffer);
847 30428 memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
848 30428 transaction->buf_pos += batch_size;
849 30428 written += batch_size;
850 30428 read_pos += batch_size;
851 }
852 2402 return written;
853 }
854
855
856 //------------------------------------------------------------------------------
857
858
859 284 bool ExternalQuotaManager::DoListing(cvmfs::EnumObjectType type,
860 vector<cvmfs::MsgListRecord> *result) {
861
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 284 times.
284 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
862 return false;
863
864 284 uint64_t listing_id = 0;
865 284 bool more_data = false;
866
2/2
✓ Branch 0 taken 184 times.
✓ Branch 1 taken 284 times.
468 do {
867
1/2
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
468 cvmfs::MsgListReq msg_list;
868 468 msg_list.set_session_id(cache_mgr_->session_id_);
869 468 msg_list.set_req_id(cache_mgr_->NextRequestId());
870 468 msg_list.set_listing_id(listing_id);
871
1/2
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
468 msg_list.set_object_type(type);
872
1/2
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
468 ExternalCacheManager::RpcJob rpc_job(&msg_list);
873
1/2
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
468 cache_mgr_->CallRemotely(&rpc_job);
874
875
1/2
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
468 cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
876
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 468 times.
468 if (msg_reply->status() != cvmfs::STATUS_OK)
877 return false;
878 468 more_data = !msg_reply->is_last_part();
879 468 listing_id = msg_reply->listing_id();
880
3/4
✓ Branch 1 taken 9204472 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9204004 times.
✓ Branch 4 taken 468 times.
9204472 for (int i = 0; i < msg_reply->list_record_size(); ++i) {
881
2/4
✓ Branch 1 taken 9204004 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9204004 times.
✗ Branch 5 not taken.
9204004 result->push_back(msg_reply->list_record(i));
882 }
883
2/4
✓ Branch 1 taken 468 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 468 times.
✗ Branch 5 not taken.
468 } while (more_data);
884
885 284 return true;
886 }
887
888
889 100 bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
890
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100 times.
100 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
891 return false;
892
893
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 cvmfs::MsgShrinkReq msg_shrink;
894 100 msg_shrink.set_session_id(cache_mgr_->session_id_);
895 100 msg_shrink.set_req_id(cache_mgr_->NextRequestId());
896 100 msg_shrink.set_shrink_to(leave_size);
897
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
898
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 cache_mgr_->CallRemotely(&rpc_job);
899
900
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
901 100 return msg_reply->status() == cvmfs::STATUS_OK;
902 100 }
903
904
905 852 ExternalQuotaManager *ExternalQuotaManager::Create(
906 ExternalCacheManager *cache_mgr) {
907 UniquePtr<ExternalQuotaManager> quota_mgr(
908
3/6
✓ Branch 1 taken 852 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 852 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 852 times.
✗ Branch 8 not taken.
852 new ExternalQuotaManager(cache_mgr));
909
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 852 times.
852 assert(quota_mgr.IsValid());
910
911 1704 return quota_mgr.Release();
912 852 }
913
914
915 208 int ExternalQuotaManager::GetInfo(QuotaInfo *quota_info) {
916
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 208 times.
208 if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
917 return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
918
919
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 cvmfs::MsgInfoReq msg_info;
920 208 msg_info.set_session_id(cache_mgr_->session_id_);
921 208 msg_info.set_req_id(cache_mgr_->NextRequestId());
922
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 ExternalCacheManager::RpcJob rpc_job(&msg_info);
923
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 cache_mgr_->CallRemotely(&rpc_job);
924
925
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply();
926
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 if (msg_reply->status() == cvmfs::STATUS_OK) {
927 208 quota_info->size = msg_reply->size_bytes();
928 208 quota_info->used = msg_reply->used_bytes();
929 208 quota_info->pinned = msg_reply->pinned_bytes();
930
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 if (msg_reply->no_shrink() >= 0)
931 208 quota_info->no_shrink = msg_reply->no_shrink();
932 }
933 208 return Ack2Errno(msg_reply->status());
934 208 }
935
936
937 50 uint64_t ExternalQuotaManager::GetCapacity() {
938 50 QuotaInfo info;
939
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 const int retval = GetInfo(&info);
940
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (retval != 0)
941 return uint64_t(-1);
942 50 return info.size;
943 }
944
945
946 uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
947 QuotaInfo info;
948 const int retval = GetInfo(&info);
949 if (retval != 0)
950 return 0;
951 return info.no_shrink;
952 }
953
954
955 60 uint64_t ExternalQuotaManager::GetSize() {
956 60 QuotaInfo info;
957
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 const int retval = GetInfo(&info);
958
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (retval != 0)
959 return 0;
960 60 return info.used;
961 }
962
963
964 98 uint64_t ExternalQuotaManager::GetSizePinned() {
965 98 QuotaInfo info;
966
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 const int retval = GetInfo(&info);
967
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (retval != 0)
968 return 0;
969 98 return info.pinned;
970 }
971
972
973 bool ExternalQuotaManager::HasCapability(Capabilities capability) {
974 switch (capability) {
975 case kCapIntrospectSize:
976 return cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
977 case kCapIntrospectCleanupRate:
978 return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
979 case kCapList:
980 return cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
981 case kCapShrink:
982 return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
983 case kCapListeners:
984 return true;
985 default:
986 return false;
987 }
988 }
989
990
991 48 vector<string> ExternalQuotaManager::List() {
992 48 vector<string> result;
993 48 vector<cvmfs::MsgListRecord> raw_list;
994
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 const bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
995
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 if (!retval)
996 return result;
997
2/2
✓ Branch 1 taken 4602002 times.
✓ Branch 2 taken 48 times.
4602050 for (unsigned i = 0; i < raw_list.size(); ++i)
998
1/2
✓ Branch 3 taken 4602002 times.
✗ Branch 4 not taken.
4602002 result.push_back(raw_list[i].description());
999 48 return result;
1000 48 }
1001
1002
1003 46 vector<string> ExternalQuotaManager::ListCatalogs() {
1004 46 vector<string> result;
1005 46 vector<cvmfs::MsgListRecord> raw_list;
1006
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 const bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1007
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 if (!retval)
1008 return result;
1009
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
46 for (unsigned i = 0; i < raw_list.size(); ++i)
1010 result.push_back(raw_list[i].description());
1011 46 return result;
1012 46 }
1013
1014
1015 48 vector<string> ExternalQuotaManager::ListPinned() {
1016 48 vector<string> result;
1017
2/2
✓ Branch 1 taken 144 times.
✓ Branch 2 taken 48 times.
384 vector<cvmfs::MsgListRecord> raw_lists[3];
1018
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 if (!retval)
1020 return result;
1021
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1022
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 if (!retval)
1023 return result;
1024
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1025
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 if (!retval)
1026 return result;
1027
2/2
✓ Branch 0 taken 144 times.
✓ Branch 1 taken 48 times.
192 for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) {
1028
2/2
✓ Branch 1 taken 4602002 times.
✓ Branch 2 taken 144 times.
4602146 for (unsigned j = 0; j < raw_lists[i].size(); ++j) {
1029
2/2
✓ Branch 2 taken 200 times.
✓ Branch 3 taken 4601802 times.
4602002 if (raw_lists[i][j].pinned())
1030
1/2
✓ Branch 3 taken 200 times.
✗ Branch 4 not taken.
200 result.push_back(raw_lists[i][j].description());
1031 }
1032 }
1033 48 return result;
1034
2/4
✓ Branch 0 taken 144 times.
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
192 }
1035
1036
1037 46 vector<string> ExternalQuotaManager::ListVolatile() {
1038 46 vector<string> result;
1039 46 vector<cvmfs::MsgListRecord> raw_list;
1040
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 const bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1041
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 if (!retval)
1042 return result;
1043
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
46 for (unsigned i = 0; i < raw_list.size(); ++i)
1044 result.push_back(raw_list[i].description());
1045 46 return result;
1046 46 }
1047
1048
1049 46 void ExternalQuotaManager::RegisterBackChannel(int back_channel[2],
1050 const string &channel_id) {
1051
1/2
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
46 const shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1052
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 MakePipe(back_channel);
1053 46 LockBackChannels();
1054
2/4
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 46 times.
46 assert(back_channels_.find(hash_id) == back_channels_.end());
1055
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 back_channels_[hash_id] = back_channel[1];
1056 46 UnlockBackChannels();
1057 46 }
1058
1059
1060 46 void ExternalQuotaManager::UnregisterBackChannel(int back_channel[2],
1061 const string &channel_id) {
1062
1/2
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
46 const shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1063 46 LockBackChannels();
1064
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 back_channels_.erase(hash_id);
1065 46 UnlockBackChannels();
1066
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 ClosePipe(back_channel);
1067 46 }
1068