Rafael Sá
03/15/2022, 6:03 PMChris Reuter
03/15/2022, 6:04 PMKevin Kho
@task
def abc():
return [1,2,3]
with Flow(..) as flow:
x = abc()
len(x)
you can’t do this because the tasks defer execution to flow.run()
or when the flow is run with an agent so x
is still of type Task
here. Try only having tasks inside the Flow block so the len()
operation should be in a taskRafael Sá
03/15/2022, 6:28 PMdestino_insumos,caminho_destino_planilhas_fluxos,destino_bank1,destino_bank2 =configurar_caminho_diretorios(ano_referencia,mes_referencia,mes_bbank1,mes_bank2)
all those variables are normally assigned to the correct type.Kevin Kho
destino_insumos.attribute
inside the Flow because it’s not that type yet.Kevin Kho
.shape
Kevin Kho
destino_insumos,caminho_destino_planilhas_fluxos,destino_bank1,destino_bank2
are all of type Task while the Flow is constructedRafael Sá
03/15/2022, 6:46 PMKevin Kho
Rafael Sá
03/15/2022, 6:54 PMKevin Kho
Rafael Sá
03/15/2022, 6:58 PMKevin Kho
Rafael Sá
03/15/2022, 7:07 PMfrom sharepoint4 import *
import os
from datetime import datetime
import calendar
from datetime import timedelta
import locale
from shutil import copy
from dateutil.relativedelta import relativedelta
import openpyxl
import prefect
from prefect.run_configs import LocalRun
from prefect import task,Flow
@task(nout=2)
def obter_lista_origem_destino(meu_site,SP_DOC_LIBRARY,filtro,split_string,base_path_destino):
#configura o endereço de origem do arquivo e o destino caso deseje baixar o arquivo
origem = ""
destino = ""
#instancia objeto da classe customizada sharepoint
meu_sharepoint = Sharepoint(meu_site,origem,destino)
#constroi a lista de arquivos desejados a serem copiados
extratos = meu_sharepoint.listar_arquivos(SP_DOC_LIBRARY,filtro)
#configura o caminho do destino dos arquivos que irão do sharepoint para o servidor local considerando o caminho de origem
destino_fluxo = [destino.split(split_string)[1] for destino in extratos]
#adiciona o drive para a gravação do arquivo
servidor = [(base_path_destino+(destino)) for destino in destino_fluxo]
return extratos,servidor
#início do código principal(definição de variáveis globais)
with Flow("CRIA DIRETORIOS") as flow:
#define o site do sharepoint no qual se autenticar para acessar a biblioteca de documentos
meu_site = "<https://xxx/sites/yyy>"
#define a biblioteca de documentos a ser pesquisada
SP_DOC_LIBRARY ="Documentos"
arquivos_extrato,destino_servidor = obter_lista_origem_destino(meu_site,SP_DOC_LIBRARY,filtro_dir,split_string,base_path_destino)
for i in range(0,len(arquivos_extrato)):
copiar_insumo(arquivos_extrato[i],destino_servidor[i])
Kevin Kho