(* $Id: sortbench.ml 16166 2008-01-17 22:59:55Z gerd $ *)

(* Benchmark for: Sorting and merging large files *)

open Seqdb_containers
open Printf

(**********************************************************************)

(* A [Seqdb_rdwr.file_descr] that can additionally be closed on demand. *)
class type x_file_descr =
object
  inherit Seqdb_rdwr.file_descr
  method dispose_descr : unit -> unit
end


(* File descriptor object for the file [fn]. The file is opened lazily
   (read/write) on the first call of [file_descr]; [dispose_descr]
   closes it again. After disposal, the next [file_descr] call reopens
   the file.
 *)
class kvseq_file_descr fn : x_file_descr =
object
  val mutable fd_opt = None

  method file_descr =
    match fd_opt with
      | None ->
          let modes = [Unix.O_RDWR] in
          let fd = Unix.openfile fn modes 0o666 in
          fd_opt <- Some fd;
          fd
      | Some fd ->
          fd

  method dispose_descr() =
    match fd_opt with
      | Some fd ->
          Unix.close fd;
          fd_opt <- None
      | None ->
          ()
end


(* Makes sure the file [name] exists (it is created if missing; an
   existing file is not truncated).
 *)
let create_file name =
  let fd = Unix.openfile name [Unix.O_RDWR; Unix.O_CREAT] 0o666 in
  Unix.close fd


(* Creates the file [name] and initializes it as kvseq output file with
   the given key and value representations. Returns the descriptor
   object together with the kvseq handle; the caller is responsible for
   flushing the kvseq and disposing the descriptor.
 *)
let create_kvseq keyrepr valrepr name =
  create_file name;
  let fd = new kvseq_file_descr name in
  let kvseq =
    Kvseq.create
      ~keyrepr ~valrepr ~purpose:"OUTPUT"
      (fd :> Seqdb_rdwr.file_descr) in
  Kvseq.configure ~flush_every:max_int ~auto_sync:None kvseq;
  (fd, kvseq)


(* Calls [Linux.fadvise] with [FADV_DONTNEED] for the whole file (offset
   0, length 0) - presumably a binding of posix_fadvise, so that the
   benchmark does not read the file from the page cache.

   The descriptor is closed on all paths: if [Linux.fadvise] raises, the
   descriptor is closed before the exception is passed on to the caller.
 *)
let remove_from_page_cache name =
  let fd = Unix.openfile name [Unix.O_RDWR] 0 in
  ( try
      Linux.fadvise fd 0L 0L Linux.FADV_DONTNEED
    with
      | e ->
          (* Do not let a failing close hide the original error *)
          ( try Unix.close fd with Unix.Unix_error _ -> () );
          raise e
  );
  Unix.close fd


(**********************************************************************)

module KvReader : sig
  type kvreader

  type kv = string * string   (* key, value *)

  type config =
      { kvr_chunksize : int;   (* size of a chunk in bytes *)
      }

  (* A kvreader reads a kvseq file chunk by chunk. There is a cursor
     (chunk, entry) which is a pair that points to a certain chunk and
     a certain entry within the chunk. The cursor can be moved to the
     next entry or to the beginning of the next chunk.

     You can get the chunk as a whole, or you can get the entries one
     after the other (in which case the current chunk serves as a
     buffer).

     Deleted entries are skipped while the kvseq file is read.

     Every chunk has at least one entry, even if that entry is bigger
     than [kvr_chunksize].
   *)

  val read_kvseq : Kvseq.t -> config -> kvreader
    (* Starts reading a kvseq file. Raises [End_of_file] if the file is
       empty (or contains only deleted entries)
     *)

  val chunk_position : kvreader -> int
    (* Returns the chunk of the cursor. The chunks are numbered from 0 to
       [n_chunks - 1].
     *)

  val entry_position : kvreader -> int
    (* Returns the entry position of the cursor. This position is per
       chunk, and for every chunk you can get a number from 0 to
       [chunk_length - 1]
     *)

  val chunk_length : kvreader -> int
    (* Returns the number of entries in the current chunk *)

  val next_chunk : kvreader -> unit
    (* Moves the cursor to the beginning of the next chunk, or raises
       [End_of_file] if there is no next chunk. In the latter case the
       cursor is not moved.
     *)

  val next_entry : kvreader -> unit
    (* Moves the cursor to the next entry (and possibly to the next
       chunk if the current chunk is fully read), or raises
       [End_of_file] if there is no next entry.
     *)

  val get_chunk : kvreader -> kv array
    (* Returns the array with the current chunk *)

  val get_entry : kvreader -> kv
    (* Returns the current entry *)

end = struct
  type kv = string * string

  type config =
      { kvr_chunksize : int }

  type kvreader =
      { kvseq : Kvseq.t;
        mutable chunk : kv array;
        mutable entry : Kvseq.entry option;
          (* The first entry that is not yet part of [chunk], or [None]
             if the end of the file has been reached
           *)
        mutable chunk_pos : int;
        mutable entry_pos : int;
        cfg : config
      }

  (* Collects entries, starting at [entry], until the accumulated total
     length would reach [cfg.kvr_chunksize]. The first non-deleted entry
     is always taken, whatever its size. Deleted entries are skipped.

     Returns [(next, chunk)] where [next] is the first entry that did not
     fit into the chunk, or [None] if the end of the file was hit.
   *)
  let load_chunk entry cfg =
    let c = ref [] in          (* collected entries, in reverse order *)
    let s = ref 0L in          (* accumulated size of [c] *)
    let max_s = Int64.of_int cfg.kvr_chunksize in
    let cont = ref true in
    let e = ref entry in
    try
      while !cont do
        if not (Kvseq.get_delflag !e) then (
          let s' = Int64.add !s (Kvseq.get_total_length !e) in
          if !c = [] || s' < max_s then (
            c := (Kvseq.get_key !e, Kvseq.get_value !e) :: !c;
            s := s'
          )
          else
            cont := false
        );
        if !cont then
          e := Kvseq.next_entry !e      (* or End_of_file *)
      done;
      (Some !e, Array.of_list (List.rev !c))
    with
      | End_of_file ->
          (None, Array.of_list (List.rev !c))


  let read_kvseq kvseq cfg =
    let first_entry = Kvseq.first_entry kvseq in  (* or End_of_file *)
    let e_opt, chunk = load_chunk first_entry cfg in
    if chunk = [| |] then raise End_of_file;
    { kvseq = kvseq;
      chunk = chunk;
      entry = e_opt;
      chunk_pos = 0;
      entry_pos = 0;
      cfg = cfg
    }


  let next_chunk kvr =
    match kvr.entry with
      | None ->
          raise End_of_file
      | Some e ->
          let e_opt, chunk = load_chunk e kvr.cfg in
          (* An empty chunk means that only deleted entries were left *)
          if chunk = [| |] then raise End_of_file;
          kvr.chunk <- chunk;
          kvr.entry <- e_opt;
          kvr.chunk_pos <- kvr.chunk_pos + 1;
          kvr.entry_pos <- 0


  let next_entry kvr =
    let p = kvr.entry_pos in
    if p+1 < Array.length kvr.chunk then
      kvr.entry_pos <- p+1
    else
      next_chunk kvr

  let chunk_position kvr = kvr.chunk_pos

  let entry_position kvr = kvr.entry_pos

  let chunk_length kvr = Array.length kvr.chunk

  let get_chunk kvr = kvr.chunk

  let get_entry kvr = kvr.chunk.( kvr.entry_pos )
end

(**********************************************************************)

module KvWriter : sig
  val append_chunk : Kvseq.t -> KvReader.kv array -> unit
    (* Appends a chunk to the kvseq file *)

end = struct
  let append_chunk kvseq chunk =
    Array.iter
      (fun (k,v) ->
         (* The result of [Kvseq.add] is not needed here *)
         ignore
           (Kvseq.add
              kvseq
              { Kvseq.delflag = false; key = k; value = v })
      )
      chunk
end

(**********************************************************************)

module Tempfiles : sig
  type tmp_config =
      { tmp_prefix : string }

  type tmp_set

  val create_tmp_set : tmp_config -> tmp_set

  val create_tmp_file : tmp_set -> string
    (* Create a new temporary file and return its name *)

  val delete_tmp_set : tmp_set -> unit
    (* Delete all temporary files of that set *)

  val delete_tmp_file : string -> tmp_set -> unit
    (* Delete this file if it is a member of the set, and remove it from
       the set. Files that are not members are left alone.
     *)

end = struct
  type tmp_config =
      { tmp_prefix : string }

  type tmp_set =
      { prefix : string;
        rnd : Random.State.t;
        mutable members : string list;
      }

  let create_tmp_set cfg =
    { prefix = cfg.tmp_prefix;
      rnd = Random.State.make_self_init();
      members = []
    }

  let create_tmp_file set =
    (* O_EXCL makes the creation fail if the name is already in use; in
       this case another random name is tried.
     *)
    let rec attempt n =
      let r = Random.State.int set.rnd (256 * 256 * 256) in
      let name = Printf.sprintf "%s-%06x-%d.kvseq" set.prefix r n in
      try
        let fd =
          Unix.openfile
            name [Unix.O_RDWR; Unix.O_CREAT; Unix.O_EXCL] 0o666 in
        Unix.close fd;
        set.members <- name :: set.members;
        name
      with
        | Unix.Unix_error(Unix.EEXIST,_,_) ->
            attempt (n+1)
    in
    attempt 0

  (* Deletion is best effort: system errors of unlink (e.g. the file is
     already gone) are ignored on purpose. Only [Unix.Unix_error] is
     suppressed, so that unrelated exceptions are not hidden.
   *)
  let unlink_quietly name =
    try Unix.unlink name with Unix.Unix_error _ -> ()

  let delete_tmp_file file set =
    if List.mem file set.members then (
      unlink_quietly file;
      set.members <- List.filter (fun n -> n <> file) set.members
    )

  let delete_tmp_set set =
    List.iter unlink_quietly set.members;
    set.members <- []

end

(**********************************************************************)

module Transform : sig
  val copy : KvReader.kvreader -> Kvseq.t -> unit
    (* Copies the entries of the kvreader to the kvseq output file,
       starting at the current cursor position
     *)

  val merge : KvReader.kvreader -> KvReader.kvreader -> Kvseq.t -> unit
    (* Merges the entries of the two kvreaders and writes the output to
       the kvseq. If the keys are equal, the entry of the second reader
       is written first.
     *)

  type sortconfig =
      { tmpset : Tempfiles.tmp_set;
        memsize : int;
        nodelete : bool
      }

  val sort : sortconfig -> Kvseq.t -> Kvseq.t -> unit
    (* Sorts the contents of the first file, and appends it to the second
       file. Unless [nodelete] is set, the temporary files are deleted
       as soon as they are no longer needed.
     *)

end = struct
  let copy kvrd kvout =
    try
      while true do
        let (k,v) = KvReader.get_entry kvrd in
        ignore
          (Kvseq.add kvout { Kvseq.delflag = false; key = k; value = v });
        KvReader.next_entry kvrd   (* or End_of_file *)
      done;
      assert false
    with
      | End_of_file -> ()


  let merge kvrd1 kvrd2 kvout =
    (* Moves the cursor of [kvrd] and returns the new current entry, or
       [None] at the end of the input
     *)
    let advance kvrd =
      try
        KvReader.next_entry kvrd;
        Some (KvReader.get_entry kvrd)
      with
        | End_of_file -> None in

    let kv1_opt = ref (Some(KvReader.get_entry kvrd1)) in
    let kv2_opt = ref (Some(KvReader.get_entry kvrd2)) in
    while !kv1_opt <> None && !kv2_opt <> None do
      match !kv1_opt, !kv2_opt with
        | Some (k1,v1), Some (k2,v2) ->
            (* Same ordering as used for sorting the chunks. For equal
               keys the second reader wins; [sort] relies on this (it
               passes the older data as second reader).
             *)
            let take_first = String.compare k1 k2 < 0 in
            let (k_out, v_out) =
              if take_first then (k1,v1) else (k2,v2) in
            ignore
              (Kvseq.add
                 kvout
                 { Kvseq.delflag = false; key = k_out; value = v_out });
            if take_first then
              kv1_opt := advance kvrd1
            else
              kv2_opt := advance kvrd2
        | _ ->
            assert false
    done;
    (* One input is exhausted. The current entry of the other one is not
       yet written, so [copy] starts at the right position.
     *)
    match !kv1_opt, !kv2_opt with
      | Some _, Some _ -> assert false
      | Some _, None -> copy kvrd1 kvout
      | None, Some _ -> copy kvrd2 kvout
      | None, None -> assert false   (* only one cursor moves per round *)


  type sortconfig =
      { tmpset : Tempfiles.tmp_set;
        memsize : int;
        nodelete : bool
      }

  let sort cfg kvin kvout =
    let keyrepr = Kvseq.keyrepr kvin in
    let valrepr = Kvseq.valrepr kvin in
    (* While sorting, one chunk is in memory. While merging, there are
       two readers, so every reader gets half of the memory.
     *)
    let sortcfg = {KvReader.kvr_chunksize = cfg.memsize} in
    let mergecfg = {KvReader.kvr_chunksize = cfg.memsize/2 } in

    (* [None] as soon as the input is exhausted *)
    let kvrd_opt =
      ref(try Some(KvReader.read_kvseq kvin sortcfg)
          with End_of_file -> None) in

    let next_chunk() =
      (* Returns the next chunk of the input in sorted order, or [None] *)
      match !kvrd_opt with
        | None -> None
        | Some kvrd ->
            let chunk = KvReader.get_chunk kvrd in
            Array.stable_sort
              (fun (k1,_) (k2,_) -> String.compare k1 k2)
              chunk;
            ( try KvReader.next_chunk kvrd
              with End_of_file -> kvrd_opt := None
            );
            Some chunk in

    let read_kvseq file =
      (* Opens an existing (temporary) kvseq file for merging *)
      let kvfd = new kvseq_file_descr file in
      let kvseq = Kvseq.access (kvfd :> Seqdb_rdwr.file_descr) in
      (kvfd, kvseq, KvReader.read_kvseq kvseq mergecfg) in

    let temp_kvseq() =
      (* Creates a new temporary kvseq file *)
      let tmpfile = Tempfiles.create_tmp_file cfg.tmpset in
      let kvfd = new kvseq_file_descr tmpfile in
      let kv =
        Kvseq.create
          ~keyrepr ~valrepr ~supports_deletions:false
          ~have_statistics:false
          ~purpose:"TEMPFILE" (kvfd :> Seqdb_rdwr.file_descr) in
      Kvseq.configure ~flush_every:max_int ~auto_sync:None kv;
      (tmpfile, kvfd, kv) in

    let stack = Stack.create() in
    (* Elements are (file,height) *)

    let rec rollup finish_flag =
      (* Merge the elements on the stack from top to bottom. If
         finish_flag is set, all elements are merged in sequence, and
         the final merge is appended to kvout.

         If the flag is not set, elements are only merged if they have
         the same height. In this case, the result is again put onto the
         stack.

         Precondition: >= 2 elements on the stack
       *)
      let (top1_file, top1_height) =
        try Stack.pop stack with Stack.Empty -> assert false in
      let (top2_file, top2_height) =
        try Stack.pop stack with Stack.Empty -> assert false in
      if finish_flag || top1_height = top2_height then (
        (* Do the merge. If the stack is now empty and finish_flag the
           merge is appended to kvout, else to a new temp file.
           top2 is the older element, so it is passed as second reader
           (see [merge] for equal keys).
         *)
        let top1_fd, top1_kvseq, top1_rd = read_kvseq top1_file in
        let top2_fd, top2_kvseq, top2_rd = read_kvseq top2_file in
        let is_empty = Stack.is_empty stack in
        if is_empty && finish_flag then (
          eprintf "Merging OUT = %s + %s\n%!" top1_file top2_file;
          merge top1_rd top2_rd kvout;
          Kvseq.flush top1_kvseq;
          top1_fd # dispose_descr();
          Kvseq.flush top2_kvseq;
          top2_fd # dispose_descr();
          if not cfg.nodelete then (
            Tempfiles.delete_tmp_file top1_file cfg.tmpset;
            Tempfiles.delete_tmp_file top2_file cfg.tmpset
          )
        )
        else (
          let merged_file, merged_fd, merged_kvseq = temp_kvseq() in
          eprintf "Merging %s = %s + %s\n%!"
            merged_file top1_file top2_file;
          merge top1_rd top2_rd merged_kvseq;
          Kvseq.flush merged_kvseq;
          merged_fd # dispose_descr();
          Kvseq.flush top1_kvseq;
          top1_fd # dispose_descr();
          Kvseq.flush top2_kvseq;
          top2_fd # dispose_descr();
          if not cfg.nodelete then (
            Tempfiles.delete_tmp_file top1_file cfg.tmpset;
            Tempfiles.delete_tmp_file top2_file cfg.tmpset
          );
          Stack.push (merged_file, (top2_height+1)) stack;
          if not is_empty then
            rollup finish_flag     (* recurse in this case *)
        )
      )
      else (
        (* Do nothing, and put the elements back: *)
        Stack.push (top2_file, top2_height) stack;
        Stack.push (top1_file, top1_height) stack
      ) in

    let extend() =
      (* Fetch a new chunk from input, and add it to the stack. Returns
         true if a new chunk is available, otherwise false.
       *)
      match next_chunk() with
        | None ->
            false
        | Some chunk ->
            let tmp_file, tmp_fd, tmp_kvseq = temp_kvseq() in
            KvWriter.append_chunk tmp_kvseq chunk;
            Kvseq.flush tmp_kvseq;
            tmp_fd # dispose_descr();
            Stack.push (tmp_file, 0) stack;
            true in

    let rec sort_loop() =
      (* Precondition: >= 2 elements on the stack *)
      let success = extend() in
      if success then (
        rollup false;
        sort_loop()
      )
      else
        rollup true in

    let success1 = extend() in
    if success1 then (
      let success2 = extend() in
      if success2 then
        (* At least two chunks: The sort'n'merge game can begin *)
        sort_loop()
      else (
        (* input has only one chunk at all *)
        let (top_file, _) =
          try Stack.pop stack with Stack.Empty -> assert false in
        eprintf "Copying OUT = %s\n%!" top_file;
        let top_fd, top_kvseq, top_rd = read_kvseq top_file in
        copy top_rd kvout;
        Kvseq.flush top_kvseq;
        top_fd # dispose_descr();
        (* Clean up like the merge branches do *)
        if not cfg.nodelete then
          Tempfiles.delete_tmp_file top_file cfg.tmpset
      )
    )
    (* else: input is empty, so don't write anything *)

end

(**********************************************************************)

(* Read: read kvseq files chunk by chunk. Do nothing with the chunks
   except counting them and their entries.
 *)

let cmd_read() =
  let files = ref [] in
  let chunksize = ref (1024 * 1024) in
  Arg.parse
    [ "-chunksize", Arg.Set_int chunksize,
      "<n>  Set chunksize to <n> bytes" ]
    (fun s -> files := !files @ [s])
    "usage: sortbench read <options> <file> ...";

  let cfg = { KvReader.kvr_chunksize = !chunksize } in

  List.iter
    (fun file ->
       remove_from_page_cache file;
       let kvfd = new kvseq_file_descr file in
       let kvseq = Kvseq.access (kvfd :> Seqdb_rdwr.file_descr) in
       let n_chunks = ref 0 in
       let n_entries = ref 0 in
       ( try
           (* The loop is left by End_of_file, also for empty files *)
           let kvrd = KvReader.read_kvseq kvseq cfg in
           incr n_chunks;
           n_entries := !n_entries + KvReader.chunk_length kvrd;
           while true do
             KvReader.next_chunk kvrd;
             incr n_chunks;
             n_entries := !n_entries + KvReader.chunk_length kvrd
           done;
           assert false
         with
           | End_of_file ->
               printf "File %s: %d chunks, %d entries\n%!"
                 file !n_chunks !n_entries
       );
       kvfd # dispose_descr()
    )
    !files
;;

(* Split: read kvseq files chunk by chunk, and write every chunk into a temp file.
Optionally, the chunks are sorted by keys. *)

let cmd_split() =
  let files = ref [] in
  let chunksize = ref (1024 * 1024) in
  let tmp_prefix = ref "/tmp/sortbench" in
  let sort = ref false in
  let nodelete = ref false in
  Arg.parse
    [ "-chunksize", Arg.Set_int chunksize,
      "<n>  Set chunksize to <n> bytes";

      "-tmp-prefix", Arg.Set_string tmp_prefix,
      "<p>  Set prefix for creating tempfiles";

      "-sort", Arg.Set sort,
      "  Sort the chunks by keys";

      "-nodelete", Arg.Set nodelete,
      "  Do not remove output files";
    ]
    (fun s -> files := !files @ [s])
    "usage: sortbench split <options> <file> ...";

  let cfg = { KvReader.kvr_chunksize = !chunksize } in

  (* Writes the current chunk of [kvrd] into a new temp file of
     [tmpset]. With -sort the chunk is sorted before (in place).
   *)
  let process_chunk keyrepr valrepr kvrd tmpset =
    let chunk = KvReader.get_chunk kvrd in
    if !sort then
      Array.stable_sort
        (fun (k1,_) (k2,_) -> String.compare k1 k2)
        chunk;
    let tmpfile = Tempfiles.create_tmp_file tmpset in
    let kvfd = new kvseq_file_descr tmpfile in
    let kv =
      Kvseq.create
        ~keyrepr ~valrepr ~supports_deletions:false
        ~have_statistics:false
        ~purpose:"TEMPFILE" (kvfd :> Seqdb_rdwr.file_descr) in
    Kvseq.configure ~flush_every:max_int ~auto_sync:None kv;
    KvWriter.append_chunk kv chunk;
    Kvseq.flush kv;
    kvfd # dispose_descr() in

  List.iter
    (fun file ->
       remove_from_page_cache file;
       let tmpset =
         Tempfiles.create_tmp_set
           {Tempfiles.tmp_prefix = !tmp_prefix} in
       let kvfd = new kvseq_file_descr file in
       let kvseq = Kvseq.access (kvfd :> Seqdb_rdwr.file_descr) in
       let keyrepr = Kvseq.keyrepr kvseq in
       let valrepr = Kvseq.valrepr kvseq in
       let n_chunks = ref 0 in
       let n_entries = ref 0 in
       ( try
           (* The loop is left by End_of_file, also for empty files *)
           let kvrd = KvReader.read_kvseq kvseq cfg in
           incr n_chunks;
           n_entries := !n_entries + KvReader.chunk_length kvrd;
           process_chunk keyrepr valrepr kvrd tmpset;
           while true do
             KvReader.next_chunk kvrd;
             incr n_chunks;
             n_entries := !n_entries + KvReader.chunk_length kvrd;
             process_chunk keyrepr valrepr kvrd tmpset
           done;
           assert false
         with
           | End_of_file ->
               printf "File %s: %d chunks, %d entries\n%!"
                 file !n_chunks !n_entries
       );
       kvfd # dispose_descr();
       if not !nodelete then
         Tempfiles.delete_tmp_set tmpset
    )
    !files
;;


(* Merge: Merge two kvseq files and write output to a third file. The
   third file is a temporary file; it is removed at the end unless
   -nodelete is given.

   An empty input file is not an error: the other file is just copied
   then (and nothing is written if both files are empty).
 *)

let cmd_merge() =
  let files = ref [] in
  let chunksize = ref (1024 * 1024) in
  let tmp_prefix = ref "/tmp/sortbench" in
  let nodelete = ref false in
  Arg.parse
    [ "-chunksize", Arg.Set_int chunksize,
      "<n>  Set chunksize to <n> bytes";

      "-tmp-prefix", Arg.Set_string tmp_prefix,
      "<p>  Set prefix for creating tempfiles";

      "-nodelete", Arg.Set nodelete,
      "  Do not remove output files";
    ]
    (fun s -> files := !files @ [s])
    "usage: sortbench merge <options> file1.kvseq file2.kvseq";

  let (file1, file2) =
    match !files with
      | [f1; f2] -> (f1,f2)
      | _ -> failwith "Need exact two files as arguments" in

  let cfg = { KvReader.kvr_chunksize = !chunksize } in
  let tmpset =
    Tempfiles.create_tmp_set {Tempfiles.tmp_prefix = !tmp_prefix} in

  (* Opens an input file. The reader is [None] if the file has no
     entries ([KvReader.read_kvseq] raises End_of_file in this case).
   *)
  let open_input file =
    remove_from_page_cache file;
    let kvfd = new kvseq_file_descr file in
    let kvseq = Kvseq.access (kvfd :> Seqdb_rdwr.file_descr) in
    let kvrd_opt =
      try Some (KvReader.read_kvseq kvseq cfg)
      with End_of_file -> None in
    (kvfd, Kvseq.keyrepr kvseq, Kvseq.valrepr kvseq, kvrd_opt) in

  let kvfd1, keyrepr1, valrepr1, kvrd1_opt = open_input file1 in
  let kvfd2, keyrepr2, valrepr2, kvrd2_opt = open_input file2 in

  if keyrepr1 <> keyrepr2 then
    failwith "Key representation differs";
  if valrepr1 <> valrepr2 then
    failwith "Value representation differs";

  let outfile = Tempfiles.create_tmp_file tmpset in
  let kvfdout = new kvseq_file_descr outfile in
  let kvout =
    Kvseq.create
      ~keyrepr:keyrepr1 ~valrepr:valrepr1 ~supports_deletions:false
      ~have_statistics:false
      ~purpose:"TEMPFILE" (kvfdout :> Seqdb_rdwr.file_descr) in
  Kvseq.configure ~flush_every:max_int ~auto_sync:None kvout;

  (* [Transform.merge] needs a current entry in both readers *)
  ( match kvrd1_opt, kvrd2_opt with
      | Some kvrd1, Some kvrd2 ->
          Transform.merge kvrd1 kvrd2 kvout
      | Some kvrd, None
      | None, Some kvrd ->
          Transform.copy kvrd kvout
      | None, None ->
          ()
  );

  Kvseq.flush kvout;
  kvfdout # dispose_descr();
  kvfd1 # dispose_descr();
  kvfd2 # dispose_descr();

  printf "Merged.\n%!";

  if not !nodelete then
    Tempfiles.delete_tmp_set tmpset
;;


(* Sort: Sort the first kvseq file, and write the result into the second
   file (which is created if needed).
 *)

let cmd_sort() =
  let files = ref [] in
  let chunksize = ref (1024 * 1024) in
  let tmp_prefix = ref "/tmp/sortbench" in
  let nodelete = ref false in
  Arg.parse
    [ "-chunksize", Arg.Set_int chunksize,
      "<n>  Set chunksize to <n> bytes";

      "-tmp-prefix", Arg.Set_string tmp_prefix,
      "<p>  Set prefix for creating tempfiles";

      "-nodelete", Arg.Set nodelete,
      "  Do not remove output files";
    ]
    (fun s -> files := !files @ [s])
    "usage: sortbench sort <options> infile.kvseq outfile.kvseq";

  let (file1, file2) =
    match !files with
      | [f1; f2] -> (f1,f2)
      | _ -> failwith "Need exact two files as arguments" in

  let tmpset =
    Tempfiles.create_tmp_set {Tempfiles.tmp_prefix = !tmp_prefix} in

  (* The chunk size is the memory budget of the sort *)
  let cfg =
    { Transform.tmpset = tmpset;
      memsize = !chunksize;
      nodelete = !nodelete
    } in

  remove_from_page_cache file1;
  let file1_fd = new kvseq_file_descr file1 in
  let file1_kvseq = Kvseq.access (file1_fd :> Seqdb_rdwr.file_descr) in
  let keyrepr = Kvseq.keyrepr file1_kvseq in
  let valrepr = Kvseq.valrepr file1_kvseq in

  let file2_fd, file2_kvseq = create_kvseq keyrepr valrepr file2 in

  Transform.sort cfg file1_kvseq file2_kvseq;

  Kvseq.flush file1_kvseq;
  file1_fd # dispose_descr();
  Kvseq.flush file2_kvseq;
  file2_fd # dispose_descr();

  if not !nodelete then
    Tempfiles.delete_tmp_set tmpset;

  printf "Done\n%!"
;;

(**********************************************************************)

(* The command table: (name, implementation, short description). Every
   implementation parses its own options with [Arg.parse].
 *)
let commands =
  [ "read", cmd_read,
    "Read kvseq files chunk by chunk";

    "split", cmd_split,
    "Read kvseq files, and split into chunks";

    "merge", cmd_merge,
    "Merge two sorted kvseq files, write result to 3rd file";

    "sort", cmd_sort,
    "Sort a kvseq file";
  ]


(* Prints the general usage message to stderr, and exits with code 2.
   This function never returns.
 *)
let usage pgm_name =
  eprintf "usage: %s <command> <options> <args>\n" pgm_name;
  eprintf "  <command> is one of the following:\n";
  List.iter
    (fun (name, _, text) ->
       eprintf "    %s: %s\n" name text
    )
    commands;
  eprintf "  <options> and <args> depend on the command you are issuing.\n";
  (* -help is only understood after a command, because [main] takes the
     first argument as command name
   *)
  eprintf "use '%s <command> -help' to get command-specific help.\n"
    pgm_name;
  flush stderr;
  exit 2
;;


(* Entry point: selects the command by the first program argument, and
   runs it. Failures, bad arguments and Unix errors are reported on
   stderr, and the exit code is 2 in these cases.
 *)
let main() =
  if Array.length Sys.argv <= 1 then usage "sortbench";
  let cmd_name = Sys.argv.(1) in
  let _, cmd, _ =
    try
      List.find
        (fun (name, _, _) -> name = cmd_name)
        commands
    with
      | Not_found -> usage "sortbench" in
  (* Let Arg.parse skip the command name, so that the command only sees
     its own options and arguments
   *)
  Arg.current := 1;
  try
    cmd ()
  with
    | Arg.Bad msg
    | Failure msg ->
        prerr_endline ("sortbench: " ^ msg);
        exit 2
    | Unix.Unix_error(err, _, param) ->
        let prefix =
          if param = "" then
            "sortbench: "
          else
            "sortbench: " ^ param ^ ": " in
        prerr_endline (prefix ^ Unix.error_message err);
        exit 2
;;


main()