GCC Code Coverage Report

Line | Branch | Exec | Source
   1 |  |  | /**
   2 |  |  |  * This file is part of the CernVM File System.
   3 |  |  |  */
   4 |  |  | #include "cvmfs_config.h"
   5 |  |  | #include "channel.h"
   6 |  |  |
   7 |  |  | #include <errno.h>
   8 |  |  | #include <poll.h>
   9 |  |  | #include <signal.h>
  10 |  |  | #include <sys/socket.h>
  11 |  |  | #include <sys/un.h>
  12 |  |  | #include <unistd.h>
  13 |  |  |
  14 |  |  | #include <cassert>
  15 |  |  | #include <cstring>
  16 |  |  | #include <vector>
  17 |  |  |
  18 |  |  | #include "logging.h"
  19 |  |  | #include "platform.h"
  20 |  |  | #include "smalloc.h"
  21 |  |  | #include "util/pointer.h"
  22 |  |  | #include "util/posix.h"
  23 |  |  | #include "util/string.h"
  24 |  |  | #include "util_concurrency.h"
  25 |  |  |
  26 |  |  | using namespace std;  // NOLINT
  27 |  |  |
  28 |  |  |
  29 |  |  | SessionCtx *SessionCtx::instance_ = NULL;
  30 |  |  |
  31 |  |  | void SessionCtx::CleanupInstance() {
  32 |  |  |   delete instance_;
  33 |  |  |   instance_ = NULL;
  34 |  |  | }
  35 |  |  |
  36 |  |  |
  37 |  | 1 | SessionCtx::SessionCtx() {
  38 |  |  |   lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
  39 |  | 1 |     smalloc(sizeof(pthread_mutex_t)));
  40 |  | 1 |   int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
  41 | ✗✓ | 1 |   assert(retval == 0);
  42 |  | 1 | }
  43 |  |  |
  44 |  |  |
  45 |  |  | SessionCtx::~SessionCtx() {
  46 |  |  |   pthread_mutex_destroy(lock_tls_blocks_);
  47 |  |  |   free(lock_tls_blocks_);
  48 |  |  |
  49 |  |  |   for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
  50 |  |  |     delete tls_blocks_[i];
  51 |  |  |   }
  52 |  |  |
  53 |  |  |   int retval = pthread_key_delete(thread_local_storage_);
  54 |  |  |   assert(retval == 0);
  55 |  |  | }
  56 |  |  |
  57 |  |  |
  58 |  | 9604 | SessionCtx *SessionCtx::GetInstance() {
  59 | ✓✓ | 9604 |   if (instance_ == NULL) {
  60 |  | 1 |     instance_ = new SessionCtx();
  61 |  |  |     int retval =
  62 |  | 1 |       pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
  63 | ✗✓ | 1 |     assert(retval == 0);
  64 |  |  |   }
  65 |  |  |
  66 |  | 9604 |   return instance_;
  67 |  |  | }
  68 |  |  |
  69 |  |  |
  70 |  | 2776 | void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
  71 |  |  |   ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
  72 |  | 2776 |     pthread_getspecific(thread_local_storage_));
  73 | ✓✓✗✓ | 2777 |   if ((tls == NULL) || !tls->is_set) {
  74 |  | 1 |     *id = 0;
  75 |  | 1 |     *reponame = NULL;
  76 |  | 1 |     *client_instance = NULL;
  77 |  |  |   } else {
  78 |  | 2775 |     *id = tls->id;
  79 |  | 2775 |     *reponame = tls->reponame;
  80 |  | 2775 |     *client_instance = tls->client_instance;
  81 |  |  |   }
  82 |  | 2776 | }
  83 |  |  |
  84 |  |  |
  85 |  |  | bool SessionCtx::IsSet() {
  86 |  |  |   ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
  87 |  |  |     pthread_getspecific(thread_local_storage_));
  88 |  |  |   if (tls == NULL)
  89 |  |  |     return false;
  90 |  |  |
  91 |  |  |   return tls->is_set;
  92 |  |  | }
  93 |  |  |
  94 |  |  |
  95 |  | 3414 | void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
  96 |  |  |   ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
  97 |  | 3414 |     pthread_getspecific(thread_local_storage_));
  98 |  |  |
  99 | ✓✓ | 3414 |   if (tls == NULL) {
 100 |  | 14 |     tls = new ThreadLocalStorage(id, reponame, client_instance);
 101 |  | 14 |     int retval = pthread_setspecific(thread_local_storage_, tls);
 102 | ✗✓ | 14 |     assert(retval == 0);
 103 |  | 14 |     MutexLockGuard lock_guard(lock_tls_blocks_);
 104 |  | 14 |     tls_blocks_.push_back(tls);
 105 |  |  |   } else {
 106 |  | 3400 |     tls->id = id;
 107 |  | 3400 |     tls->reponame = reponame;
 108 |  | 3400 |     tls->client_instance = client_instance;
 109 |  | 3400 |     tls->is_set = true;
 110 |  |  |   }
 111 |  | 3414 | }
 112 |  |  |
 113 |  |  |
 114 |  | 14 | void SessionCtx::TlsDestructor(void *data) {
 115 |  | 14 |   ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
 116 |  | 14 |   delete tls;
 117 |  |  |
 118 | ✗✓ | 14 |   assert(instance_);
 119 |  | 14 |   MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
 120 | ✓✗ | 14 |   for (vector<ThreadLocalStorage *>::iterator i =
 121 |  | 14 |        instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
 122 |  |  |        i != iEnd; ++i)
 123 |  |  |   {
 124 | ✓✗ | 14 |     if ((*i) == tls) {
 125 |  | 14 |       instance_->tls_blocks_.erase(i);
 126 |  | 14 |       break;
 127 |  |  |     }
 128 |  | 14 |   }
 129 |  | 14 | }
 130 |  |  |
 131 |  |  |
 132 |  | 3414 | void SessionCtx::Unset() {
 133 |  |  |   ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
 134 |  | 3414 |     pthread_getspecific(thread_local_storage_));
 135 | ✓✗ | 3414 |   if (tls != NULL) {
 136 |  | 3414 |     tls->is_set = false;
 137 |  | 3414 |     tls->id = 0;
 138 |  | 3414 |     tls->reponame = NULL;
 139 |  | 3414 |     tls->client_instance = NULL;
 140 |  |  |   }
 141 |  | 3414 | }
 142 |  |  |
 143 |  |  |
 144 |  |  | //------------------------------------------------------------------------------
 145 |  |  |
 146 |  |  |
 147 |  | 17 | CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
 148 |  |  |   : id(id)
 149 |  | 17 |   , name(name)
 150 |  |  | {
 151 |  | 17 |   vector<string> tokens = SplitString(name, ':');
 152 |  | 17 |   reponame = strdup(tokens[0].c_str());
 153 | ✓✓ | 17 |   if (tokens.size() > 1)
 154 |  | 15 |     client_instance = strdup(tokens[1].c_str());
 155 |  |  |   else
 156 |  | 2 |     client_instance = NULL;
 157 |  |  | }
 158 |  |  |
 159 |  |  | const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
 160 |  |  |
 161 |  |  |
 162 |  | 1002 | void CachePlugin::AskToDetach() {
 163 |  | 1002 |   char detach = kSignalDetach;
 164 |  | 1002 |   WritePipe(pipe_ctrl_[1], &detach, 1);
 165 |  | 1002 | }
 166 |  |  |
 167 |  |  |
 168 |  | 16 | CachePlugin::CachePlugin(uint64_t capabilities)
 169 |  |  |   : is_local_(false)
 170 |  |  |   , capabilities_(capabilities)
 171 |  |  |   , fd_socket_(-1)
 172 |  |  |   , fd_socket_lock_(-1)
 173 |  |  |   , running_(0)
 174 |  |  |   , num_workers_(0)
 175 |  |  |   , max_object_size_(kDefaultMaxObjectSize)
 176 |  | 16 |   , num_inlimbo_clients_(0)
 177 |  |  | {
 178 |  | 16 |   atomic_init64(&next_session_id_);
 179 |  | 16 |   atomic_init64(&next_txn_id_);
 180 |  | 16 |   atomic_init64(&next_lst_id_);
 181 |  |  |   // Don't use listing id zero
 182 |  | 16 |   atomic_inc64(&next_lst_id_);
 183 |  | 16 |   txn_ids_.Init(128, UniqueRequest(), HashUniqueRequest);
 184 |  | 16 |   memset(&thread_io_, 0, sizeof(thread_io_));
 185 |  | 16 |   MakePipe(pipe_ctrl_);
 186 |  |  | }
 187 |  |  |
 188 |  |  |
 189 |  | 16 | CachePlugin::~CachePlugin() {
 190 |  | 16 |   Terminate();
 191 |  | 16 |   ClosePipe(pipe_ctrl_);
 192 | ✓✗ | 16 |   if (fd_socket_ >= 0)
 193 |  | 16 |     close(fd_socket_);
 194 | ✓✗ | 16 |   if (fd_socket_lock_ >= 0)
 195 |  | 16 |     UnlockFile(fd_socket_lock_);
 196 |  |  | }
 197 |  |  |
 198 |  |  |
 199 |  | 17 | void CachePlugin::HandleHandshake(
 200 |  |  |   cvmfs::MsgHandshake *msg_req,
 201 |  |  |   CacheTransport *transport)
 202 |  |  | {
 203 |  | 17 |   uint64_t session_id = NextSessionId();
 204 | ✓✗ | 17 |   if (msg_req->has_name()) {
 205 |  | 17 |     sessions_[session_id] = SessionInfo(session_id, msg_req->name());
 206 |  |  |   } else {
 207 |  |  |     sessions_[session_id] = SessionInfo(session_id,
 208 |  |  |       "anonymous client (" + StringifyInt(session_id) + ")");
 209 |  |  |   }
 210 |  | 17 |   cvmfs::MsgHandshakeAck msg_ack;
 211 |  | 17 |   CacheTransport::Frame frame_send(&msg_ack);
 212 |  |  |
 213 |  | 17 |   msg_ack.set_status(cvmfs::STATUS_OK);
 214 |  | 17 |   msg_ack.set_name(name_);
 215 |  | 17 |   msg_ack.set_protocol_version(kPbProtocolVersion);
 216 |  | 17 |   msg_ack.set_max_object_size(max_object_size_);
 217 |  | 17 |   msg_ack.set_session_id(session_id);
 218 |  | 17 |   msg_ack.set_capabilities(capabilities_);
 219 | ✓✗ | 17 |   if (is_local_)
 220 |  | 17 |     msg_ack.set_pid(getpid());
 221 |  | 17 |   transport->SendFrame(&frame_send);
 222 |  | 17 | }
 223 |  |  |
 224 |  |  |
 225 |  | 4 | void CachePlugin::HandleInfo(
 226 |  |  |   cvmfs::MsgInfoReq *msg_req,
 227 |  |  |   CacheTransport *transport)
 228 |  |  | {
 229 |  | 4 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 230 |  | 4 |   cvmfs::MsgInfoReply msg_reply;
 231 |  | 4 |   CacheTransport::Frame frame_send(&msg_reply);
 232 |  |  |
 233 |  | 4 |   msg_reply.set_req_id(msg_req->req_id());
 234 |  | 4 |   Info info;
 235 |  | 4 |   cvmfs::EnumStatus status = GetInfo(&info);
 236 | ✗✓ | 4 |   if (status != cvmfs::STATUS_OK) {
 237 |  |  |     LogSessionError(msg_req->session_id(), status,
 238 |  |  |                     "failed to query cache status");
 239 |  |  |   }
 240 |  | 4 |   msg_reply.set_size_bytes(info.size_bytes);
 241 |  | 4 |   msg_reply.set_used_bytes(info.used_bytes);
 242 |  | 4 |   msg_reply.set_pinned_bytes(info.pinned_bytes);
 243 |  | 4 |   msg_reply.set_no_shrink(info.no_shrink);
 244 |  | 4 |   msg_reply.set_status(status);
 245 |  | 4 |   transport->SendFrame(&frame_send);
 246 |  | 4 | }
 247 |  |  |
 248 |  |  |
 249 |  | 4 | void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
 250 | ✗✓ | 4 |   if (!msg_req->has_conncnt_change_by())
 251 |  |  |     return;
 252 |  | 4 |   int32_t conncnt_change_by = msg_req->conncnt_change_by();
 253 | ✗✓ | 4 |   if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
 254 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 255 |  |  |                     "invalid request to drop connection counter below zero");
 256 |  |  |     return;
 257 |  |  |   }
 258 | ✓✓ | 4 |   if (conncnt_change_by > 0) {
 259 |  | 2 |     LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
 260 |  |  |   } else {
 261 |  | 2 |     LogSessionInfo(msg_req->session_id(), "release session lock");
 262 |  |  |   }
 263 |  | 4 |   num_inlimbo_clients_ += conncnt_change_by;
 264 |  |  | }
 265 |  |  |
 266 |  |  |
 267 |  | 8 | void CachePlugin::HandleList(
 268 |  |  |   cvmfs::MsgListReq *msg_req,
 269 |  |  |   CacheTransport *transport)
 270 |  |  | {
 271 |  | 8 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 272 |  | 8 |   cvmfs::MsgListReply msg_reply;
 273 |  | 8 |   CacheTransport::Frame frame_send(&msg_reply);
 274 |  |  |
 275 |  | 8 |   msg_reply.set_req_id(msg_req->req_id());
 276 |  | 8 |   int64_t listing_id = msg_req->listing_id();
 277 |  | 8 |   msg_reply.set_listing_id(listing_id);
 278 |  | 8 |   msg_reply.set_is_last_part(true);
 279 |  |  |
 280 |  |  |   cvmfs::EnumStatus status;
 281 | ✓✓ | 8 |   if (msg_req->listing_id() == 0) {
 282 |  | 6 |     listing_id = NextLstId();
 283 |  | 6 |     status = ListingBegin(listing_id, msg_req->object_type());
 284 | ✗✓ | 6 |     if (status != cvmfs::STATUS_OK) {
 285 |  |  |       LogSessionError(msg_req->session_id(), status,
 286 |  |  |                       "failed to start enumeration of objects");
 287 |  |  |       msg_reply.set_status(status);
 288 |  |  |       transport->SendFrame(&frame_send);
 289 |  |  |       return;
 290 |  |  |     }
 291 |  | 6 |     msg_reply.set_listing_id(listing_id);
 292 |  |  |   }
 293 | ✗✓ | 8 |   assert(listing_id != 0);
 294 |  |  |
 295 |  | 8 |   ObjectInfo item;
 296 |  | 8 |   unsigned total_size = 0;
 297 | ✓✓ | 200014 |   while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
 298 |  | 200000 |     cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
 299 |  | 200000 |     cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
 300 |  | 200000 |     transport->FillMsgHash(item.id, msg_hash);
 301 |  | 200000 |     msg_list_record->set_allocated_hash(msg_hash);
 302 |  | 200000 |     msg_list_record->set_pinned(item.pinned);
 303 |  | 200000 |     msg_list_record->set_description(item.description);
 304 |  |  |     // Approximation of the message size
 305 |  | 200000 |     total_size += sizeof(item) + item.description.length();
 306 | ✓✓ | 200000 |     if (total_size > kListingSize)
 307 |  | 2 |       break;
 308 |  |  |   }
 309 | ✓✓ | 8 |   if (status == cvmfs::STATUS_OUTOFBOUNDS) {
 310 |  | 6 |     ListingEnd(listing_id);
 311 |  | 6 |     status = cvmfs::STATUS_OK;
 312 |  |  |   } else {
 313 |  | 2 |     msg_reply.set_is_last_part(false);
 314 |  |  |   }
 315 | ✗✓ | 8 |   if (status != cvmfs::STATUS_OK) {
 316 |  |  |     LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
 317 |  |  |   }
 318 |  | 8 |   msg_reply.set_status(status);
 319 | ✗✓✗✓ | 8 |   transport->SendFrame(&frame_send);
 320 |  |  | }
 321 |  |  |
 322 |  |  |
 323 |  | 16 | void CachePlugin::HandleObjectInfo(
 324 |  |  |   cvmfs::MsgObjectInfoReq *msg_req,
 325 |  |  |   CacheTransport *transport)
 326 |  |  | {
 327 |  | 16 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 328 |  | 16 |   cvmfs::MsgObjectInfoReply msg_reply;
 329 |  | 16 |   CacheTransport::Frame frame_send(&msg_reply);
 330 |  |  |
 331 |  | 16 |   msg_reply.set_req_id(msg_req->req_id());
 332 |  | 16 |   shash::Any object_id;
 333 |  | 16 |   bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
 334 | ✗✓ | 16 |   if (!retval) {
 335 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 336 |  |  |                     "malformed hash received from client");
 337 |  |  |     msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 338 |  |  |   } else {
 339 |  | 16 |     ObjectInfo info;
 340 |  | 16 |     cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
 341 |  | 16 |     msg_reply.set_status(status);
 342 | ✓✓ | 16 |     if (status == cvmfs::STATUS_OK) {
 343 |  | 15 |       msg_reply.set_object_type(info.object_type);
 344 |  | 15 |       msg_reply.set_size(info.size);
 345 | ✓✗ | 1 |     } else if (status != cvmfs::STATUS_NOENTRY) {
 346 |  |  |       LogSessionError(msg_req->session_id(), status,
 347 |  | 1 |                       "failed retrieving object details");
 348 |  |  |     }
 349 |  |  |   }
 350 |  | 16 |   transport->SendFrame(&frame_send);
 351 |  | 16 | }
 352 |  |  |
 353 |  |  |
 354 |  | 2211 | void CachePlugin::HandleRead(
 355 |  |  |   cvmfs::MsgReadReq *msg_req,
 356 |  |  |   CacheTransport *transport)
 357 |  |  | {
 358 |  | 2211 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 359 |  | 2211 |   cvmfs::MsgReadReply msg_reply;
 360 |  | 2211 |   CacheTransport::Frame frame_send(&msg_reply);
 361 |  |  |
 362 |  | 2211 |   msg_reply.set_req_id(msg_req->req_id());
 363 |  | 2211 |   shash::Any object_id;
 364 |  | 2211 |   bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
 365 | ✓✗✗✓ ✗✓ | 2211 |   if (!retval || (msg_req->size() > max_object_size_)) {
 366 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 367 |  |  |                     "malformed hash received from client");
 368 |  |  |     msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 369 |  |  |     transport->SendFrame(&frame_send);
 370 |  |  |     return;
 371 |  |  |   }
 372 |  | 2211 |   unsigned size = msg_req->size();
 373 |  |  | #ifdef __APPLE__
 374 |  |  |   unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
 375 |  |  | #else
 376 |  | 2211 |   unsigned char buffer[size];
 377 |  |  | #endif
 378 |  | 2211 |   cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
 379 |  | 2211 |   msg_reply.set_status(status);
 380 | ✓✓ | 2211 |   if (status == cvmfs::STATUS_OK) {
 381 |  | 2210 |     frame_send.set_attachment(buffer, size);
 382 |  |  |   } else {
 383 |  |  |     LogSessionError(msg_req->session_id(), status,
 384 |  | 1 |                     "failed to read from object");
 385 |  |  |   }
 386 | ✗✓✗✓ ✗✓ | 2211 |   transport->SendFrame(&frame_send);
 387 |  |  | #ifdef __APPLE__
 388 |  |  |   free(buffer);
 389 |  |  | #endif
 390 |  |  | }
 391 |  |  |
 392 |  |  |
 393 |  | 564 | void CachePlugin::HandleRefcount(
 394 |  |  |   cvmfs::MsgRefcountReq *msg_req,
 395 |  |  |   CacheTransport *transport)
 396 |  |  | {
 397 |  | 564 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 398 |  | 564 |   cvmfs::MsgRefcountReply msg_reply;
 399 |  | 564 |   CacheTransport::Frame frame_send(&msg_reply);
 400 |  |  |
 401 |  | 564 |   msg_reply.set_req_id(msg_req->req_id());
 402 |  | 564 |   shash::Any object_id;
 403 |  | 564 |   bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
 404 | ✗✓ | 564 |   if (!retval) {
 405 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 406 |  |  |                     "malformed hash received from client");
 407 |  |  |     msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 408 |  |  |   } else {
 409 |  | 564 |     cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
 410 |  | 564 |     msg_reply.set_status(status);
 411 | ✓✓✓✓ | 564 |     if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
 412 |  |  |       LogSessionError(msg_req->session_id(), status,
 413 |  | 1 |                       "failed to open/close object");
 414 |  |  |     }
 415 |  |  |   }
 416 |  | 564 |   transport->SendFrame(&frame_send);
 417 |  | 564 | }
 418 |  |  |
 419 |  |  |
 420 |  | 3447 | bool CachePlugin::HandleRequest(int fd_con) {
 421 |  | 3447 |   CacheTransport transport(fd_con, CacheTransport::kFlagSendIgnoreFailure);
 422 |  | 3447 |   char buffer[max_object_size_];
 423 |  | 3447 |   CacheTransport::Frame frame_recv;
 424 |  | 3447 |   frame_recv.set_attachment(buffer, max_object_size_);
 425 |  | 3447 |   bool retval = transport.RecvFrame(&frame_recv);
 426 | ✗✓ | 3447 |   if (!retval) {
 427 |  |  |     LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 428 |  |  |              "failed to receive request from connection (%d)", errno);
 429 |  |  |     return false;
 430 |  |  |   }
 431 |  |  |
 432 |  | 3447 |   google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
 433 |  |  |
 434 | ✓✓ | 3447 |   if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
 435 |  |  |     cvmfs::MsgHandshake *msg_req =
 436 |  | 17 |       reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed);
 437 |  | 17 |     HandleHandshake(msg_req, &transport);
 438 | ✓✓ | 3430 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
 439 |  | 12 |     cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
 440 |  |  |     map<uint64_t, SessionInfo>::const_iterator iter =
 441 |  | 12 |       sessions_.find(msg_req->session_id());
 442 | ✓✗ | 12 |     if (iter != sessions_.end()) {
 443 |  | 12 |       free(iter->second.reponame);
 444 |  | 12 |       free(iter->second.client_instance);
 445 |  |  |     }
 446 |  | 12 |     sessions_.erase(msg_req->session_id());
 447 |  | 12 |     return false;
 448 | ✓✓ | 3418 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
 449 |  | 4 |     HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
 450 | ✓✓ | 3414 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
 451 |  |  |     cvmfs::MsgRefcountReq *msg_req =
 452 |  | 564 |       reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed);
 453 |  | 564 |     HandleRefcount(msg_req, &transport);
 454 | ✓✓ | 2850 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
 455 |  |  |     cvmfs::MsgObjectInfoReq *msg_req =
 456 |  | 16 |       reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
 457 |  | 16 |     HandleObjectInfo(msg_req, &transport);
 458 | ✓✓ | 2834 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
 459 |  |  |     cvmfs::MsgReadReq *msg_req =
 460 |  | 2211 |       reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed);
 461 |  | 2211 |     HandleRead(msg_req, &transport);
 462 | ✓✓ | 623 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
 463 |  |  |     cvmfs::MsgStoreReq *msg_req =
 464 |  | 608 |       reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed);
 465 |  | 608 |     HandleStore(msg_req, &frame_recv, &transport);
 466 | ✓✓ | 15 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
 467 |  |  |     cvmfs::MsgStoreAbortReq *msg_req =
 468 |  | 1 |       reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
 469 |  | 1 |     HandleStoreAbort(msg_req, &transport);
 470 | ✓✓ | 14 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
 471 |  |  |     cvmfs::MsgInfoReq *msg_req =
 472 |  | 4 |       reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed);
 473 |  | 4 |     HandleInfo(msg_req, &transport);
 474 | ✓✓ | 10 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
 475 |  |  |     cvmfs::MsgShrinkReq *msg_req =
 476 |  | 2 |       reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed);
 477 |  | 2 |     HandleShrink(msg_req, &transport);
 478 | ✓✗ | 8 |   } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
 479 |  |  |     cvmfs::MsgListReq *msg_req =
 480 |  | 8 |       reinterpret_cast<cvmfs::MsgListReq *>(msg_typed);
 481 |  | 8 |     HandleList(msg_req, &transport);
 482 |  |  |   } else {
 483 |  |  |     LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 484 |  |  |              "unexpected message from client: %s",
 485 |  |  |              msg_typed->GetTypeName().c_str());
 486 |  |  |     return false;
 487 |  |  |   }
 488 |  |  |
 489 |  | 3435 |   return true;
 490 |  |  | }
 491 |  |  |
 492 |  |  |
 493 |  | 2 | void CachePlugin::HandleShrink(
 494 |  |  |   cvmfs::MsgShrinkReq *msg_req,
 495 |  |  |   CacheTransport *transport)
 496 |  |  | {
 497 |  | 2 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 498 |  | 2 |   cvmfs::MsgShrinkReply msg_reply;
 499 |  | 2 |   CacheTransport::Frame frame_send(&msg_reply);
 500 |  |  |
 501 |  | 2 |   msg_reply.set_req_id(msg_req->req_id());
 502 |  | 2 |   uint64_t used_bytes = 0;
 503 |  | 2 |   cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
 504 |  | 2 |   msg_reply.set_used_bytes(used_bytes);
 505 |  | 2 |   msg_reply.set_status(status);
 506 | ✓✓✗✓ | 2 |   if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
 507 |  |  |     LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
 508 |  |  |   }
 509 |  | 2 |   transport->SendFrame(&frame_send);
 510 |  | 2 | }
 511 |  |  |
 512 |  |  |
 513 |  | 1 | void CachePlugin::HandleStoreAbort(
 514 |  |  |   cvmfs::MsgStoreAbortReq *msg_req,
 515 |  |  |   CacheTransport *transport)
 516 |  |  | {
 517 |  | 1 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 518 |  | 1 |   cvmfs::MsgStoreReply msg_reply;
 519 |  | 1 |   CacheTransport::Frame frame_send(&msg_reply);
 520 |  | 1 |   msg_reply.set_req_id(msg_req->req_id());
 521 |  | 1 |   msg_reply.set_part_nr(0);
 522 |  |  |   uint64_t txn_id;
 523 |  | 1 |   UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
 524 |  | 1 |   bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
 525 | ✗✓ | 1 |   if (!retval) {
 526 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 527 |  |  |                     "malformed transaction id received from client");
 528 |  |  |     msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 529 |  |  |   } else {
 530 |  | 1 |     cvmfs::EnumStatus status = AbortTxn(txn_id);
 531 |  | 1 |     msg_reply.set_status(status);
 532 | ✗✓ | 1 |     if (status != cvmfs::STATUS_OK) {
 533 |  |  |       LogSessionError(msg_req->session_id(), status,
 534 |  |  |                       "failed to abort transaction");
 535 |  |  |     }
 536 |  | 1 |     txn_ids_.Erase(uniq_req);
 537 |  |  |   }
 538 |  | 1 |   transport->SendFrame(&frame_send);
 539 |  | 1 | }
 540 |  |  |
 541 |  |  |
 542 |  | 608 | void CachePlugin::HandleStore(
 543 |  |  |   cvmfs::MsgStoreReq *msg_req,
 544 |  |  |   CacheTransport::Frame *frame,
 545 |  |  |   CacheTransport *transport)
 546 |  |  | {
 547 |  | 608 |   SessionCtxGuard session_guard(msg_req->session_id(), this);
 548 |  | 608 |   cvmfs::MsgStoreReply msg_reply;
 549 |  | 608 |   CacheTransport::Frame frame_send(&msg_reply);
 550 |  | 608 |   msg_reply.set_req_id(msg_req->req_id());
 551 |  | 608 |   msg_reply.set_part_nr(msg_req->part_nr());
 552 |  | 608 |   shash::Any object_id;
 553 |  | 608 |   bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
 554 | ✓✗✓✗ ✓✓✗✓ ✗✓ | 608 |   if ( !retval ||
 555 |  |  |        (frame->att_size() > max_object_size_) ||
 556 |  |  |        ((frame->att_size() < max_object_size_) && !msg_req->last_part()) )
 557 |  |  |   {
 558 |  |  |     LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 559 |  |  |                     "malformed hash or bad object size received from client");
 560 |  |  |     msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 561 |  |  |     transport->SendFrame(&frame_send);
 562 |  |  |     return;
 563 |  |  |   }
 564 |  |  |
 565 |  | 608 |   UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
 566 |  |  |   uint64_t txn_id;
 567 |  | 608 |   cvmfs::EnumStatus status = cvmfs::STATUS_OK;
 568 | ✓✓ | 608 |   if (msg_req->part_nr() == 1) {
 569 | ✗✓ | 7 |     if (txn_ids_.Contains(uniq_req)) {
 570 |  |  |       LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 571 |  |  |                       "invalid attempt to restart running transaction");
 572 |  |  |       msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 573 |  |  |       transport->SendFrame(&frame_send);
 574 |  |  |       return;
 575 |  |  |     }
 576 |  | 7 |     txn_id = NextTxnId();
 577 |  | 7 |     ObjectInfo info;
 578 |  | 7 |     info.id = object_id;
 579 | ✓✗ | 7 |     if (msg_req->has_expected_size()) {info.size = msg_req->expected_size();}
 580 | ✓✗ | 7 |     if (msg_req->has_object_type()) {info.object_type = msg_req->object_type();}
 581 | ✓✗ | 7 |     if (msg_req->has_description()) {info.description = msg_req->description();}
 582 |  | 7 |     status = StartTxn(object_id, txn_id, info);
 583 | ✗✓ | 7 |     if (status != cvmfs::STATUS_OK) {
 584 |  |  |       LogSessionError(msg_req->session_id(), status,
 585 |  |  |                       "failed to start transaction");
 586 |  |  |       msg_reply.set_status(status);
 587 |  |  |       transport->SendFrame(&frame_send);
 588 |  |  |       return;
 589 |  |  |     }
 590 | ✗✓ | 7 |     txn_ids_.Insert(uniq_req, txn_id);
 591 |  |  |   } else {
 592 |  | 601 |     retval = txn_ids_.Lookup(uniq_req, &txn_id);
 593 | ✗✓ | 601 |     if (!retval) {
 594 |  |  |       LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
 595 |  |  |                       "invalid transaction received from client");
 596 |  |  |       msg_reply.set_status(cvmfs::STATUS_MALFORMED);
 597 |  |  |       transport->SendFrame(&frame_send);
 598 |  |  |       return;
 599 |  |  |     }
 600 |  |  |   }
 601 |  |  |
 602 |  |  |   // TODO(jblomer): check part number and send objects up in order
 603 | ✓✓ | 608 |   if (frame->att_size() > 0) {
 604 |  |  |     status = WriteTxn(txn_id,
 605 |  |  |                       reinterpret_cast<unsigned char *>(frame->attachment()),
 606 |  | 607 |                       frame->att_size());
 607 | ✗✓ | 607 |     if (status != cvmfs::STATUS_OK) {
 608 |  |  |       LogSessionError(msg_req->session_id(), status, "failure writing object");
 609 |  |  |       msg_reply.set_status(status);
 610 |  |  |       transport->SendFrame(&frame_send);
 611 |  |  |       return;
 612 |  |  |     }
 613 |  |  |   }
 614 |  |  |
 615 | ✓✓ | 608 |   if (msg_req->last_part()) {
 616 |  | 6 |     status = CommitTxn(txn_id);
 617 | ✗✓ | 6 |     if (status != cvmfs::STATUS_OK) {
 618 |  |  |       LogSessionError(msg_req->session_id(), status,
 619 |  |  |                       "failure committing object");
 620 |  |  |     }
 621 |  | 6 |     txn_ids_.Erase(uniq_req);
 622 |  |  |   }
 623 |  | 608 |   msg_reply.set_status(status);
 624 | ✓✗✓✗ | 608 |   transport->SendFrame(&frame_send);
 625 |  |  | }
 626 |  |  |
 627 |  |  |
 628 |  | 16 | bool CachePlugin::IsRunning() {
 629 |  | 16 |   return atomic_read32(&running_) != 0;
 630 |  |  | }
 631 |  |  |
 632 |  |  |
 633 |  | 16 | bool CachePlugin::Listen(const string &locator) {
 634 |  | 16 |   vector<string> tokens = SplitString(locator, '=');
 635 | ✓✗ | 16 |   if (tokens[0] == "unix") {
 636 |  | 16 |     string lock_path = tokens[1] + ".lock";
 637 |  | 16 |     fd_socket_lock_ = TryLockFile(lock_path);
 638 | ✗✓ | 16 |     if (fd_socket_lock_ == -1) {
 639 |  |  |       LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 640 |  |  |                "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
 641 |  |  |       NotifySupervisor(CacheTransport::kFailureNotification);
 642 |  |  |       return false;
 643 | ✗✓ | 16 |     } else if (fd_socket_lock_ == -2) {
 644 |  |  |       // Another plugin process probably started in the meantime
 645 |  |  |       NotifySupervisor(CacheTransport::kReadyNotification);
 646 |  |  |       if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
 647 |  |  |         LogCvmfs(kLogCache, kLogSyslogErr | kLogStderr,
 648 |  |  |                  "failed to lock on %s, file is busy", lock_path.c_str());
 649 |  |  |       }
 650 |  |  |       return false;
 651 |  |  |     }
 652 | ✗✓ | 16 |     assert(fd_socket_lock_ >= 0);
 653 |  | 16 |     fd_socket_ = MakeSocket(tokens[1], 0600);
 654 | ✓✗ | 16 |     is_local_ = true;
 655 |  |  |   } else if (tokens[0] == "tcp") {
 656 |  |  |     vector<string> tcp_address = SplitString(tokens[1], ':');
 657 |  |  |     if (tcp_address.size() != 2) {
 658 |  |  |       LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 659 |  |  |                "invalid locator: %s", locator.c_str());
 660 |  |  |       NotifySupervisor(CacheTransport::kFailureNotification);
 661 |  |  |       return false;
 662 |  |  |     }
 663 |  |  |     fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
 664 |  |  |   } else {
 665 |  |  |     LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 666 |  |  |              "unknown endpoint in locator: %s", locator.c_str());
 667 |  |  |     NotifySupervisor(CacheTransport::kFailureNotification);
 668 |  |  |     return false;
 669 |  |  |   }
 670 |  |  |
 671 | ✗✓ | 16 |   if (fd_socket_ < 0) {
 672 |  |  |     if (errno == EADDRINUSE) {
 673 |  |  |       // Another plugin process probably started in the meantime
 674 |  |  |       NotifySupervisor(CacheTransport::kReadyNotification);
 675 |  |  |     } else {
 676 |  |  |       LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 677 |  |  |                "failed to create endpoint %s (%d)", locator.c_str(), errno);
 678 |  |  |       NotifySupervisor(CacheTransport::kFailureNotification);
 679 |  |  |     }
 680 |  |  |     is_local_ = false;
 681 |  |  |     return false;
 682 |  |  |   }
 683 |  | 16 |   int retval = listen(fd_socket_, 32);
 684 | ✗✓ | 16 |   assert(retval == 0);
 685 |  |  |
 686 |  | 16 |   return true;
 687 |  |  | }
 688 |  |  |
 689 |  |  |
 690 |  | 4 | void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
 691 |  | 4 |   string session_str("unidentified client (" + StringifyInt(session_id) + ")");
 692 |  | 4 |   map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
 693 | ✓✗ | 4 |   if (iter != sessions_.end()) {
 694 |  | 4 |     session_str = iter->second.name;
 695 |  |  |   }
 696 |  |  |   LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
 697 |  | 4 |            "session '%s': %s", session_str.c_str(), msg.c_str());
 698 |  | 4 | }
 699 |  |  |
 700 |  |  |
 701 |  | 3 | void CachePlugin::LogSessionError(
 702 |  |  |   uint64_t session_id,
 703 |  |  |   cvmfs::EnumStatus status,
 704 |  |  |   const std::string &msg)
 705 |  |  | {
 706 |  | 3 |   string session_str("unidentified client (" + StringifyInt(session_id) + ")");
 707 |  | 3 |   map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
 708 | ✓✗ | 3 |   if (iter != sessions_.end()) {
 709 |  | 3 |     session_str = iter->second.name;
 710 |  |  |   }
 711 |  |  |   LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
 712 |  |  |            "session '%s': %s (%d - %s)",
 713 |  |  |            session_str.c_str(), msg.c_str(), status,
 714 |  | 3 |            CacheTransportCode2Ascii(status));
 715 |  | 3 | }
 716 |  |  |
 717 |  |  |
 718 |  | 16 | void *CachePlugin::MainProcessRequests(void *data) {
 719 |  | 16 |   CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
 720 |  |  |
 721 |  | 16 |   platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
 722 |  |  |
 723 |  | 16 |   vector<struct pollfd> watch_fds;
 724 |  |  |   // Elements 0, 1: control pipe, socket fd
 725 |  |  |   struct pollfd watch_ctrl;
 726 |  | 16 |   watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
 727 |  | 16 |   watch_ctrl.events = POLLIN | POLLPRI;
 728 |  | 16 |   watch_fds.push_back(watch_ctrl);
 729 |  |  |   struct pollfd watch_socket;
 730 |  | 16 |   watch_socket.fd = cache_plugin->fd_socket_;
 731 |  | 16 |   watch_socket.events = POLLIN | POLLPRI;
 732 |  | 16 |   watch_fds.push_back(watch_socket);
 733 |  |  |
 734 |  | 16 |   bool terminated = false;
 735 | ✓✗ | 4498 |   while (!terminated) {
 736 | ✓✓ | 17900 |     for (unsigned i = 0; i < watch_fds.size(); ++i)
 737 |  | 13418 |       watch_fds[i].revents = 0;
 738 |  | 4482 |     int retval = poll(&watch_fds[0], watch_fds.size(), -1);
 739 | ✗✓ | 4482 |     if (retval < 0) {
 740 |  |  |       if (errno == EINTR)
 741 |  |  |         continue;
 742 |  |  |       LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
 743 |  |  |                "cache plugin connection failure (%d)", errno);
 744 |  |  |       abort();
 745 |  |  |     }
 746 |  |  |
 747 |  |  |     // Termination or detach
 748 | ✓✓ | 4482 |     if (watch_fds[0].revents) {
 749 |  |  |       char signal;
 750 |  | 1018 |       ReadPipe(watch_fds[0].fd, &signal, 1);
 751 | ✓✓ | 1018 |       if (signal == kSignalDetach) {
 752 |  | 1002 |         cache_plugin->SendDetachRequests();
 753 |  | 1002 |         continue;
 754 |  |  |       }
 755 |  |  |
 756 |  |  |       // termination
 757 | ✓✓ | 16 |       if (watch_fds.size() > 2) {
 758 |  |  |         LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
 759 |  | 5 |                  "terminating external cache manager with pending connections");
 760 |  |  |       }
 761 |  | 16 |       break;
 762 |  |  |     }
 763 |  |  |
 764 |  |  |     // New connection
 765 | ✓✓ | 3464 |     if (watch_fds[1].revents) {
 766 |  |  |       struct sockaddr_un remote;
 767 |  | 17 |       socklen_t socket_size = sizeof(remote);
 768 |  |  |       int fd_con =
 769 |  | 17 |         accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
 770 | ✗✓ | 17 |       if (fd_con < 0) {
 771 |  |  |         LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
 772 |  |  |                  "failed to establish connection (%d)", errno);
 773 |  |  |         continue;
 774 |  |  |       }
 775 |  |  |       struct pollfd watch_con;
 776 |  | 17 |       watch_con.fd = fd_con;
 777 |  | 17 |       watch_con.events = POLLIN | POLLPRI;
 778 |  | 17 |       watch_fds.push_back(watch_con);
 779 |  | 17 |       cache_plugin->connections_.insert(fd_con);
 780 |  |  |     }
 781 |  |  |
 782 |  |  |     // New request
 783 | ✓✓ | 10392 |     for (unsigned i = 2; i < watch_fds.size(); ) {
 784 | ✓✓ | 3464 |       if (watch_fds[i].revents) {
 785 |  | 3447 |         bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
 786 | ✓✓ | 3447 |         if (!proceed) {
 787 |  | 12 |           close(watch_fds[i].fd);
 788 |  | 12 |           cache_plugin->connections_.erase(watch_fds[i].fd);
 789 |  | 12 |           watch_fds.erase(watch_fds.begin() + i);
 790 | ✗✓✗✗ ✗✗✗✓ | 12 |           if ( (getenv(CacheTransport::kEnvReadyNotifyFd) != NULL) &&
 791 |  |  |                (cache_plugin->connections_.empty()) &&
 792 |  |  |                (cache_plugin->num_inlimbo_clients_ == 0) )
 793 |  |  |           {
 794 |  |  |             LogCvmfs(kLogCache, kLogSyslog,
 795 |  |  |                      "stopping cache plugin, no more active clients");
 796 |  |  |             terminated = true;
 797 |  |  |             break;
 798 |  |  |           }
 799 |  |  |         } else {
 800 |  | 3435 |           i++;
 801 |  |  |         }
 802 |  |  |       } else {
 803 |  | 17 |         i++;
 804 |  |  |       }
 805 |  |  |     }
 806 |  |  |   }
 807 |  |  |
 808 |  |  |   // 0, 1 being closed by destructor
 809 | ✓✓ | 21 |   for (unsigned i = 2; i < watch_fds.size(); ++i)
 810 |  | 5 |     close(watch_fds[i].fd);
 811 |  | 16 |   cache_plugin->txn_ids_.Clear();
 812 |  |  |
 813 |  | 16 |   signal(SIGPIPE, save_sigpipe);
 814 |  | 16 |   return NULL;
 815 |  |  | }
 816 |  |  |
 817 |  |  |
 818 |  |  | /**
 819 |  |  |  * Used during startup to synchronize with the cvmfs client.
 820 |  |  |  */
 821 |  | 16 | void CachePlugin::NotifySupervisor(char signal) {
 822 |  | 16 |   char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
 823 | ✓✗ | 16 |   if (pipe_ready == NULL)
 824 |  | 16 |     return;
 825 |  |  |   int fd_pipe_ready = String2Int64(pipe_ready);
 826 |  |  |   WritePipe(fd_pipe_ready, &signal, 1);
 827 |  |  | }
 828 |  |  |
 829 |  |  |
 830 |  | 16 | void CachePlugin::ProcessRequests(unsigned num_workers) {
 831 |  | 16 |   num_workers_ = num_workers;
 832 |  | 16 |   int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
 833 | ✗✓ | 16 |   assert(retval == 0);
 834 |  | 16 |   NotifySupervisor(CacheTransport::kReadyNotification);
 835 |  | 16 |   atomic_cas32(&running_, 0, 1);
 836 |  | 16 | }
 837 |  |  |
 838 |  |  |
 839 |  | 1002 | void CachePlugin::SendDetachRequests() {
 840 |  | 1002 |   set<int>::const_iterator iter = connections_.begin();
 841 |  | 1002 |   set<int>::const_iterator iter_end = connections_.end();
 842 | ✓✓ | 1002 |   for (; iter != iter_end; ++iter) {
 843 |  |  |     CacheTransport transport(*iter,
 844 |  |  |       CacheTransport::kFlagSendIgnoreFailure |
 845 |  | 1002 |       CacheTransport::kFlagSendNonBlocking);
 846 |  | 1002 |     cvmfs::MsgDetach msg_detach;
 847 |  | 1002 |     CacheTransport::Frame frame_send(&msg_detach);
 848 |  | 1002 |     transport.SendFrame(&frame_send);
 849 |  |  |   }
 850 |  | 1002 | }
 851 |  |  |
 852 |  |  |
 853 |  | 16 | void CachePlugin::Terminate() {
 854 | ✓✗ | 16 |   if (IsRunning()) {
 855 |  | 16 |     char terminate = kSignalTerminate;
 856 |  | 16 |     WritePipe(pipe_ctrl_[1], &terminate, 1);
 857 |  | 16 |     pthread_join(thread_io_, NULL);
 858 |  | 16 |     atomic_cas32(&running_, 1, 0);
 859 |  |  |   }
 860 |  | 16 | }
 861 |  |  |
 862 |  |  |
 863 |  |  | void CachePlugin::WaitFor() {
 864 |  |  |   if (!IsRunning())
 865 |  |  |     return;
 866 |  |  |   pthread_join(thread_io_, NULL);
 867 | ✓✗✓✗ | 45 | }
Generated by: GCOVR (Version 4.1)