Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
test_reader.py
Go to the documentation of this file, or view the newest version of test_reader.py in the sPHENIX GitHub repository.
1 import pytest
2 import os
3 from pathlib import Path
4 import multiprocessing
5 
6 from helpers import (
7  geant4Enabled,
8  edm4hepEnabled,
9  AssertCollectionExistsAlg,
10 )
11 from common import getOpenDataDetectorDirectory
12 
13 from acts.examples.odd import getOpenDataDetector
14 
15 import acts
16 from acts import PlanarModuleStepper, UnitConstants as u
17 from acts.examples import (
18  RootParticleWriter,
19  RootParticleReader,
20  RootMaterialTrackReader,
21  RootTrajectorySummaryReader,
22  CsvParticleWriter,
23  CsvParticleReader,
24  CsvMeasurementWriter,
25  CsvMeasurementReader,
26  CsvSimHitWriter,
27  CsvSimHitReader,
28  CsvPlanarClusterWriter,
29  CsvPlanarClusterReader,
30  PlanarSteppingAlgorithm,
31  Sequencer,
32 )
33 
34 
@pytest.mark.root
def test_root_particle_reader(tmp_path, conf_const, ptcl_gun):
    """Round-trip particles through the ROOT writer/reader pair."""
    # Step 1: generate particles and write them to a ROOT file.
    write_seq = Sequencer(numThreads=1, events=10, logLevel=acts.logging.WARNING)
    gen = ptcl_gun(write_seq)

    particles_file = tmp_path / "particles.root"
    writer = conf_const(
        RootParticleWriter,
        acts.logging.WARNING,
        inputParticles=gen.config.outputParticles,
        filePath=str(particles_file),
    )
    write_seq.addWriter(writer)
    write_seq.run()

    # Drop the sequencer so the ROOT file is flushed and properly closed.
    del write_seq

    # Step 2: a fresh sequencer reads the file back in.
    read_seq = Sequencer(numThreads=1, logLevel=acts.logging.WARNING)
    read_seq.addReader(
        conf_const(
            RootParticleReader,
            acts.logging.WARNING,
            particleCollection="input_particles",
            filePath=str(particles_file),
        )
    )

    checker = AssertCollectionExistsAlg(
        "input_particles", "check_alg", acts.logging.WARNING
    )
    read_seq.addAlgorithm(checker)
    read_seq.run()

    # Every one of the 10 written events must come back.
    assert checker.events_seen == 10
76 
77 
@pytest.mark.csv
def test_csv_particle_reader(tmp_path, conf_const, ptcl_gun):
    """Round-trip particles through the CSV writer/reader pair."""
    write_seq = Sequencer(numThreads=1, events=10, logLevel=acts.logging.WARNING)
    gen = ptcl_gun(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        conf_const(
            CsvParticleWriter,
            acts.logging.WARNING,
            inputParticles=gen.config.outputParticles,
            outputStem="particle",
            outputDir=str(csv_dir),
        )
    )
    write_seq.run()

    # Fresh sequencer for the read-back pass.
    read_seq = Sequencer(numThreads=1, logLevel=acts.logging.WARNING)
    read_seq.addReader(
        conf_const(
            CsvParticleReader,
            acts.logging.WARNING,
            inputDir=str(csv_dir),
            inputStem="particle",
            outputParticles="input_particles",
        )
    )

    checker = AssertCollectionExistsAlg(
        "input_particles", "check_alg", acts.logging.WARNING
    )
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == 10
121 
122 
@pytest.mark.parametrize(
    "reader",
    [RootParticleReader, RootTrajectorySummaryReader],
)
@pytest.mark.root
def test_root_reader_interface(reader, conf_const, tmp_path):
    """Every ROOT reader must expose a Config carrying a filePath option."""
    assert hasattr(reader, "Config")
    assert hasattr(reader.Config, "filePath")

    # Construction with just a level and a file path must succeed.
    kwargs = {"level": acts.logging.INFO, "filePath": str(tmp_path / "file.root")}
    assert conf_const(reader, **kwargs)
138 
139 
@pytest.mark.slow
@pytest.mark.root
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
def test_root_material_track_reader(material_recording):
    """Read back the material tracks produced by the Geant4 recording fixture."""
    input_tracks = material_recording / "geant4_material_tracks.root"
    assert input_tracks.exists()

    seq = Sequencer(numThreads=1)
    seq.addReader(
        RootMaterialTrackReader(
            level=acts.logging.INFO,
            fileList=[str(input_tracks)],
        )
    )

    checker = AssertCollectionExistsAlg(
        "material-tracks", "check_alg", acts.logging.WARNING
    )
    seq.addAlgorithm(checker)
    seq.run()

    # The recording fixture contains exactly two events.
    assert checker.events_seen == 2
166 
167 
@pytest.mark.csv
def test_csv_meas_reader(tmp_path, fatras, trk_geo, conf_const):
    """Round-trip measurements (plus sim hits) through the CSV writers/readers."""
    write_seq = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        CsvMeasurementWriter(
            level=acts.logging.INFO,
            inputMeasurements=digiAlg.config.outputMeasurements,
            inputClusters=digiAlg.config.outputClusters,
            inputMeasurementSimHitsMap=digiAlg.config.outputMeasurementSimHitsMap,
            outputDir=str(csv_dir),
        )
    )

    # Write hits, so we can later construct the measurement-particles-map
    write_seq.addWriter(
        CsvSimHitWriter(
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            outputDir=str(csv_dir),
            outputStem="hits",
        )
    )
    write_seq.run()

    # Read everything back with a fresh sequencer.
    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        CsvSimHitReader(
            level=acts.logging.INFO,
            outputSimHits=simAlg.config.outputSimHits,
            inputDir=str(csv_dir),
            inputStem="hits",
        )
    )
    read_seq.addReader(
        conf_const(
            CsvMeasurementReader,
            level=acts.logging.WARNING,
            outputMeasurements="measurements",
            outputMeasurementSimHitsMap="simhitsmap",
            outputSourceLinks="sourcelinks",
            outputMeasurementParticlesMap="meas_ptcl_map",
            inputSimHits=simAlg.config.outputSimHits,
            inputDir=str(csv_dir),
        )
    )

    # One existence-check algorithm per restored collection.
    checkers = [
        AssertCollectionExistsAlg(name, f"check_alg_{name}", acts.logging.WARNING)
        for name in ("measurements", "simhitsmap", "sourcelinks", "meas_ptcl_map")
    ]
    for checker in checkers:
        read_seq.addAlgorithm(checker)

    read_seq.run()

    for checker in checkers:
        assert checker.events_seen == 10
234 
235 
@pytest.mark.csv
def test_csv_simhits_reader(tmp_path, fatras, conf_const):
    """Round-trip fatras sim hits through the CSV sim-hit writer/reader."""
    write_seq = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        CsvSimHitWriter(
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            outputDir=str(csv_dir),
            outputStem="hits",
        )
    )
    write_seq.run()

    # Fresh sequencer for the read-back pass.
    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        conf_const(
            CsvSimHitReader,
            level=acts.logging.INFO,
            inputDir=str(csv_dir),
            inputStem="hits",
            outputSimHits="simhits",
        )
    )

    checker = AssertCollectionExistsAlg("simhits", "check_alg", acts.logging.WARNING)
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == 10
273 
274 
@pytest.mark.csv
def test_csv_clusters_reader(tmp_path, fatras, conf_const, trk_geo, rng):
    """Round-trip planar clusters through the CSV cluster writer/reader."""
    # The first sequencer only serves to configure the fatras chain; its
    # digitization algorithm is replaced by a PlanarSteppingAlgorithm below.
    scratch = Sequencer(numThreads=1, events=10)  # we're not going to use this one
    evGen, simAlg, _ = fatras(scratch)

    write_seq = Sequencer(numThreads=1, events=10)
    write_seq.addReader(evGen)
    write_seq.addAlgorithm(simAlg)

    digiAlg = PlanarSteppingAlgorithm(
        level=acts.logging.WARNING,
        inputSimHits=simAlg.config.outputSimHits,
        outputClusters="clusters",
        outputSourceLinks="sourcelinks",
        outputDigiSourceLinks="digiSourceLink",
        outputMeasurements="measurements",
        outputMeasurementParticlesMap="meas_ptcl_map",
        outputMeasurementSimHitsMap="meas_sh_map",
        trackingGeometry=trk_geo,
        randomNumbers=rng,
        planarModuleStepper=PlanarModuleStepper(),
    )
    write_seq.addAlgorithm(digiAlg)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        CsvPlanarClusterWriter(
            level=acts.logging.WARNING,
            outputDir=str(csv_dir),
            inputSimHits=simAlg.config.outputSimHits,
            inputClusters=digiAlg.config.outputClusters,
            trackingGeometry=trk_geo,
        )
    )
    write_seq.run()

    # Fresh sequencer for the read-back pass.
    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        conf_const(
            CsvPlanarClusterReader,
            level=acts.logging.WARNING,
            outputClusters="clusters",
            inputDir=str(csv_dir),
            outputHitIds="hits",
            outputMeasurementParticlesMap="meas_ptcl_map",
            outputSimHits="simhits",
            trackingGeometry=trk_geo,
        )
    )

    checkers = [
        AssertCollectionExistsAlg(name, f"check_alg_{name}", acts.logging.WARNING)
        for name in ("clusters", "simhits", "meas_ptcl_map")
    ]
    for checker in checkers:
        read_seq.addAlgorithm(checker)

    read_seq.run()

    for checker in checkers:
        assert checker.events_seen == 10
338 
339 
341  from DDSim.DD4hepSimulation import DD4hepSimulation
342 
343  ddsim = DD4hepSimulation()
344  if isinstance(ddsim.compactFile, list):
345  ddsim.compactFile = [input]
346  else:
347  ddsim.compactFile = input
348  ddsim.enableGun = True
349  ddsim.gun.direction = (1, 0, 0)
350  ddsim.gun.distribution = "eta"
351  ddsim.numberOfEvents = 10
352  ddsim.outputFile = output
353  ddsim.run()
354 
355 
356 @pytest.mark.slow
357 @pytest.mark.edm4hep
358 @pytest.mark.skipif(not edm4hepEnabled, reason="EDM4hep is not set up")
360  from acts.examples.edm4hep import EDM4hepSimHitReader
361 
362  tmp_file = str(tmp_path / "output_edm4hep.root")
363  odd_xml_file = str(getOpenDataDetectorDirectory() / "xml" / "OpenDataDetector.xml")
364 
365  with multiprocessing.get_context("spawn").Pool() as pool:
366  pool.apply(generate_input_test_edm4hep_simhit_reader, (odd_xml_file, tmp_file))
367 
368  assert os.path.exists(tmp_file)
369 
370  detector, trackingGeometry, decorators = getOpenDataDetector(
372  )
373 
374  s = Sequencer(numThreads=1)
375 
376  s.addReader(
377  EDM4hepSimHitReader(
378  level=acts.logging.INFO,
379  inputPath=tmp_file,
380  outputSimHits="simhits",
381  dd4hepDetector=detector,
382  )
383  )
384 
385  alg = AssertCollectionExistsAlg("simhits", "check_alg", acts.logging.WARNING)
386  s.addAlgorithm(alg)
387 
388  s.run()
389 
390  assert alg.events_seen == 10
391 
392 
@pytest.mark.edm4hep
@pytest.mark.skipif(not edm4hepEnabled, reason="EDM4hep is not set up")
def test_edm4hep_measurement_reader(tmp_path, fatras, conf_const):
    """Round-trip measurements through the EDM4hep writer/reader pair."""
    from acts.examples.edm4hep import (
        EDM4hepMeasurementWriter,
        EDM4hepMeasurementReader,
    )

    write_seq = Sequencer(numThreads=1, events=10)
    _, simAlg, digiAlg = fatras(write_seq)

    out_file = tmp_path / "measurements_edm4hep.root"

    writer_cfg = EDM4hepMeasurementWriter.Config(
        inputMeasurements=digiAlg.config.outputMeasurements,
        inputClusters=digiAlg.config.outputClusters,
        outputPath=str(out_file),
    )
    write_seq.addWriter(
        EDM4hepMeasurementWriter(level=acts.logging.INFO, config=writer_cfg)
    )
    write_seq.run()

    # read back in
    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        conf_const(
            EDM4hepMeasurementReader,
            level=acts.logging.WARNING,
            outputMeasurements="measurements",
            outputMeasurementSimHitsMap="simhitsmap",
            outputSourceLinks="sourcelinks",
            inputPath=str(out_file),
        )
    )

    checkers = [
        AssertCollectionExistsAlg(name, f"check_alg_{name}", acts.logging.WARNING)
        for name in ("measurements", "simhitsmap", "sourcelinks")
    ]
    for checker in checkers:
        read_seq.addAlgorithm(checker)

    read_seq.run()

    for checker in checkers:
        assert checker.events_seen == 10
439 
440 
@pytest.mark.edm4hep
@pytest.mark.skipif(not edm4hepEnabled, reason="EDM4hep is not set up")
def test_edm4hep_particle_reader(tmp_path, conf_const, ptcl_gun):
    """Round-trip particles through the EDM4hep writer/reader pair.

    Fix: the previous version called ``out.mkdir()`` on the output *file*
    path, creating a directory named ``particles_edm4hep.root`` — residue of
    the CSV tests where ``out`` is a directory — which blocks the writer
    from creating its ROOT file at that path.
    """
    from acts.examples.edm4hep import (
        EDM4hepParticleWriter,
        EDM4hepParticleReader,
    )

    s = Sequencer(numThreads=1, events=10, logLevel=acts.logging.WARNING)
    evGen = ptcl_gun(s)

    # Output is a single ROOT file; do NOT create a directory at this path.
    out = tmp_path / "particles_edm4hep.root"

    s.addWriter(
        conf_const(
            EDM4hepParticleWriter,
            acts.logging.WARNING,
            inputParticles=evGen.config.outputParticles,
            outputPath=str(out),
        )
    )

    s.run()

    # reset the seeder
    s = Sequencer(numThreads=1, logLevel=acts.logging.WARNING)

    s.addReader(
        conf_const(
            EDM4hepParticleReader,
            acts.logging.WARNING,
            inputPath=str(out),
            outputParticles="input_particles",
        )
    )

    alg = AssertCollectionExistsAlg(
        "input_particles", "check_alg", acts.logging.WARNING
    )

    s.addAlgorithm(alg)

    s.run()

    assert alg.events_seen == 10
488 
489 
490 @pytest.mark.edm4hep
491 @pytest.mark.skipif(not edm4hepEnabled, reason="EDM4hep is not set up")
493  from acts.examples.edm4hep import EDM4hepTrackWriter, EDM4hepTrackReader
494 
495  detector, trackingGeometry, decorators = acts.examples.GenericDetector.create()
496  field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
497 
498  from truth_tracking_kalman import runTruthTrackingKalman
499 
500  s = Sequencer(numThreads=1, events=10)
502  trackingGeometry,
503  field,
504  digiConfigFile=Path(
505  str(
506  Path(__file__).parent.parent.parent.parent
507  / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
508  )
509  ),
510  outputDir=tmp_path,
511  s=s,
512  )
513 
514  out = tmp_path / "tracks_edm4hep.root"
515 
516  s.addWriter(
517  EDM4hepTrackWriter(
518  level=acts.logging.VERBOSE,
519  inputTracks="kfTracks",
520  outputPath=str(out),
521  Bz=2 * u.T,
522  )
523  )
524 
525  s.run()
526 
527  del s
528 
529  s = Sequencer(numThreads=1)
530  s.addReader(
531  EDM4hepTrackReader(
532  level=acts.logging.VERBOSE,
533  outputTracks="kfTracks",
534  inputPath=str(out),
535  Bz=2 * u.T,
536  )
537  )
538 
539  s.run()