1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include "upload_local.h" |
6 |
|
|
#include "cvmfs_config.h" |
7 |
|
|
|
8 |
|
|
#include <errno.h> |
9 |
|
|
|
10 |
|
|
#include <string> |
11 |
|
|
|
12 |
|
|
#include "compression.h" |
13 |
|
|
#include "logging.h" |
14 |
|
|
#include "util/posix.h" |
15 |
|
|
|
16 |
|
|
namespace upload { |
17 |
|
|
|
18 |
|
32 |
LocalUploader::LocalUploader(const SpoolerDefinition &spooler_definition) |
19 |
|
|
: AbstractUploader(spooler_definition), |
20 |
|
|
backend_file_mode_(default_backend_file_mode_ ^ GetUmask()), |
21 |
|
|
upstream_path_(spooler_definition.spooler_configuration), |
22 |
|
32 |
temporary_path_(spooler_definition.temporary_path) |
23 |
|
|
{ |
24 |
|
|
assert(spooler_definition.IsValid() && |
25 |
✓✗✗✓
|
32 |
spooler_definition.driver_type == SpoolerDefinition::Local); |
26 |
|
|
|
27 |
|
32 |
atomic_init32(©_errors_); |
28 |
|
32 |
} |
29 |
|
|
|
30 |
|
137 |
bool LocalUploader::WillHandle(const SpoolerDefinition &spooler_definition) { |
31 |
|
137 |
return spooler_definition.driver_type == SpoolerDefinition::Local; |
32 |
|
|
} |
33 |
|
|
|
34 |
|
49 |
unsigned int LocalUploader::GetNumberOfErrors() const { |
35 |
|
49 |
return atomic_read32(©_errors_); |
36 |
|
|
} |
37 |
|
|
|
38 |
|
526 |
void LocalUploader::FileUpload(const std::string &local_path, |
39 |
|
|
const std::string &remote_path, |
40 |
|
|
const CallbackTN *callback) { |
41 |
|
526 |
LogCvmfs(kLogSpooler, kLogVerboseMsg, "FileUpload call started."); |
42 |
|
|
|
43 |
|
|
// create destination in backend storage temporary directory |
44 |
|
526 |
std::string tmp_path = CreateTempPath(temporary_path_ + "/upload", 0666); |
45 |
✗✓ |
526 |
if (tmp_path.empty()) { |
46 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
47 |
|
|
"failed to create temp path for " |
48 |
|
|
"upload of file '%s' (errno: %d)", |
49 |
|
|
local_path.c_str(), errno); |
50 |
|
|
atomic_inc32(©_errors_); |
51 |
|
|
Respond(callback, UploaderResults(1, local_path)); |
52 |
|
|
return; |
53 |
|
|
} |
54 |
|
|
|
55 |
|
|
// copy file into controlled temporary directory location |
56 |
|
526 |
int retval = CopyPath2Path(local_path, tmp_path); |
57 |
✓✗ |
526 |
int retcode = retval ? 0 : 100; |
58 |
✗✓ |
526 |
if (retcode != 0) { |
59 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
60 |
|
|
"failed to copy file '%s' to staging " |
61 |
|
|
"area: '%s'", |
62 |
|
|
local_path.c_str(), tmp_path.c_str()); |
63 |
|
|
atomic_inc32(©_errors_); |
64 |
|
|
Respond(callback, UploaderResults(retcode, local_path)); |
65 |
|
|
return; |
66 |
|
|
} |
67 |
|
|
|
68 |
|
|
// move the file in place (atomic operation) |
69 |
|
526 |
retcode = Move(tmp_path, remote_path); |
70 |
✗✓ |
526 |
if (retcode != 0) { |
71 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
72 |
|
|
"failed to move file '%s' from the " |
73 |
|
|
"staging area to the final location: " |
74 |
|
|
"'%s'", |
75 |
|
|
tmp_path.c_str(), remote_path.c_str()); |
76 |
|
|
atomic_inc32(©_errors_); |
77 |
|
|
Respond(callback, UploaderResults(retcode, local_path)); |
78 |
|
|
return; |
79 |
|
|
} |
80 |
|
|
|
81 |
|
526 |
CountUploadedBytes(GetFileSize(remote_path)); |
82 |
|
526 |
Respond(callback, UploaderResults(retcode, local_path)); |
83 |
|
|
} |
84 |
|
|
|
85 |
|
116 |
UploadStreamHandle *LocalUploader::InitStreamedUpload( |
86 |
|
|
const CallbackTN *callback) { |
87 |
|
116 |
std::string tmp_path; |
88 |
|
116 |
const int tmp_fd = CreateAndOpenTemporaryChunkFile(&tmp_path); |
89 |
✗✓ |
116 |
if (tmp_fd < 0) { |
90 |
|
|
atomic_inc32(©_errors_); |
91 |
|
|
return NULL; |
92 |
|
|
} |
93 |
|
|
|
94 |
|
116 |
return new LocalStreamHandle(callback, tmp_fd, tmp_path); |
95 |
|
|
} |
96 |
|
|
|
97 |
|
817 |
void LocalUploader::StreamedUpload(UploadStreamHandle *handle, |
98 |
|
|
UploadBuffer buffer, |
99 |
|
|
const CallbackTN *callback) { |
100 |
|
817 |
LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle); |
101 |
|
|
|
102 |
|
|
const size_t bytes_written = |
103 |
|
817 |
write(local_handle->file_descriptor, buffer.data, buffer.size); |
104 |
✗✓ |
817 |
if (bytes_written != buffer.size) { |
105 |
|
|
const int cpy_errno = errno; |
106 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
107 |
|
|
"failed to write %d bytes to '%s' " |
108 |
|
|
"(errno: %d)", |
109 |
|
|
buffer.size, local_handle->temporary_path.c_str(), |
110 |
|
|
cpy_errno); |
111 |
|
|
atomic_inc32(©_errors_); |
112 |
|
|
Respond(callback, |
113 |
|
|
UploaderResults(UploaderResults::kBufferUpload, cpy_errno)); |
114 |
|
|
return; |
115 |
|
|
} |
116 |
|
|
|
117 |
|
817 |
Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0)); |
118 |
|
|
} |
119 |
|
|
|
120 |
|
116 |
void LocalUploader::FinalizeStreamedUpload(UploadStreamHandle *handle, |
121 |
|
|
const shash::Any &content_hash) { |
122 |
|
116 |
int retval = 0; |
123 |
|
116 |
LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle); |
124 |
|
|
|
125 |
|
116 |
retval = close(local_handle->file_descriptor); |
126 |
✗✓ |
116 |
if (retval != 0) { |
127 |
|
|
const int cpy_errno = errno; |
128 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
129 |
|
|
"failed to close temp file '%s' " |
130 |
|
|
"(errno: %d)", |
131 |
|
|
local_handle->temporary_path.c_str(), cpy_errno); |
132 |
|
|
atomic_inc32(©_errors_); |
133 |
|
|
Respond(handle->commit_callback, |
134 |
|
|
UploaderResults(UploaderResults::kChunkCommit, cpy_errno)); |
135 |
|
|
return; |
136 |
|
|
} |
137 |
|
|
|
138 |
|
116 |
const std::string final_path = "data/" + content_hash.MakePath(); |
139 |
✓✗ |
116 |
if (!Peek(final_path)) { |
140 |
|
116 |
retval = Move(local_handle->temporary_path, final_path); |
141 |
✗✓ |
116 |
if (retval != 0) { |
142 |
|
|
const int cpy_errno = errno; |
143 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
144 |
|
|
"failed to move temp file '%s' to " |
145 |
|
|
"final location '%s' (errno: %d)", |
146 |
|
|
local_handle->temporary_path.c_str(), final_path.c_str(), |
147 |
|
|
cpy_errno); |
148 |
|
|
atomic_inc32(©_errors_); |
149 |
|
|
Respond(handle->commit_callback, |
150 |
|
|
UploaderResults(UploaderResults::kChunkCommit, cpy_errno)); |
151 |
|
|
return; |
152 |
|
|
} |
153 |
✓✓✗✓ ✓✓ |
116 |
if (!content_hash.HasSuffix() |
154 |
|
|
|| content_hash.suffix == shash::kSuffixPartial) { |
155 |
|
100 |
CountUploadedBytes(GetFileSize(upstream_path_ + "/" + final_path)); |
156 |
|
|
} |
157 |
|
|
} else { |
158 |
|
|
const int retval = unlink(local_handle->temporary_path.c_str()); |
159 |
|
|
if (retval != 0) { |
160 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
161 |
|
|
"failed to remove temporary file '%s' (errno: %d)", |
162 |
|
|
local_handle->temporary_path.c_str(), errno); |
163 |
|
|
} |
164 |
|
|
} |
165 |
|
|
|
166 |
|
116 |
const CallbackTN *callback = handle->commit_callback; |
167 |
✓✗ |
116 |
delete local_handle; |
168 |
|
|
|
169 |
|
116 |
Respond(callback, UploaderResults(UploaderResults::kChunkCommit, 0)); |
170 |
|
|
} |
171 |
|
|
|
172 |
|
|
/** |
173 |
|
|
* TODO(jblomer): investigate if parallelism increases the GC speed on local |
174 |
|
|
* disks. |
175 |
|
|
*/ |
176 |
|
1 |
void LocalUploader::DoRemoveAsync(const std::string &file_to_delete) { |
177 |
|
1 |
const int retval = unlink((upstream_path_ + "/" + file_to_delete).c_str()); |
178 |
✗✓✗✗
|
1 |
if ((retval != 0) && (errno != ENOENT)) |
179 |
|
|
atomic_inc32(©_errors_); |
180 |
|
1 |
Respond(NULL, UploaderResults()); |
181 |
|
1 |
} |
182 |
|
|
|
183 |
|
120 |
bool LocalUploader::Peek(const std::string &path) const { |
184 |
|
120 |
bool retval = FileExists(upstream_path_ + "/" + path); |
185 |
✓✓ |
120 |
if (retval) { |
186 |
|
2 |
CountDuplicates(); |
187 |
|
|
} |
188 |
|
120 |
return retval; |
189 |
|
|
} |
190 |
|
|
|
191 |
|
1 |
bool LocalUploader::PlaceBootstrappingShortcut(const shash::Any &object) const { |
192 |
|
1 |
const std::string src = "data/" + object.MakePath(); |
193 |
|
1 |
const std::string dest = upstream_path_ + "/" + object.MakeAlternativePath(); |
194 |
|
1 |
return SymlinkForced(src, dest); |
195 |
|
|
} |
196 |
|
|
|
197 |
|
642 |
int LocalUploader::Move(const std::string &local_path, |
198 |
|
|
const std::string &remote_path) const { |
199 |
|
642 |
const std::string destination_path = upstream_path_ + "/" + remote_path; |
200 |
|
|
|
201 |
|
|
// make sure the file has the right permissions |
202 |
|
642 |
int retval = chmod(local_path.c_str(), backend_file_mode_); |
203 |
✓✗ |
642 |
int retcode = (retval == 0) ? 0 : 101; |
204 |
✗✓ |
642 |
if (retcode != 0) { |
205 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
206 |
|
|
"failed to set file permission '%s' " |
207 |
|
|
"errno: %d", |
208 |
|
|
local_path.c_str(), errno); |
209 |
|
|
return retcode; |
210 |
|
|
} |
211 |
|
|
|
212 |
|
|
// move the file in place |
213 |
|
642 |
retval = rename(local_path.c_str(), destination_path.c_str()); |
214 |
✗✓ |
642 |
retcode = (retval == 0) ? 0 : errno; |
215 |
✗✓ |
642 |
if (retcode != 0) { |
216 |
|
|
LogCvmfs(kLogSpooler, kLogVerboseMsg, |
217 |
|
|
"failed to move file '%s' to '%s' " |
218 |
|
|
"errno: %d", |
219 |
|
|
local_path.c_str(), remote_path.c_str(), errno); |
220 |
|
|
} |
221 |
|
|
|
222 |
|
642 |
return retcode; |
223 |
|
|
} |
224 |
|
|
|
225 |
|
|
int64_t LocalUploader::DoGetObjectSize(const std::string &file_name) { |
226 |
|
|
return GetFileSize(upstream_path_ + "/" + file_name); |
227 |
|
|
} |
228 |
|
|
|
229 |
|
|
} // namespace upload |