11use errors:: * ;
2+ use std:: sync:: Arc ;
23use std:: path:: { PathBuf , Path } ;
34use std:: process:: { Command , Stdio } ;
45
56use chrono:: prelude:: * ;
67use serde_json;
78use uuid:: Uuid ;
89
10+ use util:: data_layer:: AbstractionLayer ;
911use state:: StateHandling ;
1012use cerberus_proto:: mapreduce as pb;
11- use queued_work_store:: QueuedWork ;
1213
1314/// `JobOptions` stores arguments used to construct a `Job`.
1415#[ derive( Default ) ]
@@ -79,7 +80,7 @@ pub enum SerializableJobStatus {
7980}
8081
8182impl Job {
82- pub fn new ( options : JobOptions ) -> Result < Self > {
83+ pub fn new_no_validate ( options : JobOptions ) -> Result < Self > {
8384 let input_directory = options. input_directory ;
8485
8586 let output_directory = match options. output_directory {
@@ -95,7 +96,7 @@ impl Job {
9596 }
9697 } ;
9798
98- let job = Job {
99+ Ok ( Job {
99100 client_id : options. client_id ,
100101 id : Uuid :: new_v4 ( ) . to_string ( ) ,
101102 binary_path : options. binary_path ,
@@ -116,36 +117,70 @@ impl Job {
116117 time_completed : None ,
117118
118119 cpu_time : 0 ,
119- } ;
120- if options. validate_paths {
121- job. validate_input ( ) . chain_err ( || "Error validating input" ) ?;
120+ } )
121+ }
122+
123+ pub fn new (
124+ options : JobOptions ,
125+ data_abstraction_layer : & Arc < AbstractionLayer + Send + Sync > ,
126+ ) -> Result < Self > {
127+ let validate_paths = options. validate_paths ;
128+ let job = Job :: new_no_validate ( options) . chain_err (
129+ || "Unable to create job" ,
130+ ) ?;
131+
132+ if validate_paths {
133+ job. validate_input ( data_abstraction_layer) . chain_err (
134+ || "Error validating input" ,
135+ ) ?;
122136 }
123137 Ok ( job)
124138 }
125139
126- fn validate_input ( & self ) -> Result < ( ) > {
140+ fn validate_input (
141+ & self ,
142+ data_abstraction_layer : & Arc < AbstractionLayer + Send + Sync > ,
143+ ) -> Result < ( ) > {
127144 // Validate the existence of the input directory and the binary file.
128- let input_path = Path :: new ( self . input_directory . as_str ( ) . clone ( ) ) ;
129- if !( input_path. exists ( ) && input_path. is_dir ( ) ) {
145+ let input_path = Path :: new ( & self . input_directory ) ;
146+ let is_dir = data_abstraction_layer. is_dir ( input_path) . chain_err (
147+ || "Error checking if path is a directory" ,
148+ ) ?;
149+ if !is_dir {
130150 return Err (
131- format ! ( "Input directory does not exist: {}" , self . input_directory) . into ( ) ,
151+ format ! (
152+ "Input directory does not exist: {:?}" ,
153+ data_abstraction_layer. absolute_path( input_path)
154+ ) . into ( ) ,
132155 ) ;
133156 }
134157
135- let binary_path = Path :: new ( self . binary_path . as_str ( ) . clone ( ) ) ;
136- if !( binary_path. exists ( ) && binary_path. is_file ( ) ) {
158+ let binary_path = Path :: new ( & self . binary_path ) ;
159+ let is_file = data_abstraction_layer. is_file ( binary_path) . chain_err (
160+ || "Error checking if path is a file" ,
161+ ) ?;
162+ if !is_file {
137163 return Err (
138- format ! ( "Binary does not exist: {}" , self . binary_path) . into ( ) ,
164+ format ! (
165+ "Binary does not exist: {:?}" ,
166+ data_abstraction_layer. absolute_path( binary_path)
167+ ) . into ( ) ,
139168 ) ;
140169 }
141170
142171 // Binary exists, so run sanity-check on it to verify that it's a libcerberus binary.
143- self . run_sanity_check ( )
172+ self . run_sanity_check ( data_abstraction_layer )
144173 }
145174
146- fn run_sanity_check ( & self ) -> Result < ( ) > {
147- let binary_path = Path :: new ( self . binary_path . as_str ( ) ) ;
148- let child = Command :: new ( binary_path)
175+ fn run_sanity_check (
176+ & self ,
177+ data_abstraction_layer : & Arc < AbstractionLayer + Send + Sync > ,
178+ ) -> Result < ( ) > {
179+ let binary_path = Path :: new ( & self . binary_path ) ;
180+ let absolute_path = data_abstraction_layer
181+ . absolute_path ( binary_path)
182+ . chain_err ( || "unable to get absolute path" ) ?;
183+ let child = Command :: new ( absolute_path)
149184 . arg ( "sanity-check" )
150185 . stdin ( Stdio :: piped ( ) )
151186 . stdout ( Stdio :: piped ( ) )
@@ -198,10 +233,10 @@ impl StateHandling for Job {
198233 . chain_err ( || "Unable to convert input_directory" ) ?,
199234 output_directory : serde_json:: from_value ( data[ "output_directory" ] . clone ( ) )
200235 . chain_err ( || "Unable to convert output dir" ) ?,
201- validate_paths : true ,
236+ validate_paths : false ,
202237 } ;
203238
204- let mut job = Job :: new ( options) . chain_err (
239+ let mut job = Job :: new_no_validate ( options) . chain_err (
205240 || "Unable to create map reduce job" ,
206241 ) ?;
207242
@@ -294,18 +329,6 @@ impl StateHandling for Job {
294329 }
295330}
296331
297- impl QueuedWork for Job {
298- type Key = String ;
299-
300- fn get_work_bucket ( & self ) -> String {
301- self . client_id . clone ( )
302- }
303-
304- fn get_work_id ( & self ) -> String {
305- self . id . clone ( )
306- }
307- }
308-
309332#[ cfg( test) ]
310333mod tests {
311334 use super :: * ;
@@ -321,7 +344,7 @@ mod tests {
321344
322345 #[ test]
323346 fn test_defaults ( ) {
324- let job = Job :: new ( get_test_job_options ( ) ) . unwrap ( ) ;
347+ let job = Job :: new_no_validate ( get_test_job_options ( ) ) . unwrap ( ) ;
325348 // Assert that the default status for a map reduce job is Queued.
326349 assert_eq ! ( pb:: Status :: IN_QUEUE , job. status) ;
327350 // Assert that completed tasks starts at 0.
@@ -332,18 +355,10 @@ mod tests {
332355 assert_eq ! ( 0 , job. reduce_tasks_total) ;
333356 }
334357
335- #[ test]
336- fn test_queued_work_impl ( ) {
337- let job = Job :: new ( get_test_job_options ( ) ) . unwrap ( ) ;
338-
339- assert_eq ! ( job. get_work_bucket( ) , "client-1" ) ;
340- assert_eq ! ( job. get_work_id( ) , job. id) ;
341- }
342-
343358 #[ test]
344359 fn test_output_directory ( ) {
345- let job1 = Job :: new ( get_test_job_options ( ) ) . unwrap ( ) ;
346- let job2 = Job :: new ( JobOptions {
360+ let job1 = Job :: new_no_validate ( get_test_job_options ( ) ) . unwrap ( ) ;
361+ let job2 = Job :: new_no_validate ( JobOptions {
347362 client_id : "client-1" . to_owned ( ) ,
348363 binary_path : "/tmp/binary" . to_owned ( ) ,
349364 input_directory : "/tmp/input/" . to_owned ( ) ,
0 commit comments