(* $Id: seqdb_containers.ml 16180 2008-01-18 20:57:28Z gerd $ *)

open Seqdb_rdwr

(* Infix shorthands for Int64 arithmetic *)
let ( +@ ) = Int64.add
let ( -@ ) = Int64.sub
let ( *@ ) = Int64.mul
let ( /@ ) = Int64.div

let ( ~> ) = Int64.of_int   (* ">" = "convert to the bigger type" *)
let ( ~< ) = Int64.to_int   (* "<" = "convert to the smaller type" *)

(* Like [~<], but fails with "Integer too large" for values above
   [max_int] instead of silently truncating them. *)
let ( ~<< ) =
  fun n ->
    if n > Int64.of_int max_int then failwith "Integer too large";
    Int64.to_int n

(* Decode the 8 bytes at the beginning of [s] into an int64 (via Rtypes) *)
let read_int64 s =
  Rtypes.int64_of_int8 (Rtypes.read_int8 s 0)

(* Encode [n] as an 8 byte string (via Rtypes) *)
let write_int64 n =
  Rtypes.int8_as_string (Rtypes.int8_of_int64 n)


module Sb_consts = struct
  (* Magic string expected at the very beginning of a file *)
  let winkme = "#!WINKME"

  (* Names of superblock variables: *)
  let sbsize_name = "SBSIZE"
  let format_name = "FORMAT"
  let purpose_name = "PURPOSE"
  let filesize_name = "FILESIZE"
  let fileincr_name = "FILEINCR"
  let syncsize_name = "SYNCSIZE"
  let synctime_name = "SYNCTIME"
  let keyrepr_name = "KEYREPR"
  let valrepr_name = "VALREPR"
  let kvdelfl_name = "KVDELFL"
  let align_name = "ALIGN"
  let entries_name = "ENTRIES"
  let aentries_name = "AENTRIES"
  let htalgo_name = "HTALGO"
  let htsize_name = "HTSIZE"
  let permsize_name = "PERMSIZE"
  let cellsz_name = "CELLSZ"
  let havedups_name = "HAVEDUPS"
  let isz_name = "ISZ"
  let itotsz_name = "ITOTSZ"
  let dtotsz_name = "DTOTSZ"
  let timemp_name = "TIMEMP"
  let timemark_name = "TIMEMARK"
  let timemn_name = "TIMEMN"
  let lastmt_name = "LASTMT"

  (* Values for FORMAT: *)
  let kvseq_format = 0x10L
  let hindex_format = 0x20L
  let perm_format = 0x30L

  (* Values for DELFLAG and KVDELFL: *)
  let false_bool = 0L
  let true_bool = 1L

  (* Values for KEYREPR and VALREPR: *)
  let int8_repr = 0L
  let int16_repr = 1L
  let int32_repr = 2L
  let int64_repr = 3L
  let fixed_repr_min = 4L
  let fixed_repr_max = 259L
  (* let lim8_repr = 259L *)
  let lim8_repr_min = 260L
  let lim8_repr_max = 514L

  (* Values for HTALGO: *)
  let md5_algo = 1L
end


module Superblock = struct
  open Sb_consts

  (* [vars] maps variable names to their values; [sbsize] is the size of
     the superblock in bytes. *)
  type t =
      { vars : (string, int64) Hashtbl.t;
        sbsize : int
      }

  (* Pack a name of at most 8 characters into an int64: the name is
     padded with spaces on the right to 8 bytes and decoded with
     [read_int64]. Fails for longer strings. *)
  let int64_of_string s =
    if String.length s > 8 then
      failwith "Seqdb_containers.Superblock.int64_of_string";
    let t = String.make 8 ' ' in
    String.blit s 0 t 0 (String.length s);
    read_int64 t

  (* Remove trailing spaces from [s] *)
  let strip_spaces s =
    let rec strip n =
      if n > 0 then (
        if s.[n-1] = ' ' then strip (n-1) else String.sub s 0 n
      )
      else ""
    in
    strip (String.length s)

  (* Create an in-memory superblock holding SBSIZE, FORMAT and PURPOSE.
     Fails if [sbsize] is outside 64..65536 or [purpose] is longer than
     8 characters. *)
  let create_superblock ~sbsize ~format ~purpose () =
    if sbsize < 64 || sbsize > 65536 then
      failwith "Seqdb_containers.Superblock.create_superblock: SBSIZE outside valid range";
    let vars = Hashtbl.create 50 in
    Hashtbl.add vars sbsize_name (Int64.of_int sbsize);
    Hashtbl.add vars format_name format;
    if String.length purpose > 8 then
      failwith "Seqdb_containers.Superblock.create_superblock: PURPOSE too long";
    Hashtbl.add vars purpose_name (int64_of_string purpose);
    { vars = vars; sbsize = sbsize }

  (* Read the superblock from the beginning of [rw]: the magic string, then
     (name, value) pairs of 8 bytes each until an all-zero name or the end
     of the superblock region is reached. The first pair must be SBSIZE. *)
  let read_superblock_from_rw rw =
    rw#seek 0L;
    let magic = input_string rw 8 in
    if magic <> winkme then
      failwith "Seqdb_containers.Superblock.read_superblock: No superblock found";
    let sbsize_n = input_string rw 8 in
    if strip_spaces sbsize_n <> sbsize_name then
      failwith "Seqdb_containers.Superblock.read_superblock: No superblock found";
    let sbsize_s = input_string rw 8 in
    let sbsize = read_int64 sbsize_s in
    if sbsize < 64L || sbsize > 65536L then
      failwith "Seqdb_containers.Superblock.read_superblock: SBSIZE outside valid range";
    (* Restrict further reads to the superblock region *)
    let rw = new sub_rd_wr sbsize rw in
    let vars = Hashtbl.create 50 in
    rw # seek 8L;
    ( try
        let name = ref (input_string rw 8) in
        while !name <> "\000\000\000\000\000\000\000\000" do
          let value = read_int64 (input_string rw 8) in
          Hashtbl.replace vars (strip_spaces !name) value;
          name := input_string rw 8
        done
      with
        | End_of_file -> ()
    );
    { vars = vars; sbsize = Int64.to_int sbsize }

  (* Read the superblock from the file [fd] through a fresh 4096 byte buffer *)
  let read_superblock fd =
    let rw = new buf_rd_wr ~buffer_size:4096 fd in
    read_superblock_from_rw rw

  (* Write [sb] to the beginning of [rw]: magic string, then SBSIZE, FORMAT
     and PURPOSE, then the remaining variables, then zero padding up to
     SBSIZE bytes. Fails if the variables do not fit. *)
  let write_superblock_to_rw rw sb =
    let sbsize =
      try Hashtbl.find sb.vars sbsize_name
      with Not_found -> assert false in
    let special_varnames = [ sbsize_name; format_name; purpose_name ] in
    let varnames =
      special_varnames @
        (Hashtbl.fold
           (fun name _ acc ->
              if List.mem name special_varnames then acc else name :: acc)
           sb.vars
           []) in
    (* 16 bytes per variable + 8 bytes magic + 8 bytes terminating name *)
    if Int64.of_int(List.length varnames * 16 + 16) > sbsize then
      failwith "Seqdb_containers.Superblock.write_superblock: superblock too large";
    let rw = new sub_rd_wr sbsize rw in
    rw # seek 0L;
    output_string rw winkme;
    List.iter
      (fun varname ->
         let var64 = int64_of_string varname in
         output_string rw (write_int64 var64);
         let value =
           try Hashtbl.find sb.vars varname
           with Not_found -> assert false in
         output_string rw (write_int64 value);
      )
      varnames;
    if rw#pos < sbsize then
      output_string
        rw (String.make (Int64.to_int (Int64.sub sbsize rw#pos)) '\000');
    rw#flush()

  (* Write [sb] to the file [fd] through a fresh buffer of SBSIZE bytes.
     NOTE(review): the text of this definition and of [variable] and
     [variables] below was partly lost in this copy of the file; the three
     have been reconstructed from the surviving fragments and from their
     call sites. Confirm against the original source. *)
  let write_superblock fd sb =
    let sbsize =
      try Hashtbl.find sb.vars sbsize_name
      with Not_found -> assert false in
    let rw = new buf_rd_wr ~buffer_size:(~< sbsize) fd in
    write_superblock_to_rw rw sb

  (* Value of the variable [name]. Raises [Not_found] if it is not set. *)
  let variable sb name =
    Hashtbl.find sb.vars name

  (* All variables as a list of (name, value) pairs *)
  let variables sb =
    Hashtbl.fold
      (fun name value acc -> (name,value) :: acc)
      sb.vars
      []

  (* Set (or overwrite) the variable [name] *)
  let set_variable sb name value =
    Hashtbl.replace sb.vars name value

  (* The logical file size: FILESIZE if present (fails if it exceeds the
     real size of [fd]), else the real size as reported by fstat. *)
  let filesize sb fd =
    let real_size =
      (Unix.LargeFile.fstat fd#file_descr).Unix.LargeFile.st_size in
    try
      let size = variable sb filesize_name in
      if size > real_size then
        failwith "Seqdb_containers.Superblock.filesize: FILESIZE is bigger than real file size";
      size
    with
      | Not_found -> real_size

  let sbsize sb = sb.sbsize
end


module Hash_algo = struct
  type hash_algo = [ `MD5 ]

  (* Association between HTALGO values and algorithm tags *)
  let algos = [ Sb_consts.md5_algo, `MD5 ]

  (* Raises [Not_found] for unknown HTALGO values *)
  let hash_algo_of_htalgo al =
    List.assoc al algos

  let htalgo_of_hash_algo al =
    fst (List.find (fun (_,al') -> al = al') algos)

  (* Non-negative 63 bit hash taken from bytes 8..15 of the MD5 digest *)
  let md5_hash s =
    let dg = Digest.string s in
    let h = Rtypes.int64_of_int8 (Rtypes.read_int8 dg 8) in
    Int64.logand h 0x7fff_ffff_ffff_ffffL

  (* Non-negative 63 bit hash taken from bytes 0..7 of the MD5 digest *)
  let other_md5_hash s =
    let dg = Digest.string s in
    let h = Rtypes.int64_of_int8 (Rtypes.read_int8 dg 0) in
    Int64.logand h 0x7fff_ffff_ffff_ffffL

  let hash =
    function
      | `MD5 -> md5_hash

  let other_hash =
    function
      | `MD5 -> other_md5_hash
end


module type POINTABLE = sig
  type t
  type entry
  type pointer
  val string_of_pointer : pointer -> string
  val pointer_of_string : string -> pointer
  val pointer_length : int
  val validate_pointer : t -> pointer -> bool
  val get_pointer : entry -> pointer
  val lookup : t -> pointer -> entry
end


module type HASHABLE = sig
  include POINTABLE
  val get_key : entry -> string
  val has_key : entry -> string -> bool
  val suggested_hash_algo : t -> Hash_algo.hash_algo option
  val free_mark : string
  val del_mark : string
end


module type KVSEQ = sig
  type t
  type entry
  type pointer
  type contents = { delflag : bool; key : string; value : string }
  type repr_class =
      [ `Int8 | `Int16 | `Int32 | `Int64 | `Fixed of int | `Lim8 of int ]
  val create :
        ?buffer_size:int -> ?chunk_size:int -> ?sbsize:int ->
        ?fileincr:int64 -> ?supports_deletions:bool ->
        ?keyrepr:repr_class -> ?valrepr:repr_class -> ?alignment:int ->
        ?have_statistics:bool -> ?suggested_hash_algo:Hash_algo.hash_algo ->
        ?purpose:string ->
        Seqdb_rdwr.file_descr -> t
  val access :
        ?buffer_size:int -> ?chunk_size:int -> ?conservative:bool ->
        Seqdb_rdwr.file_descr -> t
  val superblock : t -> Superblock.t
  val mark_superblock_as_dirty : t -> unit
  val rollback_flag : t -> bool
  val configure :
        ?flush_every:int -> ?auto_sync:int option -> ?auto_fadvise:bool ->
        ?onsync:(unit -> unit) -> t -> unit
  val get_pointer : entry -> pointer
  val get_contents : entry -> contents
  val get_key : entry -> string
  val has_key : entry -> string -> bool
  val get_value : entry -> string
  val get_value_length : entry -> int64
  val get_total_length : entry -> int64
  val get_delflag : entry -> bool
  val lookup : t -> pointer -> entry
  val validate_pointer : t -> pointer -> bool
  val add : t -> contents -> entry
  val replace : entry -> contents -> unit
  val rename : entry -> string -> unit
  val delete : entry -> unit
  val blit_to_string : entry -> int64 -> string -> int -> int -> unit
  val blit_from_string : string -> int -> entry -> int64 -> int -> unit
  val flush : t -> unit
  val sync : t -> unit
  val first_entry : t -> entry
  val next_entry : entry -> entry
  val recover_entry : Pcre.regexp -> t -> pointer option -> entry
  val string_of_pointer : pointer -> string
  val pointer_of_string : string -> pointer
  val int64_of_pointer : pointer -> int64
  val pointer_length : int
  val keyrepr : t -> repr_class
  val valrepr : t -> repr_class
  val supports_deletions : t -> bool
  val alignment : t -> int
  val have_statistics : t -> bool
  val suggested_hash_algo : t -> Hash_algo.hash_algo option
  val num_entries : t -> int64
  val num_active_entries : t -> int64
  val free_mark : string
  val del_mark : string
  val fadvise_wontneed : t -> unit
  val fadvise_iterating : t -> unit
  val fadvise_willneed : t -> int64 -> int64 -> unit
end


(* 1024 zero bytes, used for preallocating file space *)
let zero_block = String.make 1024 '\000'


module Kvseq = struct
  open Sb_consts

  type repr_class =
      [ `Int8 | `Int16 | `Int32 | `Int64 | `Fixed of int | `Lim8 of int ]

  type t =
      { fd : file_descr;
        rw_phys : reader_writer;
        rw : reader_writer;
        sb : Superblock.t;
        keyrepr : repr_class;
        valrepr : repr_class;
        supports_deletions : bool;
        alignment : int;
        have_stats : bool;
        mutable dirty_sb_cnt : int;
        mutable dirty_sb_max : int;
        mutable auto_sync : int option;
        mutable auto_fadvise : bool;
        mutable onsync : unit -> unit;
        mutable iter_fadv_until : int64 option;
        mutable rollback : bool;
      }

  (* The optional fields cache what has already been read from the file *)
  type entry =
      { e_kv : t;
        e_pointer : int64;
        mutable e_delflag : bool option;
        mutable e_keylen : int64 option;
        mutable e_key : string option;
        mutable e_vallen : int64 option;
        mutable e_val : string option;
        mutable e_pointer' : int64 option;   (* end of entry *)
      }

  type pointer = int64

  type contents = { delflag : bool; key : string; value : string }

  let sb_of_bool b =
    if b then true_bool else false_bool

  let bool_of_sb x =
    x = true_bool

  (* Encode a representation class as KEYREPR/VALREPR value. Fails with
     "sb_of_repr" if the size argument is outside 0..255 (`Fixed) or
     1..255 (`Lim8). *)
  let sb_of_repr =
    function
      | `Int8 -> int8_repr
      | `Int16 -> int16_repr
      | `Int32 -> int32_repr
      | `Int64 -> int64_repr
      | `Fixed n ->
          if n < 0 || n > 255 then failwith "sb_of_repr";
          Int64.add fixed_repr_min (Int64.of_int n)
      | `Lim8 n ->
          if n < 1 || n > 255 then failwith "sb_of_repr";
          Int64.add lim8_repr_min (Int64.of_int (n-1))
let repr_of_sb n =
  (* Inverse of [sb_of_repr]. Fails with "repr_of_sb: invalid value" for
     values that [sb_of_repr] cannot produce. [lim8_repr_min] itself is the
     encoding of `Lim8 1 and must be accepted. *)
  if n = int8_repr then `Int8
  else if n = int16_repr then `Int16
  else if n = int32_repr then `Int32
  else if n = int64_repr then `Int64
  else if n >= fixed_repr_min && n <= fixed_repr_max then
    `Fixed (Int64.to_int (n -@ fixed_repr_min))
  else if n >= lim8_repr_min && n <= lim8_repr_max then
    `Lim8 (Int64.to_int (n -@ lim8_repr_min) + 1)
  else
    failwith "repr_of_sb: invalid value"

(* Number of bytes of the length field preceding a key or value *)
let lenlen_of_repr repr =
  match repr with
    | `Int8 -> 1
    | `Int16 -> 2
    | `Int32 -> 4
    | `Int64 -> 8
    | `Fixed _ -> 0
    | `Lim8 _ -> 1

(* Create a new kvseq file on [fd]: write the superblock, preallocate
   [fileincr] zero bytes after it, and fdatasync the result. *)
let create ?buffer_size ?chunk_size
           ?(sbsize = 512)
           ?(fileincr = Int64.of_int(4 * 1024 * 1024))
           ?(supports_deletions = true)
           ?(keyrepr = `Int64)
           ?(valrepr = `Int64)
           ?(alignment = 0)
           ?(have_statistics = true)
           ?(suggested_hash_algo = `MD5)
           ?(purpose = "")
           fd =
  let sb =
    Superblock.create_superblock
      ~sbsize ~format:kvseq_format ~purpose () in
  Superblock.set_variable sb filesize_name (Int64.of_int sbsize);
  Superblock.set_variable sb fileincr_name fileincr;
  Superblock.set_variable sb kvdelfl_name (sb_of_bool supports_deletions);
  Superblock.set_variable sb keyrepr_name (sb_of_repr keyrepr);
  Superblock.set_variable sb valrepr_name (sb_of_repr valrepr);
  Superblock.set_variable sb align_name (Int64.of_int alignment);
  if have_statistics then (
    Superblock.set_variable sb entries_name 0L;
    Superblock.set_variable sb aentries_name 0L;
  );
  Superblock.set_variable
    sb htalgo_name (Hash_algo.htalgo_of_hash_algo suggested_hash_algo);
  let rw_phys = new buf_rd_wr ?buffer_size ?chunk_size fd in
  Superblock.write_superblock_to_rw rw_phys sb;
  rw_phys # seek (Int64.of_int sbsize);
  (* Preallocate fileincr bytes: whole 1024 byte blocks, then the rest *)
  for k = 1 to ~<< (fileincr /@ 1024L) do
    rw_phys # really_output zero_block 0 1024
  done;
  rw_phys # really_output zero_block 0 (~<< (Int64.rem fileincr 1024L));
  rw_phys # flush();
  Linux.fdatasync fd#file_descr;
  let rw = new sub_rd_wr ~incr:fileincr (Int64.of_int sbsize) rw_phys in
  { rw_phys = rw_phys;
    rw = rw;
    fd = fd;
    sb = sb;
    keyrepr = keyrepr;
    valrepr = valrepr;
    supports_deletions = supports_deletions;
    alignment = alignment;
    have_stats = have_statistics;
    dirty_sb_cnt = 0;
    dirty_sb_max = 1;
    auto_sync = Some 0;
    auto_fadvise = false;
    onsync = (fun () -> ());
    iter_fadv_until = None;
    rollback = false;
  }

(* Open an existing kvseq file on [fd]. With [~conservative:true] the file
   is opened with SYNCSIZE as its end when SYNCSIZE and FILESIZE differ,
   and the rollback flag is set. Fails if KEYREPR or VALREPR are missing
   or invalid, or if the effective size exceeds the real file size. *)
let access ?buffer_size ?chunk_size ?(conservative=false) fd =
  let rw_phys = new buf_rd_wr ?buffer_size ?chunk_size fd in
  let sb = Superblock.read_superblock_from_rw rw_phys in
  let real_size =
    (Unix.LargeFile.fstat fd#file_descr).Unix.LargeFile.st_size in
  let fsize =
    (* We cannot use [Superblock.filesize] because we don't want to check
       whether FILESIZE is bigger than the file is long _here_. We want to
       do this later, when SYNCSIZE is taken into account. *)
    try Superblock.variable sb filesize_name
    with Not_found -> real_size in
  let keyrepr =
    try repr_of_sb (Superblock.variable sb keyrepr_name)
    with
      | Not_found ->
          failwith "Seqdb_containers.Kvseq.access: No KEYREPR in superblock"
      | _ ->
          failwith "Seqdb_containers.Kvseq.access: Bad KEYREPR in superblock" in
  let valrepr =
    try repr_of_sb (Superblock.variable sb valrepr_name)
    with
      | Not_found ->
          failwith "Seqdb_containers.Kvseq.access: No VALREPR in superblock"
      | _ ->
          failwith "Seqdb_containers.Kvseq.access: Bad VALREPR in superblock" in
  let supports_deletions =
    try bool_of_sb (Superblock.variable sb kvdelfl_name)
    with
      | Not_found -> false
      | _ ->
          failwith "Seqdb_containers.Kvseq.access: Bad KVDELFL in superblock" in
  let alignment =
    try Int64.to_int (Superblock.variable sb align_name)
    with
      | Not_found -> 0
      | _ ->
          failwith "Seqdb_containers.Kvseq.access: Bad ALIGN in superblock" in
  let have_stats =
    (* Statistics are available iff both counters exist *)
    try
      ignore(Superblock.variable sb entries_name);
      ignore(Superblock.variable sb aentries_name);
      true
    with
      | Not_found -> false in
  let fileincr =
    try Superblock.variable sb fileincr_name
    with
      | Not_found -> 0L in
  let syncsize =
    try Superblock.variable sb syncsize_name
    with
      | Not_found -> fsize in
  let effsize, rollback =
    if conservative && syncsize <> fsize then (
      Log.logf `Alert
        "Seqdb_containers.Kvseq.access: \
         SYNCSIZE <> FILESIZE, using SYNCSIZE=%Ld to open file"
        syncsize;
      (syncsize, true)
    )
    else
      (fsize, false) in
  if effsize > real_size then
    failwith "Seqdb_containers.Kvseq.access: FILESIZE/SYNCSIZE is bigger than real file size";
  let rw = new sub_rd_wr ~incr:fileincr effsize rw_phys in
  { rw_phys = rw_phys;
    rw = rw;
    fd = fd;
    sb = sb;
    keyrepr = keyrepr;
    valrepr = valrepr;
    supports_deletions = supports_deletions;
    alignment = alignment;
    have_stats = have_stats;
    dirty_sb_cnt = 0;
    dirty_sb_max = 1;   (* write sb after every add *)
    auto_sync = Some 0;
    auto_fadvise = false;
    onsync = (fun () -> ());
    iter_fadv_until = None;
    rollback = rollback;
  }

(* Change the settings for which an argument is passed; leave the others *)
let configure ?flush_every ?auto_sync ?auto_fadvise ?onsync kv =
  ( match flush_every with
      | None -> ()
      | Some n -> kv.dirty_sb_max <- n
  );
  ( match auto_sync with
      | None -> ()
      | Some sync_opt -> kv.auto_sync <- sync_opt
  );
  ( match auto_fadvise with
      | None -> ()
      | Some b -> kv.auto_fadvise <- b
  );
  ( match onsync with
      | None -> ()
      | Some f -> kv.onsync <- f
  )

let superblock kv = kv.sb

let get_pointer e = e.e_pointer

(* The deletion flag of the entry; read from the first byte of the entry
   unless cached. Fails if that byte is neither 0 nor 1. *)
let get_delflag e =
  match e.e_delflag with
    | Some b -> b
    | None ->
        assert(e.e_kv.supports_deletions);
        e.e_kv.rw # seek e.e_pointer;
        let s = input_string e.e_kv.rw 1 in
        let b =
          match s.[0] with
            | '\000' -> false
            | '\001' -> true
            | _ -> failwith "Seqdb_containers: File format violation" in
        e.e_delflag <- Some b;
        b

(* Size of the deletion flag in bytes (0 if the file has none) *)
let get_delflaglen e =
  if e.e_kv.supports_deletions then 1L else 0L

let get_keylenlen e =
  lenlen_of_repr e.e_kv.keyrepr

(* The length of the key; read from the length field unless cached *)
let get_keylen e =
  match e.e_keylen with
    | Some l -> l
    | None ->
        let delflaglen = get_delflaglen e in
        e.e_kv.rw # seek (e.e_pointer +@ delflaglen);
        let n = get_keylenlen e in
        assert(n > 0);  (* `Fixed is not handled here *)
        let s = input_string e.e_kv.rw n in
        (* Right-align the n length bytes in an 8 byte buffer *)
        let u = String.make 8 '\000' in
        String.blit s 0 u (8 - n) n;
        let l = read_int64 u in
        e.e_keylen <- Some l;
        l

(* The space occupied by the key: for `Lim8 n always n, else the length *)
let get_keyspc e =
  match e.e_kv.keyrepr with
    | `Lim8 n -> ~> n
    | _ -> get_keylen e

let get_key e =
  match e.e_key with
    | Some s -> s
    | None ->
        (* Get the delflag if it is there first. This reduces the number
           of reads if we get the key first and then the delflag *)
        if e.e_kv.supports_deletions then ignore(get_delflag e);
        let delflaglen = get_delflaglen e in
        let keylenlen = get_keylenlen e in
        let keylen = get_keylen e in
        let p = e.e_pointer +@ delflaglen +@ (~> keylenlen) in
        e.e_kv.rw # seek p;
        let l = ~<< keylen in
        if l >= 4096 then
          Linux.fadvise e.e_kv.fd#file_descr p keylen Linux.FADV_WILLNEED;
        let s = input_string e.e_kv.rw l in
        e.e_key <- Some s;
        s

let has_key e k =
  get_key e = k

let get_vallenlen e =
  lenlen_of_repr e.e_kv.valrepr

(* The length of the value; read from the length field unless cached.
   As a side effect the end-of-entry pointer is cached. *)
let get_vallen e =
  match e.e_vallen with
    | Some l -> l
    | None ->
        let delflaglen = get_delflaglen e in
        let keylenlen = get_keylenlen e in
        let keyspc = get_keyspc e in
        let vallenlen = get_vallenlen e in
        assert(vallenlen > 0);  (* `Fixed is not handled here *)
        let p' = e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc in
        e.e_kv.rw # seek p';
        let s = input_string e.e_kv.rw vallenlen in
        let u = String.make 8 '\000' in
        String.blit s 0 u (8 - vallenlen) vallenlen;
        let l = read_int64 u in
        e.e_vallen <- Some l;
        (* The entry ends after the space occupied by the value, which for
           `Lim8 n is always n (value plus padding), not the length l *)
        let valspc =
          match e.e_kv.valrepr with
            | `Lim8 n -> ~> n
            | _ -> l in
        e.e_pointer' <- Some (p' +@ (~> vallenlen) +@ valspc);
        l

(* The space occupied by the value: for `Lim8 n always n, else the length *)
let get_valspc e =
  match e.e_kv.valrepr with
    | `Lim8 n -> ~> n
    | _ -> get_vallen e

let get_value_length = get_vallen

(* Total number of bytes of the entry in the file *)
let get_total_length e =
  let delflaglen = get_delflaglen e in
  let keylenlen = get_keylenlen e in
  let keyspc = get_keyspc e in
  let vallenlen = get_vallenlen e in
  let valspc = get_valspc e in
  delflaglen +@ (~> keylenlen) +@ keyspc +@ (~> vallenlen) +@ valspc

(* Position of the first byte after the entry (cached) *)
let get_pointer' e =
  match e.e_pointer' with
    | Some p -> p
    | None ->
        let delflaglen = get_delflaglen e in
        let keylenlen = get_keylenlen e in
        let keyspc = get_keyspc e in
        let vallenlen = get_vallenlen e in
        let valspc = get_valspc e in
        let p =
          e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc +@
            (~> vallenlen) +@ valspc in
        e.e_pointer' <- Some p;
        p

let get_value e =
  match e.e_val with
    | Some s -> s
    | None ->
        let delflaglen = get_delflaglen e in
        let keylenlen = get_keylenlen e in
        let keyspc = get_keyspc e in
        let vallenlen = get_vallenlen e in
        let vallen = get_vallen e in
        let p =
          e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc +@
            (~> vallenlen) in
        e.e_kv.rw # seek p;
        let l = ~<< vallen in
        if l >= 4096 then
          Linux.fadvise e.e_kv.fd#file_descr p vallen Linux.FADV_WILLNEED;
        let s = input_string e.e_kv.rw l in
        e.e_val <- Some s;
        s

(* Copy [len] bytes of the value, starting at value position [e_pos], to
   [s] at [s_pos]. Served from the cached value if there is one. *)
let blit_to_string e e_pos s s_pos len =
  match e.e_val with
    | Some v ->
        String.blit v (~<< e_pos) s s_pos len
    | None ->
        if e_pos < 0L || s_pos < 0 || s_pos + len > String.length s then
          invalid_arg "blit_to_string";
        let delflaglen = get_delflaglen e in
        let keylenlen = get_keylenlen e in
        let keyspc = get_keyspc e in
        let vallenlen = get_vallenlen e in
        let vallen = get_vallen e in
        if e_pos > vallen || e_pos > vallen -@ (~> len) then
          invalid_arg "blit_to_string";
        let p =
          e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc +@
            (~> vallenlen) +@ e_pos in
        e.e_kv.rw # seek p;
        e.e_kv.rw # really_input s s_pos len

let get_contents e =
  let key = get_key e in   (* Ensure we read the key before the value *)
  { delflag = get_delflag e;
    key = key;
    value = get_value e
  }

(* A pointer is valid if it is not below the superblock size, is before
   the end of the file, and is aligned (if alignment is enabled) *)
let validate_pointer kv p =
  p >= ~> (Superblock.sbsize kv.sb) &&
  p < kv.rw # eof &&
  (kv.alignment = 0 || Int64.rem p (~> (kv.alignment)) = 0L )

(* Create the entry handle for [p] without reading anything yet. Fails
   for invalid pointers. *)
let lookup kv p =
  if not (validate_pointer kv p) then
    failwith "Seqdb_containers.Kvseq.lookup: Bad pointer";
  { e_kv = kv;
    e_pointer = p;
    e_delflag = if kv.supports_deletions then None else Some false;
    e_keylen = ( match kv.keyrepr with
                   | `Fixed n -> Some (~> n)
                   | _ -> None
               );
    e_key = None;
    e_vallen = ( match kv.valrepr with
                   | `Fixed n -> Some (~> n)
                   | _ -> None
               );
    e_val = None;
    e_pointer' = None;
  }

let align_pointer kv p =
  (* Return the aligned pointer: the next bigger number that is a
     multiple of the alignment factor *)
  if kv.alignment > 0 then (
    match Int64.rem p (~> (kv.alignment)) with
      | 0L -> (* Already aligned *)
          p
      | r ->
          p +@ (~> (kv.alignment) -@ r)
  )
  else
    p   (* no alignment *)

let fadvise_wontneed kv =
  (* fadvise everything but the superblock and the last bytes: *)
  let buffer_size = kv.rw#chunk_size in
  let start_offset = max buffer_size (Superblock.sbsize kv.sb) in
  let len = max 1L (kv.rw#eof -@ (~> (2*start_offset))) in
  Linux.fadvise
    kv.fd#file_descr (~> start_offset) len Linux.FADV_DONTNEED;
  Linux.fadvise
    kv.fd#file_descr (kv.rw#eof +@ (~> (2*buffer_size))) 0L
    Linux.FADV_DONTNEED;
  kv.iter_fadv_until <- None

(* Enable the fadvise handling done while iterating over the entries *)
let fadvise_iterating kv =
  kv.iter_fadv_until <- Some 0L

let fadvise_willneed kv p size =
  Linux.fadvise kv.fd#file_descr p size Linux.FADV_WILLNEED

(* fdatasync the file; afterwards drop cached pages if auto_fadvise is on
   and no iteration is in progress *)
let do_sync kv =
  Linux.fdatasync kv.fd#file_descr;
  if kv.auto_fadvise && kv.iter_fadv_until = None then
    fadvise_wontneed kv

(* Write the superblock if it is dirty, and flush the buffer *)
let flush kv =
  if kv.dirty_sb_cnt > 0 then (
    Superblock.write_superblock_to_rw kv.rw kv.sb;
  );
  kv.rw # flush();
  kv.rw # dispose_descr();
  kv.dirty_sb_cnt <- 0

(* Count one more modification. The superblock is written when
   [dirty_sb_max] modifications have accumulated or a sync is due. A sync
   is due when forced, after a rollback, or when [auto_sync = Some delta]
   and at least [delta] seconds have passed since SYNCTIME. *)
let maybe_write_sb ?(force_sync = false) kv =
  let now = Int64.of_float(Unix.gettimeofday()) in
  let synctime =
    try Superblock.variable kv.sb synctime_name
    with Not_found -> 0L in
  let syncsize =
    try Superblock.variable kv.sb syncsize_name
    with
      | Not_found -> Superblock.filesize kv.sb kv.fd in
  let rollback_done =
    if kv.rollback then (
      (* Set FILESIZE to real size *)
      Superblock.set_variable kv.sb filesize_name syncsize;
      kv.rollback <- false;
      true
    )
    else false in
  let need_sync =
    force_sync || rollback_done ||
      ( match kv.auto_sync with
          | None -> false
          | Some delta -> now >= synctime +@ (~> delta)
      ) in
  kv.dirty_sb_cnt <- kv.dirty_sb_cnt + 1;
  if kv.dirty_sb_cnt >= kv.dirty_sb_max || need_sync then (
    kv.rw # flush();
    if need_sync then (
      kv.onsync();
      do_sync kv;
      Superblock.set_variable kv.sb synctime_name now;
      Superblock.set_variable kv.sb syncsize_name kv.rw#eof;
    );
    Superblock.write_superblock_to_rw kv.rw kv.sb;
    kv.rw # flush();
    if need_sync then
      Linux.fdatasync kv.fd#file_descr;   (* Sync superblock *)
    kv.rw # dispose_descr();
    kv.dirty_sb_cnt <- 0;
  )

let mark_superblock_as_dirty kv =
  maybe_write_sb kv

let sync kv =
  maybe_write_sb ~force_sync:true kv

let
rollback_flag kv =
  kv.rollback

(* Whether the string [s] can be stored with representation [repr] *)
let check_repr repr s =
  match repr with
    | `Int8 -> String.length s <= 255
    | `Int16 -> String.length s <= 65535
    | `Int32 -> Int64.of_int (String.length s) <= 0xffff_ffffL
    | `Int64 -> true
    | `Fixed n -> String.length s = n
    | `Lim8 n -> String.length s <= n

(* Validate [data] against the file settings. Fails with a message
   prefixed by [procname]. *)
let check procname kv data =
  try
    if not kv.supports_deletions && data.delflag then
      failwith "This file does not support deleted entries";
    if not (check_repr kv.keyrepr data.key) then
      failwith "Bad key length";
    if not (check_repr kv.valrepr data.value) then
      failwith "Bad value length";
  with
    | Failure msg ->
        failwith (procname ^ ": " ^ msg)

(* Write only the length field for a value of length [vallen] *)
let write_vallen kv repr vallen =
  let n = lenlen_of_repr repr in
  let len_s = write_int64 vallen in
  kv.rw # really_output len_s (8-n) n

(* Zero bytes used as padding of `Lim8 strings *)
let s256 = String.make 256 '\000'

(* Write the length field, the string, and for `Lim8 the padding *)
let write_repr kv repr s =
  let n = lenlen_of_repr repr in
  let len_s = write_int64 (Int64.of_int (String.length s)) in
  kv.rw # really_output len_s (8-n) n;
  output_string kv.rw s;
  match repr with
    | `Lim8 l ->
        kv.rw # really_output s256 0 (l - String.length s)   (* padding *)
    | _ -> ()

(* Write a complete entry at the current position *)
let write kv data =
  if kv.supports_deletions then (
    let s = if data.delflag then "\001" else "\000" in
    output_string kv.rw s
  );
  write_repr kv kv.keyrepr data.key;
  write_repr kv kv.valrepr data.value

(* Append [data] at the (aligned) end of the file, update FILESIZE and the
   statistics if present, and return the new entry. *)
let add kv data =
  check "Seqdb_containers.Kvseq.add" kv data;
  let p = align_pointer kv (kv.rw # eof) in
  kv.rw # seek p;
  write kv data;
  Superblock.set_variable kv.sb filesize_name kv.rw#eof;
  kv.rollback <- false;
  ( try
      Superblock.set_variable
        kv.sb entries_name
        (Int64.succ (Superblock.variable kv.sb entries_name));
      if not data.delflag then
        Superblock.set_variable
          kv.sb aentries_name
          (Int64.succ (Superblock.variable kv.sb aentries_name));
    with
      | Not_found -> ()   (* no statistics in this file *)
  );
  maybe_write_sb kv;
  { e_kv = kv;
    e_pointer = p;
    e_delflag = Some data.delflag;
    e_keylen = Some (~> (String.length data.key));
    e_key = Some data.key;
    e_vallen = Some (~> (String.length data.value));
    e_val = Some data.value;
    e_pointer' = None;
  }

(* The space a string occupies in the file, without the length field *)
let repr_spc repr s =
  match repr with
    | `Int8 | `Int16 | `Int32 | `Int64 | `Fixed _ -> String.length s
    | `Lim8 n -> n

(* Overwrite the entry [e] with [data]. Unless [e] is the last entry of
   the file, key and value must occupy the same space as before. *)
let replace e data =
  let kv = e.e_kv in
  check "Seqdb_containers.Kvseq.replace" kv data;
  let e_del = get_delflag e in
  let p' = get_pointer' e in
  if p' <> kv.rw#eof then (
    (* Not replacing at the end, so the size of [data] must exactly match
       the size of the previous entry *)
    let keyspc = get_keyspc e in
    let valspc = get_valspc e in
    if ~> (repr_spc kv.keyrepr data.key) <> keyspc then
      failwith "Seqdb_containers.Kvseq.replace: New key must have the same length as the old key";
    if ~> (repr_spc kv.valrepr data.value) <> valspc then
      failwith "Seqdb_containers.Kvseq.replace: New value must have the same length as the old value";
  );
  let p = get_pointer e in
  kv.rw # seek p;
  write kv data;
  (* Refresh all cached fields. The lengths and the end pointer may have
     changed, too, when the last entry is replaced. *)
  e.e_delflag <- Some data.delflag;
  e.e_keylen <- Some (~> (String.length data.key));
  e.e_key <- Some data.key;
  e.e_vallen <- Some (~> (String.length data.value));
  e.e_val <- Some data.value;
  e.e_pointer' <- None;
  Superblock.set_variable kv.sb filesize_name kv.rw#eof;
  kv.rollback <- false;
  if e_del <> data.delflag then (
    try
      let k = if data.delflag then -1L else 1L in
      Superblock.set_variable
        kv.sb aentries_name
        (k +@ (Superblock.variable kv.sb aentries_name))
    with
      | Not_found -> ()   (* no statistics in this file *)
  );
  maybe_write_sb kv

(* Overwrite the key of [e]. The new key must occupy the same space. *)
let rename e new_key =
  let kv = e.e_kv in
  if not (check_repr kv.keyrepr new_key) then
    failwith "Seqdb_containers.Kvseq.rename: Bad key length";
  let delflaglen = get_delflaglen e in
  let keyspc = get_keyspc e in
  if ~> (repr_spc kv.keyrepr new_key) <> keyspc then
    failwith "Seqdb_containers.Kvseq.rename: New key must have the same length as the old key";
  let p = get_pointer e in
  kv.rw # seek (p +@ delflaglen);
  write_repr kv kv.keyrepr new_key;
  (* For `Lim8 keys the length may differ although the space is the same *)
  e.e_keylen <- Some (~> (String.length new_key));
  e.e_key <- Some new_key;
  maybe_write_sb kv

(* Copy [len] bytes from [s] at [s_pos] into the value of [e] at value
   position [e_pos]. The value may only grow if [e] is the last entry of
   the file and the new length is representable. *)
let blit_from_string s s_pos e e_pos len =
  if e_pos < 0L || s_pos < 0 || s_pos + len > String.length s then
    invalid_arg "blit_from_string";
  let delflaglen = get_delflaglen e in
  let keylenlen = get_keylenlen e in
  let keyspc = get_keyspc e in
  let vallenlen = get_vallenlen e in
  let old_vallen = get_vallen e in
  if e_pos > old_vallen then
    invalid_arg "blit_from_string";
  let p' = get_pointer' e in
  let at_end = (p' = e.e_kv.rw#eof) in
  if not at_end then (
    (* Not replacing at the end, so we cannot extend the length of the
       value *)
    if e_pos > old_vallen -@ (~> len) then
      invalid_arg "blit_from_string";
  );
  (* Maybe we have to update vallen *)
  let vallen = max old_vallen (e_pos +@ ~> len) in
  if old_vallen <> vallen then (
    (* The extended length must still fit into the length field *)
    let representable =
      match e.e_kv.valrepr with
        | `Int8 -> vallen <= 255L
        | `Int16 -> vallen <= 65535L
        | `Int32 -> vallen <= 0xffff_ffffL
        | `Int64 -> true
        | `Fixed n -> vallen = ~> n
        | `Lim8 n -> vallen <= ~> n in
    if not representable then
      invalid_arg "blit_from_string";
    let p_vallen =
      e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc in
    e.e_kv.rw # seek p_vallen;
    write_vallen e.e_kv e.e_kv.valrepr vallen;
    (* The entry has grown, so the cached end pointer is outdated *)
    e.e_pointer' <- None
  );
  (* Update data *)
  let p =
    e.e_pointer +@ delflaglen +@ (~> keylenlen) +@ keyspc +@
      (~> vallenlen) +@ e_pos in
  e.e_kv.rw # seek p;
  e.e_kv.rw # really_output s s_pos len;
  e.e_val <- None;   (* can be improved... *)
  e.e_vallen <- Some vallen;
  Superblock.set_variable e.e_kv.sb filesize_name e.e_kv.rw#eof;
  e.e_kv.rollback <- false;
  maybe_write_sb e.e_kv

(* Set the deletion flag of [e] (no-op if already set). Fails if the file
   does not support deletions. *)
let delete e =
  let kv = e.e_kv in
  if not kv.supports_deletions then
    failwith "Seqdb_containers.Kvseq.delete: This file does not support deleted entries";
  let e_del = get_delflag e in
  if not e_del then (
    let p = get_pointer e in
    kv.rw # seek p;
    output_string kv.rw "\001";
    e.e_delflag <- Some true;
    ( try
        Superblock.set_variable
          kv.sb aentries_name
          (Int64.pred (Superblock.variable kv.sb aentries_name))
      with
        | Not_found -> ()   (* no statistics in this file *)
    );
    (* Outside the [try], so that the modification is also accounted for
       in files without statistics, and so that a [Not_found] raised by
       the onsync callback is not swallowed *)
    maybe_write_sb kv
  )

let five_mb = ~> (5 * 1024 * 1024)
let ten_mb = ~> (10 * 1024 * 1024)

(* If iteration fadvise is enabled and [p] has reached the current
   threshold, drop the pages before [p], prefetch the next 10 MB, and move
   the threshold 5 MB ahead *)
let maybe_fadvise_iter kv p =
  match kv.iter_fadv_until with
    | None -> ()
    | Some q ->
        if p >= q then (
          (* advise to forget everything before p *)
          let buffer_size = kv.rw#chunk_size in
          let start_offset = max buffer_size (Superblock.sbsize kv.sb) in
          if p > ~>start_offset then
            Linux.fadvise
              kv.fd#file_descr (~> start_offset) (p -@ (~> start_offset))
              Linux.FADV_DONTNEED;
          (* advise to read the next 10 MB of data *)
          Linux.fadvise kv.fd#file_descr p ten_mb Linux.FADV_WILLNEED;
          kv.iter_fadv_until <- Some (p +@ five_mb)
        )

(* The position of the first entry: the aligned superblock size *)
let first_pointer kv =
  align_pointer kv (~> (Superblock.sbsize kv.sb))

(* Raises [End_of_file] if the file has no entries *)
let first_entry kv =
  let p = first_pointer kv in
  if p >= kv.rw # eof then (
    kv.iter_fadv_until <- None;
    raise End_of_file;
  );
  maybe_fadvise_iter kv p;
  lookup kv p

(* Raises [End_of_file] if [e] is the last entry *)
let next_entry e =
  let p = align_pointer e.e_kv (get_pointer' e) in
  if p >= e.e_kv.rw # eof then (
    e.e_kv.iter_fadv_until <- None;
    raise End_of_file;
  );
  maybe_fadvise_iter e.e_kv p;
  lookup e.e_kv p

let rec recover_entry regexp kv p0_opt =
  (* search the next entry e at a position >= p0 *)
  (* Note that the scan starts at p0 + 1. An entry is accepted if it can
     be read, lies completely within the file, and its key matches
     [regexp]. Raises [End_of_file] if no such entry is found. *)
  let p0 =
    match p0_opt with
      | None -> first_pointer kv
      | Some p0 -> p0 in
  let p = ref (Int64.succ p0) in
  while not (validate_pointer kv !p) do
    if !p >= kv.rw # eof then raise End_of_file;
    p := Int64.succ !p
  done;
  let e = lookup kv !p in   (* never fails if pointer is validated *)
  try
    let (_:bool) = get_delflag e in   (* may fail *)
    let keylen = get_keylen e in   (* may fail (read beyond eof) *)
    if (keylen < 0L ||
        !p +@ get_delflaglen e +@ ~>(get_keylenlen e) +@ keylen
          > kv.rw # eof)
    then
      raise Not_found;
    let k = get_key e in
    if not(Pcre.pmatch ~rex:regexp k) then raise Not_found;
    let vallen = get_vallen e in   (* may fail (read beyond eof) *)
    if (vallen < 0L ||
        !p +@ get_delflaglen e +@ ~>(get_keylenlen e) +@ keylen +@
          ~>(get_vallenlen e) +@ vallen
          > kv.rw # eof)
    then
      raise Not_found;
    (* This entry looks good *)
    e
  with
    | _ ->
        (* Any failure means: not an entry, try the next position *)
        recover_entry regexp kv (Some !p)

let int64_of_pointer p = p
let pointer_of_int64 p = p

let string_of_pointer p = write_int64 p
let pointer_of_string s = read_int64 s
let pointer_length = 8

let keyrepr kv = kv.keyrepr
let valrepr kv = kv.valrepr
let supports_deletions kv = kv.supports_deletions
let alignment kv = kv.alignment
let have_statistics kv = kv.have_stats

(* The algorithm named by HTALGO; [None] if the variable is missing or
   its value is unknown *)
let suggested_hash_algo kv =
  try
    Some (Hash_algo.hash_algo_of_htalgo
            (Superblock.variable kv.sb htalgo_name))
  with
    | Not_found -> None

(* Both raise [Not_found] if the file has no statistics *)
let num_entries kv =
  Superblock.variable kv.sb entries_name

let num_active_entries kv =
  Superblock.variable kv.sb aentries_name

let free_mark = "\000\000\000\000\000\000\000\000"   (* 0L *)
let del_mark = "\255\255\255\255\255\255\255\255"   (* -1L *)
end


(* Extends the pointers of [C] by 8 bytes holding a hash of the key
   (computed with [Hash_algo.other_hash `MD5]), so that [has_key] can be
   answered from the pointer alone. *)
module Hpointer (C:HASHABLE) = struct
  type t = C.t
  type pointer = string

  (* [hp_cnt] is only set for entries created by [lookup]; [hp_entry]
     caches the entry of [C] once it is known *)
  type entry =
      { hp_pnt : pointer;
        hp_cnt : t option;
        mutable hp_entry : C.entry option
      }

  let pointer_length = C.pointer_length + 8

  let string_of_pointer p = p

  let pointer_of_string s =
    if String.length s <> pointer_length then
      invalid_arg "Hpointer.pointer_of_string";
    s

  let get_pointer e = e.hp_pnt

  (* The pointer of [C] contained in the extended pointer *)
  let downgrade_pointer p =
    C.pointer_of_string (String.sub p 0 C.pointer_length)

  let validate_pointer c p =
    C.validate_pointer c (downgrade_pointer p)

  (* Does not access [C] yet; this is deferred until [downgrade] *)
  let lookup c p =
    { hp_pnt = p; hp_cnt = Some c; hp_entry = None }

  (* The entry of [C] behind [e], looked up on first use *)
  let downgrade e =
    match e.hp_entry with
      | None ->
          let c =
            ( match e.hp_cnt with
                | None -> assert false
                | Some c -> c
            ) in
          let e' = C.lookup c (downgrade_pointer e.hp_pnt) in
          e.hp_entry <- Some e';
          e'
      | Some e' -> e'

  let md5_algo = Hash_algo.other_hash `MD5

  (* Wrap an entry of [C]; this reads the key to compute the hash *)
  let upgrade e' =
    let p' = C.string_of_pointer (C.get_pointer e') in
    let k = C.get_key e' in
    let h = write_int64 (md5_algo k) in
    { hp_pnt = p' ^ h; hp_cnt = None; hp_entry = Some e' }

  let get_key e =
    C.get_key(downgrade e)

  (* Compares only the hash of [k] with the hash stored in the pointer;
     the key itself is not read *)
  let has_key e k =
    let h = write_int64 (md5_algo k) in
    let h' = String.sub e.hp_pnt C.pointer_length 8 in
    h = h'

  let suggested_hash_algo c = C.suggested_hash_algo c

  let free_mark = C.free_mark ^ "\000\000\000\000\000\000\000\000"
  let del_mark = C.del_mark ^ "\000\000\000\000\000\000\000\000"
end


(* [Kvseq] with the extended pointers of [Hpointer] *)
module Kvseq_hp = struct
  module K = Hpointer(Kvseq)

  type t = Kvseq.t
  type entry = K.entry
  type pointer = K.pointer
  type contents = Kvseq.contents =
      { delflag : bool; key : string; value : string }
  type repr_class = Kvseq.repr_class

  let create = Kvseq.create
  let access = Kvseq.access
  let superblock = Kvseq.superblock
  let mark_superblock_as_dirty = Kvseq.mark_superblock_as_dirty
  let rollback_flag = Kvseq.rollback_flag
  let configure = Kvseq.configure
  let get_pointer = K.get_pointer
  let get_contents e = Kvseq.get_contents (K.downgrade e)
  let get_key = K.get_key
  let has_key = K.has_key
  let
get_value e = Kvseq.get_value (K.downgrade e) let get_value_length e = Kvseq.get_value_length (K.downgrade e) let get_total_length e = Kvseq.get_total_length (K.downgrade e) let get_delflag e = Kvseq.get_delflag (K.downgrade e) let lookup = K.lookup let validate_pointer = K.validate_pointer let add kv c = K.upgrade (Kvseq.add kv c) let replace e c = Kvseq.replace (K.downgrade e) c let rename e n = Kvseq.rename (K.downgrade e) n let delete e = Kvseq.delete (K.downgrade e) let blit_to_string e = Kvseq.blit_to_string (K.downgrade e) let blit_from_string s spos e = Kvseq.blit_from_string s spos (K.downgrade e) let flush = Kvseq.flush let sync = Kvseq.sync let first_entry kv = K.upgrade (Kvseq.first_entry kv) let next_entry e = K.upgrade (Kvseq.next_entry (K.downgrade e)) let recover_entry re kv popt = K.upgrade (Kvseq.recover_entry re kv (match popt with | None -> None | Some p -> Some(K.downgrade_pointer p))) let string_of_pointer = K.string_of_pointer let pointer_of_string = K.pointer_of_string let int64_of_pointer p = Kvseq.int64_of_pointer (K.downgrade_pointer p) let pointer_length = K.pointer_length let keyrepr = Kvseq.keyrepr let valrepr = Kvseq.valrepr let supports_deletions = Kvseq.supports_deletions let alignment = Kvseq.alignment let have_statistics = Kvseq.have_statistics let suggested_hash_algo = K.suggested_hash_algo let num_entries = Kvseq.num_entries let num_active_entries = Kvseq.num_active_entries let free_mark = K.free_mark let del_mark = K.del_mark let fadvise_wontneed = Kvseq.fadvise_wontneed let fadvise_iterating = Kvseq.fadvise_iterating let fadvise_willneed = Kvseq.fadvise_willneed end module type HINDEX = sig type t type entry module Container : HASHABLE type contents = Container.entry val create : ?buffer_size:int -> ?chunk_size:int -> ?sbsize:int -> ?htsize:int64 -> ?cellsz:int -> ?have_statistics:bool -> ?hash_algo:Hash_algo.hash_algo -> ?purpose:string -> Container.t -> Seqdb_rdwr.file_descr -> t val access : ?fully_buffered_index:bool -> 
?buffer_size:int ->
               ?chunk_size:int ->
               ?superblock:Superblock.t ->
               Container.t ->
               Seqdb_rdwr.file_descr -> t
  val superblock : t -> Superblock.t
  val mark_superblock_as_dirty : t -> unit
  val configure : ?flush_every:int ->
                  ?auto_fadvise:bool ->
                  ?random_fadvise:bool ->
                  t -> unit
  val get_pointer : entry -> Container.pointer
  val get_contents : entry -> contents
  val lookup : t -> string -> entry
  val pointer_hint : t -> string -> Container.pointer
  val index_hint : t -> string -> int64
  val add : t -> Container.entry -> entry
  val replace : entry -> Container.entry -> unit
  val delete : entry -> unit
  val first_entry : t -> entry
  val next_entry : entry -> entry
  val flush : t -> unit
  val sync : t -> unit
  val have_statistics : t -> bool
  val hash_algo : t -> Hash_algo.hash_algo
  val htsize : t -> int64
  val num_entries : t -> int64
  val num_active_entries : t -> int64
  val fadvise_wontneed : t -> unit
end


(* Hash index stored in a file: a superblock followed by htsize cells of
   8*cellsz bytes each. A cell holds Container.free_mark,
   Container.del_mark, or the string form of a container pointer.
   Collisions are resolved by probing the following cells, wrapping around
   at htsize (see probe and probe_next). *)
module Hindex (C : HASHABLE) = struct
  open Sb_consts

  module Container = C

  (* dirty_sb_cnt : superblock changes since the superblock was last written
     dirty_sb_max : the superblock is written once dirty_sb_cnt reaches this
     auto_fadvise : whether do_sync also calls fadvise_wontneed *)
  type t =
      { fd : file_descr;
        rw : reader_writer;
        sb : Superblock.t;
        have_stats : bool;
        htsize : int64;
        cellsz : int;
        htalgo : Hash_algo.hash_algo;
        mutable dirty_sb_cnt : int;
        mutable dirty_sb_max : int;
        mutable auto_fadvise : bool;
        cont : Container.t;
      }

  (* e_index         : cell number in the hash table
     e_cell          : the cell bytes as last read or written
     e_cell_contents : cached container entry for the cell, if loaded *)
  type entry =
      { e_hi : t;
        e_index : int64;
        mutable e_cell : string;
        mutable e_cell_contents : Container.entry option;
      }

  type contents = Container.entry

  (* type pointer = string *)

  (* Create a new index in fd: write the superblock, then htsize cells all
     set to Container.free_mark.
     The hash algorithm is the hash_algo argument if given, else the one
     suggested by the container, else `MD5.
     Raises Invalid_argument unless Container.pointer_length = 8 * cellsz
     and cellsz >= 1. *)
  let create ?buffer_size ?chunk_size
             ?(sbsize = 512) ?(htsize = 1024L *@ 1024L)
             ?(cellsz = Container.pointer_length / 8)
             ?(have_statistics = true)
             ?hash_algo ?(purpose = "") cont fd =
    let hash_algo =
      match hash_algo with
        | Some a -> a
        | None ->
            ( match Container.suggested_hash_algo cont with
                | Some a -> a
                | None -> `MD5
            ) in
    let sb =
      Superblock.create_superblock
        ~sbsize ~format:hindex_format ~purpose () in
    if Container.pointer_length <> cellsz * 8 || cellsz < 1 then
      invalid_arg "Seqdb_containers.Hindex.create: cellsz does not match pointer_length";
    let cellsz64 = Int64.of_int cellsz in
Superblock.set_variable sb cellsz_name cellsz64;
    (* The two statistics variables exist in the superblock only when
       statistics are enabled; access detects them by their presence. *)
    if have_statistics then (
      Superblock.set_variable sb entries_name 0L;
      Superblock.set_variable sb aentries_name 0L;
    );
    Superblock.set_variable sb htalgo_name
      (Hash_algo.htalgo_of_hash_algo hash_algo);
    Superblock.set_variable sb htsize_name htsize;
    (* Total file size: htsize cells of 8*cellsz bytes plus the superblock *)
    let filesize = htsize *@ 8L *@ cellsz64 +@ ~> sbsize in
    Superblock.set_variable sb filesize_name filesize;
    let empty_block = String.create (1024 * cellsz) in
    (* block of 128 cells *)
    let empty_cell = Container.free_mark in
    for k = 0 to 127 do
      String.blit empty_cell 0 empty_block (8*cellsz*k) (8*cellsz);
    done;
    let rw =
      new sub_rd_wr filesize (new buf_rd_wr ?buffer_size ?chunk_size fd) in
    Superblock.write_superblock_to_rw rw sb;
    rw # seek (~> sbsize);
    (* Write the full 128-cell blocks first ... *)
    for k = 1 to ~<< (htsize /@ 128L) do
      rw # really_output empty_block 0 (1024 * cellsz);
      if k mod 1024 = 0 then (
        (* Do not pollute the page cache when creating the hash table: *)
        rw # flush();
        Linux.fdatasync fd#file_descr;
        (* NOTE: despite its name, buffer_size holds the chunk size of rw *)
        let buffer_size = rw#chunk_size in
        let start_offset = max buffer_size sbsize in
        Linux.fadvise fd#file_descr (~>start_offset) 0L Linux.FADV_DONTNEED
      )
    done;
    (* ... then the remaining htsize mod 128 cells. *)
    rw # really_output
      empty_block 0 (~<< ((Int64.rem htsize 128L) *@ 8L *@ cellsz64));
    rw # flush();
    Linux.fdatasync fd#file_descr;
    let buffer_size = rw#chunk_size in
    let start_offset = max buffer_size sbsize in
    Linux.fadvise fd#file_descr (~>start_offset) 0L Linux.FADV_DONTNEED;
    { fd = fd;
      rw = rw;
      sb = sb;
      have_stats = have_statistics;
      htsize = htsize;
      htalgo = hash_algo;
      cellsz = cellsz;
      dirty_sb_cnt = 0;
      dirty_sb_max = 1;
      auto_fadvise = false;
      cont = cont;
    }

  (* Open an existing index in fd. If no superblock is passed, it is read
     from the file. With fully_buffered_index the buffer_size and
     chunk_size arguments are overridden (max_int and 1 MiB).
     Raises Failure when the superblock lacks HTSIZE or HTALGO, or when
     CELLSZ or the file size do not fit. *)
  let access ?(fully_buffered_index=false)
             ?buffer_size ?chunk_size ?superblock cont fd =
    let buffer_size, chunk_size =
      if fully_buffered_index then
        (Some max_int (* closest to infinity *),
         Some(1024*1024))
      else
        (buffer_size, chunk_size) in
    let rw_phys = new buf_rd_wr ?buffer_size ?chunk_size fd in
    let sb =
      match superblock with
        | Some sb -> sb
        | None -> Superblock.read_superblock_from_rw rw_phys in
    let
sbsize = Superblock.sbsize sb in
    let fsize = Superblock.filesize sb fd in
    let htsize =
      try Superblock.variable sb htsize_name
      with
        | Not_found ->
            failwith "Seqdb_containers.Hindex.access: No HTSIZE in superblock" in
    (* A superblock without CELLSZ is treated as having cell size 1 *)
    let cellsz64 =
      try Superblock.variable sb cellsz_name
      with
        | Not_found -> 1L in
    let cellsz = ~< cellsz64 in
    if cellsz * 8 <> Container.pointer_length then
      failwith "Seqdb_containers.Hindex.access: CELLSZ mismatch";
    (* The file must be large enough for the superblock plus all cells *)
    if fsize < htsize *@ 8L *@ cellsz64 +@ ~> sbsize then
      failwith "Seqdb_containers.Hindex.access: HTSIZE mismatch";
    let htalgo =
      try
        Hash_algo.hash_algo_of_htalgo
          (Superblock.variable sb htalgo_name)
      with
        | Not_found ->
            failwith "Seqdb_containers.Hindex.access: No HTALGO in superblock"
        | _ ->
            (* any other exception from the conversion is reported as a
               bad value *)
            failwith "Seqdb_containers.Hindex.access: Bad HTALGO in superblock" in
    (* Statistics are available only if both counters are present *)
    let have_stats =
      try
        ignore(Superblock.variable sb entries_name);
        ignore(Superblock.variable sb aentries_name);
        true
      with
        | Not_found -> false in
    let rw = new sub_rd_wr fsize rw_phys in
    { fd = fd;
      rw = rw;
      sb = sb;
      have_stats = have_stats;
      htsize = htsize;
      htalgo = htalgo;
      cellsz = cellsz;
      dirty_sb_cnt = 0;
      dirty_sb_max = 1;
      auto_fadvise = false;
      cont = cont;
    }

  (* Change runtime settings. Each option is applied only when given:
     flush_every sets dirty_sb_max, auto_fadvise sets the flag of the same
     name, random_fadvise issues FADV_RANDOM (true) or FADV_NORMAL (false)
     for the file. *)
  let configure ?flush_every ?auto_fadvise ?random_fadvise hi =
    ( match flush_every with
        | None -> ()
        | Some n -> hi.dirty_sb_max <- n
    );
    ( match auto_fadvise with
        | None -> ()
        | Some b -> hi.auto_fadvise <- b
    );
    ( match random_fadvise with
        | None -> ()
        | Some b ->
            Linux.fadvise
              hi.fd#file_descr 0L 0L
              (if b then Linux.FADV_RANDOM else Linux.FADV_NORMAL)
    )

  let superblock hi = hi.sb

  (* The container pointer stored in the cell of e.
     Raises Not_found if the cell holds the free or the deleted mark. *)
  let get_pointer e =
    let c = e.e_cell in
    if c = Container.free_mark || c = Container.del_mark then
      raise Not_found;
    Container.pointer_of_string c

  (* The container entry the cell points to; looked up once and then
     cached in e_cell_contents. Raises Not_found like get_pointer. *)
  let get_contents e =
    match e.e_cell_contents with
      | Some cc -> cc
      | None ->
          let cc = Container.lookup e.e_hi.cont (get_pointer e) in
          e.e_cell_contents <- Some cc;
          cc

  (* Return the entry for the first cell at index >= i that is neither
     free nor deleted. Raises End_of_file when the end of the table is
     reached. *)
  let iterate_to_next hi i =
    let sbsize = Superblock.sbsize hi.sb in
    let cellsz = hi.cellsz in
    let cellsz64 = ~> cellsz in
    let rec to_next i =
      if i >=
hi.htsize then raise End_of_file;
      (* File position of cell i *)
      let p = (~> sbsize) +@ (i *@ 8L *@ cellsz64) in
      hi.rw # seek p;
      let c = input_string hi.rw (8*cellsz) in
      if c = Container.free_mark || c = Container.del_mark then
        to_next (Int64.succ i)
      else (
        { e_hi = hi; e_index = i; e_cell = c; e_cell_contents = None }
      ) in
    to_next i

  let first_entry hi =
    iterate_to_next hi 0L

  let next_entry e =
    iterate_to_next e.e_hi (Int64.succ e.e_index)

  let fadvise_wontneed hi =
    (* fadvise everything but the superblock: *)
    (* NOTE: despite its name, buffer_size holds the chunk size of hi.rw.
       The length 0L presumably means up to the end of the file, as for
       posix_fadvise - confirm against the Linux binding. *)
    let buffer_size = hi.rw#chunk_size in
    let start_offset = max buffer_size (Superblock.sbsize hi.sb) in
    Linux.fadvise hi.fd#file_descr (~> start_offset) 0L Linux.FADV_DONTNEED

  let do_sync hi =
    Linux.fdatasync hi.fd#file_descr;
    if hi.auto_fadvise then fadvise_wontneed hi

  (* Write the superblock if it has pending changes, flush the buffered
     writer, and reset the pending-change counter. *)
  let flush hi =
    if hi.dirty_sb_cnt > 0 then (
      Superblock.write_superblock_to_rw hi.rw hi.sb;
    );
    hi.rw # flush();
    hi.rw # dispose_descr();
    hi.dirty_sb_cnt <- 0

  let sync hi =
    flush hi;
    do_sync hi

  (* Count one more superblock change; once dirty_sb_max changes have
     accumulated, flush pending data, write the superblock and flush
     again. *)
  let maybe_write_sb hi =
    hi.dirty_sb_cnt <- hi.dirty_sb_cnt + 1;
    if hi.dirty_sb_cnt >= hi.dirty_sb_max then (
      hi.rw # flush();
      Superblock.write_superblock_to_rw hi.rw hi.sb;
      hi.rw # flush();
      hi.rw # dispose_descr();
      hi.dirty_sb_cnt <- 0;
    )

  let mark_superblock_as_dirty = maybe_write_sb

  (* Probe the table starting at cell i. Results:
       `Free i  - cell i is free, so the probe sequence ends here
       `Found r - check accepted the cell (check i c returned Some r)
       `Full    - the probe wrapped around to i_start
     Deleted cells and cells rejected by check are skipped. *)
  let rec probe check hi i_start i =
    let sbsize = Superblock.sbsize hi.sb in
    let cellsz = hi.cellsz in
    let cellsz64 = ~> cellsz in
    let p = (~> sbsize) +@ (i *@ 8L *@ cellsz64) in
    hi.rw # seek p;
    let c = input_string hi.rw (8*cellsz) in
    if c = Container.free_mark then
      `Free i
    else
      if c = Container.del_mark then
        probe_next check hi i_start i
      else
        match check i c with
          | Some r -> `Found r
          | None -> probe_next check hi i_start i

  (* Advance to the following cell, wrapping around at htsize *)
  and probe_next check hi i_start i =
    let i' = Int64.rem (i +@ 1L) hi.htsize in
    if i' = i_start then
      `Full
    else
      probe check hi i_start i'

  (* Find the entry for key. Raises Not_found if the probe sequence ends
     at a free cell or wraps around without a match. *)
  let lookup hi key =
    let h = Hash_algo.hash hi.htalgo key in
    assert(h >= 0L);
    let i = Int64.rem h hi.htsize in
    let check i c =
      let e =
        { e_hi = hi; e_index = i; e_cell = c; e_cell_contents = None } in
      if
Container.has_key (get_contents e) key then
        Some e
      else
        None in
    match probe check hi i i with
      | `Free _ -> raise Not_found
      | `Full -> raise Not_found
      | `Found e -> e

  (* The cell index at which probing for key starts.
     NOTE: unlike lookup, pointer_hint and add, this function does not
     assert that the hash value is non-negative. *)
  let index_hint hi key =
    let h = Hash_algo.hash hi.htalgo key in
    Int64.rem h hi.htsize

  (* The pointer stored in the first cell of the probe sequence for key.
     No probing is done and the key is not compared. Raises Not_found if
     that cell is free or deleted. *)
  let pointer_hint hi key : Container.pointer =
    let cellsz = hi.cellsz in
    let cellsz64 = ~> cellsz in
    let h = Hash_algo.hash hi.htalgo key in
    assert(h >= 0L);
    let i = Int64.rem h hi.htsize in
    let sbsize = Superblock.sbsize hi.sb in
    let p = (~> sbsize) +@ (i *@ 8L *@ cellsz64) in
    hi.rw # seek p;
    let s = input_string hi.rw (8*cellsz) in
    if s = Container.free_mark || s = Container.del_mark then
      raise Not_found
    else
      Container.pointer_of_string s

  (* Write the pointer of cc into cell i and return the written bytes *)
  let fill_cell hi i cc =
    let sbsize = Superblock.sbsize hi.sb in
    let cellsz = hi.cellsz in
    let cellsz64 = ~> cellsz in
    let p = (~> sbsize) +@ (i *@ 8L *@ cellsz64) in
    hi.rw # seek p;
    let s = Container.string_of_pointer (Container.get_pointer cc) in
    assert(String.length s = 8*cellsz);
    output_string hi.rw s;
    s

  (* Store cc in the first free cell of its probe sequence. The check
     function always answers None, so existing cells are never matched
     and deleted cells are skipped rather than reused.
     Raises Failure if no free cell is found. *)
  let add hi cc =
    let key = Container.get_key cc in
    let h = Hash_algo.hash hi.htalgo key in
    assert(h >= 0L);
    let i = Int64.rem h hi.htsize in
    match probe (fun _ _ -> None) hi i i with
      | `Free j ->
          let c = fill_cell hi j cc in
          (* Update the counters; Not_found means the superblock has no
             statistics *)
          ( try
              Superblock.set_variable hi.sb entries_name
                (Int64.succ (Superblock.variable hi.sb entries_name));
              Superblock.set_variable hi.sb aentries_name
                (Int64.succ (Superblock.variable hi.sb aentries_name));
            with
              | Not_found -> ()
          );
          maybe_write_sb hi;
          { e_hi = hi; e_index = j; e_cell = c; e_cell_contents = Some cc }
      | `Full ->
          failwith "Seqdb_containers.Hindex.add: hash table is full"
      | `Found _ ->
          assert false

  (* Overwrite the cell of e with the pointer of cc. Both must have the
     same key, otherwise Failure is raised. *)
  let replace e cc =
    let e_key = Container.get_key (get_contents e) in
    if e_key <> Container.get_key cc then
      failwith "Seqdb_containers.Hindex.replace: Old and new contents have different keys";
    let c = fill_cell e.e_hi e.e_index cc in
    e.e_cell <- c;
    e.e_cell_contents <- Some cc

  (* Remove the entry from the index. Nothing happens if the entry, or the
     cell in the file, is already marked free or deleted. *)
  let delete e =
    if e.e_cell <> Container.del_mark && e.e_cell <>
Container.free_mark then (
      let hi = e.e_hi in
      let sbsize = Superblock.sbsize hi.sb in
      let cellsz = hi.cellsz in
      let cellsz64 = ~> cellsz in
      let p = (~> sbsize) +@ (e.e_index *@ 8L *@ cellsz64) in
      hi.rw # seek p;
      (* Check whether the cell is really filled: *)
      let m1 = input_string e.e_hi.rw (8*cellsz) in
      if m1 <> Container.del_mark && m1 <> Container.free_mark then (
        (* Also check the following cell. If it is free, we can delete
           with a free_mark instead of a del_mark, which is more
           efficient in the long run *)
        let p' =
          (~> sbsize) +@
            (Int64.rem(e.e_index +@ 1L) hi.htsize) *@ 8L *@ cellsz64 in
        hi.rw # seek p';
        let m2 = input_string e.e_hi.rw (8*cellsz) in
        let mark =
          if m2 = Container.free_mark then
            Container.free_mark
          else
            Container.del_mark in
        hi.rw # seek p;
        assert(String.length mark = 8*cellsz);
        output_string e.e_hi.rw mark;
        (* The count of active entries always goes down ... *)
        ( try
            Superblock.set_variable hi.sb aentries_name
              (Int64.pred (Superblock.variable hi.sb aentries_name));
          with
            | Not_found -> ()
        );
        (* ... the count of occupied cells only when the cell really
           became free. Not_found means there are no statistics. *)
        if mark = Container.free_mark then (
          try
            Superblock.set_variable hi.sb entries_name
              (Int64.pred (Superblock.variable hi.sb entries_name));
          with
            | Not_found -> ()
        );
        maybe_write_sb hi;
        e.e_cell <- mark;
        e.e_cell_contents <- None
      )
    )

  let have_statistics hi = hi.have_stats

  let hash_algo hi = hi.htalgo

  let htsize hi = hi.htsize

  (* Both counters raise Not_found if the superblock has no such
     variable *)
  let num_entries hi =
    Superblock.variable hi.sb entries_name

  let num_active_entries hi =
    Superblock.variable hi.sb aentries_name
end


(* Index that selects between two cell formats: HI1 is a Hindex directly
   over C, HI2 a Hindex over Hpointer(C), i.e. with the key hash appended
   to every pointer. create selects by its cellsz argument (1 or 2),
   access by the CELLSZ variable of the superblock. Entries and pointers
   are always presented in terms of C. *)
module Autoindex (C:HASHABLE) = struct
  module HI1 = Hindex(C)
  module HP = Hpointer(C)
  module HI2 = Hindex(HP)

  module Container = C
    (* Note that HI1.Container.t = HI2.Container.t = C.t *)

  type t =
      [ `HI1 of HI1.t
      | `HI2 of HI2.t
      ]

  type entry =
      [ `E1 of HI1.entry
      | `E2 of HI2.entry
      ]

  type contents = Container.entry

  (* cellsz defaults to 1; any value other than 1 or 2 raises
     Invalid_argument. *)
  let create ?buffer_size ?chunk_size ?sbsize ?htsize ?(cellsz=1)
             ?have_statistics ?hash_algo ?purpose c fd : t =
    match cellsz with
      | 1 ->
          `HI1(HI1.create
                 ?buffer_size ?chunk_size ?sbsize ?htsize ~cellsz
                 ?have_statistics ?hash_algo ?purpose c fd)
      | 2 ->
`HI2(HI2.create
                 ?buffer_size ?chunk_size ?sbsize ?htsize ~cellsz
                 ?have_statistics ?hash_algo ?purpose c fd)
      | _ ->
          invalid_arg "Seqdb_containers.Autoindex.create: invalid cellsz"

  (* Open an existing index. The superblock is read from fd unless one is
     passed; its CELLSZ variable (1 when absent) selects the variant.
     Raises Invalid_argument for any other cell size. *)
  let access ?fully_buffered_index ?buffer_size ?chunk_size ?superblock
             c fd : t =
    let sb =
      match superblock with
        | None -> Superblock.read_superblock fd
        | Some sb -> sb in
    let cellsz64 =
      try Superblock.variable sb Sb_consts.cellsz_name
      with
        | Not_found -> 1L in
    let cellsz = ~< cellsz64 in
    match cellsz with
      | 1 ->
          `HI1(HI1.access
                 ?fully_buffered_index ?buffer_size ?chunk_size
                 ~superblock:sb c fd)
      | 2 ->
          `HI2(HI2.access
                 ?fully_buffered_index ?buffer_size ?chunk_size
                 ~superblock:sb c fd)
      | _ ->
          invalid_arg "Seqdb_containers.Autoindex.access: invalid cellsz"

  (* The remaining functions dispatch to HI1 or HI2. For HI2, pointers
     and entries are converted to those of C with HP.downgrade_pointer and
     HP.downgrade. *)

  let superblock =
    function
      | `HI1 hi1 -> HI1.superblock hi1
      | `HI2 hi2 -> HI2.superblock hi2

  let mark_superblock_as_dirty =
    function
      | `HI1 hi1 -> HI1.mark_superblock_as_dirty hi1
      | `HI2 hi2 -> HI2.mark_superblock_as_dirty hi2

  let configure ?flush_every ?auto_fadvise ?random_fadvise =
    function
      | `HI1 hi1 ->
          HI1.configure ?flush_every ?auto_fadvise ?random_fadvise hi1
      | `HI2 hi2 ->
          HI2.configure ?flush_every ?auto_fadvise ?random_fadvise hi2

  let get_pointer =
    function
      | `E1 e1 -> HI1.get_pointer e1
      | `E2 e2 -> HP.downgrade_pointer (HI2.get_pointer e2)

  let get_contents =
    function
      | `E1 e1 -> HI1.get_contents e1
      | `E2 e2 -> HP.downgrade (HI2.get_contents e2)

  let first_entry =
    function
      | `HI1 hi1 -> `E1(HI1.first_entry hi1)
      | `HI2 hi2 -> `E2(HI2.first_entry hi2)

  let next_entry =
    function
      | `E1 e1 -> `E1(HI1.next_entry e1)
      | `E2 e2 -> `E2(HI2.next_entry e2)

  let lookup =
    function
      | `HI1 hi1 -> (fun key -> `E1(HI1.lookup hi1 key))
      | `HI2 hi2 -> (fun key -> `E2(HI2.lookup hi2 key))

  let index_hint =
    function
      | `HI1 hi1 -> HI1.index_hint hi1
      | `HI2 hi2 -> HI2.index_hint hi2

  (* For HI2 this performs a complete HI2.lookup, which compares the
     stored key hash, instead of calling HI2.pointer_hint. *)
  let pointer_hint =
    function
      | `HI1 hi1 -> HI1.pointer_hint hi1
      | `HI2 hi2 ->
          (fun key ->
             let e2 = HI2.lookup hi2 key in
             HP.downgrade_pointer (HI2.get_pointer e2)
          )

  let add =
function
      | `HI1 hi1 -> (fun e -> `E1(HI1.add hi1 e))
      | `HI2 hi2 -> (fun e -> `E2(HI2.add hi2 (HP.upgrade e)))

  let replace =
    function
      | `E1 e1 -> (fun e -> HI1.replace e1 e)
      | `E2 e2 -> (fun e -> HI2.replace e2 (HP.upgrade e))

  let delete =
    function
      | `E1 e1 -> HI1.delete e1
      | `E2 e2 -> HI2.delete e2

  let flush =
    function
      | `HI1 hi1 -> HI1.flush hi1
      | `HI2 hi2 -> HI2.flush hi2

  let sync =
    function
      | `HI1 hi1 -> HI1.sync hi1
      | `HI2 hi2 -> HI2.sync hi2

  let have_statistics =
    function
      | `HI1 hi1 -> HI1.have_statistics hi1
      | `HI2 hi2 -> HI2.have_statistics hi2

  let hash_algo =
    function
      | `HI1 hi1 -> HI1.hash_algo hi1
      | `HI2 hi2 -> HI2.hash_algo hi2

  let htsize =
    function
      | `HI1 hi1 -> HI1.htsize hi1
      | `HI2 hi2 -> HI2.htsize hi2

  let num_entries =
    function
      | `HI1 hi1 -> HI1.num_entries hi1
      | `HI2 hi2 -> HI2.num_entries hi2

  let num_active_entries =
    function
      | `HI1 hi1 -> HI1.num_active_entries hi1
      | `HI2 hi2 -> HI2.num_active_entries hi2

  let fadvise_wontneed =
    function
      | `HI1 hi1 -> HI1.fadvise_wontneed hi1
      | `HI2 hi2 -> HI2.fadvise_wontneed hi2
end


(* Signature of a permutation file: a sequence of cells, each holding a
   container pointer and the hash of the entry key, addressed by an int64
   index. *)
module type PERM = sig
  type t
  type entry
  module Container : HASHABLE

  val create : ?sbsize:int ->
               ?hash_algo:Hash_algo.hash_algo ->
               ?fileincr:int64 ->
               ?purpose:string ->
               Container.t ->
               Seqdb_rdwr.file_descr -> t
  val access : Container.t -> Seqdb_rdwr.file_descr -> t
  val superblock : t -> Superblock.t
  val mark_superblock_as_dirty : t -> unit
  val configure : ?flush_every:int ->
                  ?auto_fadvise:bool ->
                  t -> unit
  val size : t -> int64
  val add : t -> Container.entry -> entry
  val flush : t -> unit
  val sync : t -> unit
  val get_contents : entry -> Container.entry
  val get_index : entry -> int64
  val lookup : t -> int64 -> entry
  val group : ?ext:(file_descr * int) -> t -> unit
  val hash_algo : t -> Hash_algo.hash_algo
  val fadvise_wontneed : t -> unit
end


(* Implementation of a permutation file. The file consists of the
   superblock followed by 16-byte cells: 8 bytes of container pointer and
   8 bytes of key hash (see add and lookup). *)
module Perm (C : HASHABLE) = struct
  open Sb_consts

  module Container = C

  (* size is the number of cells; the remaining fields have the same
     roles as in Hindex.t *)
  type t =
      { fd : file_descr;
        rw : reader_writer;
        sb : Superblock.t;
        mutable size : int64;
        htalgo : Hash_algo.hash_algo;
mutable dirty_sb_cnt : int;
        mutable dirty_sb_max : int;
        mutable auto_fadvise : bool;
        cont : Container.t;
      }

  (* e_index : cell number
     e_ptr   : the 8 pointer bytes of the cell
     e_hash  : the key hash stored in the cell *)
  type entry =
      { e_perm : t;
        e_index : int64;
        mutable e_ptr : string;
        mutable e_hash : int64;
      }

  (* Checked when the functor is applied: cells have room for exactly 8
     pointer bytes. *)
  let () =
    if Container.pointer_length <> 8 then
      invalid_arg "Seqdb_containers.Perm: Module only compatible with 64 bit pointers"

  (* Create a new, empty permutation file in fd: write the superblock and
     preallocate fileincr zero bytes after it.
     The hash algorithm is the hash_algo argument if given, else the one
     suggested by the container, else `MD5. *)
  let create ?(sbsize = 512) ?hash_algo ?(fileincr = 1024L *@ 1024L)
             ?(purpose = "") cont fd =
    let hash_algo =
      match hash_algo with
        | Some a -> a
        | None ->
            ( match Container.suggested_hash_algo cont with
                | Some a -> a
                | None -> `MD5
            ) in
    let sb =
      Superblock.create_superblock
        ~sbsize ~format:perm_format ~purpose () in
    Superblock.set_variable sb permsize_name 0L;
    Superblock.set_variable sb htalgo_name
      (Hash_algo.htalgo_of_hash_algo hash_algo);
    (* The recorded file size covers only the superblock; the
       preallocated zero bytes are not counted. *)
    Superblock.set_variable sb filesize_name (~> sbsize);
    Superblock.set_variable sb fileincr_name fileincr;
    let rw_phys = new buf_rd_wr fd in
    Superblock.write_superblock_to_rw rw_phys sb;
    rw_phys # seek (Int64.of_int sbsize);
    (* Preallocate in blocks of 1024 bytes, then the remainder *)
    for k = 1 to ~<< (fileincr /@ 1024L) do
      rw_phys # really_output zero_block 0 1024
    done;
    rw_phys # really_output zero_block 0 (~<< (Int64.rem fileincr 1024L));
    rw_phys # flush();
    Linux.fdatasync fd#file_descr;
    (* NOTE: despite its name, buffer_size holds the chunk size *)
    let buffer_size = rw_phys#chunk_size in
    let start_offset = max buffer_size sbsize in
    Linux.fadvise fd#file_descr (~>start_offset) 0L Linux.FADV_DONTNEED;
    let rw = new sub_rd_wr ~incr:fileincr (Int64.of_int sbsize) rw_phys in
    { fd = fd;
      rw = rw;
      sb = sb;
      size = 0L;
      htalgo = hash_algo;
      dirty_sb_cnt = 0;
      dirty_sb_max = 1;
      auto_fadvise = false;
      cont = cont;
    }

  (* Open an existing permutation file. Raises Failure when the
     superblock lacks PERMSIZE or HTALGO, or when the file is too small
     for PERMSIZE cells of 16 bytes. *)
  let access cont fd =
    let sb = Superblock.read_superblock fd in
    let sbsize = Superblock.sbsize sb in
    let fsize = Superblock.filesize sb fd in
    let size =
      try Superblock.variable sb permsize_name
      with
        | Not_found ->
            failwith "Seqdb_containers.Perm.access: No PERMSIZE in superblock" in
    if fsize < size *@ 16L +@ ~> sbsize then
      failwith "Seqdb_containers.Perm.access: PERMSIZE mismatch";
    let htalgo =
      try
        Hash_algo.hash_algo_of_htalgo
(Superblock.variable sb htalgo_name)
      with
        (* These messages name Perm.access, the function that raises
           them. *)
        | Not_found ->
            failwith "Seqdb_containers.Perm.access: No HTALGO in superblock"
        | _ ->
            (* any other exception from the conversion is reported as a
               bad value *)
            failwith "Seqdb_containers.Perm.access: Bad HTALGO in superblock" in
    (* A superblock without FILEINCR means an increment of 0 *)
    let fileincr =
      try Superblock.variable sb fileincr_name
      with
        | Not_found -> 0L in
    let rw_phys = new buf_rd_wr fd in
    let rw = new sub_rd_wr ~incr:fileincr fsize rw_phys in
    { fd = fd;
      rw = rw;
      sb = sb;
      size = size;
      htalgo = htalgo;
      dirty_sb_cnt = 0;
      dirty_sb_max = 1;
      auto_fadvise = false;
      cont = cont;
    }

  (* Change runtime settings. Each option is applied only when given:
     flush_every sets dirty_sb_max, auto_fadvise sets the flag of the same
     name. *)
  let configure ?flush_every ?auto_fadvise kv =
    ( match flush_every with
        | None -> ()
        | Some n -> kv.dirty_sb_max <- n
    );
    ( match auto_fadvise with
        | None -> ()
        | Some b -> kv.auto_fadvise <- b
    )

  let size perm = perm.size

  let hash_algo perm = perm.htalgo

  let superblock perm = perm.sb

  let fadvise_wontneed hi =
    (* fadvise everything but the superblock: *)
    (* NOTE: despite its name, buffer_size holds the chunk size of hi.rw *)
    let buffer_size = hi.rw#chunk_size in
    let start_offset = max buffer_size (Superblock.sbsize hi.sb) in
    Linux.fadvise hi.fd#file_descr (~> start_offset) 0L Linux.FADV_DONTNEED

  let do_sync hi =
    Linux.fdatasync hi.fd#file_descr;
    if hi.auto_fadvise then fadvise_wontneed hi

  (* Write the superblock if it has pending changes, flush the buffered
     writer, and reset the pending-change counter. *)
  let flush hi =
    if hi.dirty_sb_cnt > 0 then (
      Superblock.write_superblock_to_rw hi.rw hi.sb;
    );
    hi.rw # flush();
    hi.rw # dispose_descr();
    hi.dirty_sb_cnt <- 0

  let sync hi =
    flush hi;
    do_sync hi

  (* Count one more superblock change; once dirty_sb_max changes have
     accumulated, flush pending data, write the superblock and flush
     again. *)
  let maybe_write_sb hi =
    hi.dirty_sb_cnt <- hi.dirty_sb_cnt + 1;
    if hi.dirty_sb_cnt >= hi.dirty_sb_max then (
      hi.rw # flush();
      Superblock.write_superblock_to_rw hi.rw hi.sb;
      hi.rw # flush();
      hi.rw # dispose_descr();
      hi.dirty_sb_cnt <- 0;
    )

  let mark_superblock_as_dirty = maybe_write_sb

  (* Append a cell for the container entry ce at index perm.size: the
     pointer of ce followed by the 8-byte encoding of the key hash. The
     superblock variables for file size and cell count are updated. *)
  let add perm ce =
    let i = perm.size in
    let key = Container.get_key ce in
    let h = Hash_algo.hash perm.htalgo key in
    assert(h >= 0L);
    let sbsize = Superblock.sbsize perm.sb in
    (* File position of cell i *)
    let p = (~> sbsize) +@ (i *@ 16L) in
    perm.rw # seek p;
    let c = Container.string_of_pointer (Container.get_pointer ce) in
    output_string perm.rw c;
    output_string perm.rw (write_int64 h);
    Superblock.set_variable perm.sb
filesize_name perm.rw#eof;
    Superblock.set_variable perm.sb permsize_name (Int64.succ perm.size);
    perm.size <- Int64.succ perm.size;
    maybe_write_sb perm;
    { e_perm = perm; e_index = i; e_ptr = c; e_hash = h }

  (* Look up the container entry the cell points to *)
  let get_contents e =
    Container.lookup e.e_perm.cont (Container.pointer_of_string e.e_ptr)

  let get_index e = e.e_index

  (* Read cell i. Raises Invalid_argument unless 0 <= i < size. *)
  let lookup perm i =
    if i < 0L || i >= perm.size then
      invalid_arg "Seqdb_containers.Perm.lookup";
    let sbsize = Superblock.sbsize perm.sb in
    let p = (~> sbsize) +@ (i *@ 16L) in
    perm.rw # seek p;
    let c = input_string perm.rw 8 in
    let h = read_int64 (input_string perm.rw 8) in
    { e_perm = perm; e_index = i; e_ptr = c; e_hash = h }

  let read_bigarray perm i n =
    (* Read cells i to i+n-1 into a bigarray of size 2*n. Element 2k is
       the pointer of cell i+k decoded as int64, element 2k+1 its hash.
       i is an int cell index. *)
    let ba =
      Bigarray.Array1.create Bigarray.int64 Bigarray.c_layout (2*n) in
    let fadv_idx = ref 0 in
    let sbsize = Superblock.sbsize perm.sb in
    let buffer_size = perm.rw#chunk_size in
    let start_offset = max buffer_size sbsize in
    (* Start at cell i, using the same position formula as
       write_bigarray. Seeking to the first cell regardless of i would
       read the wrong cells for any i other than 0. *)
    perm.rw # seek ((~> sbsize) +@ (~> i) *@ 16L);
    for k = 0 to n-1 do
      (* Every 32768 cells: drop the pages already consumed and announce
         the next MiB to be read. *)
      if k >= !fadv_idx then (
        Linux.fadvise
          perm.fd#file_descr
          (~> start_offset) (max 1L (perm.rw#pos -@ ~>start_offset))
          Linux.FADV_DONTNEED;
        Linux.fadvise
          perm.fd#file_descr perm.rw#pos 1048576L Linux.FADV_WILLNEED;
        fadv_idx := !fadv_idx + 32768;
      );
      let c = read_int64 (input_string perm.rw 8) in
      let h = read_int64 (input_string perm.rw 8) in
      ba.{ k lsl 1 } <- c;
      ba.{ (k lsl 1) + 1 } <- h;
    done;
    Linux.fadvise
      perm.fd#file_descr (~> start_offset) 0L Linux.FADV_DONTNEED;
    ba

  (* Raised by maxson (in sort_bigarray) when node i has no son *)
  exception Bottom of int;;

  (* Sort the cells in ba in place: ascending by hash and, for equal
     hashes, by the key of the container entry the pointer refers to
     (this needs a container lookup per comparison). *)
  let sort_bigarray
        perm
        (ba : (int64, Bigarray.int64_elt, Bigarray.c_layout)
                Bigarray.Array1.t) =
    (* The heap sort algorithm is copied from ocaml sources (Array) *)
    let cmp_keys c1 c2 =
      let ce1 =
        Container.lookup
          perm.cont (Container.pointer_of_string (write_int64 c1)) in
      let ce2 =
        Container.lookup
          perm.cont (Container.pointer_of_string (write_int64 c2)) in
      String.compare (Container.get_key ce1) (Container.get_key ce2) in
    (* Compare cell i with cell j *)
    let cmp_ii i j =
      let r =
        Int64.compare
          ba.{ (i lsl 1) + 1 }
ba.{ (j lsl 1) + 1 } in
      if r = 0 then
        cmp_keys ba.{ (i lsl 1) } ba.{ (j lsl 1) }
      else
        r in
    (* Compare cell i with the detached cell value c = (pointer, hash) *)
    let cmp_ic i c =
      let r = Int64.compare ba.{ (i lsl 1) + 1 } (snd c) in
      if r = 0 then
        cmp_keys ba.{ (i lsl 1) } (fst c)
      else
        r in
    (* Copy cell j to cell i *)
    let set_ii i j =
      ba.{ (i lsl 1) } <- ba.{ (j lsl 1) };
      ba.{ (i lsl 1)+1 } <- ba.{ (j lsl 1)+1 } in
    (* Store the detached cell value c in cell i *)
    let set_ic i c =
      ba.{ (i lsl 1) } <- fst c;
      ba.{ (i lsl 1)+1 } <- snd c in
    (* Fetch cell i as a (pointer, hash) pair *)
    let get i =
      (ba.{ (i lsl 1) }, ba.{ (i lsl 1)+1 }) in
    (* Number of cells: two bigarray elements per cell *)
    let l = (Bigarray.Array1.dim ba) lsr 1 in
    (* The heap is ternary: the sons of node i are 3i+1, 3i+2 and 3i+3.
       maxson returns the largest son of i below the bound m, or raises
       Bottom i if there is none. *)
    let maxson m i =
      let i31 = i+i+i+1 in
      let x = ref i31 in
      if i31+2 < m then begin
        if cmp_ii i31 (i31+1) < 0 then x := i31+1;
        if cmp_ii !x (i31+2) < 0 then x := i31+2;
        !x
      end else
        if i31+1 < m && cmp_ii i31 (i31+1) < 0
        then i31+1
        else if i31 < m then i31 else raise (Bottom i)
    in
    let rec trickledown i e =
      let j = maxson l i in
      if cmp_ic j e > 0 then begin
        set_ii i j;
        trickledown j e;
      end else begin
        set_ic i e;
      end;
    in
    let trickle i e = try trickledown i e with Bottom i -> set_ic i e in
    let rec bubbledown m i =
      let j = maxson m i in
      set_ii i j;
      bubbledown m j
    in
    let bubble m i = try bubbledown m i with Bottom i -> i in
    let rec trickleup i e =
      let father = (i - 1) / 3 in
      assert (i <> father);
      if cmp_ic father e < 0 then begin
        set_ii i father;
        if father > 0 then trickleup father e else set_ic 0 e;
      end else begin
        set_ic i e;
      end;
    in
    (* Build the heap ... *)
    for i = (l + 1) / 3 - 1 downto 0 do trickle i (get i); done;
    (* ... then repeatedly move the maximum to the end of the shrinking
       heap. *)
    for i = l - 1 downto 2 do
      let e = get i in
      set_ii i 0;
      trickleup (bubble i 0) e;
    done;
    if l > 1 then (let e = get 1 in set_ii 1 0; set_ic 0 e)

  let write_bigarray
        perm
        (ba : (int64, Bigarray.int64_elt, Bigarray.c_layout)
                Bigarray.Array1.t)
        i =
    (* Write ba to perm at index i *)
    (* i is an int64 cell index; ba holds (pointer, hash) pairs in the
       layout produced by read_bigarray. *)
    let sbsize = Superblock.sbsize perm.sb in
    (* NOTE: despite its name, buffer_size holds the chunk size of
       perm.rw *)
    let buffer_size = perm.rw#chunk_size in
    let start_offset = max buffer_size sbsize in
    perm.rw # seek ((~> sbsize) +@ i *@ 16L);
    let n = (Bigarray.Array1.dim ba) lsr 1 in
    for k = 0 to n-1 do
      let c = ba.{ k lsl 1 } in
      let h = ba.{ (k lsl 1) + 1 } in
      output_string perm.rw (write_int64 c);
output_string perm.rw (write_int64 h);
      (* Every 32768 cells (including the first): push the written data to
         disk and drop it from the page cache. The buffered writer is
         flushed first so that fdatasync covers the data written so far,
         as is done in the create functions of this file. *)
      if (k land 0x7fff = 0) then (
        perm.rw # flush();
        Linux.fdatasync perm.fd#file_descr;
        Linux.fadvise
          perm.fd#file_descr (~> start_offset) 0L Linux.FADV_DONTNEED;
      )
    done;
    (* Flush the tail of the data before the final sync; otherwise the
       last cells could still sit in the write buffer when this function
       returns. *)
    perm.rw # flush();
    Linux.fdatasync perm.fd#file_descr;
    Linux.fadvise
      perm.fd#file_descr (~> start_offset) 0L Linux.FADV_DONTNEED

  (* Sort the whole permutation in memory: read all cells, sort them by
     hash and key, and write them back starting at cell 0.
     Raises Failure if perm.size does not fit into an int (checked
     conversion instead of a silently truncating one). *)
  let group_inmem perm =
    let ba = read_bigarray perm 0 (~<< (perm.size)) in
    sort_bigarray perm ba;
    write_bigarray perm ba 0L

  (* The ext argument is accepted but currently ignored: grouping is
     always done in memory. *)
  let group ?ext perm =
    group_inmem perm
(*
    match ext with
      | None ->
          group_inmem perm
      | Some(fd,n) ->
          (* TODO *)
          ()
 *)

end