#!/usr/bin/env python
""" This module contains functions used to merge histograms and ROOT files.
For further information on functionality and options, see the function docstrings.
.. codeauthor:: Raymond Ehlers <raymond.ehlers@cern.ch>, Yale University
.. codeauthor:: James Mulligan <james.mulligan@yale.edu>, Yale University
"""
# Python 2/3 support
from __future__ import print_function
from __future__ import absolute_import
from future.utils import iteritems
# General
import copy
import os
import shutil
import logging
# Setup logger
logger = logging.getLogger(__name__)
# ROOT
import ROOT
from . import processingClasses
def merge(currentDir, run, subsystem, cumulativeMode = True, timeSlice = None):
    """ For a given run and subsystem, handles merging of files into a "combined file" which
    is suitable for processing.

    For a standard subsystem, the "combined file" is the file which stores the most recent data
    for a particular run and subsystem. The merge is only performed if we have received new files
    in the specified run. The details of the merge are determined by the ``cumulativeMode`` setting.
    This setting, which is determined by the options sent in the data request sent to the HLT by
    the ZMQ receiver, denotes whether the data that we received are reset each time the data is sent.
    If the data is being reset, then the files are not cumulative, and therefore ``cumulativeMode`` should be
    set to ``False``. Note that although this function supports both modes, we tend to operate
    in cumulative mode because resetting the objects would interfere with other subscribers to the HLT
    data. For example, if both Overwatch and another subscriber were set to request data with resets
    every minute and were offset by 30 seconds, they would both only receive approximately half the
    data! Thus, it's preferred to operate in cumulative mode.

    For cumulative mode, the combined objects are created in two different ways: 1) For a standard
    combined file, by simply copying the most recent file (because it contains data covering the entire
    run); 2) For time slices, by subtracting the objects in two corresponding ROOT files. For reset
    mode, ``TFileMerger`` is used to merge all files within the available timestamps together.

    This function also handles merging files for time slices. The relevant parameters should be specified
    in a ``timeSliceContainer``. The min and max requested times are extracted, and this function only
    merges files within the fixed time range corresponding to those values. The output format of this
    file is identical to any other combined file.

    Note:
        As a side effect of this function, if it executes successfully for a standard combined file
        (i.e. no time slice), ``subsystem.combinedFile`` is replaced with a new ``fileContainer``
        pointing at the combined file.

    Note:
        The list of files to merge is sorted in place by ``fileTime``. When a time slice is passed, this
        means that ``timeSlice.filesToMerge`` itself is reordered.

    Args:
        currentDir (str): Path to the root directory where the data is stored. All input and output
            filenames are joined onto this path.
        run (processingClasses.runContainer): Run which contains the subsystem. It is not used within
            this function.
        subsystem (processingClasses.subsystemContainer): Subsystem whose files should be merged. Its
            ``files``, ``baseDir`` and ``startOfRun`` attributes are read.
        cumulativeMode (bool): Specifies whether the histograms we receive are cumulative or if they
            have been reset between each acquired ROOT file, i.e. whether we merge in "subscribe mode" or
            "request/reset mode". Default: True.
        timeSlice (processingClasses.timeSliceContainer): Stores the properties of the requested time slice. If not specified,
            it will be ignored and it will create a standard "combined file". Default: None
    Returns:
        None: On success, ``None`` is returned. Otherwise, an exception is raised.
    Raises:
        ValueError: If the number of input files doesn't match the number of files in the merger. Perhaps if a
            file is inaccessible.
        IndexError: If there are no files available when the earliest or most recent file is needed
            (cumulative mode, or when building the name of a standard combined file).
    """
    # Determines which files are needed to merge
    if timeSlice:
        filesToMerge = timeSlice.filesToMerge
    else:
        # Combined files are not stored in ``subsystem.files`` anymore, so every entry is an input file.
        filesToMerge = list(subsystem.files.values())

    # Sort files by time
    filesToMerge.sort(key=lambda x: x.fileTime)

    # If in cumulativeMode, we subtract the earliest file from the latest file, unless
    # the beginning of the time slice is the start of the run. In that case, we don't
    # subtract anything from the most recent file.
    if cumulativeMode and timeSlice and timeSlice.minUnixTimeAvailable != subsystem.startOfRun:
        earliestFile = filesToMerge[0].filename
        latestFile = filesToMerge[-1].filename
        # Subtract earliestFile from latestFile
        timeSlicesFilename = os.path.join(currentDir, subsystem.baseDir, timeSlice.filename.filename)
        subtractFiles(os.path.join(currentDir, earliestFile),
                      os.path.join(currentDir, latestFile),
                      timeSlicesFilename)
        logger.info("Completed time slicing via subtraction with result stored in {}!\nMerging complete!".format(timeSlicesFilename))
        return None

    if cumulativeMode:
        # Take the most recent file, which already contains everything recorded so far.
        filesToMerge = [filesToMerge[-1]]

    # Merging using root. We use this for multiple files, but will skip it
    # if we are only copying one file.
    merger = ROOT.TFileMerger()

    # Determine the files to merge
    if len(filesToMerge) == 1:
        # This is often cumulative mode, but could also be reset mode with only 1 file
        numberOfFiles = len(filesToMerge)
    else:
        # If more than one file (almost assuredly reset mode), merge everything
        for fileCont in filesToMerge:
            logger.info("Added file {} to merger".format(fileCont.filename))
            # The stored filenames are relative to ``currentDir`` (as for the copy and the subtraction
            # inputs), so the full path must be given for the merger to be able to open the file.
            merger.AddFile(os.path.join(currentDir, fileCont.filename))
        numberOfFiles = merger.GetMergeList().GetEntries()
        if numberOfFiles != len(filesToMerge):
            errorMessage = "Problems encountered when adding files to merger! Number of input files ({}) do not match number in merger ({})!".format(len(filesToMerge), numberOfFiles)
            logger.error(errorMessage)
            raise ValueError(errorMessage)

    # ``filePath`` is relative to ``currentDir``. This relative form is what is stored in the subsystem.
    if timeSlice:
        filePath = os.path.join(subsystem.baseDir, timeSlice.filename.filename)
    else:
        # The combined file is named after the number of contributing files and the most recent timestamp.
        maxFilteredTimeStamp = filesToMerge[-1].fileTime
        filePath = os.path.join(subsystem.baseDir, "hists.combined.{}.{}.root".format(numberOfFiles, maxFilteredTimeStamp))
    outFile = os.path.join(currentDir, filePath)
    logger.info("Number of files to be merged: {}".format(numberOfFiles))
    logger.info("Output file: {}".format(outFile))

    # Set the output and perform the actual merge
    if numberOfFiles == 1:
        # Avoid errors with TFileMerger and only one file.
        # Plus, performance should be better
        shutil.copy(os.path.join(currentDir, filesToMerge[0].filename), outFile)
    else:
        merger.OutputFile(outFile)
        merger.Merge()
    logger.info("Merging complete!")

    # Add combined file to the subsystem
    if not timeSlice:
        subsystem.combinedFile = processingClasses.fileContainer(filePath, startOfRun = subsystem.startOfRun)

    return None
def subtractFiles(minFile, maxFile, outfile):
    """ Subtract histograms in one file from matching histograms in another.

    This function is used for creating time slices in cumulative mode. Since each file is cumulative,
    the earlier time stamped file needs to be subtracted from the later time stamped file. The
    remaining data corresponds to the data stored during the time window ``late - early``.

    This function is **not** used for creating a standard combined file because the cumulative information
    is already stored in the most recent file.

    Note:
        The names of the histograms in each file must match exactly for them to be subtracted.
        Only keys whose class inherits from ``TH1`` are considered, in both files.

    Note:
        The output file is opened with "RECREATE", so it will always overwrite an existing
        file with the given filename!

    Args:
        minFile (str): Filename of the ROOT file containing data to be subtracted.
        maxFile (str): Filename of the ROOT file containing data to be subtracted from.
        outfile (str): Filename of the output file which will contain the subtracted histograms.
    Returns:
        None.
    """
    fMin = ROOT.TFile(minFile, "READ")
    fMax = ROOT.TFile(maxFile, "READ")
    fOut = ROOT.TFile(outfile, "RECREATE")

    # Ensure that all three files are closed even if reading or subtracting fails part way through.
    try:
        # Read in available keys in the file
        keysMinFile = fMin.GetListOfKeys()
        keysMaxFile = fMax.GetListOfKeys()

        # The class used to identify histograms is the same for every key, so retrieve it once.
        histogramClass = ROOT.TH1.Class()

        # Loop through both files, and subtract matching pairs of hists
        for keyMin in keysMinFile:
            # Ensure that we only take histograms (we would expect such, but better to check for safety)
            classOfObject = ROOT.gROOT.GetClass(keyMin.GetClassName())
            if not classOfObject.InheritsFrom(histogramClass):
                continue

            minHistName = keyMin.GetName()
            for keyMax in keysMaxFile:
                # Ensure that we only take histograms. This must check the key from the later file:
                # the key from the earlier file is already known to be a histogram at this point.
                classOfObject = ROOT.gROOT.GetClass(keyMax.GetClassName())
                if not classOfObject.InheritsFrom(histogramClass):
                    continue

                maxHistName = keyMax.GetName()
                if minHistName == maxHistName:
                    minHist = keyMin.ReadObj()
                    maxHist = keyMax.ReadObj()

                    # Subtract the earlier hist from the later hist
                    maxHist.Add(minHist, -1)
                    # Make the output file the current directory so the result is written there.
                    fOut.cd()
                    maxHist.Write()
    finally:
        fMin.Close()
        fMax.Close()
        fOut.Close()
def mergeRootFiles(runs, dirPrefix, forceNewMerge = False, cumulativeMode = True):
    """ Driver function which creates the combined file of each subsystem for a set of runs.

    Every subsystem of every given run is visited. A subsystem is (re)merged only if it is flagged
    as having a new file (``newFile is True``) or if ``forceNewMerge`` is set. Any previously existing
    combined file is deleted from disk before ``merge()`` is called to create the new one. See ``merge()``
    for the format of the combined file.

    Subsystems which do not have their own files (their ``subsystem`` differs from their
    ``fileLocationSubsystem``) are handled afterwards: they receive a deep copy of the combined file
    of the subsystem where their files are located.

    Args:
        runs (dict): Dict of ``runContainers`` to perform the merge over. The keys are the runDirs,
            in the form of ``Run######``.
        dirPrefix (str): Path to the root directory where the data is stored.
        forceNewMerge (bool): Flag to force merging for all runs, regardless of whether the subsystem
            is marked as having a new file. Default: False.
        cumulativeMode (bool): Specifies whether the histograms we receive are cumulative or if they
            have been reset between each acquired ROOT file. It is passed on to ``merge()``, which
            describes the mode further. Default: True.
    Returns:
        None
    """
    baseDataPath = dirPrefix

    for runDir, run in iteritems(runs):
        # First pass: merge for every subsystem which owns its files.
        for subsystemName in run.subsystems:
            # Grab the container up front so its details are also at hand when debugging.
            container = run.subsystems[subsystemName]

            # Nothing to do unless new files arrived or a merge was explicitly requested.
            needsMerge = container.newFile is True or forceNewMerge
            if not needsMerge:
                continue
            # Subsystems which borrow the files of another subsystem are handled in the second pass.
            if container.subsystem != container.fileLocationSubsystem:
                continue

            # A combined file is named as:
            # `hists.combined.(number of files which contributed to the combined file).(timestamp of combined file).root`
            # If one already exists, it is removed before merging again. No comparison of timestamps
            # (SUB mode) or merge counts (REQ mode) is made here.
            previousCombinedFile = container.combinedFile
            logger.info("Need to merge {}, {} again".format(runDir, subsystemName))
            if previousCombinedFile:
                logger.info("Removing previous merged file {}".format(previousCombinedFile.filename))
                os.remove(os.path.join(baseDataPath, previousCombinedFile.filename))
                # The subsystem should no longer refer to the file which was just deleted.
                container.combinedFile = None

            # Create the new combined file. The subsystem is still considered to have a new file
            # until it has been processed entirely, so ``newFile`` is not modified here.
            merge(baseDataPath, run, container, cumulativeMode)

        # Second pass: subsystems which are not their own fileLocationSubsystem also need a combinedFile.
        # This must come after the first pass so that the combined files to which they refer exist.
        for subsystemName in run.subsystems:
            container = run.subsystems[subsystemName]
            if not (container.newFile is True or forceNewMerge):
                continue

            fileLocationName = container.fileLocationSubsystem
            if container.subsystem != fileLocationName:
                container.combinedFile = copy.deepcopy(run.subsystems[fileLocationName].combinedFile)