Directory: | cvmfs/ |
---|---|
File: | cvmfs/fetch.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 17 | 24 | 70.8% |
Branches: | 3 | 14 | 21.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_FETCH_H_ | ||
6 | #define CVMFS_FETCH_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <map> | ||
11 | #include <string> | ||
12 | #include <vector> | ||
13 | |||
14 | #include "cache.h" | ||
15 | #include "crypto/hash.h" | ||
16 | #include "gtest/gtest_prod.h" | ||
17 | #include "network/download.h" | ||
18 | #include "network/sink.h" | ||
19 | |||
20 | class BackoffThrottle; | ||
21 | |||
22 | namespace perf { | ||
23 | class Statistics; | ||
24 | } | ||
25 | |||
26 | namespace cvmfs { | ||
27 | |||
28 | /** | ||
29 | * TransactionSink uses an open transaction in a cache manager as a sink. It | ||
30 | * allows the download manager to write data without knowing about the cache | ||
31 | * manager. | ||
32 | */ | ||
33 | class TransactionSink : public Sink { | ||
34 | public: | ||
35 | 1228 | TransactionSink(CacheManager *cache_mgr, void *open_txn) | |
36 | 1228 | : Sink(false), cache_mgr_(cache_mgr), open_txn_(open_txn) { } | |
37 | 2456 | virtual ~TransactionSink() { } | |
38 | |||
39 | /** | ||
40 | * Appends data to the sink | ||
41 | * | ||
42 | * @returns on success: number of bytes written (can be less than requested) | ||
43 | * on failure: -errno. | ||
44 | */ | ||
45 | 2295 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
46 | 2295 | return cache_mgr_->Write(buf, sz, open_txn_); | |
47 | } | ||
48 | |||
49 | /** | ||
50 | * Truncate all written data and start over at position zero. | ||
51 | * | ||
52 | * @returns Success = 0 | ||
53 | * Failure = -errno | ||
54 | */ | ||
55 | 53 | virtual int Reset() { return cache_mgr_->Reset(open_txn_); } | |
56 | |||
57 | /** | ||
58 | * Purges all resources leaving the sink in an invalid state. | ||
59 | * More aggressive version of Reset(). | ||
60 | * For some sinks it might do the same as Reset(). | ||
61 | * | ||
62 | * @returns Success = 0 | ||
63 | * Failure = -errno | ||
64 | */ | ||
65 | 52 | virtual int Purge() { return Reset(); } | |
66 | /** | ||
67 | * @returns true if the object is correctly initialized. | ||
68 | */ | ||
69 |
2/4✓ Branch 0 taken 1228 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1228 times.
✗ Branch 3 not taken.
|
1228 | virtual bool IsValid() { return cache_mgr_ != NULL && open_txn_ != NULL; } |
70 | |||
71 | 1228 | virtual int Flush() { return 0; } | |
72 | ✗ | virtual bool Reserve(size_t /*size*/) { return true; } | |
73 | 4762 | virtual bool RequiresReserve() { return false; } | |
74 | |||
75 | /** | ||
76 | * Return a string representation describing the type of sink and its status | ||
77 | */ | ||
78 | ✗ | virtual std::string Describe() { | |
79 | ✗ | std::string result = "Transaction sink that is "; | |
80 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
81 | ✗ | return result; | |
82 | } | ||
83 | |||
84 | private: | ||
85 | CacheManager *cache_mgr_; | ||
86 | void *open_txn_; | ||
87 | }; | ||
88 | |||
89 | |||
90 | /** | ||
91 | * The Fetcher uses a cache manager and a download manager in order to provide a | ||
92 | * (virtual) file descriptor to a requested object, which is valid in the | ||
93 | * context of the cache manager. | ||
94 | * If the object is not in the cache, it is downloaded and stored in the cache. | ||
95 | * | ||
96 | * Concurrent download requests for the same id are collapsed. | ||
97 | */ | ||
98 | class Fetcher : SingleCopy { | ||
99 | FRIEND_TEST(T_Fetcher, GetTls); | ||
100 | FRIEND_TEST(T_Fetcher, SignalWaitingThreads); | ||
101 | friend void *TestGetTls(void *data); | ||
102 | friend void *TestFetchCollapse(void *data); | ||
103 | friend void *TestFetchCollapse2(void *data); | ||
104 | friend void TLSDestructor(void *data); | ||
105 | |||
106 | public: | ||
107 | Fetcher(CacheManager *cache_mgr, | ||
108 | download::DownloadManager *download_mgr, | ||
109 | BackoffThrottle *backoff_throttle, | ||
110 | perf::StatisticsTemplate statistics); | ||
111 | ~Fetcher(); | ||
112 | |||
113 | int Fetch(const CacheManager::LabeledObject &object, | ||
114 | const std::string &alt_url = ""); | ||
115 | |||
116 | ✗ | void ReplaceCacheManager(CacheManager *new_cache_mgr) { | |
117 | ✗ | cache_mgr_ = new_cache_mgr; | |
118 | } | ||
119 | 3613 | CacheManager *cache_mgr() { return cache_mgr_; } | |
120 | 733 | download::DownloadManager *download_mgr() { return download_mgr_; } | |
121 | |||
122 | private: | ||
123 | /** | ||
124 | * Multiple threads might want to download the same object at the same time. | ||
125 | * If that happens, only the first thread performs the download. The other | ||
126 | * threads wait on a pipe for a notification from the first thread. | ||
127 | */ | ||
128 | struct ThreadLocalStorage { | ||
129 |
1/2✓ Branch 2 taken 887 times.
✗ Branch 3 not taken.
|
887 | ThreadLocalStorage() { |
130 | 887 | pipe_wait[0] = -1; | |
131 | 887 | pipe_wait[1] = -1; | |
132 | 887 | fetcher = NULL; | |
133 | 887 | } | |
134 | |||
135 | /** | ||
136 | * Used during cleanup to find tls_blocks_. | ||
137 | */ | ||
138 | Fetcher *fetcher; | ||
139 | /** | ||
140 | * Wait on the reading end if another thread is already downloading the same | ||
141 | * object. | ||
142 | */ | ||
143 | int pipe_wait[2]; | ||
144 | /** | ||
145 | * Writer ends of all the pipes of threads that want to download the same | ||
146 | * object. | ||
147 | */ | ||
148 | std::vector<int> other_pipes_waiting; | ||
149 | /** | ||
150 | * It is sufficient to construct the JobInfo object once per thread, not | ||
151 | * on every call to Fetch(). | ||
152 | */ | ||
153 | download::JobInfo download_job; | ||
154 | }; | ||
155 | |||
156 | /** | ||
157 | * Maps currently downloaded chunks to the other_pipes_waiting member of the | ||
158 | * thread local storage of the downloading thread. This way, a thread can | ||
159 | * enqueue itself to such an other_pipes_waiting list and get informed when | ||
160 | * the download is completed. | ||
161 | */ | ||
162 | typedef std::map<shash::Any, std::vector<int> *> ThreadQueues; | ||
163 | |||
164 | ThreadLocalStorage *GetTls(); | ||
165 | void CleanupTls(ThreadLocalStorage *tls); | ||
166 | void SignalWaitingThreads(const int fd, const shash::Any &id, | ||
167 | ThreadLocalStorage *tls); | ||
168 | int OpenSelect(const CacheManager::LabeledObject &object); | ||
169 | |||
170 | /** | ||
171 | * Key to the thread's ThreadLocalStorage memory | ||
172 | */ | ||
173 | pthread_key_t thread_local_storage_; | ||
174 | |||
175 | ThreadQueues queues_download_; | ||
176 | pthread_mutex_t *lock_queues_download_; | ||
177 | |||
178 | /** | ||
179 | * All the threads register their thread local storage here, so that it can | ||
180 | * be cleaned up properly in the destructor of Fetcher. | ||
181 | */ | ||
182 | std::vector<ThreadLocalStorage *> tls_blocks_; | ||
183 | pthread_mutex_t *lock_tls_blocks_; | ||
184 | |||
185 | CacheManager *cache_mgr_; | ||
186 | download::DownloadManager *download_mgr_; | ||
187 | BackoffThrottle *backoff_throttle_; | ||
188 | perf::Counter *n_downloads; | ||
189 | perf::Counter *n_invocations; | ||
190 | }; | ||
191 | |||
192 | } // namespace cvmfs | ||
193 | |||
194 | #endif // CVMFS_FETCH_H_ | ||
195 |