CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_local.cc
Go to the documentation of this file.
1 
5 #include "upload_local.h"
6 #include "cvmfs_config.h"
7 
8 #include <errno.h>
9 
10 #include <string>
11 
12 #include "compression.h"
13 #include "util/logging.h"
14 #include "util/posix.h"
15 
16 namespace upload {
17 
19  : AbstractUploader(spooler_definition),
20  backend_file_mode_(default_backend_file_mode_ ^ GetUmask()),
21  backend_dir_mode_(default_backend_dir_mode_ ^ GetUmask()),
22  upstream_path_(spooler_definition.spooler_configuration),
23  temporary_path_(spooler_definition.temporary_path)
24 {
25  assert(spooler_definition.IsValid() &&
26  spooler_definition.driver_type == SpoolerDefinition::Local);
27 
28  atomic_init32(&copy_errors_);
29 }
30 
31 bool LocalUploader::WillHandle(const SpoolerDefinition &spooler_definition) {
32  return spooler_definition.driver_type == SpoolerDefinition::Local;
33 }
34 
35 unsigned int LocalUploader::GetNumberOfErrors() const {
36  return atomic_read32(&copy_errors_);
37 }
38 
41  MkdirDeep(upstream_path_ + "/stats", backend_dir_mode_, false);
42 }
43 
44 void LocalUploader::DoUpload(const std::string &remote_path,
45  IngestionSource *source,
46  const CallbackTN *callback) {
47  LogCvmfs(kLogSpooler, kLogVerboseMsg, "FileUpload call started.");
48 
49  // create destination in backend storage temporary directory
50  std::string tmp_path;
51  FILE *ftmp = CreateTempFile(temporary_path_ + "/upload", 0666, "w",
52  &tmp_path);
53  if (ftmp == NULL) {
55  "failed to create temp path for "
56  "upload of file '%s' (errno: %d)",
57  source->GetPath().c_str(), errno);
58  atomic_inc32(&copy_errors_);
59  Respond(callback, UploaderResults(1, source->GetPath()));
60  return;
61  }
62 
63  // copy file into controlled temporary directory location
64  bool rvb = source->Open();
65  if (!rvb) {
66  fclose(ftmp);
67  unlink(tmp_path.c_str());
68  atomic_inc32(&copy_errors_);
69  Respond(callback, UploaderResults(100, source->GetPath()));
70  return;
71  }
72  unsigned char buffer[kPageSize];
73  ssize_t rbytes;
74  do {
75  rbytes = source->Read(buffer, kPageSize);
76  size_t wbytes = 0;
77  if (rbytes > 0) {
78  wbytes = fwrite(buffer, 1, rbytes, ftmp);
79  }
80  if ((rbytes < 0) || (static_cast<size_t>(rbytes) != wbytes)) {
81  source->Close();
82  fclose(ftmp);
83  unlink(tmp_path.c_str());
84  atomic_inc32(&copy_errors_);
85  Respond(callback, UploaderResults(100, source->GetPath()));
86  return;
87  }
88  } while (rbytes == kPageSize);
89  source->Close();
90  fclose(ftmp);
91 
92  // move the file in place (atomic operation)
93  int rvi = Move(tmp_path, remote_path);
94  if (rvi != 0) {
96  "failed to move file '%s' from the "
97  "staging area to the final location: "
98  "'%s'",
99  tmp_path.c_str(), remote_path.c_str());
100  unlink(tmp_path.c_str());
101  atomic_inc32(&copy_errors_);
102  Respond(callback, UploaderResults(rvi, source->GetPath()));
103  return;
104  }
105 
106  Respond(callback, UploaderResults(rvi, source->GetPath()));
107 }
108 
110  const CallbackTN *callback) {
111  std::string tmp_path;
112  const int tmp_fd = CreateAndOpenTemporaryChunkFile(&tmp_path);
113  if (tmp_fd < 0) {
114  atomic_inc32(&copy_errors_);
115  return NULL;
116  }
117 
118  return new LocalStreamHandle(callback, tmp_fd, tmp_path);
119 }
120 
122  UploadBuffer buffer,
123  const CallbackTN *callback) {
124  LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle);
125 
126  const size_t bytes_written =
127  write(local_handle->file_descriptor, buffer.data, buffer.size);
128  if (bytes_written != buffer.size) {
129  const int cpy_errno = errno;
131  "failed to write %lu bytes to '%s' "
132  "(errno: %d)",
133  buffer.size, local_handle->temporary_path.c_str(),
134  cpy_errno);
135  atomic_inc32(&copy_errors_);
136  Respond(callback,
138  return;
139  }
140 
142 }
143 
145  const shash::Any &content_hash) {
146  int retval = 0;
147  LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle);
148 
149  retval = close(local_handle->file_descriptor);
150  if (retval != 0) {
151  const int cpy_errno = errno;
153  "failed to close temp file '%s' "
154  "(errno: %d)",
155  local_handle->temporary_path.c_str(), cpy_errno);
156  atomic_inc32(&copy_errors_);
157  Respond(handle->commit_callback,
159  return;
160  }
161 
162  std::string final_path;
163  if (local_handle->remote_path != "") {
164  final_path = local_handle->remote_path;
165  } else {
166  final_path = "data/" + content_hash.MakePath();
167  }
168  if (!Peek(final_path)) {
169  retval = Move(local_handle->temporary_path, final_path);
170  if (retval != 0) {
171  const int cpy_errno = errno;
173  "failed to move temp file '%s' to "
174  "final location '%s' (errno: %d)",
175  local_handle->temporary_path.c_str(), final_path.c_str(),
176  cpy_errno);
177  atomic_inc32(&copy_errors_);
178  Respond(handle->commit_callback,
180  return;
181  }
182  if (!content_hash.HasSuffix()
183  || content_hash.suffix == shash::kSuffixPartial) {
185  CountUploadedBytes(GetFileSize(upstream_path_ + "/" + final_path));
186  } else if (content_hash.suffix == shash::kSuffixCatalog) {
189  }
190  } else {
191  const int retval = unlink(local_handle->temporary_path.c_str());
192  if (retval != 0) {
194  "failed to remove temporary file '%s' (errno: %d)",
195  local_handle->temporary_path.c_str(), errno);
196  }
197  CountDuplicates();
198  }
199 
200  const CallbackTN *callback = handle->commit_callback;
201  delete local_handle;
202 
204 }
205 
210 void LocalUploader::DoRemoveAsync(const std::string &file_to_delete) {
211  const int retval = unlink((upstream_path_ + "/" + file_to_delete).c_str());
212  if ((retval != 0) && (errno != ENOENT))
213  atomic_inc32(&copy_errors_);
214  Respond(NULL, UploaderResults());
215 }
216 
217 bool LocalUploader::Peek(const std::string &path) {
218  bool retval = FileExists(upstream_path_ + "/" + path);
219  return retval;
220 }
221 
222 bool LocalUploader::Mkdir(const std::string &path) {
223  return MkdirDeep(upstream_path_ + "/" + path, backend_dir_mode_, false);
224 }
225 
227  const std::string src = "data/" + object.MakePath();
228  const std::string dest = upstream_path_ + "/" + object.MakeAlternativePath();
229  return SymlinkForced(src, dest);
230 }
231 
232 int LocalUploader::Move(const std::string &local_path,
233  const std::string &remote_path) const {
234  const std::string destination_path = upstream_path_ + "/" + remote_path;
235 
236  // make sure the file has the right permissions
237  int retval = chmod(local_path.c_str(), backend_file_mode_);
238  int retcode = (retval == 0) ? 0 : 101;
239  if (retcode != 0) {
241  "failed to set file permission '%s' "
242  "errno: %d",
243  local_path.c_str(), errno);
244  return retcode;
245  }
246 
247  // move the file in place
248  retval = rename(local_path.c_str(), destination_path.c_str());
249  retcode = (retval == 0) ? 0 : errno;
250  if (retcode != 0) {
252  "failed to move file '%s' to '%s' "
253  "errno: %d",
254  local_path.c_str(), remote_path.c_str(), errno);
255  }
256 
257  return retcode;
258 }
259 
260 int64_t LocalUploader::DoGetObjectSize(const std::string &file_name) {
261  return GetFileSize(upstream_path_ + "/" + file_name);
262 }
263 
264 } // namespace upload
bool MakeCacheDirectories(const std::string &path, const mode_t mode)
Definition: posix.cc:882
const std::string upstream_path_
Definition: upload_local.h:89
mode_t GetUmask()
Definition: posix.cc:1390
bool SymlinkForced(const std::string &src, const std::string &dest)
Definition: posix.cc:833
virtual bool Create()
Definition: upload_local.cc:39
DriverType driver_type
the type of the spooler driver
int CreateAndOpenTemporaryChunkFile(std::string *path) const
LocalUploader(const SpoolerDefinition &spooler_definition)
Definition: upload_local.cc:18
UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
Definition: posix.cc:1005
void Respond(const CallbackTN *callback, const UploaderResults &result) const
bool Peek(const std::string &path)
const std::string temporary_path_
Definition: upload_local.h:90
bool HasSuffix() const
Definition: hash.h:239
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
void CountUploadedChunks() const
bool FileExists(const std::string &path)
Definition: posix.cc:791
const CallbackTN * commit_callback
unsigned int GetNumberOfErrors() const
Definition: upload_local.cc:35
virtual bool Open()=0
int Move(const std::string &local_path, const std::string &remote_path) const
bool Mkdir(const std::string &path)
const char kSuffixPartial
Definition: hash.h:57
const char kSuffixCatalog
Definition: hash.h:54
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:846
void CountUploadedBytes(int64_t bytes_written) const
const std::string temporary_path
Definition: upload_local.h:27
const mode_t backend_file_mode_
Definition: upload_local.h:40
int64_t DoGetObjectSize(const std::string &file_name)
virtual std::string GetPath() const =0
bool PlaceBootstrappingShortcut(const shash::Any &object)
void CountUploadedCatalogBytes(int64_t bytes_written) const
void DoRemoveAsync(const std::string &file_to_delete)
const unsigned kPageSize
Definition: posix.h:30
void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
Definition: upload_local.cc:44
const mode_t backend_dir_mode_
Definition: upload_local.h:41
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:801
const int kLogVerboseMsg
atomic_int32 copy_errors_
Definition: upload_local.h:91
Suffix suffix
Definition: hash.h:126
std::string MakePath() const
Definition: hash.h:316
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_local.cc:31
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528