secretflow/psi

When testing PSI through the Python interface, how do I configure logging to a file?

notinghere opened this issue · 11 comments

Feature Request Type

Build/Install

Have you searched existing issues?

Yes

Is your feature request related to a problem?

  • In the test case legacy_psi_test.py, how do I add file logging?

  • I added the following code:

    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):

        log_options = logging.LogOptions()
        log_options.log_level = logging.LogLevel.DEBUG
        log_options.system_log_path = "./alice.log"
        logging.setup_logging(log_options)
However, the logs from libpsi are not written to the log file; they still go to the console. How can I configure things so that all logs are written to the file?

Describe features you want to add to SPU

  • Allow configuring unified file logging through the Python interface.

Could you post a screenshot showing exactly which logs you mean?

  • Console log
----------test_ecdh_2pc-------------
rank = 0
rank = 1
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
[2024-07-11 15:13:03.177] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_PSI_2PC","broadcast_result":true,"input_params":{"path":"./data/alice.csv","select_fields":["id","idx"]},"output_params":{"path":"./alice-kkrt.csv","need_sort":true},"curve_type":"CURVE_25519"}
[2024-07-11 15:13:03.177] [info] [bucket_psi.cc:400] bucket size set to 1048576
[2024-07-11 15:13:03.178] [info] [bucket_psi.cc:252] Begin sanity check for input file: ./data/alice.csv, precheck_switch:false
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:265] End sanity check for input file: ./data/alice.csv, size=1546
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:425] Run psi protocol=1, self_items_count=1546
[2024-07-11 15:13:03.199] [info] [cryptor_selector.cc:38] Using IPPCP
[2024-07-11 15:13:03.246] [info] [thread_pool.cc:30] Create a fixed thread pool with size 1
[2024-07-11 15:13:03.290] [info] [ecdh_psi.cc:106] MaskSelf:9999 --finished, batch_count=1, self_item_count=1546
[2024-07-11 15:13:03.304] [info] [ecdh_psi.cc:365] ID 9999: MaskSelf finished.
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:169] MaskPeer:9999 --finished, batch_count=1, peer_item_count=1560
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:369] ID 9999: MaskPeer finished.
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:212] RecvDualMaskedSelf:9999 recv last batch finished, batch_count=1
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:373] ID 9999: RecvDualMaskedSelf finished.
[2024-07-11 15:13:03.420] [info] [bucket_psi.cc:382] Begin post filtering, indices.size=165, should_sort=true
[2024-07-11 15:13:03.435] [info] [key.cc:91] Executing sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort  --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1  >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8
[2024-07-11 15:13:03.467] [info] [key.cc:93] Finished sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort  --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1  >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8, ret=0
[2024-07-11 15:13:03.467] [info] [bucket_psi.cc:390] End post filtering, in=./data/alice.csv, out=./alice-kkrt.csv
id:abc, psi_type: 1, original_count: 1546, intersection_count: 165, source_count: 1547, output_count: 166

  • File log
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'

The code for the two intersection parties is as follows:

  • alice.py
# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import unittest

import multiprocess

from absl import app, flags

import libspu.link as link
import libspu.logging as logging
import psi as psi
from utils import get_free_port, wc_count
# from spu.utils.simulation import PropagatingThread


class Test:

    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30*1000

        log_options = logging.LogOptions()
        log_options.log_level = logging.LogLevel.DEBUG
        # log_options.enable_console_logger = False
        log_options.system_log_path = "./alice.log"
        # log_options.trace_log_path = "./alice_trace.log"
        
        logging.setup_logging(log_options)


        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )

            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
                target=wrap,
                args=(
                    self_rank,
                    selected_fields,
                    inputs[self_rank],
                    outputs[self_rank],
                    protocol,
                ),
            )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
        print("----------test_ecdh_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )

    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 0
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000","9998"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}",f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}",f"127.0.0.1:{30223}",f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )

def run_psi(_):
    t= Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()
    

if __name__ == '__main__':
    app.run(run_psi)

  • bob.py

# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import unittest

import multiprocess

from absl import app, flags

import libspu.link as link
import psi as psi
from utils import get_free_port, wc_count
# from spu.utils.simulation import PropagatingThread


class Test:

    def run_streaming_psi(self, wsize, self_rank, link_id,party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30*1000

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )

            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
                target=wrap,
                args=(
                    self_rank,
                    selected_fields,
                    inputs[self_rank],
                    outputs[self_rank],
                    protocol,
                ),
            )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize,self_rank, link_id, party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize,self_rank, link_id, party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )

    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 1
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000","9998"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}",f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}",f"127.0.0.1:{30223}",f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )


def run_psi(_):
    t= Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()
    

if __name__ == '__main__':
    app.run(run_psi)

OK, I can see that all the debug-level logs are printed. Try changing the DEBUG in log_options.log_level = logging.LogLevel.DEBUG to INFO.
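
Concretely, using only the LogOptions fields already shown in alice.py above, the suggested change looks like this:

    import libspu.logging as logging

    log_options = logging.LogOptions()
    log_options.log_level = logging.LogLevel.INFO  # was logging.LogLevel.DEBUG
    log_options.system_log_path = "./alice.log"
    logging.setup_logging(log_options)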

  • I tried that, but it still does not end up in the file. Most of the info messages printed to the console come from code related to libpsi.cc, which is part of the psi library.

  • The content written to the file comes from the yacl code.

  • The log levels defined in spu:

enum class LogLevel {
  Debug = 0,
  Info = 1,
  Warn = 2,
  Error = 3,
};

I just double-checked. Sorry, the logs generated by the C++ code are not affected by the Python settings. If you have a strong need for this, you can run the script with nohup ... &.
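
For reference, a minimal Python sketch of that workaround (the script and log file names are just placeholders): launching the party script as a child process with stdout and stderr redirected captures the C++ console output in a file, roughly what nohup python alice.py > alice_console.log 2>&1 & does.

    import subprocess

    # Hypothetical launcher: run alice.py detached and send everything it writes
    # to stdout/stderr (including spdlog's console output from the C++ side)
    # into a single file.
    with open("alice_console.log", "w") as log_file:
        subprocess.Popen(
            ["python", "alice.py"],
            stdout=log_file,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # detach from the terminal, similar to nohup ... &
        )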

  • How can the C++ logging be configured so that it is written to a file?

Sorry, that is not supported at the moment.

  • The logging uses spdlog; it can print logs to the console, but there is no way to write them to a file?

Hi @notinghere

You should be able to redirect stdout to a file.
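
One way to do that from inside the test script itself (a sketch, not an official libspu/psi API): duplicate a log file's descriptor over the process-level stdout/stderr before running the PSI code, so output written directly by the C++ side also lands in the file.

    import os
    import sys

    # Sketch: redirect the OS-level stdout/stderr file descriptors to a file.
    # Unlike reassigning sys.stdout, this also captures output written by C++
    # code such as spdlog's console sink.
    log_fd = os.open("./console_capture.log", os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    sys.stdout.flush()
    sys.stderr.flush()
    os.dup2(log_fd, 1)  # fd 1 = stdout
    os.dup2(log_fd, 2)  # fd 2 = stderr

    # ... then run the PSI test, e.g. Test().test_ecdh_2pc() ...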

Stale issue message. Please comment to remove stale tag. Otherwise this issue will be closed soon.