【问题标题】:Why am I getting an unexpected EOF from this API call?为什么我会从此 API 调用中收到意外的 EOF?
【发布时间】:2018-12-12 16:29:49
【问题描述】:

我有以下作为 AWS Lambda cron 运行的 Go 代码,但我不确定为什么会出现此错误:

sls logs --stage prod --region eu-west-1 --function esCronFn
2018/12/12 12:07:01 unexpected EOF
2018/12/12 12:07:01 unexpected EOF
END RequestId: 6bf33d28-fe03-11e8-949d-f39174c57cab
REPORT RequestId: 6bf33d28-fe03-11e8-949d-f39174c57cab  Duration: 464734.47 ms  Billed Duration: 464800 ms      Memory Size: 256 MB     Max Memory Used: 257 MB

RequestId: 6bf33d28-fe03-11e8-949d-f39174c57cab Process exited before completing request

这是我的 main.go - 它基本上连接到外部 API 并提取我正在处理的记录并上传到 S3 存储桶。

package main

import (
    "bytes"
    "encoding/csv"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
)

var (
    // ENDPOINT is the endpoint from which incomplete CSV records are downloaded.
    // It is read from the ENDPOINT environment variable; init() substitutes a
    // hardcoded default when it is empty and appends the date operand of the
    // "ModifiedDate gt" filter.
    ENDPOINT = os.Getenv("ENDPOINT")

    // PARSED_ENDPOINT is ENDPOINT parsed by init() into a *url.URL, with its
    // raw query string re-encoded by QueryEscape. It is the URL that
    // fetchRecords requests.
    PARSED_ENDPOINT *url.URL

    // TOKEN authenticates requests sent to eshot API. It is read from the
    // TOKEN environment variable and sent as "Authorization: Token <TOKEN>".
    TOKEN = os.Getenv("TOKEN")

    // BUCKET is the S3 bucket to which CSV files are uploaded. It is read
    // from the BUCKET environment variable.
    BUCKET = os.Getenv("BUCKET")

    // svc is the S3 client used for uploads. It is assigned in main() and is
    // declared as the s3iface.S3API interface rather than the concrete client.
    svc s3iface.S3API
)

// EsRecord is one element of the JSON array returned by a successful request
// to the API. Only Firstname, Lastname, Email and SubaccountID are used when
// the CSV files are built; the remaining fields are decoded but not written.
type EsRecord struct {
    Salutation   string    `json:"Salutation"`
    Firstname    string    `json:"Firstname"`
    Lastname     string    `json:"Lastname"`
    Company      string    `json:"Company"`
    EEA          string    `json:"EEA"`
    ModifiedDate time.Time `json:"ModifiedDate"`
    SubaccountID string    `json:"SubaccountId"`
    Email        string    `json:"Email"`
}

// CsvData holds reference to underlying buffer and the csv writer.
// Writer writes into Buffer; Buffer's bytes are what gets uploaded to S3.
type CsvData struct {
    // Buffer accumulates the encoded CSV bytes.
    Buffer *bytes.Buffer
    // Writer is the csv.Writer created on top of Buffer.
    Writer *csv.Writer
}

// init finalises the export URL once, at process start:
//
//  1. falls back to a hardcoded endpoint when $ENDPOINT is empty,
//  2. appends yesterday's date at midnight as the operand of the trailing
//     "ModifiedDate gt" filter,
//  3. parses the result into PARSED_ENDPOINT and re-encodes its query string
//     with QueryEscape.
//
// The process exits through log.Fatalln when the resulting URL cannot be
// parsed.
//
// NOTE(review): because this runs only once per process, the date operand is
// fixed for the lifetime of the process. If the same process serves
// invocations on several days the filter date will be stale; consider
// computing it per invocation.
func init() {
    // The appended timestamp carries a "Z" (UTC) designator, so the calendar
    // date must be taken in UTC as well. Using the machine's local zone would
    // label a local date as UTC.
    yesterday := time.Now().UTC().AddDate(0, 0, -1)

    // If ENDPOINT is empty, use this hardcoded endpoint. The ENDPOINT variable
    // should not contain any text after "ModifiedDate gt"; the date operand is
    // appended below.
    if ENDPOINT == "" {
        ENDPOINT = "https://rest-api.domain.tld/Export/?$select=Email,Firstname,Lastname,SubaccountId,EEA,ModifiedDate&$filter=(EEA eq '' or EEA eq null) and ModifiedDate gt"
    }

    // Append CurrentDay-1 in YYYY-MM-DDTHH:MM:SSZ format (always midnight UTC).
    ENDPOINT = fmt.Sprintf("%s %sT00:00:00Z", ENDPOINT, yesterday.Format("2006-01-02"))

    var err error
    PARSED_ENDPOINT, err = url.Parse(ENDPOINT)
    if err != nil {
        log.Fatalln("Invalid $ENDPOINT", err)
    }

    // Re-encode the query the way the API expects it (see QueryEscape).
    PARSED_ENDPOINT.RawQuery = QueryEscape(PARSED_ENDPOINT.RawQuery)
}

// main validates the mandatory configuration, builds the S3 client and hands
// CreateAndUploadCsvToS3 to the Lambda runtime.
func main() {
    // Both settings are required. log.Fatalln terminates the process, so at
    // most one of these messages is ever printed, TOKEN being checked first.
    switch {
    case TOKEN == "":
        log.Fatalln("$TOKEN is empty")
    case BUCKET == "":
        log.Fatalln("$BUCKET is empty")
    }

    // Create the S3 session; session.Must panics if it cannot be created.
    sess := session.Must(session.NewSession())
    svc = s3.New(sess)

    lambda.Start(CreateAndUploadCsvToS3)
}

// maxErrorBodyBytes caps how much of a response body is copied into an error
// message, so that a failure on a very large response cannot exhaust memory.
const maxErrorBodyBytes = 4 << 10

// CreateAndUploadCsvToS3 is the Lambda handler. It downloads the records from
// the API, groups them per sub-account into CSV documents (see
// ParseEsRecordsJSON) and uploads one object per sub-account to BUCKET.
//
// It returns an error when the request fails, the API answers with a non-200
// status, the body is not a JSON array of EsRecord, or any upload fails.
// Uploads stop at the first failure; objects already uploaded are not removed.
func CreateAndUploadCsvToS3() error {
    resp, err := fetchRecords()
    if err != nil {
        return fmt.Errorf("error in fetching records: %s", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("api returned non 200 response(%d), URL: %s, %s", resp.StatusCode, PARSED_ENDPOINT.String(), errorBodySnippet(resp.Body))
    }

    // API returns array of EsRecord
    var esRecords []EsRecord
    if err := json.NewDecoder(resp.Body).Decode(&esRecords); err != nil {
        // Whatever is still unread in the body is appended as a hint only:
        // the decoder has already consumed (and buffered) an unknown prefix,
        // so this is not necessarily the part that failed to parse.
        return fmt.Errorf("error in parsing response %s: %s", err, errorBodySnippet(resp.Body))
    }

    recordsMap := ParseEsRecordsJSON(esRecords)

    // NOTE(review): time.Time.String() is a debugging format; it contains
    // spaces and, for time.Now(), a monotonic clock suffix ("m=+..."). The key
    // format is left unchanged here because consumers of the bucket may depend
    // on it - consider switching to a fixed layout such as time.RFC3339.
    ct := time.Now().String()
    for subaccountID, data := range recordsMap {
        // Key is in format, <subaccountid>_<current timestamp>.csv
        key := fmt.Sprintf("%s_%s.csv", subaccountID, ct)

        _, err := svc.PutObject(&s3.PutObjectInput{
            Bucket: aws.String(BUCKET),
            Key:    aws.String(key),
            Body:   bytes.NewReader(data.Buffer.Bytes()),
        })
        if err != nil {
            return fmt.Errorf("error in uploading %s: %s", key, err)
        }
    }

    return nil
}

// errorBodySnippet returns at most maxErrorBodyBytes of body as a string, for
// inclusion in an error message.
func errorBodySnippet(body io.Reader) string {
    // Best effort: a read error merely shortens the diagnostic text, so it is
    // deliberately ignored.
    b, _ := ioutil.ReadAll(io.LimitReader(body, maxErrorBodyBytes))
    return string(b)
}

// ParseEsRecordsJSON groups esRecords by SubaccountID and builds one CSV
// document per sub-account.
//
// Every document starts with the header row "Firstname,Lastname,Email",
// followed by one row per record in input order. The returned map is keyed by
// SubaccountID; each value's Buffer holds the fully flushed CSV bytes. An
// empty or nil input yields an empty map.
//
// Write and flush failures are logged and do not abort processing, so the
// function always returns a map.
func ParseEsRecordsJSON(esRecords []EsRecord) map[string]CsvData {
    recordsMap := make(map[string]CsvData)

    for i := range esRecords {
        rec := &esRecords[i]

        csvRef, seen := recordsMap[rec.SubaccountID]
        if !seen {
            // First record for this sub-account: create its buffer and
            // writer, emit the header row, and remember both in the map.
            buf := new(bytes.Buffer)
            csvRef = CsvData{Buffer: buf, Writer: csv.NewWriter(buf)}

            if err := csvRef.Writer.Write([]string{"Firstname", "Lastname", "Email"}); err != nil {
                log.Printf("error occurred in inserting headers for subAccountId(%s): %s\n", rec.SubaccountID, err)
            }
            recordsMap[rec.SubaccountID] = csvRef
        }

        if err := csvRef.Writer.Write([]string{rec.Firstname, rec.Lastname, rec.Email}); err != nil {
            log.Printf("error occurred in inserting record for subAccountId(%s): %s\n", rec.SubaccountID, err)
        }
    }

    // csv.Writer buffers its output; flush each writer once at the end so the
    // buffers are complete, and surface any error hit while writing/flushing.
    for subaccountID, data := range recordsMap {
        data.Writer.Flush()
        if err := data.Writer.Error(); err != nil {
            log.Printf("error occurred in flushing csv for subAccountId(%s): %s\n", subaccountID, err)
        }
    }

    return recordsMap
}

// fetchRecords sends an authenticated GET request to PARSED_ENDPOINT and
// returns the raw *http.Response. On success the caller is responsible for
// closing the response body.
//
// NOTE(review): the client is created without a Timeout, so a stalled
// connection is only bounded by whatever limit the hosting environment
// enforces - consider setting one.
func fetchRecords() (*http.Response, error) {
    request, err := http.NewRequest(http.MethodGet, PARSED_ENDPOINT.String(), nil)
    if err != nil {
        return nil, err
    }
    request.Header.Set("Authorization", fmt.Sprintf("Token %s", TOKEN))

    var client http.Client
    return client.Do(request)
}

// QueryEscape percent-encodes s with url.QueryEscape and then relaxes the
// result so that it looks like what curl/Postman would send:
//
//   - "," "$" "=" "&" and ":" are restored to their literal form,
//   - spaces are emitted as "%20" instead of "+".
//
// Every other character url.QueryEscape escapes stays escaped. The original
// author reports that the target endpoint did not accept Go's stricter
// encoding, hence this post-processing.
func QueryEscape(s string) string {
    // After url.QueryEscape a pattern can only match at a "%HH" escape or at a
    // "+", and no replacement text can form a new pattern, so a single pass
    // gives the same result as applying the substitutions one after another.
    relax := strings.NewReplacer(
        "%2C", ",",
        "%24", "$",
        "%3D", "=",
        "+", "%20",
        "%26", "&",
        "%3A", ":",
    )
    return relax.Replace(url.QueryEscape(s))
}

如果我将 ENDPOINT 从下面的第一个值更改为第二个值:

ENDPOINT = "https://rest-api.domain.tld/Export/?$select=Email,Firstname,Lastname,SubaccountId,EEA,ModifiedDate&$filter=(EEA eq '' or EEA eq null) and ModifiedDate gt"

ENDPOINT = "https://rest-api.domain.tld/Export/?$select=Email,Firstname,Lastname,SubaccountId,EEA,ModifiedDate&$filter=EEA eq '' and ModifiedDate gt"

这样就不会出现 EOF 错误,但我拿不到完整的列表。而用 curl 请求时可以得到我需要的全部数据,所以我不清楚我的代码为什么会失败,也不知道怎样才能最好地定位失败的位置。

【问题讨论】:

  • 与根本问题无关,但您可能想阅读 Effective Go 并考虑对您的代码运行 linter。这些 VARIABLE_NAMES 看起来更像 Bash 而不是 Go,而且相当分散注意力,尤其是您在此处发布的代码如此之多。
  • 我只是按照 aws lambda 约定 docs.aws.amazon.com/lambda/latest/dg/… 来命名我的变量
  • 这些约定是针对环境变量的,这些是环境变量的完全正常的名称,是 Go 变量的完全不惯用的名称。当您读取环境变量并将其存储在 Go 变量中时,没有理由为您存储其值的 Go 变量使用环境变量命名约定。

标签: go


【解决方案1】:

问题在于我的 Lambda 函数只有 128 MB 内存,而端点返回的文件有 130 MB,因此增大函数的内存配置后问题就解决了。

【讨论】:

  • 我的 Lambda 函数的最大内存为 3008 Mb,我只使用了 1300 mb,但我仍然收到 curl: (56) Unexpected EOF,我还将超时设置为 15 分钟,我的请求在 6 分钟内完成。
  • 你能在本地运行吗?否则添加一些打印语句,看看你的函数是否中断
猜你喜欢
  • 2018-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-01-06
  • 1970-01-01
  • 1970-01-01
  • 2019-05-14
  • 2020-02-24
相关资源
最近更新 更多