欢迎您访问365答案网,请分享给您的朋友!
生活常识 学习资料

python3多进程写Kafka异步线程调用接口

时间:2023-05-12

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date : 2022/02/14 22:30
# @Desc : Kafka producer script. Worker processes publish fake login records
#         to Kafka and hand the matching API payload to a shared queue; a pool
#         of threads replays each payload against an HTTP endpoint once it has
#         been queued for a configurable number of seconds.
import datetime
import json
import logging
import logging.handlers
import math
import multiprocessing as mp
import os
import queue
import random
import string
import threading
import time
import timeit

import requests
from confluent_kafka import Producer
from faker import Faker

# NOTE(review): not referenced anywhere in this file; kept because removing a
# module-level name could break code that imports it.
f = Faker(locale='zh-CN')

# Province codes a fake login record may be attributed to. Every code must be
# a key of CreateIp.ipv4_prov_prefix.
_PROVINCE_CODES = [
    '10', '11', '13', '17', '18', '19', '30', '31', '34', '36', '38', '50',
    '51', '59', '70', '71', '74', '75', '76', '79', '81', '83', '84', '85',
    '86', '87', '88', '89', '90', '91', '97',
]

# Pause after putting a not-yet-due item back on the queue, so that consumer
# threads do not spin on the shared queue while nothing is ready.
_REQUEUE_BACKOFF_SECONDS = 0.1


class logs(object):
    """Thin wrapper around a named ``logging.Logger``.

    The first wrapper created for a given logger name attaches a rotating
    file handler (``log/log-<YYYY-MM-DD>.log``, 50 MiB per file, 500 backups)
    and a console handler. Later wrappers for the same name reuse those
    handlers, so records are neither duplicated nor are file handles leaked.

    Args:
        level: One of the keys of ``LEVELS`` (e.g. ``'DEBUG'``).
        logger: Logger name passed to ``logging.getLogger``.

    Raises:
        ValueError: If ``level`` is not a key of ``LEVELS``.
    """

    # 'NOSET' is the original (misspelled) key; it is kept so existing callers
    # keep working, alongside the correctly spelled 'NOTSET'.
    LEVELS = {
        'NOSET': logging.NOTSET,
        'NOTSET': logging.NOTSET,
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }

    def __init__(self, level, logger=None):
        log_level = self.LEVELS.get(level)
        if log_level is None:
            raise ValueError('Unknown log level: {!r}'.format(level))

        self.logger = logging.getLogger(logger)
        self.logger.setLevel(log_level)
        if self.logger.handlers:
            # Already configured by an earlier wrapper in this process.
            return

        # exist_ok avoids a crash when several processes create the
        # directory at the same moment.
        logs_dir = "log"
        os.makedirs(logs_dir, exist_ok=True)

        timestamp = time.strftime("%Y-%m-%d", time.localtime())
        logfilepath = os.path.join(logs_dir, "log-%s.log" % timestamp)

        formatter = logging.Formatter(
            '[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')

        file_handler = logging.handlers.RotatingFileHandler(
            filename=logfilepath, maxBytes=1024 * 1024 * 50, backupCount=500)
        file_handler.setFormatter(formatter)

        console = logging.StreamHandler()
        console.setLevel(log_level)
        console.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console)

    def info(self, message):
        self.logger.info(message)

    def debug(self, message):
        self.logger.debug(message)

    def warning(self, message):
        self.logger.warning(message)

    def error(self, message):
        self.logger.error(message)


class CreateIp(object):
    """Random IPv4 address generator keyed by province code."""

    def __init__(self):
        # Province code -> list of "a.b.c." prefixes used for that province.
        # NOTE(review): key '79' and prefix '192.168.179.' were '168' and
        # '192.168.1168.' in the source, which looks like a search/replace
        # artifact ('79' is requested by c_kafka and 1168 is not a valid
        # octet) - confirm against the real address plan.
        self.ipv4_prov_prefix = {
            '10': ['192.168.169.', '192.168.239.', '192.168.135.'],
            '11': ['192.168.197.', '192.168.243.', '192.168.128.',
                   '192.168.165.'],
            '13': ['192.168.212.', '192.168.132.', '192.168.166.'],
            '17': ['192.168.229.', '112.224.240.', '192.168.142.',
                   '192.168.177.', '192.168.198.', '192.168.200.'],
            '18': ['192.168.217.', '192.168.238.', '192.168.133.',
                   '192.168.167.', '192.168.201.', '192.168.202.'],
            '19': ['192.168.168.', '192.168.242.', '192.168.241.',
                   '192.168.134.'],
            '30': ['192.168.178.', '192.168.244.', '192.168.143.'],
            '31': ['192.168.222.', '192.168.237.', '192.168.129.',
                   '192.168.179.', '192.168.221.'],
            '34': ['192.168.176.', '192.168.245.', '192.168.246.',
                   '192.168.232.', '192.168.131.'],
            '36': ['192.168.168.', '192.168.144.', '192.168.180.'],
            '38': ['192.168.226.', '192.168.145.', '192.168.181.',
                   '192.168.182.'],
            '50': ['192.168.185.', '192.168.147.'],
            '51': ['192.168.216.', '192.168.130.', '192.168.184.',
                   '192.168.199.', '192.168.203.', '192.168.204.',
                   '192.168.205.', '192.168.214.', '192.168.215.'],
            '59': ['192.168.186.', '192.168.148.'],
            '70': ['192.168.194.', '192.168.156.'],
            '71': ['192.168.173.', '192.168.139.'],
            '74': ['192.168.219.', '192.168.140.', '192.168.174.',
                   '192.168.218.'],
            '75': ['192.168.183.', '192.168.146.'],
            '76': ['192.168.234.', '192.168.141.', '192.168.175.',
                   '192.168.207.', '192.168.208.', '192.168.223.'],
            '79': ['192.168.191.', '192.168.153.'],
            '81': ['192.168.231.', '192.168.149.', '192.168.187.',
                   '192.168.209.', '192.168.230.'],
            '83': ['192.168.211.', '192.168.150.', '192.168.188.',
                   '192.168.210.'],
            '84': ['192.168.192.', '192.168.154.'],
            '85': ['192.168.228.', '192.168.151.', '192.168.189.',
                   '192.168.227.'],
            '86': ['192.168.225.', '192.168.152.', '192.168.190.',
                   '192.168.224.'],
            '87': ['192.168.193.', '192.168.236.', '192.168.235.',
                   '192.168.155.'],
            '88': ['192.168.195.', '192.168.157.'],
            '89': ['192.168.213.', '192.168.233.', '192.168.158.',
                   '192.168.196.'],
            '90': ['192.168.171.', '192.168.137.'],
            '91': ['192.168.192.', '192.168.240.', '192.168.136.',
                   '192.168.170.'],
            '97': ['192.168.172.', '192.168.138.'],
        }

    def create_ipv4(self, roam_type, prov):
        """Return a random IPv4 address string.

        Args:
            roam_type: ``0`` to draw the address from ``prov``'s own
                prefixes, ``1`` to draw it from any *other* province.
            prov: Province code (a key of ``ipv4_prov_prefix``).

        Returns:
            An address of the form ``"<prefix><1..125>"``.

        Raises:
            KeyError: If ``roam_type`` is 0 and ``prov`` is unknown.
            ValueError: If ``roam_type`` is neither 0 nor 1.
        """
        if roam_type == 0:
            source_prov = prov
        elif roam_type == 1:
            # Build the candidate list instead of deleting ``prov`` from the
            # table, so the instance can be reused across calls.
            source_prov = random.choice(
                [code for code in self.ipv4_prov_prefix if code != prov])
        else:
            raise ValueError(
                'roam_type must be 0 or 1, got {!r}'.format(roam_type))
        prefix = random.choice(self.ipv4_prov_prefix[source_prov])
        return prefix + str(random.randint(1, 125))


def getRandomString(number):
    """Return a random string of ``number`` ASCII letters and digits.

    Characters are drawn with replacement, so any non-negative length is
    supported.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(random.choices(alphabet, k=number))


def randomtimes(start, end, n, frmt="%Y-%m-%d %H:%M:%S"):
    """Return one random timestamp between ``start`` and ``end``.

    Args:
        start: Lower bound, formatted according to ``frmt``.
        end: Upper bound, formatted according to ``frmt``.
        n: Unused; retained for backward compatibility (only a single value
            has ever been returned).
        frmt: ``strptime``/``strftime`` format of the inputs and the result.

    Returns:
        The random timestamp formatted with ``frmt``.
    """
    stime = datetime.datetime.strptime(start, frmt)
    etime = datetime.datetime.strptime(end, frmt)
    picked = stime + random.random() * (etime - stime)
    return picked.strftime(frmt)


def create_phone():
    """Return a random 11-digit mobile number string starting with ``1``."""
    # Second digit.
    second = random.choice([3, 4, 5, 7, 8])
    # Third digit: the allowed values depend on the second digit.
    if second == 4:
        third = random.choice([5, 7, 9])
    elif second == 5:
        third = random.choice([d for d in range(10) if d != 4])
    elif second == 7:
        third = random.choice([d for d in range(10) if d not in (4, 9)])
    else:
        third = random.randint(0, 9)
    # Last eight digits; the bounds guarantee exactly eight of them.
    suffix = random.randint(10000000, 99999999)
    return "1{}{}{}".format(second, third, suffix)


def generate_random_gps(base_log=None, base_lat=None, radius=None):
    """Return a random point within ``radius`` of a base point.

    Args:
        base_log: Base longitude in degrees.
        base_lat: Base latitude in degrees.
        radius: Maximum distance from the base point. It is divided by
            111300 to obtain degrees, i.e. it is treated as metres.

    If either coordinate is missing, the built-in base point
    (136.55491, 49.919034) is used and ``radius`` defaults to 1000000.

    Returns:
        ``(longitude, latitude)`` as strings with six decimal places.

    Raises:
        ValueError: If a base point is given without a radius.
    """
    if base_log is None or base_lat is None:
        base_log = 136.55491
        base_lat = 49.919034
        if radius is None:
            radius = 1000000
    elif radius is None:
        raise ValueError('距离半径不可为空')

    radius_in_degrees = radius / 111300
    u = random.uniform(0.0, 1.0)
    v = random.uniform(0.0, 1.0)
    # sqrt(u) makes the points uniform over the disc rather than clustered
    # at its centre.
    w = radius_in_degrees * math.sqrt(u)
    t = 2 * math.pi * v
    x = w * math.cos(t)
    y = w * math.sin(t)
    longitude = y + base_log
    latitude = x + base_lat
    # Keep six decimal places.
    return '%.6f' % longitude, '%.6f' % latitude


def delivery_report(err, msg):
    """Record the delivery result of one produced message.

    Called once for each message produced; triggered by ``poll()`` or
    ``flush()``. Failures are appended to ``failed.log``, successes to
    ``Message_delivery.log``, one line per message.
    """
    if err is not None:
        with open('failed.log', 'a', encoding='utf-8') as fobj:
            fobj.write(str(err) + '\n')
    else:
        with open('Message_delivery.log', 'a', encoding='utf-8') as fobj:
            fobj.write('Message delivered to {} [{}]'.format(
                msg.topic(), msg.partition()) + '\n')


def _build_messages(ip_source):
    """Build one fake login record and its API request body.

    Args:
        ip_source: A ``CreateIp`` instance used to draw the user IP.

    Returns:
        ``(cdr, json_api)``: the record dict to publish to Kafka and the
        JSON-encoded API payload derived from it.
    """
    nowtime = time.time()
    nowtimeapi = datetime.datetime.fromtimestamp(nowtime).strftime(
        "%Y-%m-%d %H:%M:%S")
    prov = random.choice(_PROVINCE_CODES)
    roam_type = random.randint(0, 1)
    # One call, so that lon and lat describe the same point.
    lon, lat = generate_random_gps()
    cdr = {
        "titleName": "01",
        "mobile": create_phone(),
        "provinceCode": "013",
        "cityCode": "130",
        "netType": "112_3001",
        "loginTime": nowtime,
        "loginType": "01",
        "loginState": "01",
        "imei": '0' + create_phone(),
        "userIp": ip_source.create_ipv4(roam_type, prov),
        "appid": "ppp111",
        "iccid": "ppp111",
        "imsi": "ppp111",
        "mac": "F4:BF:80:0E:25:6F",
        "meid": "ppp111",
        "lat": lat,
        "lon": lon,
        "deviceBrand": "HUAWEI",
        "deviceModel": "HMA-AL00",
        "os": "android",
        "osVersion": "android8.2biiiopp",
        "screen": "2244*1080",
        "memorySpace": "1.61 GB",
        "phoneSpace": "52.92 GB",
        "version": "android@8.2buuupouu",
    }
    api_data = {
        "UNI_BSS_HEAD": {
            "APP_ID": "tyfkpAPPID",
            "TIMESTAMP": "{}.429".format(nowtimeapi),
            "TRANS_ID": "2021811061409363562951",
            "TOKEN": "6dc6f60246bf79cfc2c513fea5194402",
        },
        "UNI_BSS_BODY": {
            "LOGIN_USER_RISK_CONTROL_REQ": {
                "USER_ID": cdr["mobile"],
                "RISK_CONTROL_CODE": "PloyEventIdCE001",
                "HANDLE_TIME": nowtimeapi,
                "USER_IP": cdr["userIp"],
                "LON": cdr["lon"],
                "LAT": cdr["lat"],
                "IMEI": cdr["imei"],
            }
        },
    }
    return cdr, json.dumps(api_data, indent=4)


def c_kafka(*args, **kwds):
    """Produce ``count`` fake login records to Kafka from this process.

    After each record is produced and flushed, the matching API payload is
    put on the shared queue as ``{'key': [queued_at, payload_json]}`` where
    ``queued_at`` is a ``time.time()`` value, so that consumers running in
    other processes can measure how long the item has waited.

    Args:
        *args: ``(topic, pool_data, sleep_time, count, lock, log_grade)``.
            ``lock`` is accepted but not used by this function.
        **kwds: Configuration dict handed to ``confluent_kafka.Producer``.

    Returns:
        ``[process_id, elapsed_seconds, produced_count]``. ``produced_count``
        is 0 when the producer could not be created.
    """
    processId = os.getpid()
    print('Process', processId)
    topic, pool_data, sleep_time, count, lock, log_grade = args
    log = logs(log_grade, __name__)
    starttotal = timeit.default_timer()
    try:
        p = Producer(kwds)
    except Exception as e:
        log.error('Process- {} -kafka:连接失败-{}'.format(processId, e))
        # Without a producer there is nothing to send; report zero messages
        # instead of failing later on an undefined name.
        return [processId, timeit.default_timer() - starttotal, 0]

    ip = CreateIp()
    for _ in range(count):
        start = timeit.default_timer()
        cdr, json_api = _build_messages(ip)
        log.info('Process- {} -Time consuming data construction:{}'.format(
            processId, timeit.default_timer() - start))

        p.poll(0)
        p.produce(topic, json.dumps(cdr).encode('utf-8'),
                  callback=delivery_report)
        # Wall-clock time: this value is compared in another process, where
        # a perf_counter reading would not be meaningful.
        putkafkatime = time.time()
        p.flush()
        pool_data.put_nowait({'key': [putkafkatime, json_api]})
        if sleep_time > 0:
            time.sleep(int(sleep_time))

    return [processId, timeit.default_timer() - starttotal, count]


class Creat_Thread(threading.Thread):
    """Consumer thread that replays queued API payloads against ``url``.

    Args:
        t_msg: Caller-chosen thread number, used only in log messages.
        pool_data: Shared queue of ``{'key': [queued_at, payload_json]}``.
        lock: Stored on the instance; not used by this class.
        url: Endpoint the payload is POSTed to.
        sleeptime: Minimum number of seconds an item must have been queued
            before it is sent.
        threadsleeptime: Number of consecutive one-second waits on an empty
            queue after which the thread stops.
        log_grade: Log level name understood by ``logs``.
    """

    def __init__(self, t_msg, pool_data, lock, url, sleeptime,
                 threadsleeptime, log_grade):
        threading.Thread.__init__(self)
        self.t_msg = t_msg
        self.pool_data = pool_data
        self.lock = lock
        self.url = url
        self.sleeptime = sleeptime
        self.threadsleeptime = threadsleeptime
        self.log = logs(log_grade, __name__)

    def run(self):
        """Drain the queue until it has stayed empty for long enough."""
        start = timeit.default_timer()
        idle_waits = 0
        while True:
            try:
                dataFromQueue = self.pool_data.get_nowait()
            except queue.Empty:
                if idle_waits >= self.threadsleeptime:
                    self.log.error(
                        'Thread- {} - All items have been taken off the queue'
                        .format(self.t_msg))
                    break
                idle_waits += 1
                self.log.warning(
                    'Thread- {} - Wait for 1 second when the queue is empty'
                    .format(self.t_msg))
                time.sleep(1)
                continue

            # Only consecutive empty polls count towards the shutdown limit.
            idle_waits = 0
            queued_at, payload = dataFromQueue['key']
            waited = time.time() - queued_at
            if waited < self.sleeptime:
                # Not due yet: hand it back and pause briefly so the threads
                # do not spin on the shared queue.
                self.pool_data.put_nowait(dataFromQueue)
                self.log.warning(
                    'Thread- {} - Rewrite queue'.format(self.t_msg))
                time.sleep(min(self.sleeptime - waited,
                               _REQUEUE_BACKOFF_SECONDS))
                continue

            self.log.warning('Thread- {} - Call interval:{}'.format(
                self.t_msg, waited))
            self._call_api(payload)

        self.log.error(
            'Thread- {} - Total asynchronous call time:{}'.format(
                self.t_msg, timeit.default_timer() - start))

    def _call_api(self, payload):
        """POST one payload and append request/response to ``response``."""
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'
        }
        call_started = timeit.default_timer()
        try:
            rsp = requests.post(url=self.url, data=payload, headers=headers,
                                verify=False, timeout=10)
        except requests.RequestException as e:
            self.log.error('Thread- {} - Interface call failed:{}'.format(
                self.t_msg, e))
            rsp = None
        self.log.info(
            'Thread- {} - Asynchronous call time consuming:{}'.format(
                self.t_msg, timeit.default_timer() - call_started))
        if rsp is None:
            return

        try:
            rsp_body = rsp.json()
        except ValueError as e:
            self.log.error(
                'Thread- {} - Response is not valid JSON:{}'.format(
                    self.t_msg, e))
            return

        print(json.dumps(rsp_body, indent=4, ensure_ascii=False))
        response = {
            'msg': payload,
            'rsp': rsp_body
        }
        with open('response', 'a') as fobj:
            fobj.write(json.dumps(response, indent=4) + '\n')


def thread_run(thread_num, pool_data, lock, url, sleeptime, threadsleeptime,
               log_grade):
    """Start ``thread_num`` consumer threads and wait until all have ended.

    Threads are numbered from 1; the number is only used in log output.
    """
    threads = []
    for t_msg in range(1, thread_num + 1):
        thread = Creat_Thread(t_msg, pool_data, lock, url, sleeptime,
                              threadsleeptime, log_grade)
        thread.start()
        threads.append(thread)
    # Keep the worker process busy until every consumer has finished.
    for thread in threads:
        thread.join()


def main():
    """Configure and run the producer processes and the API caller."""
    print('父进程', os.getpid())
    print('Resource preparation in progress')
    # ------------------------------- log level ------------------------------ #
    log_grade = 'DEBUG'
    # ---------------------------- process settings -------------------------- #
    count_ms = 200  # total number of messages
    processes = 10  # pool size; one slot is reserved for the API threads
    if processes < 2:
        raise ValueError('processes must be at least 2')
    producers = processes - 1
    count, remainder = divmod(count_ms, producers)
    # ----------------------------- Kafka settings --------------------------- #
    sleep_time = 0  # pause (seconds) after each produced message
    topic = 'topicname'
    conf = {
        "bootstrap.servers": "192.18.0.82:3007",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": "test",
        "sasl.password": "text",
        'queue.buffering.max.kbytes': 2000000,
        'queue.buffering.max.messages': 1000000
    }
    # ------------------------- API caller thread settings ------------------- #
    thread_num = 100  # number of threads calling the API
    url = 'http://192.168.1.1:5555/'
    sleeptime = 5  # seconds an item must wait before the API is called
    threadsleeptime = 10  # idle seconds after which a thread stops
    # ------------------------------------------------------------------------ #
    with mp.Manager() as manager:
        pool_data = manager.Queue()
        lock = manager.Lock()

        result = []
        pool = mp.Pool(processes=processes)
        for p_name in range(producers):
            # The last producer also sends whatever does not divide evenly.
            share = count + remainder if p_name == producers - 1 else count
            result.append(pool.apply_async(
                func=c_kafka,
                args=(topic, pool_data, sleep_time, share, lock, log_grade),
                kwds={**conf}))
        result.append(pool.apply_async(
            func=thread_run,
            args=(thread_num, pool_data, lock, url, sleeptime,
                  threadsleeptime, log_grade)))
        pool.close()
        pool.join()

        # The final entry is thread_run, which returns nothing to report.
        for res in result[:-1]:
            process, rtime, produced = res.get()
            print("process(%s) done、--Running time: %s Seconds"
                  % (process, rtime), produced)


if __name__ == "__main__":
    main()

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作日内妥善处理。