Passing SSLContext for the HTTPS+X509 to uproot.dask fails because the context is not serializable
bockjoo opened this issue · 8 comments
I am using Coffea 2024.6.1 and uproot 5.3.10
python -c "import uproot ; print(uproot.__version__)"
5.3.10
To reproduce the issue, this script can be used:
import os
import ssl
from coffea.nanoevents import NanoEventsFactory, BaseSchema
sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
filename="https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"
events = NanoEventsFactory.from_root(
{filename: "Events"},
steps_per_file=100_000,
metadata={"dataset": "DoubleMuon"},
schemaclass=BaseSchema,
uproot_options = {'ssl':sslctx},
).events()
p = MyProcessor()
out = p.process(events)
(computed,) = dask.compute(out)
print(computed)
The environment variable X509_USER_PROXY should point to the VOMS proxy file.
The input file is
https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root
but it may be irrelevant, since the SSL connection cannot be established in the first place: the SSLContext is not serializable.
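The failure can be reproduced in isolation, without uproot or dask; CPython's SSLContext simply does not support pickling, which is also the final error in the trace below. A minimal sketch:

```python
import pickle
import ssl

# An SSLContext wraps C-level state (loaded certificates, options)
# that the pickle protocol cannot capture, so pickling it raises.
ctx = ssl.create_default_context()
try:
    pickle.dumps(ctx)
except TypeError as err:
    print(err)  # cannot pickle 'SSLContext' object
```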
The stack trace of the above script looks like:
2024-07-11 08:04:26,616 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1490a8411c00>
0. from-uproot-40070443cb475fde1ee0133653b4c164
1. hist-on-block-b00467fa12eb096c7b30b70057935672
2. histreduce-agg-aed8002a664acbeb30a8fcf9eb35df08
3. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-c407c0e517af63d39ccc70a2134bb7db
4. numaxis0-d04f5a3766776402558a814b4dff0437
>.
Traceback (most recent call last):
File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'SSLContext' object
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
62 try:
---> 63 result = pickle.dumps(x, **dump_kwargs)
64 except Exception:
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
67 buffers.clear()
---> 68 pickler.dump(x)
69 result = f.getvalue()
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:363, in serialize(x, serializers, on_error, context, iterate_collection)
362 try:
--> 363 header, frames = dumps(x, context=context) if wants_context else dumps(x)
364 header["serializer"] = name
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
76 writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
79 x,
80 buffer_callback=buffer_callback,
81 protocol=context.get("pickle-protocol", None) if context else None,
82 )
83 header = {
84 "serializer": "pickle",
85 "writeable": tuple(writeable),
86 }
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
80 buffers.clear()
---> 81 result = cloudpickle.dumps(x, **dump_kwargs)
82 except Exception:
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
601 try:
--> 602 return Pickler.dump(self, obj)
603 except RuntimeError as e:
TypeError: cannot pickle 'SSLContext' object
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
Cell In[3], line 20
18 p = MyProcessor()
19 out = p.process(events)
---> 20 (computed,) = dask.compute(out)
21 print(computed)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
659 postcomputes.append(x.__dask_postcompute__())
661 with shorten_traceback():
--> 662 results = schedule(dsk, keys, **kwargs)
664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:389, in serialize(x, serializers, on_error, context, iterate_collection)
387 except Exception:
388 raise TypeError(msg) from exc
--> 389 raise TypeError(msg, str_x) from exc
390 else: # pragma: nocover
391 raise ValueError(f"{on_error=}; expected 'message' or 'raise'")
Thanks for looking into this issue!
I apologize, I pasted the wrong script.
filename="https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"
events = NanoEventsFactory.from_root(
{filename: "Events"},
steps_per_file=100_000,
metadata={"dataset": "DoubleMuon"},
schemaclass=BaseSchema,
uproot_options = {'ssl':sslctx},
).events()
print (dir(events))
#
works. This is the script that reproduces the issue:
import os
import time
import ssl
import dask
from distributed import LocalCluster, Client
import hist
#import dask
import awkward as ak
import hist.dask as hda
import dask_awkward as dak
from coffea import processor
from coffea.dataset_tools import (
apply_to_fileset,
max_chunks,
preprocess,
)
from coffea.nanoevents import NanoEventsFactory, BaseSchema
if __name__ == '__main__':
sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
cluster = LocalCluster(n_workers=10, threads_per_worker=1)
client = cluster.get_client()
redirector = "https://cmsio2.rc.ufl.edu:1094"
fileset = {
"TTbarPowheg_Dilepton": {
"files": {
redirector+"/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root": "Events",
},
},
}
dataset_runnable, dataset_updated = preprocess(
fileset,
align_clusters=False,
step_size=100_000,
files_per_batch=1,
skip_bad_files=True,
save_form=False,
uproot_options={'ssl': sslctx},
)
print ('dataset_runnable ',dataset_runnable)
It looks like the CA certs that are set up by:
sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
disappear, which I checked like so:
print ( "size of CA certs ", len ( sslctx.get_ca_certs()) )
size of CA certs 0
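This is what you would expect if the context is being rebuilt on the workers from nothing but its protocol (which is what a naive pickle reducer does): the CA bundle and the client cert chain live in the C-level context, not in the Python object's `__dict__`, so they do not survive the round trip. A minimal sketch of the effect:

```python
import ssl

# create_default_context() loads the system CA bundle into the context.
ctx = ssl.create_default_context()
print("original CA certs:", len(ctx.get_ca_certs()))

# Rebuilding from only the protocol, as a naive reducer such as
# save_sslcontext does, yields a bare context: no CA certs, no
# client cert chain.
rebuilt = ctx.__class__(ctx.protocol)
print("rebuilt CA certs:", len(rebuilt.get_ca_certs()))  # 0
```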
After updating the main script readNanoEventsMyProcessor.py and hacking lib/python3.12/asyncio/base_events.py,
HTTPS+X509 runs against the servers (cmsio3.rc.ufl.edu), but with the redirector (cmsio2.rc.ufl.edu in the script) it
only succeeds once in a while, failing with either
Traceback (most recent call last):
File "/opt/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/readNanoEventsMyProcessor.py", line 126, in <module>
(out,) = dask.compute(to_compute)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/dask/base.py", line 661, in compute
results = schedule(dsk, keys, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1345, in __call__
result, _ = self._call_impl(
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1298, in _call_impl
return self.read_tree(
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 985, in read_tree
mapping = self.form_mapping_info.load_buffers(
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/coffea/nanoevents/factory.py", line 157, in load_buffers
arrays = tree.arrays(
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 823, in arrays
_ranges_or_baskets_to_arrays(
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3105, in _ranges_or_baskets_to_arrays
uproot.source.futures.delayed_raise(*obj)
^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/futures.py", line 38, in delayed_raise
raise exception_value.with_traceback(traceback)
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3026, in chunk_to_basket
basket = uproot.models.TBasket.Model_TBasket.read(
^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/model.py", line 854, in read
self.read_members(chunk, cursor, context, file)
^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/models/TBasket.py", line 227, in read_members
) = cursor.fields(chunk, _tbasket_format1, context)
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/cursor.py", line 201, in fields
return format.unpack(chunk.get(start, stop, self, context))
^^^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 446, in get
self.wait(insist=stop)
^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 388, in wait
self._raw_data = numpy.frombuffer(self._future.result(), dtype=self._dtype)
^^^^^^^^^^^^^^^
File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/coalesce.py", line 36, in result
return self._parent.result(timeout=timeout)[self._s]
^^^^^^^^^^^^^^^^^
TypeError: 'ServerDisconnectedError' object is not subscriptable
or the same traceback, ending instead in:
TypeError: 'ClientOSError' object is not subscriptable
I am not sure how this information can help resolve the issue, though.
After decreasing the step size from 100,000 to 10,000 in preprocess, the script (see below) is stable and there is no TypeError.
However, the CA certs in the SSLContext are not preserved, and I had to add hacks to python3.12/site-packages/coffea/dataset_tools/preprocess.py and python3.12/asyncio/base_events.py.
Can something be done about this?
# readNanoEventsMyProcessor.py
import sys
import os
import time
import ssl
import dask
from distributed import LocalCluster, Client
import hist
import awkward as ak
import hist.dask as hda
import dask_awkward as dak
from coffea import processor
from coffea.dataset_tools import (
apply_to_fileset,
max_chunks,
preprocess,
)
from coffea.nanoevents import NanoEventsFactory, BaseSchema
from coffea.nanoevents.methods import candidate
from collections import defaultdict
import numba
import pickle, copyreg
from multiprocessing.reduction import _rebuild_socket, _reduce_socket
def save_sslcontext(obj):
return obj.__class__, (obj.protocol,)
class MyProcessor(processor.ProcessorABC):
def __init__(self):
pass
def process(self, events):
dataset = events.metadata['dataset']
muons = ak.zip(
{
"pt": events.Muon_pt,
"eta": events.Muon_eta,
"phi": events.Muon_phi,
"mass": events.Muon_mass,
"charge": events.Muon_charge,
"isolation": events.Muon_pfRelIso03_all,
},
with_name="PtEtaPhiMCandidate",
behavior=candidate.behavior,
)
# make sure they are sorted by transverse momentum
muons = muons[ak.argsort(muons.pt, axis=1)]
# impose some quality and minimum pt cuts on the muons
muons = muons[ (muons.pt > 5) ]
h_mass = (
hda.Hist.new
.StrCat(["opposite", "same"], name="sign")
.Log(1000, 0.00000000002, 200., name="mass", label="$m_{\\mu\\mu}$ [GeV]")
.Int64()
)
cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
dimuon = muons[cut][:, 0] + muons[cut][:, 1]
h_mass.fill(sign="opposite", mass=dimuon.mass)
cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
dimuon = muons[cut][:, 0] + muons[cut][:, 1]
h_mass.fill(sign="same", mass=dimuon.mass)
return {
dataset: {
"entries": ak.num(events, axis=0),
"mass": h_mass,
}
}
def postprocess(self, accumulator):
pass
if __name__ == '__main__':
cluster = LocalCluster(n_workers=10, threads_per_worker=1)
client = cluster.get_client()
redirector = "/cmsuf/data"
redirector = "root://cms-xrd-global.cern.ch//"
#redirector = "https://cmsio3.rc.ufl.edu:1094/"
redirector = "https://cmsio2.rc.ufl.edu:1094/"
fileset = {
"TTbarPowheg_Dilepton": {
"files": {
redirector+"/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root": "Events",
#redirector+"/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/44187D37-0301-3942-A6F7-C723E9F4813D.root"
},
},
}
copyreg.pickle(ssl.SSLSocket, _reduce_socket, _rebuild_socket)
copyreg.pickle(ssl.SSLContext, save_sslcontext)
sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
if True:
dataset_runnable, dataset_updated = preprocess(
fileset,
align_clusters=False,
#step_size=100_000,
step_size=10_000,
files_per_batch=1,
skip_bad_files=True,
save_form=False,
uproot_options={'ssl': sslctx, 'timeout': 300},
)
#print ('dataset_runnable ',dataset_runnable)
#sys.exit(0)
#dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'https://cmsio2.rc.ufl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539], [101539, 203078], [203078, 304617], [304617, 406156], [406156, 507695], [507695, 609234], [609234, 710773], [710773, 812312], [812312, 913851], [913851, 1015390], [1015390, 1116929], [1116929, 1218468], [1218468, 1320000]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
#dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'root://cms-xrd-global.cern.ch//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539], [101539, 203078], [203078, 304617], [304617, 406156], [406156, 507695], [507695, 609234], [609234, 710773], [710773, 812312], [812312, 913851], [913851, 1015390], [1015390, 1116929], [1116929, 1218468], [1218468, 1320000]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
#dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'https://cmsio2.rc.ufl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
print ('dataset_runnable ',dataset_runnable)
tstart = time.time()
to_compute = apply_to_fileset(
#FancyDimuonProcessor(),
#VLLProcessor(),
MyProcessor(),
max_chunks(dataset_runnable, 300),
schemaclass=BaseSchema,
uproot_options={'ssl': sslctx, 'timeout': 300 },
)
(out,) = dask.compute(to_compute)
print(out)
elapsed = time.time() - tstart
print(elapsed)
sys.exit(0)
#python3.12/site-packages/coffea/dataset_tools/preprocess.py 2024.6.1 version between line 67 and line 68
sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
uproot_options['ssl'] = sslctx
#python3.12/asyncio/base_events.py between line 1039 and line 1040
import ssl as s
if isinstance(ssl, s.SSLContext):
    if len(ssl.get_ca_certs()) == 0:
        ssl = s.create_default_context()
        ssl.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
TypeError: 'ServerDisconnectedError' object is not subscriptable
This might be the same as #1233
Here is a self-contained picklable SSLContext class:
import os
import ssl
import tempfile
from typing import Any
class PickleableSSLContext(ssl.SSLContext):
@classmethod
def create(cls, protocol=ssl.PROTOCOL_TLS_CLIENT):
out = cls(protocol)
out._set_default_state()
return out
def _set_default_state(self):
# this should do the same setup as ssl.create_default_context()
# for now it just loads default certificates
if self.verify_mode != ssl.CERT_NONE:
self.load_default_certs()
def load_cert_chain(self, certfile, keyfile=None, password=None):
with open(certfile, "rb") as fin:
self.certdata = fin.read()
self.keydata = None
if keyfile is not None:
with open(keyfile, "rb") as fin:
self.keydata = fin.read()
self.password = password
self._load_cert_chain()
def _load_cert_chain(self):
with tempfile.TemporaryDirectory() as dirname:
certpath = os.path.join(dirname, "cert.pem")
with open(certpath, "wb") as fout:
fout.write(self.certdata)
keypath = None
if self.keydata is not None:
keypath = os.path.join(dirname, "key.pem")
with open(keypath, "wb") as fout:
fout.write(self.keydata)
super().load_cert_chain(certpath, keypath, self.password)
def __getnewargs__(self):
return (self.protocol,)
def __getstate__(self) -> dict[str, Any]:
return {
"certdata": self.certdata,
"keydata": self.keydata,
"password": self.password,
}
def __setstate__(self, state: dict[str, Any]) -> None:
self.__dict__.update(state)
self._set_default_state()
self._load_cert_chain()
Its use is as follows:
import os
import pickle
import uproot
from pickleablessl import PickleableSSLContext
sslctx = PickleableSSLContext.create()
sslctx.load_cert_chain(os.environ['X509_USER_PROXY'])
sslctx = pickle.loads(pickle.dumps(sslctx))
url = "https://xrootd-local.unl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"
with uproot.open(url, ssl=sslctx) as file:
print(file["Events"].num_entries)
Now the question is, where to put this class?
The picklable SSL context makes it as far as get_steps in preprocess.py,
but does not arrive at async def _file_info and async def _cat_file in fsspec/implementations/http.py:
File "/home/bockjoo/opt/cmsio2/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/lib/python3.12/site-packages/fsspec/implementations/http.py", line 877, in _file_info
print ("DEBUG bockjoo hhtp.py sslctx ", kwargs['ssl'], " size of CA certs ", len(kwargs['ssl'].get_ca_certs()))
~~~~~~^^^^^^^
KeyError: 'ssl'
File "/home/bockjoo/opt/cmsio2/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/lib/python3.12/site-packages/fsspec/implementations/http.py", line 227, in _cat_file
print ("DEBUG bockjoo hhtp.py sslctx ", kw['ssl'], " size of CA certs ", len(kw['ssl'].get_ca_certs()))
~~~~^^^^^
KeyError: 'ssl'
If I manually add kw['ssl'] and kwargs['ssl'], there is no issue.
So I am wondering whether there is some disconnect that does not pass uproot_options through?
My apologies. It wasn't working inside http.py (called by apply_to_fileset) because of my own mistake:
I forgot to pass ssl through uproot_options. It works now.
I will validate this with SLURMCluster as well; I expect it to work there too.
It worked with the SLURMCluster as well.