| 1 |  |  | /** | 
    
    | 2 |  |  |  * This file is part of the CernVM File System. | 
    
    | 3 |  |  |  */ | 
    
    | 4 |  |  |  | 
    
    | 5 |  |  | #include "ingestion/task_read.h" | 
    
    | 6 |  |  |  | 
    
    | 7 |  |  | #include <errno.h> | 
    
    | 8 |  |  | #include <fcntl.h> | 
    
    | 9 |  |  | #include <unistd.h> | 
    
    | 10 |  |  |  | 
    
    | 11 |  |  | #include <cstring> | 
    
    | 12 |  |  |  | 
    
    | 13 |  |  | #include "backoff.h" | 
    
    | 14 |  |  | #include "logging.h" | 
    
    | 15 |  |  | #include "platform.h" | 
    
    | 16 |  |  | #include "smalloc.h" | 
    
    | 17 |  |  | #include "util/posix.h" | 
    
    | 18 |  |  |  | 
    
    | 19 |  |  |  | 
    
    | 20 |  |  | atomic_int64 TaskRead::tag_seq_ = 0; | 
    
    | 21 |  |  |  | 
    
    | 22 |  |  |  | 
    
    | 23 |  | 155 | void TaskRead::Process(FileItem *item) { | 
    
    | 24 |  | 155 |   BackoffThrottle throttle(kThrottleInitMs, kThrottleMaxMs, kThrottleResetMs); | 
    
    | 25 | ✓✓✓✓ ✓✓
 | 155 |   if ((high_watermark_ > 0) && (BlockItem::managed_bytes() > high_watermark_)) { | 
    
    | 26 |  | 1 |     atomic_inc64(&n_block_); | 
    
    | 27 | ✓✓ | 2 |     do { | 
    
    | 28 |  | 2 |       throttle.Throttle(); | 
    
    | 29 |  |  |     } while (BlockItem::managed_bytes() > low_watermark_); | 
    
    | 30 |  |  |   } | 
    
    | 31 |  |  |  | 
    
    | 32 | ✗✓ | 155 |   if (item->Open() == false) { | 
    
    | 33 |  |  |     LogCvmfs(kLogCvmfs, kLogStderr, "failed to open %s (%d)", | 
    
    | 34 |  |  |              item->path().c_str(), errno); | 
    
    | 35 |  |  |     abort(); | 
    
    | 36 |  |  |   } | 
    
    | 37 |  |  |   uint64_t size; | 
    
    | 38 | ✗✓ | 155 |   if (item->GetSize(&size) == false) { | 
    
    | 39 |  |  |     LogCvmfs(kLogCvmfs, kLogStderr, "failed to fstat %s (%d)", | 
    
    | 40 |  |  |              item->path().c_str(), errno); | 
    
    | 41 |  |  |     abort(); | 
    
    | 42 |  |  |   } | 
    
    | 43 |  | 155 |   item->set_size(size); | 
    
    | 44 |  |  |  | 
    
    | 45 | ✓✓ | 155 |   if (item->may_have_chunks()) { | 
    
    | 46 |  |  |     item->set_may_have_chunks( | 
    
    | 47 |  | 99 |       item->chunk_detector()->MightFindChunks(item->size())); | 
    
    | 48 |  |  |   } | 
    
    | 49 |  |  |  | 
    
    | 50 |  |  |   unsigned char *buffer[kBlockSize]; | 
    
    | 51 |  | 155 |   uint64_t tag = atomic_xadd64(&tag_seq_, 1); | 
    
    | 52 |  | 155 |   ssize_t nbytes = -1; | 
    
    | 53 |  | 155 |   unsigned cnt = 0; | 
    
    | 54 | ✓✓ | 216390 |   do { | 
    
    | 55 |  | 216390 |     nbytes = item->Read(buffer, kBlockSize); | 
    
    | 56 | ✗✓ | 216390 |     if (nbytes < 0) { | 
    
    | 57 |  |  |       LogCvmfs(kLogCvmfs, kLogStderr, "failed to read %s (%d)", | 
    
    | 58 |  |  |                item->path().c_str(), errno); | 
    
    | 59 |  |  |       abort(); | 
    
    | 60 |  |  |     } | 
    
    | 61 |  |  |  | 
    
    | 62 |  | 216390 |     BlockItem *block_item = new BlockItem(tag, allocator_); | 
    
    | 63 |  | 216382 |     block_item->SetFileItem(item); | 
    
    | 64 | ✓✓ | 216382 |     if (nbytes == 0) { | 
    
    | 65 |  | 155 |       item->Close(); | 
    
    | 66 |  | 155 |       block_item->MakeStop(); | 
    
    | 67 |  |  |     } else { | 
    
    | 68 |  |  |       block_item->MakeDataCopy(reinterpret_cast<unsigned char *>(buffer), | 
    
    | 69 |  | 216227 |                                nbytes); | 
    
    | 70 |  |  |     } | 
    
    | 71 |  | 216390 |     tubes_out_->Dispatch(block_item); | 
    
    | 72 |  |  |  | 
    
    | 73 |  | 215514 |     cnt++; | 
    
    | 74 | ✓✓ | 215514 |     if ((cnt % 32) == 0) { | 
    
    | 75 | ✓✗✗✓ ✗✓
 | 6752 |       if ((high_watermark_ > 0) && | 
    
    | 76 |  |  |           (BlockItem::managed_bytes() > high_watermark_)) | 
    
    | 77 |  |  |       { | 
    
    | 78 |  |  |         throttle.Throttle(); | 
    
    | 79 |  |  |       } | 
    
    | 80 |  |  |     } | 
    
    | 81 |  |  |   } while (nbytes > 0); | 
    
    | 82 |  | 155 | } | 
    
    | 83 |  |  |  | 
    
    | 84 |  |  |  | 
    
    | 85 |  | 705 | void TaskRead::SetWatermarks(uint64_t low, uint64_t high) { | 
    
    | 86 | ✗✓ | 705 |   assert(high > low); | 
    
    | 87 | ✗✓ | 705 |   assert(low > 0); | 
    
    | 88 |  | 705 |   low_watermark_ = low; | 
    
    | 89 |  | 705 |   high_watermark_ = high; | 
    
    | 90 |  | 705 | } |