GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/talk.cc
Date: 2025-02-02 02:34:22
Exec Total Coverage
Lines: 0 614 0.0%
Branches: 0 1605 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Implements a socket interface to cvmfs. This way commands can be sent
5 * to cvmfs. When cvmfs is running, the socket
6 * /var/cache/cvmfs2/$INSTANCE/cvmfs_io
7 * is available for command input and reply messages, resp.
8 *
9 * Cvmfs comes with the cvmfs_talk script, that handles writing and reading the
10 * socket.
11 *
12 * The talk module runs in a separate thread.
13 */
14
15 #ifndef __STDC_FORMAT_MACROS
16 #define __STDC_FORMAT_MACROS
17 #endif
18
19
20 #include "talk.h"
21
22 #include <errno.h>
23 #include <inttypes.h>
24 #include <pthread.h>
25 #include <stdint.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <sys/uio.h>
30 #include <sys/un.h>
31 #include <unistd.h>
32
33 #include <cassert>
34 #include <cstdlib>
35 #include <cstring>
36 #include <string>
37 #include <vector>
38
39 #include "cache.h"
40 #include "cache_posix.h"
41 #include "catalog_mgr_client.h"
42 #include "cvmfs.h"
43 #include "duplex_sqlite3.h"
44 #include "fuse_remount.h"
45 #include "glue_buffer.h"
46 #include "loader.h"
47 #include "lru_md.h"
48 #include "monitor.h"
49 #include "mountpoint.h"
50 #include "network/download.h"
51 #include "nfs_maps.h"
52 #include "options.h"
53 #include "quota.h"
54 #include "shortstring.h"
55 #include "statistics.h"
56 #include "tracer.h"
57 #include "util/logging.h"
58 #include "util/platform.h"
59 #include "util/pointer.h"
60 #include "wpad.h"
61
62 using namespace std; // NOLINT
63
64
65 void TalkManager::Answer(int con_fd, const string &msg) {
66 (void)send(con_fd, &msg[0], msg.length(), MSG_NOSIGNAL);
67 }
68
69
70 void TalkManager::AnswerStringList(int con_fd, const vector<string> &list) {
71 string list_str;
72 for (unsigned i = 0; i < list.size(); ++i) {
73 list_str += list[i] + "\n";
74 }
75 Answer(con_fd, list_str);
76 }
77
78
79 TalkManager *TalkManager::Create(
80 const string &socket_path,
81 MountPoint *mount_point,
82 FuseRemounter *remounter)
83 {
84 UniquePtr<TalkManager>
85 talk_manager(new TalkManager(socket_path, mount_point, remounter));
86
87 talk_manager->socket_fd_ = MakeSocket(socket_path, 0660);
88 if (talk_manager->socket_fd_ == -1)
89 return NULL;
90 if (listen(talk_manager->socket_fd_, 1) == -1)
91 return NULL;
92
93 LogCvmfs(kLogTalk, kLogDebug, "socket created at %s (fd %d)",
94 socket_path.c_str(), talk_manager->socket_fd_);
95
96 return talk_manager.Release();
97 }
98
99
100 string TalkManager::FormatMetalinkInfo(download::DownloadManager *download_mgr)
101 {
102 vector<string> metalink_chain;
103 unsigned active_metalink;
104
105 download_mgr->GetMetalinkInfo(&metalink_chain, &active_metalink);
106 if (metalink_chain.size() == 0)
107 return "No metalinks defined\n";
108
109 string metalink_str;
110 for (unsigned i = 0; i < metalink_chain.size(); ++i) {
111 metalink_str += " [" + StringifyInt(i) + "] " + metalink_chain[i] + "\n";
112 }
113 metalink_str += "Active metalink " + StringifyInt(active_metalink) + ": " +
114 metalink_chain[active_metalink] + "\n";
115 return metalink_str;
116 }
117
118 string TalkManager::FormatHostInfo(download::DownloadManager *download_mgr) {
119 vector<string> host_chain;
120 vector<int> rtt;
121 unsigned active_host;
122
123 download_mgr->GetHostInfo(&host_chain, &rtt, &active_host);
124 if (host_chain.size() == 0)
125 return "No hosts defined\n";
126
127 string host_str;
128 for (unsigned i = 0; i < host_chain.size(); ++i) {
129 host_str += " [" + StringifyInt(i) + "] " + host_chain[i] + " (";
130 if (rtt[i] == download::DownloadManager::kProbeUnprobed)
131 host_str += "unprobed";
132 else if (rtt[i] == download::DownloadManager::kProbeDown)
133 host_str += "host down";
134 else if (rtt[i] == download::DownloadManager::kProbeGeo)
135 host_str += "geographically ordered";
136 else
137 host_str += StringifyInt(rtt[i]) + " ms";
138 host_str += ")\n";
139 }
140 host_str += "Active host " + StringifyInt(active_host) + ": " +
141 host_chain[active_host] + "\n";
142 return host_str;
143 }
144
145 string TalkManager::FormatProxyInfo(download::DownloadManager *download_mgr) {
146 vector< vector<download::DownloadManager::ProxyInfo> > proxy_chain;
147 unsigned active_group;
148 unsigned fallback_group;
149
150 download_mgr->GetProxyInfo(&proxy_chain, &active_group, &fallback_group);
151 string proxy_str;
152 if (proxy_chain.size()) {
153 proxy_str += "Load-balance groups:\n";
154 for (unsigned i = 0; i < proxy_chain.size(); ++i) {
155 vector<string> urls;
156 for (unsigned j = 0; j < proxy_chain[i].size(); ++j) {
157 urls.push_back(proxy_chain[i][j].Print());
158 }
159 proxy_str +=
160 "[" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
161 }
162 proxy_str += "Active proxy: [" + StringifyInt(active_group) + "] " +
163 proxy_chain[active_group][0].url + "\n";
164 if (fallback_group < proxy_chain.size())
165 proxy_str += "First fallback group: [" +
166 StringifyInt(fallback_group) + "]\n";
167 } else {
168 proxy_str = "No proxies defined\n";
169 }
170 return proxy_str;
171 }
172
173
174 /**
175 * Listener thread on the socket.
176 * TODO(jblomer): create Format... helpers to shorten this method
177 */
178 void *TalkManager::MainResponder(void *data) {
179 TalkManager *talk_mgr = reinterpret_cast<TalkManager *>(data);
180 MountPoint *mount_point = talk_mgr->mount_point_;
181 FileSystem *file_system = mount_point->file_system();
182 FuseRemounter *remounter = talk_mgr->remounter_;
183 LogCvmfs(kLogTalk, kLogDebug, "talk thread started");
184
185 struct sockaddr_un remote;
186 socklen_t socket_size = sizeof(remote);
187 int con_fd = -1;
188 while (true) {
189 if (con_fd >= 0) {
190 shutdown(con_fd, SHUT_RDWR);
191 close(con_fd);
192 }
193 LogCvmfs(kLogTalk, kLogDebug, "accepting connections on socketfd %d",
194 talk_mgr->socket_fd_);
195 if ((con_fd = accept(talk_mgr->socket_fd_,
196 (struct sockaddr *)&remote,
197 &socket_size)) < 0)
198 {
199 LogCvmfs(kLogTalk, kLogDebug, "terminating talk thread (fd %d, errno %d)",
200 con_fd, errno);
201 break;
202 }
203
204 char buf[kMaxCommandSize];
205 int bytes_read;
206 if ((bytes_read = recv(con_fd, buf, sizeof(buf), 0)) <= 0)
207 continue;
208
209 if (buf[bytes_read-1] == '\0')
210 bytes_read--;
211 const string line = string(buf, bytes_read);
212 LogCvmfs(kLogTalk, kLogDebug, "received %s (length %lu)",
213 line.c_str(), line.length());
214
215 if (line == "tracebuffer flush") {
216 mount_point->tracer()->Flush();
217 talk_mgr->Answer(con_fd, "OK\n");
218 } else if (line == "cache size") {
219 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
220 if (!quota_mgr->HasCapability(QuotaManager::kCapIntrospectSize)) {
221 talk_mgr->Answer(con_fd, "Cache cannot report its size\n");
222 } else {
223 uint64_t size_unpinned = quota_mgr->GetSize();
224 uint64_t size_pinned = quota_mgr->GetSizePinned();
225 const string size_str = "Current cache size is " +
226 StringifyInt(size_unpinned / (1024*1024)) + "MB (" +
227 StringifyInt(size_unpinned) + " Bytes), pinned: " +
228 StringifyInt(size_pinned / (1024*1024)) + "MB (" +
229 StringifyInt(size_pinned) + " Bytes)\n";
230 talk_mgr->Answer(con_fd, size_str);
231 }
232 } else if (line == "cache instance") {
233 talk_mgr->Answer(con_fd, file_system->cache_mgr()->Describe());
234 } else if (line == "cache list") {
235 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
236 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
237 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
238 } else {
239 vector<string> ls = quota_mgr->List();
240 talk_mgr->AnswerStringList(con_fd, ls);
241 }
242 } else if (line == "cache list pinned") {
243 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
244 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
245 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
246 } else {
247 vector<string> ls_pinned = quota_mgr->ListPinned();
248 talk_mgr->AnswerStringList(con_fd, ls_pinned);
249 }
250 } else if (line == "cache list catalogs") {
251 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
252 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
253 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
254 } else {
255 vector<string> ls_catalogs = quota_mgr->ListCatalogs();
256 talk_mgr->AnswerStringList(con_fd, ls_catalogs);
257 }
258 } else if (line.substr(0, 12) == "cleanup rate") {
259 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
260 if (!quota_mgr->HasCapability(QuotaManager::kCapIntrospectCleanupRate)) {
261 talk_mgr->Answer(con_fd, "Unsupported by this cache\n");
262 } else {
263 if (line.length() < 14) {
264 talk_mgr->Answer(con_fd, "Usage: cleanup rate <period in mn>\n");
265 } else {
266 const uint64_t period_s = String2Uint64(line.substr(13)) * 60;
267 const uint64_t rate = quota_mgr->GetCleanupRate(period_s);
268 talk_mgr->Answer(con_fd, StringifyInt(rate) + "\n");
269 }
270 }
271 } else if (line.substr(0, 15) == "cache limit set") {
272 if (line.length() < 16) {
273 talk_mgr->Answer(con_fd, "Usage: cache limit set <MB>\n");
274 } else {
275 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
276 const uint64_t size = String2Uint64(line.substr(16));
277 if (size < 1000) {
278 talk_mgr->Answer(con_fd, "New limit too low (minimum 1000)\n");
279 } else {
280 if(quota_mgr->SetLimit(size * 1024*1024)) {
281 file_system->options_mgr()->SetValueFromTalk("CVMFS_QUOTA_LIMIT", StringifyUint(size));
282 talk_mgr->Answer(con_fd, "OK\n");
283 } else {
284 talk_mgr->Answer(con_fd, "Limit not reset\n");
285 }
286 }
287 }
288 } else if (line == "cache limit get") {
289 std::string limit_from_options;
290 file_system->options_mgr()->GetValue("CVMFS_QUOTA_LIMIT", &limit_from_options);
291 talk_mgr->Answer(con_fd, limit_from_options + "\n");
292 } else if (line.substr(0, 7) == "cleanup") {
293 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
294 if (!quota_mgr->HasCapability(QuotaManager::kCapShrink)) {
295 talk_mgr->Answer(con_fd, "Cache cannot trigger eviction\n");
296 } else {
297 if (line.length() < 9) {
298 talk_mgr->Answer(con_fd, "Usage: cleanup <MB>\n");
299 } else {
300 const uint64_t size = String2Uint64(line.substr(8))*1024*1024;
301 if (quota_mgr->Cleanup(size)) {
302 talk_mgr->Answer(con_fd, "OK\n");
303 } else {
304 talk_mgr->Answer(con_fd, "Not fully cleaned "
305 "(there might be pinned chunks)\n");
306 }
307 }
308 }
309 } else if (line.substr(0, 5) == "evict") {
310 assert(mount_point->file_system()->type() == FileSystem::kFsFuse);
311 if (line.length() < 7) {
312 talk_mgr->Answer(con_fd, "Usage: evict <path>\n");
313 } else {
314 const string path = line.substr(6);
315 const bool found_regular = cvmfs::Evict(path);
316 if (found_regular)
317 talk_mgr->Answer(con_fd, "OK\n");
318 else
319 talk_mgr->Answer(con_fd, "No such regular file\n");
320 }
321 } else if (line.substr(0, 3) == "pin") {
322 assert(mount_point->file_system()->type() == FileSystem::kFsFuse);
323 if (line.length() < 5) {
324 talk_mgr->Answer(con_fd, "Usage: pin <path>\n");
325 } else {
326 const string path = line.substr(4);
327 const bool found_regular = cvmfs::Pin(path);
328 if (found_regular)
329 talk_mgr->Answer(con_fd, "OK\n");
330 else
331 talk_mgr->Answer(con_fd, "No such regular file or pinning failed\n");
332 }
333 } else if (line == "mountpoint") {
334 talk_mgr->Answer(con_fd, cvmfs::loader_exports_->mount_point + "\n");
335 } else if (line == "device id") {
336 if (cvmfs::loader_exports_->version >= 5)
337 talk_mgr->Answer(con_fd, cvmfs::loader_exports_->device_id + "\n");
338 else
339 talk_mgr->Answer(con_fd, "0:0\n");
340 } else if (line.substr(0, 13) == "send mount fd") {
341 // Hidden command intended to be used only by the cvmfs mount helper
342 if (line.length() < 15) {
343 talk_mgr->Answer(con_fd, "EINVAL\n");
344 } else {
345 std::string socket_path = line.substr(14);
346 bool retval = cvmfs::SendFuseFd(socket_path);
347 talk_mgr->Answer(con_fd, retval ? "OK\n" : "Failed\n");
348 LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslog,
349 "Attempt to send fuse connection info to new mount (via %s)%s",
350 socket_path.c_str(), retval ? "" : " -- failed!");
351 }
352 } else if (line.substr(0, 7) == "remount") {
353 FuseRemounter::Status status;
354 if (line == "remount sync")
355 status = remounter->CheckSynchronously();
356 else
357 status = remounter->Check();
358 switch (status) {
359 case FuseRemounter::kStatusFailGeneral:
360 talk_mgr->Answer(con_fd, "Failed\n");
361 break;
362 case FuseRemounter::kStatusFailNoSpace:
363 talk_mgr->Answer(con_fd, "Failed (no space)\n");
364 break;
365 case FuseRemounter::kStatusUp2Date:
366 talk_mgr->Answer(con_fd, "Catalog up to date\n");
367 break;
368 case FuseRemounter::kStatusDraining:
369 talk_mgr->Answer(con_fd, "New revision applied\n");
370 break;
371 case FuseRemounter::kStatusMaintenance:
372 talk_mgr->Answer(con_fd, "In maintenance mode\n");
373 break;
374 default:
375 talk_mgr->Answer(con_fd, "internal error\n");
376 }
377 } else if (line.substr(0, 6) == "chroot") {
378 if (line.length() < 8) {
379 talk_mgr->Answer(con_fd, "Usage: chroot <hash>\n");
380 } else {
381 std::string root_hash = Trim(line.substr(7), true /* trim_newline */);
382 FuseRemounter::Status status = remounter->ChangeRoot(
383 MkFromHexPtr(shash::HexPtr(root_hash), shash::kSuffixCatalog));
384 switch (status) {
385 case FuseRemounter::kStatusUp2Date:
386 talk_mgr->Answer(con_fd, "OK\n");
387 break;
388 default:
389 talk_mgr->Answer(con_fd, "Failed\n");
390 break;
391 }
392 }
393 } else if (line == "detach nested catalogs") {
394 mount_point->catalog_mgr()->DetachNested();
395 talk_mgr->Answer(con_fd, "OK\n");
396 } else if (line == "revision") {
397 string revision = StringifyInt(mount_point->catalog_mgr()->GetRevision());
398 talk_mgr->Answer(con_fd, revision + "\n");
399 } else if (line == "max ttl info") {
400 const unsigned max_ttl = mount_point->GetMaxTtlMn();
401 if (max_ttl == 0) {
402 talk_mgr->Answer(con_fd, "unset\n");
403 } else {
404 const string max_ttl_str = StringifyInt(max_ttl) + " minutes\n";
405 talk_mgr->Answer(con_fd, max_ttl_str);
406 }
407 } else if (line.substr(0, 11) == "max ttl set") {
408 if (line.length() < 13) {
409 talk_mgr->Answer(con_fd, "Usage: max ttl set <minutes>\n");
410 } else {
411 const unsigned max_ttl = String2Uint64(line.substr(12));
412 mount_point->SetMaxTtlMn(max_ttl);
413 talk_mgr->Answer(con_fd, "OK\n");
414 }
415 } else if (line.substr(0, 14) == "nameserver get") {
416 const string dns_server = mount_point->download_mgr()->GetDnsServer();
417 const string reply = !dns_server.empty() ?
418 std::string("DNS server address: ") + dns_server + "\n":
419 std::string("DNS server not set.\n");
420 talk_mgr->Answer(con_fd, reply);
421 } else if (line.substr(0, 14) == "nameserver set") {
422 if (line.length() < 16) {
423 talk_mgr->Answer(con_fd, "Usage: nameserver set <host>\n");
424 } else {
425 const string host = line.substr(15);
426 mount_point->download_mgr()->SetDnsServer(host);
427 talk_mgr->Answer(con_fd, "OK\n");
428 }
429 } else if (line == "external metalink info") {
430 const string external_metalink_info =
431 talk_mgr->FormatMetalinkInfo(mount_point->external_download_mgr());
432 talk_mgr->Answer(con_fd, external_metalink_info);
433 } else if (line == "metalink info") {
434 const string metalink_info =
435 talk_mgr->FormatMetalinkInfo(mount_point->download_mgr());
436 talk_mgr->Answer(con_fd, metalink_info);
437 } else if (line == "external host info") {
438 const string external_host_info =
439 talk_mgr->FormatHostInfo(mount_point->external_download_mgr());
440 talk_mgr->Answer(con_fd, external_host_info);
441 } else if (line == "host info") {
442 const string host_info =
443 talk_mgr->FormatHostInfo(mount_point->download_mgr());
444 talk_mgr->Answer(con_fd, host_info);
445 } else if (line == "host probe") {
446 mount_point->download_mgr()->ProbeHosts();
447 talk_mgr->Answer(con_fd, "OK\n");
448 } else if (line == "host probe geo") {
449 bool retval = mount_point->download_mgr()->ProbeGeo();
450 if (retval)
451 talk_mgr->Answer(con_fd, "OK\n");
452 else
453 talk_mgr->Answer(con_fd, "Failed\n");
454 } else if (line == "external metalink switch") {
455 mount_point->external_download_mgr()->SwitchMetalink();
456 talk_mgr->Answer(con_fd, "OK\n");
457 } else if (line == "metalink switch") {
458 mount_point->download_mgr()->SwitchMetalink();
459 talk_mgr->Answer(con_fd, "OK\n");
460 } else if (line == "external host switch") {
461 mount_point->external_download_mgr()->SwitchHost();
462 talk_mgr->Answer(con_fd, "OK\n");
463 } else if (line == "host switch") {
464 mount_point->download_mgr()->SwitchHost();
465 talk_mgr->Answer(con_fd, "OK\n");
466 } else if (line.substr(0, 21) == "external metalink set") {
467 if (line.length() < 23) {
468 talk_mgr->Answer(con_fd, "Usage: external metalink set <URL>\n");
469 } else {
470 const std::string host = line.substr(22);
471 mount_point->external_download_mgr()->SetMetalinkChain(host);
472 talk_mgr->Answer(con_fd, "OK\n");
473 }
474 } else if (line.substr(0, 12) == "metalink set") {
475 if (line.length() < 14) {
476 talk_mgr->Answer(con_fd, "Usage: metalink set <URL>\n");
477 } else {
478 const std::string host = line.substr(13);
479 mount_point->download_mgr()->SetMetalinkChain(host);
480 talk_mgr->Answer(con_fd, "OK\n");
481 }
482 } else if (line.substr(0, 17) == "external host set") {
483 if (line.length() < 19) {
484 talk_mgr->Answer(con_fd, "Usage: external host set <URL>\n");
485 } else {
486 const std::string host = line.substr(18);
487 mount_point->external_download_mgr()->SetHostChain(host);
488 talk_mgr->Answer(con_fd, "OK\n");
489 }
490 } else if (line.substr(0, 8) == "host set") {
491 if (line.length() < 10) {
492 talk_mgr->Answer(con_fd, "Usage: host set <host list>\n");
493 } else {
494 const string hosts = line.substr(9);
495 mount_point->download_mgr()->SetHostChain(hosts);
496 talk_mgr->Answer(con_fd, "OK\n");
497 }
498 } else if (line == "external proxy info") {
499 string external_proxy_info =
500 talk_mgr->FormatProxyInfo(mount_point->external_download_mgr());
501 talk_mgr->Answer(con_fd, external_proxy_info);
502 } else if (line == "proxy info") {
503 string proxy_info =
504 talk_mgr->FormatProxyInfo(mount_point->download_mgr());
505 talk_mgr->Answer(con_fd, proxy_info);
506 } else if (line == "proxy rebalance") {
507 mount_point->download_mgr()->RebalanceProxies();
508 talk_mgr->Answer(con_fd, "OK\n");
509 } else if (line == "proxy group switch") {
510 mount_point->download_mgr()->SwitchProxyGroup();
511 talk_mgr->Answer(con_fd, "OK\n");
512 } else if (line.substr(0, 18) == "external proxy set") {
513 if (line.length() < 20) {
514 talk_mgr->Answer(con_fd, "Usage: external proxy set <proxy list>\n");
515 } else {
516 string external_proxies = line.substr(19);
517 mount_point->external_download_mgr()->SetProxyChain(
518 external_proxies, "", download::DownloadManager::kSetProxyRegular);
519 talk_mgr->Answer(con_fd, "OK\n");
520 }
521 } else if (line.substr(0, 9) == "proxy set") {
522 if (line.length() < 11) {
523 talk_mgr->Answer(con_fd, "Usage: proxy set <proxy list>\n");
524 } else {
525 string proxies = line.substr(10);
526 proxies =
527 download::ResolveProxyDescription(proxies, "",
528 mount_point->download_mgr());
529 if (proxies == "") {
530 talk_mgr->Answer(con_fd, "Failed, no valid proxies\n");
531 } else {
532 mount_point->download_mgr()->SetProxyChain(
533 proxies, "", download::DownloadManager::kSetProxyRegular);
534 talk_mgr->Answer(con_fd, "OK\n");
535 }
536 }
537 } else if (line.substr(0, 14) == "proxy fallback") {
538 if (line.length() < 15) {
539 talk_mgr->Answer(con_fd, "Usage: proxy fallback <proxy list>\n");
540 } else {
541 string fallback_proxies = line.substr(15);
542 mount_point->download_mgr()->SetProxyChain(
543 "", fallback_proxies, download::DownloadManager::kSetProxyFallback);
544 talk_mgr->Answer(con_fd, "OK\n");
545 }
546 } else if (line == "timeout info") {
547 unsigned timeout;
548 unsigned timeout_direct;
549 mount_point->download_mgr()->GetTimeout(&timeout, &timeout_direct);
550 string timeout_str = "Timeout with proxy: ";
551 if (timeout)
552 timeout_str += StringifyInt(timeout) + "s\n";
553 else
554 timeout_str += "no timeout\n";
555 timeout_str += "Timeout without proxy: ";
556 if (timeout_direct)
557 timeout_str += StringifyInt(timeout_direct) + "s\n";
558 else
559 timeout_str += "no timeout\n";
560 talk_mgr->Answer(con_fd, timeout_str);
561 } else if (line.substr(0, 11) == "timeout set") {
562 if (line.length() < 13) {
563 talk_mgr->Answer(con_fd, "Usage: timeout set <proxy> <direct>\n");
564 } else {
565 uint64_t timeout;
566 uint64_t timeout_direct;
567 String2Uint64Pair(line.substr(12), &timeout, &timeout_direct);
568 mount_point->download_mgr()->SetTimeout(timeout, timeout_direct);
569 talk_mgr->Answer(con_fd, "OK\n");
570 }
571 } else if (line == "open catalogs") {
572 talk_mgr->Answer(con_fd, mount_point->catalog_mgr()->PrintHierarchy());
573 } else if (line == "drop metadata caches") {
574 // For testing
575 mount_point->inode_cache()->Pause();
576 mount_point->path_cache()->Pause();
577 mount_point->md5path_cache()->Pause();
578 mount_point->inode_cache()->Drop();
579 mount_point->path_cache()->Drop();
580 mount_point->md5path_cache()->Drop();
581 mount_point->inode_cache()->Resume();
582 mount_point->path_cache()->Resume();
583 mount_point->md5path_cache()->Resume();
584 talk_mgr->Answer(con_fd, "OK\n");
585 } else if (line == "internal affairs") {
586 int current;
587 int highwater;
588 string result;
589
590 result += "Inode Generation:\n " + cvmfs::PrintInodeGeneration();
591
592 // Manually setting the values of the ShortString counters
593 mount_point->statistics()->Lookup("pathstring.n_instances")->
594 Set(PathString::num_instances());
595 mount_point->statistics()->Lookup("pathstring.n_overflows")->
596 Set(PathString::num_overflows());
597 mount_point->statistics()->Lookup("namestring.n_instances")->
598 Set(NameString::num_instances());
599 mount_point->statistics()->Lookup("namestring.n_overflows")->
600 Set(NameString::num_overflows());
601 mount_point->statistics()->Lookup("linkstring.n_instances")->
602 Set(LinkString::num_instances());
603 mount_point->statistics()->Lookup("linkstring.n_overflows")->
604 Set(LinkString::num_overflows());
605
606 // Manually setting the inode tracker numbers
607 glue::InodeTracker::Statistics inode_stats =
608 mount_point->inode_tracker()->GetStatistics();
609 glue::DentryTracker::Statistics dentry_stats =
610 mount_point->dentry_tracker()->GetStatistics();
611 glue::PageCacheTracker::Statistics page_cache_stats =
612 mount_point->page_cache_tracker()->GetStatistics();
613 mount_point->statistics()->Lookup("inode_tracker.n_insert")->Set(
614 atomic_read64(&inode_stats.num_inserts));
615 mount_point->statistics()->Lookup("inode_tracker.n_remove")->Set(
616 atomic_read64(&inode_stats.num_removes));
617 mount_point->statistics()->Lookup("inode_tracker.no_reference")->Set(
618 atomic_read64(&inode_stats.num_references));
619 mount_point->statistics()->Lookup("inode_tracker.n_hit_inode")->Set(
620 atomic_read64(&inode_stats.num_hits_inode));
621 mount_point->statistics()->Lookup("inode_tracker.n_hit_path")->Set(
622 atomic_read64(&inode_stats.num_hits_path));
623 mount_point->statistics()->Lookup("inode_tracker.n_miss_path")->Set(
624 atomic_read64(&inode_stats.num_misses_path));
625 mount_point->statistics()->Lookup("dentry_tracker.n_insert")->Set(
626 dentry_stats.num_insert);
627 mount_point->statistics()->Lookup("dentry_tracker.n_remove")->Set(
628 dentry_stats.num_remove);
629 mount_point->statistics()->Lookup("dentry_tracker.n_prune")->Set(
630 dentry_stats.num_prune);
631 mount_point->statistics()->Lookup("page_cache_tracker.n_insert")->Set(
632 page_cache_stats.n_insert);
633 mount_point->statistics()->Lookup("page_cache_tracker.n_remove")->Set(
634 page_cache_stats.n_remove);
635 mount_point->statistics()->Lookup("page_cache_tracker.n_open_direct")->
636 Set(page_cache_stats.n_open_direct);
637 mount_point->statistics()->Lookup("page_cache_tracker.n_open_flush")->
638 Set(page_cache_stats.n_open_flush);
639 mount_point->statistics()->Lookup("page_cache_tracker.n_open_cached")->
640 Set(page_cache_stats.n_open_cached);
641
642 if (file_system->cache_mgr()->id() == kPosixCacheManager) {
643 PosixCacheManager *cache_mgr =
644 reinterpret_cast<PosixCacheManager *>(
645 file_system->cache_mgr());
646 result += "\nCache Mode: ";
647 switch (cache_mgr->cache_mode()) {
648 case PosixCacheManager::kCacheReadWrite:
649 result += "read-write";
650 break;
651 case PosixCacheManager::kCacheReadOnly:
652 result += "read-only";
653 break;
654 default:
655 result += "unknown";
656 }
657 }
658 bool drainout_mode;
659 bool maintenance_mode;
660 cvmfs::GetReloadStatus(&drainout_mode, &maintenance_mode);
661 result += "\nDrainout Mode: " + StringifyBool(drainout_mode) + "\n";
662 result += "Maintenance Mode: " + StringifyBool(maintenance_mode) + "\n";
663
664 if (file_system->IsNfsSource()) {
665 result += "\nNFS Map Statistics:\n";
666 result += file_system->nfs_maps()->GetStatistics();
667 }
668
669 result += "SQlite Statistics:\n";
670 sqlite3_status(SQLITE_STATUS_MALLOC_COUNT, &current, &highwater, 0);
671 result += " Number of allocations " + StringifyInt(current) + "\n";
672
673 sqlite3_status(SQLITE_STATUS_MEMORY_USED, &current, &highwater, 0);
674 result += " General purpose allocator " +StringifyInt(current/1024) +
675 " KB / " + StringifyInt(highwater/1024) + " KB\n";
676
677 sqlite3_status(SQLITE_STATUS_MALLOC_SIZE, &current, &highwater, 0);
678 result += " Largest malloc " + StringifyInt(highwater) + " Bytes\n";
679
680 sqlite3_status(SQLITE_STATUS_PAGECACHE_USED, &current, &highwater, 0);
681 result += " Page cache allocations " + StringifyInt(current) + " / " +
682 StringifyInt(highwater) + "\n";
683
684 sqlite3_status(SQLITE_STATUS_PAGECACHE_OVERFLOW,
685 &current, &highwater, 0);
686 result += " Page cache overflows " + StringifyInt(current/1024) +
687 " KB / " + StringifyInt(highwater/1024) + " KB\n";
688
689 sqlite3_status(SQLITE_STATUS_PAGECACHE_SIZE, &current, &highwater, 0);
690 result += " Largest page cache allocation " + StringifyInt(highwater) +
691 " Bytes\n";
692
693 sqlite3_status(SQLITE_STATUS_SCRATCH_USED, &current, &highwater, 0);
694 result += " Scratch allocations " + StringifyInt(current) + " / " +
695 StringifyInt(highwater) + "\n";
696
697 sqlite3_status(SQLITE_STATUS_SCRATCH_OVERFLOW, &current, &highwater, 0);
698 result += " Scratch overflows " + StringifyInt(current) + " / " +
699 StringifyInt(highwater) + "\n";
700
701 sqlite3_status(SQLITE_STATUS_SCRATCH_SIZE, &current, &highwater, 0);
702 result += " Largest scratch allocation " + StringifyInt(highwater/1024)
703 + " KB\n";
704
705 result += "\nPer-Connection Memory Statistics:\n" +
706 mount_point->catalog_mgr()->PrintAllMemStatistics();
707
708 result += "\nLatency distribution of system calls:\n";
709
710 result += "Lookup\n" + file_system->hist_fs_lookup()->ToString();
711 result += "Forget\n" + file_system->hist_fs_forget()->ToString();
712 result += "Multi-Forget\n"
713 + file_system->hist_fs_forget_multi()->ToString();
714 result += "Getattr\n" + file_system->hist_fs_getattr()->ToString();
715 result += "Readlink\n" + file_system->hist_fs_readlink()->ToString();
716 result += "Opendir\n" + file_system->hist_fs_opendir()->ToString();
717 result += "Releasedir\n" + file_system->hist_fs_releasedir()->ToString();
718 result += "Readdir\n" + file_system->hist_fs_readdir()->ToString();
719 result += "Open\n" + file_system->hist_fs_open()->ToString();
720 result += "Read\n" + file_system->hist_fs_read()->ToString();
721 result += "Release\n" + file_system->hist_fs_release()->ToString();
722
723 result += "\nRaw Counters:\n" +
724 mount_point->statistics()->PrintList(perf::Statistics::kPrintHeader);
725
726 talk_mgr->Answer(con_fd, result);
727 } else if (line == "reset error counters") {
728 file_system->ResetErrorCounters();
729 talk_mgr->Answer(con_fd, "OK\n");
730 } else if (line == "pid") {
731 const string pid_str = StringifyInt(cvmfs::pid_) + "\n";
732 talk_mgr->Answer(con_fd, pid_str);
733 } else if (line == "pid cachemgr") {
734 const string pid_str =
735 StringifyInt(file_system->cache_mgr()->quota_mgr()->GetPid()) + "\n";
736 talk_mgr->Answer(con_fd, pid_str);
737 } else if (line == "pid watchdog") {
738 const string pid_str = StringifyInt(Watchdog::GetPid()) + "\n";
739 talk_mgr->Answer(con_fd, pid_str);
740 } else if (line == "parameters") {
741 talk_mgr->Answer(con_fd, file_system->options_mgr()->Dump());
742 } else if (line == "hotpatch history") {
743 string history_str =
744 StringifyTime(cvmfs::loader_exports_->boot_time, true) +
745 " (start of CernVM-FS loader " +
746 cvmfs::loader_exports_->loader_version + ")\n";
747 for (loader::EventList::const_iterator i =
748 cvmfs::loader_exports_->history.begin(),
749 iEnd = cvmfs::loader_exports_->history.end(); i != iEnd; ++i)
750 {
751 history_str += StringifyTime((*i)->timestamp, true) +
752 " (loaded CernVM-FS Fuse Module " +
753 (*i)->so_version + ")\n";
754 }
755 talk_mgr->Answer(con_fd, history_str);
756 } else if (line == "vfs inodes") {
757 string result;
758 glue::InodeTracker::Cursor cursor(
759 mount_point->inode_tracker()->BeginEnumerate());
760 uint64_t inode;
761 while (mount_point->inode_tracker()->NextInode(&cursor, &inode)) {
762 result += StringifyInt(inode) + "\n";
763 }
764 mount_point->inode_tracker()->EndEnumerate(&cursor);
765 talk_mgr->Answer(con_fd, result);
766 } else if (line == "vfs entries") {
767 string result;
768 glue::InodeTracker::Cursor cursor(
769 mount_point->inode_tracker()->BeginEnumerate());
770 uint64_t inode_parent;
771 NameString name;
772 while (mount_point->inode_tracker()->NextEntry(
773 &cursor, &inode_parent, &name))
774 {
775 result += "<" + StringifyInt(inode_parent) + ">/" + name.ToString() +
776 "\n";
777 }
778 mount_point->inode_tracker()->EndEnumerate(&cursor);
779 talk_mgr->Answer(con_fd, result);
780 } else if (line == "version") {
781 const string version_str = string(CVMFS_VERSION) + " (CernVM-FS Fuse Module)\n" +
782 cvmfs::loader_exports_->loader_version + " (Loader)\n";
783 talk_mgr->Answer(con_fd, version_str);
784 } else if (line == "version patchlevel") {
785 talk_mgr->Answer(con_fd, string(CVMFS_PATCH_LEVEL) + "\n");
786 } else if (line == "tear down to read-only") {
787 if (file_system->cache_mgr()->id() != kPosixCacheManager) {
788 talk_mgr->Answer(con_fd, "not supported\n");
789 } else {
790 // hack
791 cvmfs::UnregisterQuotaListener();
792 file_system->TearDown2ReadOnly();
793 talk_mgr->Answer(con_fd, "In read-only mode\n");
794 }
795 } else if (line == "latency") {
796 string result = talk_mgr->FormatLatencies(*mount_point, file_system);
797 talk_mgr->Answer(con_fd, result);
798 } else {
799 talk_mgr->Answer(con_fd, "unknown command\n");
800 }
801 }
802
803 return NULL;
804 } // NOLINT(readability/fn_size)
805
806 string TalkManager::FormatLatencies(const MountPoint &mount_point,
807 FileSystem *file_system) {
808 string result;
809 const unsigned int bufSize = 300;
810 char buffer[bufSize];
811
812 vector<float> qs;
813 qs.push_back(.1);
814 qs.push_back(.2);
815 qs.push_back(.25);
816 qs.push_back(.3);
817 qs.push_back(.4);
818 qs.push_back(.5);
819 qs.push_back(.6);
820 qs.push_back(.7);
821 qs.push_back(.75);
822 qs.push_back(.8);
823 qs.push_back(.9);
824 qs.push_back(.95);
825 qs.push_back(.99);
826 qs.push_back(.999);
827 qs.push_back(.9999);
828
829 string repo(mount_point.fqrn());
830
831 unsigned int format_index =
832 snprintf(buffer, bufSize, "\"%s\",\"%s\",\"%s\",\"%s\"", "repository",
833 "action", "total_count", "time_unit");
834 for (unsigned int i = 0; i < qs.size(); i++) {
835 format_index += snprintf(buffer + format_index, bufSize - format_index,
836 ",%0.5f", qs[i]);
837 }
838 format_index += snprintf(buffer + format_index, bufSize - format_index, "\n");
839 assert(format_index < bufSize);
840
841 result += buffer;
842 memset(buffer, 0, sizeof(buffer));
843 format_index = 0;
844
845 vector<Log2Histogram *> hist;
846 vector<string> names;
847 hist.push_back(file_system->hist_fs_lookup());
848 names.push_back("lookup");
849 hist.push_back(file_system->hist_fs_forget());
850 names.push_back("forget");
851 hist.push_back(file_system->hist_fs_forget_multi());
852 names.push_back("forget_multi");
853 hist.push_back(file_system->hist_fs_getattr());
854 names.push_back("getattr");
855 hist.push_back(file_system->hist_fs_readlink());
856 names.push_back("readlink");
857 hist.push_back(file_system->hist_fs_opendir());
858 names.push_back("opendir");
859 hist.push_back(file_system->hist_fs_releasedir());
860 names.push_back("releasedir");
861 hist.push_back(file_system->hist_fs_readdir());
862 names.push_back("readdir");
863 hist.push_back(file_system->hist_fs_open());
864 names.push_back("open");
865 hist.push_back(file_system->hist_fs_read());
866 names.push_back("read");
867 hist.push_back(file_system->hist_fs_release());
868 names.push_back("release");
869
870 for (unsigned int j = 0; j < hist.size(); j++) {
871 Log2Histogram *h = hist[j];
872 unsigned int format_index =
873 snprintf(buffer, bufSize, "\"%s\",\"%s\",%" PRIu64 ",\"nanoseconds\"",
874 repo.c_str(), names[j].c_str(), h->N());
875 for (unsigned int i = 0; i < qs.size(); i++) {
876 format_index += snprintf(buffer + format_index, bufSize - format_index,
877 ",%u", h->GetQuantile(qs[i]));
878 }
879 format_index +=
880 snprintf(buffer + format_index, bufSize - format_index, "\n");
881 assert(format_index < bufSize);
882
883 result += buffer;
884 memset(buffer, 0, sizeof(buffer));
885 format_index = 0;
886 }
887 return result;
888 }
889
890 TalkManager::TalkManager(
891 const string &socket_path,
892 MountPoint *mount_point,
893 FuseRemounter *remounter)
894 : socket_path_(socket_path)
895 , socket_fd_(-1)
896 , mount_point_(mount_point)
897 , remounter_(remounter)
898 , spawned_(false)
899 {
900 memset(&thread_talk_, 0, sizeof(thread_talk_));
901 }
902
903
904 TalkManager::~TalkManager() {
905 if (!socket_path_.empty()) {
906 int retval = unlink(socket_path_.c_str());
907 if ((retval != 0) && (errno != ENOENT)) {
908 LogCvmfs(kLogTalk, kLogSyslogWarn,
909 "Could not remove cvmfs_io socket from cache directory (%d)",
910 errno);
911 }
912 }
913
914 if (socket_fd_ >= 0) {
915 shutdown(socket_fd_, SHUT_RDWR);
916 close(socket_fd_);
917 }
918
919 if (spawned_) {
920 pthread_join(thread_talk_, NULL);
921 LogCvmfs(kLogTalk, kLogDebug, "talk thread stopped");
922 }
923 }
924
925
926 void TalkManager::Spawn() {
927 int retval = pthread_create(&thread_talk_, NULL, MainResponder, this);
928 assert(retval == 0);
929 spawned_ = true;
930 }
931