Directory: | cvmfs/ |
---|---|
File: | cvmfs/fetch.h |
Date: | 2025-02-02 02:34:22 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 22 | 29 | 75.9% |
Branches: | 3 | 14 | 21.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_FETCH_H_ | ||
6 | #define CVMFS_FETCH_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <map> | ||
11 | #include <string> | ||
12 | #include <vector> | ||
13 | |||
14 | #include "cache.h" | ||
15 | #include "crypto/hash.h" | ||
16 | #include "gtest/gtest_prod.h" | ||
17 | #include "network/download.h" | ||
18 | #include "network/sink.h" | ||
19 | |||
20 | class BackoffThrottle; | ||
21 | |||
22 | namespace perf { | ||
23 | class Statistics; | ||
24 | } | ||
25 | |||
26 | namespace cvmfs { | ||
27 | |||
28 | /** | ||
29 | * TransacionSink uses an open transaction in a cache manager as a sink. It | ||
30 | * allows the download manager to write data without knowing about the cache | ||
31 | * manager. | ||
32 | */ | ||
33 | class TransactionSink : public Sink { | ||
34 | public: | ||
35 | 41 | TransactionSink(CacheManager *cache_mgr, void *open_txn) | |
36 | 41 | : Sink(false), | |
37 | 41 | cache_mgr_(cache_mgr), | |
38 | 41 | open_txn_(open_txn) { } | |
39 | 82 | virtual ~TransactionSink() { } | |
40 | |||
41 | /** | ||
42 | * Appends data to the sink | ||
43 | * | ||
44 | * @returns on success: number of bytes written (can be less than requested) | ||
45 | * on failure: -errno. | ||
46 | */ | ||
47 | 63 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
48 | 63 | return cache_mgr_->Write(buf, sz, open_txn_); | |
49 | } | ||
50 | |||
51 | /** | ||
52 | * Truncate all written data and start over at position zero. | ||
53 | * | ||
54 | * @returns Success = 0 | ||
55 | * Failure = -errno | ||
56 | */ | ||
57 | 6 | virtual int Reset() { | |
58 | 6 | return cache_mgr_->Reset(open_txn_); | |
59 | } | ||
60 | |||
61 | /** | ||
62 | * Purges all resources leaving the sink in an invalid state. | ||
63 | * More aggressive version of Reset(). | ||
64 | * For some sinks it might do the same as Reset(). | ||
65 | * | ||
66 | * @returns Success = 0 | ||
67 | * Failure = -errno | ||
68 | */ | ||
69 | 5 | virtual int Purge() { | |
70 | 5 | return Reset(); | |
71 | } | ||
72 | /** | ||
73 | * @returns true if the object is correctly initialized. | ||
74 | */ | ||
75 | 41 | virtual bool IsValid() { | |
76 |
2/4✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
|
41 | return cache_mgr_ != NULL && open_txn_ != NULL; |
77 | } | ||
78 | |||
79 | 41 | virtual int Flush() { return 0; } | |
80 | ✗ | virtual bool Reserve(size_t /*size*/) { return true; } | |
81 | 155 | virtual bool RequiresReserve() { return false; } | |
82 | |||
83 | /** | ||
84 | * Return a string representation describing the type of sink and its status | ||
85 | */ | ||
86 | ✗ | virtual std::string Describe() { | |
87 | ✗ | std::string result = "Transaction sink that is "; | |
88 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
89 | ✗ | return result; | |
90 | } | ||
91 | |||
92 | private: | ||
93 | CacheManager *cache_mgr_; | ||
94 | void *open_txn_; | ||
95 | }; | ||
96 | |||
97 | |||
98 | /** | ||
99 | * The Fetcher uses a cache manager and a download manager in order to provide a | ||
100 | * (virtual) file descriptor to a requested object, which is valid in the | ||
101 | * context of the cache manager. | ||
102 | * If the object is not in the cache, it is downloaded and stored in the cache. | ||
103 | * | ||
104 | * Concurrent download requests for the same id are collapsed. | ||
105 | */ | ||
106 | class Fetcher : SingleCopy { | ||
107 | FRIEND_TEST(T_Fetcher, GetTls); | ||
108 | FRIEND_TEST(T_Fetcher, SignalWaitingThreads); | ||
109 | friend void *TestGetTls(void *data); | ||
110 | friend void *TestFetchCollapse(void *data); | ||
111 | friend void *TestFetchCollapse2(void *data); | ||
112 | friend void TLSDestructor(void *data); | ||
113 | |||
114 | public: | ||
115 | Fetcher(CacheManager *cache_mgr, | ||
116 | download::DownloadManager *download_mgr, | ||
117 | BackoffThrottle *backoff_throttle, | ||
118 | perf::StatisticsTemplate statistics); | ||
119 | ~Fetcher(); | ||
120 | |||
121 | int Fetch(const CacheManager::LabeledObject &object, | ||
122 | const std::string &alt_url = ""); | ||
123 | |||
124 | ✗ | void ReplaceCacheManager(CacheManager *new_cache_mgr) { | |
125 | ✗ | cache_mgr_ = new_cache_mgr; | |
126 | } | ||
127 | 92 | CacheManager *cache_mgr() { return cache_mgr_; } | |
128 | 20 | download::DownloadManager *download_mgr() { return download_mgr_; } | |
129 | |||
130 | private: | ||
131 | /** | ||
132 | * Multiple threads might want to download the same object at the same time. | ||
133 | * If that happens, only the first thread performs the download. The other | ||
134 | * threads wait on a pipe for a notification from the first thread. | ||
135 | */ | ||
136 | struct ThreadLocalStorage { | ||
137 |
1/2✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
|
31 | ThreadLocalStorage() { |
138 | 31 | pipe_wait[0] = -1; | |
139 | 31 | pipe_wait[1] = -1; | |
140 | 31 | fetcher = NULL; | |
141 | 31 | } | |
142 | |||
143 | /** | ||
144 | * Used during cleanup to find tls_blocks_. | ||
145 | */ | ||
146 | Fetcher *fetcher; | ||
147 | /** | ||
148 | * Wait on the reading end if another thread is already downloading the same | ||
149 | * object. | ||
150 | */ | ||
151 | int pipe_wait[2]; | ||
152 | /** | ||
153 | * Writer ends of all the pipes of threads that want to download the same | ||
154 | * object. | ||
155 | */ | ||
156 | std::vector<int> other_pipes_waiting; | ||
157 | /** | ||
158 | * It is sufficient to construct the JobInfo object once per thread, not | ||
159 | * on every call to Fetch(). | ||
160 | */ | ||
161 | download::JobInfo download_job; | ||
162 | }; | ||
163 | |||
164 | /** | ||
165 | * Maps currently downloaded chunks to the other_pipes_waiting member of the | ||
166 | * thread local storage of the downloading thread. This way, a thread can | ||
167 | * enqueue itself to such an other_pipes_waiting list and gets informed when | ||
168 | * the download is completed. | ||
169 | */ | ||
170 | typedef std::map< shash::Any, std::vector<int> * > ThreadQueues; | ||
171 | |||
172 | ThreadLocalStorage *GetTls(); | ||
173 | void CleanupTls(ThreadLocalStorage *tls); | ||
174 | void SignalWaitingThreads(const int fd, const shash::Any &id, | ||
175 | ThreadLocalStorage *tls); | ||
176 | int OpenSelect(const CacheManager::LabeledObject &object); | ||
177 | |||
178 | /** | ||
179 | * Key to the thread's ThreadLocalStorage memory | ||
180 | */ | ||
181 | pthread_key_t thread_local_storage_; | ||
182 | |||
183 | ThreadQueues queues_download_; | ||
184 | pthread_mutex_t *lock_queues_download_; | ||
185 | |||
186 | /** | ||
187 | * All the threads register their thread local storage here, so that it can | ||
188 | * be cleaned up properly in the destructor of Fetcher. | ||
189 | */ | ||
190 | std::vector<ThreadLocalStorage *> tls_blocks_; | ||
191 | pthread_mutex_t *lock_tls_blocks_; | ||
192 | |||
193 | CacheManager *cache_mgr_; | ||
194 | download::DownloadManager *download_mgr_; | ||
195 | BackoffThrottle *backoff_throttle_; | ||
196 | perf::Counter *n_downloads; | ||
197 | perf::Counter *n_invocations; | ||
198 | }; | ||
199 | |||
200 | } // namespace cvmfs | ||
201 | |||
202 | #endif // CVMFS_FETCH_H_ | ||
203 |