Avi Haiat
04/18/2021, 4:44 PM
range(0, count, LIMIT)
pseudo code:
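from prefect import Flow, task, flatten, unmapped
from pymongo import MongoClient

LIMIT = 1000  # page size (example value)

def get_collection(collectionName):
    # placeholder connection and database name; adjust to your deployment
    return MongoClient()["mydb"][collectionName]

@task
def count_collection_mongo(collectionName):
    # calculate number of documents in the collection and return it
    return get_collection(collectionName).count_documents({})

@task
def load_data(collectionName, skip, limit):
    # fetch one page: db.collectionName.find().skip(skip).limit(limit)
    return list(get_collection(collectionName).find().skip(skip).limit(limit))

@task
def transform(data):
    print(data.get('name'))
    return data

@task
def calculate_iterations_data(count_result):
    # one skip offset per page: 0, LIMIT, 2*LIMIT, ...
    return list(range(0, count_result, LIMIT))

with Flow("ETL flow for mycollection") as flow:
    result_count = count_collection_mongo("mycollection")
    iterations = calculate_iterations_data(result_count)
    # map over the offsets only; collection name and page size are held fixed
    data = load_data.map(unmapped("mycollection"), iterations, unmapped(LIMIT))
    # each mapped result is a list of documents, so flatten before mapping again
    transform.map(flatten(data))

flow.run()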
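The paging arithmetic behind range(0, count, LIMIT) can be checked without Mongo or Prefect. A minimal sketch, assuming plain Python lists stand in for the collection; LIMIT, page_offsets, load_page and run_etl are hypothetical names. When count is not a multiple of LIMIT, the last page is simply shorter, and flattening the pages gives back every document once.

```python
from itertools import chain

LIMIT = 1000  # hypothetical page size


def page_offsets(count, limit=LIMIT):
    """Skip values for each page: 0, limit, 2*limit, ... strictly below count."""
    return list(range(0, count, limit))


def load_page(rows, skip, limit):
    """Stand-in for db[collection].find().skip(skip).limit(limit)."""
    return rows[skip:skip + limit]


def run_etl(rows, limit=LIMIT):
    offsets = page_offsets(len(rows), limit)
    # one call per offset, like a mapped load task
    pages = [load_page(rows, skip, limit) for skip in offsets]
    # flatten the list of pages back into a single list of documents
    return list(chain.from_iterable(pages))


if __name__ == "__main__":
    docs = [{"name": f"doc{i}"} for i in range(2500)]
    print(page_offsets(len(docs)))
    print(len(run_etl(docs)))
```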
Avi Haiat
04/18/2021, 4:45 PM
Varun Joshi
04/19/2021, 3:56 AM
from google.cloud import datacatalog_v1
from google.cloud import bigquery
client = bigquery.Client()
datacatalog_client = datacatalog_v1.DataCatalogClient()
I get an error saying:
Clients have non-trivial state that is local and unpickleable
Can someone tell me what this means? I'm assuming it has something to do with GCP.
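That message appears to come from the google-cloud client library itself, which refuses to pickle client objects. Prefect may need to serialize a flow (for example when storing it, or when shipping tasks to Dask workers), so a client created at module level and captured by a task can trigger it; a common workaround is to create the client inside the task that uses it. The stdlib-only sketch below shows the mechanism under stated assumptions: FakeClient is a hypothetical stand-in that holds a threading.Lock, a kind of process-local state pickle cannot serialize, and is_picklable and run_query are hypothetical helpers.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a cloud SDK client holding process-local state."""

    def __init__(self):
        self._lock = threading.Lock()  # locks cannot be pickled

    def query(self, sql):
        with self._lock:
            return f"ran: {sql}"


def is_picklable(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Problematic pattern: a client built at import time and captured by task code.
module_client = FakeClient()


# Safer pattern: build the client inside the task body, so only plain
# arguments (like the SQL string) ever need to be serialized.
def run_query(sql):
    client = FakeClient()
    return client.query(sql)
```

With this, is_picklable(module_client) is False, while the plain arguments the task receives pickle fine.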
I'm using a local service account JSON file, and I'm also importing Pub/Sub modules; I didn't have any issue with those.
Rob Fowler
04/19/2021, 11:46 AM
from time import sleep
import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.storage import Docker

@task
def slow_task(item, sleep_time=4):
    logger = prefect.context.get('logger')
    logger.info(f"==== IN TASK item {item} sleeping {sleep_time}")
    sleep(sleep_time)
    logger.info(f"## Awake {item}")
    return item

@prefect.task
def produce_range(rangelen):
    return range(rangelen)

with Flow("parallel") as flow:
    nrange = produce_range(6)
    numbers = slow_task.map(item=nrange)

flow.executor = LocalDaskExecutor(workers=10, scheduler="processes")
#flow.executor = DaskExecutor()
if 'local_script_path' in prefect.context:
    flow.storage = Docker(registry_url='old7.mianos.com:5000')

if __name__ == '__main__':
    flow.run()
This starts 10 processes inside the running Docker container, which return after a few seconds, but it executes each task serially unless I call LocalDaskExecutor without any arguments.
Under k8s, DaskExecutor seems to start a few threads, but nowhere near as many as I want (30+).
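A quick way to tell whether mapped items really run concurrently is to time the map: six items that each sleep for a fixed interval take about six intervals when run serially and about one interval with six or more workers. The sketch below uses the standard library's ThreadPoolExecutor as a swapped-in stand-in for the Dask pool, not Prefect's executor; timed_map is a hypothetical helper. Separately, and as an assumption to verify against the Prefect docs for your version: the documented keyword for the size of LocalDaskExecutor's pool is num_workers, so workers=10 may not be taking effect.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(item, sleep_time=0.2):
    time.sleep(sleep_time)
    return item


def timed_map(items, max_workers, sleep_time=0.2):
    """Run slow_task over items with a pool of max_workers; return (results, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(lambda i: slow_task(i, sleep_time), items))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    _, serial = timed_map(range(6), max_workers=1)
    _, parallel = timed_map(range(6), max_workers=6)
    print(f"serial {serial:.2f}s, parallel {parallel:.2f}s")
```

If the "parallel" timing is close to the serial one, the pool is not actually running items at the same time.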
I feel I am missing something.
Milly gupta
04/19/2021, 1:24 PM
Jack Sundberg
04/19/2021, 3:37 PM
Jonathan Chu
04/20/2021, 12:13 AM
xyzz
04/20/2021, 8:45 AM
xyzz
04/20/2021, 8:57 AM
Rob Fowler
04/20/2021, 9:01 AM
Dotan Asselmann
04/20/2021, 1:37 PM
Pedro Machado
04/20/2021, 2:45 PM
CreateContainer, StartContainer, etc.).
However, it seems that the logs would need to be retrieved with GetContainerLogs after the container finishes running. Is this correct? If so, this is less than ideal for our use case: these are long-running processes, and we'd want to see the logs in real time.
So far, I've thought about a couple of alternatives:
1. Modify the Docker tasks to use the logs method with stream=True (I haven't tested this yet, but the docs suggest this could work)
2. Add Prefect to their Docker image and create a flow that runs inside of the image
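Option 1 amounts to consuming output line by line while the process is still running, instead of fetching it once it exits. In the Docker SDK for Python, the logs methods accept stream=True (and follow=True) and then yield output incrementally; that part is untested here. The stdlib sketch below shows the same pattern with a subprocess standing in for the container; stream_command is a hypothetical helper, and each line goes to a logger as soon as the child writes it.

```python
import logging
import subprocess
import sys


def stream_command(cmd, logger):
    """Run cmd and log each output line as soon as it is produced."""
    lines = []
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so nothing is lost
        text=True,
        bufsize=1,  # line-buffered reads on our side
    ) as proc:
        for line in proc.stdout:  # yields lines while the child is running
            line = line.rstrip("\n")
            logger.info(line)
            lines.append(line)
        proc.wait()
    return proc.returncode, lines


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    child = "import time\nfor i in range(3):\n    print(f'step {i}', flush=True)\n    time.sleep(0.5)"
    stream_command([sys.executable, "-c", child], logging.getLogger("container"))
```

Running it prints "step 0", "step 1", "step 2" about half a second apart rather than all at the end; the child must flush its output for this to work, which is also true inside a container.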
Do you see another option? What would you recommend? Thanks!
Kelby
04/20/2021, 3:22 PM
Kayvan Shah
04/20/2021, 6:29 PM
Mitchell Bregman
04/20/2021, 7:48 PM
service_account_name, but it seems the pods are still using the namespace's default service account. My agent is also deployed without a service_account_name set. Any thoughts?
from prefect.run_configs import KubernetesRun

flow.run_configs = KubernetesRun(
    image_pull_secrets=["xxx"],
    job_template=config.KUBE_JOB_TEMPLATE,
    cpu_limit=config.KUBE_CPU_LIMIT,
    cpu_request=config.KUBE_CPU_REQUEST,
    memory_limit=config.KUBE_MEMORY_LIMIT,
    memory_request=config.KUBE_MEMORY_REQUEST,
    service_account_name="my-service-account",
)
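When debugging which account a flow-run pod uses, it helps to know where the setting has to land: in a Kubernetes Job manifest the pod's service account is spec.template.spec.serviceAccountName, and when it is absent the pod gets the namespace's default service account. Since a custom job_template is passed here, it may be worth inspecting that template too. Also possibly relevant (not confirmed in this thread): in Prefect 1.x the flow attribute is run_config, singular, so an assignment to flow.run_configs would be stored on the object but not used. A minimal sketch with plain dicts; with_service_account is a hypothetical helper, not Prefect's code.

```python
import copy


def with_service_account(job_template, name):
    """Return a copy of a Job manifest with the pod's serviceAccountName set."""
    job = copy.deepcopy(job_template)  # leave the caller's template untouched
    pod_spec = (
        job.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
    )
    pod_spec["serviceAccountName"] = name
    return job


template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {"template": {"spec": {"containers": [{"name": "flow"}]}}},
}
```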
Gage Toschlog
04/20/2021, 9:14 PM
import prefect
from prefect import Flow, task
from prefect.tasks.dbt import DbtShellTask

@task
def run_DBT_task():
    dbt = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    return dbt.run(command="cd ~/src/prefect && dbt run")

with Flow(name="Local Test") as flow:
    run_DBT_task()

flowID = flow.register(project_name="51", labels=["localtest"])
client = prefect.Client()
client.create_flow_run(flow_id=flowID)
Aurélien Vallée
04/21/2021, 10:16 AM
@task
def mytask():
    logger = prefect.context.get('logger')
    logger.info('hello')
and
logger = logging.getLogger(__name__)
@task
def mytask():
    logger.info('hello')
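The practical difference is where the records go. Roughly, in Prefect 1.x the logger from prefect.context is already attached to Prefect's handlers, so its messages show up with the task run in the UI, while a plain module logger only reaches whatever handlers you configure yourself (Prefect also has an extra_loggers setting for opting other loggers in; check the docs for your version). The stdlib sketch below shows the underlying mechanism; ListHandler, the logger names, and mytask are hypothetical stand-ins, not Prefect's implementation.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a handler that ships records to a backend or UI."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.name}: {record.getMessage()}")


backend = ListHandler()

# Logger an orchestrator would hand out: already wired to the backend handler.
context_logger = logging.getLogger("orchestrator.task")
context_logger.setLevel(logging.INFO)
context_logger.addHandler(backend)

# Ordinary module-level logger: nothing routes its records to the backend.
module_logger = logging.getLogger("mymodule")
module_logger.setLevel(logging.INFO)


def mytask():
    context_logger.info("hello")
    module_logger.info("hello")
```

Calling mytask() once leaves only the context logger's record in backend.messages; after module_logger.addHandler(backend), both loggers' records are captured.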
i.e., what is the point of using a Prefect-provided logger instead of regular Python logging loggers?
Aurélien Vallée
04/21/2021, 10:17 AM
Aurélien Vallée
04/21/2021, 10:18 AM
Rob Fowler
04/21/2021, 10:32 AM
ciaran
04/21/2021, 11:55 AM
Jérémy Trudel
04/21/2021, 1:34 PM
Marc Lipoff
04/21/2021, 2:49 PM
prefect build -p /home/circleci/project/data_flows/flows/vaccine_cdc.py
(log in thread)
Kayvan Shah
04/21/2021, 4:35 PM
Sveta Kochkaeva
04/21/2021, 4:52 PM
Mark McDonald
04/21/2021, 5:18 PM
Charles Liu
04/21/2021, 6:25 PM
Kayvan Shah
04/21/2021, 6:39 PM
~/.prefect/config.toml in Windows 10
Charles Liu
04/21/2021, 9:55 PM
data = results.write(df, **prefect.context)
print(data)
Ismail Cenik
04/22/2021, 12:49 AM