Directory: | cvmfs/ |
---|---|
File: | cvmfs/fetch.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 151 | 153 | 98.7% |
Branches: | 101 | 173 | 58.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | |||
6 | #include "fetch.h" | ||
7 | |||
8 | #include <unistd.h> | ||
9 | |||
10 | #include "backoff.h" | ||
11 | #include "cache.h" | ||
12 | #include "clientctx.h" | ||
13 | #include "interrupt.h" | ||
14 | #include "network/download.h" | ||
15 | #include "quota.h" | ||
16 | #include "statistics.h" | ||
17 | #include "util/concurrency.h" | ||
18 | #include "util/logging.h" | ||
19 | #include "util/posix.h" | ||
20 | |||
21 | using namespace std; // NOLINT | ||
22 | |||
23 | namespace cvmfs { | ||
24 | |||
25 | 2 | void TLSDestructor(void *data) { | |
26 | 2 | Fetcher::ThreadLocalStorage *tls = | |
27 | static_cast<Fetcher::ThreadLocalStorage *>(data); | ||
28 | 2 | std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = | |
29 | 2 | &tls->fetcher->tls_blocks_; | |
30 | |||
31 | { | ||
32 | 2 | MutexLockGuard m(tls->fetcher->lock_tls_blocks_); | |
33 | 2 | for (vector<Fetcher::ThreadLocalStorage *>::iterator | |
34 | 2 | i = tls_blocks->begin(), | |
35 | 2 | iEnd = tls_blocks->end(); | |
36 |
1/2✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | i != iEnd; ++i) { |
37 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
|
4 | if ((*i) == tls) { |
38 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | tls_blocks->erase(i); |
39 | 2 | break; | |
40 | } | ||
41 | } | ||
42 | 2 | } | |
43 | 2 | tls->fetcher->CleanupTls(tls); | |
44 | 2 | } | |
45 | |||
46 | |||
47 | /** | ||
48 | * Called when a thread exists, releases a ThreadLocalStorage object and | ||
49 | * removes the pointer to it from tls_blocks_. | ||
50 | */ | ||
51 | 31 | void Fetcher::CleanupTls(ThreadLocalStorage *tls) { | |
52 | 31 | ClosePipe(tls->pipe_wait); | |
53 |
1/2✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
|
31 | delete tls; |
54 | 31 | } | |
55 | |||
56 | |||
57 | /** | ||
58 | * Initialized thread-local storage if called the first time in a new thread. | ||
59 | */ | ||
60 | 54 | Fetcher::ThreadLocalStorage *Fetcher::GetTls() { | |
61 | ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>( | ||
62 | 54 | pthread_getspecific(thread_local_storage_)); | |
63 |
2/2✓ Branch 0 taken 23 times.
✓ Branch 1 taken 31 times.
|
54 | if (tls != NULL) |
64 | 23 | return tls; | |
65 | |||
66 |
2/4✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 31 times.
✗ Branch 5 not taken.
|
31 | tls = new ThreadLocalStorage(); |
67 | 31 | tls->fetcher = this; | |
68 |
1/2✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
|
31 | MakePipe(tls->pipe_wait); |
69 | 31 | tls->download_job.SetCompressed(true); | |
70 | 31 | tls->download_job.SetProbeHosts(true); | |
71 | 31 | int retval = pthread_setspecific(thread_local_storage_, tls); | |
72 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
|
31 | assert(retval == 0); |
73 | |||
74 | 31 | MutexLockGuard m(lock_tls_blocks_); | |
75 |
1/2✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
|
31 | tls_blocks_.push_back(tls); |
76 | |||
77 | 31 | return tls; | |
78 | 31 | } | |
79 | |||
80 | |||
81 | 64 | int Fetcher::Fetch( | |
82 | const CacheManager::LabeledObject &object, | ||
83 | const std::string &alt_url) | ||
84 | { | ||
85 | int fd_return; // Read-only file descriptor that is returned | ||
86 | int retval; | ||
87 | |||
88 | 64 | perf::Inc(n_invocations); | |
89 | |||
90 | // Try to open from local cache | ||
91 |
3/4✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✓ Branch 4 taken 44 times.
|
64 | if ((fd_return = OpenSelect(object)) >= 0) { |
92 |
1/2✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
|
20 | LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str()); |
93 | 20 | return fd_return; | |
94 | } | ||
95 | |||
96 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
|
44 | if (object.id.IsNull()) { |
97 | // This has been seen when trying to load the root catalog signed by an | ||
98 | // invalid certificate on an empty cache | ||
99 | // TODO(jblomer): check if still necessary after the catalog reload refactor | ||
100 | ✗ | LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash"); | |
101 | ✗ | return -EIO; | |
102 | } | ||
103 | |||
104 |
1/2✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
|
44 | ThreadLocalStorage *tls = GetTls(); |
105 | |||
106 | // Synchronization point: either act as a master thread for this object or | ||
107 | // enqueue to the list of waiting threads. | ||
108 | 44 | pthread_mutex_lock(lock_queues_download_); | |
109 |
1/2✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
|
44 | ThreadQueues::iterator iDownloadQueue = queues_download_.find(object.id); |
110 |
2/2✓ Branch 2 taken 1 times.
✓ Branch 3 taken 43 times.
|
44 | if (iDownloadQueue != queues_download_.end()) { |
111 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s", |
112 | object.label.path.c_str()); | ||
113 | |||
114 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | iDownloadQueue->second->push_back(tls->pipe_wait[1]); |
115 | 1 | pthread_mutex_unlock(lock_queues_download_); | |
116 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int)); |
117 | |||
118 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s", |
119 | fd_return, object.label.path.c_str()); | ||
120 | 1 | return fd_return; | |
121 | } else { | ||
122 | // Seems we are the first one, check again in the cache (race condition) | ||
123 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | fd_return = OpenSelect(object); |
124 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 42 times.
|
43 | if (fd_return >= 0) { |
125 | 1 | pthread_mutex_unlock(lock_queues_download_); | |
126 | 1 | return fd_return; | |
127 | } | ||
128 | |||
129 | // Create a new queue for this chunk | ||
130 |
1/2✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
|
42 | queues_download_[object.id] = &tls->other_pipes_waiting; |
131 | 42 | pthread_mutex_unlock(lock_queues_download_); | |
132 | } | ||
133 | |||
134 | 42 | perf::Inc(n_downloads); | |
135 | |||
136 | // Involve the download manager | ||
137 |
1/2✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
|
42 | LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str()); |
138 | 42 | std::string url; | |
139 |
2/2✓ Branch 1 taken 3 times.
✓ Branch 2 taken 39 times.
|
42 | if (object.label.IsExternal()) { |
140 |
2/4✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
|
3 | url = !alt_url.empty() ? alt_url : object.label.path; |
141 | } else { | ||
142 |
8/14✓ Branch 1 taken 1 times.
✓ Branch 2 taken 38 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 38 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 38 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 39 times.
✗ Branch 14 not taken.
✓ Branch 18 taken 38 times.
✓ Branch 19 taken 1 times.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
|
39 | url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath()); |
143 | } | ||
144 |
1/2✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
|
42 | void *txn = alloca(cache_mgr_->SizeOfTxn()); |
145 |
1/2✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
|
42 | retval = cache_mgr_->StartTxn(object.id, object.label.size, txn); |
146 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 41 times.
|
42 | if (retval < 0) { |
147 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s", |
148 | object.label.path.c_str()); | ||
149 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | SignalWaitingThreads(retval, object.id, tls); |
150 | 1 | return retval; | |
151 | } | ||
152 |
1/2✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
|
41 | cache_mgr_->CtrlTxn(object.label, 0, txn); |
153 | |||
154 |
1/2✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
|
41 | LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", |
155 | object.label.path.c_str(), url.c_str()); | ||
156 | 41 | TransactionSink sink(cache_mgr_, txn); | |
157 | 41 | tls->download_job.SetUrl(&url); | |
158 | 41 | tls->download_job.SetSink(&sink); | |
159 | 41 | tls->download_job.SetExpectedHash(&object.id); | |
160 | 41 | tls->download_job.SetExtraInfo(&object.label.path); | |
161 |
1/2✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
|
41 | ClientCtx *ctx = ClientCtx::GetInstance(); |
162 |
3/4✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 37 times.
|
41 | if (ctx->IsSet()) { |
163 |
1/2✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
4 | ctx->Get(tls->download_job.GetUidPtr(), |
164 | tls->download_job.GetGidPtr(), | ||
165 | tls->download_job.GetPidPtr(), | ||
166 | tls->download_job.GetInterruptCuePtr()); | ||
167 | } | ||
168 | 41 | tls->download_job.SetCompressed( | |
169 | 41 | object.label.zip_algorithm == zlib::kZlibDefault); | |
170 | 41 | tls->download_job.SetRangeOffset(object.label.range_offset); | |
171 | 41 | tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size)); | |
172 |
1/2✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
|
41 | download_mgr_->Fetch(&tls->download_job); |
173 | |||
174 |
2/2✓ Branch 1 taken 36 times.
✓ Branch 2 taken 5 times.
|
41 | if (tls->download_job.error_code() == download::kFailOk) { |
175 |
1/2✓ Branch 2 taken 36 times.
✗ Branch 3 not taken.
|
36 | LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str()); |
176 | |||
177 |
1/2✓ Branch 1 taken 36 times.
✗ Branch 2 not taken.
|
36 | fd_return = cache_mgr_->OpenFromTxn(txn); |
178 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 35 times.
|
36 | if (fd_return < 0) { |
179 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | cache_mgr_->AbortTxn(txn); |
180 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | SignalWaitingThreads(fd_return, object.id, tls); |
181 | 1 | return fd_return; | |
182 | } | ||
183 | |||
184 |
1/2✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
|
35 | retval = cache_mgr_->CommitTxn(txn); |
185 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 33 times.
|
35 | if (retval < 0) { |
186 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | cache_mgr_->Close(fd_return); |
187 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | SignalWaitingThreads(retval, object.id, tls); |
188 | 2 | return retval; | |
189 | } | ||
190 |
1/2✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
|
33 | SignalWaitingThreads(fd_return, object.id, tls); |
191 | 33 | return fd_return; | |
192 | } | ||
193 | |||
194 | // Download failed | ||
195 |
1/5✗ Branch 4 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
|
10 | LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
196 | "failed to fetch %s (hash: %s, error %d [%s])", | ||
197 | object.label.path.c_str(), | ||
198 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
10 | object.id.ToString().c_str(), |
199 | 5 | tls->download_job.error_code(), | |
200 | download::Code2Ascii(tls->download_job.error_code())); | ||
201 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | cache_mgr_->AbortTxn(txn); |
202 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | backoff_throttle_->Throttle(); |
203 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | SignalWaitingThreads(-EIO, object.id, tls); |
204 | 5 | return -EIO; | |
205 | 42 | } | |
206 | |||
207 | |||
208 | 82 | Fetcher::Fetcher( | |
209 | CacheManager *cache_mgr, | ||
210 | download::DownloadManager *download_mgr, | ||
211 | BackoffThrottle *backoff_throttle, | ||
212 | 82 | perf::StatisticsTemplate statistics) | |
213 | 82 | : lock_queues_download_(NULL) | |
214 | 82 | , lock_tls_blocks_(NULL) | |
215 | 82 | , cache_mgr_(cache_mgr) | |
216 | 82 | , download_mgr_(download_mgr) | |
217 | 82 | , backoff_throttle_(backoff_throttle) | |
218 | { | ||
219 | int retval; | ||
220 | 82 | retval = pthread_key_create(&thread_local_storage_, TLSDestructor); | |
221 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
222 | 82 | lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>( | |
223 | 82 | smalloc(sizeof(pthread_mutex_t))); | |
224 | 82 | retval = pthread_mutex_init(lock_queues_download_, NULL); | |
225 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
226 | 82 | lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>( | |
227 | 82 | smalloc(sizeof(pthread_mutex_t))); | |
228 | 82 | retval = pthread_mutex_init(lock_tls_blocks_, NULL); | |
229 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
230 |
3/6✓ Branch 2 taken 82 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 82 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 82 times.
✗ Branch 10 not taken.
|
82 | n_downloads = statistics.RegisterTemplated("n_downloads", |
231 | "overall number of downloaded files (incl. catalogs, chunks)"); | ||
232 |
3/6✓ Branch 2 taken 82 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 82 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 82 times.
✗ Branch 10 not taken.
|
82 | n_invocations = statistics.RegisterTemplated("n_invocations", |
233 | "overall number of object requests (incl. catalogs, chunks)"); | ||
234 | 82 | } | |
235 | |||
236 | |||
237 | 82 | Fetcher::~Fetcher() { | |
238 | int retval; | ||
239 | |||
240 | { | ||
241 | 82 | MutexLockGuard m(lock_tls_blocks_); | |
242 |
2/2✓ Branch 1 taken 29 times.
✓ Branch 2 taken 82 times.
|
111 | for (unsigned i = 0; i < tls_blocks_.size(); ++i) |
243 | 29 | CleanupTls(tls_blocks_[i]); | |
244 | 82 | } | |
245 | |||
246 | 82 | retval = pthread_mutex_destroy(lock_tls_blocks_); | |
247 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
248 | 82 | free(lock_tls_blocks_); | |
249 | |||
250 | 82 | retval = pthread_mutex_destroy(lock_queues_download_); | |
251 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
252 | 82 | free(lock_queues_download_); | |
253 | |||
254 | 82 | retval = pthread_key_delete(thread_local_storage_); | |
255 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
|
82 | assert(retval == 0); |
256 | 82 | } | |
257 | |||
258 | |||
259 | /** | ||
260 | * Depending on the object type, uses either Open() or OpenPinned() from the | ||
261 | * cache manager | ||
262 | */ | ||
263 | 107 | int Fetcher::OpenSelect(const CacheManager::LabeledObject &object) { | |
264 |
5/6✓ Branch 1 taken 28 times.
✓ Branch 2 taken 79 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 28 times.
✓ Branch 6 taken 79 times.
✓ Branch 7 taken 28 times.
|
107 | if (object.label.IsCatalog() || object.label.IsPinned()) { |
265 | 79 | return cache_mgr_->OpenPinned(object); | |
266 | } else { | ||
267 | 28 | return cache_mgr_->Open(object); | |
268 | } | ||
269 | } | ||
270 | |||
271 | |||
272 | 45 | void Fetcher::SignalWaitingThreads( | |
273 | const int fd, | ||
274 | const shash::Any &id, | ||
275 | ThreadLocalStorage *tls) | ||
276 | { | ||
277 | 45 | MutexLockGuard m(lock_queues_download_); | |
278 |
2/2✓ Branch 1 taken 4 times.
✓ Branch 2 taken 45 times.
|
49 | for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) { |
279 |
3/4✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
|
4 | int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd; |
280 |
1/2✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int)); |
281 | } | ||
282 | 45 | tls->other_pipes_waiting.clear(); | |
283 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
45 | queues_download_.erase(id); |
284 | 45 | } | |
285 | |||
286 | } // namespace cvmfs | ||
287 |