from pathlib import Path
    AssertCollectionExistsAlg,
pytestmark = pytest.mark.skipif(not rootEnabled, reason="ROOT not set up")
from common import getOpenDataDetectorDirectory
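# Shorthand for the ACTS unit constants used throughout (e.g. 2 * u.T, 1 * u.GeV).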
u = acts.UnitConstants
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    return Sequencer(events=10, numThreads=1)
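    # CSV-output helper body: hide this frame from pytest tracebacks, then
    # require at least one "<stem>.csv" file and no undersized output files.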
    __tracebackhide__ = True
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_path.iterdir()])
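    # ROOT entry-check helper body: open the file in batch mode, make sure the
    # requested tree exists, and check its entry count.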
    __tracebackhide__ = True
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"
    __tracebackhide__ = True
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
    from pythia8 import runPythia8
    (tmp_path / "csv").mkdir()
    assert not (tmp_path / "pythia8_particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0
    events = seq.config.events
    fp = tmp_path / "pythia8_particles.root"
    assert fp.stat().st_size > 2**10 * 50
    assert len(list((tmp_path / "csv").iterdir())) > 0
    from fatras import runFatras
    csv = tmp_path / "csv"
117 "particles_final.root",
121 "particles_initial.root",
    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()
    seq = Sequencer(events=nevents)
    for f, tn in root_files:
        assert rfp.stat().st_size > 2**10 * 10
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
    csv = tmp_path / "csv"
165 "particles_final.root",
166 "particles_initial.root",
    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()
        Path(__file__).parent.parent.parent.parent
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
        subprocess.check_call(
            [sys.executable, str(script)],
            stderr=subprocess.STDOUT,
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        assert rfp.stat().st_size > 2**10 * 10
    from seeding import runSeeding
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
    csv = tmp_path / "csv"
    seq = Sequencer(events=10, numThreads=1)
218 "estimatedparams.root",
222 "performance_seeding.root",
230 "particles_final.root",
234 "particles_initial.root",
    for fn, _ in root_files:
        assert not fp.exists()
    assert len(list(csv.iterdir())) == 0
    for fn, tn in root_files:
        assert fp.stat().st_size > 100
    from seeding import runSeeding, SeedingAlgorithm
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
    csv = tmp_path / "csv"
    seq = Sequencer(events=10, numThreads=1)
275 "estimatedparams.root",
279 "performance_seeding.root",
287 "particles_final.root",
291 "particles_initial.root",
    for fn, _ in root_files:
        assert not fp.exists()
    assert len(list(csv.iterdir())) == 0
        outputDir=str(tmp_path),
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    for fn, tn in root_files:
        assert fp.stat().st_size > 100
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
    csv = tmp_path / "csv"
    seq = Sequencer(events=10, numThreads=1)
336 "estimatedparams.root",
340 "performance_seeding.root",
348 "particles_final.root",
352 "particles_initial.root",
    for fn, _ in root_files:
        assert not fp.exists()
    assert len(list(csv.iterdir())) == 0
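    # Fixed random seed so repeated runs produce identical output files
    # (the ROOT file hashes are checked further down).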
    rnd = acts.examples.RandomNumbers(seed=42)
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
    srcdir = Path(__file__).resolve().parent.parent.parent.parent
        digiConfigFile=srcdir
        / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir
        / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
        inputParticles="particles_final",
        outputDirRoot=str(tmp_path),
    for fn, tn in root_files:
        assert fp.stat().st_size > 100
    from propagation import runPropagation
    obj = tmp_path / "obj"
453 "propagation_steps.root",
    for fn, _, _ in root_files:
        assert not fp.exists()
    assert len(list(obj.iterdir())) == 0
    for fn, tn, ee in root_files:
        assert fp.stat().st_size > 2**10 * 50
    assert len(list(obj.iterdir())) > 0
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
484 "geant4_material_tracks.root",
    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.stat().st_size > 2**10 * 50
@pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
        Path(__file__).parent.parent.parent.parent
        / "event_recording.py"
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
        subprocess.check_call(
            [sys.executable, str(script)],
            stderr=subprocess.STDOUT,
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
    out_path = tmp_path / "hepmc3"
    assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
    assert all([f.stat().st_size > 100 for f in out_path.iterdir()])
    s = Sequencer(numThreads=1)
            level=acts.logging.INFO,
            inputDir=str(out_path),
            outputEvents="hepmc-events",
    alg = AssertCollectionExistsAlg(
        "hepmc-events", name="check_alg", level=acts.logging.INFO
    )
    assert alg.events_seen == 1
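# Truth-tracking Kalman fitter, run with two reverse-filtering momentum
# thresholds: 0 GeV leaves reverse filtering off for every track, while 1 TeV
# enables it for essentially all of them.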
@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
    from truth_tracking_kalman import runTruthTrackingKalman
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    seq = Sequencer(events=10, numThreads=1)
        ("trackstates_fitter.root", "trackstates", 19),
        ("tracksummary_fitter.root", "tracksummary", 10),
        ("performance_track_fitter.root", None, -1),
    for fn, _, _ in root_files:
        assert not fp.exists()
        trackingGeometry=detector_config.trackingGeometry,
        digiConfigFile=detector_config.digiConfigFile,
        reverseFilteringMomThreshold=revFiltMomThresh,
    for fn, tn, ee in root_files:
        assert fp.stat().st_size > 1024
    from truth_tracking_gsf import runTruthTrackingGsf
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
608 "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    for fn, _ in root_files:
        assert not fp.exists()
        trackingGeometry=detector_config.trackingGeometry,
        decorators=detector_config.decorators,
        digiConfigFile=detector_config.digiConfigFile,
    for fn, tn in root_files:
        assert fp.stat().st_size > 1024
    from particle_gun import runParticleGun
    s = Sequencer(events=20, numThreads=-1)
    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"
    assert not csv_dir.exists()
    assert not root_file.exists()
    assert csv_dir.exists()
    assert root_file.exists()
    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])
    assert root_file.stat().st_size > 200
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()
    s = Sequencer(numThreads=1)
    from material_mapping import runMaterialMapping
        outputDir=str(tmp_path),
        inputDir=material_recording,
    mat_file = tmp_path / "material-map.json"
    assert mat_file.exists()
    assert mat_file.stat().st_size > 10
    with mat_file.open() as fh:
    assert map_file.exists()
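    # Validate the freshly produced material map by propagating test tracks
    # through it and writing the result to a separate ROOT file.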
    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
    from material_validation import runMaterialValidation
    s = Sequencer(events=10, numThreads=1)
    field = acts.NullBField()
        trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
    assert val_file.exists()
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()
    s = Sequencer(numThreads=1)
    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        mdecorator=acts.IMaterialDecorator.fromFile(geo_map),
    from material_mapping import runMaterialMapping
        mapName="material-map-volume",
        outputDir=str(tmp_path),
        inputDir=material_recording,
    mat_file = tmp_path / "material-map-volume.json"
    assert mat_file.exists()
    assert mat_file.stat().st_size > 10
    with mat_file.open() as fh:
    assert map_file.exists()
    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
    from material_validation import runMaterialValidation
    s = Sequencer(events=10, numThreads=1)
    field = acts.NullBField()
        outputDir=str(tmp_path),
        outputName="propagation-volume-material",
    assert val_file.exists()
@pytest.mark.parametrize(
        (GenericDetector.create, 450),
            pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
        (functools.partial(AlignedDetector.create, iovSize=1), 450),
    detector, trackingGeometry, decorators = geoFactory()
    from geometry import runGeometry
    json_dir = tmp_path / "json"
    csv_dir = tmp_path / "csv"
    obj_dir = tmp_path / "obj"
    for d in (json_dir, csv_dir, obj_dir):
        trackingGeometry=trackingGeometry,
        decorators=decorators,
        outputDir=str(tmp_path),
    assert len(list(obj_dir.iterdir())) == nobj
    assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())
    assert len(list(csv_dir.iterdir())) == 3 * events
    assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())
    detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
    for detector_file in detector_files:
        assert detector_file.exists()
        assert detector_file.stat().st_size > 200
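    # Compare each event's detector dump against the first event: an aligned
    # detector must differ event-to-event, a static one must not.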
    contents = [f.read_text() for f in detector_files]
    ref = contents[0]
    for c in contents[1:]:
        if isinstance(detector, AlignedDetector):
            assert c != ref, "Detector writeout is expected to be different"
        else:
            assert c == ref, "Detector writeout is expected to be identical"
    if not isinstance(detector, AlignedDetector):
        for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
            assert f.exists()
    material_file = tmp_path / "geometry-map.json"
    assert material_file.exists()
    assert material_file.stat().st_size > 200
    Path(__file__).parent.parent.parent.parent
    / "Examples/Algorithms/Digitization/share"
@pytest.mark.parametrize(
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ids=["smeared", "geometric"],
    from digitization import runDigitization
    s = Sequencer(events=10, numThreads=-1)
    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"
    assert not root_file.exists()
    assert not csv_dir.exists()
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    assert root_file.exists()
    assert csv_dir.exists()
    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
941 if "smearing" in digi_config_file.name:
942 filled_entries = [f
"vol{tn}" for tn
in (8, 12, 13, 16, 17, 18)]
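    # The smearing config only fills volume-level entries; the geometric
    # config is expected to also populate specific layer- and module-level
    # entries, listed below.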
            "vol8",
            "vol8_lay2",
            "vol12_lay8_mod117",
            "vol12_lay10",
            "vol12_lay10_mod154",
            "vol12_lay10_mod163",
            "vol12_lay12",
            "vol12_lay12_mod150",
            "vol13",
            "vol13_lay2",
            "vol16_lay2_mod53",
            "vol16_lay4",
            "vol16_lay6",
            "vol16_lay8",
            "vol16_lay10",
            "vol16_lay12",
            "vol17",
            "vol17_lay2",
            "vol18_lay2",
            "vol18_lay2_mod1",
            "vol18_lay2_mod49",
            "vol18_lay2_mod86",
            "vol18_lay4",
    for entry in filled_entries:
@pytest.mark.parametrize(
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ids=["smeared", "geometric"],
    trk_geo, tmp_path, assert_root_hash, digi_config_file
    from particle_gun import runParticleGun
    from digitization import runDigitization
    ptcl_dir = tmp_path / "ptcl"
    pgs = Sequencer(events=20, numThreads=-1)
    s = Sequencer(numThreads=-1)
    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"
    assert not root_file.exists()
    assert not csv_dir.exists()
        ptcl_dir / "particles.root",
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
    assert root_file.exists()
    assert csv_dir.exists()
    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
1015 if "smearing" in digi_config_file.name:
1016 filled_entries = [f
"vol{tn}" for tn
in (8, 12, 13, 16, 17, 18)]
1020 "vol8",
"vol8_lay2",
"vol12_lay8_mod120",
"vol12_lay10_mod120",
1021 "vol12_lay10_mod144",
"vol12_lay12",
"vol12_lay12_mod111",
1022 "vol12_lay12_mod137",
"vol12_lay12_mod170",
"vol13",
"vol13_lay2",
1023 "vol14_lay2_mod93",
"vol14_lay2_mod102",
"vol14_lay2_mod112",
1024 "vol14_lay2_mod118",
"vol14_lay4_mod112",
"vol14_lay4_mod118",
1025 "vol14_lay4_mod152",
"vol14_lay4_mod161",
"vol16_lay4",
"vol16_lay6",
1026 "vol16_lay8",
"vol16_lay10",
"vol16_lay12",
"vol17",
"vol17_lay2",
1027 "vol18_lay2",
"vol18_lay2_mod71",
"vol18_lay4",
"vol18_lay6",
1028 "vol18_lay8",
"vol18_lay10"
    for entry in filled_entries:
    from digitization_config import runDigitizationConfig
    out_file = tmp_path / "output.json"
    assert not out_file.exists()
        Path(__file__).parent
        / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
    assert input.exists(), input.resolve()
    assert out_file.exists()
    with out_file.open() as fh:
        data = json.load(fh)
    assert len(data.keys()) == 2
    assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
    assert (
        data["acts-geometry-hierarchy-map"]["value-identifier"]
        == "digitization-configuration"
    )
    assert len(data["entries"]) == 27
@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
    csv = tmp_path / "csv"
    assert not csv.exists()
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    s = Sequencer(events=events, numThreads=1)
1088 "performance_ckf.root",
1092 "trackstates_ckf.root",
1096 "tracksummary_ckf.root",
    if not truthSmeared:
1104 "performance_seeding.root",
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()
    from ckf_tracks import runCKFTracks
        detector_config.trackingGeometry,
        detector_config.decorators,
        geometrySelection=detector_config.geometrySelection,
        digiConfigFile=detector_config.digiConfigFile,
        truthSmearedSeeded=truthSmeared,
        truthEstimatedSeeded=truthEstimated,
    for rf, tn in root_files:
    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
        Path(__file__).parent.parent.parent.parent
        / "full_chain_odd.py"
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            stderr=subprocess.STDOUT,
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
        Path(__file__).parent.parent.parent.parent
        / "full_chain_odd.py"
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
        stdout = subprocess.check_output(
            [sys.executable, str(script), "-n1", "--geant4", "--ttbar"],
            stderr=subprocess.STDOUT,
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
    root_file = "performance_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()
        Path(__file__).parent.parent.parent.parent
        / "full_chain_odd.py"
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
        subprocess.check_call(
            [sys.executable, str(script), "-n5", "--MLSolver"],
            stderr=subprocess.STDOUT,
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
    rfp = tmp_path / output_dir / root_file
    from bfield_writing import runBFieldWriting
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    for fn, _, _ in root_files:
        assert not fp.exists()
    for fn, tn, ee in root_files:
        assert fp.stat().st_size > 2**10 * 2
@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")
    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()
    if backend == "onnx":
        url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    else:
        url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"
    tarfile_name = tmp_path / "models.tar"
    urllib.request.urlretrieve(url, tarfile_name)
    tarfile.open(tarfile_name).extractall(tmp_path)
        Path(__file__).parent.parent.parent.parent
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""
        subprocess.check_call(
            [sys.executable, str(script), backend],
            stderr=subprocess.STDOUT,
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
    rfp = tmp_path / root_file