OpenIM Mongo 替换 Mysql

目标

完成 openim-rpc-friend 的 Mysql -> Mongo 的替换。

有些接口实现了,没有调用就可以删了

阅读

type conversationServer struct {
	groupRpcClient                 *rpcclient.GroupRpcClient
	conversationDatabase           controller.ConversationDatabase
	conversationNotificationSender *notification.ConversationNotificationSender
}
  1. groupRpcClient: 这个字段是指向 rpcclient.GroupRpcClient 类型的指针。在Go中,指针是用于存储另一个变量的内存地址的变量。rpcclient.GroupRpcClient 很可能是一个用于RPC(远程过程调用)通信的客户端类型。它可能被用来与远程服务或服务器组进行交互。
  2. conversationDatabase: 这个字段的类型是 controller.ConversationDatabase。这表明 conversationDatabase 是用来处理与会话数据库相关的操作的。这可能包括查询、更新、删除会话数据等。
  3. conversationNotificationSender: 这是指向 notification.ConversationNotificationSender 类型的指针。这个字段很可能被用来发送与会话相关的通知。例如,在会话中添加新消息时,可能需要通知相关用户。
  4. notification: 这个字段并不直接出现在结构体定义中,但从 conversationNotificationSender 字段可以推测,notification 可能是一个包含了与通知发送相关功能的包(package)。

start 初始化和启动与会话(如聊天应用中的会话)相关的服务。

func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
    // db, err := relation.NewGormDB(): 初始化一个GORM数据库实例。GORM是一个流行的Go ORM库,用于与SQL数据库交互。
	db, err := relation.NewGormDB()
	if err != nil {
		return err
	}
    
    // 使用GORM的自动迁移功能来创建或更新会话模型的数据库表
	if err := db.AutoMigrate(&tablerelation.ConversationModel{}); err != nil {
		return err
	}
    
    // 初始化一个Redis缓存实例。Redis通常用于快速缓存数据。
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
    
    // 初始化一个MongoDB数据库实例。
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
    
    // 使用MongoDB创建一个专门处理会话数据的数据库实例。
	conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
    
    // 初始化一个用于群组操作的RPC客户端。
	groupRpcClient := rpcclient.NewGroupRpcClient(client)
    
    // 初始化一个用于消息操作的RPC客户端。
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
    
    // 在gRPC服务器上注册一个新的会话服务,传入之前初始化的各种组件。
	pbconversation.RegisterConversationServer(server, &conversationServer{
		conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient),
		groupRpcClient:                 &groupRpcClient,
		conversationDatabase:           controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
	})
	return nil
}

详细介绍 GORM 的 AutoMigrate

GORM 的 AutoMigrate 方法是一个在Go语言中使用GORM库时非常有用的功能,主要用于数据库迁移。以下是对这个方法的详细解析:

功能

  • 自动迁移AutoMigrate 会自动根据你的Go结构体(通常用于表示数据库中的表)来创建或修改数据库表。这意味着它可以根据结构体的定义来创建新的表,或者根据结构体的修改来调整现有的表(如添加、删除或修改字段)。

工作原理

  • 与结构体同步AutoMigrate 接收一个或多个结构体作为参数,并根据这些结构体的定义在数据库中创建或更新表。例如,如果你有一个如下所示的结构体:

    type User struct {
        gorm.Model
        Name string
        Age  int
    }
    

    AutoMigrate(&User{}) 会在数据库中创建一个名为 users 的表(GORM默认使用结构体名称的复数形式作为表名),其中包含 nameage 字段,以及GORM模型中的默认字段(如 ID, CreatedAt, UpdatedAt, DeletedAt)。

  • 安全性AutoMigrate 通常只会添加缺失的字段、创建缺失的索引或外键,并不会删除或修改现有字段。这意味着它是一个相对安全的操作,不会导致数据丢失。

使用场景

  • 应用程序启动时:通常在应用程序启动时调用 AutoMigrate,以确保数据库结构与应用程序的最新结构体定义保持同步。
  • 开发过程中:在开发过程中,当你修改了结构体的定义后,可以使用 AutoMigrate 来快速更新数据库结构,而不需要手动进行数据库迁移。

注意事项

  • 不是数据库版本控制工具:虽然 AutoMigrate 很方便,但它不应该被视为一个完整的数据库版本控制解决方案。对于更复杂的数据库迁移和版本控制,可能需要使用专门的迁移工具,如 golang-migrate/migrate
  • 谨慎使用:在生产环境中使用 AutoMigrate 时应格外小心,因为错误的使用可能会导致数据丢失或表结构的意外变化。

初始化 mongo

// NewMongo Initialize MongoDB connection.
func NewMongo() (*Mongo, error) {
	specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
	uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
	if config.Config.Mongo.Uri != "" {
		uri = config.Config.Mongo.Uri
	} else {
		mongodbHosts := ""
		for i, v := range config.Config.Mongo.Address {
			if i == len(config.Config.Mongo.Address)-1 {
				mongodbHosts += v
			} else {
				mongodbHosts += v + ","
			}
		}
		if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
			uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
				config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
				config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
		} else {
			uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
				mongodbHosts, config.Config.Mongo.Database,
				config.Config.Mongo.MaxPoolSize)
		}
	}
	fmt.Println("mongo:", uri)
	var mongoClient *mongo.Client
	var err error = nil
	for i := 0; i <= maxRetry; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
		defer cancel()
		mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
		if err == nil {
			return &Mongo{db: mongoClient}, nil
		}
		if cmdErr, ok := err.(mongo.CommandError); ok {
			if cmdErr.Code == 13 || cmdErr.Code == 18 {
				return nil, err
			} else {
				fmt.Printf("Failed to connect to MongoDB: %s\n", err)
			}
		}
	}
	return nil, err
}
  • specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound): 这行代码将MongoDB的 ErrNoDocuments 错误替换为自定义错误 ErrRecordNotFound。这通常用于统一错误处理。

  • 在 URL 构建中

    • 函数首先定义了一个默认的MongoDB URI。
    • 如果 config.Config.Mongo.Uri 非空,则使用这个作为URI。
    • 如果为空,则根据配置 (config.Config.Mongo.Address 等) 构建URI。如果提供了用户名和密码,则在URI中包含这些凭据;否则,不包含。
  • 数据库尝试链接

    • 使用 for 循环和重试逻辑(由 maxRetry 控制)来尝试连接数据库。
    • context.WithTimeout(context.Background(), time.Second*10): 创建一个最多10秒的超时上下文,确保连接尝试不会无限期等待。
    • mongo.Connect(ctx, options.Client().ApplyURI(uri)): 尝试使用上面构建的URI连接MongoDB。
    • 如果连接成功,返回一个封装了 mongoClientMongo 实例。

解析下面的PopulateGroupMember

func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*relation.GroupMemberModel) error {
	if len(members) == 0 {
		return nil
	}
	emptyUserIDs := make(map[string]struct{})
	for _, member := range members {
		if member.Nickname == "" || member.FaceURL == "" {
			emptyUserIDs[member.UserID] = struct{}{}
		}
	}
	if len(emptyUserIDs) > 0 {
		users, err := g.getUsersInfo(ctx, utils.Keys(emptyUserIDs))
		if err != nil {
			return err
		}
		userMap := make(map[string]CommonUser)
		for i, user := range users {
			userMap[user.GetUserID()] = users[i]
		}
		for i, member := range members {
			user, ok := userMap[member.UserID]
			if !ok {
				continue
			}
			if member.Nickname == "" {
				members[i].Nickname = user.GetNickname()
			}
			if member.FaceURL == "" {
				members[i].FaceURL = user.GetFaceURL()
			}
		}
	}
	return nil
}

其作用是在一个群组通知发送器(GroupNotificationSender)中填充群组成员的信息。代码的主要逻辑如下:

  1. 方法定义PopulateGroupMemberGroupNotificationSender类型的一个方法,它接受一个context.Context和多个*relation.GroupMemberModel类型的参数。
  2. 检查成员列表是否为空:首先检查传入的群组成员(members)是否为空。如果没有成员,则直接返回,不执行任何操作。
  3. 查找空信息的用户:使用一个循环遍历所有成员。如果成员的昵称(Nickname)或头像URL(FaceURL)为空,则将其用户ID(UserID)添加到一个名为emptyUserIDs的map中。这个map用于存储需要更新信息的用户ID。
  4. 获取用户信息:如果emptyUserIDs中有数据,即存在需要更新信息的用户,就调用g.getUsersInfo方法,获取这些用户的详细信息。这个方法需要上下文(ctx)和用户ID列表。
  5. 错误处理:如果在获取用户信息过程中出现错误,该方法会返回相应的错误。
  6. 更新成员信息:使用获取到的用户信息更新群组成员的信息。首先,创建一个新的map(userMap),用于将用户ID映射到用户信息。然后,遍历每个成员,如果在userMap中找到对应的用户信息,并且该成员的昵称或头像URL为空,则用获取到的用户信息进行更新。
  7. 返回:最后,如果整个过程顺利,该方法返回nil,表示没有错误发生。

主要的变化部分

所有的 interface{} 都变成了 any

conversationDB := relation.NewConversationGorm(db)

替换为:

	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}

User 中:

以前的逻辑:

userDB := relation.NewUserGorm(db)

更新为:

userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
	return err
}

以前的逻辑:

user := convert.UserPb2DB(req.UserInfo)
if err != nil {
	return nil, err
}
err = s.Update(ctx, user)
if err != nil {
	return nil, err
}

更新为:

data := convert.UserPb2DBMap(req.UserInfo)
if len(data) == 0 {
	return nil, errs.ErrArgs.Wrap("no data to update")
}
if err := s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
	return nil, err
}

其中 UserPb2DB 函数:

func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel {
	var userDB relationtb.UserModel
	userDB.UserID = user.UserID
	userDB.Nickname = user.Nickname
	userDB.FaceURL = user.FaceURL
	userDB.Ex = user.Ex
	userDB.CreateTime = time.UnixMilli(user.CreateTime)
	userDB.AppMangerLevel = user.AppMangerLevel
	userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt
	return &userDB
}

其中 UserPb2DBMap 函数的逻辑:

func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
	if user == nil {
		return nil
	}
	val := make(map[string]any)
	if user.Nickname != "" {
		val["nickname"] = user.Nickname
	}
	if user.FaceURL != "" {
		val["face_url"] = user.FaceURL
	}
	if user.Ex != "" {
		val["ex"] = user.FaceURL
	}
	if user.AppMangerLevel != 0 {
		val["app_manger_level"] = user.AppMangerLevel
	}
	if user.GlobalRecvMsgOpt != 0 {
		val["global_recv_msg_opt"] = user.GlobalRecvMsgOpt
	}
	return val
}

最初,使用 relation.NewUserGorm(db) 建立到数据库的基于VMM的连接。

convert.UserPb2DB 函数将用户信息从协议缓冲区(protobuf)格式转换为与IBM兼容的数据库模型。

更新后的用户数据随后被传递给服务的 Update 方法。

代码现在使用 newmgo.NewUserMongo(mongo.GetDatabase()) 建立到MongoDB的连接。这表明从关系数据库方法(RIMM)到NoSQL方法(MongoDB)的转变。

更新效率:使用 UserPb2DBMap 的新逻辑允许部分更新,因为只有更改的字段才包含在映射中。这比关系模型更有效,因为关系模型可能需要更新整行。

以前 Mysql 的逻辑

我正在阅读和学习源码:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	users := make([]*tablerelation.UserModel, 0)
	if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
		return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
	}
	for k, v := range config.Config.Manager.UserID {
		users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
	}
	userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
	tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
	if err != nil {
		return err
	}
	cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
	userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
	database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
	friendRpcClient := rpcclient.NewFriendRpcClient(client)
	groupRpcClient := rpcclient.NewGroupRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
	u := &userServer{
		UserDatabase:             database,
		RegisterCenter:           client,
		friendRpcClient:          &friendRpcClient,
		groupRpcClient:           &groupRpcClient,
		friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
		userNotificationSender:   notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
	}
	pbuser.RegisterUserServer(server, u)
	return u.UserDatabase.InitOnce(context.Background(), users)
}

帮我讲解这段源码的实现,步骤,以及设计实现,以及可以改进的思路

微服务或服务端应用程序的启动函数。它初始化各种服务和数据库连接,然后将服务注册到一个gRPC服务器。

代码分析

  1. 初始化Redis缓存: cache.NewRedis() 创建一个新的Redis缓存实例。
  2. 初始化MongoDB连接: unrelation.NewMongo() 用于创建和配置MongoDB连接。
  3. 检查用户ID和昵称配置: 代码确保配置中的用户ID数量与昵称数量相等,如果不相等,则返回错误。
  4. 构造用户模型: 遍历配置中的用户ID和昵称,创建用户模型数组。
  5. 创建用户数据库实例: newmgo.NewUserMongo(mongo.GetDatabase()) 用于初始化用户数据库操作的MongoDB实现。
  6. 创建事务处理器: tx2.NewAuto(context.Background(), mongo.GetClient()) 用于MongoDB的事务处理。
  7. 创建缓存实例: 结合Redis缓存和用户数据库,创建用户缓存。
  8. 创建用户MongoDB驱动: unrelation.NewUserMongoDriver(mongo.GetDatabase()) 用于操作用户数据。
  9. 初始化控制器: controller.NewUserDatabase(userDB, cache, tx, userMongoDB) 创建一个控制器,集成了用户数据库、缓存、事务处理器和MongoDB驱动。
  10. 初始化RPC客户端: 创建用于与好友、群组和消息服务通信的RPC客户端。
  11. 设置用户服务器: 创建一个用户服务器实例,并注册到gRPC服务器。
  12. 初始化用户数据库: 使用初始用户模型数据初始化数据库。

设计实现

  • 模块化设计: 每个功能(如数据库访问、缓存、RPC通信)都被分解为独立的模块,有助于代码的维护和扩展。
  • 错误处理: 在每个重要步骤之后都有错误检查,确保在出现问题时能够及时返回错误。
  • 配置验证: 检查配置的有效性,例如用户ID和昵称的数量匹配。

改进思路

  1. 配置和错误处理: 可以使用更先进的配置管理和错误处理库来简化代码和增强可读性。
  2. 依赖注入: 使用依赖注入框架可以更好地管理对象的创建和依赖关系,减少代码耦合。
  3. 日志记录: 在关键步骤中添加更详细的日志记录,有助于调试和跟踪问题。
  4. 代码重构: 某些代码块(例如RPC客户端初始化)可以进一步抽象和封装,以减少重复代码。
  5. 异步处理: 考虑在适当的地方引入异步处理,以提高性能和响应速度。
  6. 单元测试: 加强单元测试,确保代码的可靠性和未来的可维护性。
  7. 配置外部化: 将配置文件外部化,而不是硬编码在代码中,以增强灵活性和可配置性。

创建数据库事务

func NewAuto(ctx context.Context, cli *mongo.Client) (tx.CtxTx, error) {
	var res map[string]any
	if err := cli.Database("admin").RunCommand(ctx, bson.M{"isMaster": 1}).Decode(&res); err != nil {
		return nil, err
	}
	if _, ok := res["setName"]; ok {
		return NewMongoTx(cli), nil
	}
	return NewInvalidTx(), nil
}

具体针对MongoDB。让我们逐步分析这个函数:

函数 NewAuto 分析

  1. 函数签名:
    • NewAuto(ctx context.Context, cli *mongo.Client) (tx.CtxTx, error): 这个函数接收一个上下文(context.Context)和一个MongoDB客户端(*mongo.Client),返回一个事务处理器接口(tx.CtxTx)和一个错误(error)。
  2. 运行MongoDB命令:
    • cli.Database("admin").RunCommand(ctx, bson.M{"isMaster": 1}).Decode(&res): 这行代码向MongoDB的admin数据库发送一个命令(isMaster: 1),来检查当前连接的MongoDB节点是否是主节点(master)。这个命令的响应被解码到变量 res 中。
  3. 检查响应:
    • if _, ok := res["setName"]; ok { ... }: 检查响应中是否包含键 setNamesetName 存在表明当前MongoDB实例是一个复制集的一部分。
  4. 返回适当的事务处理器:
    • 如果是复制集,使用 NewMongoTx(cli) 返回一个适合MongoDB复制集的事务处理器。
    • 如果不是复制集,使用 NewInvalidTx() 返回一个无效的事务处理器。

设计意图

  • 动态检测: 通过检查MongoDB的配置(是否是复制集),动态地确定使用哪种类型的事务处理器。
  • 灵活性和兼容性: 代码可以处理不同的MongoDB部署配置(单节点或复制集),提高了通用性。

可能的改进

  1. 更详细的错误处理: 当 RunCommand 失败时,除了返回错误外,也可以记录更多关于错误的上下文信息,以便更好地理解错误发生的原因。
  2. 配置检查优化: 如果这个函数经常被调用,频繁地运行 isMaster 命令可能会影响性能。可以考虑缓存这个信息或者在初始化时确定这个设置。
  3. 增加日志记录: 在关键步骤(如命令执行、判断逻辑)添加日志记录,有助于调试和跟踪问题。
  4. 异常情况处理: 在无法确定事务处理器类型时,除了返回一个无效的事务处理器外,还可以考虑提供一种默认的处理策略或者更明确的错误提示。

启动 RPC

fnc (a *RpcCmd) StartSvr(name string, rpcFn func(discov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,) error {
	if a.GetPortFlag() == 0 {
		return errors.New("port is required")
	}
	return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn)
}

函数 StartSvr 分析

  1. 函数签名:
    • func (a *RpcCmd) StartSvr(name string, rpcFn func(discoveryregistry.SvcDiscoveryRegistry, *grpc.Server) error) error: 这个函数是 RpcCmd 结构体的一个方法。它接收一个服务名称 name 和一个函数 rpcFn,这个函数自身接收一个服务发现注册表和一个gRPC服务器,并返回一个错误。StartSvr 函数也返回一个错误。
  2. 检查端口:
    • if a.GetPortFlag() == 0 { return errors.New("port is required") }: 这行代码检查是否已经指定了端口号(通过 GetPortFlag() 方法)。如果没有指定端口号(即端口为0),则返回一个错误,表明端口是必需的。
  3. 启动RPC服务:
    • return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn): 如果端口号存在,该行代码使用提供的端口号、服务名称、Prometheus端口(用于监控)和RPC函数来启动RPC服务。

设计意图

  • 参数验证: 通过检查端口号,确保在启动服务前所有必需的配置都已设置。
  • 模块化: 将RPC服务的启动逻辑封装在一个单独的函数中,有助于代码的重用和维护。
  • 灵活性: 通过将RPC启动逻辑作为一个函数参数传递,这个方法可以启动不同类型的RPC服务。

Mongo 中的事务

在一次业务实践中需要在 MongoDB 中使用自增 ID,而 MongoDB 本身并不支持自增 ID。我们需要通过一个单独的集合保存 ID,使用 FindOneAndUpdate$inc 操作符实现 ID 的自增.

然而此时需要操作两个集合,因 MongoDB 的原子性只是针对单文档的,故会出现 ID 增加而插入失败的情况。

好在 MongoDB 在 4.0 中,支持了副本集上的多文档事务,在版本 4.2 中,引入了分布式事务,这增加了对分片群集上的多文档事务的支持,并合并了对副本集上多文档事务的现有支持。

//开启一个会话
sess = db.getMongo().startSession()
//开启事务
sess.startTransaction()
//回滚
sess.abortTransaction()
//提交
sess.commitTransaction()
//结束会话
sess.endSession()

重构后的 Mongo 的逻辑

参考 user 模块 mysql 重构为 mongo 代码:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	db, err := relation.NewGormDB()
	if err != nil {
		return err
	}
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
		return err
	}
	users := make([]*tablerelation.UserModel, 0)
	if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
		return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
	}
	for k, v := range config.Config.Manager.UserID {
		users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
	}
	userDB := relation.NewUserGorm(db)
	cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
	userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
	database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB)
	friendRpcClient := rpcclient.NewFriendRpcClient(client)
	groupRpcClient := rpcclient.NewGroupRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
	u := &userServer{
		UserDatabase:             database,
		RegisterCenter:           client,
		friendRpcClient:          &friendRpcClient,
		groupRpcClient:           &groupRpcClient,
		friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
		userNotificationSender:   notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
	}
	pbuser.RegisterUserServer(server, u)
	return u.UserDatabase.InitOnce(context.Background(), users)
}

下面是 user重构后的代码:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	db, err := relation.NewGormDB()
	if err != nil {
		return err
	}
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
		return err
	}
	users := make([]*tablerelation.UserModel, 0)
	if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
		return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
	}
	for k, v := range config.Config.Manager.UserID {
		users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
	}
	userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
	cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
	userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
	database := controller.NewUserDatabase(userDB, cache, tx.NewMongo(mongo.GetClient()), userMongoDB)
	friendRpcClient := rpcclient.NewFriendRpcClient(client)
	groupRpcClient := rpcclient.NewGroupRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
	u := &userServer{
		UserDatabase:             database,
		RegisterCenter:           client,
		friendRpcClient:          &friendRpcClient,
		groupRpcClient:           &groupRpcClient,
		friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
		userNotificationSender:   notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
	}
	pbuser.RegisterUserServer(server, u)
	return u.UserDatabase.InitOnce(context.Background(), users)
}

帮我模仿重构 friend 代码:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	db, err := relation.NewGormDB()
	if err != nil {
		return err
	}
	if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
		return err
	}
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	blackDB := relation.NewBlackGorm(db)
	friendDB := relation.NewFriendGorm(db)
	userRpcClient := rpcclient.NewUserRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
	notificationSender := notification.NewFriendNotificationSender(
		&msgRpcClient,
		notification.WithRpcFunc(userRpcClient.GetUsersInfo),
	)
	pbfriend.RegisterFriendServer(server, &friendServer{
		friendDatabase: controller.NewFriendDatabase(
			friendDB,
			relation.NewFriendRequestGorm(db),
			cache.NewFriendCacheRedis(rdb, friendDB, cache.GetDefaultOpt()),
			tx.NewGorm(db),
		),
		blackDatabase: controller.NewBlackDatabase(
			blackDB,
			cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt()),
		),
		userRpcClient:         &userRpcClient,
		notificationSender:    notificationSender,
		RegisterCenter:        client,
		conversationRpcClient: rpcclient.NewConversationRpcClient(client),
	})
	return nil
}

给出改进后的代码

速读一遍

func main() {
	rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer)
	rpcCmd.AddPortFlag()
	rpcCmd.AddPrometheusPortFlag()
	if err := rpcCmd.Exec(); err != nil {
		panic(err.Error())
	}
	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil {
		panic(err.Error())
	}
}
  1. 创建RPC命令对象:

    rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer)
    

    这行代码调用了cmd包中的NewRpcCmd函数,创建了一个RPC命令对象。传递给这个函数的参数cmd.RpcUserServer可能是一个配置或者常量,用于指定RPC服务的类型或配置。

  2. 添加端口标志:

    rpcCmd.AddPortFlag()
    

    这行代码对rpcCmd对象调用AddPortFlag方法。这个方法可能用于添加或配置与RPC服务监听端口相关的命令行参数。

  3. 添加普罗米修斯端口标志:

    rpcCmd.AddPrometheusPortFlag()
    

    类似于上面的端口标志添加,这行代码可能是为了配置Prometheus(一个开源监控系统)端口的相关参数。

  4. 执行RPC命令:

    if err := rpcCmd.Exec(); err != nil {
        panic(err.Error())
    }
    

    在这里,Exec方法被调用以执行RPC命令。如果执行中出现错误,程序会触发panic,打印错误信息并终止。

  5. 启动RPC服务:

    if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil {
        panic(err.Error())
    }
    

    最后,StartSvr方法被调用来启动RPC服务。这个方法接受两个参数:一是配置中的RPC注册名(可能是服务名称),二是启动服务时调用的函数(在这个例子中是user.Start)。

继续来看启动函数:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	users := make([]*tablerelation.UserModel, 0)
	if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
		return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
	}
	for k, v := range config.Config.Manager.UserID {
		users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
	}
	userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
	tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
	if err != nil {
		return err
	}
	cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
	userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
	database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
	friendRpcClient := rpcclient.NewFriendRpcClient(client)
	groupRpcClient := rpcclient.NewGroupRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)
	u := &userServer{
		UserDatabase:             database,
		RegisterCenter:           client,
		friendRpcClient:          &friendRpcClient,
		groupRpcClient:           &groupRpcClient,
		friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
		userNotificationSender:   notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
	}
	pbuser.RegisterUserServer(server, u)
	return u.UserDatabase.InitOnce(context.Background(), users)
}
  1. 初始化缓存和数据库连接:

    coderdb, err := cache.NewRedis()
    mongo, err := unrelation.NewMongo()
    

    这两行代码分别初始化Redis缓存和Mongo数据库的连接。如果其中任何一个初始化过程出现错误,函数将返回该错误。

  2. 配置管理员用户:

    users := make([]*tablerelation.UserModel, 0)
    ...
    for k, v := range config.Config.Manager.UserID {
        ...
    }
    

    这段代码创建了一个用户模型的切片,并基于配置中的管理员用户ID和昵称填充这个切片。它还检查ID和昵称数量是否匹配,不匹配则返回错误。

  3. 创建数据库和事务实例:

    userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
    tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
    

    这里初始化了用户数据库的Mongo实例和一个自动事务处理器。

  4. 创建缓存和数据库访问层:

    cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
    userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
    database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
    

    这部分代码设置了用户缓存,并创建了一个数据库访问层,它结合了Redis缓存、Mongo数据库和事务处理器。

  5. 初始化RPC客户端:

    friendRpcClient := rpcclient.NewFriendRpcClient(client)
    groupRpcClient := rpcclient.NewGroupRpcClient(client)
    msgRpcClient := rpcclient.NewMessageRpcClient(client)
    

    这里创建了几个RPC客户端,用于与其他服务(如朋友服务、群组服务和消息服务)进行通信。

  6. 配置用户服务:

    u := &userServer{
        ...
    }
    

    在这一步,创建了一个userServer实例,它封装了上面创建的所有组件和服务。

  7. 注册RPC服务和初始化数据库:

    pbuser.RegisterUserServer(server, u)
    return u.UserDatabase.InitOnce(context.Background(), users)
    

    最后,这个userServer实例被注册为一个RPC服务,并且调用InitOnce方法来初始化用户数据库。

Third

func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}
	logdb, err := mgo.NewLogMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
	s3db, err := mgo.NewS3Mongo(mongo.GetDatabase())
	if err != nil {
		return err
	}
	apiURL := config.Config.Object.ApiURL
	if apiURL == "" {
		return fmt.Errorf("api url is empty")
	}
	if _, err := url.Parse(config.Config.Object.ApiURL); err != nil {
		return err
	}
	if apiURL[len(apiURL)-1] != '/' {
		apiURL += "/"
	}
	apiURL += "object/"
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}
	// 根据配置文件策略选择 oss 方式
	enable := config.Config.Object.Enable
	var o s3.Interface
	switch config.Config.Object.Enable {
	case "minio":
		o, err = minio.NewMinio(cache.NewMinioCache(rdb))
	case "cos":
		o, err = cos.NewCos()
	case "oss":
		o, err = oss.NewOSS()
	default:
		err = fmt.Errorf("invalid object enable: %s", enable)
	}
	if err != nil {
		return err
	}
	third.RegisterThirdServer(server, &thirdServer{
		apiURL:        apiURL,
		thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb), logdb),
		userRpcClient: rpcclient.NewUserRpcClient(client),
		s3dataBase:    controller.NewS3Database(rdb, o, s3db),
		defaultExpire: time.Hour * 24 * 7,
	})
	return nil
}
  1. 初始化Mongo数据库连接:

    mongo, err := unrelation.NewMongo()
    

    这一行初始化Mongo数据库的连接。如果出现错误,则函数返回该错误。

  2. 创建日志和S3数据库实例:

    logdb, err := mgo.NewLogMongo(mongo.GetDatabase())
    s3db, err := mgo.NewS3Mongo(mongo.GetDatabase())
    

    这两行代码分别初始化日志数据库和S3数据库的实例。

  3. 配置和验证API URL:

    apiURL := config.Config.Object.ApiURL
    ...
    if _, err := url.Parse(config.Config.Object.ApiURL); err != nil {
        return err
    }
    ...
    apiURL += "object/"
    

    这部分代码获取API URL的配置,检查其有效性,并对其进行必要的格式调整。

  4. 初始化Redis缓存:

    rdb, err := cache.NewRedis()
    

    这行代码初始化Redis缓存。如果初始化失败,函数将返回错误。

  5. 配置对象存储服务(OSS):

    var o s3.Interface
    switch config.Config.Object.Enable {
        case "minio":
            ...
        case "cos":
            ...
        case "oss":
            ...
        default:
            ...
    }
    

    根据配置,这段代码选择并初始化相应的对象存储服务,例如Minio、COS或OSS。

  6. 注册RPC服务和初始化数据库:

    third.RegisterThirdServer(server, &thirdServer{
        ...
    })
    

    最后,这个thirdServer实例被注册为一个RPC服务。这个实例集成了API URL、数据库、缓存以及用户RPC客户端。

Friend:

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
	// Initialize MongoDB
	mongo, err := unrelation.NewMongo()
	if err != nil {
		return err
	}

	// Initialize Redis
	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}

	friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}

	friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}

	blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase())
	if err != nil {
		return err
	}

	// Initialize RPC clients
	userRpcClient := rpcclient.NewUserRpcClient(client)
	msgRpcClient := rpcclient.NewMessageRpcClient(client)

	// Initialize notification sender
	notificationSender := notification.NewFriendNotificationSender(
		&msgRpcClient,
		notification.WithRpcFunc(userRpcClient.GetUsersInfo),
	)
	tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
	if err != nil {
		return err
	}
	// Register Friend server with refactored MongoDB and Redis integrations
	pbfriend.RegisterFriendServer(server, &friendServer{
		friendDatabase: controller.NewFriendDatabase(
			friendMongoDB,
			friendRequestMongoDB,
			cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()),
			tx,
		),
		blackDatabase: controller.NewBlackDatabase(
			blackMongoDB,
			cache.NewBlackCacheRedis(rdb, blackMongoDB, cache.GetDefaultOpt()),
		),
		userRpcClient:         &userRpcClient,
		notificationSender:    notificationSender,
		RegisterCenter:        client,
		conversationRpcClient: rpcclient.NewConversationRpcClient(client),
	})

	return nil
}
  1. 初始化MongoDB和Redis:
    • 这部分代码初始化MongoDB和Redis连接。MongoDB用于存储数据,而Redis可能用于缓存以提高性能。
  2. 创建不同类型的MongoDB实例:
    • friendMongoDB, friendRequestMongoDB, blackMongoDB分别初始化了针对朋友关系、朋友请求和黑名单的MongoDB实例。
  3. 初始化RPC客户端:
    • userRpcClientmsgRpcClient 分别初始化了用户和消息的RPC客户端,这表明Friend服务需要与用户服务和消息服务进行通信。
  4. 初始化通知发送器:
    • notificationSender 初始化了一个朋友通知发送器,它可能用于在朋友关系的各种操作(如添加朋友、朋友请求等)时发送通知。
  5. 初始化事务处理器:
    • tx 表示一个自动事务处理器,它可能用于处理涉及多个数据库操作的复杂事务。
  6. 注册Friend服务:
    • pbfriend.RegisterFriendServer 这行代码将friendServer实例注册为一个gRPC服务。这个服务包括朋友数据库、黑名单数据库、RPC客户端以及通知发送器。
  7. 服务功能:
    • 根据这段代码,Friend服务可能负责管理用户间的朋友关系,处理朋友请求,维护黑名单,以及处理与这些功能相关的通知和通信。

总体来看,Friend服务似乎是一个专门处理用户社交关系(如朋友关系、朋友请求和黑名单)的后端服务。它通过RPC客户端与其他服务(如用户服务和消息服务)交互,并利用MongoDB和Redis进行数据存储和缓存管理。这样的服务对于社交网络、通讯应用或任何涉及用户间交互的平台来说都是非常重要的。

继续解读:

// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry.
func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry) UserRpcClient {
	return UserRpcClient(*NewUser(client))
}

// UserRpcClient represents the structure for a User RPC client.
type UserRpcClient User

// NewUser initializes and returns a User instance based on the provided service discovery registry.
func NewUser(discov discoveryregistry.SvcDiscoveryRegistry) *User {
	conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName)
	if err != nil {
		panic(err)
	}
	client := user.NewUserClient(conn)
	return &User{Discov: discov, Client: client, conn: conn}
}

type User struct {
	conn   grpc.ClientConnInterface
	Client user.UserClient
	Discov discoveryregistry.SvcDiscoveryRegistry
}
  1. NewUserRpcClient函数:
    • 这个函数接受一个服务发现注册表对象(discoveryregistry.SvcDiscoveryRegistry)。
    • 它调用NewUser函数来创建一个User实例,并将其转换为UserRpcClient类型。
    • 这种设计模式表明,UserRpcClientUser的包装或别名,可能用于提供更具体的或附加的功能。
  2. UserRpcClient类型:
    • UserRpcClient被定义为User类型的别名。
    • 这意味着它继承了User类型的所有字段和方法,但也可以定义额外的方法或覆盖现有的方法。
  3. NewUser函数:
    • 这个函数同样接受服务发现注册表对象。
    • 它使用这个注册表来获取与用户服务的连接(通过discov.GetConn方法)。
    • 如果获取连接过程中出现错误,函数会触发panic,这表明无法连接至用户服务是一个严重的错误情况。
    • 一旦连接被建立,它使用这个连接来创建一个gRPC客户端(user.NewUserClient)。
  4. User结构体:
    • User结构体包含三个字段:conn(gRPC连接),Client(用户服务的gRPC客户端),和Discov(服务发现注册表)。
    • 这个结构体封装了与用户服务通信所需的所有元素。

s3 模块学习

func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) {
	defer log.ZDebug(ctx, "return")
	upload, err := parseMultipartUploadID(uploadID)
	if err != nil {
		return nil, err
	}
	if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash {
		return nil, errors.New("md5 mismatching")
	}
	if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
		return &UploadResult{
			Key:  info.Key,
			Size: info.Size,
			Hash: info.ETag,
		}, nil
	} else if !c.IsNotFound(err) {
		return nil, err
	}
	cleanObject := make(map[string]struct{})
	defer func() {
		for key := range cleanObject {
			_ = c.impl.DeleteObject(ctx, key)
		}
	}()
	var targetKey string
	switch upload.Type {
	case UploadTypeMultipart:
		parts := make([]s3.Part, len(partHashs))
		for i, part := range partHashs {
			parts[i] = s3.Part{
				PartNumber: i + 1,
				ETag:       part,
			}
		}
		// todo: 验证大小
		result, err := c.impl.CompleteMultipartUpload(ctx, upload.ID, upload.Key, parts)
		if err != nil {
			return nil, err
		}
		targetKey = result.Key
	case UploadTypePresigned:
		uploadInfo, err := c.StatObject(ctx, upload.Key)
		if err != nil {
			return nil, err
		}
		cleanObject[uploadInfo.Key] = struct{}{}
		if uploadInfo.Size != upload.Size {
			return nil, errors.New("upload size mismatching")
		}
		md5Sum := md5.Sum([]byte(strings.Join([]string{uploadInfo.ETag}, partSeparator)))
		if md5val := hex.EncodeToString(md5Sum[:]); md5val != upload.Hash {
			return nil, errs.ErrArgs.Wrap(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash))
		}
		// 防止在这个时候,并发操作,导致文件被覆盖
		copyInfo, err := c.impl.CopyObject(ctx, uploadInfo.Key, upload.Key+"."+c.UUID())
		if err != nil {
			return nil, err
		}
		cleanObject[copyInfo.Key] = struct{}{}
		if copyInfo.ETag != uploadInfo.ETag {
			return nil, errors.New("[concurrency]copy md5 mismatching")
		}
		hashCopyInfo, err := c.impl.CopyObject(ctx, copyInfo.Key, c.HashPath(upload.Hash))
		if err != nil {
			return nil, err
		}
		log.ZInfo(ctx, "hashCopyInfo", "value", fmt.Sprintf("%+v", hashCopyInfo))
		targetKey = hashCopyInfo.Key
	default:
		return nil, errors.New("invalid upload id type")
	}
	if err := c.cache.DelS3Key(c.impl.Engine(), targetKey).ExecDel(ctx); err != nil {
		return nil, err
	}
	return &UploadResult{
		Key:  targetKey,
		Size: upload.Size,
		Hash: upload.Hash,
	}, nil
}

第一部分:路由定义

objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload)
  • 这一行定义了一个HTTP POST路由,当客户端向/complete_multipart_upload发送POST请求时,会调用CompleteMultipartUpload方法。

第二部分:处理请求

func (o *ThirdApi) CompleteMultipartUpload(c *gin.Context) {
    a2r.Call(third.ThirdClient.CompleteMultipartUpload, o.Client, c)
}
  • 这个函数是一个路由处理器,它使用gin.Context来处理HTTP请求。它调用a2r.Call方法,将请求转发给实际处理函数。

第三部分:业务逻辑

func (t *thirdServer) CompleteMultipartUpload(ctx context.Context, req *third.CompleteMultipartUploadReq) (*third.CompleteMultipartUploadResp, error) {
    // ...
}
  • 这是核心的业务逻辑函数。它首先进行一些检查,然后调用s3DatabaseCompleteMultipartUpload方法来完成上传。接着,它创建一个对象模型并保存到数据库。最后返回一个响应。

第四部分:完成上传

func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) {
    return s.s3.CompleteUpload(ctx, uploadID, parts)
}
  • 这个函数实际上只是一个简单的封装,调用另一个CompleteUpload函数来完成上传。

第五部分:完成上传的实现

func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) {
    // ...
}
  • 这是一个更复杂的函数,包含实际完成上传的逻辑。它处理多部分上传的不同场景(如多部分上传、预签名上传等),验证MD5值,处理文件复制和移动,最后返回上传结果。

关键点

  1. 错误处理:在每个函数中,错误情况都被仔细处理,并返回相应的错误信息。
  2. MD5验证:代码在多个地方验证上传部分的MD5哈希值,确保数据的一致性和完整性。
  3. 并发处理:在处理上传的最后阶段,代码考虑了并发操作的情况,确保数据的一致性。

整体而言,这是一段处理文件上传的后端服务代码,特别关注于多部分上传的完成和验证。代码结构清晰,有适当的错误处理和日志记录,符合现代后端开发的标准。

复制对象存储

继续开始看看复制对象存储是如何做的:

func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
	result, err := o.bucket.CopyObject(src, dst)
	if err != nil {
		return nil, err
	}
	return &s3.CopyObjectInfo{
		Key:  dst,
		ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)),
	}, nil
}

函数定义

func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
  • o *OSS: 这个函数是OSS结构体的一个方法,其中OSS可能是对某个对象存储服务的封装。
  • ctx context.Context: 传入的上下文,用于控制函数的执行和取消。
  • src string: 源对象的路径或标识符。
  • dst string: 目标对象的路径或标识符。
  • 返回值:返回一个指向s3.CopyObjectInfo的指针和一个错误。如果复制成功,错误将是nil,否则会包含错误信息。

函数体

result, err := o.bucket.CopyObject(src, dst)
  • 这一行调用o.bucket.CopyObject方法来实际执行复制操作,其中src是源对象,dst是目标对象。
  • 如果复制过程中发生错误,err将被赋值,并且该函数将返回错误。
return &s3.CopyObjectInfo{
	Key:  dst,
	ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)),
}, nil
  • 如果复制成功,函数返回一个新的s3.CopyObjectInfo实例。这个实例包含目标对象的键(Key)和经过处理的ETag。
  • ETag是一种标记,通常用于验证对象的完整性。这里的ETag被转换为小写,并且去除了所有的双引号。

Minio

func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
	if err := m.initMinio(ctx); err != nil {
		return nil, err
	}
	result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{
		Bucket: m.bucket,
		Object: dst,
	}, minio.CopySrcOptions{
		Bucket: m.bucket,
		Object: src,
	})
	if err != nil {
		return nil, err
	}
	return &s3.CopyObjectInfo{
		Key:  dst,
		ETag: strings.ToLower(result.ETag),
	}, nil
}

函数定义

func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
  • m *Minio: 这是Minio结构体的一个方法。Minio结构体可能代表了对Minio服务的封装。
  • ctx context.Context: 传入的上下文,用于控制函数的执行和取消。
  • src string: 源对象的路径或标识符。
  • dst string: 目标对象的路径或标识符。
  • 返回值:返回一个指向s3.CopyObjectInfo的指针和一个错误。如果复制成功,错误将是nil,否则会包含错误信息。

函数体

if err := m.initMinio(ctx); err != nil {
	return nil, err
}
  • 这一行调用m.initMinio方法来初始化Minio客户端。如果初始化过程中出现错误,将返回错误。
result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{
	Bucket: m.bucket,
	Object: dst,
}, minio.CopySrcOptions{
	Bucket: m.bucket,
	Object: src,
})
  • 这里实际执行复制操作,使用CopyObject方法。方法接受目标和源的选项,包括存储桶名称(Bucket)和对象名称(Object)。
  • 如果复制过程中发生错误,err将被赋值,并且该函数将返回错误。
return &s3.CopyObjectInfo{
	Key:  dst,
	ETag: strings.ToLower(result.ETag),
}, nil
  • 如果复制成功,函数返回一个新的s3.CopyObjectInfo实例。这个实例包含目标对象的键(Key)和ETag(用于验证对象的完整性)。
  • ETag被转换为小写。