GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/talk.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 0 624 0.0%
Branches: 0 1613 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Implements a socket interface to cvmfs. This way commands can be sent
5 * to cvmfs. When cvmfs is running, the socket
6 * /var/cache/cvmfs2/$INSTANCE/cvmfs_io
7 * is available for command input and reply messages, resp.
8 *
9 * Cvmfs comes with the cvmfs_talk script, that handles writing and reading the
10 * socket.
11 *
12 * The talk module runs in a separate thread.
13 */
14
15 #ifndef __STDC_FORMAT_MACROS
16 #define __STDC_FORMAT_MACROS
17 #endif
18
19
20 #include "talk.h"
21
22 #include <errno.h>
23 #include <inttypes.h>
24 #include <pthread.h>
25 #include <stdint.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <sys/uio.h>
30 #include <sys/un.h>
31 #include <unistd.h>
32
33 #include <cassert>
34 #include <cstdlib>
35 #include <cstring>
36 #include <string>
37 #include <vector>
38
39 #include "cache.h"
40 #include "cache_posix.h"
41 #include "catalog_mgr_client.h"
42 #include "cvmfs.h"
43 #include "duplex_sqlite3.h"
44 #include "fuse_remount.h"
45 #include "glue_buffer.h"
46 #include "loader.h"
47 #include "lru_md.h"
48 #include "monitor.h"
49 #include "mountpoint.h"
50 #include "network/download.h"
51 #include "nfs_maps.h"
52 #include "options.h"
53 #include "quota.h"
54 #include "shortstring.h"
55 #include "statistics.h"
56 #include "tracer.h"
57 #include "util/logging.h"
58 #include "util/platform.h"
59 #include "util/pointer.h"
60 #include "wpad.h"
61
62 using namespace std; // NOLINT
63
64
65 void TalkManager::Answer(int con_fd, const string &msg) {
66 (void)send(con_fd, &msg[0], msg.length(), MSG_NOSIGNAL);
67 }
68
69
70 void TalkManager::AnswerStringList(int con_fd, const vector<string> &list) {
71 string list_str;
72 for (unsigned i = 0; i < list.size(); ++i) {
73 list_str += list[i] + "\n";
74 }
75 Answer(con_fd, list_str);
76 }
77
78
79 TalkManager *TalkManager::Create(const string &socket_path,
80 MountPoint *mount_point,
81 FuseRemounter *remounter) {
82 UniquePtr<TalkManager> talk_manager(
83 new TalkManager(socket_path, mount_point, remounter));
84
85 talk_manager->socket_fd_ = MakeSocket(socket_path, 0660);
86 if (talk_manager->socket_fd_ == -1)
87 return NULL;
88 if (listen(talk_manager->socket_fd_, 1) == -1)
89 return NULL;
90
91 LogCvmfs(kLogTalk, kLogDebug, "socket created at %s (fd %d)",
92 socket_path.c_str(), talk_manager->socket_fd_);
93
94 return talk_manager.Release();
95 }
96
97
98 string TalkManager::FormatMetalinkInfo(
99 download::DownloadManager *download_mgr) {
100 vector<string> metalink_chain;
101 unsigned active_metalink;
102
103 download_mgr->GetMetalinkInfo(&metalink_chain, &active_metalink);
104 if (metalink_chain.size() == 0)
105 return "No metalinks defined\n";
106
107 string metalink_str;
108 for (unsigned i = 0; i < metalink_chain.size(); ++i) {
109 metalink_str += " [" + StringifyInt(i) + "] " + metalink_chain[i] + "\n";
110 }
111 metalink_str += "Active metalink " + StringifyInt(active_metalink) + ": "
112 + metalink_chain[active_metalink] + "\n";
113 return metalink_str;
114 }
115
116 string TalkManager::FormatHostInfo(download::DownloadManager *download_mgr) {
117 vector<string> host_chain;
118 vector<int> rtt;
119 unsigned active_host;
120
121 download_mgr->GetHostInfo(&host_chain, &rtt, &active_host);
122 if (host_chain.size() == 0)
123 return "No hosts defined\n";
124
125 string host_str;
126 for (unsigned i = 0; i < host_chain.size(); ++i) {
127 host_str += " [" + StringifyInt(i) + "] " + host_chain[i] + " (";
128 if (rtt[i] == download::DownloadManager::kProbeUnprobed)
129 host_str += "unprobed";
130 else if (rtt[i] == download::DownloadManager::kProbeDown)
131 host_str += "host down";
132 else if (rtt[i] == download::DownloadManager::kProbeGeo)
133 host_str += "geographically ordered";
134 else
135 host_str += StringifyInt(rtt[i]) + " ms";
136 host_str += ")\n";
137 }
138 host_str += "Active host " + StringifyInt(active_host) + ": "
139 + host_chain[active_host] + "\n";
140 return host_str;
141 }
142
143 string TalkManager::FormatProxyInfo(download::DownloadManager *download_mgr) {
144 vector<vector<download::DownloadManager::ProxyInfo> > proxy_chain;
145 unsigned active_group;
146 unsigned fallback_group;
147
148 download_mgr->GetProxyInfo(&proxy_chain, &active_group, &fallback_group);
149 string proxy_str;
150 if (proxy_chain.size()) {
151 proxy_str += "Load-balance groups:\n";
152 for (unsigned i = 0; i < proxy_chain.size(); ++i) {
153 vector<string> urls;
154 for (unsigned j = 0; j < proxy_chain[i].size(); ++j) {
155 urls.push_back(proxy_chain[i][j].Print());
156 }
157 proxy_str += "[" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
158 + "\n";
159 }
160 proxy_str += "Active proxy: [" + StringifyInt(active_group) + "] "
161 + proxy_chain[active_group][0].url + "\n";
162 if (fallback_group < proxy_chain.size())
163 proxy_str += "First fallback group: [" + StringifyInt(fallback_group)
164 + "]\n";
165 } else {
166 proxy_str = "No proxies defined\n";
167 }
168 return proxy_str;
169 }
170
171
172 /**
173 * Listener thread on the socket.
174 * TODO(jblomer): create Format... helpers to shorten this method
175 */
176 void *TalkManager::MainResponder(void *data) {
177 TalkManager *talk_mgr = reinterpret_cast<TalkManager *>(data);
178 MountPoint *mount_point = talk_mgr->mount_point_;
179 FileSystem *file_system = mount_point->file_system();
180 FuseRemounter *remounter = talk_mgr->remounter_;
181 LogCvmfs(kLogTalk, kLogDebug, "talk thread started");
182
183 struct sockaddr_un remote;
184 socklen_t socket_size = sizeof(remote);
185 int con_fd = -1;
186 while (true) {
187 if (con_fd >= 0) {
188 shutdown(con_fd, SHUT_RDWR);
189 close(con_fd);
190 }
191 LogCvmfs(kLogTalk, kLogDebug, "accepting connections on socketfd %d",
192 talk_mgr->socket_fd_);
193 if ((con_fd = accept(
194 talk_mgr->socket_fd_, (struct sockaddr *)&remote, &socket_size))
195 < 0) {
196 LogCvmfs(kLogTalk, kLogDebug, "terminating talk thread (fd %d, errno %d)",
197 con_fd, errno);
198 break;
199 }
200
201 char buf[kMaxCommandSize];
202 int bytes_read;
203 if ((bytes_read = recv(con_fd, buf, sizeof(buf), 0)) <= 0)
204 continue;
205
206 if (buf[bytes_read - 1] == '\0')
207 bytes_read--;
208 const string line = string(buf, bytes_read);
209 LogCvmfs(kLogTalk, kLogDebug, "received %s (length %lu)", line.c_str(),
210 line.length());
211
212 if (line == "tracebuffer flush") {
213 mount_point->tracer()->Flush();
214 talk_mgr->Answer(con_fd, "OK\n");
215 } else if (line == "cache size") {
216 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
217 if (!quota_mgr->HasCapability(QuotaManager::kCapIntrospectSize)) {
218 talk_mgr->Answer(con_fd, "Cache cannot report its size\n");
219 } else {
220 const uint64_t size_unpinned = quota_mgr->GetSize();
221 const uint64_t size_pinned = quota_mgr->GetSizePinned();
222 const string size_str = "Current cache size is "
223 + StringifyInt(size_unpinned / (1024 * 1024))
224 + "MB (" + StringifyInt(size_unpinned)
225 + " Bytes), pinned: "
226 + StringifyInt(size_pinned / (1024 * 1024))
227 + "MB (" + StringifyInt(size_pinned)
228 + " Bytes)\n";
229 talk_mgr->Answer(con_fd, size_str);
230 }
231 } else if (line == "cache instance") {
232 talk_mgr->Answer(con_fd, file_system->cache_mgr()->Describe());
233 } else if (line == "cache list") {
234 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
235 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
236 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
237 } else {
238 const vector<string> ls = quota_mgr->List();
239 talk_mgr->AnswerStringList(con_fd, ls);
240 }
241 } else if (line == "cache list pinned") {
242 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
243 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
244 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
245 } else {
246 const vector<string> ls_pinned = quota_mgr->ListPinned();
247 talk_mgr->AnswerStringList(con_fd, ls_pinned);
248 }
249 } else if (line == "cache list catalogs") {
250 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
251 if (!quota_mgr->HasCapability(QuotaManager::kCapList)) {
252 talk_mgr->Answer(con_fd, "Cache cannot list its entries\n");
253 } else {
254 const vector<string> ls_catalogs = quota_mgr->ListCatalogs();
255 talk_mgr->AnswerStringList(con_fd, ls_catalogs);
256 }
257 } else if (line.substr(0, 12) == "cleanup rate") {
258 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
259 if (!quota_mgr->HasCapability(QuotaManager::kCapIntrospectCleanupRate)) {
260 talk_mgr->Answer(con_fd, "Unsupported by this cache\n");
261 } else {
262 if (line.length() < 14) {
263 talk_mgr->Answer(con_fd, "Usage: cleanup rate <period in mn>\n");
264 } else {
265 const uint64_t period_s = String2Uint64(line.substr(13)) * 60;
266 const uint64_t rate = quota_mgr->GetCleanupRate(period_s);
267 talk_mgr->Answer(con_fd, StringifyInt(rate) + "\n");
268 }
269 }
270 } else if (line.substr(0, 15) == "cache limit set") {
271 if (line.length() < 16) {
272 talk_mgr->Answer(con_fd, "Usage: cache limit set <MB>\n");
273 } else {
274 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
275 const uint64_t size = String2Uint64(line.substr(16));
276 if (size < 1000) {
277 talk_mgr->Answer(con_fd, "New limit too low (minimum 1000)\n");
278 } else {
279 if (quota_mgr->SetLimit(size * 1024 * 1024)) {
280 file_system->options_mgr()->SetValueFromTalk("CVMFS_QUOTA_LIMIT",
281 StringifyUint(size));
282 talk_mgr->Answer(con_fd, "OK\n");
283 } else {
284 talk_mgr->Answer(con_fd, "Limit not reset\n");
285 }
286 }
287 }
288 } else if (line == "cache limit get") {
289 std::string limit_from_options;
290 file_system->options_mgr()->GetValue("CVMFS_QUOTA_LIMIT",
291 &limit_from_options);
292 talk_mgr->Answer(con_fd, limit_from_options + "\n");
293 } else if (line.substr(0, 7) == "cleanup") {
294 QuotaManager *quota_mgr = file_system->cache_mgr()->quota_mgr();
295 if (!quota_mgr->HasCapability(QuotaManager::kCapShrink)) {
296 talk_mgr->Answer(con_fd, "Cache cannot trigger eviction\n");
297 } else {
298 if (line.length() < 9) {
299 talk_mgr->Answer(con_fd, "Usage: cleanup <MB>\n");
300 } else {
301 const uint64_t size = String2Uint64(line.substr(8)) * 1024 * 1024;
302 if (quota_mgr->Cleanup(size)) {
303 talk_mgr->Answer(con_fd, "OK\n");
304 } else {
305 talk_mgr->Answer(con_fd, "Not fully cleaned "
306 "(there might be pinned chunks)\n");
307 }
308 }
309 }
310 } else if (line.substr(0, 5) == "evict") {
311 assert(mount_point->file_system()->type() == FileSystem::kFsFuse);
312 if (line.length() < 7) {
313 talk_mgr->Answer(con_fd, "Usage: evict <path>\n");
314 } else {
315 const string path = line.substr(6);
316 const bool found_regular = cvmfs::Evict(path);
317 if (found_regular)
318 talk_mgr->Answer(con_fd, "OK\n");
319 else
320 talk_mgr->Answer(con_fd, "No such regular file\n");
321 }
322 } else if (line.substr(0, 3) == "pin") {
323 assert(mount_point->file_system()->type() == FileSystem::kFsFuse);
324 if (line.length() < 5) {
325 talk_mgr->Answer(con_fd, "Usage: pin <path>\n");
326 } else {
327 const string path = line.substr(4);
328 const bool found_regular = cvmfs::Pin(path);
329 if (found_regular)
330 talk_mgr->Answer(con_fd, "OK\n");
331 else
332 talk_mgr->Answer(con_fd, "No such regular file or pinning failed\n");
333 }
334 } else if (line == "mountpoint") {
335 talk_mgr->Answer(con_fd, cvmfs::loader_exports_->mount_point + "\n");
336 } else if (line == "device id") {
337 if (cvmfs::loader_exports_->version >= 5)
338 talk_mgr->Answer(con_fd, cvmfs::loader_exports_->device_id + "\n");
339 else
340 talk_mgr->Answer(con_fd, "0:0\n");
341 } else if (line.substr(0, 13) == "send mount fd") {
342 // Hidden command intended to be used only by the cvmfs mount helper
343 if (line.length() < 15) {
344 talk_mgr->Answer(con_fd, "EINVAL\n");
345 } else {
346 const std::string socket_path = line.substr(14);
347 const bool retval = cvmfs::SendFuseFd(socket_path);
348 talk_mgr->Answer(con_fd, retval ? "OK\n" : "Failed\n");
349 LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslog,
350 "Attempt to send fuse connection info to new mount (via %s)%s",
351 socket_path.c_str(), retval ? "" : " -- failed!");
352 }
353 } else if (line.substr(0, 7) == "remount") {
354 FuseRemounter::Status status;
355 if (line == "remount sync")
356 status = remounter->CheckSynchronously();
357 else
358 status = remounter->Check();
359 switch (status) {
360 case FuseRemounter::kStatusFailGeneral:
361 talk_mgr->Answer(con_fd, "Failed\n");
362 break;
363 case FuseRemounter::kStatusFailNoSpace:
364 talk_mgr->Answer(con_fd, "Failed (no space)\n");
365 break;
366 case FuseRemounter::kStatusUp2Date:
367 talk_mgr->Answer(con_fd, "Catalog up to date\n");
368 break;
369 case FuseRemounter::kStatusDraining:
370 talk_mgr->Answer(con_fd, "New revision applied\n");
371 break;
372 case FuseRemounter::kStatusMaintenance:
373 talk_mgr->Answer(con_fd, "In maintenance mode\n");
374 break;
375 default:
376 talk_mgr->Answer(con_fd, "internal error\n");
377 }
378 } else if (line.substr(0, 6) == "chroot") {
379 if (line.length() < 8) {
380 talk_mgr->Answer(con_fd, "Usage: chroot <hash>\n");
381 } else {
382 const std::string root_hash =
383 Trim(line.substr(7), true /* trim_newline */);
384 const FuseRemounter::Status status = remounter->ChangeRoot(
385 MkFromHexPtr(shash::HexPtr(root_hash), shash::kSuffixCatalog));
386 switch (status) {
387 case FuseRemounter::kStatusUp2Date:
388 talk_mgr->Answer(con_fd, "OK\n");
389 break;
390 default:
391 talk_mgr->Answer(con_fd, "Failed\n");
392 break;
393 }
394 }
395 } else if (line == "detach nested catalogs") {
396 mount_point->catalog_mgr()->DetachNested();
397 talk_mgr->Answer(con_fd, "OK\n");
398 } else if (line == "revision") {
399 const string revision =
400 StringifyInt(mount_point->catalog_mgr()->GetRevision());
401 talk_mgr->Answer(con_fd, revision + "\n");
402 } else if (line == "max ttl info") {
403 const unsigned max_ttl = mount_point->GetMaxTtlMn();
404 if (max_ttl == 0) {
405 talk_mgr->Answer(con_fd, "unset\n");
406 } else {
407 const string max_ttl_str = StringifyInt(max_ttl) + " minutes\n";
408 talk_mgr->Answer(con_fd, max_ttl_str);
409 }
410 } else if (line.substr(0, 11) == "max ttl set") {
411 if (line.length() < 13) {
412 talk_mgr->Answer(con_fd, "Usage: max ttl set <minutes>\n");
413 } else {
414 const unsigned max_ttl = String2Uint64(line.substr(12));
415 mount_point->SetMaxTtlMn(max_ttl);
416 talk_mgr->Answer(con_fd, "OK\n");
417 }
418 } else if (line.substr(0, 14) == "nameserver get") {
419 const string dns_server = mount_point->download_mgr()->GetDnsServer();
420 const string reply = !dns_server.empty()
421 ? std::string("DNS server address: ")
422 + dns_server + "\n"
423 : std::string("DNS server not set.\n");
424 talk_mgr->Answer(con_fd, reply);
425 } else if (line.substr(0, 14) == "nameserver set") {
426 if (line.length() < 16) {
427 talk_mgr->Answer(con_fd, "Usage: nameserver set <host>\n");
428 } else {
429 const string host = line.substr(15);
430 mount_point->download_mgr()->SetDnsServer(host);
431 talk_mgr->Answer(con_fd, "OK\n");
432 }
433 } else if (line.substr(0, 22) == "__testing_freeze_cvmfs") {
434 const std::string fs_dir = line.substr(23) + "/dir";
435 mkdir(fs_dir.c_str(), 0700);
436 } else if (line == "external metalink info") {
437 const string external_metalink_info = talk_mgr->FormatMetalinkInfo(
438 mount_point->external_download_mgr());
439 talk_mgr->Answer(con_fd, external_metalink_info);
440 } else if (line == "metalink info") {
441 const string metalink_info = talk_mgr->FormatMetalinkInfo(
442 mount_point->download_mgr());
443 talk_mgr->Answer(con_fd, metalink_info);
444 } else if (line == "external host info") {
445 const string external_host_info = talk_mgr->FormatHostInfo(
446 mount_point->external_download_mgr());
447 talk_mgr->Answer(con_fd, external_host_info);
448 } else if (line == "host info") {
449 const string host_info = talk_mgr->FormatHostInfo(
450 mount_point->download_mgr());
451 talk_mgr->Answer(con_fd, host_info);
452 } else if (line == "host probe") {
453 mount_point->download_mgr()->ProbeHosts();
454 talk_mgr->Answer(con_fd, "OK\n");
455 } else if (line == "host probe geo") {
456 const bool retval = mount_point->download_mgr()->ProbeGeo();
457 if (retval)
458 talk_mgr->Answer(con_fd, "OK\n");
459 else
460 talk_mgr->Answer(con_fd, "Failed\n");
461 } else if (line == "external metalink switch") {
462 mount_point->external_download_mgr()->SwitchMetalink();
463 talk_mgr->Answer(con_fd, "OK\n");
464 } else if (line == "metalink switch") {
465 mount_point->download_mgr()->SwitchMetalink();
466 talk_mgr->Answer(con_fd, "OK\n");
467 } else if (line == "external host switch") {
468 mount_point->external_download_mgr()->SwitchHost();
469 talk_mgr->Answer(con_fd, "OK\n");
470 } else if (line == "host switch") {
471 mount_point->download_mgr()->SwitchHost();
472 talk_mgr->Answer(con_fd, "OK\n");
473 } else if (line.substr(0, 21) == "external metalink set") {
474 if (line.length() < 23) {
475 talk_mgr->Answer(con_fd, "Usage: external metalink set <URL>\n");
476 } else {
477 const std::string host = line.substr(22);
478 mount_point->external_download_mgr()->SetMetalinkChain(host);
479 talk_mgr->Answer(con_fd, "OK\n");
480 }
481 } else if (line.substr(0, 12) == "metalink set") {
482 if (line.length() < 14) {
483 talk_mgr->Answer(con_fd, "Usage: metalink set <URL>\n");
484 } else {
485 const std::string host = line.substr(13);
486 mount_point->download_mgr()->SetMetalinkChain(host);
487 talk_mgr->Answer(con_fd, "OK\n");
488 }
489 } else if (line.substr(0, 17) == "external host set") {
490 if (line.length() < 19) {
491 talk_mgr->Answer(con_fd, "Usage: external host set <URL>\n");
492 } else {
493 const std::string host = line.substr(18);
494 mount_point->external_download_mgr()->SetHostChain(host);
495 talk_mgr->Answer(con_fd, "OK\n");
496 }
497 } else if (line.substr(0, 8) == "host set") {
498 if (line.length() < 10) {
499 talk_mgr->Answer(con_fd, "Usage: host set <host list>\n");
500 } else {
501 const string hosts = line.substr(9);
502 mount_point->download_mgr()->SetHostChain(hosts);
503 talk_mgr->Answer(con_fd, "OK\n");
504 }
505 } else if (line == "external proxy info") {
506 const string external_proxy_info =
507 talk_mgr->FormatProxyInfo(mount_point->external_download_mgr());
508 talk_mgr->Answer(con_fd, external_proxy_info);
509 } else if (line == "proxy info") {
510 const string proxy_info =
511 talk_mgr->FormatProxyInfo(mount_point->download_mgr());
512 talk_mgr->Answer(con_fd, proxy_info);
513 } else if (line == "proxy rebalance") {
514 mount_point->download_mgr()->RebalanceProxies();
515 talk_mgr->Answer(con_fd, "OK\n");
516 } else if (line == "proxy group switch") {
517 mount_point->download_mgr()->SwitchProxyGroup();
518 talk_mgr->Answer(con_fd, "OK\n");
519 } else if (line.substr(0, 18) == "external proxy set") {
520 if (line.length() < 20) {
521 talk_mgr->Answer(con_fd, "Usage: external proxy set <proxy list>\n");
522 } else {
523 const string external_proxies = line.substr(19);
524 mount_point->external_download_mgr()->SetProxyChain(
525 external_proxies, "", download::DownloadManager::kSetProxyRegular);
526 talk_mgr->Answer(con_fd, "OK\n");
527 }
528 } else if (line.substr(0, 9) == "proxy set") {
529 if (line.length() < 11) {
530 talk_mgr->Answer(con_fd, "Usage: proxy set <proxy list>\n");
531 } else {
532 string proxies = line.substr(10);
533 proxies = download::ResolveProxyDescription(
534 proxies, "", mount_point->download_mgr());
535 if (proxies == "") {
536 talk_mgr->Answer(con_fd, "Failed, no valid proxies\n");
537 } else {
538 mount_point->download_mgr()->SetProxyChain(
539 proxies, "", download::DownloadManager::kSetProxyRegular);
540 talk_mgr->Answer(con_fd, "OK\n");
541 }
542 }
543 } else if (line.substr(0, 14) == "proxy fallback") {
544 if (line.length() < 15) {
545 talk_mgr->Answer(con_fd, "Usage: proxy fallback <proxy list>\n");
546 } else {
547 const string fallback_proxies = line.substr(15);
548 mount_point->download_mgr()->SetProxyChain(
549 "", fallback_proxies, download::DownloadManager::kSetProxyFallback);
550 talk_mgr->Answer(con_fd, "OK\n");
551 }
552 } else if (line == "timeout info") {
553 unsigned timeout;
554 unsigned timeout_direct;
555 mount_point->download_mgr()->GetTimeout(&timeout, &timeout_direct);
556 string timeout_str = "Timeout with proxy: ";
557 if (timeout)
558 timeout_str += StringifyInt(timeout) + "s\n";
559 else
560 timeout_str += "no timeout\n";
561 timeout_str += "Timeout without proxy: ";
562 if (timeout_direct)
563 timeout_str += StringifyInt(timeout_direct) + "s\n";
564 else
565 timeout_str += "no timeout\n";
566 talk_mgr->Answer(con_fd, timeout_str);
567 } else if (line.substr(0, 11) == "timeout set") {
568 if (line.length() < 13) {
569 talk_mgr->Answer(con_fd, "Usage: timeout set <proxy> <direct>\n");
570 } else {
571 uint64_t timeout;
572 uint64_t timeout_direct;
573 String2Uint64Pair(line.substr(12), &timeout, &timeout_direct);
574 mount_point->download_mgr()->SetTimeout(timeout, timeout_direct);
575 talk_mgr->Answer(con_fd, "OK\n");
576 }
577 } else if (line == "open catalogs") {
578 talk_mgr->Answer(con_fd, mount_point->catalog_mgr()->PrintHierarchy());
579 } else if (line == "drop metadata caches") {
580 // For testing
581 mount_point->inode_cache()->Pause();
582 mount_point->path_cache()->Pause();
583 mount_point->md5path_cache()->Pause();
584 mount_point->inode_cache()->Drop();
585 mount_point->path_cache()->Drop();
586 mount_point->md5path_cache()->Drop();
587 mount_point->inode_cache()->Resume();
588 mount_point->path_cache()->Resume();
589 mount_point->md5path_cache()->Resume();
590 talk_mgr->Answer(con_fd, "OK\n");
591 } else if (line == "internal affairs") {
592 int current;
593 int highwater;
594 string result;
595
596 result += "Inode Generation:\n " + cvmfs::PrintInodeGeneration();
597
598 // Manually setting the values of the ShortString counters
599 mount_point->statistics()
600 ->Lookup("pathstring.n_instances")
601 ->Set(PathString::num_instances());
602 mount_point->statistics()
603 ->Lookup("pathstring.n_overflows")
604 ->Set(PathString::num_overflows());
605 mount_point->statistics()
606 ->Lookup("namestring.n_instances")
607 ->Set(NameString::num_instances());
608 mount_point->statistics()
609 ->Lookup("namestring.n_overflows")
610 ->Set(NameString::num_overflows());
611 mount_point->statistics()
612 ->Lookup("linkstring.n_instances")
613 ->Set(LinkString::num_instances());
614 mount_point->statistics()
615 ->Lookup("linkstring.n_overflows")
616 ->Set(LinkString::num_overflows());
617
618 // Manually setting the inode tracker numbers
619 glue::InodeTracker::Statistics inode_stats = mount_point->inode_tracker()
620 ->GetStatistics();
621 const glue::DentryTracker::Statistics dentry_stats =
622 mount_point->dentry_tracker()->GetStatistics();
623 const glue::PageCacheTracker::Statistics page_cache_stats =
624 mount_point->page_cache_tracker()->GetStatistics();
625 mount_point->statistics()
626 ->Lookup("inode_tracker.n_insert")
627 ->Set(atomic_read64(&inode_stats.num_inserts));
628 mount_point->statistics()
629 ->Lookup("inode_tracker.n_remove")
630 ->Set(atomic_read64(&inode_stats.num_removes));
631 mount_point->statistics()
632 ->Lookup("inode_tracker.no_reference")
633 ->Set(atomic_read64(&inode_stats.num_references));
634 mount_point->statistics()
635 ->Lookup("inode_tracker.n_hit_inode")
636 ->Set(atomic_read64(&inode_stats.num_hits_inode));
637 mount_point->statistics()
638 ->Lookup("inode_tracker.n_hit_path")
639 ->Set(atomic_read64(&inode_stats.num_hits_path));
640 mount_point->statistics()
641 ->Lookup("inode_tracker.n_miss_path")
642 ->Set(atomic_read64(&inode_stats.num_misses_path));
643 mount_point->statistics()
644 ->Lookup("dentry_tracker.n_insert")
645 ->Set(dentry_stats.num_insert);
646 mount_point->statistics()
647 ->Lookup("dentry_tracker.n_remove")
648 ->Set(dentry_stats.num_remove);
649 mount_point->statistics()
650 ->Lookup("dentry_tracker.n_prune")
651 ->Set(dentry_stats.num_prune);
652 mount_point->statistics()
653 ->Lookup("page_cache_tracker.n_insert")
654 ->Set(page_cache_stats.n_insert);
655 mount_point->statistics()
656 ->Lookup("page_cache_tracker.n_remove")
657 ->Set(page_cache_stats.n_remove);
658 mount_point->statistics()
659 ->Lookup("page_cache_tracker.n_open_direct")
660 ->Set(page_cache_stats.n_open_direct);
661 mount_point->statistics()
662 ->Lookup("page_cache_tracker.n_open_flush")
663 ->Set(page_cache_stats.n_open_flush);
664 mount_point->statistics()
665 ->Lookup("page_cache_tracker.n_open_cached")
666 ->Set(page_cache_stats.n_open_cached);
667
668 if (file_system->cache_mgr()->id() == kPosixCacheManager) {
669 PosixCacheManager *cache_mgr = reinterpret_cast<PosixCacheManager *>(
670 file_system->cache_mgr());
671 result += "\nCache Mode: ";
672 switch (cache_mgr->cache_mode()) {
673 case PosixCacheManager::kCacheReadWrite:
674 result += "read-write";
675 break;
676 case PosixCacheManager::kCacheReadOnly:
677 result += "read-only";
678 break;
679 default:
680 result += "unknown";
681 }
682 }
683 bool drainout_mode;
684 bool maintenance_mode;
685 cvmfs::GetReloadStatus(&drainout_mode, &maintenance_mode);
686 result += "\nDrainout Mode: " + StringifyBool(drainout_mode) + "\n";
687 result += "Maintenance Mode: " + StringifyBool(maintenance_mode) + "\n";
688
689 if (file_system->IsNfsSource()) {
690 result += "\nNFS Map Statistics:\n";
691 result += file_system->nfs_maps()->GetStatistics();
692 }
693
694 result += "SQlite Statistics:\n";
695 sqlite3_status(SQLITE_STATUS_MALLOC_COUNT, &current, &highwater, 0);
696 result += " Number of allocations " + StringifyInt(current) + "\n";
697
698 sqlite3_status(SQLITE_STATUS_MEMORY_USED, &current, &highwater, 0);
699 result += " General purpose allocator " + StringifyInt(current / 1024)
700 + " KB / " + StringifyInt(highwater / 1024) + " KB\n";
701
702 sqlite3_status(SQLITE_STATUS_MALLOC_SIZE, &current, &highwater, 0);
703 result += " Largest malloc " + StringifyInt(highwater) + " Bytes\n";
704
705 sqlite3_status(SQLITE_STATUS_PAGECACHE_USED, &current, &highwater, 0);
706 result += " Page cache allocations " + StringifyInt(current) + " / "
707 + StringifyInt(highwater) + "\n";
708
709 sqlite3_status(SQLITE_STATUS_PAGECACHE_OVERFLOW, &current, &highwater, 0);
710 result += " Page cache overflows " + StringifyInt(current / 1024)
711 + " KB / " + StringifyInt(highwater / 1024) + " KB\n";
712
713 sqlite3_status(SQLITE_STATUS_PAGECACHE_SIZE, &current, &highwater, 0);
714 result += " Largest page cache allocation " + StringifyInt(highwater)
715 + " Bytes\n";
716
717 sqlite3_status(SQLITE_STATUS_SCRATCH_USED, &current, &highwater, 0);
718 result += " Scratch allocations " + StringifyInt(current) + " / "
719 + StringifyInt(highwater) + "\n";
720
721 sqlite3_status(SQLITE_STATUS_SCRATCH_OVERFLOW, &current, &highwater, 0);
722 result += " Scratch overflows " + StringifyInt(current) + " / "
723 + StringifyInt(highwater) + "\n";
724
725 sqlite3_status(SQLITE_STATUS_SCRATCH_SIZE, &current, &highwater, 0);
726 result += " Largest scratch allocation " + StringifyInt(highwater / 1024)
727 + " KB\n";
728
729 result += "\nPer-Connection Memory Statistics:\n"
730 + mount_point->catalog_mgr()->PrintAllMemStatistics();
731
732 result += "\nLatency distribution of system calls:\n";
733
734 result += "Lookup\n" + file_system->hist_fs_lookup()->ToString();
735 result += "Forget\n" + file_system->hist_fs_forget()->ToString();
736 result += "Multi-Forget\n"
737 + file_system->hist_fs_forget_multi()->ToString();
738 result += "Getattr\n" + file_system->hist_fs_getattr()->ToString();
739 result += "Readlink\n" + file_system->hist_fs_readlink()->ToString();
740 result += "Opendir\n" + file_system->hist_fs_opendir()->ToString();
741 result += "Releasedir\n" + file_system->hist_fs_releasedir()->ToString();
742 result += "Readdir\n" + file_system->hist_fs_readdir()->ToString();
743 result += "Open\n" + file_system->hist_fs_open()->ToString();
744 result += "Read\n" + file_system->hist_fs_read()->ToString();
745 result += "Release\n" + file_system->hist_fs_release()->ToString();
746
747 result += "\nRaw Counters:\n"
748 + mount_point->statistics()->PrintList(
749 perf::Statistics::kPrintHeader);
750
751 talk_mgr->Answer(con_fd, result);
752 } else if (line == "reset error counters") {
753 file_system->ResetErrorCounters();
754 talk_mgr->Answer(con_fd, "OK\n");
755 } else if (line == "pid") {
756 const string pid_str = StringifyInt(cvmfs::pid_) + "\n";
757 talk_mgr->Answer(con_fd, pid_str);
758 } else if (line == "pid cachemgr") {
759 const string pid_str = StringifyInt(file_system->cache_mgr()
760 ->quota_mgr()
761 ->GetPid())
762 + "\n";
763 talk_mgr->Answer(con_fd, pid_str);
764 } else if (line == "pid watchdog") {
765 const string pid_str = StringifyInt(Watchdog::GetPid()) + "\n";
766 talk_mgr->Answer(con_fd, pid_str);
767 } else if (line == "parameters") {
768 talk_mgr->Answer(con_fd, file_system->options_mgr()->Dump());
769 } else if (line == "hotpatch history") {
770 string history_str = StringifyTime(cvmfs::loader_exports_->boot_time,
771 true)
772 + " (start of CernVM-FS loader "
773 + cvmfs::loader_exports_->loader_version + ")\n";
774 for (loader::EventList::const_iterator
775 i = cvmfs::loader_exports_->history.begin(),
776 iEnd = cvmfs::loader_exports_->history.end();
777 i != iEnd;
778 ++i) {
779 history_str += StringifyTime((*i)->timestamp, true)
780 + " (loaded CernVM-FS Fuse Module " + (*i)->so_version
781 + ")\n";
782 }
783 talk_mgr->Answer(con_fd, history_str);
784 } else if (line == "vfs inodes") {
785 string result;
786 glue::InodeTracker::Cursor cursor(
787 mount_point->inode_tracker()->BeginEnumerate());
788 uint64_t inode;
789 while (mount_point->inode_tracker()->NextInode(&cursor, &inode)) {
790 result += StringifyInt(inode) + "\n";
791 }
792 mount_point->inode_tracker()->EndEnumerate(&cursor);
793 talk_mgr->Answer(con_fd, result);
794 } else if (line == "vfs entries") {
795 string result;
796 glue::InodeTracker::Cursor cursor(
797 mount_point->inode_tracker()->BeginEnumerate());
798 uint64_t inode_parent;
799 NameString name;
800 while (mount_point->inode_tracker()->NextEntry(&cursor, &inode_parent,
801 &name)) {
802 result += "<" + StringifyInt(inode_parent) + ">/" + name.ToString()
803 + "\n";
804 }
805 mount_point->inode_tracker()->EndEnumerate(&cursor);
806 talk_mgr->Answer(con_fd, result);
807 } else if (line == "version") {
808 const string version_str = string(CVMFS_VERSION)
809 + " (CernVM-FS Fuse Module)\n"
810 + cvmfs::loader_exports_->loader_version
811 + " (Loader)\n";
812 talk_mgr->Answer(con_fd, version_str);
813 } else if (line == "version patchlevel") {
814 talk_mgr->Answer(con_fd, string(CVMFS_PATCH_LEVEL) + "\n");
815 } else if (line == "tear down to read-only") {
816 if (file_system->cache_mgr()->id() != kPosixCacheManager) {
817 talk_mgr->Answer(con_fd, "not supported\n");
818 } else {
819 // hack
820 cvmfs::UnregisterQuotaListener();
821 file_system->TearDown2ReadOnly();
822 talk_mgr->Answer(con_fd, "In read-only mode\n");
823 }
824 } else if (line == "latency") {
825 const string result =
826 talk_mgr->FormatLatencies(*mount_point, file_system);
827 talk_mgr->Answer(con_fd, result);
828 } else {
829 talk_mgr->Answer(con_fd, "unknown command\n");
830 }
831 }
832
833 return NULL;
834 } // NOLINT(readability/fn_size)
835
836 string TalkManager::FormatLatencies(const MountPoint &mount_point,
837 FileSystem *file_system) {
838 string result;
839 const unsigned int bufSize = 300;
840 char buffer[bufSize];
841
842 vector<float> qs;
843 qs.push_back(.1);
844 qs.push_back(.2);
845 qs.push_back(.25);
846 qs.push_back(.3);
847 qs.push_back(.4);
848 qs.push_back(.5);
849 qs.push_back(.6);
850 qs.push_back(.7);
851 qs.push_back(.75);
852 qs.push_back(.8);
853 qs.push_back(.9);
854 qs.push_back(.95);
855 qs.push_back(.99);
856 qs.push_back(.999);
857 qs.push_back(.9999);
858
859 const string repo(mount_point.fqrn());
860
861 unsigned int format_index = snprintf(
862 buffer, bufSize, "\"%s\",\"%s\",\"%s\",\"%s\"", "repository", "action",
863 "total_count", "time_unit");
864 for (unsigned int i = 0; i < qs.size(); i++) {
865 format_index += snprintf(buffer + format_index, bufSize - format_index,
866 ",%0.5f", qs[i]);
867 }
868 format_index += snprintf(buffer + format_index, bufSize - format_index, "\n");
869 assert(format_index < bufSize);
870
871 result += buffer;
872 memset(buffer, 0, sizeof(buffer));
873 format_index = 0;
874
875 vector<Log2Histogram *> hist;
876 vector<string> names;
877 hist.push_back(file_system->hist_fs_lookup());
878 names.push_back("lookup");
879 hist.push_back(file_system->hist_fs_forget());
880 names.push_back("forget");
881 hist.push_back(file_system->hist_fs_forget_multi());
882 names.push_back("forget_multi");
883 hist.push_back(file_system->hist_fs_getattr());
884 names.push_back("getattr");
885 hist.push_back(file_system->hist_fs_readlink());
886 names.push_back("readlink");
887 hist.push_back(file_system->hist_fs_opendir());
888 names.push_back("opendir");
889 hist.push_back(file_system->hist_fs_releasedir());
890 names.push_back("releasedir");
891 hist.push_back(file_system->hist_fs_readdir());
892 names.push_back("readdir");
893 hist.push_back(file_system->hist_fs_open());
894 names.push_back("open");
895 hist.push_back(file_system->hist_fs_read());
896 names.push_back("read");
897 hist.push_back(file_system->hist_fs_release());
898 names.push_back("release");
899
900 for (unsigned int j = 0; j < hist.size(); j++) {
901 Log2Histogram *h = hist[j];
902 unsigned int format_index = snprintf(
903 buffer, bufSize, "\"%s\",\"%s\",%" PRIu64 ",\"nanoseconds\"",
904 repo.c_str(), names[j].c_str(), h->N());
905 for (unsigned int i = 0; i < qs.size(); i++) {
906 format_index += snprintf(buffer + format_index, bufSize - format_index,
907 ",%u", h->GetQuantile(qs[i]));
908 }
909 format_index += snprintf(buffer + format_index, bufSize - format_index,
910 "\n");
911 assert(format_index < bufSize);
912
913 result += buffer;
914 memset(buffer, 0, sizeof(buffer));
915 format_index = 0;
916 }
917 return result;
918 }
919
920 TalkManager::TalkManager(const string &socket_path,
921 MountPoint *mount_point,
922 FuseRemounter *remounter)
923 : socket_path_(socket_path)
924 , socket_fd_(-1)
925 , mount_point_(mount_point)
926 , remounter_(remounter)
927 , spawned_(false) {
928 memset(&thread_talk_, 0, sizeof(thread_talk_));
929 }
930
931
932 TalkManager::~TalkManager() {
933 if (!socket_path_.empty()) {
934 const int retval = unlink(socket_path_.c_str());
935 if ((retval != 0) && (errno != ENOENT)) {
936 LogCvmfs(kLogTalk, kLogSyslogWarn,
937 "Could not remove cvmfs_io socket from cache directory (%d)",
938 errno);
939 }
940 }
941
942 if (socket_fd_ >= 0) {
943 shutdown(socket_fd_, SHUT_RDWR);
944 close(socket_fd_);
945 }
946
947 if (spawned_) {
948 pthread_join(thread_talk_, NULL);
949 LogCvmfs(kLogTalk, kLogDebug, "talk thread stopped");
950 }
951 }
952
953
954 void TalkManager::Spawn() {
955 const int retval = pthread_create(&thread_talk_, NULL, MainResponder, this);
956 assert(retval == 0);
957 spawned_ = true;
958 }
959