9 #include <sys/socket.h>
27 "__CVMFS_CACHE_EXTERNAL_PIPE_READY__";
34 assert(msg_typed_ != NULL);
46 assert(msg_rpc_.IsInitialized());
47 if (msg_typed_ == NULL)
54 : owns_msg_typed_(false)
59 , is_msg_out_of_band_(false)
64 : owns_msg_typed_(false)
69 , is_msg_out_of_band_(false)
79 assert(msg_rpc_.IsInitialized());
80 if (msg_typed_ == NULL)
82 return is_msg_out_of_band_;
87 msg_rpc_.CheckTypeAndMergeFrom(other.
msg_rpc_);
88 owns_msg_typed_ =
true;
98 bool retval = msg_rpc_.ParseFromArray(buffer, size);
103 owns_msg_typed_ =
true;
112 msg_rpc_.release_msg_refcount_req();
113 msg_rpc_.release_msg_refcount_reply();
114 msg_rpc_.release_msg_read_req();
115 msg_rpc_.release_msg_read_reply();
116 msg_rpc_.release_msg_object_info_req();
117 msg_rpc_.release_msg_object_info_reply();
118 msg_rpc_.release_msg_store_req();
119 msg_rpc_.release_msg_store_abort_req();
120 msg_rpc_.release_msg_store_reply();
121 msg_rpc_.release_msg_handshake();
122 msg_rpc_.release_msg_handshake_ack();
123 msg_rpc_.release_msg_quit();
124 msg_rpc_.release_msg_ioctl();
125 msg_rpc_.release_msg_info_req();
126 msg_rpc_.release_msg_info_reply();
127 msg_rpc_.release_msg_shrink_req();
128 msg_rpc_.release_msg_shrink_reply();
129 msg_rpc_.release_msg_list_req();
130 msg_rpc_.release_msg_list_reply();
131 msg_rpc_.release_msg_detach();
132 msg_rpc_.release_msg_breadcrumb_store_req();
133 msg_rpc_.release_msg_breadcrumb_load_req();
134 msg_rpc_.release_msg_breadcrumb_reply();
140 att_size_ = original_att_size;
142 is_msg_out_of_band_ =
false;
145 owns_msg_typed_ =
false;
150 if (msg_typed_->GetTypeName() ==
"cvmfs.MsgHandshake") {
151 msg_rpc_.set_allocated_msg_handshake(
152 reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed_));
153 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgHandshakeAck") {
154 msg_rpc_.set_allocated_msg_handshake_ack(
155 reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed_));
156 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgQuit") {
157 msg_rpc_.set_allocated_msg_quit(
158 reinterpret_cast<cvmfs::MsgQuit *>(msg_typed_));
159 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgIoctl") {
160 msg_rpc_.set_allocated_msg_ioctl(
161 reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed_));
162 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
163 msg_rpc_.set_allocated_msg_refcount_req(
164 reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed_));
165 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgRefcountReply") {
166 msg_rpc_.set_allocated_msg_refcount_reply(
167 reinterpret_cast<cvmfs::MsgRefcountReply *>(msg_typed_));
168 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
169 msg_rpc_.set_allocated_msg_object_info_req(
170 reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed_));
171 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgObjectInfoReply") {
172 msg_rpc_.set_allocated_msg_object_info_reply(
173 reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg_typed_));
174 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgReadReq") {
175 msg_rpc_.set_allocated_msg_read_req(
176 reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed_));
177 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgReadReply") {
178 msg_rpc_.set_allocated_msg_read_reply(
179 reinterpret_cast<cvmfs::MsgReadReply *>(msg_typed_));
180 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreReq") {
181 msg_rpc_.set_allocated_msg_store_req(
182 reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed_));
183 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
184 msg_rpc_.set_allocated_msg_store_abort_req(
185 reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed_));
186 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreReply") {
187 msg_rpc_.set_allocated_msg_store_reply(
188 reinterpret_cast<cvmfs::MsgStoreReply *>(msg_typed_));
189 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgInfoReq") {
190 msg_rpc_.set_allocated_msg_info_req(
191 reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed_));
192 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgInfoReply") {
193 msg_rpc_.set_allocated_msg_info_reply(
194 reinterpret_cast<cvmfs::MsgInfoReply *>(msg_typed_));
195 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
196 msg_rpc_.set_allocated_msg_shrink_req(
197 reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed_));
198 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgShrinkReply") {
199 msg_rpc_.set_allocated_msg_shrink_reply(
200 reinterpret_cast<cvmfs::MsgShrinkReply *>(msg_typed_));
201 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgListReq") {
202 msg_rpc_.set_allocated_msg_list_req(
203 reinterpret_cast<cvmfs::MsgListReq *>(msg_typed_));
204 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgListReply") {
205 msg_rpc_.set_allocated_msg_list_reply(
206 reinterpret_cast<cvmfs::MsgListReply *>(msg_typed_));
207 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
208 msg_rpc_.set_allocated_msg_breadcrumb_store_req(
209 reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed_));
210 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
211 msg_rpc_.set_allocated_msg_breadcrumb_load_req(
212 reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed_));
213 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbReply") {
214 msg_rpc_.set_allocated_msg_breadcrumb_reply(
215 reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg_typed_));
216 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgDetach") {
217 msg_rpc_.set_allocated_msg_detach(
218 reinterpret_cast<cvmfs::MsgDetach *>(msg_typed_));
219 is_msg_out_of_band_ =
true;
229 if (msg_rpc_.has_msg_handshake()) {
230 msg_typed_ = msg_rpc_.mutable_msg_handshake();
231 }
else if (msg_rpc_.has_msg_handshake_ack()) {
232 msg_typed_ = msg_rpc_.mutable_msg_handshake_ack();
233 }
else if (msg_rpc_.has_msg_quit()) {
234 msg_typed_ = msg_rpc_.mutable_msg_quit();
235 }
else if (msg_rpc_.has_msg_ioctl()) {
236 msg_typed_ = msg_rpc_.mutable_msg_ioctl();
237 }
else if (msg_rpc_.has_msg_refcount_req()) {
238 msg_typed_ = msg_rpc_.mutable_msg_refcount_req();
239 }
else if (msg_rpc_.has_msg_refcount_reply()) {
240 msg_typed_ = msg_rpc_.mutable_msg_refcount_reply();
241 }
else if (msg_rpc_.has_msg_object_info_req()) {
242 msg_typed_ = msg_rpc_.mutable_msg_object_info_req();
243 }
else if (msg_rpc_.has_msg_object_info_reply()) {
244 msg_typed_ = msg_rpc_.mutable_msg_object_info_reply();
245 }
else if (msg_rpc_.has_msg_read_req()) {
246 msg_typed_ = msg_rpc_.mutable_msg_read_req();
247 }
else if (msg_rpc_.has_msg_read_reply()) {
248 msg_typed_ = msg_rpc_.mutable_msg_read_reply();
249 }
else if (msg_rpc_.has_msg_store_req()) {
250 msg_typed_ = msg_rpc_.mutable_msg_store_req();
251 }
else if (msg_rpc_.has_msg_store_abort_req()) {
252 msg_typed_ = msg_rpc_.mutable_msg_store_abort_req();
253 }
else if (msg_rpc_.has_msg_store_reply()) {
254 msg_typed_ = msg_rpc_.mutable_msg_store_reply();
255 }
else if (msg_rpc_.has_msg_info_req()) {
256 msg_typed_ = msg_rpc_.mutable_msg_info_req();
257 }
else if (msg_rpc_.has_msg_info_reply()) {
258 msg_typed_ = msg_rpc_.mutable_msg_info_reply();
259 }
else if (msg_rpc_.has_msg_shrink_req()) {
260 msg_typed_ = msg_rpc_.mutable_msg_shrink_req();
261 }
else if (msg_rpc_.has_msg_shrink_reply()) {
262 msg_typed_ = msg_rpc_.mutable_msg_shrink_reply();
263 }
else if (msg_rpc_.has_msg_list_req()) {
264 msg_typed_ = msg_rpc_.mutable_msg_list_req();
265 }
else if (msg_rpc_.has_msg_list_reply()) {
266 msg_typed_ = msg_rpc_.mutable_msg_list_reply();
267 }
else if (msg_rpc_.has_msg_breadcrumb_store_req()) {
268 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_store_req();
269 }
else if (msg_rpc_.has_msg_breadcrumb_load_req()) {
270 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_load_req();
271 }
else if (msg_rpc_.has_msg_breadcrumb_reply()) {
272 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_reply();
273 }
else if (msg_rpc_.has_msg_detach()) {
274 msg_typed_ = msg_rpc_.mutable_msg_detach();
275 is_msg_out_of_band_ =
true;
295 : fd_connection_(fd_connection)
304 cvmfs::MsgHash *msg_hash)
308 msg_hash->set_algorithm(cvmfs::HASH_SHA1);
311 msg_hash->set_algorithm(cvmfs::HASH_RIPEMD160);
314 msg_hash->set_algorithm(cvmfs::HASH_SHAKE128);
324 int object_flags, cvmfs::EnumObjectType *wire_type)
326 *wire_type = cvmfs::OBJECT_REGULAR;
328 *wire_type = cvmfs::OBJECT_CATALOG;
330 *wire_type = cvmfs::OBJECT_VOLATILE;
335 const cvmfs::MsgHash &msg_hash,
338 switch (msg_hash.algorithm()) {
339 case cvmfs::HASH_SHA1:
342 case cvmfs::HASH_RIPEMD160:
345 case cvmfs::HASH_SHAKE128:
352 if (msg_hash.digest().length() != digest_size)
354 memcpy(hash->
digest, msg_hash.digest().data(), digest_size);
360 cvmfs::EnumObjectType wire_type,
int *object_flags)
364 case cvmfs::OBJECT_REGULAR:
366 case cvmfs::OBJECT_CATALOG:
369 case cvmfs::OBJECT_VOLATILE:
381 bool retval =
RecvHeader(&size, &has_attachment);
387 buffer = alloca(size);
389 buffer = smalloc(size);
391 if ((nbytes < 0) || (static_cast<uint32_t>(nbytes) != size)) {
396 uint32_t msg_size =
size;
397 if (has_attachment) {
404 msg_size = (*
reinterpret_cast<unsigned char *
>(buffer)) +
405 ((*(
reinterpret_cast<unsigned char *
>(buffer) + 1)) << 8);
412 void *ptr_msg = has_attachment
421 if (has_attachment) {
423 if (frame->
att_size() < attachment_size) {
427 void *ptr_attachment =
429 memcpy(frame->
attachment(), ptr_attachment, attachment_size);
442 if ((nbytes < 0) || (static_cast<unsigned>(nbytes) !=
kHeaderSize))
447 *size = header[1] + (header[2] << 8) + (header[3] << 16);
458 uint32_t total_size =
464 "sending message of size %u to cache transport", total_size);
468 header[1] = (total_size & 0x000000FF);
469 header[2] = (total_size & 0x0000FF00) >> 8;
470 header[3] = (total_size & 0x00FF0000) >> 16;
476 iov[0].iov_base = header;
480 inner_header[0] = (msg_size & 0x000000FF);
481 inner_header[1] = (msg_size & 0x0000FF00) >> 8;
482 iov[1].iov_base = inner_header;
484 iov[2].iov_base = message;
485 iov[2].iov_len = msg_size;
486 iov[3].iov_base = attachment;
487 iov[3].iov_len = att_size;
489 iov[1].iov_base = message;
490 iov[1].iov_len = msg_size;
500 "failed to write to external cache transport (%d), aborting", errno);
506 unsigned total_size = 0;
507 for (
unsigned i = 0; i < iovcnt; ++i)
508 total_size += iov[i].iov_len;
509 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(alloca(total_size));
512 for (
unsigned i = 0; i < iovcnt; ++i) {
513 memcpy(buffer + pos, iov[i].iov_base, iov[i].iov_len);
514 pos += iov[i].iov_len;
517 int retval = send(
fd_connection_, buffer, total_size, MSG_DONTWAIT);
519 assert(errno != EMSGSIZE);
522 "failed to write to external cache transport (%d), aborting",
530 cvmfs::MsgRpc *msg_rpc = frame->
GetMsgRpc();
531 int32_t
size = msg_rpc->ByteSize();
534 void *buffer = smalloc(size);
536 void *buffer = alloca(size);
538 bool retval = msg_rpc->SerializeToArray(buffer, size);
static const unsigned kMaxStackAlloc
static const int kLabelCatalog
static const unsigned char kWireProtocolVersion
void SendNonBlocking(struct iovec *iov, unsigned iovcnt)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
unsigned char digest[digest_size_]
static const uint32_t kFlagSendIgnoreFailure
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
void MergeFrom(const Frame &other)
static const uint32_t kMaxMsgSize
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
void * attachment() const
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
cvmfs::MsgRpc * GetMsgRpc()
uint32_t att_size() const
void set_att_size(uint32_t size)
static const unsigned char kFlagHasAttachment
google::protobuf::MessageLite * GetMsgTyped()
void SendData(void *message, uint32_t msg_size, void *attachment=NULL, uint32_t att_size=0)
static const int kLabelVolatile
bool ParseMsgRpc(void *buffer, uint32_t size)
bool RecvFrame(Frame *frame)
static const unsigned kInnerHeaderSize
void Reset(uint32_t original_att_size)
const unsigned kDigestSizes[]
static const unsigned kHeaderSize
int fd_connection() const
bool SafeWriteV(int fd, struct iovec *iov, unsigned iovcnt)
static const char * kEnvReadyNotifyFd
static const uint32_t kFlagSendNonBlocking
bool ParseObjectType(cvmfs::EnumObjectType wire_type, int *object_flags)
CacheTransport(int fd_connection)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
bool RecvHeader(uint32_t *size, bool *has_attachment)