(* $Id: seqdb_rdwr.ml 16180 2008-01-18 20:57:28Z gerd $ *)

(* Int64 arithmetic shorthands *)
let ( +@ ) = Int64.add
let ( -@ ) = Int64.sub
let ( *@ ) = Int64.mul
let ( /@ ) = Int64.div
let ( ~> ) = Int64.of_int   (* ">" = "convert to the bigger type" *)
let ( ~< ) = Int64.to_int   (* "<" = "convert to the smaller type" *)

(** Provider of a file descriptor. [file_descr] returns the descriptor;
    [dispose_hint] tells the provider that the caller does not need the
    descriptor for the moment. *)
class type file_descr =
object
  method file_descr : Unix.file_descr
  method dispose_hint : unit -> unit
end

(** A [file_descr] that can actually release its descriptor.
    [dispose_descr] returns [true] when no descriptor is open afterwards. *)
class type disposable_descr =
object
  inherit file_descr
  method dispose_descr : unit -> bool
end

(** Opens [filename] lazily on the first [file_descr] call, using
    [flags]/[perm]. Every later re-open (after the descriptor was closed by
    [dispose_descr]) uses [reopen_flags]/[reopen_perm]. The descriptor is
    only closed by [dispose_descr] if [dispose_hint] was called after the
    most recent [file_descr] call. *)
class managed_descr ~filename ~flags ~reopen_flags ~perm ~reopen_perm
      : disposable_descr =
object
  val mutable reopen_flag = false
    (* true once the file has been opened at least once *)
  val mutable fd = None
  val mutable disposable = true
    (* cleared by file_descr, set by dispose_hint *)

  method file_descr =
    match fd with
    | Some d ->
        disposable <- false;
        d
    | None ->
        let d =
          if reopen_flag then Unix.openfile filename reopen_flags reopen_perm
          else Unix.openfile filename flags perm in
        reopen_flag <- true;
        fd <- Some d;
        disposable <- false;
        d

  method dispose_hint () = disposable <- true

  method dispose_descr () =
    match fd with
    | Some d ->
        (* close only if the user declared it no longer needs the descr *)
        disposable && (Unix.close d; fd <- None; true)
    | None -> true
end

(** Random-access reader/writer with its own current position.
    [input]/[output] take a string, a start index and a length, and return
    the number of bytes transferred; [really_input]/[really_output]
    transfer exactly the requested number of bytes. *)
class type reader_writer =
object
  method chunk_size : int
  method pos : int64
  method seek : int64 -> unit
  method eof : int64
  method input : string -> int -> int -> int
  method really_input : string -> int -> int -> unit
  method output : string -> int -> int -> int
  method really_output : string -> int -> int -> unit
  method flush : unit -> unit
  method dispose_descr : unit -> unit
end

(** Implements [really_input]/[really_output] by repeatedly calling the
    virtual [input]/[output] until the requested length is transferred.
    [really_input] raises [End_of_file] when [input] returns 0 first;
    [really_output] raises [Failure] when [output] makes no progress
    (which would otherwise loop forever). *)
class virtual really_io =
object (self)
  method virtual input : string -> int -> int -> int
  method virtual output : string -> int -> int -> int

  method really_input s s_pos s_len =
    let p = ref s_pos in
    let n = ref s_len in
    while !n > 0 do
      let m = self # input s !p !n in
      if m = 0 then raise End_of_file;
      p := !p + m;
      n := !n - m
    done

  method really_output s s_pos s_len =
    let p = ref s_pos in
    let n = ref s_len in
    while !n > 0 do
      let m = self # output s !p !n in
      if m <= 0 then failwith "really_io#really_output: no progress";
      p := !p + m;
      n := !n - m
    done
end

module Q : sig
  (* A queue of int64 where one can move elements to the end *)
  type t
  and cell
  val create : unit -> t
  val add : int64 -> t -> cell
  val take : t -> int64
  val move : t -> cell -> unit
  val clear : t -> unit
end = struct
  (* Elements live in [int64 option ref] slots. Taking or moving an
     element sets its old slot to [None] instead of removing it from the
     middle of the Queue; [take] skips such dead slots. [qsize] counts the
     live slots. A [cell] refers to the current slot of an element so that
     [move] can redirect it to the newly appended slot. *)
  type t =
    { q : int64 option ref Queue.t;
      mutable qsize : int
    }
  and cell = int64 option ref ref

  let create () = { q = Queue.create (); qsize = 0 }

  let add x q =
    let r = ref (Some x) in
    Queue.add r q.q;
    q.qsize <- q.qsize + 1;
    ref r

  (* Raises [Queue.Empty] when no live element is left *)
  let rec take q =
    let r = Queue.take q.q in
    match !r with
    | None -> take q
    | Some x ->
        r := None;
        q.qsize <- q.qsize - 1;
        x

  (* Remove all dead slots, keeping the live ones (same refs) in order *)
  let cleanup q =
    let q' = Queue.create () in
    Queue.iter
      (fun r ->
         match !r with
         | None -> ()
         | Some _ -> Queue.add r q')
      q.q;
    Queue.clear q.q;
    Queue.transfer q' q.q

  let move q r =
    match !(!r) with
    | None -> failwith "Q.move: cannot move taken element"
    | Some x ->
        (!r) := None;
        q.qsize <- q.qsize - 1;
        let r' = add x q in
        (* compact once more than half of the slots are dead *)
        if Queue.length q.q > 2 * q.qsize then cleanup q;
        r := !r'

  (* Invalidate all cells and empty the queue. [qsize] must be reset too,
     otherwise the compaction threshold in [move] is computed from a stale
     count and dead slots are never cleaned up. *)
  let clear q =
    Queue.iter (fun r -> r := None) q.q;
    Queue.clear q.q;
    q.qsize <- 0
end

type buffer =
  { bstring : string;
    mutable bpos : int64;   (* file pos of the first byte of the buffer *)
    mutable blen : int;     (* relevant length of bstring content *)
    mutable dstart : int;   (* start of the dirty part of the buffer *)
    mutable dlen : int;     (* length of the dirty part of the buffer *)
    mutable qcell : Q.cell option
      (* the entry of this buffer in a Q.t, if it was added to one *)
  }

let create_buffer bstring_size =
  { bstring = String.create bstring_size;
    bpos = 0L;
    blen = 0;
    dstart = 0;
    dlen = 0;
    qcell = None
  }

let os_page_size = 4096
(* Assumption about the page size of the operating system.
This affects performance (but not very much) *)

(** Buffered random-access reader/writer on top of [io_fd].

    [chunk_size] is the size of one buffer. If [buffer_size] is larger than
    [chunk_size], the class works in aligned mode: it keeps up to
    [buffer_size / chunk_size] buffers, each starting at a multiple of
    [chunk_size], and reuses the least recently used one when it needs a
    new buffer. Otherwise it uses a single buffer that may start anywhere.
    Dirty data is written out when a buffer is reused, on [flush] and on
    [dispose_descr]. *)
class buf_rd_wr ?(buffer_size = 0) ?(chunk_size = 16384)
                (io_fd : file_descr) : reader_writer =
  let aligned_buffers = buffer_size > chunk_size in
  let bstring_size = chunk_size in
  let bg_buffer_cnt =
    if aligned_buffers then buffer_size / chunk_size else 1 in
object (self)
  inherit really_io

  val mutable buffer = create_buffer bstring_size
    (* The currently used buffer ([pos] points into it or to the byte after
       it). It is only valid if blen > 0; in aligned mode it is then also
       registered in bg_buffers and bg_queue. *)

  val bg_buffers = Hashtbl.create 1000
    (* Background buffers, hashed by bpos (aligned mode only) *)

  val bg_queue = Q.create ()
    (* Least recently used bpos is first in this queue *)

  val mutable pos = 0L
    (* File position, as exposed in the interface *)

  val mutable fd_opt = None
  val mutable fd_pos = None
    (* The file descriptor, if available, and the real file position *)

  val mutable eof = None
    (* The EOF position, as exposed in the interface *)

  method chunk_size = bstring_size

  (* Get the descriptor from io_fd on first use and learn its position *)
  method private fd =
    match fd_opt with
    | Some fd -> fd
    | None ->
        let fd = io_fd#file_descr in
        let p = Unix.LargeFile.lseek fd 0L Unix.SEEK_CUR in
        fd_opt <- Some fd;
        fd_pos <- Some p;
        fd

  method private fd_pos =
    let fd = self#fd in
    match fd_pos with
    | None ->
        let p = Unix.LargeFile.lseek fd 0L Unix.SEEK_CUR in
        fd_pos <- Some p;
        p
    | Some p -> p

  (* Skip the lseek when the descriptor is already known to be at p *)
  method private fd_seek p =
    if Some p <> fd_pos then self#fd_really_seek p

  method private fd_really_seek p =
    let fd = self#fd in
    ignore (Unix.LargeFile.lseek fd p Unix.SEEK_SET);
    fd_pos <- Some p

  (* Flush, drop all buffers, forget the descriptor together with its
     cached position and the cached eof, and tell io_fd that the
     descriptor may be released. *)
  method dispose_descr () =
    self#flush ();
    self#drop_buffers ();
    fd_opt <- None;
    fd_pos <- None;
    eof <- None;
    io_fd#dispose_hint ()

  method private drop_buffers () =
    Hashtbl.clear bg_buffers;
    Q.clear bg_queue;
    buffer.blen <- 0

  method pos = pos

  method seek p = pos <- p

  (* Cached; initially taken from fstat, later raised by [output] *)
  method eof =
    match eof with
    | None ->
        let fd = self#fd in
        let st = Unix.LargeFile.fstat fd in
        eof <- Some st.Unix.LargeFile.st_size;
        st.Unix.LargeFile.st_size
    | Some p -> p

  (* Write all dirty buffers in increasing position order. The active
     buffer may also be in bg_buffers and thus appear twice; the second
     tracked_write is a no-op because dlen is then 0. *)
  method flush () =
    let dirty =
      Hashtbl.fold
        (fun _ b acc -> if b.dlen > 0 then b :: acc else acc)
        bg_buffers [ buffer ] in
    let sorted =
      List.sort (fun b1 b2 -> Int64.compare b1.bpos b2.bpos) dirty in
    List.iter (fun b -> self#tracked_write b) sorted

  (* Return an unregistered buffer for bpos. Memory of the active buffer
     (unaligned mode) or of the least recently used background buffer
     (aligned mode, when the limit is reached) is reused after writing
     out its dirty part. *)
  method private new_buffer bpos =
    let b =
      if buffer.blen = 0 then buffer
      else if not aligned_buffers then (
        self#tracked_write buffer;
        buffer
      )
      else if Hashtbl.length bg_buffers >= bg_buffer_cnt then (
        (* We must get rid of one of the background buffers *)
        let old_bpos =
          try Q.take bg_queue with Queue.Empty -> assert false in
        let old_buffer =
          try Hashtbl.find bg_buffers old_bpos
          with Not_found -> assert false in
        self#tracked_write old_buffer;
        Hashtbl.remove bg_buffers old_bpos;
        old_buffer
      )
      else create_buffer bstring_size in
    b.bpos <- bpos;
    b.blen <- 0;
    b.dstart <- 0;
    b.dlen <- 0;
    b.qcell <- None;
    b

  (* Aligned mode: register b in bg_buffers and mark it as most recently
     used in bg_queue. Does nothing in unaligned mode. *)
  method private count b =
    if aligned_buffers then (
      if not (Hashtbl.mem bg_buffers b.bpos) then
        Hashtbl.add bg_buffers b.bpos b;
      match b.qcell with
      | None ->
          (* Not yet in bg_queue *)
          let r = Q.add b.bpos bg_queue in
          b.qcell <- Some r
      | Some r -> Q.move bg_queue r
    )

  (* Write [len] bytes of b.bstring starting at index [start] to the file
     position b.bpos + start, keeping fd_pos in sync *)
  method private write_range b start len =
    let fd = self#fd in
    self#fd_seek (b.bpos +@ ~>start);
    try
      Netsys.really_write fd b.bstring start len;
      fd_pos <- Some (self#fd_pos +@ ~>len)
    with
    | error ->
        (* on error, fd_pos cannot be predicted, so invalidate it *)
        fd_pos <- None;
        raise error

  (* Write out the dirty part of buffer b; in aligned mode it is first
     widened to OS page boundaries (but not past blen) *)
  method private tracked_write b =
    if b.dlen > 0 then (
      if aligned_buffers then (
        let dstart = (b.dstart / os_page_size) * os_page_size in
        let dend0 = b.dstart + b.dlen in
        let dend1 = ((dend0 - 1) / os_page_size + 1) * os_page_size in
        let dend = min b.blen dend1 in
        self#write_range b dstart (dend - dstart)
      )
      else
        self#write_range b b.dstart b.dlen;
      b.dlen <- 0
    )

  (* Fill the clean buffer b from the file at b.bpos. blen becomes the
     count returned by the read, or 0 if bpos is not before eof. *)
  method private tracked_read b =
    assert (b.dlen = 0);
    let fd = self#fd in
    let epos = self#eof in
    if b.bpos < epos then (
      self#fd_seek b.bpos;
      try
        let n = Netsys.blocking_read fd b.bstring 0 bstring_size in
        fd_pos <- Some (self#fd_pos +@ ~>n);
        b.blen <- n
      with
      | error ->
          (* on error, fd_pos cannot be predicted, so invalidate it *)
          fd_pos <- None;
          raise error
    )
    else b.blen <- 0

  method input s s_pos s_len =
    if s_len <= 0 then 0
    else (
      if pos < buffer.bpos || pos >= buffer.bpos +@ ~>(buffer.blen) then
        (* The buffer is not useful at all, so activate a better one *)
        self#activate pos;
      if buffer.blen > 0 then self#count buffer;
      let buffer_start = ~< (pos -@ buffer.bpos) in
      let buffer_avail = buffer.blen - buffer_start in
      let n = min s_len buffer_avail in
      if n > 0 then (
        String.blit buffer.bstring buffer_start s s_pos n;
        pos <- pos +@ ~>n;
        n
      )
      else if buffer.blen < bstring_size
              && self#eof >= buffer.bpos +@ ~>bstring_size then (
        (* Aligned mode, file gap: the rest of this chunk reads as zeros.
           Count from buffer_start (not blen) so that we never return
           zeros for bytes that belong to the following chunk. *)
        assert aligned_buffers;
        let avail = bstring_size - buffer_start in
        let n = min s_len avail in
        String.fill s s_pos n '\000';
        pos <- pos +@ ~>n;
        n
      )
      else
        (* Even if this is the last buffer of the file, we can be sure
           that there are no bytes past blen *)
        0
    )

  (* Make [buffer] cover position p, reusing a background buffer if one
     exists for the (aligned) start position, otherwise reading a fresh
     one. If tracked_read raises, the new buffer remains unregistered. *)
  method private activate p =
    let p' =
      if aligned_buffers then (p /@ ~>bstring_size) *@ ~>bstring_size
      else p in
    let alt_buffer_opt =
      try Some (Hashtbl.find bg_buffers p') with Not_found -> None in
    (match alt_buffer_opt with
     | Some b -> buffer <- b
     | None ->
         let b = self#new_buffer p' in
         self#tracked_read b;
         buffer <- b);
    (* POSTCONDITION *)
    assert (p >= buffer.bpos && p < buffer.bpos +@ ~>bstring_size)

  method output s s_pos s_len =
    if s_len <= 0 then 0
    else (
      let buf_end = buffer.bpos +@ ~>(buffer.blen) in
      (* Can overwrite at least one char of the buffer *)
      let buf_overwriting = pos >= buffer.bpos && pos < buf_end in
      (* Can append at least one char to the buffer *)
      let buf_appending =
        pos = buf_end && buffer.blen > 0 && buffer.blen < bstring_size in
      if not buf_overwriting && not buf_appending then (
        (* Unaligned: simply invent a new buffer. Aligned: the chunk must
           be read first so that the untouched bytes stay valid. *)
        if not aligned_buffers then buffer <- self#new_buffer pos
        else self#activate pos
      );
      let n = self#put buffer pos s s_pos s_len in
      pos <- pos +@ ~>n;
      if buffer.blen > 0 then self#count buffer;
      if pos > self#eof then eof <- Some pos;  (* File got longer *)
      n
    )

  (* Copy up to s_len bytes of s into buffer b at file position put_pos,
     zero-filling any gap after blen and extending the dirty range.
     Returns the number of bytes copied. *)
  method private put b put_pos s s_pos s_len =
    assert (put_pos >= b.bpos);  (* PRECONDITION *)
    let buffer_start = ~< (put_pos -@ b.bpos) in
    assert (buffer_start <= bstring_size);  (* PRECONDITION *)
    if buffer_start > b.blen then
      String.fill b.bstring b.blen (buffer_start - b.blen) '\000';
    let buffer_avail = bstring_size - buffer_start in
    let n = min s_len buffer_avail in
    String.blit s s_pos b.bstring buffer_start n;
    b.blen <- max b.blen (buffer_start + n);
    if b.dlen > 0 then (
      let buffer_end = buffer_start + n in
      let dend = b.dstart + b.dlen in
      b.dstart <- min b.dstart buffer_start;
      b.dlen <- (max dend buffer_end) - b.dstart
    )
    else (
      (* dstart is meaningless while dlen = 0 *)
      b.dstart <- buffer_start;
      b.dlen <- n
    );
    n
end

let zero_block = String.make 16384 '\000'
(* 16384 is the size of the buffer in the ocaml runtime used for I/O.
More than this does not make sense *)

(** A view of [rd_wr] whose logical end of file is [eof] rather than
    [rd_wr#eof]. [input] never reads past the logical end, and [output]
    moves the logical end forward when it writes beyond it. [seek] fails
    for positions past the physical end [rd_wr#eof].

    If [incr] is [Some i] with [i <> 0L], a write at the physical end of
    [rd_wr] first appends [i] zero bytes (then flushes and returns to the
    old position), and writes are limited to the physical size. The
    underlying file therefore grows in steps of [i] bytes. *)
class sub_rd_wr ?incr eof (rd_wr : reader_writer) =
object (self)
  inherit really_io

  val mutable log_eof = eof

  method chunk_size = rd_wr#chunk_size

  method pos = rd_wr#pos

  method seek p =
    if p > rd_wr#eof then
      failwith "sub_rd_wr: seeking outside the valid range";
    rd_wr#seek p

  method eof = log_eof

  method input s s_pos s_len =
    let p = rd_wr#pos in
    (* Stop at the logical eof; clamp at 0 when already past it so that
       rd_wr never sees a negative length *)
    let n = max 0 (~< (min (~>s_len) (log_eof -@ p))) in
    rd_wr#input s s_pos n

  method output s s_pos s_len =
    let n =
      begin match incr with
      | Some 0L | None ->
          rd_wr#output s s_pos s_len
      | Some i ->
          if rd_wr#pos = rd_wr#eof then (
            (* Append to file: i zero bytes, then go back *)
            let old_pos = rd_wr#pos in
            let zlen = String.length zero_block in
            for _k = 1 to ~< (i /@ ~>zlen) do
              rd_wr#really_output zero_block 0 zlen
            done;
            rd_wr#really_output zero_block 0 (~< (Int64.rem i (~>zlen)));
            rd_wr#flush ();
            rd_wr#seek old_pos
          );
          (* Only overwrite existing bytes; never a negative length *)
          let room = ~< (min (~>s_len) (rd_wr#eof -@ rd_wr#pos)) in
          rd_wr#output s s_pos (max 0 room)
      end in
    if rd_wr#pos > log_eof then log_eof <- rd_wr#pos;
    n

  method flush = rd_wr#flush

  method dispose_descr = rd_wr#dispose_descr
end

(** Read exactly [n] bytes from [rw] via [really_input]. With the
    [really_io] implementation, a short read raises [End_of_file]. *)
let input_string (rw : reader_writer) n =
  let s = String.create n in
  rw#really_input s 0 n;
  s

(** Write all of [s] to [rw] via [really_output]. *)
let output_string (rw : reader_writer) s =
  rw#really_output s 0 (String.length s)