第 5 章 物流大数据分析案例 5.1-1 视频讲解 ..5.1运输路线优化 5.1.1案例背景 随着全球化贸易的发展和电子商务的兴起,物流行业迎来前所未有的 发展机遇。然而,这一增长同时伴随着运营效率、成本控制、环境可持续性 等方面的挑战。 某物流公司需要为其客户提供快速、准时、低成本的商品配送服务,拥 有多个仓库和多个配送中心,每个仓库和配送中心之间有一定的运输成本 和时间,每个配送中心有一定数量的车辆,每辆车有一定的载重量和行驶速 度,每个客户有一定的商品需求量和送达时间窗,即客户只能在规定的时间 范围内接收商品。该平台的目标是在满足客户需求的前提下,安排合理的 仓库选择、配送中心分配和车辆行驶路线,使得总的运输成本和总的行驶时 间最小,从而提高客户满意度和行业竞争力。 该项目属于车辆路径问题(VehicleRoutingProblem,VRP )。为了解决 该问题,该公司决定采用两种基于智能算法的方法,分别是遗传算法 (GeneticAlgorithm,GA)和蚁群优化算法(AntColonyOptimization,ACO )。 这两种算法都是模拟自然界中生物的进化或行为的启发式算法,具有较强 的全局搜索能力和自适应能力,能够在较短的时间内找到问题的较优解或 近似最优解。 遗传算法是模拟自然选择和遗传机制的一种算法,它通过对一组可行 解(称为个体)进行复制、交叉和变异等操作,产生新的一组可行解(称为子 代), 并根据适应度函数(即目标函数)进行选择,保留较优的个体,淘汰较差 的个体,从而实现优良基因的传承和累积,逐渐逼近最优解。 蚁群优化算法是模拟蚂蚁在寻找食物过程中的信息交流和协作的一种 算法,它通过对一组可行解(称为蚂蚁)进行迭代式的搜索,利用信息素的正 反馈机制,增强优良解的搜索概率,减弱劣质解的搜索概率,从而实现全局 最优解的搜索。 1 40 大数据分析基础及应用案例(微课版) 5.1.2 数据集介绍 假定物流公司需要从配送中心送货到4个不同的客户地点,然后返回配送中心。 地点间的距离以矩阵形式给出,其中,矩阵的每个元素代表相应地点间的距离(单 位:km)。为简化问题,假设距离矩阵是对称的,即从地点A 到地点B的距离等于从 地点B到地点A 的距离。 以下是一个示例距离矩阵,如表5.1所示,其中包含1个配送中心(0号点)和4 个客户地点(1~4号点)。 表5.1 配送地点距离矩阵 0 号点1 号点2 号点3 号点4 号点 0号点0 10 15 20 25 1号点10 0 35 25 30 2号点15 35 0 30 20 3号点20 25 30 0 18 4号点25 30 20 18 0 5.1.3 分析过程与代码实现 1.遗传算法代码实现 用于解决物流路线优化问题的遗传算法是一种基于生物进化理论的优化方法, 通过模拟生物进化过程中的选择、交叉和变异等操作来搜索最优解。其目标是找到 一条最优的路线,使得经过所有城市后的总行驶距离最短。 首先,定义遗传算法的参数,包括种群大小、精英个体数量、变异率和迭代次数 等。然后,定义一系列操作,包括创建随机路线、初始化种群、评估种群中每条路线 的适应度(即总行驶距离)、选择优秀个体、交叉配对、变异等。接着,通过运行遗传 算法,生成一系列代的种群,并根据总行驶距离对种群进行排序和筛选,逐步迭代直 到达到指定的迭代次数。之后,返回找到的最优路线及其对应的总行驶距离。最 后,基于具体案例,通过给定的距离矩阵和参数设置,运行遗传算法找到最优的路 线,并输出最优路线和总行驶距离。具体代码如下。 In[]: import numpy as np import random class GeneticAlgorithm: """遗传算法求解物流路线优化问题""" def _ _ init _ _ ( self, distance _ matrix, pop _ size, elite _ size, mutation_rate, generations): 第5 章 物流大数据分析案例1 41 """ distance_matrix:距离矩阵 pop_size:种群大小 elite_size:精英个体数量 mutation_rate:变异率 generations:迭代代数 """ self.distance_matrix = distance_matrix self.pop_size = pop_size self.elite_size = elite_size self.mutation_rate = mutation_rate self.generations = generations def create_route(self): """创建一个随机路线""" route = list(range(1, len(self.distance_matrix))) random.shuffle(route) route.insert(0, 0) route.append(0) return route def initial_population(self): """初始化种群""" population = [] for _ in range(self.pop_size): population.append(self.create_route()) return population def route_distance(self, route): """计算路线的总距离""" distance = 0 for i in range(len(route) - 1): distance + = self.distance_matrix[route[i]][route[i+ 1]] return distance def rank_routes(self, population): """评估种群,根据路线总距离排序""" fitness_results = {} for i in range(len(population)): fitness_results[i] = self.route_distance(population[i]) return sorted(fitness_results.items(), key= lambda x: x[1]) def selection(self, ranked_population): """选择(轮盘赌算法)""" selection_results = [] 1 42 大数据分析基础及应用案例(微课版) df = sum([1 / (item[1] + 1) for item in ranked_population]) probabilities = [1 / (ranked_population[i][1] + 1) / df for i in range(len(ranked_population))] cum_sum = np.cumsum(probabilities) for i in range(self.elite_size): selection_results.append(ranked_population[i][0]) for _ in range(len(ranked_population) - self.elite_size): pick = random.random() for i in range(len(ranked_population)): if pick < = cum_sum[i]: selection_results.append(ranked_population[i][0]) break return selection_results def mating_pool(self, population, selection_results): """创建交叉配对池""" pool = [] for i in range(len(selection_results)): index = selection_results[i] pool.append(population[index]) return pool def breed(self, parent1, parent2): """交叉(配对)""" child = [] childP1 = [] childP2 = [] geneA = int(random.random() * len(parent1)) geneB = int(random.random() * len(parent1)) start_gene = min(geneA, geneB) end_gene = max(geneA, geneB) for i in range(start_gene, end_gene): childP1.append(parent1[i]) childP2 = [item for item in parent2 if item not in childP1] child = childP1 + childP2 child.insert(0, 0) child.append(0) return child def mutate(self, individual): """变异""" for swapped in range(1, len(individual) - 1): if(random.random() < self.mutation_rate): 第5 章 物流大数据分析案例1 43 swapWith = int(random.random() * len(individual)) if swapWith = = 0 or swapWith = = len(individual)- 1: continue individual[swapped], individual[swapWith] = individual [swapWith], individual[swapped] return individual def next_generation(self, current_gen): """生成下一代""" ranked_pop = self.rank_routes(current_gen) selection_results = self.selection(ranked_pop) pool = self.mating_pool(current_gen, selection_results) children = [] length = len(pool) - self.elite_size for i in range(self.elite_size): children.append(pool[i]) for i in range(length): child = self.breed(pool[i], pool[len(pool)- i- 1]) children.append(self.mutate(child)) return children def run(self): """运行遗传算法""" population = self.initial_population() for _ in range(self.generations): population = self.next_generation(population) best_route_index = self.rank_routes(population)[0][0] best_route = population[best_route_index] return best_route, self.route_distance(best_route) In[]: # 距离矩阵 distance_matrix = np.array([ [0, 10, 15, 20, 25], [10, 0, 35, 25, 30], [15, 35, 0, 30, 20], [20, 25, 30, 0, 18], [25, 30, 20, 18, 0] ]) # 参数设置 ga = GeneticAlgorithm(distance_matrix= distance_matrix, pop_size= 100, 1 44 大数据分析基础及应用案例(微课版) elite_size= 20, mutation_rate= 0.01, generations= 500) # 运行遗传算法 best_route, best_distance = ga.run() print(f"最优路线: {best_route}\n 总行驶距离: {best_distance}") Out[]: 最优路线: [0, 2, 4, 3, 1, 0] 总行驶距离: 88 遗传算法的运行结果会因随机性而有所不同,但它应能找到一条相对较短的路 径,满足从配送中心出发,访问所有客户地点后返回配送中心的要求。 这意味着算法找到一条总行驶距离为88km 的配送路线。请注意,由于遗传算 法的随机性,每次运行得到的最优路线和最短距离可能会有所不同。 5.1-2 视频讲解 2.蚁群优化算法代码实现 蚁群优化算法模拟蚂蚁在寻找食物时的行为。算法首先初始化一些蚂蚁,并在 每次迭代中让这些蚂蚁通过路径选择策略逐步探索路径。在路径选择时,蚂蚁倾向 于选择信息素浓度高、距离较短的路径,这种选择策略既考虑信息素的吸引力,也考 虑路径的实际距离。 在每次迭代后,算法根据蚂蚁的搜索结果更新信息素浓度,并根据信息素的更新 来调整蚂蚁的路径选择策略。通过不断迭代和信息素的更新,算法最终能够找到一 条较优的路径,即最短路径,以及对应的总行驶距离。 通过给定的距离矩阵和算法参数,初始化蚁群优化算法,并运行该算法以求解问 题。最终输出找到的最短路径和对应的总行驶距离。具体代码如下。 In[]: import numpy as np class AntColonyOptimizer: def __init__(self, distances, n_ants, n_iterations, decay, alpha= 1, beta= 2): """ 初始化蚁群优化算法 distances: 距离矩阵 pheromone: 信息素矩阵 all_inds: 所有节点的索引列表 n_ants: 蚂蚁数量 n_iterations: 迭代次数 decay: 信息素衰减率 alpha: 信息素重要程度因子 beta: 距离重要程度因子 """ 第5 章 物流大数据分析案例1 45 self.distances = distances self.pheromone = np.ones(self.distances.shape) / len(distances) self.all_inds = range(len(distances)) self.n_ants = n_ants self.n_iterations = n_iterations self.decay = decay self.alpha = alpha self.beta = beta def run(self): """执行蚁群优化算法""" shortest_path = None best_cost = float('inf') for _ in range(self.n_iterations): all_paths = self.gen_all_paths() self.spread_pheromone(all_paths) shortest_path, best_cost = min(all_paths, key= lambda x: x [1], default=(None, float('inf'))) self.pheromone *= self.decay return shortest_path, best_cost def gen_all_paths(self): """生成所有蚂蚁的路径""" all_paths = [] for _ in range(self.n_ants): path = [0] while len(path) < len(self.distances): move = self.pick_move(path[- 1], path) path.append(move) path.append(0) # 返回起点 all_paths.append((path, self.path_cost(path))) return all_paths def path_cost(self, path): """计算给定路径的总成本,即路径上所有边的距离之和""" return sum([self.distances[path[i], path[i+ 1]] for i in range (len(path)- 1)]) def pick_move(self, current, visited): """基于当前节点的信息素浓度和距离,以概率方式选择下一个节点""" probs = self.pheromone[current] ** self.alpha * ((1.0 / self. distances[current]) ** self.beta) probs[visited] = 0 probs /= probs.sum() 1 46 大数据分析基础及应用案例(微课版) next_move = np.random.choice(self.all_inds, 1, p= probs)[0] return next_move def spread_pheromone(self, all_paths): """根据找到的所有路径更新信息素浓度""" for path, cost in all_paths: for move in range(len(path)- 1): self.pheromone[path[move], path[move+ 1]] + = 1.0 / cost In[]: # 距离矩阵 distances = np.array([ [0, 10, 15, 20, 25], [10, 0, 35, 25, 30], [15, 35, 0, 30, 20], [20, 25, 30, 0, 18], [25, 30, 20, 18, 0] ]) # 初始化并运行蚁群优化算法 aco = AntColonyOptimizer(distances, n_ants= 5, n_iterations= 100, decay = 0.95, alpha= 1, beta= 2) shortest_path, best_cost = aco.run() print(f"最短路径: {shortest_path}\n 总行驶距离: {best_cost}") Out[]: 最短路径: [0, 2, 4, 3, 1, 0] 总行驶距离: 88 由于算法中存在随机性,每次运行结果可能不同。这意味着蚁群优化算法找到 一条总行驶距离为88km 的最短路径,该路径覆盖从配送中心出发,访问所有客户地 点一次后返回配送中心的要求。 5.1.4 小结 在本案例中,通过遗传算法,成功地解决了一家物流公司面临的配送路线优化挑 战。这不仅极大地提升了配送的效率,还在显著降低运输成本方面取得突破。此项 目案例充分展示了遗传算法在处理复杂的优化问题上的出色表现,为物流领域的数 据分析和优化工作提供了极具价值的参考。 此外,还采用蚁群优化算法来进一步解决路线优化的问题,成功地确定最短的配 送路径。这不仅最小化了总行驶距离,也显著提高了整体物流效率。这个案例证明 启发式算法在解决实际物流问题中的巨大潜力,为物流数据分析和车队管理提供了 一种高效的解决方案。 第5 章 物流大数据分析案例1 47 5.2 视频讲解 .. 5.2 智能仓库管理 5.2.1 案例背景 随着全球电子商务的迅猛发展,物流行业正经历前所未有的转型,其中,智能仓 库管理系统成为提升效率、降低成本、增强客户满意度的关键技术。智能仓库利用 自动化设备和先进的信息技术,实现货物的快速入库、存储、拣选和出库,以应对日 益增长的订单处理需求。 尽管智能仓库带来诸多好处,但也面临着不少挑战。其中之一便是如何高效地 管理庞大的仓库数据,包括货物的种类、体积、重量、存取频率等,以及如何基于这些 数据做出快速准确的决策,如货物的最佳存储位置、存储策略的优化等。这些决策 直接关系到仓库的运营效率和成本控制。 机器学习技术中的决策树和随机森林算法,为智能仓库管理提供了强大的数据 分析能力。通过分析历史数据,这些算法可以预测货物的最佳存储区域,甚至预测 未来的库存需求,从而指导仓库布局优化、库存管理和订单处理等决策过程。 5.2.2 数据集介绍 本案例构建一个模拟的数据集,包含若干条货物记录,每条记录包括货物类型 (Product_Type)、体积(Volume)、重量(Weight)、到达仓库时是星期几(Arrival_ Weekday)、周转率(Turnover_ Rate)等特征,以及货物实际存储的仓库区域 (Warehouse_Section)。这个数据集旨在模拟真实的仓库运营情况,为算法提供训练 和测试的基础。 5.2.3 分析过程与代码实现 1.导入工具库及模拟数据集 通过函数generate_dataset()生成一个包含30条记录的数据集。该数据集用于 模拟货物的属性和仓库的管理情况。在生成数据集时,首先设定货物类型、到达时 是星期几和仓库区域的取值范围,分别表示货物的种类、到达仓库的时间和存放在 仓库的区域。然后,通过循环生成指定数量的记录,并在每次循环中随机选择货物 类型、到达时是星期几,根据货物类型分配体积、重量,根据体积和重量分配仓库区 域,并随机生成周转率。之后,将每次生成的记录以字典的形式添加到数据集中,并 返回生成的数据集。最终,将数据集转换为Pandas的DataFrame格式,并输出。 这样生成的数据集可以用于模拟和分析货物的属性、到达时间以及仓库的管理 情况,有助于进行物流规划、仓库管理等相关工作的研究和实践。具体代码如下。 In[]: import random 1 48 大数据分析基础及应用案例(微课版) import pandas as pd # 设定货物类型、到达时是星期几和仓库区域的取值 product_types = ['A', 'B', 'C', 'D', 'E'] arrival_weekdays = [1, 2, 3, 4, 5, 6, 7] warehouse_sections = ['Section_1', 'Section_2', 'Section_3'] # 生成数据集的函数 def generate_dataset(num_records): dataset = [] # 遍历生成指定数量的记录 for _ in range(num_records): # 随机选择货物类型 product_type = random.choice(product_types) # 根据货物类型分配体积和重量 if product_type in ['A', 'B']: volume = round(random.uniform(10, 20), 2) weight = round(random.uniform(5, 10), 2) elif product_type in ['C', 'D']: volume = round(random.uniform(15, 40), 2) weight = round(random.uniform(8, 20), 2) else: volume = round(random.uniform(35, 60), 2) weight = round(random.uniform(15, 30), 2) # 随机选择到达时是星期几 arrival_weekday = random.choice(arrival_weekdays) # 根据体积和重量分配仓库区域 if volume < 25 and weight < 15: warehouse_section = 'Section_1' elif volume < 45 and weight < 25: warehouse_section = 'Section_2' else: warehouse_section = 'Section_3' # 随机生成周转率 turnover_rate = round(random.uniform(0.3, 1.0), 2) # 将记录添加到数据集中 dataset.append({ 'Product_Type': product_type, 'Volume': volume, 'Weight': weight, 'Arrival_Weekday': arrival_weekday, 'Turnover_Rate': turnover_rate,