GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
#include "cvmfs_config.h" |
||
5 |
#include "cache_extern.h" |
||
6 |
|||
7 |
#include <errno.h> |
||
8 |
#include <fcntl.h> |
||
9 |
#include <inttypes.h> |
||
10 |
#include <stdint.h> |
||
11 |
#include <sys/socket.h> |
||
12 |
#include <unistd.h> |
||
13 |
|||
14 |
#include <algorithm> |
||
15 |
#include <cassert> |
||
16 |
#ifdef __APPLE__ |
||
17 |
#include <cstdlib> |
||
18 |
#endif |
||
19 |
#include <cstring> |
||
20 |
#include <map> |
||
21 |
#include <new> |
||
22 |
#include <set> |
||
23 |
#include <string> |
||
24 |
|||
25 |
#include "atomic.h" |
||
26 |
#include "cache.pb.h" |
||
27 |
#include "hash.h" |
||
28 |
#include "logging.h" |
||
29 |
#ifdef __APPLE__ |
||
30 |
#include "smalloc.h" |
||
31 |
#endif |
||
32 |
#include "util/pointer.h" |
||
33 |
#include "util/posix.h" |
||
34 |
#include "util/string.h" |
||
35 |
#include "util_concurrency.h" |
||
36 |
|||
37 |
using namespace std; // NOLINT |
||
38 |
|||
39 |
namespace { |
||
40 |
|||
41 |
6125 |
int Ack2Errno(cvmfs::EnumStatus status_code) { |
|
42 |
✓✗✗✗ ✓✓✗✗ ✗✗✓✗ |
6125 |
switch (status_code) { |
43 |
case cvmfs::STATUS_OK: |
||
44 |
6109 |
return 0; |
|
45 |
case cvmfs::STATUS_NOSUPPORT: |
||
46 |
return -EOPNOTSUPP; |
||
47 |
case cvmfs::STATUS_FORBIDDEN: |
||
48 |
return -EPERM; |
||
49 |
case cvmfs::STATUS_NOSPACE: |
||
50 |
return -ENOSPC; |
||
51 |
case cvmfs::STATUS_NOENTRY: |
||
52 |
9 |
return -ENOENT; |
|
53 |
case cvmfs::STATUS_MALFORMED: |
||
54 |
2 |
return -EINVAL; |
|
55 |
case cvmfs::STATUS_IOERR: |
||
56 |
return -EIO; |
||
57 |
case cvmfs::STATUS_CORRUPTED: |
||
58 |
return -EIO; |
||
59 |
case cvmfs::STATUS_TIMEOUT: |
||
60 |
return -EIO; |
||
61 |
case cvmfs::STATUS_BADCOUNT: |
||
62 |
return -EINVAL; |
||
63 |
case cvmfs::STATUS_OUTOFBOUNDS: |
||
64 |
5 |
return -EINVAL; |
|
65 |
default: |
||
66 |
return -EIO; |
||
67 |
} |
||
68 |
} |
||
69 |
|||
70 |
} // anonymous namespace |
||
71 |
|||
72 |
17 |
const shash::Any ExternalCacheManager::kInvalidHandle; |
|
73 |
|||
74 |
|||
75 |
int ExternalCacheManager::AbortTxn(void *txn) { |
||
76 |
int result = Reset(txn); |
||
77 |
#ifdef __APPLE__ |
||
78 |
free(reinterpret_cast<Transaction *>(txn)->buffer); |
||
79 |
#endif |
||
80 |
return result; |
||
81 |
} |
||
82 |
|||
83 |
|||
84 |
39 |
bool ExternalCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) { |
|
85 |
✗✓ | 39 |
assert(quota_mgr != NULL); |
86 |
39 |
quota_mgr_ = quota_mgr; |
|
87 |
17 |
LogCvmfs(kLogCache, kLogDebug, "set quota manager"); |
|
88 |
39 |
return true; |
|
89 |
} |
||
90 |
|||
91 |
|||
92 |
9105 |
void ExternalCacheManager::CallRemotely(ExternalCacheManager::RpcJob *rpc_job) { |
|
93 |
✓✓ | 9105 |
if (!spawned_) { |
94 |
7077 |
transport_.SendFrame(rpc_job->frame_send()); |
|
95 |
7077 |
uint32_t save_att_size = rpc_job->frame_recv()->att_size(); |
|
96 |
bool again; |
||
97 |
✓✓ | 7078 |
do { |
98 |
7078 |
again = false; |
|
99 |
7078 |
bool retval = transport_.RecvFrame(rpc_job->frame_recv()); |
|
100 |
✗✓ | 7078 |
assert(retval); |
101 |
✓✓ | 7078 |
if (rpc_job->frame_recv()->IsMsgOutOfBand()) { |
102 |
google::protobuf::MessageLite *msg_typed = |
||
103 |
1 |
rpc_job->frame_recv()->GetMsgTyped(); |
|
104 |
✗✓ | 1 |
assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach"); |
105 |
1 |
quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs |
|
106 |
1 |
rpc_job->frame_recv()->Reset(save_att_size); |
|
107 |
1 |
again = true; |
|
108 |
} |
||
109 |
} while (again); |
||
110 |
} else { |
||
111 |
2028 |
Signal signal; |
|
112 |
{ |
||
113 |
2028 |
MutexLockGuard guard(lock_inflight_rpcs_); |
|
114 |
2028 |
inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal)); |
|
115 |
} |
||
116 |
{ |
||
117 |
2028 |
MutexLockGuard guard(lock_send_fd_); |
|
118 |
2028 |
transport_.SendFrame(rpc_job->frame_send()); |
|
119 |
} |
||
120 |
2028 |
signal.Wait(); |
|
121 |
} |
||
122 |
9105 |
} |
|
123 |
|||
124 |
|||
125 |
3068 |
int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) { |
|
126 |
3068 |
cvmfs::MsgHash object_id; |
|
127 |
3068 |
transport_.FillMsgHash(id, &object_id); |
|
128 |
3068 |
cvmfs::MsgRefcountReq msg_refcount; |
|
129 |
3068 |
msg_refcount.set_session_id(session_id_); |
|
130 |
3068 |
msg_refcount.set_req_id(NextRequestId()); |
|
131 |
3068 |
msg_refcount.set_allocated_object_id(&object_id); |
|
132 |
3068 |
msg_refcount.set_change_by(change_by); |
|
133 |
3068 |
RpcJob rpc_job(&msg_refcount); |
|
134 |
3068 |
CallRemotely(&rpc_job); |
|
135 |
3068 |
msg_refcount.release_object_id(); |
|
136 |
|||
137 |
3068 |
cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply(); |
|
138 |
3068 |
return Ack2Errno(msg_reply->status()); |
|
139 |
} |
||
140 |
|||
141 |
|||
142 |
513 |
int ExternalCacheManager::Close(int fd) { |
|
143 |
513 |
ReadOnlyHandle handle; |
|
144 |
{ |
||
145 |
513 |
WriteLockGuard guard(rwlock_fd_table_); |
|
146 |
513 |
handle = fd_table_.GetHandle(fd); |
|
147 |
✓✓ | 513 |
if (handle.id == kInvalidHandle) |
148 |
1 |
return -EBADF; |
|
149 |
512 |
int retval = fd_table_.CloseFd(fd); |
|
150 |
✗✓✓✓ |
512 |
assert(retval == 0); |
151 |
} |
||
152 |
|||
153 |
512 |
return ChangeRefcount(handle.id, -1); |
|
154 |
} |
||
155 |
|||
156 |
|||
157 |
2038 |
int ExternalCacheManager::CommitTxn(void *txn) { |
|
158 |
2038 |
Transaction *transaction = reinterpret_cast<Transaction *>(txn); |
|
159 |
LogCvmfs(kLogCache, kLogDebug, "committing %s", |
||
160 |
6 |
transaction->id.ToString().c_str()); |
|
161 |
2038 |
int retval = Flush(true, transaction); |
|
162 |
✗✓ | 2038 |
if (retval != 0) |
163 |
return retval; |
||
164 |
|||
165 |
2038 |
int refcount = transaction->open_fds - 1; |
|
166 |
✓✓ | 2038 |
if (refcount != 0) |
167 |
2036 |
return ChangeRefcount(transaction->id, refcount); |
|
168 |
#ifdef __APPLE__ |
||
169 |
free(transaction->buffer); |
||
170 |
#endif |
||
171 |
2 |
return 0; |
|
172 |
} |
||
173 |
|||
174 |
|||
175 |
int ExternalCacheManager::ConnectLocator(const std::string &locator) { |
||
176 |
vector<string> tokens = SplitString(locator, '='); |
||
177 |
int result = -1; |
||
178 |
if (tokens[0] == "unix") { |
||
179 |
result = ConnectSocket(tokens[1]); |
||
180 |
} else if (tokens[0] == "tcp") { |
||
181 |
vector<string> tcp_address = SplitString(tokens[1], ':'); |
||
182 |
if (tcp_address.size() != 2) |
||
183 |
return -EINVAL; |
||
184 |
result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1])); |
||
185 |
} else { |
||
186 |
return -EINVAL; |
||
187 |
} |
||
188 |
if (result < 0) { |
||
189 |
if (errno) { |
||
190 |
LogCvmfs(kLogCache, kLogDebug | kLogStderr, |
||
191 |
"Failed to connect to socket: %s", strerror(errno)); |
||
192 |
} else { |
||
193 |
LogCvmfs(kLogCache, kLogDebug | kLogStderr, |
||
194 |
"Failed to connect to socket (unknown error)"); |
||
195 |
} |
||
196 |
return -EIO; |
||
197 |
} |
||
198 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslog, |
||
199 |
"connected to cache plugin at %s", locator.c_str()); |
||
200 |
return result; |
||
201 |
} |
||
202 |
|||
203 |
|||
204 |
41 |
ExternalCacheManager *ExternalCacheManager::Create( |
|
205 |
int fd_connection, |
||
206 |
unsigned max_open_fds, |
||
207 |
const string &ident) |
||
208 |
{ |
||
209 |
UniquePtr<ExternalCacheManager> cache_mgr( |
||
210 |
41 |
new ExternalCacheManager(fd_connection, max_open_fds)); |
|
211 |
✗✓ | 41 |
assert(cache_mgr.IsValid()); |
212 |
|||
213 |
41 |
cvmfs::MsgHandshake msg_handshake; |
|
214 |
41 |
msg_handshake.set_protocol_version(kPbProtocolVersion); |
|
215 |
41 |
msg_handshake.set_name(ident); |
|
216 |
41 |
CacheTransport::Frame frame_send(&msg_handshake); |
|
217 |
41 |
cache_mgr->transport_.SendFrame(&frame_send); |
|
218 |
|||
219 |
41 |
CacheTransport::Frame frame_recv; |
|
220 |
41 |
bool retval = cache_mgr->transport_.RecvFrame(&frame_recv); |
|
221 |
✗✓ | 41 |
if (!retval) |
222 |
return NULL; |
||
223 |
41 |
google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped(); |
|
224 |
✗✓ | 41 |
if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck") |
225 |
return NULL; |
||
226 |
cvmfs::MsgHandshakeAck *msg_ack = |
||
227 |
41 |
reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed); |
|
228 |
41 |
cache_mgr->session_id_ = msg_ack->session_id(); |
|
229 |
41 |
cache_mgr->capabilities_ = msg_ack->capabilities(); |
|
230 |
41 |
cache_mgr->max_object_size_ = msg_ack->max_object_size(); |
|
231 |
✗✓ | 41 |
assert(cache_mgr->max_object_size_ > 0); |
232 |
✗✓ | 41 |
if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) { |
233 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
||
234 |
"external cache manager object size too large (%u)", |
||
235 |
cache_mgr->max_object_size_); |
||
236 |
return NULL; |
||
237 |
} |
||
238 |
✗✓ | 41 |
if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) { |
239 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
||
240 |
"external cache manager object size too small (%u)", |
||
241 |
cache_mgr->max_object_size_); |
||
242 |
return NULL; |
||
243 |
} |
||
244 |
✓✓ | 41 |
if (msg_ack->has_pid()) |
245 |
17 |
cache_mgr->pid_plugin_ = msg_ack->pid(); |
|
246 |
41 |
return cache_mgr.Release(); |
|
247 |
} |
||
248 |
|||
249 |
|||
250 |
/** |
||
251 |
* Tries to connect to the plugin at locator, or, if it doesn't exist, spawns |
||
252 |
* a new plugin using cmdline. Two processes could try to spawn the plugin at |
||
253 |
* the same time. In this case, the plugin should indicate to the client to |
||
254 |
* retry connecting. |
||
255 |
*/ |
||
256 |
ExternalCacheManager::PluginHandle *ExternalCacheManager::CreatePlugin( |
||
257 |
const std::string &locator, |
||
258 |
const std::vector<std::string> &cmd_line) |
||
259 |
{ |
||
260 |
UniquePtr<PluginHandle> plugin_handle(new PluginHandle()); |
||
261 |
unsigned num_attempts = 0; |
||
262 |
bool try_again = false; |
||
263 |
do { |
||
264 |
num_attempts++; |
||
265 |
if (num_attempts > 2) { |
||
266 |
// Prevent violate busy loops |
||
267 |
SafeSleepMs(1000); |
||
268 |
} |
||
269 |
plugin_handle->fd_connection_ = ConnectLocator(locator); |
||
270 |
if (plugin_handle->IsValid()) { |
||
271 |
break; |
||
272 |
} else if (plugin_handle->fd_connection_ == -EINVAL) { |
||
273 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslog, |
||
274 |
"Invalid locator: %s", locator.c_str()); |
||
275 |
plugin_handle->error_msg_ = "Invalid locator: " + locator; |
||
276 |
break; |
||
277 |
} else { |
||
278 |
if (num_attempts > 1) { |
||
279 |
LogCvmfs(kLogCache, kLogDebug | kLogStderr, |
||
280 |
"Failed to connect to external cache manager: %d", |
||
281 |
plugin_handle->fd_connection_); |
||
282 |
} |
||
283 |
plugin_handle->error_msg_ = "Failed to connect to external cache manager"; |
||
284 |
} |
||
285 |
|||
286 |
try_again = SpawnPlugin(cmd_line); |
||
287 |
} while (try_again); |
||
288 |
|||
289 |
return plugin_handle.Release(); |
||
290 |
} |
||
291 |
|||
292 |
|||
293 |
2027 |
void ExternalCacheManager::CtrlTxn( |
|
294 |
const ObjectInfo &object_info, |
||
295 |
const int flags, |
||
296 |
void *txn) |
||
297 |
{ |
||
298 |
2027 |
Transaction *transaction = reinterpret_cast<Transaction *>(txn); |
|
299 |
2027 |
transaction->object_info = object_info; |
|
300 |
2027 |
transaction->object_info_modified = true; |
|
301 |
2027 |
} |
|
302 |
|||
303 |
|||
304 |
string ExternalCacheManager::Describe() { |
||
305 |
return "External cache manager\n"; |
||
306 |
} |
||
307 |
|||
308 |
|||
309 |
2 |
bool ExternalCacheManager::DoFreeState(void *data) { |
|
310 |
FdTable<ReadOnlyHandle> *fd_table = |
||
311 |
2 |
reinterpret_cast<FdTable<ReadOnlyHandle> *>(data); |
|
312 |
✓✗ | 2 |
delete fd_table; |
313 |
2 |
return true; |
|
314 |
} |
||
315 |
|||
316 |
|||
317 |
522 |
int ExternalCacheManager::DoOpen(const shash::Any &id) { |
|
318 |
522 |
int fd = -1; |
|
319 |
{ |
||
320 |
522 |
WriteLockGuard guard(rwlock_fd_table_); |
|
321 |
522 |
fd = fd_table_.OpenFd(ReadOnlyHandle(id)); |
|
322 |
✓✓ | 522 |
if (fd < 0) { |
323 |
LogCvmfs(kLogCache, kLogDebug, "error while creating new fd", |
||
324 |
2 |
strerror(-fd)); |
|
325 |
2 |
return fd; |
|
326 |
} |
||
327 |
} |
||
328 |
|||
329 |
520 |
int status_refcnt = ChangeRefcount(id, 1); |
|
330 |
✓✓ | 520 |
if (status_refcnt == 0) |
331 |
510 |
return fd; |
|
332 |
|||
333 |
10 |
WriteLockGuard guard(rwlock_fd_table_); |
|
334 |
10 |
int retval = fd_table_.CloseFd(fd); |
|
335 |
✗✓ | 10 |
assert(retval == 0); |
336 |
10 |
return status_refcnt; |
|
337 |
} |
||
338 |
|||
339 |
|||
340 |
2 |
bool ExternalCacheManager::DoRestoreState(void *data) { |
|
341 |
FdTable<ReadOnlyHandle> *other = |
||
342 |
2 |
reinterpret_cast<FdTable<ReadOnlyHandle> *>(data); |
|
343 |
2 |
fd_table_.AssignFrom(*other); |
|
344 |
2 |
cvmfs::MsgIoctl msg_ioctl; |
|
345 |
2 |
msg_ioctl.set_session_id(session_id_); |
|
346 |
2 |
msg_ioctl.set_conncnt_change_by(-1); |
|
347 |
2 |
CacheTransport::Frame frame(&msg_ioctl); |
|
348 |
2 |
transport_.SendFrame(&frame); |
|
349 |
2 |
return true; |
|
350 |
} |
||
351 |
|||
352 |
|||
353 |
2 |
void *ExternalCacheManager::DoSaveState() { |
|
354 |
2 |
cvmfs::MsgIoctl msg_ioctl; |
|
355 |
2 |
msg_ioctl.set_session_id(session_id_); |
|
356 |
2 |
msg_ioctl.set_conncnt_change_by(1); |
|
357 |
2 |
CacheTransport::Frame frame(&msg_ioctl); |
|
358 |
2 |
transport_.SendFrame(&frame); |
|
359 |
2 |
return fd_table_.Clone(); |
|
360 |
} |
||
361 |
|||
362 |
|||
363 |
129 |
int ExternalCacheManager::Dup(int fd) { |
|
364 |
129 |
shash::Any id = GetHandle(fd); |
|
365 |
✓✓ | 129 |
if (id == kInvalidHandle) |
366 |
1 |
return -EBADF; |
|
367 |
128 |
return DoOpen(id); |
|
368 |
} |
||
369 |
|||
370 |
|||
371 |
41 |
ExternalCacheManager::ExternalCacheManager( |
|
372 |
int fd_connection, |
||
373 |
unsigned max_open_fds) |
||
374 |
: pid_plugin_(0) |
||
375 |
, fd_table_(max_open_fds, ReadOnlyHandle()) |
||
376 |
, transport_(fd_connection) |
||
377 |
, session_id_(-1) |
||
378 |
, max_object_size_(0) |
||
379 |
, spawned_(false) |
||
380 |
, terminated_(false) |
||
381 |
41 |
, capabilities_(cvmfs::CAP_NONE) |
|
382 |
{ |
||
383 |
41 |
int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL); |
|
384 |
✗✓ | 41 |
assert(retval == 0); |
385 |
41 |
retval = pthread_mutex_init(&lock_send_fd_, NULL); |
|
386 |
✗✓ | 41 |
assert(retval == 0); |
387 |
41 |
retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL); |
|
388 |
✗✓ | 41 |
assert(retval == 0); |
389 |
41 |
memset(&thread_read_, 0, sizeof(thread_read_)); |
|
390 |
41 |
atomic_init64(&next_request_id_); |
|
391 |
41 |
} |
|
392 |
|||
393 |
|||
394 |
78 |
ExternalCacheManager::~ExternalCacheManager() { |
|
395 |
39 |
terminated_ = true; |
|
396 |
39 |
MemoryFence(); |
|
397 |
✓✗ | 39 |
if (session_id_ >= 0) { |
398 |
39 |
cvmfs::MsgQuit msg_quit; |
|
399 |
39 |
msg_quit.set_session_id(session_id_); |
|
400 |
39 |
CacheTransport::Frame frame(&msg_quit); |
|
401 |
39 |
transport_.SendFrame(&frame); |
|
402 |
} |
||
403 |
39 |
shutdown(transport_.fd_connection(), SHUT_RDWR); |
|
404 |
✓✓ | 39 |
if (spawned_) |
405 |
1 |
pthread_join(thread_read_, NULL); |
|
406 |
39 |
close(transport_.fd_connection()); |
|
407 |
39 |
pthread_rwlock_destroy(&rwlock_fd_table_); |
|
408 |
39 |
pthread_mutex_destroy(&lock_send_fd_); |
|
409 |
39 |
pthread_mutex_destroy(&lock_inflight_rpcs_); |
|
410 |
✗✓ | 78 |
} |
411 |
|||
412 |
|||
413 |
3024 |
int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) { |
|
414 |
✓✓ | 3024 |
if (transaction->committed) |
415 |
2 |
return 0; |
|
416 |
LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s", |
||
417 |
608 |
transaction->buf_pos, transaction->id.ToString().c_str()); |
|
418 |
3022 |
cvmfs::MsgHash object_id; |
|
419 |
3022 |
transport_.FillMsgHash(transaction->id, &object_id); |
|
420 |
3022 |
cvmfs::MsgStoreReq msg_store; |
|
421 |
3022 |
msg_store.set_session_id(session_id_); |
|
422 |
3022 |
msg_store.set_req_id(transaction->transaction_id); |
|
423 |
3022 |
msg_store.set_allocated_object_id(&object_id); |
|
424 |
3022 |
msg_store.set_part_nr((transaction->size / max_object_size_) + 1); |
|
425 |
3022 |
msg_store.set_expected_size(transaction->expected_size); |
|
426 |
3022 |
msg_store.set_last_part(do_commit); |
|
427 |
|||
428 |
✓✓ | 3022 |
if (transaction->object_info_modified) { |
429 |
cvmfs::EnumObjectType object_type; |
||
430 |
3014 |
transport_.FillObjectType(transaction->object_info.type, &object_type); |
|
431 |
3014 |
msg_store.set_object_type(object_type); |
|
432 |
3014 |
msg_store.set_description(transaction->object_info.description); |
|
433 |
} |
||
434 |
|||
435 |
3022 |
RpcJob rpc_job(&msg_store); |
|
436 |
3022 |
rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos); |
|
437 |
// TODO(jblomer): allow for out of order chunk upload |
||
438 |
3022 |
CallRemotely(&rpc_job); |
|
439 |
3022 |
msg_store.release_object_id(); |
|
440 |
|||
441 |
3022 |
cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply(); |
|
442 |
✓✗ | 3022 |
if (msg_reply->status() == cvmfs::STATUS_OK) { |
443 |
3022 |
transaction->flushed = true; |
|
444 |
✓✓ | 3022 |
if (do_commit) |
445 |
2038 |
transaction->committed = true; |
|
446 |
} |
||
447 |
3022 |
return Ack2Errno(msg_reply->status()); |
|
448 |
} |
||
449 |
|||
450 |
|||
451 |
430 |
shash::Any ExternalCacheManager::GetHandle(int fd) { |
|
452 |
430 |
ReadLockGuard guard(rwlock_fd_table_); |
|
453 |
430 |
ReadOnlyHandle handle = fd_table_.GetHandle(fd); |
|
454 |
430 |
return handle.id; |
|
455 |
} |
||
456 |
|||
457 |
|||
458 |
41 |
int64_t ExternalCacheManager::GetSize(int fd) { |
|
459 |
41 |
shash::Any id = GetHandle(fd); |
|
460 |
✓✓ | 41 |
if (id == kInvalidHandle) |
461 |
1 |
return -EBADF; |
|
462 |
|||
463 |
40 |
cvmfs::MsgHash object_id; |
|
464 |
40 |
transport_.FillMsgHash(id, &object_id); |
|
465 |
40 |
cvmfs::MsgObjectInfoReq msg_info; |
|
466 |
40 |
msg_info.set_session_id(session_id_); |
|
467 |
40 |
msg_info.set_req_id(NextRequestId()); |
|
468 |
40 |
msg_info.set_allocated_object_id(&object_id); |
|
469 |
40 |
RpcJob rpc_job(&msg_info); |
|
470 |
40 |
CallRemotely(&rpc_job); |
|
471 |
40 |
msg_info.release_object_id(); |
|
472 |
|||
473 |
40 |
cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply(); |
|
474 |
✓✓ | 40 |
if (msg_reply->status() == cvmfs::STATUS_OK) { |
475 |
✗✓ | 39 |
assert(msg_reply->has_size()); |
476 |
39 |
return msg_reply->size(); |
|
477 |
} |
||
478 |
1 |
return Ack2Errno(msg_reply->status()); |
|
479 |
} |
||
480 |
|||
481 |
|||
482 |
1 |
void *ExternalCacheManager::MainRead(void *data) { |
|
483 |
ExternalCacheManager *cache_mgr = |
||
484 |
1 |
reinterpret_cast<ExternalCacheManager *>(data); |
|
485 |
1 |
LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread"); |
|
486 |
|||
487 |
1 |
unsigned char buffer[cache_mgr->max_object_size_]; |
|
488 |
while (true) { |
||
489 |
3029 |
CacheTransport::Frame frame_recv; |
|
490 |
3029 |
frame_recv.set_attachment(buffer, cache_mgr->max_object_size_); |
|
491 |
3029 |
bool retval = cache_mgr->transport_.RecvFrame(&frame_recv); |
|
492 |
✓✓ | 3029 |
if (!retval) |
493 |
break; |
||
494 |
|||
495 |
uint64_t req_id; |
||
496 |
3028 |
uint64_t part_nr = 0; |
|
497 |
3028 |
google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped(); |
|
498 |
✓✓ | 3028 |
if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") { |
499 |
19 |
req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id(); |
|
500 |
✓✓ | 3009 |
} else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") { |
501 |
9 |
req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id(); |
|
502 |
✓✓ | 3000 |
} else if (msg->GetTypeName() == "cvmfs.MsgReadReply") { |
503 |
1800 |
req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id(); |
|
504 |
✓✓ | 1200 |
} else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") { |
505 |
200 |
req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id(); |
|
506 |
200 |
part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr(); |
|
507 |
✗✓ | 1000 |
} else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") { |
508 |
req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id(); |
||
509 |
✗✓ | 1000 |
} else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") { |
510 |
req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id(); |
||
511 |
✗✓ | 1000 |
} else if (msg->GetTypeName() == "cvmfs.MsgListReply") { |
512 |
req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id(); |
||
513 |
✓✗ | 1000 |
} else if (msg->GetTypeName() == "cvmfs.MsgDetach") { |
514 |
// Release pinned catalogs |
||
515 |
1000 |
cache_mgr->quota_mgr_->BroadcastBackchannels("R"); |
|
516 |
1000 |
continue; |
|
517 |
} else { |
||
518 |
LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "unexpected message %s", |
||
519 |
msg->GetTypeName().c_str()); |
||
520 |
abort(); |
||
521 |
} |
||
522 |
|||
523 |
2028 |
RpcInFlight rpc_inflight; |
|
524 |
{ |
||
525 |
2028 |
MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_); |
|
526 |
✓✗ | 2032 |
for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) { |
527 |
2032 |
RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job; |
|
528 |
✓✓✓✗ ✓✓ |
2032 |
if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) { |
529 |
2028 |
rpc_inflight = cache_mgr->inflight_rpcs_[i]; |
|
530 |
cache_mgr->inflight_rpcs_.erase( |
||
531 |
2028 |
cache_mgr->inflight_rpcs_.begin() + i); |
|
532 |
2028 |
break; |
|
533 |
} |
||
534 |
2028 |
} |
|
535 |
} |
||
536 |
✗✓ | 2028 |
if (rpc_inflight.rpc_job == NULL) { |
537 |
LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug, |
||
538 |
"got unmatched rpc reply"); |
||
539 |
continue; |
||
540 |
} |
||
541 |
2028 |
rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv); |
|
542 |
2028 |
rpc_inflight.signal->Wakeup(); |
|
543 |
} |
||
544 |
|||
545 |
✗✓ | 1 |
if (!cache_mgr->terminated_) { |
546 |
LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, |
||
547 |
"connection to external cache manager broken (%d)", errno); |
||
548 |
abort(); |
||
549 |
} |
||
550 |
1 |
LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread"); |
|
551 |
1 |
return NULL; |
|
552 |
} |
||
553 |
|||
554 |
|||
555 |
394 |
int ExternalCacheManager::Open(const BlessedObject &object) { |
|
556 |
394 |
return DoOpen(object.id); |
|
557 |
} |
||
558 |
|||
559 |
|||
560 |
2 |
int ExternalCacheManager::OpenFromTxn(void *txn) { |
|
561 |
2 |
Transaction *transaction = reinterpret_cast<Transaction *>(txn); |
|
562 |
LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s", |
||
563 |
transaction->id.ToString().c_str()); |
||
564 |
2 |
int retval = Flush(true, transaction); |
|
565 |
✗✓ | 2 |
if (retval != 0) |
566 |
return retval; |
||
567 |
|||
568 |
2 |
int fd = -1; |
|
569 |
{ |
||
570 |
2 |
WriteLockGuard guard(rwlock_fd_table_); |
|
571 |
2 |
fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id)); |
|
572 |
✗✓ | 2 |
if (fd < 0) { |
573 |
LogCvmfs(kLogCache, kLogDebug, "error while creating new fd", |
||
574 |
strerror(-fd)); |
||
575 |
return fd; |
||
576 |
} |
||
577 |
} |
||
578 |
2 |
transaction->open_fds++; |
|
579 |
2 |
return fd; |
|
580 |
} |
||
581 |
|||
582 |
|||
583 |
258 |
int64_t ExternalCacheManager::Pread( |
|
584 |
int fd, |
||
585 |
void *buf, |
||
586 |
uint64_t size, |
||
587 |
uint64_t offset) |
||
588 |
{ |
||
589 |
258 |
shash::Any id = GetHandle(fd); |
|
590 |
✓✓ | 258 |
if (id == kInvalidHandle) |
591 |
1 |
return -EBADF; |
|
592 |
|||
593 |
257 |
cvmfs::MsgHash object_id; |
|
594 |
257 |
transport_.FillMsgHash(id, &object_id); |
|
595 |
257 |
uint64_t nbytes = 0; |
|
596 |
✓✓✓✓ ✓✓ |
257 |
while (nbytes < size) { |
597 |
uint64_t batch_size = |
||
598 |
2920 |
std::min(size - nbytes, static_cast<uint64_t>(max_object_size_)); |
|
599 |
2920 |
cvmfs::MsgReadReq msg_read; |
|
600 |
2920 |
msg_read.set_session_id(session_id_); |
|
601 |
2920 |
msg_read.set_req_id(NextRequestId()); |
|
602 |
2920 |
msg_read.set_allocated_object_id(&object_id); |
|
603 |
2920 |
msg_read.set_offset(offset + nbytes); |
|
604 |
2920 |
msg_read.set_size(batch_size); |
|
605 |
2920 |
RpcJob rpc_job(&msg_read); |
|
606 |
rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes, |
||
607 |
2920 |
batch_size); |
|
608 |
2920 |
CallRemotely(&rpc_job); |
|
609 |
2920 |
msg_read.release_object_id(); |
|
610 |
|||
611 |
2920 |
cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply(); |
|
612 |
✓✓ | 2920 |
if (msg_reply->status() == cvmfs::STATUS_OK) { |
613 |
2915 |
nbytes += rpc_job.frame_recv()->att_size(); |
|
614 |
// Fuse sends in rounded up buffers, so short reads are expected |
||
615 |
✓✓ | 2915 |
if (rpc_job.frame_recv()->att_size() < batch_size) |
616 |
9 |
return nbytes; |
|
617 |
} else { |
||
618 |
5 |
return Ack2Errno(msg_reply->status()); |
|
619 |
} |
||
620 |
} |
||
621 |
243 |
return size; |
|
622 |
} |
||
623 |
|||
624 |
|||
625 |
2 |
int ExternalCacheManager::Readahead(int fd) { |
|
626 |
2 |
shash::Any id = GetHandle(fd); |
|
627 |
✓✓ | 2 |
if (id == kInvalidHandle) |
628 |
1 |
return -EBADF; |
|
629 |
// No-op |
||
630 |
1 |
return 0; |
|
631 |
} |
||
632 |
|||
633 |
|||
634 |
7 |
int ExternalCacheManager::Reset(void *txn) { |
|
635 |
7 |
Transaction *transaction = reinterpret_cast<Transaction *>(txn); |
|
636 |
7 |
transaction->buf_pos = 0; |
|
637 |
7 |
transaction->size = 0; |
|
638 |
7 |
transaction->open_fds = 0; |
|
639 |
7 |
transaction->committed = false; |
|
640 |
7 |
transaction->object_info_modified = true; |
|
641 |
|||
642 |
✓✓ | 7 |
if (!transaction->flushed) |
643 |
6 |
return 0; |
|
644 |
|||
645 |
1 |
cvmfs::MsgHash object_id; |
|
646 |
1 |
transport_.FillMsgHash(transaction->id, &object_id); |
|
647 |
1 |
cvmfs::MsgStoreAbortReq msg_abort; |
|
648 |
1 |
msg_abort.set_session_id(session_id_); |
|
649 |
1 |
msg_abort.set_req_id(transaction->transaction_id); |
|
650 |
1 |
msg_abort.set_allocated_object_id(&object_id); |
|
651 |
1 |
RpcJob rpc_job(&msg_abort); |
|
652 |
1 |
CallRemotely(&rpc_job); |
|
653 |
1 |
msg_abort.release_object_id(); |
|
654 |
1 |
cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply(); |
|
655 |
1 |
transaction->transaction_id = NextRequestId(); |
|
656 |
1 |
transaction->flushed = false; |
|
657 |
1 |
return Ack2Errno(msg_reply->status()); |
|
658 |
} |
||
659 |
|||
660 |
|||
661 |
1 |
void ExternalCacheManager::Spawn() { |
|
662 |
1 |
int retval = pthread_create(&thread_read_, NULL, MainRead, this); |
|
663 |
✗✓ | 1 |
assert(retval == 0); |
664 |
1 |
spawned_ = true; |
|
665 |
1 |
} |
|
666 |
|||
667 |
|||
668 |
/** |
||
669 |
* Returns true if the plugin could be spawned or was spawned by another |
||
670 |
* process. |
||
671 |
*/ |
||
672 |
bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) { |
||
673 |
if (cmd_line.empty()) |
||
674 |
return false; |
||
675 |
|||
676 |
int pipe_ready[2]; |
||
677 |
MakePipe(pipe_ready); |
||
678 |
set<int> preserve_filedes; |
||
679 |
preserve_filedes.insert(pipe_ready[1]); |
||
680 |
|||
681 |
int fd_null_read = open("/dev/null", O_RDONLY); |
||
682 |
int fd_null_write = open("/dev/null", O_WRONLY); |
||
683 |
assert((fd_null_read >= 0) && (fd_null_write >= 0)); |
||
684 |
map<int, int> map_fildes; |
||
685 |
map_fildes[fd_null_read] = 0; |
||
686 |
map_fildes[fd_null_write] = 1; |
||
687 |
map_fildes[fd_null_write] = 2; |
||
688 |
|||
689 |
pid_t child_pid; |
||
690 |
int retval = setenv(CacheTransport::kEnvReadyNotifyFd, |
||
691 |
StringifyInt(pipe_ready[1]).c_str(), 1); |
||
692 |
assert(retval == 0); |
||
693 |
retval = ManagedExec(cmd_line, |
||
694 |
preserve_filedes, |
||
695 |
map_fildes, |
||
696 |
false, // drop_credentials |
||
697 |
true, // double fork |
||
698 |
&child_pid); |
||
699 |
unsetenv(CacheTransport::kEnvReadyNotifyFd); |
||
700 |
close(fd_null_read); |
||
701 |
close(fd_null_write); |
||
702 |
if (!retval) { |
||
703 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
||
704 |
"failed to start cache plugin '%s'", |
||
705 |
JoinStrings(cmd_line, " ").c_str()); |
||
706 |
ClosePipe(pipe_ready); |
||
707 |
return false; |
||
708 |
} |
||
709 |
|||
710 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslog, |
||
711 |
"started cache plugin '%s' (pid %d), waiting for it to become ready", |
||
712 |
JoinStrings(cmd_line, " ").c_str(), child_pid); |
||
713 |
close(pipe_ready[1]); |
||
714 |
char buf; |
||
715 |
if (read(pipe_ready[0], &buf, 1) != 1) { |
||
716 |
close(pipe_ready[0]); |
||
717 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
||
718 |
"cache plugin did not start properly"); |
||
719 |
return false; |
||
720 |
} |
||
721 |
close(pipe_ready[0]); |
||
722 |
|||
723 |
if (buf == CacheTransport::kReadyNotification) |
||
724 |
return true; |
||
725 |
LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
||
726 |
"cache plugin failed to create an endpoint"); |
||
727 |
return false; |
||
728 |
} |
||
729 |
|||
730 |
|||
731 |
2040 |
int ExternalCacheManager::StartTxn( |
|
732 |
const shash::Any &id, |
||
733 |
uint64_t size, |
||
734 |
void *txn) |
||
735 |
{ |
||
736 |
✓✓ | 2040 |
if (!(capabilities_ & cvmfs::CAP_WRITE)) |
737 |
2 |
return -EROFS; |
|
738 |
|||
739 |
✓✗ | 2038 |
Transaction *transaction = new (txn) Transaction(id); |
740 |
2038 |
transaction->expected_size = size; |
|
741 |
2038 |
transaction->transaction_id = NextRequestId(); |
|
742 |
#ifdef __APPLE__ |
||
743 |
transaction->buffer = |
||
744 |
reinterpret_cast<unsigned char *>(smalloc(max_object_size_)); |
||
745 |
#endif |
||
746 |
2038 |
return 0; |
|
747 |
} |
||
748 |
|||
749 |
|||
750 |
2042 |
int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) { |
|
751 |
2042 |
Transaction *transaction = reinterpret_cast<Transaction *>(txn); |
|
752 |
✗✓ | 2042 |
assert(!transaction->committed); |
753 |
LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s", |
||
754 |
8 |
size, transaction->id.ToString().c_str()); |
|
755 |
|||
756 |
✓✗ | 2042 |
if (transaction->expected_size != kSizeUnknown) { |
757 |
✗✓ | 2042 |
if (transaction->size + size > transaction->expected_size) { |
758 |
LogCvmfs(kLogCache, kLogDebug, |
||
759 |
"Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")", |
||
760 |
transaction->size + size, transaction->expected_size); |
||
761 |
return -EFBIG; |
||
762 |
} |
||
763 |
} |
||
764 |
|||
765 |
2042 |
uint64_t written = 0; |
|
766 |
2042 |
const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf); |
|
767 |
✓✓ | 7107 |
while (written < size) { |
768 |
✓✓ | 3023 |
if (transaction->buf_pos == max_object_size_) { |
769 |
984 |
bool do_commit = false; |
|
770 |
✓✗ | 984 |
if (transaction->expected_size != kSizeUnknown) |
771 |
984 |
do_commit = (transaction->size + written) == transaction->expected_size; |
|
772 |
984 |
int retval = Flush(do_commit, transaction); |
|
773 |
✗✓ | 984 |
if (retval != 0) { |
774 |
transaction->size += written; |
||
775 |
return retval; |
||
776 |
} |
||
777 |
984 |
transaction->size += transaction->buf_pos; |
|
778 |
984 |
transaction->buf_pos = 0; |
|
779 |
} |
||
780 |
3023 |
uint64_t remaining = size - written; |
|
781 |
3023 |
uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos; |
|
782 |
3023 |
uint64_t batch_size = std::min(remaining, space_in_buffer); |
|
783 |
3023 |
memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size); |
|
784 |
3023 |
transaction->buf_pos += batch_size; |
|
785 |
3023 |
written += batch_size; |
|
786 |
3023 |
read_pos += batch_size; |
|
787 |
} |
||
788 |
2042 |
return written; |
|
789 |
} |
||
790 |
|||
791 |
|||
792 |
//------------------------------------------------------------------------------ |
||
793 |
|||
794 |
|||
795 |
14 |
bool ExternalQuotaManager::DoListing( |
|
796 |
cvmfs::EnumObjectType type, |
||
797 |
vector<cvmfs::MsgListRecord> *result) |
||
798 |
{ |
||
799 |
✗✓ | 14 |
if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST)) |
800 |
return false; |
||
801 |
|||
802 |
14 |
uint64_t listing_id = 0; |
|
803 |
14 |
bool more_data = false; |
|
804 |
✓✓ | 16 |
do { |
805 |
16 |
cvmfs::MsgListReq msg_list; |
|
806 |
16 |
msg_list.set_session_id(cache_mgr_->session_id_); |
|
807 |
16 |
msg_list.set_req_id(cache_mgr_->NextRequestId()); |
|
808 |
16 |
msg_list.set_listing_id(listing_id); |
|
809 |
16 |
msg_list.set_object_type(type); |
|
810 |
16 |
ExternalCacheManager::RpcJob rpc_job(&msg_list); |
|
811 |
16 |
cache_mgr_->CallRemotely(&rpc_job); |
|
812 |
|||
813 |
16 |
cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply(); |
|
814 |
✗✓ | 16 |
if (msg_reply->status() != cvmfs::STATUS_OK) |
815 |
return false; |
||
816 |
16 |
more_data = !msg_reply->is_last_part(); |
|
817 |
16 |
listing_id = msg_reply->listing_id(); |
|
818 |
✓✓ | 204020 |
for (int i = 0; i < msg_reply->list_record_size(); ++i) { |
819 |
204004 |
result->push_back(msg_reply->list_record(i)); |
|
820 |
} |
||
821 |
} while (more_data); |
||
822 |
|||
823 |
14 |
return true; |
|
824 |
} |
||
825 |
|||
826 |
|||
827 |
10 |
bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) { |
|
828 |
✗✓ | 10 |
if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK)) |
829 |
return false; |
||
830 |
|||
831 |
10 |
cvmfs::MsgShrinkReq msg_shrink; |
|
832 |
10 |
msg_shrink.set_session_id(cache_mgr_->session_id_); |
|
833 |
10 |
msg_shrink.set_req_id(cache_mgr_->NextRequestId()); |
|
834 |
10 |
msg_shrink.set_shrink_to(leave_size); |
|
835 |
10 |
ExternalCacheManager::RpcJob rpc_job(&msg_shrink); |
|
836 |
10 |
cache_mgr_->CallRemotely(&rpc_job); |
|
837 |
|||
838 |
10 |
cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply(); |
|
839 |
10 |
return msg_reply->status() == cvmfs::STATUS_OK; |
|
840 |
} |
||
841 |
|||
842 |
|||
843 |
39 |
ExternalQuotaManager *ExternalQuotaManager::Create( |
|
844 |
ExternalCacheManager *cache_mgr) |
||
845 |
{ |
||
846 |
UniquePtr<ExternalQuotaManager> quota_mgr( |
||
847 |
39 |
new ExternalQuotaManager(cache_mgr)); |
|
848 |
✗✓ | 39 |
assert(quota_mgr.IsValid()); |
849 |
|||
850 |
39 |
return quota_mgr.Release(); |
|
851 |
} |
||
852 |
|||
853 |
|||
854 |
28 |
int ExternalQuotaManager::GetInfo(QuotaInfo *quota_info) { |
|
855 |
✗✓ | 28 |
if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO)) |
856 |
return Ack2Errno(cvmfs::STATUS_NOSUPPORT); |
||
857 |
|||
858 |
28 |
cvmfs::MsgInfoReq msg_info; |
|
859 |
28 |
msg_info.set_session_id(cache_mgr_->session_id_); |
|
860 |
28 |
msg_info.set_req_id(cache_mgr_->NextRequestId()); |
|
861 |
28 |
ExternalCacheManager::RpcJob rpc_job(&msg_info); |
|
862 |
28 |
cache_mgr_->CallRemotely(&rpc_job); |
|
863 |
|||
864 |
28 |
cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply(); |
|
865 |
✓✗ | 28 |
if (msg_reply->status() == cvmfs::STATUS_OK) { |
866 |
28 |
quota_info->size = msg_reply->size_bytes(); |
|
867 |
28 |
quota_info->used = msg_reply->used_bytes(); |
|
868 |
28 |
quota_info->pinned = msg_reply->pinned_bytes(); |
|
869 |
✓✗ | 28 |
if (msg_reply->no_shrink() >= 0) |
870 |
28 |
quota_info->no_shrink = msg_reply->no_shrink(); |
|
871 |
} |
||
872 |
28 |
return Ack2Errno(msg_reply->status()); |
|
873 |
} |
||
874 |
|||
875 |
|||
876 |
5 |
uint64_t ExternalQuotaManager::GetCapacity() { |
|
877 |
5 |
QuotaInfo info; |
|
878 |
5 |
int retval = GetInfo(&info); |
|
879 |
✗✓ | 5 |
if (retval != 0) |
880 |
return uint64_t(-1); |
||
881 |
5 |
return info.size; |
|
882 |
} |
||
883 |
|||
884 |
|||
885 |
uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) { |
||
886 |
QuotaInfo info; |
||
887 |
int retval = GetInfo(&info); |
||
888 |
if (retval != 0) |
||
889 |
return 0; |
||
890 |
return info.no_shrink; |
||
891 |
} |
||
892 |
|||
893 |
|||
894 |
15 |
uint64_t ExternalQuotaManager::GetSize() { |
|
895 |
15 |
QuotaInfo info; |
|
896 |
15 |
int retval = GetInfo(&info); |
|
897 |
✗✓ | 15 |
if (retval != 0) |
898 |
return 0; |
||
899 |
15 |
return info.used; |
|
900 |
} |
||
901 |
|||
902 |
|||
903 |
8 |
uint64_t ExternalQuotaManager::GetSizePinned() { |
|
904 |
8 |
QuotaInfo info; |
|
905 |
8 |
int retval = GetInfo(&info); |
|
906 |
✗✓ | 8 |
if (retval != 0) |
907 |
return 0; |
||
908 |
8 |
return info.pinned; |
|
909 |
} |
||
910 |
|||
911 |
|||
912 |
bool ExternalQuotaManager::HasCapability(Capabilities capability) { |
||
913 |
switch (capability) { |
||
914 |
case kCapIntrospectSize: |
||
915 |
return cache_mgr_->capabilities_ & cvmfs::CAP_INFO; |
||
916 |
case kCapIntrospectCleanupRate: |
||
917 |
return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE; |
||
918 |
case kCapList: |
||
919 |
return cache_mgr_->capabilities_ & cvmfs::CAP_LIST; |
||
920 |
case kCapShrink: |
||
921 |
return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK; |
||
922 |
case kCapListeners: |
||
923 |
return true; |
||
924 |
default: |
||
925 |
return false; |
||
926 |
} |
||
927 |
} |
||
928 |
|||
929 |
|||
930 |
3 |
vector<string> ExternalQuotaManager::List() { |
|
931 |
3 |
vector<string> result; |
|
932 |
3 |
vector<cvmfs::MsgListRecord> raw_list; |
|
933 |
3 |
bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list); |
|
934 |
✓✗ | 3 |
if (!retval) |
935 |
return result; |
||
936 |
✓✓ | 102005 |
for (unsigned i = 0; i < raw_list.size(); ++i) |
937 |
102002 |
result.push_back(raw_list[i].description()); |
|
938 |
return result; |
||
939 |
} |
||
940 |
|||
941 |
|||
942 |
1 |
vector<string> ExternalQuotaManager::ListCatalogs() { |
|
943 |
1 |
vector<string> result; |
|
944 |
1 |
vector<cvmfs::MsgListRecord> raw_list; |
|
945 |
1 |
bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list); |
|
946 |
✓✗ | 1 |
if (!retval) |
947 |
return result; |
||
948 |
✗✓ | 1 |
for (unsigned i = 0; i < raw_list.size(); ++i) |
949 |
result.push_back(raw_list[i].description()); |
||
950 |
return result; |
||
951 |
} |
||
952 |
|||
953 |
|||
954 |
3 |
vector<string> ExternalQuotaManager::ListPinned() { |
|
955 |
3 |
vector<string> result; |
|
956 |
✓✓✗✗ ✗✗ |
3 |
vector<cvmfs::MsgListRecord> raw_lists[3]; |
957 |
3 |
bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]); |
|
958 |
✓✗ | 3 |
if (!retval) |
959 |
return result; |
||
960 |
3 |
retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]); |
|
961 |
✓✗ | 3 |
if (!retval) |
962 |
return result; |
||
963 |
3 |
retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]); |
|
964 |
✓✗ | 3 |
if (!retval) |
965 |
return result; |
||
966 |
✓✓ | 12 |
for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) { |
967 |
✓✓ | 102011 |
for (unsigned j = 0; j < raw_lists[i].size(); ++j) { |
968 |
✓✓ | 102002 |
if (raw_lists[i][j].pinned()) |
969 |
200 |
result.push_back(raw_lists[i][j].description()); |
|
970 |
} |
||
971 |
} |
||
972 |
return result; |
||
973 |
} |
||
974 |
|||
975 |
|||
976 |
1 |
vector<string> ExternalQuotaManager::ListVolatile() { |
|
977 |
1 |
vector<string> result; |
|
978 |
1 |
vector<cvmfs::MsgListRecord> raw_list; |
|
979 |
1 |
bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list); |
|
980 |
✓✗ | 1 |
if (!retval) |
981 |
return result; |
||
982 |
✗✓ | 1 |
for (unsigned i = 0; i < raw_list.size(); ++i) |
983 |
result.push_back(raw_list[i].description()); |
||
984 |
return result; |
||
985 |
} |
||
986 |
|||
987 |
|||
988 |
1 |
void ExternalQuotaManager::RegisterBackChannel( |
|
989 |
int back_channel[2], |
||
990 |
const string &channel_id) |
||
991 |
{ |
||
992 |
1 |
shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id)); |
|
993 |
1 |
MakePipe(back_channel); |
|
994 |
1 |
LockBackChannels(); |
|
995 |
✗✓ | 1 |
assert(back_channels_.find(hash_id) == back_channels_.end()); |
996 |
1 |
back_channels_[hash_id] = back_channel[1]; |
|
997 |
1 |
UnlockBackChannels(); |
|
998 |
1 |
} |
|
999 |
|||
1000 |
|||
1001 |
1 |
void ExternalQuotaManager::UnregisterBackChannel( |
|
1002 |
int back_channel[2], |
||
1003 |
const string &channel_id) |
||
1004 |
{ |
||
1005 |
1 |
shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id)); |
|
1006 |
1 |
LockBackChannels(); |
|
1007 |
1 |
back_channels_.erase(hash_id); |
|
1008 |
1 |
UnlockBackChannels(); |
|
1009 |
1 |
ClosePipe(back_channel); |
|
1010 |
✓✗✓✗ |
52 |
} |
Generated by: GCOVR (Version 4.1) |