ydf0509/funboost

win平台multi_process_consume()消费日志缺少

User-Clb opened this issue · 45 comments

文件1:
import time,os
import random
from funboost import boost, BrokerEnum

@boost('test_rabbit_queue',broker_exclusive_config={'x-max-priority':5}, broker_kind=BrokerEnum.RABBITMQ_AMQPSTORM, qps=100,is_using_distributed_frequency_control=True)
def test_fun(x):
random_sleep = random.randrange(1, 50000) / 10000
time.sleep(random_sleep)
print(x, os.getpid())

if name == 'main':
test_fun.clear()
test_fun.multi_process_consume(3)

文件2:
from funboost import get_publisher, BrokerEnum,PriorityConsumingControlConfig

import random

publisher1 = get_publisher('test_rabbit_queue', broker_kind=BrokerEnum.RABBITMQ_AMQPSTORM,broker_exclusive_config={'x-max-priority':5})

if name == 'main':
for i in range(1000):
randx = random.randint(1, 4)
publisher1.publish({"x":i},priority_control_config=PriorityConsumingControlConfig(other_extra_params={'priroty': randx}))

屏幕截图 2023-09-29 194742

不同文件消费和推送,不需要手动get publisher和getconsumer,文档上应该没有这种写法。

你直接在发布文件里面import 那个boost装饰的函数就好了,你这种单独手写生成消费者和生产者的写法很奇怪

我是参考这段来写的
屏幕截图 2023-09-30 144014

现在即使我把消费发布都写在一个文件中,依然出现了任务丢失现象:
`import time,os
from funboost import boost, BrokerEnum

@boost('test_rabbit_queue', broker_kind=BrokerEnum.RABBITMQ_AMQPSTORM, qps=30,is_using_distributed_frequency_control=True)
def test_fun(x):
print(x)

def _push():
for i in range(1000):
test_fun.push(x=i)

if name == 'main':
test_fun.multi_process_consume(3)
while True:
input("")
_push()`

屏幕截图 2023-09-30 144545

image

image

不同文件写发布真的我很简单,这么简单的import就好了,你自己手动get_publisher我想不通

是的,我能明白这个简单的import方式,我正在学习这个框架更多的方式,但却遇到了些问题,这段代码并不是不能工作,只是发现有任务丢失

image

我搜索第8行print了1000次刚好

你要是不懂怎么判断有没有丢失,建议你开启结果持久化,去数据库里面看偶多少条结果,

或者自己在函数里面插入东西到数据库。

在数据库里面去统计count就好了

这个东西是分布式的啊,你一直不换队列名测试,你要确保别的地方没有消费啊,如果你还有其他地方在消费,你只看当前的控制台,消息肯定没运行那么多消息。建议你每次都换队列名来测试,这样就不会有其他地方也在消费这个队列了

你懂不懂,我让你换队列名的目的?

你进程x没关闭,在消费q1,

然后你行启动进程y,也在消费q1,你只看y的控制台打印消息次数,肯定就少了,你现在测试方法有很大问题

我建议你换队列名,然后把运行的消息保存到数据库,去数据库里面统计

我查看的是pythonlogs目录下的日志统计文件的打印结果,它打印了3个进程的日志输出

那个可不是日志,print不是日志,logging才是日志

那是nb_log转化print到文件了,不是日志,print和日志是两回事

建议你写到数据库,别看print文件了

print自动写到文件那是nb_log的高级自动转化,在win上批量魔法有缓冲,自己也可以把是否批量关了,nb_log_config.py有关闭批量sys.std的功能。

你看数量应该写入数据库,而不是看print文件。

image

你要是不想手写插入数据库,可以自己用框架的mongo自动写数据库

用redis统计了一下确实未丢失

关闭批量sys.std的功能是设置这个吗?USE_BULK_STDOUT_ON_WINDOWS = True # 在win上是否每隔0.1秒批量stdout,win的io太差了

那要多测试下多进程下的win和linux的文件写入,你不开多进程试试,

我发现你很喜欢 test_fun.multi_process_consume(n)这样启动消费,只有在单进程cpu达到100%了,才需要这样,test_fun.conume()就好了

我在测试多个进程下的分布式效果是什么样

这真的是一个方便且简洁还很强大的框架,爱了

我刚才windwos测试了

image

没有少消息,你看的日志就是个错误的地方啊,每隔队列的消费者和生产者都有单独的文件名,懂吗

image
mei每个消费者都有独立的日志文件

那个print文件不是funboost的义务,你用任何框架print都没义务自动把print写入日志文件。

你要写入日志文件,就要使用日志来记录消息,而不是用print记录消息。

如果你企图使用手动记录消息到文件,应该这样,而不是使用print,要养成良好的习惯,多用日志

例如nb_log的使用
···python
import nb_log

logger = nb_log.get_logger('name1',log_filename='name1.log')

logger.info(6666)
logger.debug(6666)
···

日志也可以使用loguru, logging,用print代替日志是很差的变成习惯

我这里只发现了这些文件
屏幕截图 2023-09-30 165228
屏幕截图 2023-09-30 165413

image
这不就是嘛,每隔队列名都有自己的日志文件。

因为我加了个2换了个队列名字啊
@boost('test_rabbit_queue2',broker_exclusive_config={'x-max-priority':5},

是的,在项目中我不会使用print作为输出

每哥日志文件你都要看看啊,那个.print 和.std 是魔法钩子,自动对python的print和sys.stdout的做了猴子补丁。.print 和 .std 不是日志文件,只是个魔法猴子补丁,不是日志本身。

funboost生成的日志都是logging记录的,默认都是.log文件后缀,

除了RabbitmqPublisherUsingAmqpStorm--test_rabbit_queue.log日志完整的记录了push消息,其他文件都有所缺失

image

我自己z在win运行了2次,运行一次后有1000个搜索,运行2次又2000个,1个都没丢

这是一个很奇怪的事情
屏幕截图 2023-09-30 170327

我把nb_log再打个包,你等会升级下,线上的nb_log第六个handler有问题

image
或者你先设置filehandler为1

python -m pip install nb_log==10.3 --upgrade -i https://pypi.org/simple 升级一下最新版本的nb_log,再试下6还行不

win上 第6种filehandler, 我升级到10.3后没有丢日志了,10.2的有丢日志

10.2的那个内存缓存没有及时写入文件,因为win的io性能很差,为了保证快速打印控制台快如写入文件 ,是和linux分开两种逻辑实现的

filehandler切换为1时未发现问题,python -m pip install nb_log==10.3 --upgrade -i https://pypi.org/simple 升级后切换回6依然存在问题
屏幕截图 2023-09-30 171440

你还是用多进程启动消费的吗,还是consume启动的

代码没有改动,我已经切换使用filehandler=1

那我要再改改第6种filehandler需要多测试在多进程环境下。
要保证多进程切割时候安全,并且使用Python自身切割日志,还要确保win和linux写入性能好,这种filehandler的设计非常难,为什么发明了那么多种,主要是这个原因,外国大神发明的concurrent_log_handler 是第5种filehandeler性能不好,尤其是在windows下性能太惨了,如果用户不在windwos部署生产代码和快速在win写入日志文件那还行。

concurrent_log_handler 是第五种是美国大神写的,也是最知名的三方多进程切割安全的filehanlder,采用的是文件锁实现的,每一次写入文件都使用一个文件锁,这样能够确保多进程非常安全,但是代价非常重,在win上的性能尤其惨烈,啥复杂逻辑都不做,只写入文件,每秒写入文件日志条数无法破千

image
funboost/core/muliti_process_enhance.py 第21行加了while 1,阻止主线程结束,那么log的第6个filehandler的守护线程才不会退出.

这样文件日志就不会缺少末尾的了.

funboost启动消费后上没有主线程在运行的,例如
f1.consume()
f2.consume()

是两个在子线程中 while 1循环获取消息,他没有主线程,所以每个进程最末尾加上 while 1: time.sleep(100) 无限懵逼死循环不让主线结束是最好的.

即使有其他while 1的永不结束的子线程在运行,守护线程也会结束,只有主线程不结束,守护线程才不会结束.

image

nb_log的第6种filehandler在win上是批量写入文件,但为了代码能自动结束,所以用的是守护线程.