(* $Id: seqdb_archive.ml 16166 2008-01-17 22:59:55Z gerd $ *)

(* Format of the archives:
 *
 * - The archives consist of a sequence of blocks of size 512.
 * - The last four bytes of the blocks contain special tags:
 *
 *   block[508] and block[509]: These bytes are non-zero if the block
 *     contains the last byte of an entry trailer (see below), and zero
 *     if not. In the first case, the ordinal number of the last such
 *     trailer is stored (where the very first trailer of the archive
 *     is number 1), big endian.
 *
 *   block[510] and block[511]: The byte position of the last byte
 *     of the last trailer in this block is stored (big endian), or 0 if
 *     no such byte occurs.
 * - The first 508 bytes contain the payload of the block.
 * - The payload of all blocks is a sequence of pairs
 *     (entry_data, entry_trailer)
 * - The entry_trailer is the XDR-encoded struct
 *
 *     struct entry_trailer {
 *       entry_data_size : int;
 *       entry_name : string<>;
 *       entry_timestamp : hyper;
 *       entry_gzip : bool;
 *       entry_comment : string<>;
 *     }
 *
 *   immediately followed by the XDR-encoded int
 *
 *     entry_trailer_size : int;
 *
 *   where entry_name, entry_timestamp, entry_gzip, and entry_comment are
 *   defined as for the [properties] of the entry. entry_data_size is the
 *   number of bytes entry_data consists of. entry_trailer_size is the
 *   size of the whole trailer in bytes (which is variable), i.e. the size
 *   of the encoded struct plus the 4 bytes of entry_trailer_size itself.
 *
 * In order to read such an archive, one has to read the blocks backwards
 * and look for the last block where block[508/509] is non-zero. This is
 * the last properly written block (i.e. the archive was closed after
 * the last entry had been appended). Then look for the last byte of the
 * last trailer whose position is in block[510/511]. Then read
 * entry_trailer_size (the last four bytes of the trailer) and read the
 * trailer. Jumping to the previous trailer is possible by skipping over
 * entry_data.
 *
 * Appending works by first finding out the last properly written block and
 * the last byte of the last trailer as before. The next byte is the
 * start of the unused region. After writing the entry, one also has to
 * update the last four bytes in the last block.
 *
 * The file format assumes that the file system and the block layer use
 * 512 bytes long blocks. If so, and the file system writes files in
 * block order, it even assures crash protection.
 *)

(* log2 of the block size *)
let block_size_lg = 9

(* Size of a block in bytes (512) *)
let block_size = 1 lsl block_size_lg

let payload_size = block_size - 4
  (* block size minus the 4 bytes used for the tags *)


(* Abstract access to a store that is read and written in whole blocks
 * of [block_size] bytes.
 *)
class type block_access =
object
  method length : int
    (* Length in blocks *)
  method read : int -> string -> unit
    (* Reads the n-th block and copies it into the string which must have a
     * length of 512 or longer.
     *)
  method write : int -> string -> unit
    (* Writes the n-th block *)
  method truncate : int -> unit
    (* Truncates to n blocks *)
  method close : unit -> unit
  method keep_page_cache_clean : unit -> unit
end


(* Block access on top of a Unix file descriptor. The length in blocks is
 * taken from the file size at construction time (rounded down to whole
 * blocks) and then tracked locally.
 *)
class file_block_access fd : block_access =
  let st = Unix.LargeFile.fstat fd in
object(self)
  val mutable length =
    Int64.to_int (Int64.shift_right st.Unix.LargeFile.st_size block_size_lg)
  val mutable closed = false

  method length = length

  (* Raises Invalid_argument if [n] is not an existing block, and Failure
   * if the object has already been closed.
   *)
  method read n data =
    if n < 0 || n >= length then
      invalid_arg "file_block_access#read";
    if closed then
      failwith "file_block_access#read: already closed";
    let _ =
      Unix.LargeFile.lseek
        fd (Int64.shift_left (Int64.of_int n) block_size_lg) Unix.SEEK_SET in
    Netsys.really_read fd data 0 block_size

  (* [n = length] is allowed and appends a new block *)
  method write n data =
    if n < 0 || n > length then
      invalid_arg "file_block_access#write";
    if closed then
      failwith "file_block_access#write: already closed";
    let _ =
      Unix.LargeFile.lseek
        fd (Int64.shift_left (Int64.of_int n) block_size_lg) Unix.SEEK_SET in
    Netsys.really_write fd data 0 block_size;
    if n >= length then length <- n+1

  (* Only shrinking (or keeping the size) is accepted *)
  method truncate n =
    if n < 0 || n > length then
      invalid_arg "file_block_access#truncate";
    if closed then
      failwith "file_block_access#truncate: already closed";
    Unix.LargeFile.ftruncate
      fd (Int64.shift_left (Int64.of_int n) block_size_lg);
    length <- n

  (* Closing twice is a no-op *)
  method close() =
    if not closed then (
      closed <- true;
      Unix.close fd
    )

  method keep_page_cache_clean () =
    if not closed then (
      Linux.fdatasync fd;
      Linux.fadvise fd 0L 0L Linux.FADV_DONTNEED
    )
end


(* Block access on top of a [Seqdb_fsys_types.file_system] handle. Same
 * contract as [file_block_access], but all I/O goes through [fsys].
 *)
class filesys_file_block_access (fsys : 'fd Seqdb_fsys_types.file_system)
                                (fd : 'fd)
                                : block_access =
  let byte_length = fsys#file_size fd in
object(self)
  val mutable length =
    Int64.to_int (Int64.shift_right byte_length block_size_lg)
  val mutable closed = false

  method length = length

  (* Raises End_of_file if fewer than [block_size] bytes could be read *)
  method read n data =
    if n < 0 || n >= length then
      invalid_arg "filesys_file_block_access#read";
    if closed then
      failwith "filesys_file_block_access#read: already closed";
    let p = Int64.shift_left (Int64.of_int n) block_size_lg in
    let n = fsys # read_file fd data 0 p block_size in
    if n < block_size then raise End_of_file

  (* [n = length] is allowed and appends a new block *)
  method write n data =
    if n < 0 || n > length then
      invalid_arg "filesys_file_block_access#write";
    if closed then
      failwith "filesys_file_block_access#write: already closed";
    let p = Int64.shift_left (Int64.of_int n) block_size_lg in
    fsys # write_file fd data 0 p block_size;
    if n >= length then length <- n+1

  method truncate n =
    if n < 0 || n > length then
      invalid_arg "filesys_file_block_access#truncate";
    if closed then
      failwith "filesys_file_block_access#truncate: already closed";
    let p = Int64.shift_left (Int64.of_int n) block_size_lg in
    fsys # truncate fd p;
    length <- n

  (* Closing twice is a no-op *)
  method close() =
    if not closed then (
      closed <- true;
      fsys # close_file fd
    )

  method keep_page_cache_clean () =
    ()  (* CHECK whether needed *)
end


(* Read-only block access to an archive held in the string [s]. Trailing
 * bytes that do not form a complete block are ignored.
 *)
class string_block_access s : block_access =
  let length = String.length s asr block_size_lg in
object(self)
  method length = length

  method read n data =
    if n < 0 || n >= length then
      invalid_arg "string_block_access#read";
    String.blit s (n lsl block_size_lg) data 0 block_size

  method write n data =
    failwith "string_block_access#write: read-only"

  method truncate n =
    failwith "string_block_access#truncate: read-only"

  method close() = ()

  method keep_page_cache_clean () = ()
end


(* A raw in/out channel over the payload bytes of the blocks, plus access
 * to the tags stored in the last four bytes of the blocks.
 *)
class type block_based_channel_t =
object
  inherit Netchannels.raw_in_channel
  inherit Netchannels.raw_out_channel
  method get_tag : int * int * int
    (* returns [(entry_ord, last_trailer_block, last_trailer_pos)]
     * of the last valid block. If the archive is empty, the method
     * returns (0,(-1),(-1)).
     *)
  method set_tag : int -> unit
    (* [set_tag entry_ord]: sets the tag to the current file position *)
  method block_pos : int * int
    (* Returns [(block, block_pos)] *)
  method seek_by_block : (int * int) -> unit
    (* [seek_by_block (block, block_pos)]: Seeks to this [block] number at
     * [block_pos]. Works for both reading and writing.
     *)
  method keep_page_cache_clean : unit -> unit
end


(* Implementation of [block_based_channel_t] on top of a [block_access].
 * Exactly one block is buffered in [data]. If [ro_flag] is true, [output]
 * and [set_tag] fail, and closing never truncates.
 *)
class block_based_channel (ba : block_access) ro_flag : block_based_channel_t =
object(self)
  val data = String.create block_size
  val mutable block = 0
  val mutable block_pos = 0
  val mutable state = `Empty
    (* `Empty: [data] does not contain data
     * `Clean: [data] contains the data of [block], but nothing must be written
     *         back to disk
     * `Dirty: [data] contains the data of [block], and still must be written
     *         to disk
     *)
  val mutable tag = None
    (* Cache for [get_tag]; also updated by [set_tag] *)

  (* Linear position counted in payload bytes only *)
  method pos_in =
    let block_base = block * payload_size in
    block_base + (min block_pos payload_size)

  method pos_out = self # pos_in

  (* A position at the very end of a block is reported as position 0 of
   * the following block
   *)
  method block_pos =
    if block_pos = payload_size then
      (block+1, 0)
    else
      (block, block_pos)

  (* Raises End_of_file if [b_nr] is beyond the reachable blocks. A dirty
   * buffer is written back before another block is selected.
   *)
  method seek_by_block (b_nr, b_pos) =
    let b_len = ba # length in
    (* If the current block is the not yet written block just after the
     * end, one more block is reachable
     *)
    let b_len = if block = b_len then b_len + 1 else b_len in
    if b_nr > b_len then raise End_of_file;
    if b_nr <> block && state = `Dirty then
      self # save_block();
    if b_nr <> block then state <- `Empty;
    block <- b_nr;
    block_pos <- b_pos

  (* Ensures that [data] holds the current block; End_of_file if the
   * block does not exist
   *)
  method private load_block() =
    if state = `Empty then (
      let b_len = ba # length in
      if block >= b_len then raise End_of_file;
      ba # read block data;
      state <- `Clean
    )

  (* Like [load_block], but a block beyond the end is created as a block
   * of zero bytes
   *)
  method private load_or_create_block() =
    if state = `Empty then (
      let b_len = ba # length in
      if block >= b_len then (
        String.fill data 0 block_size '\000'
      )
      else (
        ba # read block data
      );
      state <- `Clean
    )

  (* Writes the buffered block back if it is dirty *)
  method private save_block() =
    if state = `Dirty then (
      ba # write block data;
      state <- `Clean
    )

  (* Reads at most [l] bytes, and never across a block boundary. Returns
   * 0 only for [l <= 0].
   *)
  method input s p l =
    if l > 0 then (
      if block_pos >= payload_size then
        self # seek_by_block ((block+1), 0);
      self # load_block();
      let n = min l (payload_size - block_pos) in
      String.blit data block_pos s p n;
      block_pos <- block_pos + n;
      n
    )
    else 0

  (* Writes at most [l] bytes, and never across a block boundary. The
   * data are only buffered; [flush] writes the block.
   *)
  method output s p l =
    if ro_flag then
      failwith "block_based_channel#output: read-only";
    if l > 0 then (
      if block_pos >= payload_size then
        self # seek_by_block ((block+1), 0);
      self # load_or_create_block();
      let n = min l (payload_size - block_pos) in
      String.blit s p data block_pos n;
      block_pos <- block_pos + n;
      state <- `Dirty;
      n
    )
    else 0

  method close_in() = self # close_all()

  method close_out() = self # close_all()

  (* Writes the buffered block, cuts off all blocks after the last tagged
   * block (unless read-only), and closes the block access
   *)
  method private close_all() =
    self # save_block();
    ( match tag with
        | Some(_, tag_block, _) ->
            if ba#length > tag_block+1 && not ro_flag then
              ba # truncate (tag_block+1)
        | None ->
            ()
    );
    ba # close()

  method flush() =
    self # save_block()

  method keep_page_cache_clean() =
    ba # keep_page_cache_clean()

  method get_tag =
    match tag with
      | None ->
          let t = self # find_tag in
          tag <- Some t;
          t
      | Some t ->
          t

  (* Scans the blocks backwards, starting with the last one, and returns
   * the tag of the first block found whose bytes 508 and 509 are not both
   * zero. Returns (0,-1,-1) if there is no such block.
   *)
  method private find_tag =
    let rec check_block n =
      if n < 0 then
        (0,-1,-1)
      else (
        ba # read n data;
        if data.[block_size-4] <> '\000' || data.[block_size-3] <> '\000'
        then (
          let entry_ord =
            Char.code data.[block_size-4] * 256 +
              Char.code data.[block_size-3]
          and last_trailer_pos =
            Char.code data.[block_size-2] * 256 +
              Char.code data.[block_size-1] in
          (entry_ord, n, last_trailer_pos)
        )
        else check_block (n-1)
      ) in
    self # save_block();
    let l = ba # length in
    let t = check_block (l-1) in
    state <- `Empty;  (* because [data] has been used for a different purpose *)
    t

  (* Tags the block containing the byte just before the current position
   * with [entry_ord] and the position of that byte, and then restores the
   * current position. [entry_ord] must fit into 16 bits, otherwise
   * Char.chr raises Invalid_argument.
   *)
  method set_tag entry_ord =
    if ro_flag then
      failwith "block_based_channel#set_tag: read-only";
    let old_block = block in
    let old_block_pos = block_pos in
    (* Go back 1 *)
    if block_pos = 0 then (
      self # seek_by_block (block-1, payload_size-1);
      self # load_block()
    )
    else (
      self # seek_by_block (block, block_pos-1);
      self # load_block()
    );
    (* From here on, [block] and [block_pos] denote the last written byte *)
    data.[block_size-4] <- Char.chr (entry_ord asr 8);
    data.[block_size-3] <- Char.chr (entry_ord land 255);
    data.[block_size-2] <- Char.chr (block_pos asr 8);
    data.[block_size-1] <- Char.chr (block_pos land 255);
    state <- `Dirty;
    tag <- Some(entry_ord, block, block_pos);
    self # seek_by_block (old_block, old_block_pos)
end


(* XDR type of the trailer struct. The trailer size is not part of this
 * struct; it is encoded separately as [trailer_size_tt] and written
 * directly after the struct.
 *)
let trailer_struct_tt =
  Xdr.X_struct
    [ "entry_data_size", Xdr.X_int;
      "entry_name", Xdr.x_string_max;
      "entry_timestamp", Xdr.X_hyper;
      "entry_gzip", Xdr.x_bool;
      "entry_comment", Xdr.x_string_max;
    ]

let trailer_size_tt = Xdr.X_int

let trailer_struct_t = Xdr.validate_xdr_type trailer_struct_tt
let trailer_size_t = Xdr.validate_xdr_type trailer_size_tt


type mode = [ `Rdonly | `Rdwr | `Rdwr_exclusive ]

type properties =
    { entry_name : string;
      entry_timestamp : int64;
      entry_gzip : bool;
      entry_comment : string;
    }

type entry =
    { arch : t;
      props : properties;
      data_start_block : int;
      data_start_block_pos : int;
      data_length : int;
        (* Number of payload bytes of the entry as stored in the archive *)
    }

and t =
    { chan : block_based_channel_t;
      mode : mode;
      mutable old_entries : entry list option;
        (* The last entry is the first in the list. None = not yet read in *)
      mutable new_entries : entry list;
        (* Appended entries *)
      mutable num_entries : int;
        (* Count of all entries: those found when the archive was opened
         * plus those appended since then
         *)
      mutable old_eof_block : int;       (* EOF of old_entries *)
      mutable old_eof_block_pos : int;
      mutable new_eof_block : int;       (* EOF of new_entries *)
      mutable new_eof_block_pos : int;
      mutable locked : bool;
        (* locked because a read or append is being performed *)
    }


(* Opens an archive on top of the block access [ba]. The tag of the last
 * valid block determines the number of entries and the end of the used
 * region. The entry list itself is read lazily by [contents].
 *)
let openba mode ba =
  let chan = new block_based_channel ba (mode = `Rdonly) in
  let (entry_ord, last_trailer_block, last_trailer_pos) = chan # get_tag in
  (* EOF is the byte after the last trailer byte. For the empty archive
   * the tag is (0,-1,-1), which yields block 0, position 0.
   *)
  let eof_block_pos =
    if last_trailer_pos = payload_size-1 then
      0
    else
      last_trailer_pos + 1 in
  let eof_block =
    if eof_block_pos = 0 then
      last_trailer_block + 1
    else
      last_trailer_block in
  { chan = chan;
    mode = mode;
    old_entries = None;
    new_entries = [];
    num_entries = entry_ord;
    old_eof_block = eof_block;
    old_eof_block_pos = eof_block_pos;
    new_eof_block = eof_block;
    new_eof_block_pos = eof_block_pos;
    locked = false
  }


(* Calls [f arg], and calls it again as long as it fails with EINTR *)
let rec restart f arg =
  try
    f arg
  with
    | Unix.Unix_error(Unix.EINTR,_,_) ->
        restart f arg


(* Opens the archive stored in the file [filename]. In the read-write
 * modes the file is created if it does not exist. For `Rdwr_exclusive
 * the file is additionally locked with lockf.
 *
 * If anything fails after the file has been opened, the descriptor is
 * closed again before the exception is re-raised, so it does not leak.
 *)
let openfile mode filename =
  let flags =
    match mode with
      | `Rdonly -> [ Unix.O_RDONLY ]
      | `Rdwr | `Rdwr_exclusive -> [ Unix.O_RDWR; Unix.O_CREAT ] in
  let fd = Unix.openfile filename flags 0o666 in
  try
    if mode = `Rdwr_exclusive then (
      restart (Unix.lockf fd Unix.F_LOCK) 0
    );
    let ba = new file_block_access fd in
    openba mode ba
  with
    | error ->
        (* Report the original error, not a secondary failure of close *)
        (try Unix.close fd with Unix.Unix_error(_,_,_) -> ());
        raise error


(* Like [openfile], but the file lives in the file system [fsys]. If
 * anything fails after the file has been opened, it is closed again
 * before the exception is re-raised.
 *)
let openfile_filesys fsys mode filename =
  let fd =
    match mode with
      | `Rdonly ->
          fsys # open_file_rd filename [ 'a' ]
      | `Rdwr | `Rdwr_exclusive ->
          fst (fsys # open_file_wr filename [ 'a' ] (Some 'a')) in
  (* We lock the whole file system anyway, so no special action is necessary
     for `Rdwr_exclusive
   *)
  try
    let ba = new filesys_file_block_access fsys fd in
    openba mode ba
  with
    | error ->
        (* Best-effort cleanup: the original error is the one to report *)
        (try fsys # close_file fd with _ -> ());
        raise error


(* Opens the archive contained in the string [s], read-only *)
let openstring s =
  let ba = new string_block_access s in
  openba `Rdonly ba


let close arch =
  arch.chan # close_in()  (* implies close_out() *)


let keep_page_cache_clean arch =
  arch.chan # keep_page_cache_clean()


(* Output channel that appends one entry to [arch]. Creating the object
 * seeks to the current end of the archive and locks the archive. The
 * entry becomes part of the archive when [close_out] is called, which
 * writes the trailer, tags the block, and unlocks the archive.
 *)
class appending_channel arch entry_name entry_timestamp entry_gzip
                        entry_comment =
  let start_block = arch.new_eof_block in
  let start_block_pos = arch.new_eof_block_pos in
  let () = arch.chan # seek_by_block (start_block, start_block_pos) in
  let start_pos = arch.chan # pos_out in
  let () = arch.locked <- true in
object(self)
  inherit Netchannels.lift_raw_out_channel
            (arch.chan :> Netchannels.raw_out_channel) as super

  val mutable entry_size_opt = None

  method set_uncompressed_size n =
    (* Must be called before close_out to be effective *)
    entry_size_opt <- Some n

  method close_out() =
    (* Write the trailer *)
    let entry_data_size = arch.chan # pos_out - start_pos in
    let _entry_size =
      match entry_size_opt with
        | None -> entry_data_size (* uncompressed *)
        | Some n -> n in
    (* We do not use entry_size for now *)
    let trailer_v =
      Xdr.XV_struct_fast
        [| Xdr.XV_int (Rtypes.int4_of_int entry_data_size);
           Xdr.XV_string entry_name;
           Xdr.XV_hyper (Rtypes.int8_of_int64 entry_timestamp);
           (if entry_gzip then Xdr.xv_true else Xdr.xv_false);
           Xdr.XV_string entry_comment;
        |] in
    let trailer_s = Xdr.pack_xdr_value_as_string trailer_v trailer_struct_t [] in
    (* The stored size covers the struct and the size field itself *)
    let trailer_size_v =
      Xdr.XV_int (Rtypes.int4_of_int (String.length trailer_s + 4)) in
    let trailer_size_s =
      Xdr.pack_xdr_value_as_string trailer_size_v trailer_size_t [] in
    self # output_string trailer_s;
    self # output_string trailer_size_s;
    let (block, block_pos) = arch.chan # block_pos in
    arch.new_eof_block <- block;
    arch.new_eof_block_pos <- block_pos;
    (* Tag the block *)
    arch.num_entries <- arch.num_entries + 1;
    arch.chan # set_tag arch.num_entries;
    (* Remember the entry *)
    let props =
      { entry_name = entry_name;
        entry_timestamp = entry_timestamp;
        entry_gzip = entry_gzip;
        entry_comment = entry_comment;
      } in
    let entry =
      { arch = arch;
        props = props;
        data_start_block = start_block;
        data_start_block_pos = start_block_pos;
        data_length = entry_data_size;
      } in
    arch.new_entries <- entry :: arch.new_entries;
    (* We do _not_ close arch.chan! *)
    arch.chan # flush();
    arch.locked <- false
end


(* Compresses the written data with [Netgzip.output] before they are
 * passed on to [ch]. On close, the number of bytes written to this
 * channel is reported to [ch] as the uncompressed size.
 *)
class compressing_channel ch =
object(self)
  inherit Netgzip.output (ch :> Netchannels.out_obj_channel) as super

  method close_out() =
    ch # set_uncompressed_size (self # pos_out);
    super # close_out()
end


(* Collects the written (compressed) data in a buffer. On close, the
 * buffer is decompressed with [Netgzip.input], and the result is written
 * to [ch], which is then closed.
 *)
class decompressing_channel (ch : appending_channel) =
  (* Limitation: Cannot decompress on the fly *)
  let b = Buffer.create 1000 in
object(self)
  inherit Netchannels.output_buffer b as super

  method close_out() =
    super # close_out();
    let ch' =
      new Netgzip.input
        (new Netchannels.input_string (Buffer.contents b)) in
    ch # output_channel ch';
    ch' # close_in();
    ch # close_out()
end


(* Starts appending an entry and returns the channel to write the data
 * to. [entry_gzip] says whether the entry is stored compressed,
 * [data_gzip] whether the data written by the caller are already
 * compressed; a converting channel is inserted if the two differ.
 * Fails if the archive is locked.
 *)
let append arch entry_name entry_timestamp entry_gzip data_gzip comment =
  if arch.locked then
    failwith "Archive.append: already reading or writing an entry";
  let ch =
    new appending_channel arch entry_name entry_timestamp entry_gzip comment in
  match data_gzip, entry_gzip with
    | false, true ->
        new compressing_channel ch
    | true, false ->
        new decompressing_channel ch
    | _ ->
        (ch :> Netchannels.out_obj_channel)


(* Converts [(block, block_pos)] to a position in payload bytes *)
let to_linear_pos (block, block_pos) =
  block * payload_size + block_pos

(* Inverse of [to_linear_pos] for non-negative positions *)
let to_block_pos linear_pos =
  (linear_pos / payload_size, linear_pos mod payload_size)


(* Reads exactly [n] bytes from [chan]. Exceptions raised by
 * [chan # input] are passed through.
 *)
let read_string chan n =
  let s = String.create n in
  let rec read_at p =
    if p < n then (
      let k = chan # input s p (n-p) in
      read_at (p+k)
    ) in
  read_at 0;
  s


(* Decodes the entry whose trailer ends just before the linear position
 * [after_entry_pos]. The start of the data of the returned entry is also
 * the position just after the preceding entry.
 *)
let scan_entry arch after_entry_pos =
  (* The last four bytes of the trailer are the trailer size *)
  let trailer_size_pos = after_entry_pos - 4 in
  arch.chan # seek_by_block (to_block_pos trailer_size_pos);
  let trailer_size_s = read_string arch.chan 4 in
  let trailer_size =
    Rtypes.int_of_int4 (Rtypes.read_int4 trailer_size_s 0) in
  let trailer_pos = after_entry_pos - trailer_size in
  arch.chan # seek_by_block (to_block_pos trailer_pos);
  let trailer_s = read_string arch.chan (trailer_size - 4) in
  let trailer_v =
    Xdr.unpack_xdr_value ~fast:true trailer_s trailer_struct_t [] in
  let ts = Xdr.dest_xv_struct_fast trailer_v in
  let entry_data_size = Rtypes.int_of_int4 (Xdr.dest_xv_int ts.(0)) in
  let entry_name = Xdr.dest_xv_string ts.(1) in
  let entry_timestamp = Rtypes.int64_of_int8 (Xdr.dest_xv_hyper ts.(2)) in
  let entry_gzip = Xdr.dest_xv_enum_fast ts.(3) > 0 in
  let entry_comment = Xdr.dest_xv_string ts.(4) in
  let props =
    { entry_name = entry_name;
      entry_timestamp = entry_timestamp;
      entry_gzip = entry_gzip;
      entry_comment = entry_comment;
    } in
  let (data_start_block, data_start_block_pos) =
    to_block_pos (trailer_pos - entry_data_size) in
  let entry =
    { arch = arch;
      props = props;
      data_start_block = data_start_block;
      data_start_block_pos = data_start_block_pos;
      data_length = entry_data_size
    } in
  entry


(* Returns all entries of the archive, the last entry first. The entries
 * that existed when the archive was opened are read on the first call
 * and cached; this first call fails if the archive is locked.
 *)
let contents arch =
  match arch.old_entries with
    | Some l ->
        arch.new_entries @ l
    | None ->
        if arch.locked then
          failwith "Archive.contents: already reading or writing an entry";
        let l = ref [] in
        let cur_block = ref arch.old_eof_block in
        let cur_block_pos = ref arch.old_eof_block_pos in
        (* [num_entries] also counts the entries appended since the archive
         * was opened. Only the old entries precede the old EOF, so only
         * these may be scanned here; otherwise the scan would run past the
         * beginning of the archive.
         *)
        let num_old_entries =
          arch.num_entries - List.length arch.new_entries in
        for _k = 1 to num_old_entries do
          let entry =
            scan_entry arch (to_linear_pos (!cur_block,!cur_block_pos)) in
          l := entry :: !l;
          cur_block := entry.data_start_block;
          cur_block_pos := entry.data_start_block_pos;
        done;
        let l' = List.rev !l in
        arch.old_entries <- Some l';
        arch.new_entries @ l'


let info entry =
  entry.props


(* Delegates everything to [ch] except [close_in], which only calls
 * [onclose]
 *)
class override_close_in onclose ch =
object(self)
  inherit Netchannels.in_obj_channel_delegation ch
  method close_in() = onclose()
end


(* Returns a channel for reading the data of [entry]. The channel is
 * limited to [data_length] bytes, and is decompressed if the entry is
 * stored compressed. Fails if the archive is locked.
 *
 * NOTE(review): [arch.locked] is tested here and reset to false when the
 * returned channel is closed, but it is never set to true by this
 * function, so a read in progress does not block a concurrent [read],
 * [append] or [contents] on the same archive. This looks unintended, but
 * setting the flag would break callers that do not close the channel, so
 * confirm against the callers before changing it.
 *)
let read entry =
  let arch = entry.arch in
  if arch.locked then
    failwith "Archive.read: already reading or writing an entry";
  arch.chan # seek_by_block
    (entry.data_start_block, entry.data_start_block_pos);
  let ch0 =
    new Netchannels.lift_rec_in_channel
      (arch.chan :> Netchannels.rec_in_channel) in
  let ch1 =
    new Netstream.input_stream ~len:entry.data_length ch0 in
  let ch2 =
    new override_close_in
      (fun () -> arch.locked <- false)
      (ch1 :> Netchannels.in_obj_channel) in
  if entry.props.entry_gzip then
    new Netgzip.input ch2
  else
    ch2