How do I set up file logging when testing PSI through the Python interface?
notinghere opened this issue · 11 comments
notinghere commented
Feature Request Type
Build/Install
Have you searched existing issues?
Yes
Is your feature request related to a problem?
- In the test case legacy_psi_test.py, how do I add file logging?
- I added the following code:
```python
def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):
    log_options = logging.LogOptions()
    log_options.log_level = logging.LogLevel.DEBUG
    log_options.system_log_path = "./alice.log"
    logging.setup_logging(log_options)
```
The logs from libpsi are not written to the log file, only to the console. How can I configure things so that all the logs go to the file?
Describe features you want to add to SPU
- Allow setting up unified file logging through the Python interface.
aokaokd commented
Could you share a screenshot of which logs you are referring to?
notinghere commented
- Console log:

```
----------test_ecdh_2pc-------------
rank = 0
rank = 1
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
[2024-07-11 15:13:03.177] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_PSI_2PC","broadcast_result":true,"input_params":{"path":"./data/alice.csv","select_fields":["id","idx"]},"output_params":{"path":"./alice-kkrt.csv","need_sort":true},"curve_type":"CURVE_25519"}
[2024-07-11 15:13:03.177] [info] [bucket_psi.cc:400] bucket size set to 1048576
[2024-07-11 15:13:03.178] [info] [bucket_psi.cc:252] Begin sanity check for input file: ./data/alice.csv, precheck_switch:false
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:265] End sanity check for input file: ./data/alice.csv, size=1546
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:425] Run psi protocol=1, self_items_count=1546
[2024-07-11 15:13:03.199] [info] [cryptor_selector.cc:38] Using IPPCP
[2024-07-11 15:13:03.246] [info] [thread_pool.cc:30] Create a fixed thread pool with size 1
[2024-07-11 15:13:03.290] [info] [ecdh_psi.cc:106] MaskSelf:9999 --finished, batch_count=1, self_item_count=1546
[2024-07-11 15:13:03.304] [info] [ecdh_psi.cc:365] ID 9999: MaskSelf finished.
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:169] MaskPeer:9999 --finished, batch_count=1, peer_item_count=1560
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:369] ID 9999: MaskPeer finished.
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:212] RecvDualMaskedSelf:9999 recv last batch finished, batch_count=1
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:373] ID 9999: RecvDualMaskedSelf finished.
[2024-07-11 15:13:03.420] [info] [bucket_psi.cc:382] Begin post filtering, indices.size=165, should_sort=true
[2024-07-11 15:13:03.435] [info] [key.cc:91] Executing sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1 >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8
[2024-07-11 15:13:03.467] [info] [key.cc:93] Finished sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1 >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8, ret=0
[2024-07-11 15:13:03.467] [info] [bucket_psi.cc:390] End post filtering, in=./data/alice.csv, out=./alice-kkrt.csv
id:abc, psi_type: 1, original_count: 1546, intersection_count: 165, source_count: 1547, output_count: 166
```

- File log:

```
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
```
notinghere commented
The code for the two parties is as follows.
- alice.py
```python
# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest

import multiprocess
from absl import app, flags

import libspu.link as link
import libspu.logging as logging
import psi as psi
from utils import get_free_port, wc_count

# from spu.utils.simulation import PropagatingThread


class Test:
    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()

        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30 * 1000

        log_options = logging.LogOptions()
        log_options.log_level = logging.LogLevel.DEBUG
        # log_options.enable_console_logger = False
        log_options.system_log_path = "./alice.log"
        # log_options.trace_log_path = "./alice_trace.log"
        logging.setup_logging(log_options)

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )
            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
            target=wrap,
            args=(
                self_rank,
                selected_fields,
                inputs[self_rank],
                outputs[self_rank],
                protocol,
            ),
        )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
        print("----------test_ecdh_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )
    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 0
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000", "9998"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}", f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}", f"127.0.0.1:{30223}", f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )


def run_psi(_):
    t = Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()


if __name__ == '__main__':
    app.run(run_psi)
```
- bob.py
```python
# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest

import multiprocess
from absl import app, flags

import libspu.link as link
import psi as psi
from utils import get_free_port, wc_count

# from spu.utils.simulation import PropagatingThread


class Test:
    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()

        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30 * 1000

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )
            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
            target=wrap,
            args=(
                self_rank,
                selected_fields,
                inputs[self_rank],
                outputs[self_rank],
                protocol,
            ),
        )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )

    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 1
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]
        party_ids = ["9999", "10000", "9998"]
        addrs = [f"127.0.0.1:{20222}", f"127.0.0.1:{20223}", f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}", f"127.0.0.1:{30223}", f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )


def run_psi(_):
    t = Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()


if __name__ == '__main__':
    app.run(run_psi)
```
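For context on how these are run: alice.py (self_rank = 0) and bob.py (self_rank = 1) are started as two separate processes, e.g. one per terminal, and each binds to one of the two addresses in addrs; the connection-refused retries at the top of the console log above are presumably just the first party waiting for the second one to come up.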
aokaokd commented
OK, I can see that all the debug logs are being printed. Try changing DEBUG to INFO in `log_options.log_level = logging.LogLevel.DEBUG` and see whether that helps.
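For reference, this is the change being suggested, sketched against the same libspu.logging calls already used in the posted alice.py; it assumes the LogLevel enum exposes an INFO member analogous to DEBUG:

```python
import libspu.logging as logging

# Assumed: logging.LogLevel.INFO exists alongside DEBUG.
# Raising the level filters out the debug-level retry messages from the
# transport layer; the system log keeps going to ./alice.log as before.
log_options = logging.LogOptions()
log_options.log_level = logging.LogLevel.INFO
log_options.system_log_path = "./alice.log"
logging.setup_logging(log_options)
```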
notinghere commented
- I tried that, and the logs still do not end up in the file. Most of the info-level messages printed to the console come from code around libpsi.cc, which lives in the psi library.
- The content that does get written to the file comes from the yacl code.
- The log levels defined in spu:
```cpp
enum class LogLevel {
  Debug = 0,
  Info = 1,
  Warn = 2,
  Error = 3,
};
```
aokaokd commented
I double-checked just now; sorry, the logs generated by the C++ side are not affected by the Python settings. If you really need this, you can run it with nohup ... &.
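For example (the file name here is purely illustrative), launching the party script as `nohup python alice.py > alice_console.log 2>&1 &` would keep the process running in the background and capture its console output, including the messages emitted by the C++ side, in alice_console.log.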
notinghere commented
- How can the C++ logging be configured so that it is written to a file?
aokaokd commented
Sorry, that is not supported at the moment.
notinghere commented
- The logging uses spdlog; it can print to the console, but there is no way to get it into a file?
anakinxc commented
Hi @notinghere
You should be able to redirect stdout to a file.
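A minimal sketch of that workaround from inside the Python process, assuming the C++ (spdlog) console sink writes to the process's stdout/stderr; the helper name and log path below are illustrative:

```python
import os
import sys

def redirect_console_to_file(path):
    # Illustrative helper: point the process-level stdout/stderr file
    # descriptors at a log file, so console output produced by the C++
    # side (spdlog) ends up in the file as well.
    log_fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    sys.stdout.flush()
    sys.stderr.flush()
    os.dup2(log_fd, 1)  # fd 1 = stdout
    os.dup2(log_fd, 2)  # fd 2 = stderr
    os.close(log_fd)

# e.g. call redirect_console_to_file("./alice_console.log") at the top of
# wrap() in alice.py, before psi.bucket_psi(lctx, config) runs.
```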
github-actions commented
Stale issue message. Please comment to remove stale tag. Otherwise this issue will be closed soon.