Analysis Software
Documentation for sPHENIX simulation software
test_examples.py
Go to the documentation of this file, or view the newest version of test_examples.py in the sPHENIX GitHub repository.
from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections

import pytest

from helpers import (
    geant4Enabled,
    rootEnabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    exatrkxEnabled,
    onnxEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

pytestmark = pytest.mark.skipif(not rootEnabled, reason="ROOT not set up")


import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
    AlignedDetector,
)

from acts.examples.odd import getOpenDataDetector
from common import getOpenDataDetectorDirectory

u = acts.UnitConstants

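# Shared fixtures: a constant 2 T field along z and a short, single-threaded
# Sequencer keep the example runs small and reproducible.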
@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)


def assert_csv_output(csv_path, stem):
    """Assert that at least one non-trivial CSV file matching `stem` was written."""
    __tracebackhide__ = True
    # print(list(csv_path.iterdir()))
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_path.iterdir()])


def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    """Assert that `root_file` contains `tree_name`, optionally with exactly
    `exp` entries, or (with `non_zero`) at least one entry."""
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    if non_zero:
        assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)

@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "pythia8_particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

    runPythia8(str(tmp_path), outputRoot=True, outputCsv=True, s=seq).run()

    del seq

    fp = tmp_path / "pythia8_particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")

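# Most tests below follow the same pattern: assert that the expected outputs
# are absent, run the example workflow, then check file sizes, tree entry
# counts, and ROOT file hashes (via the assert_root_hash fixture) against
# reference values.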
def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        ("particles_final.root", "particles"),
        ("particles_initial.root", "particles"),
        ("hits.root", "hits"),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    del seq

    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)

@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    # This test literally only ensures that the Geant4 example can run without erroring out
    getOpenDataDetector(
        getOpenDataDetectorDirectory()
    )  # just to make sure it can build

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_final.root",
        "particles_initial.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)

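# Several tests shell out to the standalone example scripts with
# ACTS_LOG_FAILURE_THRESHOLD set, which promotes log messages at or above
# the given level to hard failures, so the subprocess exits non-zero on
# unexpected warnings or errors.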

def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        ("estimatedparams.root", "estimatedparams"),
        ("performance_seeding.root", None),
        ("particles.root", "particles"),
        ("particles_final.root", "particles"),
        ("particles_initial.root", "particles"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        ("estimatedparams.root", "estimatedparams"),
        ("performance_seeding.root", None),
        ("particles.root", "particles"),
        ("particles_final.root", "particles"),
        ("particles_initial.root", "particles"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    ).run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        ("estimatedparams.root", "estimatedparams"),
        ("performance_seeding.root", None),
        ("particles.root", "particles"),
        ("particles_final.root", "particles"),
        ("particles_initial.root", "particles"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    rnd = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir
        / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
        rnd=rnd,
    )

    from acts.examples.reconstruction import (
        addSeeding,
        TruthSeedRanges,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        TruthSeedRanges(pt=(1.0 * u.GeV, None), eta=(-4, 4), nHits=(9, None)),
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir
        / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
        inputParticles="particles_final",  # use this to reproduce the original root_file_hashes.txt - remove to fix
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

    obj = tmp_path / "obj"
    obj.mkdir()

    root_files = [
        ("propagation_steps.root", "propagation_steps", 10000),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(obj.iterdir())) == 0

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

    assert len(list(obj.iterdir())) > 0

@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        ("geant4_material_tracks.root", "material-tracks", 200),
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
def test_event_recording(tmp_path):
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "event_recording.py"
    )
    assert script.exists()

    env = os.environ.copy()
    env["NEVENTS"] = "1"
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    from acts.examples.hepmc3 import HepMC3AsciiReader

    out_path = tmp_path / "hepmc3"
    # out_path.mkdir()

    assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
    assert all([f.stat().st_size > 100 for f in out_path.iterdir()])

    s = Sequencer(numThreads=1)

    s.addReader(
        HepMC3AsciiReader(
            level=acts.logging.INFO,
            inputDir=str(out_path),
            inputStem="events",
            outputEvents="hepmc-events",
        )
    )

    alg = AssertCollectionExistsAlg(
        "hepmc-events", name="check_alg", level=acts.logging.INFO
    )
    s.addAlgorithm(alg)

    s.run()

    assert alg.events_seen == 1

@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    from truth_tracking_kalman import runTruthTrackingKalman

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        ("trackstates_fitter.root", "trackstates", 19),
        ("tracksummary_fitter.root", "tracksummary", 10),
        ("performance_track_fitter.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runTruthTrackingKalman(
        trackingGeometry=detector_config.trackingGeometry,
        field=field,
        digiConfigFile=detector_config.digiConfigFile,
        outputDir=tmp_path,
        reverseFilteringMomThreshold=revFiltMomThresh,
        s=seq,
    )

    seq.run()

    del seq

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runTruthTrackingGsf(
        trackingGeometry=detector_config.trackingGeometry,
        decorators=detector_config.decorators,
        field=field,
        digiConfigFile=detector_config.digiConfigFile,
        outputDir=tmp_path,
        s=seq,
    )

    # See https://github.com/acts-project/acts/issues/1300
    with failure_threshold(acts.logging.FATAL):
        seq.run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)

def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)

672 @pytest.mark.odd
673 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
674 def test_material_mapping(material_recording, tmp_path, assert_root_hash):
675  map_file = tmp_path / "material-map_tracks.root"
676  assert not map_file.exists()
677 
678  s = Sequencer(numThreads=1)
679 
680  detector, trackingGeometry, decorators = getOpenDataDetector(
682  )
683 
684  from material_mapping import runMaterialMapping
685 
687  trackingGeometry,
688  decorators,
689  outputDir=str(tmp_path),
690  inputDir=material_recording,
691  mappingStep=1,
692  s=s,
693  )
694 
695  s.run()
696 
697  # MaterialMapping alg only writes on destruct.
698  # See https://github.com/acts-project/acts/issues/881
699  del s
700 
701  mat_file = tmp_path / "material-map.json"
702 
703  assert mat_file.exists()
704  assert mat_file.stat().st_size > 10
705 
706  with mat_file.open() as fh:
707  assert json.load(fh)
708 
709  assert map_file.exists()
710  assert_entries(map_file, "material-tracks", 200)
711  assert_root_hash(map_file.name, map_file)
712 
713  val_file = tmp_path / "propagation-material.root"
714  assert not val_file.exists()
715 
716  # test the validation as well
717 
718  # we need to destroy the ODD to reload with material
719  del trackingGeometry
720  del detector
721 
722  detector, trackingGeometry, decorators = getOpenDataDetector(
724  mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
725  )
726 
727  from material_validation import runMaterialValidation
728 
729  s = Sequencer(events=10, numThreads=1)
730 
731  field = acts.NullBField()
732 
734  trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
735  )
736 
737  s.run()
738 
739  assert val_file.exists()
740  assert_entries(val_file, "material-tracks", 10000)
741  assert_root_hash(val_file.name, val_file)
742 
743 
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    s = Sequencer(numThreads=1)

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    detector, trackingGeometry, decorators = getOpenDataDetector(
        getOpenDataDetectorDirectory(),
        mdecorator=acts.IMaterialDecorator.fromFile(geo_map),
    )

    from material_mapping import runMaterialMapping

    runMaterialMapping(
        trackingGeometry,
        decorators,
        mapName="material-map-volume",
        outputDir=str(tmp_path),
        inputDir=material_recording,
        mappingStep=1,
        s=s,
    )

    s.run()

    # MaterialMapping alg only writes on destruct.
    # See https://github.com/acts-project/acts/issues/881
    del s

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # test the validation as well

    # we need to destroy the ODD to reload with material
    del trackingGeometry
    del detector

    detector, trackingGeometry, decorators = getOpenDataDetector(
        getOpenDataDetectorDirectory(),
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
    )

    from material_validation import runMaterialValidation

    s = Sequencer(events=10, numThreads=1)

    field = acts.NullBField()

    runMaterialValidation(
        trackingGeometry,
        decorators,
        field,
        outputDir=str(tmp_path),
        outputName="propagation-volume-material",
        s=s,
    )

    s.run()

    assert val_file.exists()
    assert_root_hash(val_file.name, val_file)

@pytest.mark.parametrize(
    "geoFactory,nobj",
    [
        (GenericDetector.create, 450),
        pytest.param(
            functools.partial(getOpenDataDetector, getOpenDataDetectorDirectory()),
            540,
            marks=[
                pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
                pytest.mark.slow,
                pytest.mark.odd,
            ],
        ),
        (functools.partial(AlignedDetector.create, iovSize=1), 450),
    ],
)
@pytest.mark.slow
def test_geometry_example(geoFactory, nobj, tmp_path):
    detector, trackingGeometry, decorators = geoFactory()

    from geometry import runGeometry

    json_dir = tmp_path / "json"
    csv_dir = tmp_path / "csv"
    obj_dir = tmp_path / "obj"

    for d in (json_dir, csv_dir, obj_dir):
        d.mkdir()

    events = 5

    kwargs = dict(
        trackingGeometry=trackingGeometry,
        decorators=decorators,
        events=events,
        outputDir=str(tmp_path),
    )

    runGeometry(outputJson=True, **kwargs)
    runGeometry(outputJson=False, **kwargs)

    assert len(list(obj_dir.iterdir())) == nobj
    assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())

    assert len(list(csv_dir.iterdir())) == 3 * events
    assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())

    detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
    for detector_file in detector_files:
        assert detector_file.exists()
        assert detector_file.stat().st_size > 200

    contents = [f.read_text() for f in detector_files]
    ref = contents[0]
    for c in contents[1:]:
        if isinstance(detector, AlignedDetector):
            assert c != ref, "Detector writeout is expected to be different"
        else:
            assert c == ref, "Detector writeout is expected to be identical"

    if not isinstance(detector, AlignedDetector):
        for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
            assert f.exists()  # check each JSON file, not the stale CSV loop variable
            with f.open() as fh:
                data = json.load(fh)
                assert data

    material_file = tmp_path / "geometry-map.json"
    assert material_file.exists()
    assert material_file.stat().st_size > 200

DIGI_SHARE_DIR = (
    Path(__file__).parent.parent.parent.parent
    / "Examples/Algorithms/Digitization/share"
)

@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_entries(root_file, "vol9", 0)
    assert_entries(root_file, "vol14", 0)

    if "smearing" in digi_config_file.name:
        filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
    else:
        # fmt: off
        filled_entries = [
            'vol8', 'vol8_lay2', 'vol12_lay8_mod117', 'vol12_lay10', 'vol12_lay10_mod154',
            'vol12_lay10_mod163', 'vol12_lay12', 'vol12_lay12_mod150', 'vol13',
            'vol13_lay2', 'vol16_lay2_mod53', 'vol16_lay4', 'vol16_lay6', 'vol16_lay8',
            'vol16_lay10', 'vol16_lay12', 'vol17', 'vol17_lay2', 'vol18_lay2',
            'vol18_lay2_mod1', 'vol18_lay2_mod49', 'vol18_lay2_mod86', 'vol18_lay4',
        ]
        # fmt: on

    for entry in filled_entries:
        assert_has_entries(root_file, entry)

    assert_root_hash(root_file.name, root_file)

@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)
    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        ptcl_dir / "particles.root",
    )

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_entries(root_file, "vol7", 0)
    assert_entries(root_file, "vol9", 0)

    if "smearing" in digi_config_file.name:
        filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
    else:
        # fmt: off
        filled_entries = [
            "vol8", "vol8_lay2", "vol12_lay8_mod120", "vol12_lay10_mod120",
            "vol12_lay10_mod144", "vol12_lay12", "vol12_lay12_mod111",
            "vol12_lay12_mod137", "vol12_lay12_mod170", "vol13", "vol13_lay2",
            "vol14_lay2_mod93", "vol14_lay2_mod102", "vol14_lay2_mod112",
            "vol14_lay2_mod118", "vol14_lay4_mod112", "vol14_lay4_mod118",
            "vol14_lay4_mod152", "vol14_lay4_mod161", "vol16_lay4", "vol16_lay6",
            "vol16_lay8", "vol16_lay10", "vol16_lay12", "vol17", "vol17_lay2",
            "vol18_lay2", "vol18_lay2_mod71", "vol18_lay4", "vol18_lay6",
            "vol18_lay8", "vol18_lay10"
        ]
        # fmt: on

    for entry in filled_entries:
        assert_has_entries(root_file, entry)

    assert_root_hash(root_file.name, root_file)

def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    input = (
        Path(__file__).parent
        / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
    )
    assert input.exists(), input.resolve()

    runDigitizationConfig(trk_geo, input=input, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
        assert len(data.keys()) == 2
        assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
        assert (
            data["acts-geometry-hierarchy-map"]["value-identifier"]
            == "digitization-configuration"
        )
        assert len(data["entries"]) == 27

@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=1)  # Digitization is not thread-safe

    root_files = [
        ("performance_ckf.root", None),
        ("trackstates_ckf.root", "trackstates"),
        ("tracksummary_ckf.root", "tracksummary"),
    ]

    if not truthSmeared:
        root_files += [
            ("performance_seeding.root", None),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    runCKFTracks(
        detector_config.trackingGeometry,
        detector_config.decorators,
        field=field,
        outputCsv=True,
        outputDir=tmp_path,
        geometrySelection=detector_config.geometrySelection,
        digiConfigFile=detector_config.digiConfigFile,
        truthSmearedSeeded=truthSmeared,
        truthEstimatedSeeded=truthEstimated,
        s=s,
    )

    s.run()

    del s  # files are closed in destructors, not great

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])

@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector(
        getOpenDataDetectorDirectory()
    )  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector(
        getOpenDataDetectorDirectory()
    )  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [sys.executable, str(script), "-n1", "--geant4", "--ttbar"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # collect and compare known errors
    errors = []
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout

@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
    root_file = "performance_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector(
        getOpenDataDetectorDirectory()
    )  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n5", "--MLSolver"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)

def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

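# The Exa.TrkX test first downloads pretrained ONNX/TorchScript models from
# the ACTS CI web area, then runs the standalone exatrkx.py script against
# them.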
@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    if backend == "onnx":
        url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    else:
        url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    tarfile_name = tmp_path / "models.tar"
    urllib.request.urlretrieve(url, tarfile_name)
    tarfile.open(tarfile_name).extractall(tmp_path)
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "exatrkx.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)