OpenIM Mongo 替换 Mysql
目标
完成 openim-rpc-friend
的 Mysql -> Mongo 的替换。
有些接口实现了,没有调用就可以删了
阅读
type conversationServer struct {
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender
}
groupRpcClient
: 这个字段是指向rpcclient.GroupRpcClient
类型的指针。在Go中,指针是用于存储另一个变量的内存地址的变量。rpcclient.GroupRpcClient
很可能是一个用于RPC(远程过程调用)通信的客户端类型。它可能被用来与远程服务或服务器组进行交互。conversationDatabase
: 这个字段的类型是controller.ConversationDatabase
。这表明conversationDatabase
是用来处理与会话数据库相关的操作的。这可能包括查询、更新、删除会话数据等。conversationNotificationSender
: 这是指向notification.ConversationNotificationSender
类型的指针。这个字段很可能被用来发送与会话相关的通知。例如,在会话中添加新消息时,可能需要通知相关用户。notification
: 这个字段并不直接出现在结构体定义中,但从conversationNotificationSender
字段可以推测,notification
可能是一个包含了与通知发送相关功能的包(package)。
start 初始化和启动与会话(如聊天应用中的会话)相关的服务。
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
// db, err := relation.NewGormDB(): 初始化一个GORM数据库实例。GORM是一个流行的Go ORM库,用于与SQL数据库交互。
db, err := relation.NewGormDB()
if err != nil {
return err
}
// 使用GORM的自动迁移功能来创建或更新会话模型的数据库表
if err := db.AutoMigrate(&tablerelation.ConversationModel{}); err != nil {
return err
}
// 初始化一个Redis缓存实例。Redis通常用于快速缓存数据。
rdb, err := cache.NewRedis()
if err != nil {
return err
}
// 初始化一个MongoDB数据库实例。
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
// 使用MongoDB创建一个专门处理会话数据的数据库实例。
conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
if err != nil {
return err
}
// 初始化一个用于群组操作的RPC客户端。
groupRpcClient := rpcclient.NewGroupRpcClient(client)
// 初始化一个用于消息操作的RPC客户端。
msgRpcClient := rpcclient.NewMessageRpcClient(client)
// 在gRPC服务器上注册一个新的会话服务,传入之前初始化的各种组件。
pbconversation.RegisterConversationServer(server, &conversationServer{
conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
})
return nil
}
详细介绍 GORM 的 AutoMigrate
GORM 的 AutoMigrate
方法是一个在Go语言中使用GORM库时非常有用的功能,主要用于数据库迁移。以下是对这个方法的详细解析:
功能
- 自动迁移:
AutoMigrate
会自动根据你的Go结构体(通常用于表示数据库中的表)来创建或修改数据库表。这意味着它可以根据结构体的定义来创建新的表,或者根据结构体的修改来调整现有的表(如添加、删除或修改字段)。
工作原理
与结构体同步:
AutoMigrate
接收一个或多个结构体作为参数,并根据这些结构体的定义在数据库中创建或更新表。例如,如果你有一个如下所示的结构体:type User struct { gorm.Model Name string Age int }
AutoMigrate(&User{})
会在数据库中创建一个名为users
的表(GORM默认使用结构体名称的复数形式作为表名),其中包含name
和age
字段,以及GORM模型中的默认字段(如ID
,CreatedAt
,UpdatedAt
,DeletedAt
)。安全性:
AutoMigrate
通常只会添加缺失的字段、创建缺失的索引或外键,并不会删除或修改现有字段。这意味着它是一个相对安全的操作,不会导致数据丢失。
使用场景
- 应用程序启动时:通常在应用程序启动时调用
AutoMigrate
,以确保数据库结构与应用程序的最新结构体定义保持同步。 - 开发过程中:在开发过程中,当你修改了结构体的定义后,可以使用
AutoMigrate
来快速更新数据库结构,而不需要手动进行数据库迁移。
注意事项
- 不是数据库版本控制工具:虽然
AutoMigrate
很方便,但它不应该被视为一个完整的数据库版本控制解决方案。对于更复杂的数据库迁移和版本控制,可能需要使用专门的迁移工具,如golang-migrate/migrate
。 - 谨慎使用:在生产环境中使用
AutoMigrate
时应格外小心,因为错误的使用可能会导致数据丢失或表结构的意外变化。
初始化 mongo
// NewMongo Initialize MongoDB connection.
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" {
uri = config.Config.Mongo.Uri
} else {
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
fmt.Println("mongo:", uri)
var mongoClient *mongo.Client
var err error = nil
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
return &Mongo{db: mongoClient}, nil
}
if cmdErr, ok := err.(mongo.CommandError); ok {
if cmdErr.Code == 13 || cmdErr.Code == 18 {
return nil, err
} else {
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
}
}
return nil, err
}
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
: 这行代码将MongoDB的ErrNoDocuments
错误替换为自定义错误ErrRecordNotFound
。这通常用于统一错误处理。在 URL 构建中
- 函数首先定义了一个默认的MongoDB URI。
- 如果
config.Config.Mongo.Uri
非空,则使用这个作为URI。 - 如果为空,则根据配置 (
config.Config.Mongo.Address
等) 构建URI。如果提供了用户名和密码,则在URI中包含这些凭据;否则,不包含。
数据库尝试链接
- 使用
for
循环和重试逻辑(由maxRetry
控制)来尝试连接数据库。 context.WithTimeout(context.Background(), time.Second*10)
: 创建一个最多10秒的超时上下文,确保连接尝试不会无限期等待。mongo.Connect(ctx, options.Client().ApplyURI(uri))
: 尝试使用上面构建的URI连接MongoDB。- 如果连接成功,返回一个封装了
mongoClient
的Mongo
实例。
- 使用
解析下面的PopulateGroupMember
:
func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*relation.GroupMemberModel) error {
if len(members) == 0 {
return nil
}
emptyUserIDs := make(map[string]struct{})
for _, member := range members {
if member.Nickname == "" || member.FaceURL == "" {
emptyUserIDs[member.UserID] = struct{}{}
}
}
if len(emptyUserIDs) > 0 {
users, err := g.getUsersInfo(ctx, utils.Keys(emptyUserIDs))
if err != nil {
return err
}
userMap := make(map[string]CommonUser)
for i, user := range users {
userMap[user.GetUserID()] = users[i]
}
for i, member := range members {
user, ok := userMap[member.UserID]
if !ok {
continue
}
if member.Nickname == "" {
members[i].Nickname = user.GetNickname()
}
if member.FaceURL == "" {
members[i].FaceURL = user.GetFaceURL()
}
}
}
return nil
}
其作用是在一个群组通知发送器(GroupNotificationSender
)中填充群组成员的信息。代码的主要逻辑如下:
- 方法定义:
PopulateGroupMember
是GroupNotificationSender
类型的一个方法,它接受一个context.Context
和多个*relation.GroupMemberModel
类型的参数。 - 检查成员列表是否为空:首先检查传入的群组成员(
members
)是否为空。如果没有成员,则直接返回,不执行任何操作。 - 查找空信息的用户:使用一个循环遍历所有成员。如果成员的昵称(
Nickname
)或头像URL(FaceURL
)为空,则将其用户ID(UserID
)添加到一个名为emptyUserIDs
的map中。这个map用于存储需要更新信息的用户ID。 - 获取用户信息:如果
emptyUserIDs
中有数据,即存在需要更新信息的用户,就调用g.getUsersInfo
方法,获取这些用户的详细信息。这个方法需要上下文(ctx
)和用户ID列表。 - 错误处理:如果在获取用户信息过程中出现错误,该方法会返回相应的错误。
- 更新成员信息:使用获取到的用户信息更新群组成员的信息。首先,创建一个新的map(
userMap
),用于将用户ID映射到用户信息。然后,遍历每个成员,如果在userMap
中找到对应的用户信息,并且该成员的昵称或头像URL为空,则用获取到的用户信息进行更新。 - 返回:最后,如果整个过程顺利,该方法返回
nil
,表示没有错误发生。
主要的变化部分
所有的 interface{}
都变成了 any
conversationDB := relation.NewConversationGorm(db)
替换为:
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
if err != nil {
return err
}
User 中:
以前的逻辑:
userDB := relation.NewUserGorm(db)
更新为:
userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
return err
}
以前的逻辑:
user := convert.UserPb2DB(req.UserInfo)
if err != nil {
return nil, err
}
err = s.Update(ctx, user)
if err != nil {
return nil, err
}
更新为:
data := convert.UserPb2DBMap(req.UserInfo)
if len(data) == 0 {
return nil, errs.ErrArgs.Wrap("no data to update")
}
if err := s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
其中 UserPb2DB
函数:
func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel {
var userDB relationtb.UserModel
userDB.UserID = user.UserID
userDB.Nickname = user.Nickname
userDB.FaceURL = user.FaceURL
userDB.Ex = user.Ex
userDB.CreateTime = time.UnixMilli(user.CreateTime)
userDB.AppMangerLevel = user.AppMangerLevel
userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt
return &userDB
}
其中 UserPb2DBMap
函数的逻辑:
func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
if user == nil {
return nil
}
val := make(map[string]any)
if user.Nickname != "" {
val["nickname"] = user.Nickname
}
if user.FaceURL != "" {
val["face_url"] = user.FaceURL
}
if user.Ex != "" {
val["ex"] = user.FaceURL
}
if user.AppMangerLevel != 0 {
val["app_manger_level"] = user.AppMangerLevel
}
if user.GlobalRecvMsgOpt != 0 {
val["global_recv_msg_opt"] = user.GlobalRecvMsgOpt
}
return val
}
最初,使用 relation.NewUserGorm(db)
建立到数据库的基于VMM的连接。
convert.UserPb2DB
函数将用户信息从协议缓冲区(protobuf)格式转换为与IBM兼容的数据库模型。
更新后的用户数据随后被传递给服务的 Update
方法。
代码现在使用 newmgo.NewUserMongo(mongo.GetDatabase())
建立到MongoDB的连接。这表明从关系数据库方法(RIMM)到NoSQL方法(MongoDB)的转变。
更新效率:使用 UserPb2DBMap
的新逻辑允许部分更新,因为只有更改的字段才包含在映射中。这比关系模型更有效,因为关系模型可能需要更新整行。
以前 Mysql 的逻辑
我正在阅读和学习源码:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
return err
}
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
}
userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
return err
}
tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
if err != nil {
return err
}
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
}
pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users)
}
帮我讲解这段源码的实现,步骤,以及设计实现,以及可以改进的思路
微服务或服务端应用程序的启动函数。它初始化各种服务和数据库连接,然后将服务注册到一个gRPC服务器。
代码分析
- 初始化Redis缓存:
cache.NewRedis()
创建一个新的Redis缓存实例。 - 初始化MongoDB连接:
unrelation.NewMongo()
用于创建和配置MongoDB连接。 - 检查用户ID和昵称配置: 代码确保配置中的用户ID数量与昵称数量相等,如果不相等,则返回错误。
- 构造用户模型: 遍历配置中的用户ID和昵称,创建用户模型数组。
- 创建用户数据库实例:
newmgo.NewUserMongo(mongo.GetDatabase())
用于初始化用户数据库操作的MongoDB实现。 - 创建事务处理器:
tx2.NewAuto(context.Background(), mongo.GetClient())
用于MongoDB的事务处理。 - 创建缓存实例: 结合Redis缓存和用户数据库,创建用户缓存。
- 创建用户MongoDB驱动:
unrelation.NewUserMongoDriver(mongo.GetDatabase())
用于操作用户数据。 - 初始化控制器:
controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
创建一个控制器,集成了用户数据库、缓存、事务处理器和MongoDB驱动。 - 初始化RPC客户端: 创建用于与好友、群组和消息服务通信的RPC客户端。
- 设置用户服务器: 创建一个用户服务器实例,并注册到gRPC服务器。
- 初始化用户数据库: 使用初始用户模型数据初始化数据库。
设计实现
- 模块化设计: 每个功能(如数据库访问、缓存、RPC通信)都被分解为独立的模块,有助于代码的维护和扩展。
- 错误处理: 在每个重要步骤之后都有错误检查,确保在出现问题时能够及时返回错误。
- 配置验证: 检查配置的有效性,例如用户ID和昵称的数量匹配。
改进思路
- 配置和错误处理: 可以使用更先进的配置管理和错误处理库来简化代码和增强可读性。
- 依赖注入: 使用依赖注入框架可以更好地管理对象的创建和依赖关系,减少代码耦合。
- 日志记录: 在关键步骤中添加更详细的日志记录,有助于调试和跟踪问题。
- 代码重构: 某些代码块(例如RPC客户端初始化)可以进一步抽象和封装,以减少重复代码。
- 异步处理: 考虑在适当的地方引入异步处理,以提高性能和响应速度。
- 单元测试: 加强单元测试,确保代码的可靠性和未来的可维护性。
- 配置外部化: 将配置文件外部化,而不是硬编码在代码中,以增强灵活性和可配置性。
创建数据库事务
func NewAuto(ctx context.Context, cli *mongo.Client) (tx.CtxTx, error) {
var res map[string]any
if err := cli.Database("admin").RunCommand(ctx, bson.M{"isMaster": 1}).Decode(&res); err != nil {
return nil, err
}
if _, ok := res["setName"]; ok {
return NewMongoTx(cli), nil
}
return NewInvalidTx(), nil
}
具体针对MongoDB。让我们逐步分析这个函数:
NewAuto
分析
函数 - 函数签名:
NewAuto(ctx context.Context, cli *mongo.Client) (tx.CtxTx, error)
: 这个函数接收一个上下文(context.Context
)和一个MongoDB客户端(*mongo.Client
),返回一个事务处理器接口(tx.CtxTx
)和一个错误(error
)。
- 运行MongoDB命令:
cli.Database("admin").RunCommand(ctx, bson.M{"isMaster": 1}).Decode(&res)
: 这行代码向MongoDB的admin数据库发送一个命令(isMaster: 1
),来检查当前连接的MongoDB节点是否是主节点(master)。这个命令的响应被解码到变量res
中。
- 检查响应:
if _, ok := res["setName"]; ok { ... }
: 检查响应中是否包含键setName
。setName
存在表明当前MongoDB实例是一个复制集的一部分。
- 返回适当的事务处理器:
- 如果是复制集,使用
NewMongoTx(cli)
返回一个适合MongoDB复制集的事务处理器。 - 如果不是复制集,使用
NewInvalidTx()
返回一个无效的事务处理器。
- 如果是复制集,使用
设计意图
- 动态检测: 通过检查MongoDB的配置(是否是复制集),动态地确定使用哪种类型的事务处理器。
- 灵活性和兼容性: 代码可以处理不同的MongoDB部署配置(单节点或复制集),提高了通用性。
可能的改进
- 更详细的错误处理: 当
RunCommand
失败时,除了返回错误外,也可以记录更多关于错误的上下文信息,以便更好地理解错误发生的原因。 - 配置检查优化: 如果这个函数经常被调用,频繁地运行
isMaster
命令可能会影响性能。可以考虑缓存这个信息或者在初始化时确定这个设置。 - 增加日志记录: 在关键步骤(如命令执行、判断逻辑)添加日志记录,有助于调试和跟踪问题。
- 异常情况处理: 在无法确定事务处理器类型时,除了返回一个无效的事务处理器外,还可以考虑提供一种默认的处理策略或者更明确的错误提示。
启动 RPC
fnc (a *RpcCmd) StartSvr(name string, rpcFn func(discov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,) error {
if a.GetPortFlag() == 0 {
return errors.New("port is required")
}
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn)
}
StartSvr
分析
函数 - 函数签名:
func (a *RpcCmd) StartSvr(name string, rpcFn func(discoveryregistry.SvcDiscoveryRegistry, *grpc.Server) error) error
: 这个函数是RpcCmd
结构体的一个方法。它接收一个服务名称name
和一个函数rpcFn
,这个函数自身接收一个服务发现注册表和一个gRPC服务器,并返回一个错误。StartSvr
函数也返回一个错误。
- 检查端口:
if a.GetPortFlag() == 0 { return errors.New("port is required") }
: 这行代码检查是否已经指定了端口号(通过GetPortFlag()
方法)。如果没有指定端口号(即端口为0),则返回一个错误,表明端口是必需的。
- 启动RPC服务:
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn)
: 如果端口号存在,该行代码使用提供的端口号、服务名称、Prometheus端口(用于监控)和RPC函数来启动RPC服务。
设计意图
- 参数验证: 通过检查端口号,确保在启动服务前所有必需的配置都已设置。
- 模块化: 将RPC服务的启动逻辑封装在一个单独的函数中,有助于代码的重用和维护。
- 灵活性: 通过将RPC启动逻辑作为一个函数参数传递,这个方法可以启动不同类型的RPC服务。
Mongo 中的事务
在一次业务实践中需要在 MongoDB 中使用自增 ID,而 MongoDB 本身并不支持自增 ID。我们需要通过一个单独的集合保存 ID,使用 FindOneAndUpdate
和 $inc
操作符实现 ID 的自增.
然而此时需要操作两个集合,因 MongoDB 的原子性只是针对单文档的,故会出现 ID 增加而插入失败的情况。
好在 MongoDB 在 4.0 中,支持了副本集上的多文档事务,在版本 4.2 中,引入了分布式事务,这增加了对分片群集上的多文档事务的支持,并合并了对副本集上多文档事务的现有支持。
//开启一个会话
sess = db.getMongo().startSession()
//开启事务
sess.startTransaction()
//回滚
sess.abortTransaction()
//提交
sess.commitTransaction()
//结束会话
sess.endSession()
重构后的 Mongo 的逻辑
参考 user 模块 mysql 重构为 mongo 代码:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
db, err := relation.NewGormDB()
if err != nil {
return err
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
}
userDB := relation.NewUserGorm(db)
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
}
pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users)
}
下面是 user重构后的代码:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
db, err := relation.NewGormDB()
if err != nil {
return err
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
}
userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
return err
}
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx.NewMongo(mongo.GetClient()), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
}
pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users)
}
帮我模仿重构 friend 代码:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
db, err := relation.NewGormDB()
if err != nil {
return err
}
if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
return err
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
blackDB := relation.NewBlackGorm(db)
friendDB := relation.NewFriendGorm(db)
userRpcClient := rpcclient.NewUserRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
notificationSender := notification.NewFriendNotificationSender(
&msgRpcClient,
notification.WithRpcFunc(userRpcClient.GetUsersInfo),
)
pbfriend.RegisterFriendServer(server, &friendServer{
friendDatabase: controller.NewFriendDatabase(
friendDB,
relation.NewFriendRequestGorm(db),
cache.NewFriendCacheRedis(rdb, friendDB, cache.GetDefaultOpt()),
tx.NewGorm(db),
),
blackDatabase: controller.NewBlackDatabase(
blackDB,
cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt()),
),
userRpcClient: &userRpcClient,
notificationSender: notificationSender,
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client),
})
return nil
}
给出改进后的代码
速读一遍
func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer)
rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil {
panic(err.Error())
}
}
创建RPC命令对象:
rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer)
这行代码调用了
cmd
包中的NewRpcCmd
函数,创建了一个RPC命令对象。传递给这个函数的参数cmd.RpcUserServer
可能是一个配置或者常量,用于指定RPC服务的类型或配置。添加端口标志:
rpcCmd.AddPortFlag()
这行代码对
rpcCmd
对象调用AddPortFlag
方法。这个方法可能用于添加或配置与RPC服务监听端口相关的命令行参数。添加普罗米修斯端口标志:
rpcCmd.AddPrometheusPortFlag()
类似于上面的端口标志添加,这行代码可能是为了配置Prometheus(一个开源监控系统)端口的相关参数。
执行RPC命令:
if err := rpcCmd.Exec(); err != nil { panic(err.Error()) }
在这里,
Exec
方法被调用以执行RPC命令。如果执行中出现错误,程序会触发panic,打印错误信息并终止。启动RPC服务:
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil { panic(err.Error()) }
最后,
StartSvr
方法被调用来启动RPC服务。这个方法接受两个参数:一是配置中的RPC注册名(可能是服务名称),二是启动服务时调用的函数(在这个例子中是user.Start
)。
继续来看启动函数:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
return err
}
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
}
userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
return err
}
tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
if err != nil {
return err
}
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
}
pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users)
}
初始化缓存和数据库连接:
coderdb, err := cache.NewRedis() mongo, err := unrelation.NewMongo()
这两行代码分别初始化Redis缓存和Mongo数据库的连接。如果其中任何一个初始化过程出现错误,函数将返回该错误。
配置管理员用户:
users := make([]*tablerelation.UserModel, 0) ... for k, v := range config.Config.Manager.UserID { ... }
这段代码创建了一个用户模型的切片,并基于配置中的管理员用户ID和昵称填充这个切片。它还检查ID和昵称数量是否匹配,不匹配则返回错误。
创建数据库和事务实例:
userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
这里初始化了用户数据库的Mongo实例和一个自动事务处理器。
创建缓存和数据库访问层:
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) database := controller.NewUserDatabase(userDB, cache, tx, userMongoDB)
这部分代码设置了用户缓存,并创建了一个数据库访问层,它结合了Redis缓存、Mongo数据库和事务处理器。
初始化RPC客户端:
friendRpcClient := rpcclient.NewFriendRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client)
这里创建了几个RPC客户端,用于与其他服务(如朋友服务、群组服务和消息服务)进行通信。
配置用户服务:
u := &userServer{ ... }
在这一步,创建了一个
userServer
实例,它封装了上面创建的所有组件和服务。注册RPC服务和初始化数据库:
pbuser.RegisterUserServer(server, u) return u.UserDatabase.InitOnce(context.Background(), users)
最后,这个
userServer
实例被注册为一个RPC服务,并且调用InitOnce
方法来初始化用户数据库。
Third
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
logdb, err := mgo.NewLogMongo(mongo.GetDatabase())
if err != nil {
return err
}
s3db, err := mgo.NewS3Mongo(mongo.GetDatabase())
if err != nil {
return err
}
apiURL := config.Config.Object.ApiURL
if apiURL == "" {
return fmt.Errorf("api url is empty")
}
if _, err := url.Parse(config.Config.Object.ApiURL); err != nil {
return err
}
if apiURL[len(apiURL)-1] != '/' {
apiURL += "/"
}
apiURL += "object/"
rdb, err := cache.NewRedis()
if err != nil {
return err
}
// 根据配置文件策略选择 oss 方式
enable := config.Config.Object.Enable
var o s3.Interface
switch config.Config.Object.Enable {
case "minio":
o, err = minio.NewMinio(cache.NewMinioCache(rdb))
case "cos":
o, err = cos.NewCos()
case "oss":
o, err = oss.NewOSS()
default:
err = fmt.Errorf("invalid object enable: %s", enable)
}
if err != nil {
return err
}
third.RegisterThirdServer(server, &thirdServer{
apiURL: apiURL,
thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client),
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
})
return nil
}
初始化Mongo数据库连接:
mongo, err := unrelation.NewMongo()
这一行初始化Mongo数据库的连接。如果出现错误,则函数返回该错误。
创建日志和S3数据库实例:
logdb, err := mgo.NewLogMongo(mongo.GetDatabase()) s3db, err := mgo.NewS3Mongo(mongo.GetDatabase())
这两行代码分别初始化日志数据库和S3数据库的实例。
配置和验证API URL:
apiURL := config.Config.Object.ApiURL ... if _, err := url.Parse(config.Config.Object.ApiURL); err != nil { return err } ... apiURL += "object/"
这部分代码获取API URL的配置,检查其有效性,并对其进行必要的格式调整。
初始化Redis缓存:
rdb, err := cache.NewRedis()
这行代码初始化Redis缓存。如果初始化失败,函数将返回错误。
配置对象存储服务(OSS):
var o s3.Interface switch config.Config.Object.Enable { case "minio": ... case "cos": ... case "oss": ... default: ... }
根据配置,这段代码选择并初始化相应的对象存储服务,例如Minio、COS或OSS。
注册RPC服务和初始化数据库:
third.RegisterThirdServer(server, &thirdServer{ ... })
最后,这个
thirdServer
实例被注册为一个RPC服务。这个实例集成了API URL、数据库、缓存以及用户RPC客户端。
Friend:
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
// Initialize MongoDB
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
// Initialize Redis
rdb, err := cache.NewRedis()
if err != nil {
return err
}
friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase())
if err != nil {
return err
}
friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase())
if err != nil {
return err
}
blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase())
if err != nil {
return err
}
// Initialize RPC clients
userRpcClient := rpcclient.NewUserRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
// Initialize notification sender
notificationSender := notification.NewFriendNotificationSender(
&msgRpcClient,
notification.WithRpcFunc(userRpcClient.GetUsersInfo),
)
tx, err := tx2.NewAuto(context.Background(), mongo.GetClient())
if err != nil {
return err
}
// Register Friend server with refactored MongoDB and Redis integrations
pbfriend.RegisterFriendServer(server, &friendServer{
friendDatabase: controller.NewFriendDatabase(
friendMongoDB,
friendRequestMongoDB,
cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()),
tx,
),
blackDatabase: controller.NewBlackDatabase(
blackMongoDB,
cache.NewBlackCacheRedis(rdb, blackMongoDB, cache.GetDefaultOpt()),
),
userRpcClient: &userRpcClient,
notificationSender: notificationSender,
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client),
})
return nil
}
- 初始化MongoDB和Redis:
- 这部分代码初始化MongoDB和Redis连接。MongoDB用于存储数据,而Redis可能用于缓存以提高性能。
- 创建不同类型的MongoDB实例:
friendMongoDB
,friendRequestMongoDB
,blackMongoDB
分别初始化了针对朋友关系、朋友请求和黑名单的MongoDB实例。
- 初始化RPC客户端:
userRpcClient
和msgRpcClient
分别初始化了用户和消息的RPC客户端,这表明Friend
服务需要与用户服务和消息服务进行通信。
- 初始化通知发送器:
notificationSender
初始化了一个朋友通知发送器,它可能用于在朋友关系的各种操作(如添加朋友、朋友请求等)时发送通知。
- 初始化事务处理器:
tx
表示一个自动事务处理器,它可能用于处理涉及多个数据库操作的复杂事务。
- 注册Friend服务:
pbfriend.RegisterFriendServer
这行代码将friendServer
实例注册为一个gRPC服务。这个服务包括朋友数据库、黑名单数据库、RPC客户端以及通知发送器。
- 服务功能:
- 根据这段代码,
Friend
服务可能负责管理用户间的朋友关系,处理朋友请求,维护黑名单,以及处理与这些功能相关的通知和通信。
- 根据这段代码,
总体来看,Friend
服务似乎是一个专门处理用户社交关系(如朋友关系、朋友请求和黑名单)的后端服务。它通过RPC客户端与其他服务(如用户服务和消息服务)交互,并利用MongoDB和Redis进行数据存储和缓存管理。这样的服务对于社交网络、通讯应用或任何涉及用户间交互的平台来说都是非常重要的。
继续解读:
// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry.
func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry) UserRpcClient {
return UserRpcClient(*NewUser(client))
}
// UserRpcClient represents the structure for a User RPC client.
type UserRpcClient User
// NewUser initializes and returns a User instance based on the provided service discovery registry.
func NewUser(discov discoveryregistry.SvcDiscoveryRegistry) *User {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName)
if err != nil {
panic(err)
}
client := user.NewUserClient(conn)
return &User{Discov: discov, Client: client, conn: conn}
}
type User struct {
conn grpc.ClientConnInterface
Client user.UserClient
Discov discoveryregistry.SvcDiscoveryRegistry
}
NewUserRpcClient
函数:- 这个函数接受一个服务发现注册表对象(
discoveryregistry.SvcDiscoveryRegistry
)。 - 它调用
NewUser
函数来创建一个User
实例,并将其转换为UserRpcClient
类型。 - 这种设计模式表明,
UserRpcClient
是User
的包装或别名,可能用于提供更具体的或附加的功能。
- 这个函数接受一个服务发现注册表对象(
UserRpcClient
类型:UserRpcClient
被定义为User
类型的别名。- 这意味着它继承了
User
类型的所有字段和方法,但也可以定义额外的方法或覆盖现有的方法。
NewUser
函数:- 这个函数同样接受服务发现注册表对象。
- 它使用这个注册表来获取与用户服务的连接(通过
discov.GetConn
方法)。 - 如果获取连接过程中出现错误,函数会触发panic,这表明无法连接至用户服务是一个严重的错误情况。
- 一旦连接被建立,它使用这个连接来创建一个gRPC客户端(
user.NewUserClient
)。
User
结构体:User
结构体包含三个字段:conn
(gRPC连接),Client
(用户服务的gRPC客户端),和Discov
(服务发现注册表)。- 这个结构体封装了与用户服务通信所需的所有元素。
s3 模块学习
func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) {
defer log.ZDebug(ctx, "return")
upload, err := parseMultipartUploadID(uploadID)
if err != nil {
return nil, err
}
if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash {
return nil, errors.New("md5 mismatching")
}
if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
return &UploadResult{
Key: info.Key,
Size: info.Size,
Hash: info.ETag,
}, nil
} else if !c.IsNotFound(err) {
return nil, err
}
cleanObject := make(map[string]struct{})
defer func() {
for key := range cleanObject {
_ = c.impl.DeleteObject(ctx, key)
}
}()
var targetKey string
switch upload.Type {
case UploadTypeMultipart:
parts := make([]s3.Part, len(partHashs))
for i, part := range partHashs {
parts[i] = s3.Part{
PartNumber: i + 1,
ETag: part,
}
}
// todo: 验证大小
result, err := c.impl.CompleteMultipartUpload(ctx, upload.ID, upload.Key, parts)
if err != nil {
return nil, err
}
targetKey = result.Key
case UploadTypePresigned:
uploadInfo, err := c.StatObject(ctx, upload.Key)
if err != nil {
return nil, err
}
cleanObject[uploadInfo.Key] = struct{}{}
if uploadInfo.Size != upload.Size {
return nil, errors.New("upload size mismatching")
}
md5Sum := md5.Sum([]byte(strings.Join([]string{uploadInfo.ETag}, partSeparator)))
if md5val := hex.EncodeToString(md5Sum[:]); md5val != upload.Hash {
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash))
}
// 防止在这个时候,并发操作,导致文件被覆盖
copyInfo, err := c.impl.CopyObject(ctx, uploadInfo.Key, upload.Key+"."+c.UUID())
if err != nil {
return nil, err
}
cleanObject[copyInfo.Key] = struct{}{}
if copyInfo.ETag != uploadInfo.ETag {
return nil, errors.New("[concurrency]copy md5 mismatching")
}
hashCopyInfo, err := c.impl.CopyObject(ctx, copyInfo.Key, c.HashPath(upload.Hash))
if err != nil {
return nil, err
}
log.ZInfo(ctx, "hashCopyInfo", "value", fmt.Sprintf("%+v", hashCopyInfo))
targetKey = hashCopyInfo.Key
default:
return nil, errors.New("invalid upload id type")
}
if err := c.cache.DelS3Key(c.impl.Engine(), targetKey).ExecDel(ctx); err != nil {
return nil, err
}
return &UploadResult{
Key: targetKey,
Size: upload.Size,
Hash: upload.Hash,
}, nil
}
第一部分:路由定义
objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload)
- 这一行定义了一个HTTP POST路由,当客户端向
/complete_multipart_upload
发送POST请求时,会调用CompleteMultipartUpload
方法。
第二部分:处理请求
func (o *ThirdApi) CompleteMultipartUpload(c *gin.Context) {
a2r.Call(third.ThirdClient.CompleteMultipartUpload, o.Client, c)
}
- 这个函数是一个路由处理器,它使用
gin.Context
来处理HTTP请求。它调用a2r.Call
方法,将请求转发给实际处理函数。
第三部分:业务逻辑
func (t *thirdServer) CompleteMultipartUpload(ctx context.Context, req *third.CompleteMultipartUploadReq) (*third.CompleteMultipartUploadResp, error) {
// ...
}
- 这是核心的业务逻辑函数。它首先进行一些检查,然后调用
s3Database
的CompleteMultipartUpload
方法来完成上传。接着,它创建一个对象模型并保存到数据库。最后返回一个响应。
第四部分:完成上传
func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) {
return s.s3.CompleteUpload(ctx, uploadID, parts)
}
- 这个函数实际上只是一个简单的封装,调用另一个
CompleteUpload
函数来完成上传。
第五部分:完成上传的实现
func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) {
// ...
}
- 这是一个更复杂的函数,包含实际完成上传的逻辑。它处理多部分上传的不同场景(如多部分上传、预签名上传等),验证MD5值,处理文件复制和移动,最后返回上传结果。
关键点
- 错误处理:在每个函数中,错误情况都被仔细处理,并返回相应的错误信息。
- MD5验证:代码在多个地方验证上传部分的MD5哈希值,确保数据的一致性和完整性。
- 并发处理:在处理上传的最后阶段,代码考虑了并发操作的情况,确保数据的一致性。
整体而言,这是一段处理文件上传的后端服务代码,特别关注于多部分上传的完成和验证。代码结构清晰,有适当的错误处理和日志记录,符合现代后端开发的标准。
复制对象存储
继续开始看看复制对象存储是如何做的:
func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
result, err := o.bucket.CopyObject(src, dst)
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)),
}, nil
}
函数定义
func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
o *OSS
: 这个函数是OSS
结构体的一个方法,其中OSS
可能是对某个对象存储服务的封装。ctx context.Context
: 传入的上下文,用于控制函数的执行和取消。src string
: 源对象的路径或标识符。dst string
: 目标对象的路径或标识符。- 返回值:返回一个指向
s3.CopyObjectInfo
的指针和一个错误。如果复制成功,错误将是nil
,否则会包含错误信息。
函数体
result, err := o.bucket.CopyObject(src, dst)
- 这一行调用
o.bucket.CopyObject
方法来实际执行复制操作,其中src
是源对象,dst
是目标对象。 - 如果复制过程中发生错误,
err
将被赋值,并且该函数将返回错误。
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)),
}, nil
- 如果复制成功,函数返回一个新的
s3.CopyObjectInfo
实例。这个实例包含目标对象的键(Key
)和经过处理的ETag。 ETag
是一种标记,通常用于验证对象的完整性。这里的ETag
被转换为小写,并且去除了所有的双引号。
Minio
func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{
Bucket: m.bucket,
Object: dst,
}, minio.CopySrcOptions{
Bucket: m.bucket,
Object: src,
})
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(result.ETag),
}, nil
}
函数定义
func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
m *Minio
: 这是Minio
结构体的一个方法。Minio
结构体可能代表了对Minio服务的封装。ctx context.Context
: 传入的上下文,用于控制函数的执行和取消。src string
: 源对象的路径或标识符。dst string
: 目标对象的路径或标识符。- 返回值:返回一个指向
s3.CopyObjectInfo
的指针和一个错误。如果复制成功,错误将是nil
,否则会包含错误信息。
函数体
if err := m.initMinio(ctx); err != nil {
return nil, err
}
- 这一行调用
m.initMinio
方法来初始化Minio客户端。如果初始化过程中出现错误,将返回错误。
result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{
Bucket: m.bucket,
Object: dst,
}, minio.CopySrcOptions{
Bucket: m.bucket,
Object: src,
})
- 这里实际执行复制操作,使用
CopyObject
方法。方法接受目标和源的选项,包括存储桶名称(Bucket
)和对象名称(Object
)。 - 如果复制过程中发生错误,
err
将被赋值,并且该函数将返回错误。
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(result.ETag),
}, nil
- 如果复制成功,函数返回一个新的
s3.CopyObjectInfo
实例。这个实例包含目标对象的键(Key
)和ETag(用于验证对象的完整性)。 ETag
被转换为小写。