Skip to content

API Reference

Package

BaseSOFAController

This class is a pure Python reimplementation of a SOFA controller. It provides read/write access to SOFA data.

You may override the callbacks before_animate and after_animate in a subclass.

Source code in gdsofa/controller/controller.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class BaseSOFAController:
    """
    This class is a pure python reimplementation of a SOFA controller. It provides read/write access to SOFA data

    You may override the callbacks `before_animate` and `after_animate` in a subclass.
    """

    def __init__(self, root: Node, params: BaseSOFAParams):
        """
        Store the scene description and the simulation parameters.

        :param root: root node of the scene graph; its lookup helpers are exposed on the controller
        :param params: simulation parameters (the `T` property reads its `n` and `dt` attributes)
        """
        self.root: Node = root
        self.params = params
        # Live SOFA root node; stays None until `to_SOFA` assigns it
        self.r = None

        # Counter incremented by `_after_animate`
        self.iter = 0
        self.data = ControllerData(self)

        # Shortcuts to the lookup helpers of the root node
        self.get_node = self.root.get_node
        self.find = self.root.find
        self.rfind = self.root.rfind

    def init(self):
        """Hook forwarded from the SOFA controller's `init`; does nothing unless overridden."""

    def before_animate(self):
        """Hook run on each BeginAnimateEvent; does nothing unless overridden."""

    def after_animate(self):
        """Hook run on each EndAnimateEvent; does nothing unless overridden."""

    def after_init(self):
        """Hook triggered once the components' `init` is done; does nothing unless overridden."""

    @property
    def t(self):
        """Value of the `time` data of the SOFA root node (`self.r`)."""
        return self.r.time.value

    @property
    def dt(self):
        """Value of the `dt` data of the SOFA root node (`self.r`)."""
        return self.r.dt.value

    @property
    def T(self):
        """Product of `params.n` and `params.dt` (presumably the total simulated duration; confirm)."""
        return self.params.n * self.params.dt

    def _before_animate(self, event):
        """Internal begin-of-step handler: runs the user callback; `event` is not used."""
        self.before_animate()

    def _after_animate(self, event):
        """Internal end-of-step handler: runs the user callback, then increments `iter`; `event` is not used."""
        self.after_animate()
        self.iter += 1

    def handle_event(self, event: Munch):
        """Catch-all handler for the events without a dedicated hook; does nothing unless overridden."""

    def _check_links(self, path):
        """
        Normalize `path` into the key used to index the SOFA root node.

        - a string is returned unchanged;
        - a `Link` is resolved from the root, its first character is dropped
          and every `/` is replaced by `.`;
        - a `MultiLink` gives the list of the keys of its links;
        - any other value is returned as is.

        The result is intentionally not memoized: a cache on this method would
        keep the controller and every `Link` alive, `get_link` creates a new
        `Link` on each call (so the cache would only grow), and a `MultiLink`
        is a list and therefore cannot be used as a cache key.
        """
        if isinstance(path, str):
            return path
        if isinstance(path, MultiLink):
            return [self._check_links(link) for link in path]
        if isinstance(path, Link):
            return path.resolve()[1:].replace("/", ".")
        return path

    def get(self, path):
        _path = self._check_links(path)
        try:
            y = self.r[_path]
            try:
                return y.value if hasattr(y, "value") else y
            except Exception:
                return y
        except Exception as e:
            log.error(
                f"{e}(Could not get data): could not read {_path!r}, maybe path is wrong or component does not exist"
            )

    def get_link(self, *a):
        """Build a `Link` from the given arguments and read it with `get`."""
        return self.get(Link(*a))

    def set(self, path, value):  # .findData(data)
        if isinstance(value, np.ndarray):
            value = value.tolist()
        self.r[self._check_links(path)].value = value

    def to_SOFA(self, sofa_root):
        """
        Create the actual `Sofa.Core.Controller` bound to this python controller.

        Stores `sofa_root` in `self.r` and returns a controller named "controller"
        whose callbacks forward to the methods of this object.

        :param sofa_root: SOFA node stored as `self.r` and given to the controller
        """
        # Imported here so that the module can be loaded without the SOFA bindings
        from Sofa.Core import Controller as Ct

        # `s_` is the SOFA controller instance; `self` is the enclosing python controller
        class Controller(Ct):
            def init(s_):
                self.init()

            def onAnimateBeginEvent(s_, event):
                self._before_animate(event)

            def onAnimateEndEvent(s_, event):
                self._after_animate(event)

            # NOTE(review): only this forward converts the event with `munchify`;
            # the ones below pass the raw event to `handle_event` — confirm this is intended
            def handle_event(s_, event):
                self.handle_event(munchify(event))

            def onSimulationInitDoneEvent(s_, e):
                return self.after_init()

            def onSimulationInitStartEvent(s_, e):
                return self.handle_event(e)

            def onSimulationInitTexturesDoneEvent(s_, e):
                return self.handle_event(e)

            def onSimulationStartEvent(s_, e):
                return self.handle_event(e)

            def onSimulationStopEvent(s_, e):
                return self.handle_event(e)

            def onKeypressedEvent(s_, e):
                return self.handle_event(e)

            def onKeyreleasedEvent(s_, e):
                return self.handle_event(e)

            # NOTE(review): declared with an event argument — check against the SOFA `bwdInit` signature
            def bwdInit(s_, e):
                return self.handle_event(e)

        self.r = sofa_root
        return Controller(self.r, name="controller")

    def plot(self):
        """Placeholder with no default behavior."""
        pass

    def get_json(self, s) -> dict:
        """
        Deserialize json data to python dictionnary
        """
        return json.loads(self.get(s))

    def get_map_real(self, s) -> Munch:
        """
        Cast SOFA data to a Python dictionary `str:float`

        The data is read as text holding one `key value` entry per line.
        Blank lines (e.g. the one left by a trailing newline) are skipped.

        using MapReal = std::map<std::string, Real>;
        Data<MapReal> d_map_real;
        """
        d = {}
        for line in self.get(s).split("\n"):
            if not line.strip():
                # Nothing to parse: indexing the fields would raise IndexError
                continue
            fields = line.split(" ")
            d[fields[0]] = float(fields[1])
        return munchify(d)

    def get_map_coord(self, s) -> Munch:
        """
        Cast SOFA data to a Python dictionary `str:numpy array`

        The data is read as text holding one `key x y z` entry per line; only the
        three fields following the key are kept. Blank lines (e.g. the one left by
        a trailing newline) are skipped instead of producing an empty-key entry.

        using MapCoord = std::map<std::string, sofa::defaulttype::Vec3Types::Coord>;
        Data<MapCoord> d_map_coord;
        """
        d = {}
        for line in self.get(s).split("\n"):
            if not line.strip():
                continue
            fields = line.split(" ")
            d[fields[0]] = np.array(fields[1:4], dtype=float)
        return munchify(d)

    def get_map_vec_real(self, s) -> Munch:
        """
        Cast SOFA data to a Python dictionary `str:numpy array`

        The data is read as text holding one `key v0 v1 ...` entry per line; every
        field following the key goes into a 1-D float array. Blank lines (e.g. the
        one left by a trailing newline) are skipped instead of producing an
        empty-key entry.

        using MapVecReal = std::map<std::string, sofa::defaulttype::Vec3Types::VecReal>;
        Data<MapVecReal> d_map_vec_real;
        """
        d = {}
        for line in self.get(s).split("\n"):
            if not line.strip():
                continue
            fields = line.split(" ")
            d[fields[0]] = np.array(fields[1:], dtype=float)
        return munchify(d)

    def get_map_vec_coord(self, s) -> Munch:
        """
        Cast SOFA data to a Python dictionary `str:numpy array`

        The data is read as text holding one `key x0 y0 z0 x1 y1 z1 ...` entry per
        line; the fields following the key are reshaped into rows of 3 values.
        Blank lines (e.g. the one left by a trailing newline) are skipped instead
        of producing an empty-key entry.

        using MapVecCoord = std::map<std::string, sofa::defaulttype::Vec3Types::VecCoord>;
        Data<MapVecCoord> d_map_vec_coord;
        """
        d = {}
        for line in self.get(s).split("\n"):
            if not line.strip():
                continue
            fields = line.split(" ")
            z = np.array(fields[1:], dtype=float)
            d[fields[0]] = z.reshape((len(z) // 3, 3))
        return munchify(d)

    get_map_vec_deriv = get_map_vec_coord

    def get_springs(self, s) -> Springs:
        """
        Cast SOFA data to a Python array

        The value string is read as a flat sequence of whitespace-separated
        tokens, 5 per spring: two integers followed by three floats. An empty
        value string gives an empty `Springs` container.

        Data<type::vector<LinearSpring<Real>>> d_springs;
        """
        # split() without argument drops empty tokens, so an empty string or
        # repeated separators no longer reach int()/float() as ''
        data = self.get(s).getValueString().split()
        springs = Springs()
        for i in range(0, len(data), 5):
            springs.append(
                Spring(
                    int(data[i]),
                    int(data[i + 1]),
                    float(data[i + 2]),
                    float(data[i + 3]),
                    float(data[i + 4]),
                )
            )
        return springs

after_animate()

EndAnimateEvent

Source code in gdsofa/controller/controller.py
66
67
68
def after_animate(self):
    """Hook run on each EndAnimateEvent; does nothing unless overridden."""

after_init()

Event triggered after components init

Source code in gdsofa/controller/controller.py
70
71
72
def after_init(self):
    """Hook triggered once the components' `init` is done; does nothing unless overridden."""

before_animate()

BeginAnimateEvent

Source code in gdsofa/controller/controller.py
62
63
64
def before_animate(self):
    """Hook run on each BeginAnimateEvent; does nothing unless overridden."""

get_json(s)

Deserialize JSON data into a Python dictionary

Source code in gdsofa/controller/controller.py
172
173
174
175
176
def get_json(self, s) -> dict:
    """
    Read the data designated by `s` and parse it as JSON
    """
    raw = self.get(s)
    return json.loads(raw)

get_map_coord(s)

Cast SOFA data to a Python dictionary str:numpy array

using MapCoord = std::map<std::string, sofa::defaulttype::Vec3Types::Coord>; Data<MapCoord> d_map_coord;

Source code in gdsofa/controller/controller.py
191
192
193
194
195
196
197
198
199
200
201
202
def get_map_coord(self, s) -> Munch:
    """
    Cast SOFA data to a Python dictionary `str:numpy array`

    The data is read as text holding one `key x y z` entry per line; only the
    three fields following the key are kept. Blank lines (e.g. the one left by
    a trailing newline) are skipped instead of producing an empty-key entry.

    using MapCoord = std::map<std::string, sofa::defaulttype::Vec3Types::Coord>;
    Data<MapCoord> d_map_coord;
    """
    d = {}
    for line in self.get(s).split("\n"):
        if not line.strip():
            continue
        fields = line.split(" ")
        d[fields[0]] = np.array(fields[1:4], dtype=float)
    return munchify(d)

get_map_real(s)

Cast SOFA data to a Python dictionary str:float

using MapReal = std::map<std::string, Real>; Data<MapReal> d_map_real;

Source code in gdsofa/controller/controller.py
178
179
180
181
182
183
184
185
186
187
188
189
def get_map_real(self, s) -> Munch:
    """
    Cast SOFA data to a Python dictionary `str:float`

    The data is read as text holding one `key value` entry per line.
    Blank lines (e.g. the one left by a trailing newline) are skipped.

    using MapReal = std::map<std::string, Real>;
    Data<MapReal> d_map_real;
    """
    d = {}
    for line in self.get(s).split("\n"):
        if not line.strip():
            # Nothing to parse: indexing the fields would raise IndexError
            continue
        fields = line.split(" ")
        d[fields[0]] = float(fields[1])
    return munchify(d)

get_map_vec_coord(s)

Cast SOFA data to a Python dictionary str:numpy array

using MapVecCoord = std::map<std::string, sofa::defaulttype::Vec3Types::VecCoord>; Data<MapVecCoord> d_map_vec_coord;

Source code in gdsofa/controller/controller.py
217
218
219
220
221
222
223
224
225
226
227
228
229
def get_map_vec_coord(self, s) -> Munch:
    """
    Cast SOFA data to a Python dictionary `str:numpy array`

    The data is read as text holding one `key x0 y0 z0 x1 y1 z1 ...` entry per
    line; the fields following the key are reshaped into rows of 3 values.
    Blank lines (e.g. the one left by a trailing newline) are skipped instead
    of producing an empty-key entry.

    using MapVecCoord = std::map<std::string, sofa::defaulttype::Vec3Types::VecCoord>;
    Data<MapVecCoord> d_map_vec_coord;
    """
    d = {}
    for line in self.get(s).split("\n"):
        if not line.strip():
            continue
        fields = line.split(" ")
        z = np.array(fields[1:], dtype=float)
        d[fields[0]] = z.reshape((len(z) // 3, 3))
    return munchify(d)

get_map_vec_real(s)

Cast SOFA data to a Python dictionary str:numpy array

using MapVecReal = std::map<std::string, sofa::defaulttype::Vec3Types::VecReal>; Data<MapVecReal> d_map_vec_real;

Source code in gdsofa/controller/controller.py
204
205
206
207
208
209
210
211
212
213
214
215
def get_map_vec_real(self, s) -> Munch:
    """
    Cast SOFA data to a Python dictionary `str:numpy array`

    The data is read as text holding one `key v0 v1 ...` entry per line; every
    field following the key goes into a 1-D float array. Blank lines (e.g. the
    one left by a trailing newline) are skipped instead of producing an
    empty-key entry.

    using MapVecReal = std::map<std::string, sofa::defaulttype::Vec3Types::VecReal>;
    Data<MapVecReal> d_map_vec_real;
    """
    d = {}
    for line in self.get(s).split("\n"):
        if not line.strip():
            continue
        fields = line.split(" ")
        d[fields[0]] = np.array(fields[1:], dtype=float)
    return munchify(d)

get_springs(s)

Cast SOFA data to python array

Data<type::vector<LinearSpring<Real>>> d_springs;

Source code in gdsofa/controller/controller.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def get_springs(self, s) -> Springs:
    """
    Cast SOFA data to a Python array

    The value string is read as a flat sequence of whitespace-separated
    tokens, 5 per spring: two integers followed by three floats. An empty
    value string gives an empty `Springs` container.

    Data<type::vector<LinearSpring<Real>>> d_springs;
    """
    # split() without argument drops empty tokens, so an empty string or
    # repeated separators no longer reach int()/float() as ''
    data = self.get(s).getValueString().split()
    springs = Springs()
    for i in range(0, len(data), 5):
        springs.append(
            Spring(
                int(data[i]),
                int(data[i + 1]),
                float(data[i + 2]),
                float(data[i + 3]),
                float(data[i + 4]),
            )
        )
    return springs

handle_event(event)

General purpose handler

Source code in gdsofa/controller/controller.py
93
94
95
def handle_event(self, event: Munch):
    """Catch-all handler for the events without a dedicated hook; does nothing unless overridden."""

init()

SOFAController init

Source code in gdsofa/controller/controller.py
58
59
60
def init(self):
    """Hook forwarded from the SOFA controller's `init`; does nothing unless overridden."""

JsonEncoder

Bases: JSONEncoder

JSON encoder for Path, numpy types (replaces tf.JsonEncoder).

Source code in gdsofa/utils.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class JsonEncoder(json.JSONEncoder):
    """JSON encoder for Path, numpy types (replaces tf.JsonEncoder)."""

    def default(self, o):
        """
        Convert the objects the stock encoder cannot serialize.

        - `Path` -> its string form
        - numpy array -> nested lists
        - numpy boolean / integer / floating scalar -> `bool` / `int` / `float`

        Anything else is delegated to the base class, which raises `TypeError`.
        numpy is imported lazily so the encoder still works without it.
        """
        if isinstance(o, Path):
            return str(o)
        try:
            import numpy as np
        except ImportError:
            return super().default(o)
        if isinstance(o, np.ndarray):
            return o.tolist()
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        return super().default(o)

Implementation of a link between two components

Source code in gdsofa/core/links.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Link:
    """
    Reference to a component (or node), optionally narrowed to one of its attributes
    """

    def __init__(self, obj: Object | Node, attr: str = None):
        if obj is None:
            log.error((m := "The linked object is None"))
            raise RuntimeError(m)

        self.attr = attr
        self.obj = obj

    def resolve(self, context: Node = None):
        """
        Build the string path designating `self.obj`: relative to `context` when
        one is given, taken from the object's own `path` otherwise. The attribute
        name, if any, is appended after a dot.

        An unnamed object receives a generated name first.
        """
        target = self.obj
        if target.name is None:
            target.name = f"{target.class_name}__{random_name()}"

        if not context:
            resolved = str(target.path)
        else:
            prefix = target.parent.path_from(context)
            # A bare one-character prefix gets no separator
            needs_sep = len(prefix) > 1 and not prefix.endswith("/")
            resolved = prefix + ("/" if needs_sep else "") + target.name

        return f"{resolved}.{self.attr}" if self.attr else resolved

resolve(context=None)

Convert the Link object to a string representing the relative path from the current context (or the root node by default) to the object self.obj

Source code in gdsofa/core/links.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def resolve(self, context: Node = None):
    """
    Build the string path designating `self.obj`: relative to `context` when
    one is given, taken from the object's own `path` otherwise. The attribute
    name, if any, is appended after a dot.

    An unnamed object receives a generated name first.
    """
    target = self.obj
    if target.name is None:
        target.name = f"{target.class_name}__{random_name()}"

    if not context:
        resolved = str(target.path)
    else:
        prefix = target.parent.path_from(context)
        # A bare one-character prefix gets no separator
        needs_sep = len(prefix) > 1 and not prefix.endswith("/")
        resolved = prefix + ("/" if needs_sep else "") + target.name

    return f"{resolved}.{self.attr}" if self.attr else resolved

Bases: List[Link]

Implementation of a multi link, which is a list of single links

Source code in gdsofa/core/links.py
48
49
50
51
52
53
54
55
56
57
class MultiLink(List[Link]):
    """
    List of single links, each built from one positional argument
    """

    def __init__(self, *a):
        links = []
        for spec in a:
            # Each argument is expanded into the arguments of one Link
            links.append(Link(*as_iterable(spec)))
        super().__init__(links)

    def resolve(self, context: Node = None):
        """Resolve every link against `context` and return the results as a list."""
        return [link.resolve(context) for link in self]

MultiLinkExporter

Bases: MultiLink

MultiLink for VTKExporter

Source code in gdsofa/core/links.py
60
61
62
63
64
65
66
class MultiLinkExporter(MultiLink):
    """
    MultiLink variant for VTKExporter: each entry is rendered as `attr=path`
    """

    def resolve(self, context: Node = None):
        entries = []
        for link in self:
            entries.append(f"{link.attr}={link.resolve(context)}")
        return entries

Munch

Bases: dict

Dict with attribute access (replaces tf.munchify / treefiles Munch).

Source code in gdsofa/utils.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Munch(dict):
    """
    Dict with attribute access (replaces tf.munchify / treefiles Munch).

    Keys can be read, written and deleted as attributes. Plain dict values
    present at construction time are converted to `Munch` (values nested in
    them are converted in turn by their own constructor).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Only values of existing keys are reassigned, so the dict size does
        # not change while iterating
        for k, v in self.items():
            if isinstance(v, dict) and not isinstance(v, Munch):
                self[k] = Munch(v)

    def __getattr__(self, key):
        # Only called when regular attribute lookup fails
        try:
            return self[key]
        except KeyError as e:
            raise AttributeError(key) from e

    def __setattr__(self, key, value):
        self[key] = value

    def __delattr__(self, key):
        # Mirror __getattr__/__setattr__: attributes are stored as items
        try:
            del self[key]
        except KeyError as e:
            raise AttributeError(key) from e

Node

Implementation of a tree node

A node contains a parent (except the root node) and two containers for its own components and for its child nodes.

Source code in gdsofa/core/node.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
class Node:
    """
    Implementation of a tree node

    A node contains a parent (except the root node) and two containers for its own components and for its child nodes.
    """

    def __init__(self, name, parent: Node = None, gravity=None, **kwargs):
        """
        :param name: node name, used as a path segment
        :param parent: parent node, None for a root node
        :param gravity: gravity vector, `[0, 0, 0]` when not given
        :param kwargs: extra node arguments, stored in `node_args`
        """
        self.name = name
        # Bounding box, set later through `set_bbox`
        self.bbox: BBOX = None
        self.gravity = none_default(gravity, [0, 0, 0])
        self.node_args = kwargs
        self.parent = parent
        # Child nodes and components owned by this node
        self.children: List[Node] = []
        self.objs: List[Object] = []

    def __truediv__(self, other):
        """`node / other`: join the node path (stripped of its leading `@`) with `other` as a `Path`."""
        return Path(str(self.path).lstrip("@")) / other

    def get_root(self):
        x = self
        while x.parent is not None:
            x = x.parent
        return x

    def get_node(self, name: str) -> Node:
        for x in self.children:
            if x.name == name:
                return x
        for x in self.children:
            if y := x.get_node(name):
                return y
        return None

    @property
    def last(self):
        """Last component of `objs` (IndexError when the node has none)."""
        return self.objs[-1]

    @property
    def path(self) -> str:
        x, y = self, []
        while x.parent is not None:
            y.append(x.name)
            x = x.parent
        y = list(reversed(y))
        return f"@{'/'.join(y)}"

    def add_visual(self, *visual, color="grey"):
        """
        Create a simple visual child node

        The child holds an `OglModel` linked to this node's `MechanicalObject`
        and to its first component whose class name contains "TopologyContainer",
        a `VisualStyle` built from `visual`, and an `IdentityMapping`.

        :param visual: arguments forwarded to `VisualStyle`
        :param color: color passed to `Color`, whose `rgb` is given to the OglModel
        :return: the created child node
        """
        # Local imports — presumably to avoid circular imports; confirm
        from gdsofa.core.component import Object
        from gdsofa.comps.base_component import IdentityMapping, VisualStyle

        # The same random suffix is shared by the node and its components
        visu = self.add_child(f"visu__{(idx := random_name())}")
        visu + Object(
            "OglModel",
            src=Link(self.find(class_name="MechanicalObject")),
            topology=Link(
                self.find(callback=lambda x: "TopologyContainer" in x.class_name)
            ),
            color=Color(color).rgb,
            name=f"ogl__{idx}",
        )
        visu + VisualStyle(*visual, name=f"style__{idx}")
        visu + IdentityMapping()
        return visu

    def add_child(self, name, **kwargs):
        """Create a node named `name` under this one, append it to the children and return it."""
        child = Node(name, parent=self, **kwargs)
        self.children.append(child)
        return child

    def add_child_first(self, name, **kwargs):
        """Create a node named `name` under this one, insert it before the other children and return it."""
        child = Node(name, parent=self, **kwargs)
        self.children.insert(0, child)
        return child

    def _get_dict(self):
        m = {}
        for x in self.children:
            m[x.name] = x._get_dict()
        m["__objs"] = [x for x in self.objs]
        return m

    def tree(self, show_data=True):
        """
        Build tree for current node

        Child nodes are displayed by name, components by class name (followed by
        their name in parentheses when they have one).

        :param show_data: also list each component's keyword arguments (except `name`)
        :return: the populated `Tree`
        """
        tree = Tree()
        # The tree root is identified by this node's name; entries of the first
        # level must be anchored on that identifier (a hard-coded "root" anchor
        # only exists when the node is actually named "root")
        root_id = self.name
        tree.create_node(self.name, root_id)  # root node

        def walk_dict(d, anchor=root_id):
            for k, v in d.items():
                anc = unique_id()
                if k != "__objs":
                    s = f"{k}"  # Display node
                    tree.create_node(s, anc, parent=anchor)
                else:
                    for y in v:
                        y: Object
                        s = f"{y.class_name}"  # Display component
                        if y.name is not None:
                            s += f" ({y.name})"
                        a2 = unique_id()
                        tree.create_node(s, a2, parent=anchor)

                        if show_data:
                            for k2, v2 in y.kw.items():
                                if k2 not in ("name",):
                                    s = f"{k2}={v2}"
                                    tree.create_node(s, unique_id(), parent=a2)

                # Child entries are dicts: recurse below the node just created
                if isinstance(v, dict):
                    walk_dict(v, anchor=anc)

        walk_dict(self._get_dict())
        return tree

    def update_links(self):
        """
        Replace, in the whole subtree, every `Link`/`MultiLink` keyword argument
        of a component by its resolution relative to the node owning the component.
        Children are processed before the node's own components.
        """
        for child in self.children:
            child.update_links()
        for obj in self.objs:
            kw = obj.kw
            # Only existing keys are reassigned, so iterating the dict is safe
            for key in kw:
                value = kw[key]
                if isinstance(value, (Link, MultiLink)):
                    kw[key] = value.resolve(context=self)

    def _to_file(self, parent_node):
        """
        Generate the python statements (tab-indented body of `createScene`)
        creating this node's components and, recursively, its children.

        :param parent_node: node whose `name` is used as the variable name in the
            generated statements; every call visible in this class passes the node itself
        :return: the generated source text
        """
        s = ""
        # Round-trip through JSON to turn Path / numpy values into plain python
        conv = lambda x: json.loads(json.dumps(x, cls=JsonEncoder))
        # float, list and dict are emitted with their str(); everything else is quoted
        # NOTE(review): ints and bools therefore end up quoted as strings — confirm this is intended
        wrap = lambda x: x if isinstance(x, (float, list, dict)) else f"'{x}'"
        # Render a kwargs dict as `k1=v1, k2=v2`
        rr = lambda x: ", ".join([f"{k}={wrap(v)}" for k, v in conv(x).items()])
        # Gravity and node arguments are only written for the node named "root"
        if parent_node.name == "root":
            if self.gravity is not None:
                s += f"\t{parent_node.name}.gravity.value = {self.gravity}\n"

            if len(parent_node.node_args) > 0:
                for k, v in conv(parent_node.node_args).items():
                    s += f"\t{parent_node.name}.{k}.value = {v!r}\n"

        if self.bbox is not None:
            s += f"\t{parent_node.name}.bbox.value = '{self.bbox.value}'\n"
        for x in self.objs:
            kw = f", {rr(x.kw)}" if len(x.kw) > 0 else ""
            s += f"\t{parent_node.name}.addObject('{x.class_name}'{kw})\n"
        for x in self.children:
            kw = f", {rr(x.node_args)}" if len(x.node_args) > 0 else ""
            # The child name doubles as the python variable name in the generated file
            s += f"\n\t{x.name} = {parent_node.name}.addChild('{x.name}'{kw})\n"
            s += x._to_file(x)
        return s

    def to_file(self, fname, clean_paths=False, doc=None):
        """
        Write the scene as a SofaPython3 script defining `createScene(root)`.

        A "RequiredPlugins" child node is inserted first and filled with the
        plugins returned by `build_plugins_list` that are not declared yet, then
        links are resolved and the file is generated. Write errors are logged,
        not raised.

        :param fname: path of the file to write (parent directories are created)
        :param clean_paths: strip the common directory of the file paths found in
            the generated text; nothing is stripped when no path is found
        :param doc: optional text added as a comment block at the top of the file
        """
        from gdsofa.comps import RequiredPlugin

        plgn_node = self.add_child_first("RequiredPlugins")
        for x in self.build_plugins_list():
            if not self.find("RequiredPlugin", name=x, silent=True):
                obj = RequiredPlugin(name=x)
                obj.parent = plgn_node
                plgn_node.objs.append(obj)

        self.update_links()
        SOFA_EXE = Path(os.environ.get("SOFA_ROOT", "")) / "bin/runSofa"
        _link = "https://gdsofa-83b928.gitlabpages.inria.fr/"
        s = f"# This file implements a SOFA-framework simulation scene, generated with [gd-sofa-utils]({_link})\n"
        s += f"# runSofa -l SofaPython3 {Path(fname).name}\n"
        s += f"# {SOFA_EXE} -l SofaPython3 {Path(fname).resolve()}\n"

        if doc is not None:
            docs = doc.split("\n")
            s += "\n"
            for x in docs:
                s += f"# {x}\n"

        s += "\n\ndef createScene(root):\n"
        s += self._to_file(self)

        if clean_paths:
            found_paths = re.findall(r'\/[^\s,"\']+\.\w+', s)
            # os.path.commonpath raises ValueError on an empty sequence
            if found_paths:
                cpath = os.path.commonpath(found_paths)
                s = s.replace(cpath + "/", "")

        try:
            p = Path(fname)
            p.parent.mkdir(parents=True, exist_ok=True)
            p.write_text(s, encoding="utf-8")
        except Exception as e:
            log.error(f"Could not write scene to file: {e}")
        else:
            gd.get_logger().info(f"Written: {fname}")

    def build_plugins_list(self):
        """
        Collect the plugin names needed by the components of the subtree.

        A component whose class name is a key of `REQUIRED_PLUGINS` contributes
        the mapped value (once per class name); a `RequiredPlugin` component
        contributes its own name. The list is also stored in `imported_plugins`.
        """
        from gdsofa.comps import REQUIRED_PLUGINS

        # Work on a copy: entries are removed once used
        d = dict(REQUIRED_PLUGINS)
        pl = []
        for x in self.up2down:
            if x.class_name in d:
                pl.append(d[x.class_name])
                del d[x.class_name]
            elif x.class_name == "RequiredPlugin":
                pl.append(x.name)
        self.imported_plugins = pl
        return pl

    def to_dict(self):
        """Return the dict form of `tree()` (unsorted)."""
        return self.tree().to_dict(sort=False)

    def print(self, show_data=True):
        """Resolve the links, display the tree and return the node itself."""
        self.update_links()
        self.tree(show_data).show(key=False)
        return self

    def dump_tree(self, fname: str = None):
        """
        Save the tree to a text file.

        :param fname: target file, or a directory in which `tree.txt` is written;
            defaults to the current directory
        """
        fname = Path(none_default(fname, "."))
        if fname.is_dir():
            fname = fname / "tree.txt"
        fname = Path(fname)
        # An existing file is removed before saving — presumably because save2file appends; confirm
        fname.unlink(missing_ok=True)
        self.tree().save2file(str(fname))
        gd.get_logger().info(f"Written: {fname}")

    def add(self, other):
        """
        Attach one or several items to this node: a `Node` becomes a child, any
        other item is asked to attach itself through `attach_to`.

        :return: `other`, unchanged
        """
        for item in as_iterable(other):
            if not isinstance(item, Node):
                item.attach_to(self)
                continue
            item.parent = self
            self.children.append(item)
        return other

    def __add__(self, other):
        """`node + other` is `node.add(other)`; note that it returns `other`, not the node."""
        return self.add(other)

    def _add_sofa(self, parent_node):
        """
        Populate a SOFA node from this node: gravity, bounding box, components,
        then children (recursively).

        :param parent_node: the SOFA node that corresponds to this node
        """
        if self.gravity is not None:
            parent_node.gravity.value = self.gravity
        if self.bbox is not None:
            parent_node.bbox.value = self.bbox.value
        for x in self.objs:
            try:
                parent_node.addObject(x.class_name, **x.kw)
            except Exception as e:
                # Log which component failed, then let the error propagate
                log.error(
                    f"Error creating {x.class_name!r} in node {parent_node.name.value!r}"
                )
                raise e
        for x in self.children:
            n = parent_node.addChild(x.name, **x.node_args)
            x._add_sofa(n)

    def to_sofa(self):
        """Create a SOFA node named "root", resolve the links, populate the node and return it."""
        from Sofa.Core import Node as SofaNode

        root = SofaNode("root")
        self.update_links()
        self._add_sofa(root)

        return root

    def path_from(self, node: Node) -> str:
        """
        Return the relative path between 2 nodes
        :param node: the starting node
        :return: `@` followed by the relative path; `"@"` alone when both paths are equal
        """
        # Split both paths (without their leading "@") into segments
        spl = lambda x: as_iterable(x[1:].split("/"))
        a, b = spl(self.path), spl(node.path)
        if a == b:
            return "@"
        i = -1
        s = Path()
        j = -1
        # NOTE(review): j ends up as the LAST index where both paths hold the same
        # segment — the loop does not stop at the first mismatch; confirm this is intended
        for ej, (ea, eb) in enumerate(zip(a, b)):
            if ea == eb:
                j = ej
        if j >= 0:
            a = a[j + 1:]
            b = b[j + 1:]
        # One ".." per remaining (non-empty) segment of the starting path that is
        # absent from the target path
        for x in list(reversed(b)):
            if len(x) > 0:
                if x in a:
                    i = a.index(x)
                else:
                    s = s / ".."
        # Then go down the remaining segments of the target path
        for x in a[i + 1:]:
            s = s / x
        return f"@{s}"

    @property
    def nodes_up2down(self):
        """
        Generator that browses the scene graph nodes from current node to leaves.

        The current node is yielded first, then each child's subtree in order (depth-first).
        """
        yield self
        for child in self.children:
            yield from child.nodes_up2down

    @property
    def up2down(self):
        """
        Generator that browses the node's objects from current node to down
        """
        for x in self.objs:
            yield x
        for x in self.children:
            for y in x.up2down:
                yield y

    @property
    def down2up(self):
        """
        Generator that browses the node's objects from current node to root
        """
        for x in self.objs:
            yield x
        if self.parent:
            for y in self.parent.down2up:
                yield y

    def _find(
        self,
        class_name: str = None,
        name: str = None,
        callback: Callable = None,
        direction: Generator = None,
        silent=False,
    ):
        """
        Return the first object of `direction` accepted by the search criterion.

        The criterion is `callback` when given, else equality on `name`, else
        equality on `class_name`.
        NOTE(review): when both `name` and `class_name` are given, only `name` is used — confirm.

        :param direction: iterable of objects to scan, `self.up2down` by default
        :param silent: when true, return None instead of raising if nothing matches
        :raises RuntimeError: when no criterion is given
        :raises KeyError: when nothing matches and `silent` is false
        """
        direction = none_default(direction, self.up2down)
        # Description of the criterion, only used in the error message
        infos = {}
        if callback is None:
            if name is not None:
                infos["name"] = name
                callback = lambda x: x.name == name
            elif class_name is not None:
                infos["class_name"] = class_name
                callback = lambda x: x.class_name == class_name
            else:
                raise RuntimeError("At least one argument must be set")
        infos["callback"] = callback
        for x in direction:
            if callback(x):
                return x
        if not silent:
            j = make_string(**infos)
            log.error((m := f"Object {j!r} not found in the {self.name} node"))
            raise KeyError(m)

    def find(
        self,
        class_name: str = None,
        name: str = None,
        callback: Callable = None,
        silent=False,
    ):
        """
        Find the first object that satisfies `Callable(object) == True`

        The search scans `up2down` (this node's components, then its descendants')
        and delegates to `_find`.
        """
        return self._find(
            class_name, name, callback, direction=self.up2down, silent=silent
        )

    def rfind(
        self,
        class_name: str = None,
        name: str = None,
        callback: Callable = None,
        silent=False,
    ):
        """
        Similar to find, but from current node to root

        The search scans `down2up` (this node's components, then its ancestors').
        """
        return self._find(
            class_name, name, callback, direction=self.down2up, silent=silent
        )

    def set_bbox(self, val: str | list | tuple, fmt="xmin_ymin"):
        """
        Set the node bounding box from 6 values.

        :param val: space-separated string, or list/tuple of values
        :param fmt: value layout — "xmin_ymin": the first three values are the min
            corner and the last three the max corner; "xmin_xmax": values come as
            (min, max) pairs, one pair per axis
        :raises NotImplementedError: for an unsupported `val` type or `fmt`
        """
        # fmt: xmin_ymin or xmin_xmax
        if isinstance(val, str):
            # NOTE(review): values stay strings here, while the list/tuple branch
            # converts them to float — confirm BBOX accepts both
            _bbox = val.split(" ")
        elif isinstance(as_iterable(val), (list, tuple)):
            _bbox = list(map(float, val))
        else:
            raise NotImplementedError

        if fmt == "xmin_ymin":
            self.bbox = BBOX(
                min=[_bbox[0], _bbox[1], _bbox[2]], max=[_bbox[3], _bbox[4], _bbox[5]]
            )
        elif fmt == "xmin_xmax":
            self.bbox = BBOX(
                min=[_bbox[0], _bbox[2], _bbox[4]], max=[_bbox[1], _bbox[3], _bbox[5]]
            )
        else:
            raise NotImplementedError

down2up property

Generator that browses the node's objects from current node to root

nodes_up2down property

Generator that browses the scene graph nodes from current node to leaves.

up2down property

Generator that browses the node's objects from current node to down

add_visual(*visual, color='grey')

Create a simple visual child node

Source code in gdsofa/core/node.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def add_visual(self, *visual, color="grey"):
    """
    Create a simple visual child node

    The child holds an `OglModel` linked to this node's `MechanicalObject`
    and to its first component whose class name contains "TopologyContainer",
    a `VisualStyle` built from `visual`, and an `IdentityMapping`.

    :param visual: arguments forwarded to `VisualStyle`
    :param color: color passed to `Color`, whose `rgb` is given to the OglModel
    :return: the created child node
    """
    # Local imports — presumably to avoid circular imports; confirm
    from gdsofa.core.component import Object
    from gdsofa.comps.base_component import IdentityMapping, VisualStyle

    # The same random suffix is shared by the node and its components
    visu = self.add_child(f"visu__{(idx := random_name())}")
    visu + Object(
        "OglModel",
        src=Link(self.find(class_name="MechanicalObject")),
        topology=Link(
            self.find(callback=lambda x: "TopologyContainer" in x.class_name)
        ),
        color=Color(color).rgb,
        name=f"ogl__{idx}",
    )
    visu + VisualStyle(*visual, name=f"style__{idx}")
    visu + IdentityMapping()
    return visu

find(class_name=None, name=None, callback=None, silent=False)

Find the first object that satisfies Callable(object) == True

Source code in gdsofa/core/node.py
375
376
377
378
379
380
381
382
383
384
385
def find(
    self,
    class_name: str = None,
    name: str = None,
    callback: Callable = None,
    silent=False,
):
    """
    Find the first object that satisfies `Callable(object) == True`

    The search scans `self.up2down` and delegates to `_find`.
    """
    return self._find(
        class_name, name, callback, direction=self.up2down, silent=silent
    )

path_from(node)

Return the relative path between 2 nodes :param node: the starting node

Source code in gdsofa/core/node.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def path_from(self, node: Node) -> str:
    """
    Return the relative path between 2 nodes
    :param node: the starting node
    :return: a string starting with "@"; exactly "@" when both paths split
        into the same components
    """
    # Drop the first character of a path and split the remainder on "/"
    spl = lambda x: as_iterable(x[1:].split("/"))
    a, b = spl(self.path), spl(node.path)
    if a == b:
        return "@"
    i = -1
    s = Path()
    # j ends up as the LAST index at which both paths hold the same component.
    # NOTE(review): this is not necessarily the length of the common prefix,
    # since earlier components may differ - confirm this is intended.
    j = -1
    for ej, (ea, eb) in enumerate(zip(a, b)):
        if ea == eb:
            j = ej
    if j >= 0:
        # Keep only what follows the last matching position
        a = a[j + 1:]
        b = b[j + 1:]
    # Walk the remaining components of `node` from the deepest one: a component
    # also present in `a` records its index there, any other one adds a ".."
    for x in list(reversed(b)):
        if len(x) > 0:
            if x in a:
                i = a.index(x)
            else:
                s = s / ".."
    # Append the components of this node located after the recorded index
    for x in a[i + 1:]:
        s = s / x
    return f"@{s}"

rfind(class_name=None, name=None, callback=None, silent=False)

Similar to find, but from current node to root

Source code in gdsofa/core/node.py
387
388
389
390
391
392
393
394
395
396
397
def rfind(
    self,
    class_name: str = None,
    name: str = None,
    callback: Callable = None,
    silent=False,
):
    """
    Similar to find, but from current node to root.

    Delegates to ``self._find`` using the ``down2up`` traversal.
    """
    traversal = self.down2up
    return self._find(class_name, name, callback, direction=traversal, silent=silent)

tree(show_data=True)

Build tree for current node

Source code in gdsofa/core/node.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def tree(self, show_data=True):
    """
    Build tree for current node.

    The nested dict returned by ``self._get_dict()`` is walked recursively:
    every key other than ``"__objs"`` becomes a node entry, and every element
    of an ``"__objs"`` value becomes a component entry labelled with its class
    name (and its name when set).

    :param show_data: when true, each ``key=value`` pair of a component's
        ``kw`` (except ``name``) is added as a child of that component
    :return: the populated ``Tree``
    """
    tree = Tree()
    # The top entry is registered under the node's own name, so first-level
    # entries must be anchored on that same id. Anchoring on the literal
    # "root" only works when the node happens to be named "root".
    root_id = self.name
    tree.create_node(self.name, root_id)  # root node

    def walk_dict(d, anchor):
        for k, v in d.items():
            anc = unique_id()
            if k != "__objs":
                tree.create_node(f"{k}", anc, parent=anchor)  # Display node
            else:
                for y in v:
                    label = f"{y.class_name}"  # Display component
                    if y.name is not None:
                        label += f" ({y.name})"
                    comp_id = unique_id()
                    tree.create_node(label, comp_id, parent=anchor)

                    if show_data:
                        for k2, v2 in y.kw.items():
                            if k2 not in ("name",):
                                tree.create_node(
                                    f"{k2}={v2}", unique_id(), parent=comp_id
                                )

            if isinstance(v, dict):
                walk_dict(v, anchor=anc)

    walk_dict(self._get_dict(), anchor=root_id)
    return tree

Object

Class representing a SOFA component. The component's data is stored in the kw attribute.

Source code in gdsofa/core/component.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Object:
    """
    Class representing a SOFA component. The component's data is stored in the `kw` attribute.

    A component starts detached (``parent`` is ``None``) and receives a
    generated name when none is supplied.
    """

    def __init__(self, class_name, **kw):
        self.class_name = class_name
        self.parent = None

        # Do not munchify Link/MultiLink - they are list subclasses and would be turned into generators
        data = {}
        for key, value in kw.items():
            if isinstance(value, (Link, MultiLink)):
                data[key] = value
            else:
                data[key] = munchify(value)
        self.kw = Munch(data)

        if self.name is None:
            self.name = f"{class_name}__{random_name()}"

    @property
    def name(self):
        """Value stored under ``"name"`` in ``kw`` (``None`` when absent)."""
        return self.kw.get("name")

    @name.setter
    def name(self, value):
        self.kw["name"] = value

    def attach_to(self, node: Node):
        """Record ``node`` as parent and append this component to ``node.objs``."""
        self.parent = node
        node.objs.append(self)

    @property
    def path(self) -> str:
        """
        Path of the component, built from the parent's path and the name.

        :raises RuntimeError: when the component has no parent
        """
        if self.parent is None:
            raise RuntimeError("Component is not attached to any scene")
        parent_path = str(self.parent.path)
        # A parent whose path is the bare "@" gets no "/" separator
        prefix = "@" if parent_path == "@" else f"{parent_path}/"
        return f"{prefix}{self.name}"

    def __call__(self, **kw):
        """Return a deep copy of this component whose ``kw`` is updated with ``kw``."""
        clone = deepcopy(self)
        clone.kw.update(kw)
        return clone

TObject

Bases: Object

Object whose SOFA class is guessed from the python class name

Source code in gdsofa/core/component.py
53
54
55
56
57
58
59
class TObject(Object):
    """
    Object whose SOFA class is guessed from the python class name
    """

    def __init__(self, **kw):
        # The name of the concrete (sub)class is used as SOFA class name
        sofa_class = type(self).__name__
        super().__init__(sofa_class, **kw)

StdRedirect(stream, fname)

Redirect stream to a file (replaces tf.StdRedirect).

Source code in gdsofa/utils.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@contextlib.contextmanager
def StdRedirect(stream, fname: Union[str, Path]):
    """
    Redirect stream to a file (replaces tf.StdRedirect).

    :param stream: must be the current ``sys.stdout`` or ``sys.stderr`` object
    :param fname: destination file; parent directories are created and the
        file is opened in write mode (UTF-8)
    :return: context manager yielding the open file object
    :raises ValueError: if ``stream`` is neither ``sys.stdout`` nor ``sys.stderr``
    """
    # Validate first so that an invalid stream never creates or truncates the file
    if stream is sys.stdout:
        redirect = contextlib.redirect_stdout
    elif stream is sys.stderr:
        redirect = contextlib.redirect_stderr
    else:
        raise ValueError("stream must be sys.stdout or sys.stderr")

    path = Path(fname)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f, redirect(f):
        yield f

dump_json(path, data, **kwargs)

Write JSON file; creates parent dirs. Wraps gdutils.dump_json.

Source code in gdsofa/utils.py
140
141
142
143
144
def dump_json(path: Union[str, Path], data: Any, **kwargs) -> None:
    """
    Write JSON file; creates parent dirs. Wraps gdutils.dump_json.

    :param path: destination file
    :param data: object handed to ``gdutils.dump_json``
    :param kwargs: extra keyword arguments forwarded to ``gdutils.dump_json``
    """
    target = Path(path)
    # Make sure the destination folder exists before delegating the write
    target.parent.mkdir(parents=True, exist_ok=True)
    gdutils.dump_json(target, data, **kwargs)

dump_path(path)

Create directory (parents, exist_ok) and return resolved path. Replaces tf.dump.

Source code in gdsofa/utils.py
116
117
118
119
120
def dump_path(path: Union[str, Path]) -> Path:
    """
    Create directory (parents, exist_ok) and return resolved path. Replaces tf.dump.

    :param path: directory to create
    :return: the resolved ``Path`` of that directory
    """
    resolved = Path(path).resolve()
    resolved.mkdir(parents=True, exist_ok=True)
    return resolved

ensure_ext(fname, ext)

Ensure path has the given extension (e.g. '.json'). Replaces tf.ensure_ext.

Source code in gdsofa/utils.py
100
101
102
103
104
105
106
107
def ensure_ext(fname: Union[str, Path], ext: str) -> Path:
    """
    Ensure path has the given extension (e.g. '.json'). Replaces tf.ensure_ext.

    :param fname: input path
    :param ext: wanted extension, with or without the leading dot
    :return: ``fname`` as a ``Path`` whose suffix is ``ext``
    """
    candidate = Path(fname)
    wanted = ext if ext.startswith(".") else "." + ext
    if candidate.suffix == wanted:
        return candidate
    return candidate.with_suffix(wanted)

make_string(**kwargs)

Format key=value for logging (replaces tf.make_string).

Source code in gdsofa/utils.py
95
96
97
def make_string(**kwargs: Any) -> str:
    """
    Format key=value for logging (replaces tf.make_string).

    Values are rendered with ``repr`` and pairs are joined with ``", "``.
    """
    pairs = [f"{key}={value!r}" for key, value in kwargs.items()]
    return ", ".join(pairs)

munchify(obj)

Convert dict (and nested dicts) to Munch.

Source code in gdsofa/utils.py
76
77
78
79
80
81
82
def munchify(obj: Any) -> Any:
    """
    Convert dict (and nested dicts) to Munch.

    Lists and tuples are rebuilt with the same type, their elements being
    converted recursively. Any other object is returned unchanged.

    :param obj: object to convert
    :return: a ``Munch`` for a dict input, a same-typed sequence for a
        list/tuple input, otherwise ``obj`` itself
    """
    if isinstance(obj, dict):
        return Munch({k: munchify(v) for k, v in obj.items()})
    if isinstance(obj, tuple) and hasattr(obj, "_fields"):
        # namedtuple constructors expect one positional argument per field,
        # so they cannot be rebuilt from a single iterable
        return type(obj)(*(munchify(x) for x in obj))
    if isinstance(obj, (list, tuple)):
        return type(obj)(munchify(x) for x in obj)
    return obj

none_default(a, b)

Return a if a is not None else b (replaces tf.none).

Source code in gdsofa/utils.py
85
86
87
def none_default(a: Any, b: Any) -> Any:
    """
    Return a if a is not None else b (replaces tf.none).

    Only ``None`` triggers the fallback; other falsy values of ``a`` are kept.
    """
    if a is None:
        return b
    return a

path_insert_before(fname, suffix, insert)

E.g. path_insert_before('a/b/foo.json', '.json', '_schema') -> a/b/foo_schema.json.

Source code in gdsofa/utils.py
110
111
112
113
def path_insert_before(fname: Union[str, Path], suffix: str, insert: str) -> Path:
    """
    E.g. path_insert_before('a/b/foo.json', '.json', '_schema') -> a/b/foo_schema.json.

    :param fname: input path
    :param suffix: extension before which ``insert`` is placed; may span
        several dots (e.g. ``'.tar.gz'``) and the leading dot is optional
    :param insert: text to insert
    :return: the new path. When the file name does not end with ``suffix``,
        ``insert`` is placed before the last extension of ``fname`` instead.
    """
    p = Path(fname)
    name = p.name
    if suffix and not suffix.startswith("."):
        suffix = "." + suffix
    # Honour the requested suffix so multi-part extensions stay intact
    if suffix and name != suffix and name.endswith(suffix):
        return p.with_name(name[: -len(suffix)] + insert + suffix)
    return p.with_stem(p.stem + insert)

unique_id()

Return a unique string (e.g. for treelib node IDs). Replaces tf.get_string().

Source code in gdsofa/utils.py
90
91
92
def unique_id() -> str:
    """
    Return a unique string (e.g. for treelib node IDs). Replaces tf.get_string().

    The string is the hexadecimal form of a freshly generated UUID4.
    """
    token = uuid.uuid4()
    return token.hex

Scene and runner

Thin wrapper for SOFA scene root and runner.

RunSofa

Util class to run a scene.

Source code in gdsofa/sofawrap.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class RunSofa:
    """
    Util class to run a scene.

    Construction requires the ``SOFA_ROOT`` environment variable, loads SOFA,
    imports the plugins reported by the scene graph, converts the scene with
    ``root.to_sofa()`` and registers the given controllers on the SOFA root.
    """

    def __init__(
        self,
        root: Node,
        params: Union[BaseSOFAParams, None] = None,
        *controllers: Union[BaseSOFAController, List[BaseSOFAController]],
    ):
        """
        :param root: root node of the scene graph
        :param params: simulation parameters; a default ``BaseSOFAParams`` is
            used when ``None``
        :param controllers: controller classes; each one is instantiated with
            ``(root, params)`` and stored under its ``__name__``
        :raises RuntimeError: if ``SOFA_ROOT`` is unset or empty
        """
        if not os.environ.get("SOFA_ROOT"):
            raise RuntimeError("SOFA_ROOT environment variable is required")
        load_SOFA()

        self.params = none_default(params, BaseSOFAParams())
        self.root = root

        self.import_plugins()

        self.sofa_root = root.to_sofa()

        self.controllers = {}
        self.sofa_controllers = {}

        for ctrl in as_iterable(controllers):
            key = ctrl.__name__
            instance = ctrl(self.root, self.params)
            self.controllers[key] = instance
            sofa_ctrl = instance.to_SOFA(self.sofa_root)
            self.sofa_controllers[key] = sofa_ctrl
            self.sofa_root.addObject(sofa_ctrl)

    def run(
        self,
        gui: bool = False,
        std_to_file: bool = False,
        viewer: str = "qglviewer",
        title: str = "MyProject",
    ) -> SimulationResult:
        """
        Initialise the SOFA root and run the simulation.

        :param gui: run through the SOFA GUI main loop instead of a fixed
            number of iterations
        :param std_to_file: (headless only) redirect stdout/stderr to
            ``Output_Python.stdout`` / ``Error_Python.stderr`` in ``out_dir``
        :param viewer: viewer name given to ``GUIManager.Init``
        :param title: title given to ``GUIManager.Init`` / ``createGUI``
        :return: a ``SimulationResult`` built from the params and elapsed time
        :raises ValueError: if ``std_to_file`` is set but no ``out_dir`` is defined
        """
        from Sofa import Simulation  # type: ignore[import-untyped]

        def _p(key: str, default):
            return self.params[key] if key in self.params else default

        n = int(_p("n", 100))
        dt = float(_p("dt", 0.005))
        self.sofa_root.setDt(dt)

        Simulation.initRoot(self.sofa_root)

        # `out_dir` may exist as a parameter while still being None; Path(None)
        # would raise TypeError before the explicit check below is reached.
        raw_out_dir = _p("out_dir", None)
        out_dir = Path(raw_out_dir) if raw_out_dir is not None else None

        with Timer() as ts:
            if gui:
                from Sofa.Gui import GUIManager
                from SofaRuntime import importPlugin

                importPlugin("Sofa.Component")
                importPlugin("Sofa.GL.Component")
                importPlugin("Sofa.GUI.Component")
                importPlugin("Sofa.Qt")
                log.info("Available GUIs: %s", GUIManager.ListSupportedGUI())

                GUIManager.Init(title, viewer)
                GUIManager.createGUI(self.sofa_root, title)
                GUIManager.SetDimension(900, 700)
                GUIManager.MainLoop(self.sofa_root)
                GUIManager.closeGUI()
            else:
                log.info("Starting SOFA for %s iterations", n)
                if std_to_file:
                    if out_dir is None:
                        raise ValueError("out_dir required when std_to_file=True")
                    out = out_dir / "Output_Python.stdout"
                    err = out_dir / "Error_Python.stderr"
                    with StdRedirect(sys.stdout, out), StdRedirect(sys.stderr, err):
                        self._animate(Simulation, n, dt)
                    log.debug("Finished writing std to file")
                else:
                    self._animate(Simulation, n, dt)

        return SimulationResult(self.params, ts.secs)

    def _animate(self, simulation, n: int, dt: float) -> None:
        """Run up to ``n`` animation steps, stopping at the first ``NanSimulationError``."""
        for _ in range(n):
            try:
                simulation.animate(self.sofa_root, dt)
            except NanSimulationError as e:
                log.error(f"{e}: NaN detected, stopping simulation")
                break

    def import_plugins(self):
        """
        Check if plugins have to be imported by browsing the scene graph
        for components listed in REQUIRED_PLUGINS.
        """
        from SofaRuntime import importPlugin

        for x in self.root.build_plugins_list():
            importPlugin(x)

    def to_file(self, fname=None, clean_paths=False, doc=None):
        """
        Write the scene through ``root.to_file``.

        :param fname: destination; defaults to ``<out_dir>/sofa_scene.py``
        :param clean_paths: forwarded to ``root.to_file``
        :param doc: forwarded to ``root.to_file``
        :raises ValueError: if ``fname`` is omitted and ``out_dir`` is not set
        """
        if fname is None:
            out_dir = self.params.out_dir
            if out_dir is None:
                raise ValueError("out_dir required when fname is not given")
            fname = Path(out_dir) / "sofa_scene.py"
        self.root.to_file(fname, clean_paths=clean_paths, doc=doc)

    def disable_sofa_logger(self):
        """Set ``printLog`` to False on every browsed object that defines it."""
        for x in self.root.up2down:
            if "printLog" in x.kw:
                x.kw["printLog"] = False

import_plugins()

Check if plugins have to be imported by browsing the scene graph for components listed in REQUIRED_PLUGINS.

Source code in gdsofa/sofawrap.py
142
143
144
145
146
147
148
149
150
def import_plugins(self):
    """
    Import the plugins required by the scene graph.

    The list is obtained from ``self.root.build_plugins_list()``, which browses
    the scene graph for components listed in REQUIRED_PLUGINS.
    """
    from SofaRuntime import importPlugin

    for plugin_name in self.root.build_plugins_list():
        importPlugin(plugin_name)

RootNode(**kw)

Create the root node of the scene graph.

Source code in gdsofa/sofawrap.py
21
22
23
24
25
26
27
28
29
def RootNode(**kw) -> Node:
    """
    Create the root node of the scene graph.

    The node is named ``"root"`` and receives a ``DefaultVisualManagerLoop``.

    :param kw: keyword arguments forwarded to ``Node``
    :return: the new root node
    """
    from gdsofa.comps.base_component import DefaultVisualManagerLoop

    root = Node("root", **kw)
    root + DefaultVisualManagerLoop()
    return root

Parameters

BaseSOFAParams

Bases: Parameterized

Source code in gdsofa/sofa_parameters.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class BaseSOFAParams(param.Parameterized):
    """
    Container for SOFA parameters.

    Parameters can be read and written with item syntax (``params["dt"]``),
    iterated by name, and saved to / loaded from JSON files.
    """

    title = param.String(
        default="SOFA Parameters",
        doc="`BaseSOFAParams` is a container for SOFA Parameters",
    )
    out_dir = Foldername(
        doc="Simulation output directory", create_if_not_exist=True, must_exist=False
    )
    data_path = Foldername(doc="Data input directory", must_exist=False)

    n: int = param.Integer(100, bounds=(0, np.inf), doc="Number of iterations")
    dt: float = param.Number(1, bounds=(0, np.inf), doc="Time step")
    scale: float = param.Number(
        1, bounds=(0, np.inf), doc="Scaling coefficient (for SOFA loaders)"
    )
    simu_dir = Foldername(doc="Simulation output directory")

    def update(self, *a, **kw):
        """Forward to ``self.param.update``."""
        self.param.update(*a, **kw)

    def __len__(self):
        return len(list(self.param))

    def clean_simu_dir(self):
        """Remove ``simu_dir`` if it is a directory, then (re)create it."""
        p = Path(self.simu_dir)
        if p.is_dir():
            shutil.rmtree(p)
        dump_path(p)

    @staticmethod
    def _paths_to_serializable(obj):
        """Return a JSON-serializable copy of obj with Path replaced by absolute path str."""
        if isinstance(obj, Path):
            return str(obj.resolve())
        if isinstance(obj, dict):
            return {k: BaseSOFAParams._paths_to_serializable(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [BaseSOFAParams._paths_to_serializable(v) for v in obj]
        return obj

    def to_dict(self) -> dict:
        """Return the parameter values as a dict, with ``Path`` values converted to str."""
        d = {name: getattr(self, name) for name in self.param}
        return self._paths_to_serializable(d)

    def dump_json(self, fname: str):
        """Write the values to ``fname`` (``.json``) and the schema to ``<fname>_schema.json``."""
        fname = ensure_ext(fname, "json")
        dump_json(fname, self.to_dict())
        dump_json(path_insert_before(fname, ".json", "_schema"), self.param.schema())

    @classmethod
    def from_json(cls, fname: str):
        """Build an instance from a JSON file (the ``.json`` extension is enforced)."""
        fname = ensure_ext(fname, "json")
        s = load_json(fname)
        return cls.from_dict(s)

    @classmethod
    def from_dict(cls, d: dict):
        """
        Build an instance from a dict of values.

        Keys that are not declared parameters are added as new parameters whose
        type is chosen from the Python type of the value. Values of
        ``Foldername`` / ``Filename`` parameters are written directly into the
        instance's private value store after construction.
        """
        path_param_types = (Foldername, Filename)
        path_keys = {
            name for name, p in cls.param.objects().items()
            if isinstance(p, path_param_types)
        }
        path_vals = {k: d[k] for k in path_keys if k in d}
        d_no_paths = {k: v for k, v in d.items() if k not in path_keys}

        x = cls().param
        for k, v in d_no_paths.items():
            if not hasattr(x, k):
                if isinstance(v, float):
                    z = param.Number(v)
                elif isinstance(v, list):
                    z = param.List(v)
                elif isinstance(v, bool):
                    z = param.Boolean(v)
                elif isinstance(v, int):
                    # Must come after the bool test (bool is a subclass of int).
                    # Without this branch integers reached param.String, which
                    # only accepts string values.
                    z = param.Integer(v)
                elif isinstance(v, str):
                    z = Filename(v)
                else:
                    z = param.String(v)
                x.add_parameter(k, z)
        y = x.deserialize_parameters(json.dumps(d_no_paths))
        obj = cls(**y)

        for k, v in path_vals.items():
            if v is not None:
                obj._param__private.values[k] = v

        return obj

    @param.depends("out_dir", watch=True, on_init=True)
    def _update_simu_dir(self):
        # Keep simu_dir in sync with out_dir: <out_dir>/simu, created on the fly
        if self.out_dir is not None:
            self.simu_dir = str(dump_path(Path(self.out_dir) / "simu"))

    def save(self):
        """Dump the parameters to ``<out_dir>/params.json`` and log the location."""
        self.dump_json(Path(self.out_dir) / "params.json")
        gd.get_logger().info(f"Written: {Path(self.out_dir) / 'params.json'}")

    @classmethod
    def from_dir(cls, dname):
        """Load the parameters from ``<dname>/params.json``."""
        return cls.from_json(Path(dname) / "params.json")

    def __iter__(self):
        return self.param.__iter__()

    def __getitem__(self, item):
        return getattr(self, item)

    def __setitem__(self, key, value):
        return setattr(self, key, value)

    def add_container(self, ct: Union[gd.Container, str]):
        """
        Set ``data_path`` from a container and add one ``Filename`` parameter
        per entry of the container's ``_files``.

        :param ct: a ``gd.Container`` or a value used to build one
        :return: ``self``
        """
        r = ct if isinstance(ct, gd.Container) else gd.Container(ct)
        self.data_path = str(r)
        for k in r._files.keys():
            self.param.add_parameter(k, Filename(getattr(r, k)))
        return self

SOFA loader

Controllers

Utilities

Helpers for gdsofa: Munch, path/iterable utilities, JSON encoding.

  • From gdutils (see https://gdutils-a2ef81.gitlabpages.inria.fr/): load_json, dump_json, Timer, and get_iterable (re-exported as as_iterable).
  • gdsofa-specific: dump_json wraps gdutils to create parent directories; dump_path, Munch / munchify, JsonEncoder, StdRedirect, and path/string helpers remain here.

JsonEncoder

Bases: JSONEncoder

JSON encoder for Path, numpy types (replaces tf.JsonEncoder).

Source code in gdsofa/utils.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class JsonEncoder(json.JSONEncoder):
    """JSON encoder for Path, numpy types (replaces tf.JsonEncoder)."""

    def default(self, o):
        """
        Convert ``o`` to a JSON-friendly value.

        ``Path`` becomes ``str``; when numpy can be imported, numpy floats and
        integers become ``float`` / ``int`` and arrays become lists. Anything
        else is delegated to ``json.JSONEncoder.default``.
        """
        if isinstance(o, Path):
            return str(o)

        # numpy is optional: without it only the Path conversion applies
        try:
            import numpy as np
        except ImportError:
            np = None

        if np is not None:
            if isinstance(o, np.floating):
                return float(o)
            if isinstance(o, np.integer):
                return int(o)
            if isinstance(o, np.ndarray):
                return o.tolist()
        return super().default(o)

Munch

Bases: dict

Dict with attribute access (replaces tf.munchify / treefiles Munch).

Source code in gdsofa/utils.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Munch(dict):
    """Dict with attribute access (replaces tf.munchify / treefiles Munch)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Wrap plain dict values so that attribute access also works on them
        plain_keys = [
            key
            for key, value in self.items()
            if isinstance(value, dict) and not isinstance(value, Munch)
        ]
        for key in plain_keys:
            self[key] = Munch(self[key])

    def __getattr__(self, key):
        # Only called when normal attribute lookup fails: fall back to the items
        try:
            value = self[key]
        except KeyError as e:
            raise AttributeError(key) from e
        return value

    def __setattr__(self, key, value):
        # Attribute assignment stores the value as a dict item
        self.__setitem__(key, value)

StdRedirect(stream, fname)

Redirect stream to a file (replaces tf.StdRedirect).

Source code in gdsofa/utils.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@contextlib.contextmanager
def StdRedirect(stream, fname: Union[str, Path]):
    """
    Redirect stream to a file (replaces tf.StdRedirect).

    :param stream: must be the current ``sys.stdout`` or ``sys.stderr`` object
    :param fname: destination file; parent directories are created and the
        file is opened in write mode (UTF-8)
    :return: context manager yielding the open file object
    :raises ValueError: if ``stream`` is neither ``sys.stdout`` nor ``sys.stderr``
    """
    # Validate first so that an invalid stream never creates or truncates the file
    if stream is sys.stdout:
        redirect = contextlib.redirect_stdout
    elif stream is sys.stderr:
        redirect = contextlib.redirect_stderr
    else:
        raise ValueError("stream must be sys.stdout or sys.stderr")

    path = Path(fname)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f, redirect(f):
        yield f

dump_json(path, data, **kwargs)

Write JSON file; creates parent dirs. Wraps gdutils.dump_json.

Source code in gdsofa/utils.py
140
141
142
143
144
def dump_json(path: Union[str, Path], data: Any, **kwargs) -> None:
    """
    Write JSON file; creates parent dirs. Wraps gdutils.dump_json.

    :param path: destination file
    :param data: object handed to ``gdutils.dump_json``
    :param kwargs: extra keyword arguments forwarded to ``gdutils.dump_json``
    """
    target = Path(path)
    # Make sure the destination folder exists before delegating the write
    target.parent.mkdir(parents=True, exist_ok=True)
    gdutils.dump_json(target, data, **kwargs)

dump_path(path)

Create directory (parents, exist_ok) and return resolved path. Replaces tf.dump.

Source code in gdsofa/utils.py
116
117
118
119
120
def dump_path(path: Union[str, Path]) -> Path:
    """
    Create directory (parents, exist_ok) and return resolved path. Replaces tf.dump.

    :param path: directory to create
    :return: the resolved ``Path`` of that directory
    """
    resolved = Path(path).resolve()
    resolved.mkdir(parents=True, exist_ok=True)
    return resolved

ensure_ext(fname, ext)

Ensure path has the given extension (e.g. '.json'). Replaces tf.ensure_ext.

Source code in gdsofa/utils.py
100
101
102
103
104
105
106
107
def ensure_ext(fname: Union[str, Path], ext: str) -> Path:
    """
    Ensure path has the given extension (e.g. '.json'). Replaces tf.ensure_ext.

    :param fname: input path
    :param ext: wanted extension, with or without the leading dot
    :return: ``fname`` as a ``Path`` whose suffix is ``ext``
    """
    candidate = Path(fname)
    wanted = ext if ext.startswith(".") else "." + ext
    if candidate.suffix == wanted:
        return candidate
    return candidate.with_suffix(wanted)

make_string(**kwargs)

Format key=value for logging (replaces tf.make_string).

Source code in gdsofa/utils.py
95
96
97
def make_string(**kwargs: Any) -> str:
    """
    Format key=value for logging (replaces tf.make_string).

    Values are rendered with ``repr`` and pairs are joined with ``", "``.
    """
    pairs = [f"{key}={value!r}" for key, value in kwargs.items()]
    return ", ".join(pairs)

munchify(obj)

Convert dict (and nested dicts) to Munch.

Source code in gdsofa/utils.py
76
77
78
79
80
81
82
def munchify(obj: Any) -> Any:
    """
    Convert dict (and nested dicts) to Munch.

    Lists and tuples are rebuilt with the same type, their elements being
    converted recursively. Any other object is returned unchanged.

    :param obj: object to convert
    :return: a ``Munch`` for a dict input, a same-typed sequence for a
        list/tuple input, otherwise ``obj`` itself
    """
    if isinstance(obj, dict):
        return Munch({k: munchify(v) for k, v in obj.items()})
    if isinstance(obj, tuple) and hasattr(obj, "_fields"):
        # namedtuple constructors expect one positional argument per field,
        # so they cannot be rebuilt from a single iterable
        return type(obj)(*(munchify(x) for x in obj))
    if isinstance(obj, (list, tuple)):
        return type(obj)(munchify(x) for x in obj)
    return obj

none_default(a, b)

Return a if a is not None else b (replaces tf.none).

Source code in gdsofa/utils.py
85
86
87
def none_default(a: Any, b: Any) -> Any:
    """
    Return a if a is not None else b (replaces tf.none).

    Only ``None`` triggers the fallback; other falsy values of ``a`` are kept.
    """
    if a is None:
        return b
    return a

path_insert_before(fname, suffix, insert)

E.g. path_insert_before('a/b/foo.json', '.json', '_schema') -> a/b/foo_schema.json.

Source code in gdsofa/utils.py
110
111
112
113
def path_insert_before(fname: Union[str, Path], suffix: str, insert: str) -> Path:
    """
    E.g. path_insert_before('a/b/foo.json', '.json', '_schema') -> a/b/foo_schema.json.

    :param fname: input path
    :param suffix: extension before which ``insert`` is placed; may span
        several dots (e.g. ``'.tar.gz'``) and the leading dot is optional
    :param insert: text to insert
    :return: the new path. When the file name does not end with ``suffix``,
        ``insert`` is placed before the last extension of ``fname`` instead.
    """
    p = Path(fname)
    name = p.name
    if suffix and not suffix.startswith("."):
        suffix = "." + suffix
    # Honour the requested suffix so multi-part extensions stay intact
    if suffix and name != suffix and name.endswith(suffix):
        return p.with_name(name[: -len(suffix)] + insert + suffix)
    return p.with_stem(p.stem + insert)

unique_id()

Return a unique string (e.g. for treelib node IDs). Replaces tf.get_string().

Source code in gdsofa/utils.py
90
91
92
def unique_id() -> str:
    """
    Return a unique string (e.g. for treelib node IDs). Replaces tf.get_string().

    The string is the hexadecimal form of a freshly generated UUID4.
    """
    token = uuid.uuid4()
    return token.hex

SOFA component wrappers

Visual flags